我们团队维护着一套实时数据处理系统,手动部署和升级Apache Flink作业一直是个痛点。每次发布都需要运维手动上传JAR包,小心翼翼地停止旧作业、创建savepoint,再启动新作业。整个过程不仅繁琐,而且极易出错,尤其是在处理有状态作业时,一次失误就可能导致数据丢失或处理中断。我们需要一个从代码提交到服务上线的全自动化流程,将整个流处理应用——包括Flink作业本身、上游的数据生产者和下游的消费者——作为一个原子单元进行管理和部署。
最初的构想是在我们现有的技术栈上进行扩展。基础设施由Chef统一管理,服务调度则依赖于Nomad。消息队列是RabbitMQ。既然工具链已经存在,目标就变成了如何将Flink无缝地集成进来,并通过Monorepo的模式来统一管理所有相关服务的代码,实现真正的端到端自动化。
第一步:统一代码的基石 - Monorepo结构设计
将所有相关项目——Flink作业、数据生产者API、事件消费者服务、共享的数据模型——都放在一个Git仓库里,能极大简化依赖管理和跨项目的原子性重构。在真实项目中,不同组件的版本对齐是个大问题,Monorepo天生就能解决它。
我们的目录结构设计如下:
/
├── apps/
│ ├── flink-stream-job/ # Flink流处理作业 (Java/Scala)
│ │ ├── src/
│ │ └── build.gradle
│ ├── data-producer-api/ # 生产数据的Go微服务
│ │ ├── main.go
│ │ └── Dockerfile
│ └── event-consumer-service/ # 消费Flink处理结果的Python服务
│ ├── app.py
│ └── Dockerfile
├── libs/
│ └── event-protocol/ # Protobuf定义的共享事件模型
│ └── event.proto
├── infra/
│ ├── chef/ # Chef Cookbooks
│ │ └── cookbooks/
│ │ └── nomad_client/
│ │ └── recipes/
│ │ └── default.rb
│ └── nomad/ # Nomad Job编排文件
│ ├── flink-cluster.nomad.hcl
│ └── rabbitmq.nomad.hcl
└── .gitlab-ci.yml # CI/CD流水线定义
这种结构的好处是显而易见的:
-
libs/event-protocol
的任何变更,可以在一次提交中同时更新生产者、消费者和Flink作业,避免了多仓库版本依赖地狱。 - CI/CD流水线可以配置得更智能,通过路径变更检测,只构建和部署真正发生变化的应用。
第二步:环境准备 - 用Chef固化Nomad客户端环境
我们的Nomad集群运行在裸金属或虚拟机上,由Chef负责配置管理。为了让Nomad能够顺利运行Flink作业,我们需要确保每个Nomad客户端节点都具备必要的环境。这里的坑在于,如果环境不一致,Flink作业可能会在某个节点上启动失败,导致调度循环和资源浪费。
我们为此编写了一个专用的Chef Recipe,以确保环境的幂等性。
infra/chef/cookbooks/nomad_client/recipes/default.rb
:
# infra/chef/cookbooks/nomad_client/recipes/default.rb
# --- 核心目标:为Nomad客户端节点提供运行Flink作业所需的标准环境 ---
# 1. 安装必要的软件包
package 'openjdk-11-jdk' do
action :install
end
package 'unzip' do
action :install
end
# 2. 创建Flink作业和状态后端所需的目录
# 在生产环境中,这些路径应该挂载到持久化存储上,例如NFS或本地SSD RAID。
# 这里为了演示,我们使用本地目录。
directory '/opt/flink' do
owner 'nomad'
group 'nomad'
mode '0755'
recursive true
action :create
end
directory '/var/data/flink/checkpoints' do
owner 'nomad'
group 'nomad'
mode '0755'
recursive true
action :create
end
directory '/var/data/flink/savepoints' do
owner 'nomad'
group 'nomad'
mode '0755'
recursive true
action :create
end
# 3. 配置Nomad客户端
# 启用原始exec驱动,这对直接运行Java进程非常重要。
# 同时,为Flink作业预留系统资源。
# 这里的配置会合并到Nomad的全局配置中。
# 注意:在真实项目中,不应将配置硬编码在recipe里,而应使用Chef attributes。
file '/etc/nomad.d/client.hcl' do
content <<~EOC
client {
enabled = true
# 允许Nomad直接执行二进制文件,比如java命令
options = {
"driver.raw_exec.enable" = "1"
"driver.java.enable" = "1" # 确保java驱动也启用
}
# 为系统进程、Chef-client等预留资源,避免Nomad独占所有资源
reserved {
cpu = 200 # 200 MHz
memory = 512 # 512 MiB
disk = 1024 # 1 GiB
}
}
EOC
owner 'root'
group 'root'
mode '0644'
notifies :restart, 'service[nomad]', :immediately
end
# 4. 确保Nomad服务正在运行并启用
service 'nomad' do
action [:enable, :start]
end
这个Recipe解决了几个关键问题:安装了正确的JDK,创建了Flink需要的持久化目录(并设置了正确的权限,一个常见的错误是Nomad进程没有权限写入),并且显式启用了Nomad的raw_exec
和java
驱动。这为后续直接在Nomad上运行JAR包打下了基础。
第三步:构建与部署的核心 - Nomad Job Spec
这是整个方案中最关键的部分。我们需要编写一个Nomad Job文件来描述如何部署一个Flink集群(包括一个JobManager和一个或多个TaskManager)。使用Nomad的java
驱动而不是docker
驱动是一个经过权衡的决定。虽然Docker提供了更好的隔离性,但java
驱动更轻量,启动速度更快,并且对于内部信任度较高的环境来说,性能开销更小。
infra/nomad/flink-cluster.nomad.hcl
:
// infra/nomad/flink-cluster.nomad.hcl
job "flink-stream-processor" {
datacenters = ["dc1"]
type = "service"
// 使用变量,使得CI可以动态传入JAR包路径和版本号
meta {
FLINK_VERSION = "1.15.2"
JAR_URL = "https://nexus.example.com/artifacts/flink-stream-job-latest.jar"
}
// 参数化配置,允许我们在运行时调整并行度
parameterized {
meta_required = ["JAR_URL"]
}
// 定义一个持久化卷,用于存储状态后端的本地数据
// 这要求Nomad客户端配置了host_volume
volume "flink-state-volume" {
type = "host"
source = "flink-state-backend" // 对应Nomad客户端配置中的host_volume "flink-state-backend"
read_only = false
}
group "flink-cluster" {
// Flink集群规模,可动态调整
count = 1
network {
mode = "host" // 使用host网络模式,简化Flink组件间的通信
port "rpc" { static = 6123 }
port "blob" { static = 6124 }
port "ui" { to = 8081 }
}
// --- JobManager任务定义 ---
task "jobmanager" {
driver = "java"
// 动态获取构建产物
artifact {
source = meta.JAR_URL
destination = "local/" // 下载到分配的任务目录下的local/
}
// Nomad的java驱动配置
config {
jar_path = "local/flink-stream-job-latest.jar"
jvm_options = [
"-Xms1024m", "-Xmx1024m",
"-Dlog.file=/var/log/flink-jm.log"
]
// 传递给Flink作业main方法的参数
args = [
"--job-classname", "com.example.StreamingJob",
"--rabbit.host", "rabbitmq.service.consul",
"--rabbit.port", "5672"
]
}
// Flink JobManager的核心配置
template {
data = <<EOH
jobmanager.rpc.address: {{ env "NOMAD_IP_rpc" }}
jobmanager.rpc.port: {{ env "NOMAD_PORT_rpc" }}
blob.server.port: {{ env "NOMAD_PORT_blob" }}
rest.port: {{ env "NOMAD_PORT_ui" }}
rest.address: {{ env "NOMAD_IP_ui" }}
high-availability: org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServicesFactory
state.backend: rocksdb
state.checkpoints.dir: file:///var/data/flink/checkpoints/{{ env "NOMAD_ALLOC_ID" }}
state.savepoints.dir: file:///var/data/flink/savepoints/
# 这里的并行度设置是默认值,实际运行的并行度由TaskManager的数量决定
parallelism.default: 2
EOH
destination = "local/flink-conf.yaml"
env = true // 将配置渲染为环境变量,供Flink启动脚本使用
}
// 资源配置
resources {
cpu = 1000 # 1 GHz
memory = 2048 # 2 GiB
}
// 日志配置,便于排查问题
logs {
max_files = 3
max_file_size = 15
}
// 服务发现,将Flink UI注册到Consul
service {
name = "flink-jobmanager-ui"
port = "ui"
tags = ["flink", "streaming"]
check {
type = "http"
path = "/"
interval = "10s"
timeout = "2s"
}
}
}
// --- TaskManager任务定义 ---
task "taskmanager" {
driver = "java"
// TaskManager也需要JAR包,以便访问作业的类
artifact {
source = meta.JAR_URL
destination = "local/"
}
// 挂载用于状态后端的卷
volume_mount {
volume = "flink-state-volume"
destination = "/var/data/flink/state"
read_only = false
}
config {
// TaskManager启动时不需要指定jar_path和args,它会从JobManager获取作业信息
class = "org.apache.flink.runtime.taskexecutor.TaskManagerRunner"
jvm_options = [
"-Xms2048m", "-Xmx2048m",
"-Dlog.file=/var/log/flink-tm.log",
// 配置RocksDB状态后端使用挂载的卷
"-Dstate.backend.rocksdb.localdir=/var/data/flink/state/{{ env \"NOMAD_ALLOC_ID\" }}"
]
}
// TaskManager也需要同样的配置文件
template {
data = <<EOH
jobmanager.rpc.address: {{ env "NOMAD_IP_rpc" }}
jobmanager.rpc.port: {{ env "NOMAD_PORT_rpc" }}
blob.server.port: {{ env "NOMAD_PORT_blob" }}
# 关键:TaskManager需要知道JobManager在哪里
taskmanager.numberOfTaskSlots: 2
EOH
destination = "local/flink-conf.yaml"
env = true
}
resources {
cpu = 2000 # 2 GHz
memory = 4096 # 4 GiB
}
logs {
max_files = 5
max_file_size = 20
}
}
}
}
这个Nomad文件包含了许多生产实践:
- 参数化 (
parameterized
): CI流水线可以通过-var "JAR_URL=..."
的方式传入构建好的JAR包地址,无需每次修改Job文件。 - 网络模式 (
host
): 简化了Flink内部组件的通信,JobManager和TaskManager可以直接通过NOMAD_IP
和静态端口通信,避免了复杂的端口映射和发现机制。 - 状态管理 (
volume
,volume_mount
): 我们定义了一个host
卷来持久化RocksDB的状态。这意味着即使TaskManager重启,它的本地状态也不会丢失。这是一个本地持久化方案,虽然有单点故障风险,但实现简单,适用于很多场景。 - 配置模板 (
template
): 使用Nomad的模板功能动态生成flink-conf.yaml
。这使得我们可以利用Nomad提供的环境变量(如NOMAD_IP_rpc
)来动态配置Flink,避免硬编码。 - 服务发现 (
service
): 将Flink的UI注册到Consul,方便开发和运维人员访问。
第四步:串联一切的CI/CD流水线
有了Monorepo结构、Chef配置和Nomad Job文件,最后一步就是用CI/CD流水线把它们全部自动化地串联起来。我们使用GitLab CI。
.gitlab-ci.yml
:
# .gitlab-ci.yml
variables:
# Nexus作为我们的Maven仓库和制品库
NEXUS_URL: "https://nexus.example.com/repository"
# Docker镜像仓库
IMAGE_REGISTRY: "registry.example.com/data-platform"
stages:
- build
- test
- deploy
# 使用rules:changes来检测变更,实现Monorepo下的智能构建
build_flink_job:
stage: build
image: gradle:7.5-jdk11
script:
- cd apps/flink-stream-job
- gradle clean build shadowJar
- |
# 将构建产物上传到Nexus
curl -v -u "${NEXUS_USER}:${NEXUS_PASS}" --upload-file build/libs/flink-stream-job-*-all.jar \
"${NEXUS_URL}/raw/flink-stream-job-latest.jar"
artifacts:
paths:
- apps/flink-stream-job/build/libs/*.jar
rules:
- if: '$CI_COMMIT_BRANCH == "main"'
changes:
- apps/flink-stream-job/**/*
- libs/event-protocol/**/*
build_producer_api:
stage: build
image: docker:20.10
services:
- docker:20.10-dind
script:
- cd apps/data-producer-api
- docker login -u $CI_REGISTRY_USER -p $CI_REGISTRY_PASSWORD $IMAGE_REGISTRY
- docker build -t ${IMAGE_REGISTRY}/data-producer:latest .
- docker push ${IMAGE_REGISTRY}/data-producer:latest
rules:
- if: '$CI_COMMIT_BRANCH == "main"'
changes:
- apps/data-producer-api/**/*
- libs/event-protocol/**/*
# ... 其他服务的构建任务 ...
deploy_flink_cluster:
stage: deploy
image:
name: hashicorp/nomad:1.4.3
entrypoint: [""]
script:
# 确保NOMAD_ADDR环境变量已在CI/CD设置中配置
- echo "Deploying Flink cluster to Nomad..."
- export JAR_URL="${NEXUS_URL}/raw/flink-stream-job-latest.jar"
# --- 这是有状态更新的关键步骤 ---
# 1. 检查当前是否有正在运行的作业
- |
if nomad job status flink-stream-processor | grep "running"; then
echo "Found running job, triggering savepoint..."
# 从Nomad API获取JobManager的地址和端口
JM_ADDR=$(nomad service status -address flink-jobmanager-ui)
if [ -z "$JM_ADDR" ]; then
echo "Error: Could not resolve JobManager address from Consul."
exit 1
fi
# 通过Flink REST API触发savepoint
TRIGGER_ID=$(curl -s -X POST "http://${JM_ADDR}/jobs/$(curl -s http://${JM_ADDR}/jobs | jq -r '.jobs[0].id')/savepoints" | jq -r '."request-id"')
# 轮询savepoint状态
SP_PATH=""
for i in {1..30}; do
STATUS_RESP=$(curl -s "http://${JM_ADDR}/jobs/$(curl -s http://${JM_ADDR}/jobs | jq -r '.jobs[0].id')/savepoints/${TRIGGER_ID}")
if echo "$STATUS_RESP" | grep -q "COMPLETED"; then
SP_PATH=$(echo "$STATUS_RESP" | jq -r '.operation.location')
echo "Savepoint created successfully at: ${SP_PATH}"
break
fi
echo "Waiting for savepoint completion... (${i}/30)"
sleep 2
done
if [ -z "$SP_PATH" ]; then
echo "Failed to create savepoint in time."
exit 1
fi
# 部署新版本,并从savepoint恢复
nomad job run -var="JAR_URL=${JAR_URL}" -var="savepoint_path=${SP_PATH}" infra/nomad/flink-cluster.nomad.hcl
else
echo "No running job found, performing initial deployment."
nomad job run -var="JAR_URL=${JAR_URL}" infra/nomad/flink-cluster.nomad.hcl
fi
needs:
- build_flink_job
rules:
- if: '$CI_COMMIT_BRANCH == "main"'
when: on_success
changes:
- apps/flink-stream-job/**/*
- infra/nomad/flink-cluster.nomad.hcl
这个流水线实现了几个高级功能:
- 变更驱动:
rules:changes
确保了只有当相关代码(如apps/flink-stream-job
)发生变化时,才会触发对应的构建和部署任务,这在Monorepo中至关重要,避免了不必要的资源浪费。 - 有状态升级:
deploy_flink_cluster
任务中的脚本是整个流程的“大脑”。它首先检查Nomad中是否存在正在运行的作业。如果存在,它会通过调用Flink的REST API来自动触发一个savepoint,并轮询直到savepoint完成。然后,它带着savepoint的路径作为参数来启动新版本的作业,实现了零数据丢失的平滑升级。这是一个典型的生产级部署操作。
最终成果的展现
当开发者向main
分支推送一个对flink-stream-job
的修改时,整个流程会自动触发:
- GitLab CI检测到
apps/flink-stream-job/
路径下的变更。 -
build_flink_job
任务被触发,编译代码,打包成JAR,并上传到Nexus。 deploy_flink_cluster
任务被触发。- 脚本连接到Nomad集群,发现旧的Flink作业正在运行。
- 通过Consul找到Flink JobManager的UI地址。
- 向Flink REST API发送请求,创建一个savepoint。
- 获取到savepoint的路径后,执行
nomad job run
,将新的JAR包URL和savepoint路径注入到flink-cluster.nomad.hcl
模板中。
- Nomad接收到指令,开始执行滚动更新,平滑地用新任务替换旧任务,并从指定的savepoint恢复状态。
整个流程的架构图如下所示:
graph TD A[Developer: git push] --> B{GitLab}; B -- Webhook --> C[GitLab Runner]; C -- Detects changes in 'apps/flink-stream-job' --> D[Build Stage]; D -- 1. gradle build --> E[Fat JAR]; E -- 2. upload --> F[Nexus Artifacts]; C -- On Success --> G[Deploy Stage]; G -- 1. Get JAR URL from Nexus --> H(Deployment Script); H -- 2. Check Nomad for running job --> I{Nomad API}; H -- 3. Trigger Savepoint via Flink API --> J{Flink JobManager}; J -- Returns Savepoint Path --> H; H -- 4. nomad job run with JAR & Savepoint --> I; I -- Schedules Tasks --> K[Nomad Clients]; subgraph Nomad Clients Managed by Chef K -- Runs --> L[JobManager Task]; K -- Runs --> M[TaskManager Task]; end L -- Restores state from --> N[Persistent Volume]; M -- State Backend --> N; subgraph External Services O[RabbitMQ on Nomad]; P[Data Producer on Nomad]; end M -- Consumes/Produces --> O; P -- Produces to --> O;
局限性与未来迭代方向
这套方案虽然实现了高度自动化,但在真实生产环境中,它仍然存在一些可以改进的地方。
首先,状态后端的本地持久化方案(host_volume
)存在单点故障风险。如果某个Nomad客户端节点物理损坏,其上存储的Flink作业状态将会丢失。更健壮的方案是切换到分布式文件系统,如HDFS或S3,作为检查点和savepoint的存储。这需要在Nomad Job文件中修改Flink配置,并确保Nomad客户端有访问这些系统的权限。
其次,当前的部署脚本是命令式的,虽然有效,但不够“云原生”。未来的演进方向是引入GitOps模式。可以开发一个简单的Nomad Operator,或者使用一个能够声明式管理Nomad作业的工具,让它监听Git仓库中Nomad Job文件的变化,并自动调谐(reconcile)集群状态,使得部署过程更加声明式和可追溯。
最后,使用Nomad的java
驱动虽然简化了部署,但也牺牲了部分环境隔离性和依赖管理的可控性。随着业务复杂度的增加,可能会出现不同作业依赖不同版本库的情况。那时,转向使用预构建的、包含Flink环境和作业JAR的Docker镜像,并切换到Nomad的docker
驱动,将是一个更具扩展性的选择。