最初,我们团队监控 CI/CD 流水线的方式非常原始:在多个终端窗口里 tail -f
不同的日志文件。当并发构建任务超过两三个时,这种方法迅速变得混乱且无法管理。定位一个失败的构建步骤,需要在日志的海洋中手动搜索,效率极低。我们需要一个集中式的、实时的、可视化的解决方案,一个能清晰展示所有正在运行、已完成和已失败的流水线状态的 Web UI。这就是这个内部项目的起点。
初步构想与技术选型决策
我们的目标是构建一个内部开发者平台(IDP)的核心组件:一个实时日志查看器。它的核心需求是低延迟、状态同步准确,并且 UI 必须能够清晰地展示大量信息。
后端技术栈:Laravel + Soketi
- Laravel: 团队的技术栈基础是 PHP 和 Laravel。利用其成熟的队列系统 (Queues) 和事件广播 (Broadcasting) 功能是自然的选择。我们可以将每个 CI/CD 步骤封装成一个
Job
,在执行过程中通过广播事件来实时更新前端状态。 - WebSockets (Soketi): 替代方案是前端轮询。但在实时场景下,轮询会带来不必要的延迟和服务器负载。WebSocket 提供了持久的双向连接,是推送日志和状态更新的理想选择。我们没有选择 Pusher 这样的 SaaS 服务,而是采用开源的
soketi
进行自托管。在内部工具场景下,这能更好地控制成本和数据隐私,同时避免了外部依赖的潜在不稳定性。
前端技术栈:React + Redux Toolkit + Tailwind CSS
- React: 前端标准,无需赘言。
- Redux Toolkit: 为什么不直接用 React 的
useState
或useContext
?因为一个 CI/CD 监控终端的状态本质上是复杂的、全局的。多条流水线可能同时运行,每条流水线有多个阶段,每个阶段会持续不断地产生日志流。这种高度动态且相互关联的状态,如果用组件本地状态管理,很快会陷入 prop-drilling 和状态同步的泥潭。Redux Toolkit 提供了一个可预测的、集中式的状态容器,其createSlice
和createEntityAdapter
等工具能极大地简化对这种范式化数据的管理。 - Tailwind CSS: 对于内部工具,快速迭代和功能实现优于精雕细琢的视觉设计。Tailwind CSS 的原子化类库让我们能够直接在 HTML 结构中构建出功能性强、信息密度高的界面,而无需编写一行自定义 CSS。这对于快速原型和迭代至关重要。
代码质量:ESLint
- 这是一个全栈项目,前端和后端(如果使用 Inertia.js 或类似的方案)都涉及 JavaScript。统一的 ESLint 规则集能确保代码风格的一致性,减少低级错误,尤其是在处理复杂的异步逻辑和状态更新时,静态代码分析能提前发现很多潜在问题。
后端实现:从队列任务到事件广播
我们的核心是在 Laravel 中模拟一个 CI/CD 流水线。一个流水线被触发后,会被分发到队列中执行。在执行过程中,它会广播多个事件来通知前端。
1. 环境配置与依赖
首先,安装必要的包并配置广播驱动。
composer require pusher/pusher-php-server
在 .env
文件中,我们将广播驱动指向自托管的 soketi
实例。
BROADCAST_DRIVER=pusher
PUSHER_APP_ID=app-id
PUSHER_APP_KEY=app-key
PUSHER_APP_SECRET=app-secret
PUSHER_HOST=127.0.0.1
PUSHER_PORT=6001
PUSHER_SCHEME=http
PUSHER_APP_CLUSTER=mt1
# 使用 Redis 作为队列驱动
QUEUE_CONNECTION=redis
为了运行 soketi
,我们使用一个简单的 docker-compose.yml
。
# docker-compose.yml
version: '3'
services:
soketi:
image: 'quay.io/soketi/soketi:1.6-16-alpine'
environment:
SOKETI_DEBUG: '1'
SOKETI_METRICS_SERVER_PORT: '9601'
ports:
- '6001:6001'
- '9601:9601'
networks:
- sail # 或者你的项目网络
networks:
sail:
driver: bridge
2. 定义广播事件
我们需要为流水线的生命周期定义一系列事件。所有事件都应实现 ShouldBroadcast
接口。
app/Events/PipelineStatusChanged.php
:
<?php
namespace App\Events;
use Illuminate\Broadcasting\Channel;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Broadcasting\PresenceChannel;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;
class PipelineStatusChanged implements ShouldBroadcast
{
use Dispatchable, InteractsWithSockets, SerializesModels;
public string $pipelineId;
public string $status;
public ?string $details;
public ?string $finished_at;
/**
* Create a new event instance.
*
* @return void
*/
public function __construct(string $pipelineId, string $status, ?string $details = null, ?string $finished_at = null)
{
$this->pipelineId = $pipelineId;
$this->status = $status;
$this->details = $details;
$this->finished_at = $finished_at;
}
/**
* Get the channels the event should broadcast on.
*
* @return \Illuminate\Broadcasting\Channel|array
*/
public function broadcastOn()
{
// 使用私有频道确保只有授权用户能接收
return new PrivateChannel('pipelines.' . $this->pipelineId);
}
}
app/Events/PipelineLogReceived.php
:
<?php
namespace App\Events;
use Illuminate\Broadcasting\Channel;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;
class PipelineLogReceived implements ShouldBroadcast
{
use Dispatchable, InteractsWithSockets, SerializesModels;
public string $pipelineId;
public string $line;
public string $timestamp;
public function __construct(string $pipelineId, string $line)
{
$this->pipelineId = $pipelineId;
$this->line = $line;
$this->timestamp = now()->toIso8601String();
}
public function broadcastOn()
{
return new PrivateChannel('pipelines.' . $this->pipelineId);
}
}
3. 创建模拟流水线的 Job
这个 Job 是核心。它会模拟一个多阶段的构建过程,并在每个关键节点广播事件。
app/Jobs/SimulatePipelineRun.php
:
<?php
namespace App\Jobs;
use App\Events\PipelineLogReceived;
use App\Events\PipelineStatusChanged;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;
use Throwable;
class SimulatePipelineRun implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public string $pipelineId;
// 在真实项目中,这里会注入模型或仓库
public array $pipelineData;
public function __construct(string $pipelineId, array $pipelineData)
{
$this->pipelineId = $pipelineId;
$this->pipelineData = $pipelineData;
}
public function handle()
{
// 1. 广播开始状态
broadcast(new PipelineStatusChanged($this->pipelineId, 'RUNNING'));
$this->log('Pipeline started for repository: ' . $this->pipelineData['repo']);
try {
// 模拟不同阶段
$this->runStage('Cloning repository', 2);
$this->runStage('Installing dependencies', 5, [
'npm install...',
'Resolving packages...',
'Done.'
]);
$this->runStage('Running tests', 8, [
'PASS tests/Unit/UserTest.php',
'PASS tests/Feature/ApiTest.php',
'FAIL tests/Feature/BillingTest.php' // 模拟失败
]);
// 如果测试失败,就不能继续
if ($this->pipelineData['repo'] === 'project-b/api') {
throw new \Exception('Tests failed with 1 error.');
}
$this->runStage('Building assets', 4);
$this->runStage('Deploying to production', 3);
// 4. 广播成功状态
$this->log('Pipeline finished successfully.');
broadcast(new PipelineStatusChanged(
$this->pipelineId,
'SUCCEEDED',
'Deployment completed.',
now()->toIso8601String()
));
} catch (Throwable $e) {
// 5. 异常处理,广播失败状态
$this->log('ERROR: ' . $e->getMessage());
Log::error("Pipeline {$this->pipelineId} failed", ['exception' => $e]);
broadcast(new PipelineStatusChanged(
$this->pipelineId,
'FAILED',
$e->getMessage(),
now()->toIso8601String()
));
}
}
private function runStage(string $name, int $duration, array $logs = [])
{
$this->log("--- Stage: {$name} ---");
sleep($duration); // 模拟耗时操作
foreach ($logs as $line) {
$this->log($line);
sleep(rand(0, 1));
}
$this->log("--- Stage: {$name} finished ---");
}
private function log(string $message)
{
broadcast(new PipelineLogReceived($this->pipelineId, $message));
}
}
4. 触发 Job 的路由和控制器
为了方便测试,我们创建一个简单的 API 端点来触发这个 Job。
routes/api.php
:
<?php
use App\Jobs\SimulatePipelineRun;
use Illuminate\Http\Request;
use Illuminate\Support\Facades\Route;
use Illuminate\Support\Str;
Route::post('/pipelines', function (Request $request) {
$pipelineId = (string) Str::uuid();
$repo = $request->input('repo', 'project-a/frontend');
// 在真实应用中,会先创建数据库记录
$pipelineData = [
'id' => $pipelineId,
'repo' => $repo,
'status' => 'QUEUED',
'created_at' => now()->toIso8601String(),
'logs' => [],
];
SimulatePipelineRun::dispatch($pipelineId, $pipelineData);
// 返回初始数据,前端可以立即渲染一个占位符
return response()->json($pipelineData);
});
同时,别忘了配置私有频道的授权路由。
routes/channels.php
:
<?php
use Illuminate\Support\Facades\Broadcast;
Broadcast::channel('pipelines.{pipelineId}', function ($user, $pipelineId) {
// 在真实应用中,这里需要验证用户是否有权限查看此流水线
return true;
});
前端实现:状态管理与实时渲染
前端是这个系统的脸面。我们需要一个健壮的结构来处理实时数据流。
1. 依赖安装与环境配置
npm install @reduxjs/toolkit react-redux laravel-echo pusher-js tailwindcss postcss autoprefixer
npx tailwindcss init -p
配置 tailwind.config.js
:
/** @type {import('tailwindcss').Config} */
export default {
content: [
"./index.html",
"./src/**/*.{js,ts,jsx,tsx}",
],
theme: {
extend: {},
},
plugins: [],
}
2. 配置 Laravel Echo
创建一个文件来初始化 Echo 实例,这是连接前端和后端的桥梁。
src/lib/echo.js
:
import Echo from 'laravel-echo';
import Pusher from 'pusher-js';
window.Pusher = Pusher;
const echo = new Echo({
broadcaster: 'pusher',
key: import.meta.env.VITE_PUSHER_APP_KEY,
wsHost: import.meta.env.VITE_PUSHER_HOST,
wsPort: import.meta.env.VITE_PUSHER_PORT,
wssPort: import.meta.env.VITE_PUSHER_PORT,
forceTLS: false, // 开发环境禁用 TLS
disableStats: true,
enabledTransports: ['ws', 'wss'],
// 关键:配置授权端点
authorizer: (channel, options) => {
return {
authorize: (socketId, callback) => {
// 使用你自己的 API 客户端,例如 axios
// 确保你的 API 客户端会携带认证凭证(如 cookie 或 token)
fetch('/api/broadcasting/auth', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Socket-ID': socketId,
// 'Authorization': 'Bearer ...'
},
body: JSON.stringify({
socket_id: socketId,
channel_name: channel.name,
}),
})
.then(response => {
if (!response.ok) {
return response.text().then(text => { throw new Error(text) });
}
return response.json();
})
.then(data => {
callback(false, data);
})
.catch(error => {
console.error('Broadcast authorization failed:', error);
callback(true, error);
});
},
};
},
});
export default echo;
3. Redux State 设计与实现
我们的状态需要存储一个流水线列表,每个流水线包含其元数据和日志。使用 createSlice
来管理。
src/store/pipelinesSlice.js
:
import { createSlice, createEntityAdapter } from '@reduxjs/toolkit';
// createEntityAdapter 是一个强大的工具,用于管理范式化状态
const pipelinesAdapter = createEntityAdapter({
// 假设每个 pipeline 对象都有一个 'id' 属性
selectId: (pipeline) => pipeline.id,
// 保持实体按创建时间降序排序
sortComparer: (a, b) => new Date(b.created_at) - new Date(a.created_at),
});
const initialState = pipelinesAdapter.getInitialState({
// 可以在这里添加额外的状态,比如 loading, error 等
});
const pipelinesSlice = createSlice({
name: 'pipelines',
initialState,
reducers: {
pipelineAdded: pipelinesAdapter.addOne,
pipelineStatusChanged: (state, action) => {
const { pipelineId, status, details, finished_at } = action.payload;
pipelinesAdapter.updateOne(state, {
id: pipelineId,
changes: { status, details, finished_at },
});
},
pipelineLogReceived: (state, action) => {
const { pipelineId, line, timestamp } = action.payload;
const pipeline = state.entities[pipelineId];
if (pipeline) {
// 不可变地更新日志数组
pipeline.logs = [...pipeline.logs, { line, timestamp }];
}
},
},
});
export const { pipelineAdded, pipelineStatusChanged, pipelineLogReceived } = pipelinesSlice.actions;
// 导出 selectors
export const {
selectAll: selectAllPipelines,
selectById: selectPipelineById,
} = pipelinesAdapter.getSelectors((state) => state.pipelines);
export default pipelinesSlice.reducer;
配置 Redux store:src/store/index.js
:
import { configureStore } from '@reduxjs/toolkit';
import pipelinesReducer from './pipelinesSlice';
export const store = configureStore({
reducer: {
pipelines: pipelinesReducer,
},
});
4. React 组件实现
App.jsx
作为主入口,负责触发流水线和渲染列表。
// src/App.jsx
import React from 'react';
import { useSelector } from 'react-redux';
import { selectAllPipelines } from './store/pipelinesSlice';
import { PipelineView } from './components/PipelineView';
import { Header } from './components/Header';
function App() {
const pipelines = useSelector(selectAllPipelines);
return (
<div className="bg-gray-900 text-white min-h-screen font-sans">
<Header />
<main className="p-4 md:p-8">
<div className="space-y-6">
{pipelines.length === 0 ? (
<p className="text-center text-gray-400">No pipelines running. Trigger one to start.</p>
) : (
pipelines.map(pipeline => (
<PipelineView key={pipeline.id} pipelineId={pipeline.id} />
))
)}
</div>
</main>
</div>
);
}
export default App;
Header.jsx
组件包含触发按钮。
// src/components/Header.jsx
import React, { useState } from 'react';
import { useDispatch } from 'react-redux';
import { pipelineAdded } from '../store/pipelinesSlice';
export function Header() {
const dispatch = useDispatch();
const [isLoading, setIsLoading] = useState(false);
const triggerPipeline = async (repo) => {
setIsLoading(true);
try {
const response = await fetch('/api/pipelines', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ repo }),
});
const data = await response.json();
// Dispatch 初始状态到 Redux store
dispatch(pipelineAdded(data));
} catch (error) {
console.error('Failed to trigger pipeline', error);
} finally {
setIsLoading(false);
}
};
return (
<header className="bg-gray-800 p-4 border-b border-gray-700 flex justify-between items-center">
<h1 className="text-xl font-bold">CI/CD Monitor</h1>
<div className="flex space-x-2">
<button onClick={() => triggerPipeline('project-a/frontend')} disabled={isLoading} className="bg-blue-600 hover:bg-blue-700 px-4 py-2 rounded disabled:opacity-50">
Trigger Success Run
</button>
<button onClick={() => triggerPipeline('project-b/api')} disabled={isLoading} className="bg-red-600 hover:bg-red-700 px-4 py-2 rounded disabled:opacity-50">
Trigger Fail Run
</button>
</div>
</header>
);
}
PipelineView.jsx
是核心视图,它订阅事件并展示单个流水线的状态和日志。
// src/components/PipelineView.jsx
import React, { useEffect, useRef } from 'react';
import { useSelector, useDispatch } from 'react-redux';
import { selectPipelineById, pipelineLogReceived, pipelineStatusChanged } from '../store/pipelinesSlice';
import echo from '../lib/echo';
const StatusBadge = ({ status }) => {
const statusClasses = {
QUEUED: 'bg-gray-500',
RUNNING: 'bg-yellow-500 animate-pulse',
SUCCEEDED: 'bg-green-500',
FAILED: 'bg-red-500',
};
return (
<span className={`px-2 py-1 text-xs font-bold rounded-full ${statusClasses[status] || 'bg-gray-600'}`}>
{status}
</span>
);
};
export function PipelineView({ pipelineId }) {
const pipeline = useSelector(state => selectPipelineById(state, pipelineId));
const dispatch = useDispatch();
const logContainerRef = useRef(null);
useEffect(() => {
// 自动滚动到日志底部
if (logContainerRef.current) {
logContainerRef.current.scrollTop = logContainerRef.current.scrollHeight;
}
}, [pipeline?.logs.length]);
useEffect(() => {
// 组件挂载时订阅频道
const channel = echo.private(`pipelines.${pipelineId}`);
channel
.listen('PipelineStatusChanged', (e) => {
console.log('Received Status Change:', e);
dispatch(pipelineStatusChanged(e));
})
.listen('PipelineLogReceived', (e) => {
console.log('Received Log:', e);
dispatch(pipelineLogReceived(e));
});
// 组件卸载时取消订阅,防止内存泄漏
return () => {
channel.stopListening('PipelineStatusChanged');
channel.stopListening('PipelineLogReceived');
echo.leave(`pipelines.${pipelineId}`);
};
}, [pipelineId, dispatch]);
if (!pipeline) {
return null;
}
return (
<div className="bg-gray-800 rounded-lg shadow-lg overflow-hidden">
<div className="p-4 flex justify-between items-center border-b border-gray-700">
<div>
<h2 className="font-bold text-lg">{pipeline.repo}</h2>
<p className="text-sm text-gray-400">ID: {pipeline.id}</p>
</div>
<StatusBadge status={pipeline.status} />
</div>
<div
ref={logContainerRef}
className="p-4 h-64 overflow-y-auto bg-black font-mono text-sm"
>
{pipeline.logs.map((log, index) => (
<div key={index} className="flex">
<span className="text-gray-500 mr-4">{new Date(log.timestamp).toLocaleTimeString()}</span>
<pre className={log.line.includes('ERROR') ? 'text-red-400' : 'text-gray-300'}>{log.line}</pre>
</div>
))}
</div>
</div>
);
}
5. ESLint 配置
一个基础的 .eslintrc.cjs
文件来统一规范。
module.exports = {
root: true,
env: { browser: true, es2020: true },
extends: [
'eslint:recommended',
'plugin:react/recommended',
'plugin:react/jsx-runtime',
'plugin:react-hooks/recommended',
],
ignorePatterns: ['dist', '.eslintrc.cjs'],
parserOptions: { ecmaVersion: 'latest', sourceType: 'module' },
settings: { react: { version: '18.2' } },
plugins: ['react-refresh'],
rules: {
'react-refresh/only-export-components': [
'warn',
{ allowConstantExport: true },
],
'react/prop-types': 'off', // 在这个项目中我们暂时关闭 prop-types 检查
'no-console': 'warn', // 生产环境中警告 console.log
},
}
局限性与未来迭代方向
这套方案为我们的内部监控平台打下了坚实的基础,但它并非完美。当前的实现存在一些可以优化的点:
前端性能瓶颈: 当日志量非常巨大时(例如数万行),在单个
div
中渲染所有日志会导致浏览器性能下降。未来的优化方向是引入虚拟滚动(Virtualized List),只渲染视口内可见的日志行。WebSocket 服务器的可用性:
soketi
目前是单点部署。在更严肃的生产环境中,需要部署一个高可用的soketi
集群,并使用 Redis 作为其横向扩展的后端,再通过负载均衡器对外提供服务。日志持久化与历史追溯: 当前方案的日志仅存在于前端 Redux store 的内存中。刷新页面会导致日志丢失。一个完整的方案需要将流水线状态和日志持久化到数据库(如 PostgreSQL 或 Elasticsearch),API 端点应能拉取历史日志,实现前端状态的水合(Hydration)。
授权粒度: 目前的频道授权逻辑 (
return true;
) 过于粗放。一个真正的多项目、多团队环境中,必须实现基于用户角色和项目权限的精细化授权,确保用户只能看到他们有权访问的流水线。