基于 Java Operator 实现 Kubernetes 自定义动态路由网关的架构决策


在 Kubernetes 环境中管理 API 路由,本质上是在与声明式 API 和最终一致性模型打交道。标准的 Ingress Controller 通过解析 Ingress 资源或其 CRD(如 Gateway API)中的注解来生成代理配置,这在多数场景下工作良好。但当路由逻辑变得复杂,或需要与内部系统进行深度集成时,这种基于“注解”或通用 CRD 的配置方式会显得笨拙且缺乏表达力。一个常见的痛点是,业务团队需要一种更贴合其领域模型的、更安全的、且能无缝融入 GitOps 流程的方式来发布和管理 API。

我们的目标是构建一个动态 API 网关,其路由规则本身就是 Kubernetes 的一等公民,能够通过 kubectl apply 进行管理,并且对网关的更新是动态的,无需重启或中断服务。

定义问题:声明式路由的两种实现路径

面对这个挑战,团队内部初步形成了两种截然不同的架构方案。

  • 方案 A: Sidecar 模式与配置热加载

    • 核心思路是在网关 Pod 中注入一个 Sidecar 容器。这个 Sidecar 专门负责监听 Kubernetes ConfigMap 的变化。当路由配置(存储在 ConfigMap 中)更新时,Sidecar 会下载最新的配置文件,并向主网关容器发送一个信号(例如 SIGHUP),或者直接通过一个管理端口触发其热加载逻辑。网关应用本身需要实现一套能够响应信号并无缝重载路由规则的机制。
  • 方案 B: Operator 模式与自定义资源 (CRD)

    • 该方案更为彻底。我们首先定义一个 CRD,用以精确描述我们的路由模型,例如 DynamicRoute。然后,我们编写一个 Kubernetes Operator,它是一个持续运行的控制器,专门监听 DynamicRoute 资源的变化。当有 DynamicRoute 资源被创建、更新或删除时,Operator 会执行调谐逻辑(Reconciliation Logic):它会从 Kubernetes API Server 获取所有相关的路由资源,将它们聚合成一份完整的、可供网关消费的配置,最后将这份配置写入一个集中的 ConfigMap。网关 Pod 仅需挂载并监听这一个 ConfigMap 的变化即可。

架构权衡:为何选择 Operator 模式

方案 A 的吸引力在于其简单性。实现一个监听 ConfigMap 的 Sidecar 脚本并不复杂,对网关本身的侵入性也相对较低。然而,在真实项目中,这种简单性背后隐藏着脆弱性。

  1. 配置表达力有限: ConfigMap 本质上是键值对存储,用于表达复杂的、结构化的路由规则(例如,包含权重、断路器、重试策略的路由)时,通常只能将整个 YAML 或 JSON 作为一个巨大的字符串塞入其中。这失去了 Kubernetes 原生的结构化验证能力。
  2. 可靠性问题: 依赖文件系统监听或进程信号的机制,在容器化环境中可能存在微妙的竞态条件和失败场景。例如,信号处理不当可能导致进程僵死,文件系统原子性更新也需要额外保障。
  3. 职责不清: 网关应用被迫要关心“如何”重载配置,这部分逻辑与核心的代理转发逻辑耦合在一起。

相比之下,方案 B(Operator 模式)虽然初期开发成本更高,但它提供了一个更健壮、更符合云原生理念的长期解决方案。

  1. 声明式 API: 通过 CRD,我们将路由规则提升为 Kubernetes 的原生 API 资源。这意味着我们可以利用 OpenAPI schema 对其进行强类型校验,kubectl explain DynamicRoute.spec 能够清晰地展示所有可用字段。团队成员可以像管理 Deployment 或 Service 一样管理路由。
  2. 职责分离: Operator 承担了所有与 Kubernetes API 交互、配置聚合、验证和分发的“脏活累活”。网关应用本身可以极度简化,它唯一需要做的事情就是从一个固定的地方(比如一个挂载的 ConfigMap 文件)读取并应用配置。这种关注点分离使得两个组件都可以独立演进和测试。
  3. 健壮性与可扩展性: Operator 的调谐循环(Reconcile Loop)模型天生具有幂等性。无论因为什么原因错过了一次事件,下一次调谐都会将系统驱动到期望的状态。此外,未来如果需要增加更复杂的逻辑,比如根据路由状态更新其 .status 字段,或者与其他控制器交互,Operator 提供了完美的扩展点。

最终,我们选择了方案 B。对于一个旨在成为平台级基础设施的组件而言,前期的投入换来长期的稳定性、可维护性和更佳的开发者体验是值得的。

核心实现概览

我们的系统由三个核心部分组成:DynamicRoute CRD、Java Operator 控制器、以及轻量级 Java 网关应用。

graph TD
    subgraph GitOps Pipeline
        A[Developer: git push route.yaml] --> B{CI/CD};
        B --> C[kubectl apply -f route.yaml];
    end

    subgraph Kubernetes Cluster
        C --> D[API Server];
        D -- Watch Event --> E[Java Operator];
        D -- Stores --> F[etcd: DynamicRoute CR];
        E -- 1. Reads All --> F;
        E -- 2. Aggregates & Writes --> G[ConfigMap: gateway-routes-config];
        
        subgraph Gateway Pod
            H[Java Gateway Container] -- Mounts & Watches --> G;
            I[File Watcher Thread] -- Detects Change --> H;
        end
        
        H -- Proxies Traffic --> J[Upstream Service];
    end

    subgraph User Traffic
        K[End User] --> H;
    end

    style E fill:#f9f,stroke:#333,stroke-width:2px
    style H fill:#ccf,stroke:#333,stroke-width:2px

1. 定义 DynamicRoute CRD

这是我们声明式 API 的基石。它定义了路由规则的结构。

crd/dynamicroute.yaml:

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: dynamicroutes.gateway.tech.example.com
spec:
  group: gateway.tech.example.com
  versions:
    - name: v1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                pathPrefix:
                  type: string
                  description: "The URL path prefix to match."
                targetService:
                  type: string
                  description: "The name of the upstream service."
                targetPort:
                  type: integer
                  description: "The port of the upstream service."
                stripPrefix:
                  type: boolean
                  default: true
                  description: "Whether to strip the prefix before forwarding."
              required: ["pathPrefix", "targetService", "targetPort"]
            status:
              type: object
              properties:
                lastUpdated:
                  type: string
                  format: date-time
                state:
                  type: string
                  enum: ["Active", "Error"]
  scope: Namespaced
  names:
    plural: dynamicroutes
    singular: dynamicroute
    kind: DynamicRoute
    shortNames:
    - dr

这个 CRD 定义了一个 DynamicRoute 资源,包含了路由匹配的前缀、目标服务和端口等关键信息。通过 kubectl apply -f crd/dynamicroute.yaml 应用后,我们就可以在集群中创建 DynamicRoute 类型的对象了。

2. Java 网关应用实现

我们选择 Spring WebFlux 和 Netty 来构建一个轻量级、非阻塞的代理。核心在于它需要一个机制来动态更新路由表。这里我们采用一种简单可靠的方式:挂载包含路由配置的 ConfigMap,并使用一个后台线程监控文件变化。

gateway/pom.xml (关键依赖):

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.dataformat</groupId>
        <artifactId>jackson-dataformat-yaml</artifactId>
    </dependency>
</dependencies>

gateway/src/main/java/com/example/gateway/routing/DynamicRouteLocator.java:

package com.example.gateway.routing;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.gateway.route.Route;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

import java.util.concurrent.atomic.AtomicReference;

// 这是一个简化的Spring Cloud Gateway风格的路由定位器
// 它的路由来源是一个可以被原子更新的引用
@Component
public class DynamicRouteLocator implements RouteLocator {

    private static final Logger log = LoggerFactory.getLogger(DynamicRouteLocator.class);

    private final RouteLocatorBuilder builder;
    private final AtomicReference<RouteLocator> delegate;

    public DynamicRouteLocator(RouteLocatorBuilder builder, RouteConfigWatcher configWatcher) {
        this.builder = builder;
        // 初始时,设置一个空的路由定位器
        this.delegate = new AtomicReference<>(buildRoutes(new RouteDefinitionList()));
        
        // 订阅配置观察者的更新事件
        configWatcher.getRouteUpdates()
            .doOnNext(definitions -> log.info("Received new route definitions, count: {}", definitions.getRoutes().size()))
            .subscribe(this::updateRoutes);
    }

    private void updateRoutes(RouteDefinitionList definitions) {
        try {
            RouteLocator newLocator = buildRoutes(definitions);
            delegate.set(newLocator);
            log.info("Successfully updated routes.");
        } catch (Exception e) {
            log.error("Failed to update routes from new definitions", e);
            // 在真实项目中,这里应该有更复杂的错误处理,比如告警或维持旧配置
        }
    }

    private RouteLocator buildRoutes(RouteDefinitionList definitions) {
        RouteLocatorBuilder.Builder routesBuilder = builder.routes();
        if (definitions != null && definitions.getRoutes() != null) {
            for (RouteDefinition def : definitions.getRoutes()) {
                routesBuilder.route(def.getId(),
                    p -> p.path(def.getPathPrefix() + "/**")
                         .filters(f -> {
                             if (def.isStripPrefix()) {
                                 // Spring Cloud Gateway的StripPrefixFilter需要一个参数,表示剥离几层路径
                                 // 这里简化为剥离路径的第一层
                                 f.stripPrefix(1);
                             }
                             return f;
                         })
                         .uri("http://" + def.getTargetService() + ":" + def.getTargetPort()));
            }
        }
        return routesBuilder.build();
    }

    @Override
    public Flux<Route> getRoutes() {
        return delegate.get().getRoutes();
    }
}

gateway/src/main/java/com/example/gateway/routing/RouteConfigWatcher.java:

package com.example.gateway.routing;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.nio.file.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Component
public class RouteConfigWatcher {

    private static final Logger log = LoggerFactory.getLogger(RouteConfigWatcher.class);

    @Value("${gateway.config.path}")
    private String configPath;

    private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory());
    // Sinks.many().replay().latest() 保证新订阅者能立即收到最新的路由定义
    private final Sinks.Many<RouteDefinitionList> routeUpdatesSink = Sinks.many().replay().latest();

    @PostConstruct
    public void init() {
        Path path = Paths.get(configPath);
        // 启动时先加载一次
        loadConfig(path);

        // 使用一个单独的线程来监听文件变化,避免阻塞主线程
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "config-watcher-thread");
            t.setDaemon(true);
            return t;
        });

        executor.submit(() -> {
            try {
                WatchService watchService = FileSystems.getDefault().newWatchService();
                // 监听文件所在的目录
                path.getParent().register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY);
                log.info("Started watching for config changes at: {}", path.getParent());
                
                WatchKey key;
                while ((key = watchService.take()) != null) {
                    for (WatchEvent<?> event : key.pollEvents()) {
                        // K8s更新ConfigMap是通过创建一个新的符号链接来原子地替换旧文件
                        // 所以我们只需要关心事件发生的文件名是否是我们的目标文件名
                        if (Paths.get(event.context().toString()).getFileName().toString().equals(path.getFileName().toString())) {
                            log.info("Config file change detected: event kind={}, file={}", event.kind(), event.context());
                            loadConfig(path);
                        }
                    }
                    key.reset();
                }
            } catch (IOException | InterruptedException e) {
                log.error("Error watching config file", e);
                Thread.currentThread().interrupt();
            }
        });
    }

    private void loadConfig(Path path) {
        try {
            if (Files.exists(path)) {
                RouteDefinitionList definitions = objectMapper.readValue(path.toFile(), RouteDefinitionList.class);
                routeUpdatesSink.tryEmitNext(definitions);
            } else {
                log.warn("Config file not found at {}. Using empty route list.", path);
                routeUpdatesSink.tryEmitNext(new RouteDefinitionList());
            }
        } catch (IOException e) {
            log.error("Failed to read or parse config file at {}", path, e);
        }
    }

    public Flux<RouteDefinitionList> getRouteUpdates() {
        return routeUpdatesSink.asFlux();
    }
}
// DTOs (RouteDefinition.java, RouteDefinitionList.java) 省略,它们是配置文件的POJO映射

3. Java Operator 实现

这是整个方案的大脑。我们使用 Java Operator SDK 来简化开发。

operator/pom.xml (关键依赖):

<dependency>
    <groupId>io.javaoperatorsdk</groupId>
    <artifactId>operator-framework</artifactId>
    <version>4.5.0</version> <!-- 请使用最新版本 -->
</dependency>
<dependency>
    <groupId>io.fabric8</groupId>
    <artifactId>kubernetes-client</artifactId>
    <version>6.8.0</version> <!-- 请使用与SDK兼容的版本 -->
</dependency>

operator/src/main/java/com/example/operator/crd/DynamicRoute.java (CRD 对应的 POJO):

package com.example.operator.crd;

import io.fabric8.kubernetes.api.model.Namespaced;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.model.annotation.Group;
import io.fabric8.kubernetes.model.annotation.Version;

@Group("gateway.tech.example.com")
@Version("v1")
public class DynamicRoute extends CustomResource<DynamicRouteSpec, DynamicRouteStatus> implements Namespaced {
    // POJO Body is auto-generated or can be empty if using default spec/status fields.
}

DynamicRouteSpec.javaDynamicRouteStatus.java 包含了 CRD 中 specstatus 字段的定义。

operator/src/main/java/com/example/operator/DynamicRouteReconciler.java:

package com.example.operator;

import com.example.gateway.routing.RouteDefinition;
import com.example.gateway.routing.RouteDefinitionList;
import com.example.operator.crd.DynamicRoute;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.*;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@ControllerConfiguration
public class DynamicRouteReconciler implements Reconciler<DynamicRoute>, ErrorStatusHandler<DynamicRoute> {

    private static final Logger log = LoggerFactory.getLogger(DynamicRouteReconciler.class);
    public static final String CONFIGMAP_NAME = "gateway-dynamic-routes";
    public static final String CONFIGMAP_KEY = "routes.yaml";

    private final KubernetesClient client;
    private final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());

    public DynamicRouteReconciler(KubernetesClient client) {
        this.client = client;
    }

    @Override
    public UpdateControl<DynamicRoute> reconcile(DynamicRoute resource, Context<DynamicRoute> context) {
        log.info("Reconciling DynamicRoute {} in namespace {}", resource.getMetadata().getName(), resource.getMetadata().getNamespace());
        
        try {
            // Step 1: 获取当前命名空间下所有的 DynamicRoute 资源
            List<DynamicRoute> allRoutes = client.resources(DynamicRoute.class)
                .inNamespace(resource.getMetadata().getNamespace())
                .list().getItems();

            // Step 2: 将所有 DynamicRoute CR 转换为我们的内部路由定义
            List<RouteDefinition> routeDefinitions = allRoutes.stream()
                .map(this::toRouteDefinition)
                .collect(Collectors.toList());

            RouteDefinitionList routeList = new RouteDefinitionList();
            routeList.setRoutes(routeDefinitions);

            // Step 3: 将路由定义列表序列化为 YAML 字符串
            String yamlConfig = yamlMapper.writeValueAsString(routeList);

            // Step 4: 创建或更新 ConfigMap
            createOrUpdateConfigMap(resource.getMetadata().getNamespace(), yamlConfig);
            
            // Step 5: (可选) 更新 CR 的 status 字段,这是一个最佳实践
            // ... update status logic ...

            log.info("Successfully reconciled routes for namespace {}. Total routes: {}", resource.getMetadata().getNamespace(), routeDefinitions.size());
            return UpdateControl.noUpdate();

        } catch (JsonProcessingException e) {
            log.error("Failed to serialize route definitions to YAML", e);
            // 可以在这里更新 CR 的 status 来反映错误状态
            return UpdateControl.noUpdate(); // 避免无限重试循环,除非错误是可恢复的
        } catch (Exception e) {
            log.error("An unexpected error occurred during reconciliation", e);
            // 对于 Kubernetes API 调用失败等情况,SDK 会自动重试
            throw new ReconciliationException(e);
        }
    }
    
    // 当资源被删除时,也需要触发一次调谐,以确保配置被清理
    @Override
    public DeleteControl cleanup(DynamicRoute resource, Context<DynamicRoute> context) {
        log.info("Cleaning up for deleted DynamicRoute {} in namespace {}", resource.getMetadata().getName(), resource.getMetadata().getNamespace());
        reconcile(resource, context);
        return Reconciler.super.cleanup(resource, context);
    }
    
    private void createOrUpdateConfigMap(String namespace, String data) {
        ConfigMap existingCm = client.configMaps().inNamespace(namespace).withName(CONFIGMAP_NAME).get();

        if (existingCm == null) {
            ConfigMap newCm = new ConfigMapBuilder()
                .withMetadata(new ObjectMetaBuilder()
                    .withName(CONFIGMAP_NAME)
                    .withNamespace(namespace)
                    .build())
                .withData(Collections.singletonMap(CONFIGMAP_KEY, data))
                .build();
            client.configMaps().inNamespace(namespace).create(newCm);
            log.info("Created new ConfigMap {} in namespace {}", CONFIGMAP_NAME, namespace);
        } else {
            // 比较数据,只有在内容变化时才更新,避免不必要的触发
            if (!data.equals(existingCm.getData().get(CONFIGMAP_KEY))) {
                existingCm.setData(Map.of(CONFIGMAP_KEY, data));
                client.configMaps().inNamespace(namespace).replace(existingCm);
                log.info("Updated ConfigMap {} in namespace {}", CONFIGMAP_NAME, namespace);
            } else {
                log.info("ConfigMap {} data is already up-to-date. Skipping update.", CONFIGMAP_NAME);
            }
        }
    }

    private RouteDefinition toRouteDefinition(DynamicRoute cr) {
        RouteDefinition def = new RouteDefinition();
        // ID 通常使用 CR 的唯一标识符,便于追踪
        def.setId(cr.getMetadata().getNamespace() + "_" + cr.getMetadata().getName());
        def.setPathPrefix(cr.getSpec().getPathPrefix());
        def.setTargetService(cr.getSpec().getTargetService());
        def.setTargetPort(cr.getSpec().getTargetPort());
        def.setStripPrefix(cr.getSpec().isStripPrefix());
        return def;
    }
}

架构的扩展性与局限性

当前这套架构已经能够满足声明式、动态路由的核心需求,并且具备良好的可维护性。然而,它并非没有局限性。

一个主要的局限在于配置分发机制。通过 Operator 更新 ConfigMap,再由网关 Pod 挂载并监听文件变化,这是一种异步且最终一致的模式。从 kubectl apply 到所有网关实例都应用新配置,中间存在一个可感知的延迟(API Server 事件传播 -> Operator 调谐 -> ConfigMap 更新 -> Kubelet 同步到 Pod -> 应用文件监听触发)。对于需要秒级甚至毫秒级路由更新的场景,这个延迟可能是个问题。

其次,将所有路由规则聚合到单个 ConfigMap 中,当路由数量达到成千上万时,可能会触及 ConfigMap 1MB 的大小限制。同时,任何一个 DynamicRoute 的微小改动都会导致整个 ConfigMap 的重写,并触发所有网关 Pod 的配置重载,这在规模化部署时会产生不必要的扰动。

未来的优化路径可以探索更高效的配置分发模型。例如,Operator 可以不通过 ConfigMap,而是直接通过 gRPC 推送或调用每个网关 Pod 暴露的管理 API 来分发增量配置。这可以显著降低延迟并实现更细粒度的更新。另一个方向是借鉴 Envoy 的 xDS 协议,将我们的 Operator 实现为一个控制平面,网关作为数据平面,通过标准化的服务发现协议进行通信,这将使整个系统更加专业和可插拔。


  目录