Building a KEDA-Based, Event-Driven MLOps Inference Architecture on Azure AKS to Support a PWA


The project's initial requirement was simple: serve an internal AI model that analyzes user-uploaded documents (5-100 MB) and extracts key information. The frontend team planned to use PWA technology to deliver a native-app-like experience. The first architecture prototype, however, exposed a fatal problem. Model inference took anywhere from 30 seconds to 5 minutes, so a simple synchronous HTTP request led straight to frontend timeouts and a terrible user experience. Worse, at traffic peaks the CPU- and memory-based HPA (Horizontal Pod Autoscaler) policy failed completely: pod counts shot up as requests arrived, but because the inference calls were long-running and did not keep CPU utilization high, the HPA scaled back down before the tasks had finished, leaving work piled up and failing.

This pain point pushed us to abandon the synchronous-call approach entirely and move to a fully decoupled, event-driven architecture. Our goals were:

  1. The PWA gets an immediate response after submitting a task, without waiting for the processing result.
  2. Scaling of the backend inference service must be tied directly to the actual workload (the number of pending documents), not to indirect signals like CPU or memory.
  3. When there is no work, the inference service's resource footprint should drop to a minimum, ideally zero, to control cost.

The technology choices fell into place quickly: Azure AKS as our container orchestration platform, Azure Service Bus as the message queue for decoupling, and KEDA (Kubernetes Event-driven Autoscaling) for the critical piece, elastic scaling.

Architecture Design and Infrastructure Preparation

The entire workflow is designed to be asynchronous.

graph TD
    subgraph PWA on Static Web App
        A[User interface] --> B{Upload document and submit};
    end

    B --> C[API Gateway];

    subgraph Azure AKS Cluster
        C --> D[Job intake service Pod];
        D -- 1. Store document in Blob Storage, 2. Push processing message --> E((Azure Service Bus Queue));

        subgraph KEDA Control Loop
            F[KEDA Operator] -- Continuously monitors queue length --> E;
            F -- Dynamically adjusts replica count --> G[Inference service Deployment];
        end

        G -- Pulls messages from the queue --> E;
        G -- 3. Download document from Blob, 4. Run model inference --> H[ML model];
        H -- 5. Write result to Cosmos DB --> I[(Azure Cosmos DB)];
    end

    A -.-> J[Status polling API];
    J -- Reads processing status and results --> I;
    C --> J;
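
The job intake service in the diagram is the only piece the PWA talks to synchronously; everything downstream is asynchronous. As a rough illustration of steps 1 and 2, a minimal sketch of that service is shown below. It assumes a FastAPI app, a documents blob container, a STORAGE_ACCOUNT_URL environment variable, and a JSON message carrying the job ID and blob path; all of these names are illustrative, not the real implementation.

# intake_api.py - hypothetical sketch of the job intake service (steps 1-2 in the diagram)
import json
import os
import uuid

from azure.identity.aio import DefaultAzureCredential
from azure.servicebus import ServiceBusMessage
from azure.servicebus.aio import ServiceBusClient
from azure.storage.blob.aio import BlobServiceClient
from fastapi import FastAPI, File, UploadFile

app = FastAPI()
credential = DefaultAzureCredential()

@app.post("/api/jobs")
async def create_job(document: UploadFile = File(...)):
    job_id = str(uuid.uuid4())
    blob_path = f"uploads/{job_id}/{document.filename}"

    # 1. Store the uploaded document in Blob Storage.
    async with BlobServiceClient(os.environ["STORAGE_ACCOUNT_URL"], credential=credential) as blobs:
        blob_client = blobs.get_blob_client(container="documents", blob=blob_path)
        await blob_client.upload_blob(await document.read())

    # 2. Push a processing message onto the Service Bus queue for the inference workers.
    async with ServiceBusClient(os.environ["SERVICEBUS_NAMESPACE"], credential=credential) as sb:
        async with sb.get_queue_sender(queue_name="inference-tasks") as sender:
            await sender.send_messages(
                ServiceBusMessage(json.dumps({"jobId": job_id, "blobPath": blob_path}))
            )

    # The PWA gets the job ID back immediately and polls a separate status API for the result.
    return {"jobId": job_id}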

In the real project we use Terraform to manage all Azure resources to keep environments consistent and reproducible. Below is a simplified definition of the key resources; the emphasis is on how they relate to one another, for example enabling Workload Identity on the AKS cluster so it can access other Azure services securely.

# main.tf

provider "azurerm" {
  features {}
}

resource "azurerm_resource_group" "rg" {
  name     = "mlops-pwa-rg"
  location = "East US"
}

# Azure Service Bus for decoupling
resource "azurerm_servicebus_namespace" "sbus" {
  name                = "mlops-sbus-ns-unique"
  location            = azurerm_resource_group.rg.location
  resource_group_name = azurerm_resource_group.rg.name
  sku                 = "Standard"
}

resource "azurerm_servicebus_queue" "task_queue" {
  name                = "inference-tasks"
  namespace_id        = azurerm_servicebus_namespace.sbus.id
  enable_partitioning = true # For higher throughput
}

# Azure Kubernetes Service
resource "azurerm_kubernetes_cluster" "aks" {
  name                = "mlops-aks-cluster"
  location            = azurerm_resource_group.rg.location
  resource_group_name = azurerm_resource_group.rg.name
  dns_prefix          = "mlopsaks"
  default_node_pool {
    name       = "default"
    node_count = 1
    vm_size    = "Standard_DS2_v2"
  }
  identity {
    type = "SystemAssigned"
  }
  oidc_issuer_enabled = true # Required for Workload Identity
  workload_identity_enabled = true
}

# Install KEDA using Helm provider
resource "helm_release" "keda" {
  name       = "keda"
  repository = "https://kedacore.github.io/charts"
  chart      = "keda"
  namespace  = "keda"
  create_namespace = true
  version    = "2.10.0" # Use a specific version

  depends_on = [
    azurerm_kubernetes_cluster.aks
  ]
}

The core piece here is enabling oidc_issuer_enabled and workload_identity_enabled on the AKS cluster. This lets us drop the old practice of storing Azure service connection strings in Kubernetes Secrets and switch to the more secure federated identity credentials: a user-assigned managed identity is federated with the worker's Kubernetes service account, and at runtime the pod exchanges its projected service account token for an Azure AD token.
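
To make the token exchange concrete, here is a minimal sketch (not part of the real codebase) of what acquiring a Service Bus token looks like from inside a pod that has workload identity enabled. It assumes the workload identity webhook has injected the standard AZURE_CLIENT_ID, AZURE_TENANT_ID and AZURE_FEDERATED_TOKEN_FILE environment variables.

# wi_token_check.py - sketch: acquire a Service Bus token via workload identity
import os

from azure.identity import WorkloadIdentityCredential

# These variables are injected by the workload identity webhook when the pod's
# service account is labeled and annotated for workload identity.
print("client id: ", os.environ.get("AZURE_CLIENT_ID"))
print("token file:", os.environ.get("AZURE_FEDERATED_TOKEN_FILE"))

# The credential exchanges the projected Kubernetes service account token
# for an Azure AD access token scoped to Service Bus.
credential = WorkloadIdentityCredential()
token = credential.get_token("https://servicebus.azure.net/.default")
print("token acquired, expires at:", token.expires_on)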

Containerizing and Implementing the Inference Service

The inference service is the core workload of the whole system. It is a Python application that uses fastapi as its web framework (although in this worker mode it does not expose an HTTP port directly) and integrates the azure-servicebus and azure-identity SDKs.

Dockerfile:
A production-grade Dockerfile has to account for image size, build efficiency, and security. We use a multi-stage build.

# Dockerfile for inference-worker

# --- Build Stage ---
FROM python:3.9-slim as builder

WORKDIR /app

# Install build dependencies
RUN pip install --no-cache-dir poetry

# Copy only files needed for dependency installation
COPY poetry.lock pyproject.toml ./

# Install dependencies
RUN poetry config virtualenvs.create false && \
    poetry install --only main --no-interaction --no-ansi

# --- Final Stage ---
FROM python:3.9-slim

WORKDIR /app

# Non-root user for security
RUN groupadd --gid 1001 appuser && \
    useradd --uid 1001 --gid 1001 -ms /bin/bash appuser

# Copy packages installed into the builder's system site-packages
COPY --from=builder /usr/local/lib/python3.9/site-packages /usr/local/lib/python3.9/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin

# Copy application code
COPY ./src ./src

# Set ownership
RUN chown -R appuser:appuser /app
USER appuser

# Command to run
CMD ["python", "src/main.py"]

This approach separates the build environment from the final runtime environment, so the resulting image is very small and contains no unnecessary build tooling.

Python worker core logic (src/main.py):
The robustness of this code is critical. It needs to handle graceful shutdown, message-processing failures, connection retries, and similar concerns.

# src/main.py

import os
import logging
import asyncio
from azure.servicebus.aio import ServiceBusClient, ServiceBusReceiver
from azure.servicebus.exceptions import ServiceBusError
from azure.identity.aio import WorkloadIdentityCredential

# --- Configuration ---
# Using workload identity - no secrets or connection strings in code
FULLY_QUALIFIED_NAMESPACE = os.environ.get("SERVICEBUS_NAMESPACE")
QUEUE_NAME = os.environ.get("SERVICEBUS_QUEUE_NAME")
AZURE_CLIENT_ID = os.environ.get("AZURE_CLIENT_ID") # For workload identity

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

async def process_message(receiver: ServiceBusReceiver):
    """Continuously receive and process messages."""
    async with receiver:
        while True:
            try:
                messages = await receiver.receive_messages(max_message_count=1, max_wait_time=30)
                if not messages:
                    logger.info("No messages received in the last 30s. Still listening.")
                    continue

                for msg in messages:
                    try:
                        message_body = str(msg)
                        logger.info(f"Received message: {message_body}")
                        
                        # TODO:
                        # 1. Parse message body (e.g., JSON with blob storage path)
                        # 2. Download document from blob storage
                        # 3. Perform ML inference (this is the long-running part)
                        await asyncio.sleep(45) # Simulating a 45-second inference task
                        # 4. Write results to Cosmos DB
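                        # NOTE: real inference can take 30s-5min, while the queue's default
                        # message lock is only about one minute. For long tasks, register the
                        # message with azure.servicebus.aio.AutoLockRenewer (or raise the
                        # queue's lock_duration) so the lock does not expire mid-inference.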
                        
                        logger.info("Processing complete. Completing the message.")
                        await receiver.complete_message(msg)

                    except Exception as e:
                        logger.error(f"Error processing message {msg.message_id}: {e}", exc_info=True)
                        # Abandon the message to allow another worker to retry
                        await receiver.abandon_message(msg)
            
            except ServiceBusError as sbe:
                logger.error(f"Service Bus connection error: {sbe}. Reconnecting...")
                await asyncio.sleep(10) # Wait before retrying
            except Exception as e:
                logger.error(f"An unexpected error occurred in the receiver loop: {e}", exc_info=True)
                # In a real scenario, this might trigger a pod restart
                break

async def main():
    """Main function to setup and run the service bus listener."""
    logger.info("Starting inference worker...")
    
    # Use workload identity: the credential exchanges the projected service
    # account token injected by the webhook for an Azure AD token.
    credential = WorkloadIdentityCredential(client_id=AZURE_CLIENT_ID)
    
    servicebus_client = ServiceBusClient(
        fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
        credential=credential,
        logging_enable=True
    )
    
    receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME)
    
    try:
        await process_message(receiver)
    finally:
        logger.info("Closing service bus client.")
        await servicebus_client.close()
        await credential.close()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Worker shutting down gracefully.")

Note the error handling here. abandon_message returns the message to the queue so another worker can retry it, which works well for transient failures. If a message keeps failing, Service Bus automatically moves it to the dead-letter queue (DLQ) once its delivery count is exhausted, preventing infinite retry loops.
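
To make dead-letter handling a little less opaque, here is a minimal inspection sketch that reads the DLQ as a sub-queue and logs why each message was dead-lettered. The namespace and queue names match the ones above; everything else (running it as an ad-hoc script, completing messages after logging them) is just one possible approach.

# dlq_inspector.py - sketch: read dead-lettered messages and log the reason
import asyncio
import os

from azure.identity.aio import WorkloadIdentityCredential
from azure.servicebus import ServiceBusSubQueue
from azure.servicebus.aio import ServiceBusClient

NAMESPACE = os.environ.get("SERVICEBUS_NAMESPACE", "mlops-sbus-ns-unique.servicebus.windows.net")
QUEUE_NAME = os.environ.get("SERVICEBUS_QUEUE_NAME", "inference-tasks")

async def inspect_dead_letters():
    credential = WorkloadIdentityCredential()
    async with ServiceBusClient(fully_qualified_namespace=NAMESPACE, credential=credential) as client:
        # The dead-letter queue is addressed as a sub-queue of the main queue.
        receiver = client.get_queue_receiver(
            queue_name=QUEUE_NAME,
            sub_queue=ServiceBusSubQueue.DEAD_LETTER,
        )
        async with receiver:
            messages = await receiver.receive_messages(max_message_count=10, max_wait_time=5)
            for msg in messages:
                print(f"dead-lettered {msg.message_id}: "
                      f"reason={msg.dead_letter_reason}, detail={msg.dead_letter_error_description}")
                # Completing removes the message from the DLQ for good; use
                # receiver.peek_messages() instead if you only want to look.
                await receiver.complete_message(msg)
    await credential.close()

if __name__ == "__main__":
    asyncio.run(inspect_dead_letters())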

Kubernetes Deployment and KEDA Configuration

Now we deploy the inference service to AKS and let KEDA drive its scaling.

Deployment (inference-worker-deployment.yaml):

# inference-worker-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: inference-worker
  namespace: mlops
spec:
  replicas: 0 # KEDA will manage this. Starting with 0 is key for cost saving.
  selector:
    matchLabels:
      app: inference-worker
  template:
    metadata:
      labels:
        app: inference-worker
        azure.workload.identity/use: "true" # Enable workload identity
    spec:
      serviceAccountName: inference-worker-sa # Important for workload identity
      containers:
      - name: worker
        image: myacr.azurecr.io/inference-worker:v1.2.0
        env:
        - name: SERVICEBUS_NAMESPACE
          value: "mlops-sbus-ns-unique.servicebus.windows.net"
        - name: SERVICEBUS_QUEUE_NAME
          value: "inference-tasks"
        - name: AZURE_CLIENT_ID
          value: "<your-managed-identity-client-id>"
        resources:
          requests:
            cpu: "500m"
            memory: "1Gi"
          limits:
            cpu: "1"
            memory: "2Gi"

Two points are key here:

  1. replicas: 0: we explicitly tell Kubernetes that no pods are needed by default. KEDA takes full responsibility for scaling the deployment from 0 to N when work appears.
  2. azure.workload.identity/use: "true" and serviceAccountName: these wire up workload identity so the pod can securely obtain Azure AD tokens to access Service Bus. The referenced inference-worker-sa service account needs the azure.workload.identity/client-id annotation and a federated identity credential linking it to the managed identity.

KEDA ScaledObject (keda-scaler.yaml):
This is where the magic of the architecture lives. We define a ScaledObject that tells KEDA what to monitor and how to scale.

# keda-scaler.yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: inference-worker-scaler
  namespace: mlops
spec:
  scaleTargetRef:
    name: inference-worker # The deployment to scale
  pollingInterval: 30  # How often to check the queue (seconds)
  cooldownPeriod:  300 # Wait 5 minutes after the last trigger before scaling down to 0
  minReplicaCount: 0   # Minimum replicas, can be 0
  maxReplicaCount: 20  # Maximum replicas to prevent runaway costs
  triggers:
  - type: azure-servicebus
    metadata:
      # Required
      queueName: inference-tasks
      # Service Bus namespace (used instead of a connection string with workload identity)
      namespace: mlops-sbus-ns-unique
      # The number of messages in the queue that will trigger scaling up one pod.
      messageCount: "5" 
    authenticationRef:
      name: keda-trigger-auth-workload-identity
---
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: keda-trigger-auth-workload-identity
  namespace: mlops
spec:
  podIdentity:
    # azure-workload selects the Azure AD Workload Identity provider; the KEDA operator's
    # own service account must be federated with an identity allowed to read queue metrics.
    provider: azure-workload

The ScaledObject configuration is critical. messageCount: "5" means that for roughly every 5 messages sitting in the queue, KEDA asks for one more pod, up to maxReplicaCount. This value has to be tuned against each task's average processing time and resource consumption. Set it too low (say 1) and pods churn through frequent start/stop cycles; set it too high and the average queueing time for tasks grows. cooldownPeriod matters just as much: it keeps the deployment from being scaled back to zero too early during gaps between tasks.
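
As a back-of-the-envelope check on those numbers, the sketch below approximates the replica count the HPA converges on for a given backlog: roughly ceil(queue length / messageCount), clamped between minReplicaCount and maxReplicaCount. This mirrors the behaviour of KEDA's average-value metric but is only an approximation for tuning intuition, not KEDA's actual implementation.

# keda_replica_estimate.py - rough approximation of the replica count KEDA/HPA settles on
import math

def estimated_replicas(queue_length: int, message_count: int = 5,
                       min_replicas: int = 0, max_replicas: int = 20) -> int:
    """Approximate ceil(queue_length / message_count), clamped to [min, max]."""
    if queue_length <= 0:
        return min_replicas
    desired = math.ceil(queue_length / message_count)
    return max(min_replicas, min(desired, max_replicas))

# A burst of 37 pending documents settles at 8 workers (ceil(37 / 5)),
# while 500 documents hits the maxReplicaCount ceiling of 20.
for backlog in (0, 3, 37, 500):
    print(f"{backlog:>3} queued messages -> {estimated_replicas(backlog)} replicas")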

Asynchronous Interaction Logic in the PWA Frontend

The frontend experience was the original motivation for this architectural rework. The PWA needs a non-blocking submit-and-poll flow.

Core JavaScript logic (apiService.js):

// apiService.js - Simplified example

const API_BASE_URL = '/api'; // Assuming a proxy setup

/**
 * Submits a document for processing.
 * This is a fire-and-forget operation from the user's perspective.
 * @param {File} file The document to upload.
 * @returns {Promise<{jobId: string}>} The job ID for status tracking.
 */
async function submitInferenceJob(file) {
  const formData = new FormData();
  formData.append('document', file);

  try {
    const response = await fetch(`${API_BASE_URL}/jobs`, {
      method: 'POST',
      body: formData,
    });

    if (!response.ok) {
      const errorData = await response.json();
      throw new Error(errorData.message || 'Failed to submit job.');
    }

    return await response.json(); // e.g., { jobId: "some-uuid-v4" }
  } catch (error) {
    console.error('Job submission failed:', error);
    throw error;
  }
}

/**
 * Polls for the status of a specific job.
 * @param {string} jobId The ID of the job to check.
 * @returns {Promise<any>} The job status and result if completed.
 */
async function getJobStatus(jobId) {
  try {
    const response = await fetch(`${API_BASE_URL}/jobs/${jobId}`);
    
    if (!response.ok) {
        // Handle 404 for not-yet-created results, etc.
        return { status: 'PENDING' };
    }
    
    return await response.json(); // e.g., { status: 'COMPLETED', result: {...} }
  } catch (error) {
    console.error(`Failed to get status for job ${jobId}:`, error);
    return { status: 'UNKNOWN', error: error.message };
  }
}

// UI component logic would use these functions
function handleFileUpload(file) {
    const statusElement = document.getElementById('status');
    statusElement.textContent = 'Submitting...';

    submitInferenceJob(file)
        .then(({ jobId }) => {
            statusElement.textContent = `Job submitted with ID: ${jobId}. Waiting for results...`;
            pollForResult(jobId);
        })
        .catch(error => {
            statusElement.textContent = `Error: ${error.message}`;
        });
}

function pollForResult(jobId, interval = 5000) {
    const intervalId = setInterval(async () => {
        const data = await getJobStatus(jobId);
        if (data.status === 'COMPLETED' || data.status === 'FAILED') {
            clearInterval(intervalId);
            // Update UI with the final result or error
            document.getElementById('result').textContent = JSON.stringify(data.result || data.error, null, 2);
        }
    }, interval);
}

This code shows the classic asynchronous workflow: submitInferenceJob quickly returns a jobId, and the UI then starts a poller, pollForResult, that checks the job status in the background. Even if the user closes the browser tab, processing on the backend is not interrupted. A PWA Service Worker can improve the experience further, for example by continuing to poll while the app is in the background and notifying the user via the Web Notifications API when the job completes.
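
The status API that this polling loop calls is only implied by the architecture diagram, so here is a hypothetical sketch of what it could look like as a FastAPI route reading job documents from Cosmos DB. The database and container names, the document shape, and the COSMOS_ACCOUNT_URL environment variable are illustrative assumptions.

# status_api.py - hypothetical sketch of the job status endpoint the PWA polls
import os

from azure.cosmos.aio import CosmosClient
from azure.cosmos.exceptions import CosmosResourceNotFoundError
from azure.identity.aio import DefaultAzureCredential
from fastapi import FastAPI

app = FastAPI()
credential = DefaultAzureCredential()
cosmos = CosmosClient(os.environ["COSMOS_ACCOUNT_URL"], credential=credential)

@app.get("/api/jobs/{job_id}")
async def get_job_status(job_id: str):
    container = cosmos.get_database_client("mlops").get_container_client("jobs")
    try:
        # Assumes the worker writes one document per job, keyed and partitioned by the job ID.
        doc = await container.read_item(item=job_id, partition_key=job_id)
    except CosmosResourceNotFoundError:
        # The worker has not written a status document yet; the PWA shows this as PENDING.
        return {"jobId": job_id, "status": "PENDING"}
    return {"jobId": job_id, "status": doc["status"], "result": doc.get("result")}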

Open Issues and Future Iteration Paths

This architecture solved the original performance and scaling problems and delivered a cost-efficient MLOps inference service driven by actual load. It is not flawless, though.

First, client-side polling is simple but inefficient. When tasks run very long or the user base is large, the flood of polling requests puts unnecessary pressure on the status API. A better option is to introduce WebSockets or Azure SignalR Service: when a task finishes, the inference worker can emit an event and push the result directly to the corresponding PWA client through SignalR, giving genuinely real-time updates.

Second, dead-letter handling is currently passive. In production we need monitoring and alerting on the dead-letter queue. A common pattern is an Azure Function triggered by messages arriving in the DLQ that records the failure details, notifies the development team, and, depending on the error type, may even run automated compensation logic.

Finally, this architecture only covers model deployment and serving. A complete MLOps workflow also includes training, version control, automated retraining, and A/B testing. The next iteration is to integrate this KEDA-driven inference service with Azure Machine Learning pipelines: when a new model version is registered in the model registry, a CI/CD pipeline is triggered automatically to build a new container image and roll it out safely to the inference-worker deployment on AKS.

