The project's initial requirement was simple: serve an internal AI model that analyzes user-uploaded documents (5-100 MB) and extracts key information. The frontend team planned to use PWA technology to deliver a native-app-like experience. However, the very first architecture prototype exposed a fatal problem. Model inference takes anywhere from 30 seconds to 5 minutes, and a naive synchronous HTTP request simply timed out on the frontend, making for a terrible user experience. Worse, during traffic peaks the CPU- and memory-based HPA (Horizontal Pod Autoscaler) policy broke down completely. Pod counts shot up as requests arrived, but because the long-running inference requests were held open rather than keeping the CPU busy, utilization stayed low, and the HPA incorrectly scaled back in before the tasks finished, leaving work piled up and failing.
This pain point pushed us to abandon the synchronous approach entirely and move to a fully decoupled, event-driven architecture. Our goals were:
- The PWA gets an immediate response after submitting a task, without waiting for the processing result.
- Scaling of the backend inference service must be tied directly to the actual workload (the number of pending documents), not to indirect signals like CPU or memory.
- When there are no tasks, the inference service's resource footprint should drop to a minimum, ideally zero, to control cost.
The technology choices fell into place quickly: Azure AKS as the container orchestration platform, Azure Service Bus as the message queue for decoupling, and KEDA (Kubernetes Event-driven Autoscaling) for the critical piece, event-driven scaling.
Architecture Design and Infrastructure Preparation
The entire workflow is designed as an asynchronous pipeline.
graph TD
    subgraph PWA on Static Web App
        A[User Interface] --> B{Upload document and submit};
    end
    B --> C[API Gateway];
    subgraph Azure AKS Cluster
        C --> D[Task intake service Pod];
        D -- 1. Store document in Blob Storage, 2. Push processing message --> E((Azure Service Bus Queue));
        subgraph KEDA Control Loop
            F[KEDA Operator] -- Continuously monitors queue length --> E;
            F -- Dynamically adjusts replica count --> G[Inference service Deployment];
        end
        G -- Pulls messages from the queue --> E;
        G -- 3. Download document from Blob, 4. Run model inference --> H[ML Model];
        H -- 5. Write results to Cosmos DB --> I[(Azure Cosmos DB)];
    end
    A -.-> J[Status polling API];
    J -- Reads processing status and results --> I;
    C --> J;
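The diagram references a task intake service whose code does not appear later in this article. As a rough illustration of that step only, here is a minimal FastAPI sketch; the container name "documents", the environment variable names, and the endpoint shape are assumptions for illustration, not the project's actual code.
# task_intake.py - minimal sketch of the task intake service from the diagram (assumed names)
import os
import json
import uuid

from azure.identity.aio import DefaultAzureCredential
from azure.servicebus import ServiceBusMessage
from azure.servicebus.aio import ServiceBusClient
from azure.storage.blob.aio import BlobServiceClient
from fastapi import FastAPI, File, UploadFile

app = FastAPI()
credential = DefaultAzureCredential()

STORAGE_ACCOUNT_URL = os.environ["STORAGE_ACCOUNT_URL"]    # e.g. https://<account>.blob.core.windows.net
SERVICEBUS_NAMESPACE = os.environ["SERVICEBUS_NAMESPACE"]  # e.g. <ns>.servicebus.windows.net
QUEUE_NAME = os.environ.get("SERVICEBUS_QUEUE_NAME", "inference-tasks")

@app.post("/jobs")
async def submit_job(document: UploadFile = File(...)):
    job_id = str(uuid.uuid4())
    blob_path = f"uploads/{job_id}/{document.filename}"

    # 1. Store the uploaded document in Blob Storage
    async with BlobServiceClient(account_url=STORAGE_ACCOUNT_URL, credential=credential) as blobs:
        blob_client = blobs.get_blob_client(container="documents", blob=blob_path)
        await blob_client.upload_blob(await document.read())

    # 2. Push a processing message onto the Service Bus queue
    async with ServiceBusClient(fully_qualified_namespace=SERVICEBUS_NAMESPACE, credential=credential) as sb:
        async with sb.get_queue_sender(queue_name=QUEUE_NAME) as sender:
            await sender.send_messages(ServiceBusMessage(json.dumps({"jobId": job_id, "blobPath": blob_path})))

    # 3. Return immediately; the PWA polls the status API with this jobId
    return {"jobId": job_id}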
In the real project we manage all Azure resources with Terraform to keep environments consistent and reproducible. Below is a simplified definition of the key resources, focusing on how they relate to each other, for example enabling Workload Identity on the AKS cluster so it can access other Azure services securely.
# main.tf
provider "azurerm" {
  features {}
}

resource "azurerm_resource_group" "rg" {
  name     = "mlops-pwa-rg"
  location = "East US"
}

# Azure Service Bus for decoupling
resource "azurerm_servicebus_namespace" "sbus" {
  name                = "mlops-sbus-ns-unique"
  location            = azurerm_resource_group.rg.location
  resource_group_name = azurerm_resource_group.rg.name
  sku                 = "Standard"
}

resource "azurerm_servicebus_queue" "task_queue" {
  name                = "inference-tasks"
  namespace_id        = azurerm_servicebus_namespace.sbus.id
  enable_partitioning = true # For higher throughput
}

# Azure Kubernetes Service
resource "azurerm_kubernetes_cluster" "aks" {
  name                = "mlops-aks-cluster"
  location            = azurerm_resource_group.rg.location
  resource_group_name = azurerm_resource_group.rg.name
  dns_prefix          = "mlopsaks"

  default_node_pool {
    name       = "default"
    node_count = 1
    vm_size    = "Standard_DS2_v2"
  }

  identity {
    type = "SystemAssigned"
  }

  oidc_issuer_enabled       = true # Required for Workload Identity
  workload_identity_enabled = true
}

# Install KEDA using the Helm provider
resource "helm_release" "keda" {
  name             = "keda"
  repository       = "https://kedacore.github.io/charts"
  chart            = "keda"
  namespace        = "keda"
  create_namespace = true
  version          = "2.10.0" # Pin to a specific version

  depends_on = [
    azurerm_kubernetes_cluster.aks
  ]
}
The key here is enabling oidc_issuer_enabled and workload_identity_enabled on the AKS cluster. This lets us move away from the old practice of storing Azure service connection strings in Kubernetes Secrets and use federated identity credentials instead, which is considerably more secure.
Containerizing and Implementing the Inference Service
The inference service is the core workload of the system. It is a Python application that uses fastapi as its web framework (although in this worker mode we do not expose an HTTP port directly) and integrates the azure-servicebus and azure-identity SDKs.
Dockerfile:
A production-grade Dockerfile has to consider image size, build efficiency, and security. We use a multi-stage build.
# Dockerfile for inference-worker
# --- Build Stage ---
FROM python:3.9-slim as builder
WORKDIR /app
# Install build dependencies
RUN pip install --no-cache-dir poetry
# Copy only files needed for dependency installation
COPY poetry.lock pyproject.toml ./
# Install dependencies
RUN poetry config virtualenvs.create false && \
poetry install --no-dev --no-interaction --no-ansi
# --- Final Stage ---
FROM python:3.9-slim
WORKDIR /app
# Non-root user for security
RUN groupadd --gid 1001 appuser && \
useradd --uid 1001 --gid 1001 -ms /bin/bash appuser
# Copy installed packages and console scripts from the builder stage
COPY --from=builder /usr/local/lib/python3.9/site-packages /usr/local/lib/python3.9/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin
# Copy application code
COPY ./src ./src
# Set ownership
RUN chown -R appuser:appuser /app
USER appuser
# Command to run
CMD ["python", "src/main.py"]
This separates the build environment from the final runtime environment; the resulting image is small and carries no unnecessary build tooling.
Python worker core logic (src/main.py):
The robustness of this code is critical. It has to handle graceful shutdown, message-processing failures, connection retries, and similar concerns.
# src/main.py
import os
import logging
import asyncio

from azure.servicebus.aio import ServiceBusClient, ServiceBusReceiver
from azure.servicebus.exceptions import ServiceBusError
from azure.identity.aio import WorkloadIdentityCredential

# --- Configuration ---
# Using Workload Identity - no secrets in code
FULLY_QUALIFIED_NAMESPACE = os.environ.get("SERVICEBUS_NAMESPACE")
QUEUE_NAME = os.environ.get("SERVICEBUS_QUEUE_NAME")
AZURE_CLIENT_ID = os.environ.get("AZURE_CLIENT_ID")  # Injected for workload identity

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


async def process_message(receiver: ServiceBusReceiver):
    """Continuously receive and process messages."""
    async with receiver:
        while True:
            try:
                messages = await receiver.receive_messages(max_message_count=1, max_wait_time=30)
                if not messages:
                    logger.info("No messages received in the last 30s. Still listening.")
                    continue
                for msg in messages:
                    try:
                        message_body = str(msg)
                        logger.info(f"Received message: {message_body}")
                        # TODO:
                        # 1. Parse message body (e.g., JSON with blob storage path)
                        # 2. Download document from blob storage
                        # 3. Perform ML inference (this is the long-running part)
                        await asyncio.sleep(45)  # Simulating a 45-second inference task
                        # 4. Write results to Cosmos DB
                        logger.info("Processing complete. Completing the message.")
                        await receiver.complete_message(msg)
                    except Exception as e:
                        logger.error(f"Error processing message {msg.message_id}: {e}", exc_info=True)
                        # Abandon the message to allow another worker to retry
                        await receiver.abandon_message(msg)
            except ServiceBusError as sbe:
                logger.error(f"Service Bus connection error: {sbe}. Reconnecting...")
                await asyncio.sleep(10)  # Wait before retrying
            except Exception as e:
                logger.error(f"An unexpected error occurred in the receiver loop: {e}", exc_info=True)
                # In a real scenario, this might trigger a pod restart
                break


async def main():
    """Main function to set up and run the Service Bus listener."""
    logger.info("Starting inference worker...")
    # Authenticate via AKS Workload Identity (federated token, no connection string)
    credential = WorkloadIdentityCredential(client_id=AZURE_CLIENT_ID)
    servicebus_client = ServiceBusClient(
        fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
        credential=credential,
        logging_enable=True
    )
    receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME)
    try:
        await process_message(receiver)
    finally:
        logger.info("Closing service bus client.")
        await servicebus_client.close()
        await credential.close()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Worker shutting down gracefully.")
Note the error handling here. abandon_message puts the message back on the queue so another worker can retry it, which works well for transient failures. If a message keeps failing, Service Bus automatically moves it to the dead-letter queue (DLQ) once its max delivery count is exceeded, preventing an infinite retry loop.
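One related caveat: inference can take up to 5 minutes, while a Service Bus queue's message lock duration is typically much shorter (one minute by default), so a worker can lose the lock mid-inference and the later complete_message call will fail, causing redelivery. The SDK's AutoLockRenewer covers this case. Below is a minimal sketch of how the receive loop could register each message with a renewer; it is a fragment under the same assumptions as the worker above, not the project's actual code.
# lock_renewal_sketch.py - keeping the message lock alive during long inference
# (assumes azure-servicebus >= 7.x; a sketch, not the project's actual worker code)
import asyncio
from azure.servicebus.aio import AutoLockRenewer, ServiceBusReceiver

async def process_one(receiver: ServiceBusReceiver, renewer: AutoLockRenewer) -> None:
    messages = await receiver.receive_messages(max_message_count=1, max_wait_time=30)
    for msg in messages:
        # Renew the lock in the background for up to 10 minutes; without this,
        # complete_message() fails once the queue's lock duration expires.
        renewer.register(receiver, msg, max_lock_renewal_duration=600)
        try:
            await asyncio.sleep(45)  # placeholder for the real inference call
            await receiver.complete_message(msg)
        except Exception:
            await receiver.abandon_message(msg)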
Kubernetes Deployment and KEDA Configuration
Now we deploy the inference service to AKS and let KEDA drive its scaling.
Deployment (inference-worker-deployment.yaml):
# inference-worker-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: inference-worker
  namespace: mlops
spec:
  replicas: 0 # KEDA will manage this. Starting at 0 is key for cost saving.
  selector:
    matchLabels:
      app: inference-worker
  template:
    metadata:
      labels:
        app: inference-worker
        azure.workload.identity/use: "true" # Enable workload identity
    spec:
      serviceAccountName: inference-worker-sa # Important for workload identity
      containers:
      - name: worker
        image: myacr.azurecr.io/inference-worker:v1.2.0
        env:
        - name: SERVICEBUS_NAMESPACE
          value: "mlops-sbus-ns-unique.servicebus.windows.net"
        - name: SERVICEBUS_QUEUE_NAME
          value: "inference-tasks"
        - name: AZURE_CLIENT_ID
          value: "<your-managed-identity-client-id>"
        resources:
          requests:
            cpu: "500m"
            memory: "1Gi"
          limits:
            cpu: "1"
            memory: "2Gi"
There are two key points here:
- replicas: 0: we explicitly tell Kubernetes that no Pods are needed by default. KEDA takes full responsibility for scaling the Deployment from 0 to N when work arrives.
- azure.workload.identity/use: "true" together with serviceAccountName: this configures Workload Identity so the Pod can securely obtain Azure AD tokens to access Service Bus.
KEDA ScaledObject (keda-scaler.yaml):
This is where the magic of the architecture lives. We define a ScaledObject that tells KEDA what to watch and how to scale.
# keda-scaler.yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: inference-worker-scaler
  namespace: mlops
spec:
  scaleTargetRef:
    name: inference-worker # The deployment to scale
  pollingInterval: 30  # How often to check the queue (seconds)
  cooldownPeriod: 300  # Wait 5 minutes after the last trigger before scaling down to 0
  minReplicaCount: 0   # Minimum replicas, can be 0
  maxReplicaCount: 20  # Maximum replicas to prevent runaway costs
  triggers:
  - type: azure-servicebus
    metadata:
      # Required
      queueName: inference-tasks
      # Service Bus namespace name (no connection string needed with workload identity)
      namespace: mlops-sbus-ns-unique
      # The number of messages in the queue that will trigger scaling up one pod.
      messageCount: "5"
    authenticationRef:
      name: keda-trigger-auth-workload-identity
---
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: keda-trigger-auth-workload-identity
  namespace: mlops
spec:
  # KEDA authenticates via workload identity; the KEDA operator's service account
  # must have a federated identity credential registered in Azure AD.
  podIdentity:
    provider: azure-workload
The ScaledObject configuration matters a lot. messageCount: "5" means that for roughly every 5 messages in the queue, KEDA adds one Pod, up to maxReplicaCount. This value has to be tuned against each task's average processing time and resource consumption: set it too low (say 1) and Pods may churn constantly; set it too high and the average queue wait time grows. cooldownPeriod is equally important, as it prevents Pods from being scaled back to zero too eagerly during short gaps between tasks.
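To make the tradeoff concrete, the azure-servicebus scaler essentially asks the HPA for ceil(queueLength / messageCount) replicas, clamped between minReplicaCount and maxReplicaCount. A small Python illustration of that math, ignoring HPA stabilization behavior and the cooldownPeriod:
import math

def desired_replicas(queue_length: int, message_count: int = 5,
                     min_replicas: int = 0, max_replicas: int = 20) -> int:
    """Approximate the replica count KEDA/HPA converges to for a given backlog."""
    if queue_length == 0:
        return min_replicas  # scale-to-zero kicks in after cooldownPeriod
    return max(1, min(max_replicas, math.ceil(queue_length / message_count)))

# 37 queued documents with messageCount "5" -> 8 workers; 500 documents -> capped at 20
print(desired_replicas(37), desired_replicas(500))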
Asynchronous Interaction Logic in the PWA Frontend
The frontend experience was the original motivation for this re-architecture. The PWA needs a non-blocking submit-and-poll flow.
Core JavaScript logic (apiService.js):
// apiService.js - Simplified example
const API_BASE_URL = '/api'; // Assuming a proxy setup

/**
 * Submits a document for processing.
 * This is a fire-and-forget operation from the user's perspective.
 * @param {File} file The document to upload.
 * @returns {Promise<{jobId: string}>} The job ID for status tracking.
 */
async function submitInferenceJob(file) {
  const formData = new FormData();
  formData.append('document', file);
  try {
    const response = await fetch(`${API_BASE_URL}/jobs`, {
      method: 'POST',
      body: formData,
    });
    if (!response.ok) {
      const errorData = await response.json();
      throw new Error(errorData.message || 'Failed to submit job.');
    }
    return await response.json(); // e.g., { jobId: "some-uuid-v4" }
  } catch (error) {
    console.error('Job submission failed:', error);
    throw error;
  }
}

/**
 * Polls for the status of a specific job.
 * @param {string} jobId The ID of the job to check.
 * @returns {Promise<any>} The job status and result if completed.
 */
async function getJobStatus(jobId) {
  try {
    const response = await fetch(`${API_BASE_URL}/jobs/${jobId}`);
    if (!response.ok) {
      // Handle 404 for not-yet-created results, etc.
      return { status: 'PENDING' };
    }
    return await response.json(); // e.g., { status: 'COMPLETED', result: {...} }
  } catch (error) {
    console.error(`Failed to get status for job ${jobId}:`, error);
    return { status: 'UNKNOWN', error: error.message };
  }
}

// UI component logic would use these functions
function handleFileUpload(file) {
  const statusElement = document.getElementById('status');
  statusElement.textContent = 'Submitting...';
  submitInferenceJob(file)
    .then(({ jobId }) => {
      statusElement.textContent = `Job submitted with ID: ${jobId}. Waiting for results...`;
      pollForResult(jobId);
    })
    .catch(error => {
      statusElement.textContent = `Error: ${error.message}`;
    });
}

function pollForResult(jobId, interval = 5000) {
  const intervalId = setInterval(async () => {
    const data = await getJobStatus(jobId);
    if (data.status === 'COMPLETED' || data.status === 'FAILED') {
      clearInterval(intervalId);
      // Update UI with the final result or error
      document.getElementById('result').textContent = JSON.stringify(data.result || data.error, null, 2);
    }
  }, interval);
}
This code shows the classic asynchronous workflow: submitInferenceJob quickly returns a jobId, and the UI then starts a poller, pollForResult, that checks the job status in the background. Even if the user closes the browser tab, backend processing continues. A PWA Service Worker can take the experience further, for example by continuing to poll while the app is in the background and notifying the user through the Web Notifications API when the job completes.
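For completeness, the status API that the poller hits is a thin read over the results store. A minimal sketch follows; the database name "mlops", the container "job-results" partitioned by /id, and the document shape written by the worker are assumptions for illustration, not the project's actual code.
# status_api.py - minimal sketch of the status polling API (assumed names and document shape)
import os

from azure.cosmos.aio import CosmosClient
from azure.cosmos.exceptions import CosmosResourceNotFoundError
from azure.identity.aio import DefaultAzureCredential
from fastapi import FastAPI

app = FastAPI()

COSMOS_ACCOUNT_URL = os.environ["COSMOS_ACCOUNT_URL"]  # e.g. https://<account>.documents.azure.com
cosmos = CosmosClient(COSMOS_ACCOUNT_URL, credential=DefaultAzureCredential())
container = cosmos.get_database_client("mlops").get_container_client("job-results")

@app.get("/jobs/{job_id}")
async def get_job(job_id: str):
    try:
        item = await container.read_item(item=job_id, partition_key=job_id)
        # e.g. {"status": "COMPLETED", "result": {...}} written by the inference worker
        return {"status": item["status"], "result": item.get("result")}
    except CosmosResourceNotFoundError:
        # The worker has not written a result document yet; the PWA treats this as PENDING
        return {"status": "PENDING"}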
Remaining Issues and Future Iteration Paths
This architecture solved the original performance and scaling problems and delivers a cost-efficient MLOps inference service that scales with real workload. It is not perfect, though.
First, client-side polling is simple but inefficient. When tasks run very long or user volume is large, the flood of polling requests puts unnecessary pressure on the status API. A better option is WebSockets or the Azure SignalR Service: once a task finishes, the inference worker can emit an event and push the result directly to the corresponding PWA client through SignalR, giving true real-time updates.
Second, dead-letter handling is currently passive. In production we need monitoring and alerting on the dead-letter queue. A common pattern is an Azure Function triggered by messages landing in the DLQ, which records the failure details, notifies the development team, and, depending on the error type, may even run automated compensation logic.
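Whether that handler ends up as an Azure Function or a small scheduled job, reading the DLQ itself is straightforward via the SDK's sub-queue support. A minimal sketch that drains and logs dead-lettered messages, using the synchronous client and assumed environment variable names:
# dlq_inspector.py - minimal sketch for inspecting the dead-letter queue
# (assumed env vars; sync SDK client for a simple scheduled job)
import os
import logging

from azure.identity import DefaultAzureCredential
from azure.servicebus import ServiceBusClient, ServiceBusSubQueue

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("dlq-inspector")

NAMESPACE = os.environ["SERVICEBUS_NAMESPACE"]
QUEUE_NAME = os.environ.get("SERVICEBUS_QUEUE_NAME", "inference-tasks")

with ServiceBusClient(NAMESPACE, credential=DefaultAzureCredential()) as client:
    receiver = client.get_queue_receiver(queue_name=QUEUE_NAME, sub_queue=ServiceBusSubQueue.DEAD_LETTER)
    with receiver:
        for msg in receiver.receive_messages(max_message_count=50, max_wait_time=5):
            # dead_letter_reason / dead_letter_error_description are set by Service Bus
            logger.warning("Dead-lettered message %s: %s / %s",
                           msg.message_id, msg.dead_letter_reason, msg.dead_letter_error_description)
            receiver.complete_message(msg)  # or re-queue / alert, depending on the error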
Finally, this architecture only covers the model deployment and serving stage. A complete MLOps workflow also includes model training, versioning, automated retraining, and A/B testing. The next iteration is to integrate this KEDA-driven inference service with Azure Machine Learning pipelines: when a new model version is registered in the model registry, a CI/CD pipeline is triggered automatically to build a new container image and roll it out safely to the inference-worker deployment on AKS.