Real-Time Feature Flag Synchronization with ZooKeeper Watchers and Valtio Proxies


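The hard part of a feature flag system is not storing key-value pairs. It is distributing state changes reliably and in real time. When a flag changes on the backend, how quickly and how cheaply that change reaches the UI of thousands of frontend clients is the key measure of whether the architecture is sound.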

Option A: The Limits of HTTP Polling

The most direct approach is for clients to poll over HTTP, for example by requesting the /api/feature-flags endpoint every 30 seconds.

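// Frontend polling (sketch; updateUI stands in for your rendering logic)
setInterval(async () => {
  try {
    const response = await fetch('/api/feature-flags');
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    const flags = await response.json();
    updateUI(flags);
  } catch (err) {
    console.error('Failed to poll feature flags', err);
  }
}, 30000);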

In a real project this approach quickly exposes its inherent flaws:

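  1. Unbounded latency: with a 30-second interval, a user may wait up to 30 seconds to see a UI change. Shortening the interval sharply increases server load.
  2. Wasted resources: flags rarely change, so almost every request returns data the client already has. This costs CPU and network on both the server and the client for nothing.
  3. Synchronized bursts: after a deployment or service restart, clients that reload or reconnect at the same moment have their polling timers reset in lockstep, which can turn into request storms that hit the backend all at once.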
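For comparison, the usual mitigation for point 3 is to randomize each client's polling interval so that clients drift apart instead of firing together. A minimal sketch, assuming a hypothetical `nextPollDelayMs` helper and a ±20% jitter band. Jitter only spreads the load. It does nothing for the latency and waste described in points 1 and 2.

```typescript
// Hypothetical helper: a polling delay randomized within ±jitterRatio of baseMs.
// `random` is injectable so the result can be checked deterministically.
function nextPollDelayMs(baseMs: number, jitterRatio: number, random: () => number = Math.random): number {
  if (baseMs <= 0 || jitterRatio < 0 || jitterRatio >= 1) {
    throw new RangeError('baseMs must be > 0 and jitterRatio must be in [0, 1)');
  }
  // random() is in [0, 1), so the offset falls in [-jitterRatio, +jitterRatio)
  const offset = (random() * 2 - 1) * jitterRatio;
  return Math.round(baseMs * (1 + offset));
}

// Re-schedule with a fresh random delay after every poll instead of using a fixed setInterval.
// Returns a function that stops polling.
function startJitteredPolling(poll: () => Promise<void>, baseMs = 30000, jitterRatio = 0.2): () => void {
  let timer: ReturnType<typeof setTimeout> | undefined;
  let stopped = false;
  const schedule = (): void => {
    timer = setTimeout(async () => {
      try {
        await poll();
      } catch (err) {
        console.error('Polling feature flags failed', err);
      } finally {
        if (!stopped) schedule();
      }
    }, nextPollDelayMs(baseMs, jitterRatio));
  };
  schedule();
  return () => {
    stopped = true;
    if (timer !== undefined) clearTimeout(timer);
  };
}
```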

Option B: Trade-offs of a General-Purpose Message Queue

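Introducing a message queue (such as Redis Pub/Sub or RabbitMQ) is a common improvement. After changing a flag, a backend service publishes a message to a topic or exchange. Frontends connect to the backend over WebSocket, and the backend pushes each message to every connected client.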

This solves the latency problem, but it adds new architectural complexity:

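  1. An extra middleware dependency: the availability of the whole system now depends on the stability of the message queue.
  2. Message storms: if flags change frequently (for example during A/B tests), the message bus can become a bottleneck.
  3. Connection management: the backend must maintain a mapping from WebSocket sessions to users so that messages are routed correctly, and it must manage the lifecycle of a large number of long-lived connections.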

Final Choice: ZooKeeper Watchers + Direct WebSocket

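A feature flag is fundamentally a configuration-coordination problem in a distributed system, which makes ZooKeeper (ZK) a very attractive option. ZK is not just a store. It is a coordination service, and its core Watcher mechanism is designed for exactly this kind of scenario.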

Our final architectural decision:

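  • State storage and notification: flag state is stored in ZooKeeper. Through its Watcher mechanism, ZK proactively notifies every client watching a node when that node changes. Classic ZK watches are one-shot: a watch fires once and must be re-registered to receive the next change. ZooKeeper 3.6+ also offers persistent watches via addWatch, but the code in this article uses classic watches and re-arms them on every read.
  • Backend bridge: a NestJS application acts as a ZK client. At startup it connects to ZK and registers its watches. When ZK reports a change, it re-reads the flags and broadcasts the latest state to all connected frontends through its built-in WebSocket gateway.
  • Frontend response: the frontend receives these updates from the backend over WebSocket. It writes them into a store built with Valtio, a lightweight state-management library whose Proxy-based mechanism re-renders the affected UI automatically.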
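The browser-facing contract implied by this design is small: two Socket.IO events, both carrying a flat map from flag name to boolean. The sketch below models that contract with a runtime guard. It assumes the event names `flags_init` and `flags_updated` used later in this article. The type and function names are hypothetical.

```typescript
// Payload carried by both events: flag name -> enabled
type FlagMap = Record<string, boolean>;

// Server-to-client events; the names match the gateway and store code in this article.
// Socket.IO v4 accepts an interface like this as a type parameter for typed emit/on.
interface ServerToClientEvents {
  flags_init: (flags: FlagMap) => void;    // full snapshot, sent to each client on (re)connect
  flags_updated: (flags: FlagMap) => void; // full snapshot, broadcast after each ZooKeeper change
}

// Runtime guard: data arriving over the wire is untyped, so validate it before storing it
function isFlagMap(value: unknown): value is FlagMap {
  if (typeof value !== 'object' || value === null || Array.isArray(value)) return false;
  const record = value as Record<string, unknown>;
  return Object.keys(record).every(key => typeof record[key] === 'boolean');
}
```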

The advantages of this architecture:

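  • Single source of truth: ZK is the only authoritative data source, which avoids inconsistent copies.
  • Event-driven: the whole pipeline is driven by events rather than polling, so it is efficient and real-time.
  • Simpler architecture: there is no dedicated message queue. The NestJS application itself bridges ZK and WebSocket, which lowers operational complexity.
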
graph TD
    subgraph "Admin / Ops"
        A[Admin UI / CLI] -- 1. Update ZNode data --> ZK
    end

    subgraph "ZooKeeper cluster"
        ZK[("/feature_flags/my_app/flag_a: true")]
    end

    subgraph "Backend (NestJS)"
        B(ZookeeperService) -- 2. Register watches --> ZK
        ZK -- 3. ZNode change notification --> B
        B -- 4. Emit internal event --> C(FeatureFlagGateway)
    end

    subgraph "Browser client"
        C -- 5. WebSocket broadcast --> D{WebSocket Client}
        D -- 6. Receive message --> E[Valtio Store]
        E -- 7. Proxy state updated --> F[React Component]
        F -- 8. Automatic re-render --> G[User interface]
    end

    style ZK fill:#f9f,stroke:#333,stroke-width:2px
    style C fill:#ccf,stroke:#333,stroke-width:2px

Core Implementation Overview

1. NestJS Backend: ZooKeeper Connection and Event Bridge

First, we need a robust ZookeeperService that handles connecting, reconnecting, and registering watchers.
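The service assumes one child ZNode per flag under `/feature_flags/my_app`, with the exact string `'true'` meaning enabled. Isolating that convention in pure functions keeps it testable. The sketch below uses hypothetical helper names. Its validation follows ZooKeeper's documented path constraints: no `/` inside a segment, no bare `.` or `..`, and no null character.

```typescript
// Hypothetical helpers for the layout the service assumes: one child ZNode per flag
const FLAGS_ROOT = '/feature_flags/my_app';

// Build the ZNode path for a flag; reject names ZooKeeper would not accept as a single path segment
function flagZNodePath(flagName: string, root: string = FLAGS_ROOT): string {
  const invalid =
    flagName === '' || flagName === '.' || flagName === '..' ||
    flagName.indexOf('/') !== -1 || flagName.indexOf('\u0000') !== -1;
  if (invalid) throw new Error(`Invalid flag name: ${JSON.stringify(flagName)}`);
  return `${root}/${flagName}`;
}

// Only the exact string 'true' enables a flag; anything else (including no data) disables it
function decodeFlagValue(raw: string | null): boolean {
  return raw === 'true';
}

function encodeFlagValue(enabled: boolean): string {
  return enabled ? 'true' : 'false';
}

// Every ancestor path in creation order, i.e. the nodes ensurePathExists walks through
function pathPrefixes(path: string): string[] {
  const parts = path.split('/').filter(part => part.length > 0);
  return parts.map((_, i) => '/' + parts.slice(0, i + 1).join('/'));
}
```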

zookeeper.service.ts:

import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import * as ZooKeeper from 'node-zookeeper-client';
import { EventEmitter2 } from '@nestjs/event-emitter';

@Injectable()
export class ZookeeperService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(ZookeeperService.name);
  private client: ZooKeeper.Client;
  private readonly ZK_HOSTS = process.env.ZK_HOSTS || 'localhost:2181';
  private readonly FLAGS_PATH = '/feature_flags/my_app'; // root ZNode for the feature flags
  public currentFlags: Record<string, boolean> = {};

  constructor(private eventEmitter: EventEmitter2) {}

  onModuleInit() {
    this.connect();
  }

  onModuleDestroy() {
    if (this.client) {
      this.client.close();
      this.logger.log('ZooKeeper client closed.');
    }
  }

  private connect() {
    this.client = ZooKeeper.createClient(this.ZK_HOSTS, {
      sessionTimeout: 5000,
      retries: 2,
    });

    this.client.once('connected', () => {
      this.logger.log('Connected to ZooKeeper.');
      this.ensurePathExists(this.FLAGS_PATH)
        .then(() => this.watchFeatureFlags())
        .catch(error => this.logger.error('Failed to setup watchers', error));
    });

    this.client.on('disconnected', () => {
      this.logger.warn('Disconnected from ZooKeeper. Attempting to reconnect...');
      // node-zookeeper-client retries the connection internally while the session is still valid; we only log here
    });
    
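    this.client.on('expired', () => {
      this.logger.error('ZooKeeper session expired. Reconnecting...');
      // An expired session cannot be resumed: close this client and create a new one.
      // Watches belonged to the old session; connect() re-registers them once connected.
      this.client.close();
      this.connect();
    });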

    this.client.connect();
  }

  private async ensurePathExists(path: string): Promise<void> {
    try {
      const pathParts = path.split('/').filter(p => p);
      let currentPath = '';
      for (const part of pathParts) {
        currentPath = `${currentPath}/${part}`;
        const exists = await new Promise<boolean>((resolve, reject) => {
          this.client.exists(currentPath, (error, stat) => {
            if (error) return reject(error);
            resolve(!!stat);
          });
        });

        if (!exists) {
          await new Promise<void>((resolve, reject) => {
            this.client.create(currentPath, null, ZooKeeper.ACL.OPEN_ACL_UNSAFE, ZooKeeper.CreateMode.PERSISTENT, (error) => {
              if (error && error.getCode() !== ZooKeeper.Exception.NODE_EXISTS) {
                 return reject(error);
              }
              resolve();
            });
          });
          this.logger.log(`Created ZNode path: ${currentPath}`);
        }
      }
    } catch (error) {
        this.logger.error(`Failed to ensure path ${path} exists.`, error.stack);
        throw error;
    }
  }

  // ZooKeeper watches are one-shot: each fires at most once and must be re-registered.
  // Passing this same watcher on every read re-arms the watches each time the flags are refreshed:
  // - getChildren(root, watcher) fires when a flag node is created or deleted
  // - getData(child, watcher) fires when that flag's value changes (a children watch does NOT cover this)
  private readonly watcher = (event: ZooKeeper.Event) => {
    this.logger.log(`Watcher triggered for event: ${event.getName()} on path ${event.getPath()}`);
    // Simplest strategy: on any change, re-read every flag (which also re-arms the watches).
    // In production you could branch on event.getType() for finer-grained handling, and
    // serialize overlapping refreshes so an older one cannot overwrite a newer result.
    this.fetchAllFlags().catch(err => this.logger.error('Error refetching flags after watch event', err));
  };

  private async watchFeatureFlags() {
    this.logger.log(`Setting up watchers under path: ${this.FLAGS_PATH}`);
    // Initial fetch; this also registers the first set of watches
    await this.fetchAllFlags();
  }

  private async fetchAllFlags() {
    const children = await new Promise<string[]>((resolve, reject) => {
      // Passing the watcher re-arms the children watch on the root node
      this.client.getChildren(this.FLAGS_PATH, this.watcher, (error, children) => {
        if (error) return reject(error);
        resolve(children);
      });
    });
    await this.updateFlagsFromChildren(children);
  }

  private async updateFlagsFromChildren(children: string[]) {
    const flags: Record<string, boolean> = {};
    for (const child of children) {
      const childPath = `${this.FLAGS_PATH}/${child}`;
      try {
        const data = await new Promise<Buffer | null>((resolve, reject) => {
          // Passing the watcher arms a data watch on this flag node
          this.client.getData(childPath, this.watcher, (error, data) => {
            if (error) {
              // The node may be deleted between getChildren and getData; that is expected, so skip it
              if (error.getCode() === ZooKeeper.Exception.NO_NODE) {
                return resolve(null);
              }
              return reject(error);
            }
            resolve(data);
          });
        });
        if (data) {
          // Convention: the value 'true' means enabled; any other value (including empty) means disabled
          flags[child] = data.toString('utf8') === 'true';
        }
      } catch (error) {
        this.logger.error(`Failed to get data for flag: ${childPath}`, error.stack);
      }
    }

    this.currentFlags = flags;
    this.logger.log('Feature flags updated', JSON.stringify(this.currentFlags));
    // Emit an internal event to decouple ZookeeperService from the WebSocket gateway
    this.eventEmitter.emit('feature.flags.updated', this.currentFlags);
  }

  // Used by the admin API to change a flag
  async setFlag(name: string, value: boolean): Promise<void> {
    const path = `${this.FLAGS_PATH}/${name}`;
    const data = Buffer.from(value.toString());
    
    const exists = await new Promise<boolean>((resolve, reject) => {
        this.client.exists(path, (error, stat) => {
            if(error) return reject(error);
            resolve(!!stat);
        });
    });

    if(exists) {
        await new Promise<void>((resolve, reject) => {
            this.client.setData(path, data, -1, (error) => {
                if(error) return reject(error);
                resolve();
            });
        });
    } else {
        await new Promise<void>((resolve, reject) => {
            this.client.create(path, data, ZooKeeper.ACL.OPEN_ACL_UNSAFE, ZooKeeper.CreateMode.PERSISTENT, (error) => {
                if(error) return reject(error);
                resolve();
            });
        });
    }
    this.logger.log(`Set flag '${name}' to '${value}'`);
  }
}
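Two ZooKeeper watch semantics matter for the service above. First, a classic watch fires once and is then removed. Second, a children watch on `/feature_flags/my_app` fires only when a child is created or deleted, not when a child's data changes; that requires a data watch on the child itself. The in-memory model below illustrates the first point. It is a hypothetical toy, not the ZooKeeper client API. It counts the notifications received by a watch registered once versus one re-armed inside its own callback.

```typescript
type WatchCallback = (path: string) => void;

// Toy model of ZooKeeper's one-shot data watches (illustration only, not the client API):
// a watch set by getData fires on the next change to that path and is then removed.
class OneShotWatchStore {
  private readonly values = new Map<string, string>();
  private readonly watches = new Map<string, WatchCallback[]>();

  getData(path: string, watcher?: WatchCallback): string | undefined {
    if (watcher) {
      const registered = this.watches.get(path) || [];
      // The same watcher registered twice for a path is notified only once
      if (registered.indexOf(watcher) === -1) registered.push(watcher);
      this.watches.set(path, registered);
    }
    return this.values.get(path);
  }

  setData(path: string, value: string): void {
    this.values.set(path, value);
    const toFire = this.watches.get(path) || [];
    this.watches.delete(path); // one-shot: the watches are consumed by this notification
    toFire.forEach(watcher => watcher(path));
  }
}

// Count notifications across several changes, with or without re-arming the watch
function countNotifications(rearm: boolean, changes: number): number {
  const store = new OneShotWatchStore();
  const path = '/feature_flags/my_app/flag_a';
  let notified = 0;
  const watcher: WatchCallback = changedPath => {
    notified++;
    // Re-reading with the same watcher re-registers it, which is what the service must do
    if (rearm) store.getData(changedPath, watcher);
  };
  store.getData(path, watcher);
  for (let i = 0; i < changes; i++) {
    store.setData(path, i % 2 === 0 ? 'true' : 'false');
  }
  return notified;
}
```

With three changes, the watch registered once is notified a single time, while the re-armed watch sees all three.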

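Next is FeatureFlagGateway, which listens for the internal event and broadcasts it to all connected clients. The @OnEvent handler is only registered if EventEmitterModule.forRoot() is imported in the application module.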

feature-flag.gateway.ts:

import { WebSocketGateway, WebSocketServer, OnGatewayConnection, OnGatewayDisconnect } from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { Logger } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { ZookeeperService } from './zookeeper.service';

@WebSocketGateway({ cors: { origin: '*' }, namespace: '/flags' })
export class FeatureFlagGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server: Server;

  private readonly logger = new Logger(FeatureFlagGateway.name);

  constructor(private readonly zookeeperService: ZookeeperService) {}

  handleConnection(client: Socket) {
    this.logger.log(`Client connected: ${client.id}`);
    // Immediately send the full current flag state to each newly connected client
    client.emit('flags_init', this.zookeeperService.currentFlags);
  }

  handleDisconnect(client: Socket) {
    this.logger.log(`Client disconnected: ${client.id}`);
  }
  
  // Listen for the event emitted by ZookeeperService
  @OnEvent('feature.flags.updated')
  handleFlagsUpdated(payload: Record<string, boolean>) {
    this.logger.log('Broadcasting flag updates to all clients');
    // Broadcast the latest flag state to every client
    this.server.emit('flags_updated', payload);
  }
}
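A single ZooKeeper change can trigger more than one refresh. For example, deleting a flag node fires both its data watch and the root's children watch, so the gateway may receive identical snapshots back to back. One option is to skip broadcasts that change nothing. The sketch below shows a hypothetical `DedupingBroadcaster` that the gateway could route its emits through.

```typescript
type FlagSnapshot = Record<string, boolean>;

// True when both snapshots hold exactly the same flags with the same values
function sameFlags(a: FlagSnapshot, b: FlagSnapshot): boolean {
  const aKeys = Object.keys(a);
  if (aKeys.length !== Object.keys(b).length) return false;
  return aKeys.every(key => Object.prototype.hasOwnProperty.call(b, key) && a[key] === b[key]);
}

// Hypothetical wrapper around the real send call: remembers the last payload and drops no-op updates
class DedupingBroadcaster {
  private last: FlagSnapshot | undefined;
  private readonly send: (flags: FlagSnapshot) => void;

  constructor(send: (flags: FlagSnapshot) => void) {
    this.send = send;
  }

  // Returns true if the payload was actually sent
  publish(flags: FlagSnapshot): boolean {
    if (this.last !== undefined && sameFlags(this.last, flags)) return false;
    this.last = { ...flags }; // keep a copy so later mutation of `flags` cannot skew the comparison
    this.send(flags);
    return true;
  }
}
```

Inside the gateway, `send` would be something like `flags => this.server.emit('flags_updated', flags)`, and `handleFlagsUpdated` would call `publish(payload)` instead of emitting directly. Newly connected clients are unaffected, because `handleConnection` always sends the current snapshot.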

2. Frontend: Seamless Integration of Valtio and WebSocket

The frontend implementation is surprisingly simple, and this is exactly Valtio's appeal.

stores/featureFlagStore.ts:

import { proxy } from 'valtio';
import { io } from 'socket.io-client';

interface FeatureFlagState {
  isInitialized: boolean;
  flags: Record<string, boolean>;
}

// Create the proxy store
export const featureFlagStore = proxy<FeatureFlagState>({
  isInitialized: false,
  flags: {},
});

// Initialize the WebSocket connection here. Because ES modules are evaluated once,
// the socket is created a single time, on the first import (e.g. from main.tsx or App.tsx).
const socket = io('http://localhost:3001/flags', {
  transports: ['websocket'],
  // Socket.IO stops retrying after 5 failed attempts; the default (Infinity) keeps flags syncing indefinitely
  reconnectionAttempts: 5,
  reconnectionDelay: 1000,
});

socket.on('connect', () => {
  console.log('Connected to Feature Flag WebSocket server');
});

// Receive the initial full snapshot (the server sends it again after every reconnect)
socket.on('flags_init', (initialFlags: Record<string, boolean>) => {
  console.log('Received initial flags:', initialFlags);
  // Mutate the proxy state directly; every component that reads it updates automatically
  featureFlagStore.flags = initialFlags;
  featureFlagStore.isInitialized = true;
});

// Receive updates (the server currently always sends the full flag set)
socket.on('flags_updated', (updatedFlags: Record<string, boolean>) => {
  console.log('Received flag updates:', updatedFlags);
  featureFlagStore.flags = updatedFlags;
});

socket.on('disconnect', () => {
  console.warn('Disconnected from Feature Flag WebSocket server');
});

socket.on('connect_error', (err) => {
  console.error('Connection error with Feature Flag WebSocket server', err);
});
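Assigning a fresh object to `featureFlagStore.flags` always triggers a store notification, even when the server's snapshot is identical to the current one. An alternative is to reconcile the snapshot into the existing object and touch only the keys that actually changed. The sketch below uses a hypothetical `reconcileFlags` helper. A Valtio proxy is mutated like a plain object, so the handler could call `reconcileFlags(featureFlagStore.flags, updatedFlags)`. Here the function is exercised on a plain object.

```typescript
type FlagRecord = Record<string, boolean>;

// Mutate `target` in place so it matches `incoming`; returns the names of flags that changed.
function reconcileFlags(target: FlagRecord, incoming: FlagRecord): string[] {
  const changed: string[] = [];
  // Remove flags that no longer exist on the server
  for (const key of Object.keys(target)) {
    if (!Object.prototype.hasOwnProperty.call(incoming, key)) {
      delete target[key];
      changed.push(key);
    }
  }
  // Write only added or modified values; unchanged keys are left untouched
  for (const key of Object.keys(incoming)) {
    if (target[key] !== incoming[key]) {
      target[key] = incoming[key];
      changed.push(key);
    }
  }
  return changed;
}
```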

Using it in a React component:

components/NewFeatureButton.tsx:

import React from 'react';
import { useSnapshot } from 'valtio';
import { featureFlagStore } from '../stores/featureFlagStore';

export const NewFeatureButton: React.FC = () => {
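  // useSnapshot subscribes to the store; the component re-renders when a property
  // it read during render (here featureFlagStore.flags.enableNewCheckoutButton) changes
  const snap = useSnapshot(featureFlagStore);

  // Until initialization completes, render nothing (or a loading state)
  if (!snap.isInitialized) {
    return null; // or <Spinner />
  }

  // Choose which button to render based on the flag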
  if (snap.flags.enableNewCheckoutButton) {
    return <button style={{ backgroundColor: 'green', color: 'white' }}>New Checkout</button>;
  }

  return <button>Old Checkout</button>;
};

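Valtio's Proxy mechanism keeps the state-update code extremely concise. There are no reducers, no actions, and no elaborate selectors. When the WebSocket callback modifies featureFlagStore.flags, useSnapshot detects the change and re-renders the components that read the affected properties.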
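The underlying mechanism is straightforward: a JavaScript Proxy can intercept property writes and notify subscribers. The toy below, a hypothetical `observe` helper, shows only that core idea. It is not Valtio's implementation, which additionally handles nested objects, batched notifications, and the immutable snapshots that `useSnapshot` reads.

```typescript
type ChangeListener = (key: string | symbol, value: unknown) => void;

// Toy illustration of the Proxy idea behind Valtio (not Valtio's implementation):
// writes and deletes on the returned object notify subscribers.
function observe<T extends object>(target: T): { state: T; subscribe: (listener: ChangeListener) => () => void } {
  const listeners = new Set<ChangeListener>();
  const notify = (key: string | symbol, value: unknown) => listeners.forEach(listener => listener(key, value));

  const state = new Proxy(target, {
    set(obj, key, value) {
      const previous = Reflect.get(obj, key);
      const ok = Reflect.set(obj, key, value);
      // Skip the notification when the value did not actually change
      if (ok && !Object.is(previous, value)) notify(key, value);
      return ok;
    },
    deleteProperty(obj, key) {
      const existed = Reflect.has(obj, key);
      const ok = Reflect.deleteProperty(obj, key);
      if (ok && existed) notify(key, undefined);
      return ok;
    },
  });

  const subscribe = (listener: ChangeListener) => {
    listeners.add(listener);
    return () => { listeners.delete(listener); };
  };
  return { state, subscribe };
}
```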

3. BDD Tests: End-to-End Behavior Verification

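Testing this real-time system is a challenge. Unit tests can verify each module's logic, but they cannot guarantee that data flows correctly end to end. BDD (behavior-driven development) combined with an end-to-end testing tool such as Playwright is a good fit for verifying this kind of asynchronous, cross-system behavior.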

feature-flags.feature:

Feature: Real-time Feature Flag Synchronization

  Scenario: UI updates immediately when a feature flag is enabled
    Given the user is on the main application page
    And the "enableNewDashboard" feature flag is initially disabled in ZooKeeper
    When an administrator enables the "enableNewDashboard" feature flag via an API call
    Then the user should see the "New Dashboard" component rendered on the page within 2 seconds
    And the old dashboard component should no longer be visible

Step definitions (using Cucumber.js and Playwright):

import { Given, When, Then, Before, After } from '@cucumber/cucumber';
import { chromium, Browser, Page, expect } from '@playwright/test';
import * as ZooKeeper from 'node-zookeeper-client';

let browser: Browser;
let page: Page;
let zkClient: ZooKeeper.Client;
const ZK_HOSTS = 'localhost:2181';
const FLAG_PATH = '/feature_flags/my_app/enableNewDashboard';

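Before(async () => {
  browser = await chromium.launch();
  page = await browser.newPage();
  zkClient = ZooKeeper.createClient(ZK_HOSTS);
  const connected = new Promise<void>(resolve => zkClient.once('connected', () => resolve()));
  // createClient() does not open a session on its own; connect() must be called explicitly
  zkClient.connect();
  await connected;
});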

After(async () => {
  await browser.close();
  zkClient.close();
});

Given('the user is on the main application page', async () => {
  await page.goto('http://localhost:3000');
});

Given('the "enableNewDashboard" feature flag is initially disabled in ZooKeeper', async () => {
  // Make sure the flag starts from a known state before the test
  const exists = await new Promise<boolean>((resolve, reject) =>
    zkClient.exists(FLAG_PATH, (error, stat) => (error ? reject(error) : resolve(!!stat))));
  await new Promise<void>((resolve, reject) => {
    const done = (error: unknown) => (error ? reject(error) : resolve());
    if (exists) {
      zkClient.setData(FLAG_PATH, Buffer.from('false'), -1, done);
    } else {
      zkClient.create(FLAG_PATH, Buffer.from('false'), ZooKeeper.ACL.OPEN_ACL_UNSAFE, ZooKeeper.CreateMode.PERSISTENT, done);
    }
  });
  // Wait for the UI to settle into the initial state
  await page.waitForSelector('#old-dashboard');
});

When('an administrator enables the "enableNewDashboard" feature flag via an API call', async () => {
  // A real test would call an internal admin API that updates ZK;
  // for simplicity we write to ZK directly to simulate the backend change
  await new Promise<void>((resolve, reject) => {
    zkClient.setData(FLAG_PATH, Buffer.from('true'), -1, (error) => {
      if (error) return reject(error);
      resolve();
    });
  });
});

Then('the user should see the "New Dashboard" component rendered on the page within 2 seconds', async () => {
  // toBeVisible() retries until the element is visible or the 2-second timeout expires
  const newDashboard = page.locator('#new-dashboard');
  await expect(newDashboard).toBeVisible({ timeout: 2000 });
});

Then('the old dashboard component should no longer be visible', async () => {
  const oldDashboard = page.locator('#old-dashboard');
  await expect(oldDashboard).not.toBeVisible();
});

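This BDD test covers the entire pipeline, from a change in the data source (ZooKeeper) to the response in the user's interface (the DOM), and gives strong confidence in this real-time system.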

Limitations and Future Iterations

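This architecture has drawbacks. Its main bottleneck is the NestJS layer. Every frontend client connects to it, so with hundreds of thousands of clients a single NestJS instance becomes the bottleneck. At that point the NestJS nodes need to scale horizontally, with a WebSocket-capable proxy (such as Nginx or a dedicated gateway) distributing connections among them. Because each NestJS instance holds its own ZooKeeper watches, every node learns about changes independently and only needs to broadcast to its own clients.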

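Another consideration is broadcast granularity. The current implementation sends the full flag set to every client whenever any single flag changes. With a very large number of frequently changing flags, this produces unnecessary network traffic. A future optimization is to compute a diff on the backend and push only the changed entries, at the cost of noticeably more server-side complexity.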
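A sketch of that optimization, assuming a hypothetical delta message `{ changed, removed }`: the backend computes it from the previous and current snapshots, and the client applies it to its local copy. Clients would still need the full `flags_init` snapshot on every (re)connect, because a delta is only meaningful relative to a known baseline. The backend could also skip sending entirely when the delta is empty.

```typescript
type FlagSet = Record<string, boolean>;

// Hypothetical delta message: values that were added or changed, plus names that disappeared
interface FlagDelta {
  changed: FlagSet;
  removed: string[];
}

// Server side: compute what differs between two snapshots
function diffFlags(prev: FlagSet, next: FlagSet): FlagDelta {
  const changed: FlagSet = {};
  for (const key of Object.keys(next)) {
    if (prev[key] !== next[key]) changed[key] = next[key];
  }
  const removed = Object.keys(prev).filter(key => !Object.prototype.hasOwnProperty.call(next, key));
  return { changed, removed };
}

// Client side: apply a delta to a local snapshot without mutating the input
function applyDelta(base: FlagSet, delta: FlagDelta): FlagSet {
  const result: FlagSet = { ...base, ...delta.changed };
  for (const key of delta.removed) delete result[key];
  return result;
}

function isEmptyDelta(delta: FlagDelta): boolean {
  return Object.keys(delta.changed).length === 0 && delta.removed.length === 0;
}
```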

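Finally, ZooKeeper is designed as a strongly consistent coordination service. Feature flags can tolerate brief periods of eventual consistency, so other coordination services such as etcd or Consul are also worth considering. In some respects they may offer better performance or simpler operations.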

