基于 Laravel Echo 与 Redux Toolkit 构建一个实时的 CI/CD 流水线监控终端


最初,我们团队监控 CI/CD 流水线的方式非常原始:在多个终端窗口里 tail -f 不同的日志文件。当并发构建任务超过两三个时,这种方法迅速变得混乱且无法管理。定位一个失败的构建步骤,需要在日志的海洋中手动搜索,效率极低。我们需要一个集中式的、实时的、可视化的解决方案,一个能清晰展示所有正在运行、已完成和已失败的流水线状态的 Web UI。这就是这个内部项目的起点。

初步构想与技术选型决策

我们的目标是构建一个内部开发者平台(IDP)的核心组件:一个实时日志查看器。它的核心需求是低延迟、状态同步准确,并且 UI 必须能够清晰地展示大量信息。

后端技术栈:Laravel + Soketi

  • Laravel: 团队的技术栈基础是 PHP 和 Laravel。利用其成熟的队列系统 (Queues) 和事件广播 (Broadcasting) 功能是自然的选择。我们可以将每个 CI/CD 步骤封装成一个 Job,在执行过程中通过广播事件来实时更新前端状态。
  • WebSockets (Soketi): 替代方案是前端轮询。但在实时场景下,轮询会带来不必要的延迟和服务器负载。WebSocket 提供了持久的双向连接,是推送日志和状态更新的理想选择。我们没有选择 Pusher 这样的 SaaS 服务,而是采用开源的 soketi 进行自托管。在内部工具场景下,这能更好地控制成本和数据隐私,同时避免了外部依赖的潜在不稳定性。

前端技术栈:React + Redux Toolkit + Tailwind CSS

  • React: 前端标准,无需赘言。
  • Redux Toolkit: 为什么不直接用 React 的 useStateuseContext?因为一个 CI/CD 监控终端的状态本质上是复杂的、全局的。多条流水线可能同时运行,每条流水线有多个阶段,每个阶段会持续不断地产生日志流。这种高度动态且相互关联的状态,如果用组件本地状态管理,很快会陷入 prop-drilling 和状态同步的泥潭。Redux Toolkit 提供了一个可预测的、集中式的状态容器,其 createSlicecreateEntityAdapter 等工具能极大地简化对这种范式化数据的管理。
  • Tailwind CSS: 对于内部工具,快速迭代和功能实现优于精雕细琢的视觉设计。Tailwind CSS 的原子化类库让我们能够直接在 HTML 结构中构建出功能性强、信息密度高的界面,而无需编写一行自定义 CSS。这对于快速原型和迭代至关重要。

代码质量:ESLint

  • 这是一个全栈项目,前端和后端(如果使用 Inertia.js 或类似的方案)都涉及 JavaScript。统一的 ESLint 规则集能确保代码风格的一致性,减少低级错误,尤其是在处理复杂的异步逻辑和状态更新时,静态代码分析能提前发现很多潜在问题。

后端实现:从队列任务到事件广播

我们的核心是在 Laravel 中模拟一个 CI/CD 流水线。一个流水线被触发后,会被分发到队列中执行。在执行过程中,它会广播多个事件来通知前端。

1. 环境配置与依赖

首先,安装必要的包并配置广播驱动。

composer require pusher/pusher-php-server

.env 文件中,我们将广播驱动指向自托管的 soketi 实例。

BROADCAST_DRIVER=pusher
PUSHER_APP_ID=app-id
PUSHER_APP_KEY=app-key
PUSHER_APP_SECRET=app-secret
PUSHER_HOST=127.0.0.1
PUSHER_PORT=6001
PUSHER_SCHEME=http
PUSHER_APP_CLUSTER=mt1

# 使用 Redis 作为队列驱动
QUEUE_CONNECTION=redis

为了运行 soketi,我们使用一个简单的 docker-compose.yml

# docker-compose.yml
version: '3'
services:
  soketi:
    image: 'quay.io/soketi/soketi:1.6-16-alpine'
    environment:
      SOKETI_DEBUG: '1'
      SOKETI_METRICS_SERVER_PORT: '9601'
    ports:
      - '6001:6001'
      - '9601:9601'
    networks:
      - sail # 或者你的项目网络

networks:
  sail:
    driver: bridge

2. 定义广播事件

我们需要为流水线的生命周期定义一系列事件。所有事件都应实现 ShouldBroadcast 接口。

app/Events/PipelineStatusChanged.php:

<?php

namespace App\Events;

use Illuminate\Broadcasting\Channel;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Broadcasting\PresenceChannel;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;

class PipelineStatusChanged implements ShouldBroadcast
{
    use Dispatchable, InteractsWithSockets, SerializesModels;

    public string $pipelineId;
    public string $status;
    public ?string $details;
    public ?string $finished_at;

    /**
     * Create a new event instance.
     *
     * @return void
     */
    public function __construct(string $pipelineId, string $status, ?string $details = null, ?string $finished_at = null)
    {
        $this->pipelineId = $pipelineId;
        $this->status = $status;
        $this->details = $details;
        $this->finished_at = $finished_at;
    }

    /**
     * Get the channels the event should broadcast on.
     *
     * @return \Illuminate\Broadcasting\Channel|array
     */
    public function broadcastOn()
    {
        // 使用私有频道确保只有授权用户能接收
        return new PrivateChannel('pipelines.' . $this->pipelineId);
    }
}

app/Events/PipelineLogReceived.php:

<?php

namespace App\Events;

use Illuminate\Broadcasting\Channel;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;

class PipelineLogReceived implements ShouldBroadcast
{
    use Dispatchable, InteractsWithSockets, SerializesModels;

    public string $pipelineId;
    public string $line;
    public string $timestamp;

    public function __construct(string $pipelineId, string $line)
    {
        $this->pipelineId = $pipelineId;
        $this->line = $line;
        $this->timestamp = now()->toIso8601String();
    }

    public function broadcastOn()
    {
        return new PrivateChannel('pipelines.' . $this->pipelineId);
    }
}

3. 创建模拟流水线的 Job

这个 Job 是核心。它会模拟一个多阶段的构建过程,并在每个关键节点广播事件。

app/Jobs/SimulatePipelineRun.php:

<?php

namespace App\Jobs;

use App\Events\PipelineLogReceived;
use App\Events\PipelineStatusChanged;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;
use Throwable;

class SimulatePipelineRun implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public string $pipelineId;

    // 在真实项目中,这里会注入模型或仓库
    public array $pipelineData;

    public function __construct(string $pipelineId, array $pipelineData)
    {
        $this->pipelineId = $pipelineId;
        $this->pipelineData = $pipelineData;
    }

    public function handle()
    {
        // 1. 广播开始状态
        broadcast(new PipelineStatusChanged($this->pipelineId, 'RUNNING'));
        $this->log('Pipeline started for repository: ' . $this->pipelineData['repo']);

        try {
            // 模拟不同阶段
            $this->runStage('Cloning repository', 2);
            $this->runStage('Installing dependencies', 5, [
                'npm install...',
                'Resolving packages...',
                'Done.'
            ]);
            $this->runStage('Running tests', 8, [
                'PASS tests/Unit/UserTest.php',
                'PASS tests/Feature/ApiTest.php',
                'FAIL tests/Feature/BillingTest.php' // 模拟失败
            ]);

            // 如果测试失败,就不能继续
            if ($this->pipelineData['repo'] === 'project-b/api') {
                 throw new \Exception('Tests failed with 1 error.');
            }

            $this->runStage('Building assets', 4);
            $this->runStage('Deploying to production', 3);

            // 4. 广播成功状态
            $this->log('Pipeline finished successfully.');
            broadcast(new PipelineStatusChanged(
                $this->pipelineId,
                'SUCCEEDED',
                'Deployment completed.',
                now()->toIso8601String()
            ));
        } catch (Throwable $e) {
            // 5. 异常处理,广播失败状态
            $this->log('ERROR: ' . $e->getMessage());
            Log::error("Pipeline {$this->pipelineId} failed", ['exception' => $e]);

            broadcast(new PipelineStatusChanged(
                $this->pipelineId,
                'FAILED',
                $e->getMessage(),
                now()->toIso8601String()
            ));
        }
    }

    private function runStage(string $name, int $duration, array $logs = [])
    {
        $this->log("--- Stage: {$name} ---");
        sleep($duration); // 模拟耗时操作
        foreach ($logs as $line) {
            $this->log($line);
            sleep(rand(0, 1));
        }
        $this->log("--- Stage: {$name} finished ---");
    }

    private function log(string $message)
    {
        broadcast(new PipelineLogReceived($this->pipelineId, $message));
    }
}

4. 触发 Job 的路由和控制器

为了方便测试,我们创建一个简单的 API 端点来触发这个 Job。

routes/api.php:

<?php

use App\Jobs\SimulatePipelineRun;
use Illuminate\Http\Request;
use Illuminate\Support\Facades\Route;
use Illuminate\Support\Str;

Route::post('/pipelines', function (Request $request) {
    $pipelineId = (string) Str::uuid();
    $repo = $request->input('repo', 'project-a/frontend');

    // 在真实应用中,会先创建数据库记录
    $pipelineData = [
        'id' => $pipelineId,
        'repo' => $repo,
        'status' => 'QUEUED',
        'created_at' => now()->toIso8601String(),
        'logs' => [],
    ];

    SimulatePipelineRun::dispatch($pipelineId, $pipelineData);

    // 返回初始数据,前端可以立即渲染一个占位符
    return response()->json($pipelineData);
});

同时,别忘了配置私有频道的授权路由。

routes/channels.php:

<?php

use Illuminate\Support\Facades\Broadcast;

Broadcast::channel('pipelines.{pipelineId}', function ($user, $pipelineId) {
    // 在真实应用中,这里需要验证用户是否有权限查看此流水线
    return true;
});

前端实现:状态管理与实时渲染

前端是这个系统的脸面。我们需要一个健壮的结构来处理实时数据流。

1. 依赖安装与环境配置

npm install @reduxjs/toolkit react-redux laravel-echo pusher-js tailwindcss postcss autoprefixer
npx tailwindcss init -p

配置 tailwind.config.js

/** @type {import('tailwindcss').Config} */
export default {
  content: [
    "./index.html",
    "./src/**/*.{js,ts,jsx,tsx}",
  ],
  theme: {
    extend: {},
  },
  plugins: [],
}

2. 配置 Laravel Echo

创建一个文件来初始化 Echo 实例,这是连接前端和后端的桥梁。

src/lib/echo.js:

import Echo from 'laravel-echo';
import Pusher from 'pusher-js';

window.Pusher = Pusher;

const echo = new Echo({
    broadcaster: 'pusher',
    key: import.meta.env.VITE_PUSHER_APP_KEY,
    wsHost: import.meta.env.VITE_PUSHER_HOST,
    wsPort: import.meta.env.VITE_PUSHER_PORT,
    wssPort: import.meta.env.VITE_PUSHER_PORT,
    forceTLS: false, // 开发环境禁用 TLS
    disableStats: true,
    enabledTransports: ['ws', 'wss'],
    // 关键:配置授权端点
    authorizer: (channel, options) => {
        return {
            authorize: (socketId, callback) => {
                // 使用你自己的 API 客户端,例如 axios
                // 确保你的 API 客户端会携带认证凭证(如 cookie 或 token)
                fetch('/api/broadcasting/auth', {
                    method: 'POST',
                    headers: {
                        'Content-Type': 'application/json',
                        'X-Socket-ID': socketId,
                        // 'Authorization': 'Bearer ...'
                    },
                    body: JSON.stringify({
                        socket_id: socketId,
                        channel_name: channel.name,
                    }),
                })
                .then(response => {
                    if (!response.ok) {
                        return response.text().then(text => { throw new Error(text) });
                    }
                    return response.json();
                })
                .then(data => {
                    callback(false, data);
                })
                .catch(error => {
                    console.error('Broadcast authorization failed:', error);
                    callback(true, error);
                });
            },
        };
    },
});

export default echo;

3. Redux State 设计与实现

我们的状态需要存储一个流水线列表,每个流水线包含其元数据和日志。使用 createSlice 来管理。

src/store/pipelinesSlice.js:

import { createSlice, createEntityAdapter } from '@reduxjs/toolkit';

// createEntityAdapter 是一个强大的工具,用于管理范式化状态
const pipelinesAdapter = createEntityAdapter({
    // 假设每个 pipeline 对象都有一个 'id' 属性
    selectId: (pipeline) => pipeline.id,
    // 保持实体按创建时间降序排序
    sortComparer: (a, b) => new Date(b.created_at) - new Date(a.created_at),
});

const initialState = pipelinesAdapter.getInitialState({
    // 可以在这里添加额外的状态,比如 loading, error 等
});

const pipelinesSlice = createSlice({
    name: 'pipelines',
    initialState,
    reducers: {
        pipelineAdded: pipelinesAdapter.addOne,
        pipelineStatusChanged: (state, action) => {
            const { pipelineId, status, details, finished_at } = action.payload;
            pipelinesAdapter.updateOne(state, {
                id: pipelineId,
                changes: { status, details, finished_at },
            });
        },
        pipelineLogReceived: (state, action) => {
            const { pipelineId, line, timestamp } = action.payload;
            const pipeline = state.entities[pipelineId];
            if (pipeline) {
                // 不可变地更新日志数组
                pipeline.logs = [...pipeline.logs, { line, timestamp }];
            }
        },
    },
});

export const { pipelineAdded, pipelineStatusChanged, pipelineLogReceived } = pipelinesSlice.actions;

// 导出 selectors
export const {
    selectAll: selectAllPipelines,
    selectById: selectPipelineById,
} = pipelinesAdapter.getSelectors((state) => state.pipelines);

export default pipelinesSlice.reducer;

配置 Redux store:
src/store/index.js:

import { configureStore } from '@reduxjs/toolkit';
import pipelinesReducer from './pipelinesSlice';

export const store = configureStore({
    reducer: {
        pipelines: pipelinesReducer,
    },
});

4. React 组件实现

App.jsx 作为主入口,负责触发流水线和渲染列表。

// src/App.jsx
import React from 'react';
import { useSelector } from 'react-redux';
import { selectAllPipelines } from './store/pipelinesSlice';
import { PipelineView } from './components/PipelineView';
import { Header } from './components/Header';

function App() {
  const pipelines = useSelector(selectAllPipelines);

  return (
    <div className="bg-gray-900 text-white min-h-screen font-sans">
      <Header />
      <main className="p-4 md:p-8">
        <div className="space-y-6">
          {pipelines.length === 0 ? (
            <p className="text-center text-gray-400">No pipelines running. Trigger one to start.</p>
          ) : (
            pipelines.map(pipeline => (
              <PipelineView key={pipeline.id} pipelineId={pipeline.id} />
            ))
          )}
        </div>
      </main>
    </div>
  );
}

export default App;

Header.jsx 组件包含触发按钮。

// src/components/Header.jsx
import React, { useState } from 'react';
import { useDispatch } from 'react-redux';
import { pipelineAdded } from '../store/pipelinesSlice';

export function Header() {
  const dispatch = useDispatch();
  const [isLoading, setIsLoading] = useState(false);

  const triggerPipeline = async (repo) => {
    setIsLoading(true);
    try {
      const response = await fetch('/api/pipelines', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ repo }),
      });
      const data = await response.json();
      // Dispatch 初始状态到 Redux store
      dispatch(pipelineAdded(data));
    } catch (error) {
      console.error('Failed to trigger pipeline', error);
    } finally {
      setIsLoading(false);
    }
  };

  return (
    <header className="bg-gray-800 p-4 border-b border-gray-700 flex justify-between items-center">
      <h1 className="text-xl font-bold">CI/CD Monitor</h1>
      <div className="flex space-x-2">
        <button onClick={() => triggerPipeline('project-a/frontend')} disabled={isLoading} className="bg-blue-600 hover:bg-blue-700 px-4 py-2 rounded disabled:opacity-50">
          Trigger Success Run
        </button>
        <button onClick={() => triggerPipeline('project-b/api')} disabled={isLoading} className="bg-red-600 hover:bg-red-700 px-4 py-2 rounded disabled:opacity-50">
          Trigger Fail Run
        </button>
      </div>
    </header>
  );
}

PipelineView.jsx 是核心视图,它订阅事件并展示单个流水线的状态和日志。

// src/components/PipelineView.jsx
import React, { useEffect, useRef } from 'react';
import { useSelector, useDispatch } from 'react-redux';
import { selectPipelineById, pipelineLogReceived, pipelineStatusChanged } from '../store/pipelinesSlice';
import echo from '../lib/echo';

const StatusBadge = ({ status }) => {
    const statusClasses = {
        QUEUED: 'bg-gray-500',
        RUNNING: 'bg-yellow-500 animate-pulse',
        SUCCEEDED: 'bg-green-500',
        FAILED: 'bg-red-500',
    };
    return (
        <span className={`px-2 py-1 text-xs font-bold rounded-full ${statusClasses[status] || 'bg-gray-600'}`}>
            {status}
        </span>
    );
};

export function PipelineView({ pipelineId }) {
    const pipeline = useSelector(state => selectPipelineById(state, pipelineId));
    const dispatch = useDispatch();
    const logContainerRef = useRef(null);

    useEffect(() => {
        // 自动滚动到日志底部
        if (logContainerRef.current) {
            logContainerRef.current.scrollTop = logContainerRef.current.scrollHeight;
        }
    }, [pipeline?.logs.length]);

    useEffect(() => {
        // 组件挂载时订阅频道
        const channel = echo.private(`pipelines.${pipelineId}`);

        channel
            .listen('PipelineStatusChanged', (e) => {
                console.log('Received Status Change:', e);
                dispatch(pipelineStatusChanged(e));
            })
            .listen('PipelineLogReceived', (e) => {
                console.log('Received Log:', e);
                dispatch(pipelineLogReceived(e));
            });
        
        // 组件卸载时取消订阅,防止内存泄漏
        return () => {
            channel.stopListening('PipelineStatusChanged');
            channel.stopListening('PipelineLogReceived');
            echo.leave(`pipelines.${pipelineId}`);
        };
    }, [pipelineId, dispatch]);

    if (!pipeline) {
        return null;
    }

    return (
        <div className="bg-gray-800 rounded-lg shadow-lg overflow-hidden">
            <div className="p-4 flex justify-between items-center border-b border-gray-700">
                <div>
                    <h2 className="font-bold text-lg">{pipeline.repo}</h2>
                    <p className="text-sm text-gray-400">ID: {pipeline.id}</p>
                </div>
                <StatusBadge status={pipeline.status} />
            </div>
            <div
                ref={logContainerRef}
                className="p-4 h-64 overflow-y-auto bg-black font-mono text-sm"
            >
                {pipeline.logs.map((log, index) => (
                    <div key={index} className="flex">
                        <span className="text-gray-500 mr-4">{new Date(log.timestamp).toLocaleTimeString()}</span>
                        <pre className={log.line.includes('ERROR') ? 'text-red-400' : 'text-gray-300'}>{log.line}</pre>
                    </div>
                ))}
            </div>
        </div>
    );
}

5. ESLint 配置

一个基础的 .eslintrc.cjs 文件来统一规范。

module.exports = {
  root: true,
  env: { browser: true, es2020: true },
  extends: [
    'eslint:recommended',
    'plugin:react/recommended',
    'plugin:react/jsx-runtime',
    'plugin:react-hooks/recommended',
  ],
  ignorePatterns: ['dist', '.eslintrc.cjs'],
  parserOptions: { ecmaVersion: 'latest', sourceType: 'module' },
  settings: { react: { version: '18.2' } },
  plugins: ['react-refresh'],
  rules: {
    'react-refresh/only-export-components': [
      'warn',
      { allowConstantExport: true },
    ],
    'react/prop-types': 'off', // 在这个项目中我们暂时关闭 prop-types 检查
    'no-console': 'warn', // 生产环境中警告 console.log
  },
}

局限性与未来迭代方向

这套方案为我们的内部监控平台打下了坚实的基础,但它并非完美。当前的实现存在一些可以优化的点:

  1. 前端性能瓶颈: 当日志量非常巨大时(例如数万行),在单个 div 中渲染所有日志会导致浏览器性能下降。未来的优化方向是引入虚拟滚动(Virtualized List),只渲染视口内可见的日志行。

  2. WebSocket 服务器的可用性: soketi 目前是单点部署。在更严肃的生产环境中,需要部署一个高可用的 soketi 集群,并使用 Redis 作为其横向扩展的后端,再通过负载均衡器对外提供服务。

  3. 日志持久化与历史追溯: 当前方案的日志仅存在于前端 Redux store 的内存中。刷新页面会导致日志丢失。一个完整的方案需要将流水线状态和日志持久化到数据库(如 PostgreSQL 或 Elasticsearch),API 端点应能拉取历史日志,实现前端状态的水合(Hydration)。

  4. 授权粒度: 目前的频道授权逻辑 (return true;) 过于粗放。一个真正的多项目、多团队环境中,必须实现基于用户角色和项目权限的精细化授权,确保用户只能看到他们有权访问的流水线。


  目录