Monorepo驱动的Apache Flink流处理应用在Nomad集群上的自动化部署实践


我们团队维护着一套实时数据处理系统,手动部署和升级Apache Flink作业一直是个痛点。每次发布都需要运维手动上传JAR包,小心翼翼地停止旧作业、创建savepoint,再启动新作业。整个过程不仅繁琐,而且极易出错,尤其是在处理有状态作业时,一次失误就可能导致数据丢失或处理中断。我们需要一个从代码提交到服务上线的全自动化流程,将整个流处理应用——包括Flink作业本身、上游的数据生产者和下游的消费者——作为一个原子单元进行管理和部署。

最初的构想是在我们现有的技术栈上进行扩展。基础设施由Chef统一管理,服务调度则依赖于Nomad。消息队列是RabbitMQ。既然工具链已经存在,目标就变成了如何将Flink无缝地集成进来,并通过Monorepo的模式来统一管理所有相关服务的代码,实现真正的端到端自动化。

第一步:统一代码的基石 - Monorepo结构设计

将所有相关项目——Flink作业、数据生产者API、事件消费者服务、共享的数据模型——都放在一个Git仓库里,能极大简化依赖管理和跨项目的原子性重构。在真实项目中,不同组件的版本对齐是个大问题,Monorepo天生就能解决它。

我们的目录结构设计如下:

/
├── apps/
│   ├── flink-stream-job/         # Flink流处理作业 (Java/Scala)
│   │   ├── src/
│   │   └── build.gradle
│   ├── data-producer-api/        # 生产数据的Go微服务
│   │   ├── main.go
│   │   └── Dockerfile
│   └── event-consumer-service/   # 消费Flink处理结果的Python服务
│       ├── app.py
│       └── Dockerfile
├── libs/
│   └── event-protocol/           # Protobuf定义的共享事件模型
│       └── event.proto
├── infra/
│   ├── chef/                     # Chef Cookbooks
│   │   └── cookbooks/
│   │       └── nomad_client/
│   │           └── recipes/
│   │               └── default.rb
│   └── nomad/                    # Nomad Job编排文件
│       ├── flink-cluster.nomad.hcl
│       └── rabbitmq.nomad.hcl
└── .gitlab-ci.yml                # CI/CD流水线定义

这种结构的好处是显而易见的:

  1. libs/event-protocol的任何变更,可以在一次提交中同时更新生产者、消费者和Flink作业,避免了多仓库版本依赖地狱。
  2. CI/CD流水线可以配置得更智能,通过路径变更检测,只构建和部署真正发生变化的应用。

第二步:环境准备 - 用Chef固化Nomad客户端环境

我们的Nomad集群运行在裸金属或虚拟机上,由Chef负责配置管理。为了让Nomad能够顺利运行Flink作业,我们需要确保每个Nomad客户端节点都具备必要的环境。这里的坑在于,如果环境不一致,Flink作业可能会在某个节点上启动失败,导致调度循环和资源浪费。

我们为此编写了一个专用的Chef Recipe,以确保环境的幂等性。

infra/chef/cookbooks/nomad_client/recipes/default.rb:

# infra/chef/cookbooks/nomad_client/recipes/default.rb

# --- 核心目标:为Nomad客户端节点提供运行Flink作业所需的标准环境 ---

# 1. 安装必要的软件包
package 'openjdk-11-jdk' do
  action :install
end

package 'unzip' do
  action :install
end

# 2. 创建Flink作业和状态后端所需的目录
# 在生产环境中,这些路径应该挂载到持久化存储上,例如NFS或本地SSD RAID。
# 这里为了演示,我们使用本地目录。
directory '/opt/flink' do
  owner 'nomad'
  group 'nomad'
  mode '0755'
  recursive true
  action :create
end

directory '/var/data/flink/checkpoints' do
  owner 'nomad'
  group 'nomad'
  mode '0755'
  recursive true
  action :create
end

directory '/var/data/flink/savepoints' do
  owner 'nomad'
  group 'nomad'
  mode '0755'
  recursive true
  action :create
end

# 3. 配置Nomad客户端
# 启用原始exec驱动,这对直接运行Java进程非常重要。
# 同时,为Flink作业预留系统资源。
# 这里的配置会合并到Nomad的全局配置中。
# 注意:在真实项目中,不应将配置硬编码在recipe里,而应使用Chef attributes。
file '/etc/nomad.d/client.hcl' do
  content <<~EOC
    client {
      enabled = true
      
      # 允许Nomad直接执行二进制文件,比如java命令
      options = {
        "driver.raw_exec.enable" = "1"
        "driver.java.enable" = "1" # 确保java驱动也启用
      }

      # 为系统进程、Chef-client等预留资源,避免Nomad独占所有资源
      reserved {
        cpu    = 200 # 200 MHz
        memory = 512 # 512 MiB
        disk   = 1024 # 1 GiB
      }
    }
  EOC
  owner 'root'
  group 'root'
  mode '0644'
  notifies :restart, 'service[nomad]', :immediately
end

# 4. 确保Nomad服务正在运行并启用
service 'nomad' do
  action [:enable, :start]
end

这个Recipe解决了几个关键问题:安装了正确的JDK,创建了Flink需要的持久化目录(并设置了正确的权限,一个常见的错误是Nomad进程没有权限写入),并且显式启用了Nomad的raw_execjava驱动。这为后续直接在Nomad上运行JAR包打下了基础。

第三步:构建与部署的核心 - Nomad Job Spec

这是整个方案中最关键的部分。我们需要编写一个Nomad Job文件来描述如何部署一个Flink集群(包括一个JobManager和一个或多个TaskManager)。使用Nomad的java驱动而不是docker驱动是一个经过权衡的决定。虽然Docker提供了更好的隔离性,但java驱动更轻量,启动速度更快,并且对于内部信任度较高的环境来说,性能开销更小。

infra/nomad/flink-cluster.nomad.hcl:

// infra/nomad/flink-cluster.nomad.hcl

job "flink-stream-processor" {
  datacenters = ["dc1"]
  type        = "service"

  // 使用变量,使得CI可以动态传入JAR包路径和版本号
  meta {
    FLINK_VERSION = "1.15.2"
    JAR_URL       = "https://nexus.example.com/artifacts/flink-stream-job-latest.jar"
  }
  
  // 参数化配置,允许我们在运行时调整并行度
  parameterized {
    meta_required = ["JAR_URL"]
  }

  // 定义一个持久化卷,用于存储状态后端的本地数据
  // 这要求Nomad客户端配置了host_volume
  volume "flink-state-volume" {
    type      = "host"
    source    = "flink-state-backend" // 对应Nomad客户端配置中的host_volume "flink-state-backend"
    read_only = false
  }

  group "flink-cluster" {
    // Flink集群规模,可动态调整
    count = 1

    network {
      mode = "host" // 使用host网络模式,简化Flink组件间的通信
      port "rpc" { static = 6123 }
      port "blob" { static = 6124 }
      port "ui" { to = 8081 }
    }

    // --- JobManager任务定义 ---
    task "jobmanager" {
      driver = "java"

      // 动态获取构建产物
      artifact {
        source      = meta.JAR_URL
        destination = "local/" // 下载到分配的任务目录下的local/
      }

      // Nomad的java驱动配置
      config {
        jar_path = "local/flink-stream-job-latest.jar"
        jvm_options = [
          "-Xms1024m", "-Xmx1024m",
          "-Dlog.file=/var/log/flink-jm.log"
        ]
        // 传递给Flink作业main方法的参数
        args = [
          "--job-classname", "com.example.StreamingJob",
          "--rabbit.host", "rabbitmq.service.consul",
          "--rabbit.port", "5672"
        ]
      }

      // Flink JobManager的核心配置
      template {
        data = <<EOH
jobmanager.rpc.address: {{ env "NOMAD_IP_rpc" }}
jobmanager.rpc.port: {{ env "NOMAD_PORT_rpc" }}
blob.server.port: {{ env "NOMAD_PORT_blob" }}
rest.port: {{ env "NOMAD_PORT_ui" }}
rest.address: {{ env "NOMAD_IP_ui" }}
high-availability: org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServicesFactory
state.backend: rocksdb
state.checkpoints.dir: file:///var/data/flink/checkpoints/{{ env "NOMAD_ALLOC_ID" }}
state.savepoints.dir: file:///var/data/flink/savepoints/
# 这里的并行度设置是默认值,实际运行的并行度由TaskManager的数量决定
parallelism.default: 2
EOH
        destination = "local/flink-conf.yaml"
        env         = true // 将配置渲染为环境变量,供Flink启动脚本使用
      }

      // 资源配置
      resources {
        cpu    = 1000 # 1 GHz
        memory = 2048 # 2 GiB
      }

      // 日志配置,便于排查问题
      logs {
        max_files     = 3
        max_file_size = 15
      }

      // 服务发现,将Flink UI注册到Consul
      service {
        name = "flink-jobmanager-ui"
        port = "ui"
        tags = ["flink", "streaming"]

        check {
          type     = "http"
          path     = "/"
          interval = "10s"
          timeout  = "2s"
        }
      }
    }

    // --- TaskManager任务定义 ---
    task "taskmanager" {
      driver = "java"

      // TaskManager也需要JAR包,以便访问作业的类
      artifact {
        source      = meta.JAR_URL
        destination = "local/"
      }

      // 挂载用于状态后端的卷
      volume_mount {
        volume      = "flink-state-volume"
        destination = "/var/data/flink/state"
        read_only   = false
      }

      config {
        // TaskManager启动时不需要指定jar_path和args,它会从JobManager获取作业信息
        class = "org.apache.flink.runtime.taskexecutor.TaskManagerRunner"
        jvm_options = [
          "-Xms2048m", "-Xmx2048m",
          "-Dlog.file=/var/log/flink-tm.log",
          // 配置RocksDB状态后端使用挂载的卷
          "-Dstate.backend.rocksdb.localdir=/var/data/flink/state/{{ env \"NOMAD_ALLOC_ID\" }}"
        ]
      }
      
      // TaskManager也需要同样的配置文件
      template {
        data = <<EOH
jobmanager.rpc.address: {{ env "NOMAD_IP_rpc" }}
jobmanager.rpc.port: {{ env "NOMAD_PORT_rpc" }}
blob.server.port: {{ env "NOMAD_PORT_blob" }}
# 关键:TaskManager需要知道JobManager在哪里
taskmanager.numberOfTaskSlots: 2
EOH
        destination = "local/flink-conf.yaml"
        env         = true
      }

      resources {
        cpu    = 2000 # 2 GHz
        memory = 4096 # 4 GiB
      }

      logs {
        max_files     = 5
        max_file_size = 20
      }
    }
  }
}

这个Nomad文件包含了许多生产实践:

  1. 参数化 (parameterized): CI流水线可以通过 -var "JAR_URL=..." 的方式传入构建好的JAR包地址,无需每次修改Job文件。
  2. 网络模式 (host): 简化了Flink内部组件的通信,JobManager和TaskManager可以直接通过NOMAD_IP和静态端口通信,避免了复杂的端口映射和发现机制。
  3. 状态管理 (volume, volume_mount): 我们定义了一个host卷来持久化RocksDB的状态。这意味着即使TaskManager重启,它的本地状态也不会丢失。这是一个本地持久化方案,虽然有单点故障风险,但实现简单,适用于很多场景。
  4. 配置模板 (template): 使用Nomad的模板功能动态生成flink-conf.yaml。这使得我们可以利用Nomad提供的环境变量(如NOMAD_IP_rpc)来动态配置Flink,避免硬编码。
  5. 服务发现 (service): 将Flink的UI注册到Consul,方便开发和运维人员访问。

第四步:串联一切的CI/CD流水线

有了Monorepo结构、Chef配置和Nomad Job文件,最后一步就是用CI/CD流水线把它们全部自动化地串联起来。我们使用GitLab CI。

.gitlab-ci.yml:

# .gitlab-ci.yml

variables:
  # Nexus作为我们的Maven仓库和制品库
  NEXUS_URL: "https://nexus.example.com/repository"
  # Docker镜像仓库
  IMAGE_REGISTRY: "registry.example.com/data-platform"

stages:
  - build
  - test
  - deploy

# 使用rules:changes来检测变更,实现Monorepo下的智能构建
build_flink_job:
  stage: build
  image: gradle:7.5-jdk11
  script:
    - cd apps/flink-stream-job
    - gradle clean build shadowJar
    - |
      # 将构建产物上传到Nexus
      curl -v -u "${NEXUS_USER}:${NEXUS_PASS}" --upload-file build/libs/flink-stream-job-*-all.jar \
      "${NEXUS_URL}/raw/flink-stream-job-latest.jar"
  artifacts:
    paths:
      - apps/flink-stream-job/build/libs/*.jar
  rules:
    - if: '$CI_COMMIT_BRANCH == "main"'
      changes:
        - apps/flink-stream-job/**/*
        - libs/event-protocol/**/*

build_producer_api:
  stage: build
  image: docker:20.10
  services:
    - docker:20.10-dind
  script:
    - cd apps/data-producer-api
    - docker login -u $CI_REGISTRY_USER -p $CI_REGISTRY_PASSWORD $IMAGE_REGISTRY
    - docker build -t ${IMAGE_REGISTRY}/data-producer:latest .
    - docker push ${IMAGE_REGISTRY}/data-producer:latest
  rules:
    - if: '$CI_COMMIT_BRANCH == "main"'
      changes:
        - apps/data-producer-api/**/*
        - libs/event-protocol/**/*

# ... 其他服务的构建任务 ...

deploy_flink_cluster:
  stage: deploy
  image:
    name: hashicorp/nomad:1.4.3
    entrypoint: [""]
  script:
    # 确保NOMAD_ADDR环境变量已在CI/CD设置中配置
    - echo "Deploying Flink cluster to Nomad..."
    - export JAR_URL="${NEXUS_URL}/raw/flink-stream-job-latest.jar"
    
    # --- 这是有状态更新的关键步骤 ---
    # 1. 检查当前是否有正在运行的作业
    - |
      if nomad job status flink-stream-processor | grep "running"; then
        echo "Found running job, triggering savepoint..."
        # 从Nomad API获取JobManager的地址和端口
        JM_ADDR=$(nomad service status -address flink-jobmanager-ui)
        if [ -z "$JM_ADDR" ]; then
          echo "Error: Could not resolve JobManager address from Consul."
          exit 1
        fi
        
        # 通过Flink REST API触发savepoint
        TRIGGER_ID=$(curl -s -X POST "http://${JM_ADDR}/jobs/$(curl -s http://${JM_ADDR}/jobs | jq -r '.jobs[0].id')/savepoints" | jq -r '."request-id"')
        
        # 轮询savepoint状态
        SP_PATH=""
        for i in {1..30}; do
          STATUS_RESP=$(curl -s "http://${JM_ADDR}/jobs/$(curl -s http://${JM_ADDR}/jobs | jq -r '.jobs[0].id')/savepoints/${TRIGGER_ID}")
          if echo "$STATUS_RESP" | grep -q "COMPLETED"; then
            SP_PATH=$(echo "$STATUS_RESP" | jq -r '.operation.location')
            echo "Savepoint created successfully at: ${SP_PATH}"
            break
          fi
          echo "Waiting for savepoint completion... (${i}/30)"
          sleep 2
        done

        if [ -z "$SP_PATH" ]; then
          echo "Failed to create savepoint in time."
          exit 1
        fi
        
        # 部署新版本,并从savepoint恢复
        nomad job run -var="JAR_URL=${JAR_URL}" -var="savepoint_path=${SP_PATH}" infra/nomad/flink-cluster.nomad.hcl
      else
        echo "No running job found, performing initial deployment."
        nomad job run -var="JAR_URL=${JAR_URL}" infra/nomad/flink-cluster.nomad.hcl
      fi
  needs:
    - build_flink_job
  rules:
    - if: '$CI_COMMIT_BRANCH == "main"'
      when: on_success
      changes:
        - apps/flink-stream-job/**/*
        - infra/nomad/flink-cluster.nomad.hcl

这个流水线实现了几个高级功能:

  1. 变更驱动: rules:changes确保了只有当相关代码(如apps/flink-stream-job)发生变化时,才会触发对应的构建和部署任务,这在Monorepo中至关重要,避免了不必要的资源浪费。
  2. 有状态升级: deploy_flink_cluster任务中的脚本是整个流程的“大脑”。它首先检查Nomad中是否存在正在运行的作业。如果存在,它会通过调用Flink的REST API来自动触发一个savepoint,并轮询直到savepoint完成。然后,它带着savepoint的路径作为参数来启动新版本的作业,实现了零数据丢失的平滑升级。这是一个典型的生产级部署操作。

最终成果的展现

当开发者向main分支推送一个对flink-stream-job的修改时,整个流程会自动触发:

  1. GitLab CI检测到apps/flink-stream-job/路径下的变更。
  2. build_flink_job任务被触发,编译代码,打包成JAR,并上传到Nexus。
  3. deploy_flink_cluster任务被触发。
    • 脚本连接到Nomad集群,发现旧的Flink作业正在运行。
    • 通过Consul找到Flink JobManager的UI地址。
    • 向Flink REST API发送请求,创建一个savepoint。
    • 获取到savepoint的路径后,执行nomad job run,将新的JAR包URL和savepoint路径注入到flink-cluster.nomad.hcl模板中。
  4. Nomad接收到指令,开始执行滚动更新,平滑地用新任务替换旧任务,并从指定的savepoint恢复状态。

整个流程的架构图如下所示:

graph TD
    A[Developer: git push] --> B{GitLab};
    B -- Webhook --> C[GitLab Runner];
    C -- Detects changes in 'apps/flink-stream-job' --> D[Build Stage];
    D -- 1. gradle build --> E[Fat JAR];
    E -- 2. upload --> F[Nexus Artifacts];
    
    C -- On Success --> G[Deploy Stage];
    G -- 1. Get JAR URL from Nexus --> H(Deployment Script);
    H -- 2. Check Nomad for running job --> I{Nomad API};
    H -- 3. Trigger Savepoint via Flink API --> J{Flink JobManager};
    J -- Returns Savepoint Path --> H;
    H -- 4. nomad job run with JAR & Savepoint --> I;
    I -- Schedules Tasks --> K[Nomad Clients];
    
    subgraph Nomad Clients Managed by Chef
        K -- Runs --> L[JobManager Task];
        K -- Runs --> M[TaskManager Task];
    end

    L -- Restores state from --> N[Persistent Volume];
    M -- State Backend --> N;

    subgraph External Services
        O[RabbitMQ on Nomad];
        P[Data Producer on Nomad];
    end

    M -- Consumes/Produces --> O;
    P -- Produces to --> O;

局限性与未来迭代方向

这套方案虽然实现了高度自动化,但在真实生产环境中,它仍然存在一些可以改进的地方。

首先,状态后端的本地持久化方案(host_volume)存在单点故障风险。如果某个Nomad客户端节点物理损坏,其上存储的Flink作业状态将会丢失。更健壮的方案是切换到分布式文件系统,如HDFS或S3,作为检查点和savepoint的存储。这需要在Nomad Job文件中修改Flink配置,并确保Nomad客户端有访问这些系统的权限。

其次,当前的部署脚本是命令式的,虽然有效,但不够“云原生”。未来的演进方向是引入GitOps模式。可以开发一个简单的Nomad Operator,或者使用一个能够声明式管理Nomad作业的工具,让它监听Git仓库中Nomad Job文件的变化,并自动调谐(reconcile)集群状态,使得部署过程更加声明式和可追溯。

最后,使用Nomad的java驱动虽然简化了部署,但也牺牲了部分环境隔离性和依赖管理的可控性。随着业务复杂度的增加,可能会出现不同作业依赖不同版本库的情况。那时,转向使用预构建的、包含Flink环境和作业JAR的Docker镜像,并切换到Nomad的docker驱动,将是一个更具扩展性的选择。


  目录