Building a Trino Operator in Go and Managing It Declaratively with Flux CD


Manually managing Trino clusters in Kubernetes is tedious and error-prone. Deploying the coordinator and workers, keeping configuration files in sync, and above all adding and removing data source catalogs all demand a steady stream of kubectl apply commands and painstaking YAML maintenance. As the number of clusters and environments grows, the cost of this manual operating model climbs sharply and configuration drift becomes a real risk. Our goal is to eliminate that manual intervention and arrive at a fully declarative, Git-driven lifecycle model for Trino clusters.

The initial idea was to package the Trino deployment manifests as a Helm chart, but that only solves the first rollout. For subsequent configuration changes, especially frequent catalog updates, a plain Helm upgrade workflow remains clumsy. The real solution calls for a controller that understands Trino's topology and reacts to state changes automatically, which points naturally to the Kubernetes Operator pattern.

We will build a Trino Operator with Go and the Kubebuilder framework. Its core responsibility is to watch a custom resource (CR) named TrinoCluster and, based on the desired state declared in its spec, create, configure, and adjust the native Kubernetes resources a Trino cluster needs, such as StatefulSet, Service, and ConfigMap objects. Finally, we will use Flux CD to manage both the Operator itself and the TrinoCluster resources it defines, closing the loop in a GitOps workflow.

Defining the TrinoCluster API

The heart of the Operator is its Custom Resource Definition (CRD). This is the interface users interact with, so it has to be both flexible and intuitive. We define a CRD named TrinoCluster that encapsulates the configuration of a complete Trino cluster.

In the Go project, this corresponds to the file api/v1/trinocluster_types.go.

// api/v1/trinocluster_types.go

package v1

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// TrinoClusterSpec defines the desired state of TrinoCluster
type TrinoClusterSpec struct {
	// Image specifies the Trino container image.
	// +kubebuilder:validation:Required
	Image string `json:"image"`

	// Coordinator defines the configuration for the Trino coordinator.
	// +kubebuilder:validation:Required
	Coordinator CoordinatorSpec `json:"coordinator"`

	// Worker defines the configuration for the Trino workers.
	// +kubebuilder:validation:Required
	Worker WorkerSpec `json:"worker"`

	// Catalogs contains a map of catalog configurations.
	// The key is the catalog name, and the value is the content of the properties file.
	// +kubebuilder:validation:Optional
	Catalogs map[string]string `json:"catalogs,omitempty"`
}

// CoordinatorSpec defines the Trino coordinator specific configuration.
type CoordinatorSpec struct {
	// Resources specifies the compute resources for the coordinator pod.
	// +kubebuilder:validation:Required
	Resources corev1.ResourceRequirements `json:"resources"`

	// Config represents key-value pairs for the coordinator's config.properties.
	// +kubebuilder:validation:Required
	Config map[string]string `json:"config"`
}

// WorkerSpec defines the Trino worker specific configuration.
type WorkerSpec struct {
	// Replicas is the number of worker pods.
	// +kubebuilder:validation:Required
	// +kubebuilder:validation:Minimum=1
	Replicas int32 `json:"replicas"`

	// Resources specifies the compute resources for the worker pods.
	// +kubebuilder:validation:Required
	Resources corev1.ResourceRequirements `json:"resources"`

	// Config represents key-value pairs for the worker's config.properties.
	// +kubebuilder:validation:Required
	Config map[string]string `json:"config"`
}


// TrinoClusterStatus defines the observed state of TrinoCluster
type TrinoClusterStatus struct {
	// Conditions represent the latest available observations of the TrinoCluster's state.
	Conditions []metav1.Condition `json:"conditions,omitempty"`

	// ReadyWorkers is the number of worker pods that are ready.
	ReadyWorkers int32 `json:"readyWorkers"`

	// CoordinatorURL is the address of the coordinator service.
	CoordinatorURL string `json:"coordinatorURL"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:printcolumn:name="Image",type="string",JSONPath=".spec.image"
//+kubebuilder:printcolumn:name="Workers",type="integer",JSONPath=".spec.worker.replicas"
//+kubebuilder:printcolumn:name="Ready",type="integer",JSONPath=".status.readyWorkers"
//+kubebuilder:printcolumn:name="Coordinator URL",type="string",JSONPath=".status.coordinatorURL"
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"

// TrinoCluster is the Schema for the trinoclusters API
type TrinoCluster struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   TrinoClusterSpec   `json:"spec,omitempty"`
	Status TrinoClusterStatus `json:"status,omitempty"`
}

//+kubebuilder:object:root=true

// TrinoClusterList contains a list of TrinoCluster
type TrinoClusterList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []TrinoCluster `json:"items"`
}

func init() {
	SchemeBuilder.Register(&TrinoCluster{}, &TrinoClusterList{})
}

This Spec structure is what drives the Operator. It exposes the Trino image, the resource settings for the coordinator and workers, the contents of config.properties, and one key field: Catalogs. Catalogs is a map[string]string that lets users define catalog file contents, such as jmx.properties or tpch.properties, directly in the CR YAML. The Operator's job is to turn these definitions into a ConfigMap mounted into the Trino pods.
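
For example, a TrinoCluster named example (a hypothetical name) that declares a single tpch catalog would be rendered by the Operator into a ConfigMap roughly like this, with one data key per catalog file:

apiVersion: v1
kind: ConfigMap
metadata:
  # follows the <cluster>-catalogs convention used in the controller code below;
  # the namespace matches the TrinoCluster CR
  name: example-catalogs
data:
  tpch.properties: |
    connector.name=tpch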

Implementing the Reconciliation Loop

The reconciliation logic lives in the Reconcile method of controllers/trinocluster_controller.go. This is the Operator's brain: it continuously drives the cluster's actual state toward the desired state declared in the TrinoCluster CR. The overall flow:

graph TD
    A(Start Reconciliation) --> B{Fetch TrinoCluster CR};
    B --> C{CR Found?};
    C -- No --> D(End, CR deleted);
    C -- Yes --> E{Manage Catalogs ConfigMap};
    E --> F{Reconcile Coordinator};
    F --> G{Reconcile Worker};
    G --> H{Update Status};
    H --> I(End);

Below is the core implementation of the Reconcile method, with inline comments and error handling.

// controllers/trinocluster_controller.go

package controllers

import (
	// ... imports
	"context"
	"fmt"
	"strings"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/intstr"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/log"

	datav1 "github.com/your-org/trino-operator/api/v1"
)

// TrinoClusterReconciler reconciles a TrinoCluster object
type TrinoClusterReconciler struct {
	client.Client
	Scheme *runtime.Scheme
}

//+kubebuilder:rbac:groups=data.my.domain,resources=trinoclusters,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=data.my.domain,resources=trinoclusters/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=data.my.domain,resources=trinoclusters/finalizers,verbs=update
//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=configmaps;services,verbs=get;list;watch;create;update;patch;delete

func (r *TrinoClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	// 1. Fetch the TrinoCluster instance
	trinoCluster := &datav1.TrinoCluster{}
	err := r.Get(ctx, req.NamespacedName, trinoCluster)
	if err != nil {
		if errors.IsNotFound(err) {
			// CR has been deleted, no need to requeue.
			logger.Info("TrinoCluster resource not found. Ignoring since object must be deleted.")
			return ctrl.Result{}, nil
		}
		// Error reading the object - requeue the request.
		logger.Error(err, "Failed to get TrinoCluster")
		return ctrl.Result{}, err
	}

	// 2. Reconcile Catalogs ConfigMap
	// This ConfigMap will hold all catalog property files.
	// One ConfigMap per TrinoCluster instance.
	cmName := fmt.Sprintf("%s-catalogs", trinoCluster.Name)
	catalogsCM := &corev1.ConfigMap{}
	err = r.Get(ctx, types.NamespacedName{Name: cmName, Namespace: trinoCluster.Namespace}, catalogsCM)
	if err != nil && errors.IsNotFound(err) {
		// Create the ConfigMap if it does not exist
		cm := r.configMapForCatalogs(trinoCluster)
		logger.Info("Creating a new Catalog ConfigMap", "ConfigMap.Namespace", cm.Namespace, "ConfigMap.Name", cm.Name)
		if err = r.Create(ctx, cm); err != nil {
			logger.Error(err, "Failed to create new Catalog ConfigMap")
			return ctrl.Result{}, err
		}
	} else if err == nil {
		// Update the existing ConfigMap if its data differs from the desired catalogs.
		// A plain length check would miss edits to an existing catalog's contents,
		// so compare the full map.
		if !reflect.DeepEqual(catalogsCM.Data, trinoCluster.Spec.Catalogs) {
			catalogsCM.Data = trinoCluster.Spec.Catalogs
			logger.Info("Updating Catalog ConfigMap", "ConfigMap.Namespace", catalogsCM.Namespace, "ConfigMap.Name", catalogsCM.Name)
			if err = r.Update(ctx, catalogsCM); err != nil {
				logger.Error(err, "Failed to update Catalog ConfigMap")
				return ctrl.Result{}, err
			}
		}
	} else {
		logger.Error(err, "Failed to get Catalog ConfigMap")
		return ctrl.Result{}, err
	}


	// 3. Reconcile Coordinator StatefulSet and Service
	coordSvc, err := r.reconcileCoordinatorService(ctx, trinoCluster)
	if err != nil {
		return ctrl.Result{}, err
	}

	_, err = r.reconcileCoordinatorStatefulSet(ctx, trinoCluster, coordSvc.Name)
	if err != nil {
		return ctrl.Result{}, err
	}

	// 4. Reconcile Worker StatefulSet
	workerSts, err := r.reconcileWorkerStatefulSet(ctx, trinoCluster, coordSvc.Name)
	if err != nil {
		return ctrl.Result{}, err
	}

	// 5. Update Status subresource
	trinoCluster.Status.ReadyWorkers = workerSts.Status.ReadyReplicas
	trinoCluster.Status.CoordinatorURL = fmt.Sprintf("http://%s:%d", coordSvc.Name, 8080)
	// Here you would implement more sophisticated status conditions.
	// For now, we keep it simple.
	err = r.Status().Update(ctx, trinoCluster)
	if err != nil {
		logger.Error(err, "Failed to update TrinoCluster status")
		return ctrl.Result{}, err
	}

	// Requeue after a minute to periodically check status.
	// In a real-world scenario, you would only requeue if a state is not yet stable.
	return ctrl.Result{RequeueAfter: time.Minute}, nil
}

// ... helper functions for creating StatefulSet, Service, ConfigMap ...

func (r *TrinoClusterReconciler) configMapForCatalogs(tc *datav1.TrinoCluster) *corev1.ConfigMap {
	cm := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%s-catalogs", tc.Name),
			Namespace: tc.Namespace,
		},
		Data: tc.Spec.Catalogs,
	}
	// Set TrinoCluster instance as the owner and controller
	ctrl.SetControllerReference(tc, cm, r.Scheme)
	return cm
}

func (r *TrinoClusterReconciler) reconcileCoordinatorStatefulSet(ctx context.Context, tc *datav1.TrinoCluster, serviceName string) (*appsv1.StatefulSet, error) {
	logger := log.FromContext(ctx)
	stsName := fmt.Sprintf("%s-coordinator", tc.Name)
	sts := &appsv1.StatefulSet{}
	err := r.Get(ctx, types.NamespacedName{Name: stsName, Namespace: tc.Namespace}, sts)
	
	desiredSts := r.statefulSetForCoordinator(tc, serviceName)

	if err != nil && errors.IsNotFound(err) {
		logger.Info("Creating new Coordinator StatefulSet", "StatefulSet.Namespace", desiredSts.Namespace, "StatefulSet.Name", desiredSts.Name)
		err = r.Create(ctx, desiredSts)
		if err != nil {
			logger.Error(err, "Failed to create Coordinator StatefulSet")
			return nil, err
		}
		return desiredSts, nil
	} else if err != nil {
		logger.Error(err, "Failed to get Coordinator StatefulSet")
		return nil, err
	}

	// In a real operator, you'd perform a deep comparison here and update if necessary.
	// For this example, we assume no updates for brevity.
	// controllerutil.CreateOrUpdate can be used for more robust logic.

	return sts, nil
}


func (r *TrinoClusterReconciler) statefulSetForCoordinator(tc *datav1.TrinoCluster, serviceName string) *appsv1.StatefulSet {
	// A helper function to build the StatefulSet object.
	// It's crucial to mount the catalogs ConfigMap into the pod.
	labels := map[string]string{"app": "trino", "component": "coordinator", "cluster": tc.Name}
	var replicas int32 = 1

	// Render the coordinator's config.properties content from the Spec. In the full
	// implementation this string is written into the "<cluster>-coordinator-config"
	// ConfigMap mounted below; that helper is omitted here for brevity.
	configProps := ""
	for k, v := range tc.Spec.Coordinator.Config {
		configProps += fmt.Sprintf("%s=%s\n", k, v)
	}

	sts := &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%s-coordinator", tc.Name),
			Namespace: tc.Namespace,
			Labels:    labels,
		},
		Spec: appsv1.StatefulSetSpec{
			Replicas:    &replicas,
			ServiceName: serviceName,
			Selector: &metav1.LabelSelector{
				MatchLabels: labels,
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: labels,
				},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{{
						Name:  "trino",
						Image: tc.Spec.Image,
						Ports: []corev1.ContainerPort{{
							ContainerPort: 8080,
							Name:          "http",
						}},
						Resources: tc.Spec.Coordinator.Resources,
						VolumeMounts: []corev1.VolumeMount{
							{
								Name:      "config",
								MountPath: "/etc/trino",
							},
							{
								Name:      "catalogs",
								MountPath: "/etc/trino/catalog",
							},
						},
						// Readiness probe is essential for production
						ReadinessProbe: &corev1.Probe{
							ProbeHandler: corev1.ProbeHandler{
								HTTPGet: &corev1.HTTPGetAction{
									Path: "/v1/info/state",
									Port: intstr.FromString("http"),
								},
							},
							InitialDelaySeconds: 15,
							PeriodSeconds:       10,
						},
					}},
					Volumes: []corev1.Volume{
						{
							Name: "config",
							VolumeSource: corev1.VolumeSource{
								ConfigMap: &corev1.ConfigMapVolumeSource{
									LocalObjectReference: corev1.LocalObjectReference{
										Name: fmt.Sprintf("%s-coordinator-config", tc.Name), // Assumes another ConfigMap for node.properties, etc.
									},
								},
							},
						},
						{
							Name: "catalogs",
							VolumeSource: corev1.VolumeSource{
								ConfigMap: &corev1.ConfigMapVolumeSource{
									LocalObjectReference: corev1.LocalObjectReference{
										Name: fmt.Sprintf("%s-catalogs", tc.Name),
									},
								},
							},
						},
					},
				},
			},
		},
	}
	// Set controller reference for garbage collection
	ctrl.SetControllerReference(tc, sts, r.Scheme)
	return sts
}
// Similar functions `reconcileWorkerStatefulSet` and `reconcileCoordinatorService` would be implemented.
// They follow the same pattern: define the desired state, get the current state, and create or update if they differ.
// ...
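
// A minimal sketch of the coordinator Service builder, assuming the same labels as
// statefulSetForCoordinator; reconcileCoordinatorService would wrap it in the same
// get-then-create logic used for the StatefulSet above.
func (r *TrinoClusterReconciler) serviceForCoordinator(tc *datav1.TrinoCluster) *corev1.Service {
	labels := map[string]string{"app": "trino", "component": "coordinator", "cluster": tc.Name}
	svc := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%s-coordinator", tc.Name),
			Namespace: tc.Namespace,
			Labels:    labels,
		},
		Spec: corev1.ServiceSpec{
			Selector: labels,
			Ports: []corev1.ServicePort{{
				Name:       "http",
				Port:       8080,
				TargetPort: intstr.FromString("http"),
			}},
		},
	}
	// Owned by the CR so the Service is garbage-collected along with it.
	ctrl.SetControllerReference(tc, svc, r.Scheme)
	return svc
}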

// SetupWithManager sets up the controller with the Manager.
func (r *TrinoClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&datav1.TrinoCluster{}).
		Owns(&appsv1.StatefulSet{}).
		Owns(&corev1.ConfigMap{}).
		Owns(&corev1.Service{}).
		Complete(r)
}

The key points:

  1. Ownership: ctrl.SetControllerReference marks the TrinoCluster CR as the owner of every child resource (StatefulSet, ConfigMap, Service). When the TrinoCluster CR is deleted, Kubernetes garbage collection automatically cleans up all associated resources.
  2. Idempotency: The reconciliation logic must be idempotent. No matter how many times it runs, the same input (the TrinoCluster Spec) should produce the same output (the state of the Kubernetes resources). The CreateOrUpdate pattern is a common way to achieve this; see the sketch after this list.
  3. Catalog management: This is one of the Operator's core value-adds. The reconcile logic first ensures that a ConfigMap named [cluster-name]-catalogs exists and that its data field matches TrinoCluster.Spec.Catalogs exactly. The statefulSetForCoordinator and statefulSetForWorker functions then mount this ConfigMap as a volume at /etc/trino/catalog in every pod. When a user adds or modifies a catalog in the CR, the Operator updates the ConfigMap and the kubelet syncs the change into the pods' filesystems. Note that Trino only reads the catalog directory at startup, so the Operator must also roll the coordinator and worker pods (for example via a configuration-hash annotation on the pod template) for the change to take effect; either way, no manual kubectl work is involved.
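
To make point 2 concrete, the catalogs ConfigMap handling from the Reconcile method could be collapsed into a single helper built on controllerutil.CreateOrUpdate. This is only a sketch, assuming a hypothetical reconcileCatalogsConfigMap method in the same controller package:

func (r *TrinoClusterReconciler) reconcileCatalogsConfigMap(ctx context.Context, tc *datav1.TrinoCluster) error {
	cm := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%s-catalogs", tc.Name),
			Namespace: tc.Namespace,
		},
	}
	// CreateOrUpdate fetches the current object, applies the mutate function, and
	// only issues a Create or Update when something actually changed, which keeps
	// the reconciliation idempotent.
	op, err := controllerutil.CreateOrUpdate(ctx, r.Client, cm, func() error {
		cm.Data = tc.Spec.Catalogs
		return ctrl.SetControllerReference(tc, cm, r.Scheme)
	})
	if err != nil {
		return err
	}
	log.FromContext(ctx).V(1).Info("reconciled catalogs ConfigMap", "operation", op)
	return nil
}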

Integrating with Flux CD: The GitOps Workflow

With a working Operator in hand, the next step is to bring it into a GitOps flow. Flux CD watches a Git repository and syncs the Kubernetes manifests in it to the cluster.
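
For reference, the Git source that Flux watches is declared as a GitRepository resource, normally generated by flux bootstrap. A typical definition looks roughly like this (the URL, branch, and secret name below are assumptions consistent with this article's repository):

apiVersion: source.toolkit.fluxcd.io/v1beta2
kind: GitRepository
metadata:
  name: flux-system
  namespace: flux-system
spec:
  interval: 1m
  url: ssh://git@github.com/your-org/trino-platform-repo
  ref:
    branch: main
  secretRef:
    name: flux-system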

Our Git repository is laid out as follows:

trino-platform-repo/
├── clusters/
│   └── production/          # Target cluster
│       ├── flux-system/     # Flux CD components
│       │   ├── gotk-components.yaml
│       │   ├── gotk-sync.yaml
│       │   └── kustomization.yaml
│       └── trino-system/    # Our Trino workloads
│           ├── operator/
│           │   ├── kustomization.yaml
│           │   └── release.yaml # Flux HelmRelease for the operator
│           └── cluster/
│               ├── kustomization.yaml
│               └── production-trino-cluster.yaml # Our TrinoCluster CR instance
└── infrastructure/
    └── trino-operator/      # Source for the Operator deployment (e.g., its Helm chart)
        ├── Chart.yaml
        ├── values.yaml
        └── templates/
            └── ... (CRD, Deployment, RBAC manifests)
  1. Deploying the Operator: We package the Trino Operator as a Helm chart stored under infrastructure/trino-operator. In clusters/production/trino-system/operator/release.yaml we define a Flux HelmRelease resource to deploy it.

    # clusters/production/trino-system/operator/release.yaml
    apiVersion: helm.toolkit.fluxcd.io/v2beta1
    kind: HelmRelease
    metadata:
      name: trino-operator
      namespace: trino-system
    spec:
      interval: 5m
      chart:
        spec:
          chart: ./infrastructure/trino-operator
          sourceRef:
            kind: GitRepository
            name: flux-system
            namespace: flux-system
      # values can be specified here if needed

    Flux detects this HelmRelease, renders the Helm chart from the infrastructure/trino-operator path of the Git repository, and applies it, which installs our Operator in the cluster.

  2. Declaring a Trino cluster: This is the heart of the end-user experience. To create a Trino cluster, an engineer only needs to add a YAML file under the clusters/production/trino-system/cluster/ directory.

    # clusters/production/trino-system/cluster/production-trino-cluster.yaml
    apiVersion: data.my.domain/v1
    kind: TrinoCluster
    metadata:
      name: production-data-warehouse
      namespace: trino-system
    spec:
      image: trinodb/trino:426
      coordinator:
        resources:
          requests:
            cpu: "2"
            memory: "8Gi"
          limits:
            cpu: "4"
            memory: "8Gi"
        config:
          "coordinator": "true"
          "node-scheduler.include-coordinator": "false"
          "http-server.http.port": "8080"
          "discovery-server.enabled": "true"
          "discovery.uri": "http://production-data-warehouse-coordinator:8080"
      worker:
        replicas: 5
        resources:
          requests:
            cpu: "4"
            memory: "16Gi"
          limits:
            cpu: "4"
            memory: "16Gi"
        config:
          "coordinator": "false"
          "http-server.http.port": "8080"
          # Workers must point at the coordinator's discovery URI to join the cluster.
          "discovery.uri": "http://production-data-warehouse-coordinator:8080"
      catalogs:
        # Catalogs are defined directly here.
        # Adding a new catalog is just adding a new key-value pair.
        jmx.properties: |
          connector.name=jmx
        tpch.properties: |
          connector.name=tpch
          tpch.splits-per-node=4

Once this file is pushed to the main branch of the Git repository, Flux's Kustomization controller picks it up and applies it to the cluster. Our Trino Operator then detects the new TrinoCluster resource and starts its reconciliation loop, automatically building a complete Trino cluster with one coordinator, five workers, and the JMX and TPCH catalogs.
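
The sync itself is driven by a Flux Kustomization pointing at the cluster directory. A representative definition, normally part of gotk-sync.yaml and shown here with assumed values, looks like this:

apiVersion: kustomize.toolkit.fluxcd.io/v1beta2
kind: Kustomization
metadata:
  name: flux-system
  namespace: flux-system
spec:
  interval: 10m
  path: ./clusters/production
  prune: true
  sourceRef:
    kind: GitRepository
    name: flux-system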

To add a new data source, say PostgreSQL, the engineer simply edits production-trino-cluster.yaml and adds one more entry under catalogs:

# ... previous content ...
      catalogs:
        jmx.properties: |
          connector.name=jmx
        tpch.properties: |
          connector.name=tpch
        postgres.properties: |
          connector.name=postgresql
          connection-url=jdbc:postgresql://db.example.com:5432/mydatabase
          connection-user=readonly_user
          # Secrets should be handled via a proper mechanism like sealed-secrets or external-secrets
          connection-password=${POSTGRES_PASSWORD}

After committing and pushing the change, Flux syncs the updated TrinoCluster CR. The Trino Operator notices that the Spec has changed, specifically the Catalogs field, and updates the production-data-warehouse-catalogs ConfigMap to include postgres.properties. The kubelet then syncs the updated files into /etc/trino/catalog/ in every Trino pod. Because Trino only loads catalog files at startup, the Operator still needs to roll the pods before the new catalog becomes queryable, but the whole flow stays hands-off: no kubectl commands, no manual restarts.
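
One way to automate that restart is to hash the catalog contents and stamp the hash onto the pod templates, so any catalog change makes the StatefulSets roll. The sketch below assumes the StatefulSet builders shown earlier are extended accordingly; the annotation key is hypothetical.

// Requires "crypto/sha256", "encoding/hex", and "sort" in the controller's imports.
func catalogsHash(catalogs map[string]string) string {
	keys := make([]string, 0, len(catalogs))
	for k := range catalogs {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic ordering across reconciliations
	h := sha256.New()
	for _, k := range keys {
		h.Write([]byte(k))
		h.Write([]byte(catalogs[k]))
	}
	return hex.EncodeToString(h.Sum(nil))
}

// In statefulSetForCoordinator and statefulSetForWorker:
//   sts.Spec.Template.ObjectMeta.Annotations = map[string]string{
//       "trino.my.domain/catalogs-hash": catalogsHash(tc.Spec.Catalogs),
//   }
// A changed hash makes the pod template differ, so the StatefulSet controller
// performs a rolling restart and Trino picks up the new catalog files on boot.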

Limitations and Future Iterations

This approach provides a solid, automated framework for managing Trino clusters, but it is not without limitations. The current implementation is a foundation that needs further hardening for production use.

First, the update strategy is overly simple. When the image version or resource settings in the Spec change, the Operator needs a smarter rolling-update strategy to preserve availability, for example updating the workers first, verifying their health, and only then updating the coordinator.

Second, there is no autoscaling. The worker count is currently fixed. A future iteration could integrate KEDA (Kubernetes Event-driven Autoscaling) to adjust the number of workers dynamically based on custom metrics such as query queue length or CPU load.

Finally, security needs work. Putting sensitive values such as database passwords directly in YAML is unacceptable. Tools like the External Secrets Operator or Sealed Secrets should be integrated so credentials are pulled from a secure secret store and injected into Trino's configuration by the Operator. Likewise, TLS encryption and user authentication are not handled yet; both are mandatory for a production-grade deployment.

