基于 Crossplane Composition 构建声明式云资源交付 API 的 Go 实现


Terraform state 文件的漂移与冲突,几乎是每个运维团队都经历过的噩梦。当多个流水线或工程师同时尝试修改同一套基础设施时,state lock 机制能防止并发写入,但无法从根本上解决声明式目标与真实世界状态的同步难题。更深层次的问题在于,基础设施的生命周期管理与应用的生命周期管理是割裂的。应用在 Kubernetes 中以一种高度声明式、自动化的方式运行,而它依赖的数据库、消息队列、对象存储桶,却需要另一套独立的、通常由平台团队手动触发的 IaC 工具链来维护。这种割裂是运维瓶颈和交付效率低下的根源。

Crossplane 试图从根本上解决这个问题,它的核心理念是将 Kubernetes API 延伸为一个能够管理一切的通用控制平面。不仅仅是 Pods 和 Services,也包括 AWS RDS 实例、GCP Cloud SQL 或 Azure Blob Storage。通过将外部资源抽象为 Kubernetes CRD,我们可以用同样的方式 (kubectl apply) 来管理基础设施。

但直接让所有开发者去编写 Crossplane 的 Claim YAML 文件并不现实。这不仅要求他们理解 Crossplane 的内部机制,也暴露了过多的基础设施细节。一个更成熟的模式是构建一个内部开发者平台(IDP),提供一个简化的 API 接口。开发者通过这个 API 声明他们的需求——比如“我需要一个具备备份和监控功能的 100GB PostgreSQL 数据库”,而无需关心它背后是哪个云厂商的哪款产品。

本文的目标就是实现这个 API 服务的核心。我们将使用 Go 语言和 Echo 框架构建一个服务端点,它接收简化的 JSON 请求,然后动态生成并应用 Crossplane 的资源 Claim,从而触发底层云资源的创建。这套机制是构建自服务基础设施平台的基础。

核心概念:从具体资源到抽象能力

在深入代码之前,必须理解 Crossplane 的两个核心概念:CompositionCompositeResourceDefinition (XRD)。

  1. CompositeResourceDefinition (XRD): 这是我们定义给开发者的“抽象能力” API。它定义了一个新的 CRD,比如 ProductionDatabase,并规定了开发者可以配置的参数,如 storageGBengineVersion。这相当于我们设计了一个面向内部开发者的 API Schema。

  2. Composition: 这是实现 XRD 的具体方式。一个 Composition 会声明,当一个 ProductionDatabase 资源被创建时,它应该被“组合”成哪些具体的云资源。例如,它可以由一个 AWS RDSInstance、一个 DBSubnetGroup 和一个用于存储密码的 Kubernetes Secret 共同组成。Composition 负责将 XRD 中开发者指定的参数(如 storageGB)映射到底层具体资源的相应字段上。

这种分离使得平台团队可以在不影响开发者使用方式的前提下,更换或升级底层的云资源实现。例如,我们可以添加一个新的 Composition,将 ProductionDatabase 从 AWS RDS 无缝切换到 GCP Cloud SQL。

下面是整个流程的架构图:

sequenceDiagram
    participant Developer
    participant IDP API (Echo)
    participant Kubernetes API Server
    participant Crossplane Controller
    participant AWS Provider

    Developer->>+IDP API (Echo): POST /v1/databases (JSON payload)
    IDP API (Echo)->>IDP API (Echo): 1. Validate request
    IDP API (Echo)->>IDP API (Echo): 2. Generate ProductionDatabase Claim (YAML/Go struct)
    IDP API (Echo)->>+Kubernetes API Server: 3. Apply ProductionDatabase Claim
    Kubernetes API Server-->>-IDP API (Echo): Claim created/updated
    IDP API (Echo)-->>-Developer: 202 Accepted
    Kubernetes API Server->>+Crossplane Controller: Notifies of new Claim
    Crossplane Controller->>Crossplane Controller: 4. Select matching Composition
    Crossplane Controller->>+Kubernetes API Server: 5. Create managed resources (RDSInstance, etc.)
    Kubernetes API Server->>+AWS Provider: Notifies of new managed resource
    AWS Provider->>+AWS API: 6. Provision RDS, Subnet Group...
    AWS API-->>-AWS Provider: Resources being created
    AWS Provider-->>-Kubernetes API Server: Update status of managed resources
    Kubernetes API Server-->>-Crossplane Controller: Status updated
    Crossplane Controller-->>-Kubernetes API Server: Update status of Claim

我们的工作重点是实现上图中的 IDP API (Echo) 部分。

平台定义:抽象数据库资源

首先,我们需要在 Kubernetes 集群中定义我们的抽象资源。这需要管理员权限,通常由平台团队完成。假设 Crossplane 和 AWS Provider 已经安装完毕。

1. 定义 CompositeResourceDefinition (XRD)

这个 YAML 文件定义了一个名为 ProductionDatabase 的新资源类型,它属于 db.idp.jetify.com API 组。它暴露了两个可配置的参数:storageGBengineVersion

# x-database.yaml
apiVersion: apiextensions.crossplane.io/v1
kind: CompositeResourceDefinition
metadata:
  name: productiondatabases.db.idp.jetify.com
spec:
  group: db.idp.jetify.com
  names:
    kind: ProductionDatabase
    plural: productiondatabases
  claimNames:
    kind: DatabaseClaim # For namespaced claims, which our app will create
    plural: databaseclaims
  versions:
  - name: v1alpha1
    served: true
    referenceable: true
    schema:
      openAPIV3Schema:
        type: object
        properties:
          spec:
            type: object
            properties:
              parameters:
                type: object
                properties:
                  storageGB:
                    type: integer
                    description: "Storage size in GB for the database."
                  engineVersion:
                    type: string
                    description: "Database engine version, e.g., '13.7'."
                required:
                - storageGB
                - engineVersion
            required:
            - parameters
          status:
            type: object
            properties:
              connectionSecretName:
                type: string
                description: "Name of the Kubernetes secret holding connection details."

应用它: kubectl apply -f x-database.yaml

2. 定义 Composition

现在我们来定义如何实现这个 ProductionDatabase。在这个例子中,我们使用 AWS RDS。Composition 的关键在于 patches,它将 ProductionDatabase 的参数“贴”到具体的 AWS 资源上。

# composition-aws-postgres.yaml
apiVersion: apiextensions.crossplane.io/v1
kind: Composition
metadata:
  name: aws.postgres.productiondatabases.db.idp.jetify.com
  labels:
    provider: aws
    db: postgres
spec:
  compositeTypeRef:
    apiVersion: db.idp.jetify.com/v1alpha1
    kind: ProductionDatabase
  writeConnectionSecretsToNamespace: default # For simplicity, secrets will be created in 'default' ns
  resources:
    - name: dbsubnetgroup
      base:
        apiVersion: rds.aws.crossplane.io/v1alpha1
        kind: DBSubnetGroup
        spec:
          forProvider:
            region: us-east-1
            description: "Subnet group for our production database."
            subnetIds:
              - subnet-0123456789abcdef0
              - subnet-fedcba9876543210f
    - name: rdsinstance
      base:
        apiVersion: rds.aws.crossplane.io/v1beta1
        kind: RDSInstance
        spec:
          forProvider:
            region: us-east-1
            dbSubnetGroupNameSelector:
              matchControllerRef: true # Automatically link to the subnet group created above
            instanceClass: db.t3.small
            masterUsername: masteruser
            engine: postgres
            skipFinalSnapshot: true
            publiclyAccessible: false
          writeConnectionSecretToRef:
            namespace: default # The secret will be created here.
      patches:
        - fromFieldPath: "spec.parameters.storageGB"
          toFieldPath: "spec.forProvider.allocatedStorage"
        - fromFieldPath: "spec.parameters.engineVersion"
          toFieldPath: "spec.forProvider.engineVersion"
        - fromFieldPath: "metadata.uid"
          toFieldPath: "spec.writeConnectionSecretToRef.name"
          transforms:
            - type: string
              string:
                fmt: "%s-db-conn" # Create a unique secret name based on the claim's UID
      connectionDetails:
        - fromConnectionSecretKey: username
        - fromConnectionSecretKey: password
        - fromConnectionSecretKey: endpoint
        - fromConnectionSecretKey: port

应用它: kubectl apply -f composition-aws-postgres.yaml

至此,平台侧的准备工作完成。任何人在集群的 default 命名空间创建一个 DatabaseClaim 资源,Crossplane 就会自动根据这个 Composition 去 AWS 创建一个 RDS 实例和一个子网组。

API 服务实现:使用 Go 和 Echo

现在进入我们的核心任务:构建一个 Go 服务来代替开发者手动编写 YAML。

项目结构

/idp-api
|-- go.mod
|-- go.sum
|-- main.go
|-- api/
|   |-- handler.go       # Echo request handlers
|   |-- types.go         # API request/response types
|-- k8s/
|   |-- client.go        # Kubernetes client initialization
|   |-- claim.go         # Logic for creating the DatabaseClaim resource
|   |-- types.go         # Go structs for our CRD (DatabaseClaim)

1. 定义 CRD 的 Go 类型

为了以编程方式创建 DatabaseClaim,我们需要为其定义 Go 结构体。这些结构体必须与我们之前在 XRD 中定义的 Schema 完全匹配。一个常见的错误是这里的字段名或类型与 YAML 定义不匹配,导致序列化失败。

// k8s/types.go
package k8s

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// DatabaseParameters matches the 'parameters' block in our XRD.
type DatabaseParameters struct {
	StorageGB     int    `json:"storageGB"`
	EngineVersion string `json:"engineVersion"`
}

// DatabaseClaimSpec defines the desired state of DatabaseClaim.
type DatabaseClaimSpec struct {
	Parameters DatabaseParameters `json:"parameters"`

	// CompositionSelector is used to select the Composition.
	// We will set this based on API input later if needed.
	CompositionSelector *metav1.LabelSelector `json:"compositionSelector,omitempty"`
	
	// A reference to the abstract resource definition.
	CompositeResourceClaimSpec `json:",inline"`
}

// CompositeResourceClaimSpec contains the common fields for a resource claim.
// It's a simplified version for our purpose.
type CompositeResourceClaimSpec struct {
	ResourceRef *corev1.ObjectReference `json:"resourceRef,omitempty"`
}


// DatabaseClaimStatus defines the observed state of DatabaseClaim.
type DatabaseClaimStatus struct {
	// Add status fields if your API needs to report them.
	// For example, connection secret name.
	ConnectionSecretName string `json:"connectionSecretName,omitempty"`
}

// DatabaseClaim is the Schema for the databaseclaims API
type DatabaseClaim struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   DatabaseClaimSpec   `json:"spec,omitempty"`
	Status DatabaseClaimStatus `json:"status,omitempty"`
}

注意:在真实项目中,使用 kubebuildercode-generator 工具可以自动生成这些类型定义和客户端代码,这里为了清晰手动编写。

2. Kubernetes 客户端

我们将使用 client-go 与 Kubernetes API Server 交互。由于我们操作的是 CRD,DynamicClient 是一个非常灵活的选择,它允许我们与任何资源类型交互,而无需为其生成静态的 typed client。

// k8s/client.go
package k8s

import (
	"log"
	"os"
	"path/filepath"

	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
)

func NewDynamicClient() (dynamic.Interface, error) {
	kubeconfigPath := os.Getenv("KUBECONFIG")
	if kubeconfigPath == "" {
		home := homedir.HomeDir()
		if home != "" {
			kubeconfigPath = filepath.Join(home, ".kube", "config")
		}
	}

	config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
	if err != nil {
		log.Printf("Error building kubeconfig: %v", err)
		return nil, err
	}

	dynamicClient, err := dynamic.NewForConfig(config)
	if err != nil {
		log.Printf("Error creating dynamic client: %v", err)
		return nil, err
	}

	return dynamicClient, nil
}

这段代码实现了标准的 client-go 初始化逻辑,它会尝试从环境变量或默认的 ~/.kube/config 文件加载配置。在生产环境中,服务应该使用 rest.InClusterConfig() 来通过 ServiceAccount 与 API Server 通信。

3. 核心业务逻辑:创建 Claim

这是将 API 请求转化为 Kubernetes 资源的核心部分。

// k8s/claim.go
package k8s

import (
	"context"
	"fmt"
	"log"
	"time"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// GVR (Group, Version, Resource) for our DatabaseClaim
var dbClaimGVR = schema.GroupVersionResource{
	Group:    "db.idp.jetify.com",
	Version:  "v1alpha1",
	Resource: "databaseclaims",
}

type CreateDatabaseClaimArgs struct {
	Name          string
	Namespace     string
	StorageGB     int
	EngineVersion string
}

func CreateDatabaseClaim(client dynamic.Interface, args CreateDatabaseClaimArgs) error {
	dbClaim := &DatabaseClaim{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "db.idp.jetify.com/v1alpha1",
			Kind:       "DatabaseClaim",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      args.Name,
			Namespace: args.Namespace,
			// It's a good practice to add labels for selection
			Labels: map[string]string{
				"provider": "aws",
				"engine":   "postgres",
			},
		},
		Spec: DatabaseClaimSpec{
			Parameters: DatabaseParameters{
				StorageGB:     args.StorageGB,
				EngineVersion: args.EngineVersion,
			},
			// Here we explicitly select the composition we want.
			// This gives us more control from the API side.
			CompositionSelector: &metav1.LabelSelector{
				MatchLabels: map[string]string{
					"provider": "aws",
					"db":       "postgres",
				},
			},
		},
	}
	
	// We need to convert our typed struct to an Unstructured object for the dynamic client.
	unstructuredObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(dbClaim)
	if err != nil {
		return fmt.Errorf("failed to convert DatabaseClaim to unstructured: %w", err)
	}

	obj := &unstructured.Unstructured{
		Object: unstructuredObj,
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	log.Printf("Creating DatabaseClaim '%s' in namespace '%s'...", args.Name, args.Namespace)
	_, err = client.Resource(dbClaimGVR).Namespace(args.Namespace).Create(ctx, obj, metav1.CreateOptions{})
	if err != nil {
		log.Printf("Error creating DatabaseClaim: %v", err)
		return err
	}
	
	log.Printf("Successfully submitted DatabaseClaim '%s'. Crossplane will now provision the resources.", args.Name)
	return nil
}

这里的关键点:

  • unstructured.Unstructured: DynamicClient 操作的是 unstructured.Unstructured 类型,它本质上是一个 map[string]interface{},可以表示任何 Kubernetes 资源。我们使用 runtime.DefaultUnstructuredConverter 将我们强类型的 DatabaseClaim 结构体转换为这种通用格式。
  • Asynchronous Operation: 这个函数在向 Kubernetes API Server 提交创建请求后就立刻返回了。它不等待 RDS 实例创建完成,因为那可能需要几分钟。这是正确的行为模式,API 应只负责提交“意图”。

4. Echo API 层

最后,我们将上述逻辑包装成一个 HTTP 服务。

// api/types.go
package api

// CreateDatabaseRequest represents the JSON payload for our API.
// Note how it's simpler than the full Kubernetes object.
type CreateDatabaseRequest struct {
	Name          string `json:"name" validate:"required,alphanum,min=3,max=30"`
	StorageGB     int    `json:"storageGB" validate:"required,min=20,max=1000"`
	EngineVersion string `json:"engineVersion" validate:"required"`
}

type APIResponse struct {
	Message string `json:"message"`
	RequestID string `json:"requestId"`
}
// api/handler.go
package api

import (
	"idp-api/k8s"
	"log"
	"net/http"

	"github.com/go-playground/validator/v10"
	"github.com/labstack/echo/v4"
	"github.comcom/google/uuid"
	"k8s.io/client-go/dynamic"
)

type CustomValidator struct {
	validator *validator.Validate
}

func (cv *CustomValidator) Validate(i interface{}) error {
	return cv.validator.Struct(i)
}

type DatabaseHandler struct {
	K8sClient dynamic.Interface
}

func (h *DatabaseHandler) CreateDatabase(c echo.Context) error {
	reqID := uuid.New().String()
	log.Printf("[ReqID: %s] Received request to create database", reqID)

	var req CreateDatabaseRequest
	if err := c.Bind(&req); err != nil {
		log.Printf("[ReqID: %s] Error binding request: %v", reqID, err)
		return c.JSON(http.StatusBadRequest, APIResponse{Message: "Invalid request body", RequestID: reqID})
	}

	if err := c.Validate(&req); err != nil {
		log.Printf("[ReqID: %s] Error validating request: %v", reqID, err)
		return c.JSON(http.StatusBadRequest, APIResponse{Message: err.Error(), RequestID: reqID})
	}
	
	// In a real application, namespace would likely come from user context or team mapping.
	const namespace = "default"

	args := k8s.CreateDatabaseClaimArgs{
		Name:          req.Name,
		Namespace:     namespace,
		StorageGB:     req.StorageGB,
		EngineVersion: req.EngineVersion,
	}

	if err := k8s.CreateDatabaseClaim(h.K8sClient, args); err != nil {
		// Here, we need to check if the error is because the resource already exists.
		// For simplicity, we just return a generic server error.
		log.Printf("[ReqID: %s] Failed to create Kubernetes resource: %v", reqID, err)
		return c.JSON(http.StatusInternalServerError, APIResponse{Message: "Failed to schedule resource creation", RequestID: reqID})
	}

	log.Printf("[ReqID: %s] Successfully processed request for database '%s'", reqID, req.Name)
	return c.JSON(http.StatusAccepted, APIResponse{
		Message: "Database provisioning has been initiated. It may take several minutes to become available.",
		RequestID: reqID,
	})
}

func RegisterRoutes(e *echo.Echo, k8sClient dynamic.Interface) {
	e.Validator = &CustomValidator{validator: validator.New()}
	h := &DatabaseHandler{K8sClient: k8sClient}
	
	v1 := e.Group("/v1")
	v1.POST("/databases", h.CreateDatabase)
}
// main.go
package main

import (
	"idp-api/api"
	"idp-api/k8s"
	"log"

	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
)

func main() {
	k8sClient, err := k8s.NewDynamicClient()
	if err != nil {
		log.Fatalf("Failed to initialize Kubernetes client: %v", err)
	}

	e := echo.New()
	e.Use(middleware.Logger())
	e.Use(middleware.Recover())

	api.RegisterRoutes(e, k8sClient)
	
	log.Println("Starting IDP API server on :8080")
	if err := e.Start(":8080"); err != nil {
		log.Fatalf("Server failed to start: %v", err)
	}
}

测试与验证

启动 API 服务: go run main.go

然后,使用 curl 发送一个请求:

curl -X POST http://localhost:8080/v1/databases \
-H "Content-Type: application/json" \
-d '{
    "name": "my-app-db-01",
    "storageGB": 20,
    "engineVersion": "13.7"
}'

如果一切顺利,你会收到一个 202 Accepted 响应。此时,在 Kubernetes 集群中,你可以看到我们创建的 DatabaseClaim

kubectl get databaseclaim -n default my-app-db-01 -o yaml

你会看到它的 status 字段中 READY 条件为 False,并有消息提示正在等待底层资源就绪。几分钟后,当 AWS RDS 实例创建完成,这个状态会变为 True。同时,一个名为 my-app-db-01-db-conn 的 Secret 会出现在 default 命名空间中,包含数据库的连接信息。

方案局限性与未来展望

这套方案的强大之处在于将基础设施的管理范式与应用统一到了 Kubernetes 控制平面,并通过一个简单的 API 屏蔽了底层复杂性。然而,它并非没有边界。

首先,该架构强依赖于 Kubernetes。对于没有或不打算深度使用 Kubernetes 的团队,引入 Crossplane 和一整套 CRD 体系的成本是高昂的。

其次,调试Compositionpatchtransform 逻辑可能非常复杂。当资源无法创建时,需要深入 Crossplane 和 Provider 的日志进行排查,对平台团队的技能要求较高。

当前的 API 实现是“一次性触发”,它不提供查询资源状态或删除资源的功能。一个完整的 IDP 需要实现:

  1. 状态查询端点: GET /v1/databases/{name},该端点会去查询对应 DatabaseClaimstatus,并返回给用户一个友好的状态信息。
  2. 删除端点: DELETE /v1/databases/{name},该端点会删除 Kubernetes 中的 DatabaseClaim 资源,Crossplane 会随之执行垃圾回收,删除对应的 AWS 资源。
  3. 与 GitOps 集成: 一个更稳健的做法是,API 服务不直接调用 client-go,而是生成 YAML 清单并提交到一个 Git 仓库。由 ArgoCD 或 Flux 这样的 GitOps 工具负责将 DatabaseClaim 同步到集群中。这提供了完整的审计追踪和变更控制能力。

  目录