工作流执行器独立化详细方案(基于SSE增强版)
🚀 工作流执行器独立化详细方案(基于SSE增强版)
📋 现状分析与问题定位
Read file: packages/service/core/workflow/dispatch/index.ts
当前系统核心问题
从代码分析可以看出:
- 性能瓶颈:
dispatchWorkFlow
函数运行在主进程中,927行的复杂逻辑与Web服务争抢资源 - SSE实现简陋:只有基础的流式响应,缺乏状态恢复和断线重连能力
- 递归限制:硬编码20层深度,无法处理复杂工作流
- 缓存失效:
cacheEnable = false
,缓存机制被禁用 - 无法扩展:单实例处理,无法水平扩展
现有SSE机制分析
当前的SSE实现包含:
- 基础的流式响应头设置
- 10秒心跳保持连接
workflowStreamResponse
回调机制- 简单的文本流推送
缺失的关键能力:
- 状态持久化和恢复
- 断线重连后的状态同步
- 执行进度的详细追踪
- 多实例间的状态共享
🏗️ 独立化架构设计
整体架构图
【现有架构】
用户请求 → Next.js主应用 → dispatchWorkFlow(主进程) → SSE流式响应
↓
数据库操作+AI调用
【目标架构】
用户请求 → Next.js主应用 → Redis消息队列 → 工作流引擎集群
↓ ↓
SSE流+状态恢复 ←―――― 执行进度推送
↓ ↓
MongoDB状态存储 ←―――― 持久化状态
技术栈选择
组件 | 技术方案 | 理由 |
---|---|---|
消息队列 | Redis + BullMQ | 高性能、支持优先级、可持久化 |
状态存储 | MongoDB | 适合复杂状态、已有基础设施 |
流式通信 | 增强SSE | 保持兼容性、原生断线重连 |
负载均衡 | Nginx | 成熟稳定、支持SSE代理 |
监控系统 | 自研+日志 | 轻量级、与现有系统集成 |
🎯 详细实施方案
第一阶段:项目结构搭建
独立项目结构
genn-workflow-engine/
├── src/
│ ├── core/ # 核心执行引擎
│ │ ├── dispatcher.ts # 迁移dispatchWorkFlow核心逻辑
│ │ ├── node-executor.ts # 节点执行器
│ │ ├── edge-manager.ts # 边连接管理
│ │ └── variable-manager.ts # 变量管理
│ ├── queue/ # BullMQ队列管理
│ │ ├── queue-manager.ts # 队列管理器
│ │ ├── job-processor.ts # 任务处理器
│ │ └── scheduler.ts # 定时任务调度器
│ ├── state/ # 状态管理
│ │ ├── execution-state.ts # 执行状态管理
│ │ ├── progress-tracker.ts # 进度追踪
│ │ └── cache-manager.ts # 缓存管理
│ ├── api/ # HTTP接口
│ │ ├── workflow-api.ts # 工作流API
│ │ ├── sse-handler.ts # SSE处理器
│ │ └── admin-api.ts # 管理接口
│ └── types/ # 类型定义
│ ├── workflow.types.ts # 工作流类型
│ └── queue.types.ts # 队列类型
├── deploy/
│ ├── docker/
│ └── k8s/
└── docs/
关键代码迁移清单
Read file: packages/service/core/workflow/dispatch/index.ts 需要完整迁移的代码模块:
- 核心执行逻辑:整个
dispatchWorkFlow
函数(927行) - 节点处理器映射:
callbackMap
包含25种节点类型 - 关键函数:
checkNodeCanRun
、nodeRunWithActive
、getNodeRunParams
等 - 状态管理:
pushStore
、nodeOutput
、handleInteractiveResult
等 - 变量系统:
getSystemVariable
、变量替换逻辑
第二阶段:BullMQ队列系统设计
队列分类策略
基于当前系统的使用场景,设计4类队列:
// 队列配置
const QUEUE_CONFIG = {
'realtime': {
name: '实时聊天队列',
maxConcurrency: 20, // 高并发处理
timeout: 30000, // 30秒超时
priority: 100, // 最高优先级
maxRetries: 2, // 少量重试
backoffDelay: 1000 // 1秒退避
},
'scheduled': {
name: '定时任务队列',
maxConcurrency: 50, // 批量处理能力
timeout: 300000, // 5分钟超时
priority: 50, // 中等优先级
maxRetries: 5, // 多次重试
backoffDelay: 5000 // 5秒退避
},
'background': {
name: '后台任务队列',
maxConcurrency: 10, // 控制资源使用
timeout: 1800000, // 30分钟超时
priority: 20, // 低优先级
maxRetries: 3, // 适中重试
backoffDelay: 10000 // 10秒退避
},
'debug': {
name: '调试任务队列',
maxConcurrency: 5, // 限制调试并发
timeout: 60000, // 1分钟超时
priority: 80, // 高优先级(开发阶段)
maxRetries: 1, // 最少重试
backoffDelay: 2000 // 2秒退避
}
};
任务分发逻辑
// 队列选择器
function selectQueue(workflowContext: WorkflowContext): string {
const { mode, source, isScheduled, estimatedDuration } = workflowContext;
// 调试模式
if (mode === 'debug') return 'debug';
// 定时任务
if (isScheduled) return 'scheduled';
// 长时间运行的后台任务
if (estimatedDuration > 300000) return 'background';
// 默认实时处理
return 'realtime';
}
定时任务雪崩解决方案
核心问题:大量定时任务在整点同时触发导致系统过载
解决策略:
- 时间分散:将整点任务分散到整个小时
- 流量整形:根据系统负载控制执行速度
- 优先级调度:重要任务优先执行
// 定时任务分散算法
class ScheduledTaskDistributor {
async distributeHourlyTasks(tasks: ScheduledTask[]) {
const totalTasks = tasks.length;
const timeSlots = 60; // 60分钟
const tasksPerSlot = Math.ceil(totalTasks / timeSlots);
for (let slot = 0; slot < timeSlots; slot++) {
const slotTasks = tasks.slice(slot * tasksPerSlot, (slot + 1) * tasksPerSlot);
slotTasks.forEach((task, index) => {
// 在当前分钟内进一步分散
const delaySeconds = Math.floor(index / tasksPerSlot * 60);
const jitterSeconds = Math.random() * 10; // 随机抖动
task.delayTime = slot * 60000 + delaySeconds * 1000 + jitterSeconds * 1000;
});
}
return tasks;
}
// 系统负载感知调度
async scheduleWithLoadAwareness(task: ScheduledTask) {
const currentLoad = await this.getSystemLoad();
if (currentLoad > 0.8) {
// 高负载时延迟执行
task.delayTime += 60000; // 延迟1分钟
} else if (currentLoad < 0.3) {
// 低负载时可以提前执行
task.delayTime = Math.max(0, task.delayTime - 30000);
}
return task;
}
}
第三阶段:增强SSE实现
状态持久化SSE设计
核心改进:在保持现有SSE机制的基础上,增加状态恢复能力
// 增强的SSE管理器
class EnhancedSSEManager {
// 创建支持状态恢复的SSE连接
async createSSEConnection(executionId: string, clientId: string, lastEventId?: string) {
// 1. 检查执行状态
const execution = await this.getExecutionState(executionId);
if (!execution) {
throw new Error('Execution not found');
}
// 2. 设置SSE响应头(保持与现有代码一致)
const response = {
headers: {
'Content-Type': 'text/event-stream;charset=utf-8',
'Access-Control-Allow-Origin': '*',
'X-Accel-Buffering': 'no',
'Cache-Control': 'no-cache, no-transform'
}
};
// 3. 如果是重连,发送历史事件
if (lastEventId) {
await this.sendHistoryEvents(response, executionId, lastEventId);
}
// 4. 发送当前状态
await this.sendCurrentState(response, execution);
// 5. 订阅实时更新
this.subscribeToUpdates(executionId, clientId, response);
return response;
}
// 发送历史事件(断线重连关键)
async sendHistoryEvents(response: SSEResponse, executionId: string, lastEventId: string) {
const missedEvents = await this.getMissedEvents(executionId, lastEventId);
for (const event of missedEvents) {
response.write(`id: ${event.id}\n`);
response.write(`event: ${event.type}\n`);
response.write(`data: ${JSON.stringify(event.data)}\n\n`);
}
}
}
事件分类和状态恢复
基于现有的SseResponseEventEnum
扩展事件类型:
// 扩展现有事件类型
const EnhancedSseEvents = {
...SseResponseEventEnum,
// 新增状态恢复相关事件
EXECUTION_STARTED: 'execution_started',
NODE_STARTED: 'node_started',
NODE_COMPLETED: 'node_completed',
NODE_FAILED: 'node_failed',
PROGRESS_UPDATE: 'progress_update',
EXECUTION_COMPLETED: 'execution_completed',
EXECUTION_FAILED: 'execution_failed',
STATE_RECOVERY: 'state_recovery'
};
前端断线重连增强
// 客户端增强EventSource
class RobustEventSource {
private executionId: string;
private lastEventId: string = '';
private eventSource: EventSource | null = null;
private reconnectAttempts = 0;
private maxReconnectAttempts = 10;
connect() {
const url = `/api/workflow/stream/${this.executionId}`;
// 使用Last-Event-ID实现状态恢复
this.eventSource = new EventSource(url, {
headers: this.lastEventId ? {
'Last-Event-ID': this.lastEventId
} : {}
});
this.eventSource.onmessage = (event) => {
this.lastEventId = event.lastEventId || event.data.eventId || '';
this.onMessage(event);
this.reconnectAttempts = 0; // 重置重连计数
};
this.eventSource.onerror = () => {
this.handleReconnect();
};
}
private handleReconnect() {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
this.onError('Max reconnection attempts reached');
return;
}
this.reconnectAttempts++;
const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
setTimeout(() => {
this.connect();
}, delay);
}
}
第四阶段:主系统接口改造
渐进式改造策略
保持现有接口完全兼容,通过配置开关控制新旧系统: Ran tool Read file: projects/app/src/service/core/app/utils.ts 主要调用点分析:
- 定时任务调用(
app/utils.ts
):无SSE流,后台执行 - 聊天接口调用(
completions.ts
):有SSE流,实时响应 - 调试接口调用(
debug.ts
):有SSE流,开发调试 - 测试接口调用(
chatTest.ts
):有SSE流,功能测试
接口改造实现
步骤1:创建工作流客户端
// 新增:packages/service/core/workflow/client.ts
export class WorkflowEngineClient {
private baseUrl: string;
private enabled: boolean;
constructor() {
this.baseUrl = process.env.WORKFLOW_ENGINE_URL || 'http://localhost:3001';
this.enabled = process.env.USE_WORKFLOW_ENGINE === 'true';
}
// 提交工作流到队列
async submitWorkflow(params: ChatDispatchProps & {
runtimeNodes: RuntimeNodeItemType[];
runtimeEdges: RuntimeEdgeItemType[];
}) {
if (!this.enabled) {
// 降级到原有实现
return await this.fallbackToOriginal(params);
}
try {
const queueType = this.determineQueueType(params);
const executionId = this.generateExecutionId();
// 提交到工作流引擎
const response = await fetch(`${this.baseUrl}/api/workflow/submit`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
executionId,
queueType,
workflowData: this.formatWorkflowData(params)
})
});
if (!response.ok) {
throw new Error('Workflow engine submission failed');
}
const result = await response.json();
// 根据是否需要流式响应选择处理方式
if (params.stream && params.res) {
return await this.handleStreamResponse(executionId, params);
} else {
return await this.waitForCompletion(executionId);
}
} catch (error) {
console.error('Workflow engine error, falling back:', error);
return await this.fallbackToOriginal(params);
}
}
// 处理流式响应
private async handleStreamResponse(executionId: string, params: any) {
const { res, workflowStreamResponse } = params;
// 设置SSE响应头(与原有逻辑一致)
res.setHeader('Content-Type', 'text/event-stream;charset=utf-8');
res.setHeader('Access-Control-Allow-Origin', '*');
res.setHeader('X-Accel-Buffering', 'no');
res.setHeader('Cache-Control', 'no-cache, no-transform');
// 连接到工作流引擎的SSE流
const engineSSE = new EventSource(`${this.baseUrl}/api/workflow/stream/${executionId}`);
let finalResult: any = null;
return new Promise((resolve, reject) => {
engineSSE.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === 'progress') {
// 转发进度到原有的SSE流
workflowStreamResponse?.(data.payload);
} else if (data.type === 'completed') {
finalResult = data.payload;
engineSSE.close();
resolve(finalResult);
} else if (data.type === 'error') {
engineSSE.close();
reject(new Error(data.message));
}
};
engineSSE.onerror = () => {
engineSSE.close();
reject(new Error('SSE connection failed'));
};
});
}
// 队列类型判断
private determineQueueType(params: any): string {
if (params.mode === 'debug') return 'debug';
if (params.source === ChatSourceEnum.cronJob) return 'scheduled';
if (!params.stream) return 'background';
return 'realtime';
}
// 降级到原始实现
private async fallbackToOriginal(params: any) {
const { dispatchWorkFlow: originalDispatch } = await import('./dispatch/index');
return originalDispatch(params);
}
}
步骤2:修改现有调用点
// 修改:packages/service/core/workflow/dispatch/index.ts
import { WorkflowEngineClient } from '../client';
const workflowClient = new WorkflowEngineClient();
export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowResponse> {
// 使用新的工作流客户端(支持降级)
return await workflowClient.submitWorkflow(data);
}
// 保留原有实现作为备份
export async function originalDispatchWorkFlow(data: Props): Promise<DispatchFlowResponse> {
// 原有的927行实现逻辑保持不变
// ...
}
第五阶段:状态管理和持久化
执行状态数据模型
// MongoDB执行状态Schema
interface WorkflowExecution {
executionId: string;
workflowId: string;
userId: string;
chatId?: string;
// 执行状态
status: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled';
progress: {
totalNodes: number;
completedNodes: number;
currentNode?: string;
percentage: number;
};
// 时间信息
createdAt: Date;
startedAt?: Date;
completedAt?: Date;
// 结果数据
result?: {
flowResponses: ChatHistoryItemResType[];
assistantResponses: AIChatItemValueItemType[];
flowUsages: ChatNodeUsageType[];
newVariables: Record<string, any>;
};
// 错误信息
error?: {
message: string;
stack: string;
nodeId?: string;
timestamp: Date;
};
// SSE事件历史(用于断线重连)
events: Array<{
id: string;
type: string;
data: any;
timestamp: Date;
}>;
// 元数据
metadata: {
queueType: string;
retryCount: number;
clientId?: string;
};
}
状态恢复机制
// 状态恢复服务
class ExecutionStateManager {
// 保存执行事件(SSE断线重连关键)
async saveEvent(executionId: string, event: SSEEvent) {
await WorkflowExecution.updateOne(
{ executionId },
{
$push: {
events: {
id: event.id,
type: event.type,
data: event.data,
timestamp: new Date()
}
}
}
);
}
// 获取错过的事件(断线重连)
async getMissedEvents(executionId: string, lastEventId: string) {
const execution = await WorkflowExecution.findOne({ executionId });
if (!execution) return [];
const lastEventIndex = execution.events.findIndex(e => e.id === lastEventId);
return lastEventIndex >= 0 ? execution.events.slice(lastEventIndex + 1) : execution.events;
}
// 清理过期数据
async cleanupExpiredExecutions(daysOld: number = 7) {
const cutoffDate = new Date();
cutoffDate.setDate(cutoffDate.getDate() - daysOld);
await WorkflowExecution.deleteMany({
status: { $in: ['completed', 'failed', 'cancelled'] },
completedAt: { $lt: cutoffDate }
});
}
}
第六阶段:部署和运维
部署架构
开发环境
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ 主应用:3000 │───▶│ Redis:6379 │◀───│ 工作流引擎:3001 │
│ Next.js │ │ BullMQ队列 │ │ Express │
└─────────────────┘ └──────────────────┘ └─────────────────┘
│ │
└─────────────▶ MongoDB:27017 ◀─────────────────┘
状态存储
生产环境
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Nginx负载均衡 │ │ Redis集群 │ │ 工作流引擎集群 │
│ :80 │ │ 3主3从 │ │ 5个实例 │
└─────────────────┘ └──────────────────┘ └─────────────────┘
│ │ │
┌─────────────────┐ │ ┌─────────────────┐
│ 主应用集群 │──────────────┼──────────────│ 监控系统 │
│ 3个实例 │ │ │ Prometheus │
└─────────────────┘ │ └─────────────────┘
│ │ │
└───────────────────────▼───────────────────────┘
MongoDB副本集 (1主2从)
配置管理
// 环境变量配置
const CONFIG = {
// 功能开关
USE_WORKFLOW_ENGINE: process.env.USE_WORKFLOW_ENGINE === 'true',
ENABLE_FALLBACK: process.env.ENABLE_WORKFLOW_FALLBACK !== 'false',
// 服务配置
WORKFLOW_ENGINE_URL: process.env.WORKFLOW_ENGINE_URL || 'http://localhost:3001',
REDIS_URL: process.env.REDIS_URL || 'redis://localhost:6379',
MONGODB_URL: process.env.MONGODB_URL || 'mongodb://localhost:27017/workflow',
// 队列配置
QUEUE_CONCURRENCY: {
realtime: parseInt(process.env.REALTIME_CONCURRENCY || '20'),
scheduled: parseInt(process.env.SCHEDULED_CONCURRENCY || '50'),
background: parseInt(process.env.BACKGROUND_CONCURRENCY || '10'),
debug: parseInt(process.env.DEBUG_CONCURRENCY || '5')
},
// 监控配置
METRICS_ENABLED: process.env.ENABLE_METRICS === 'true',
LOG_LEVEL: process.env.LOG_LEVEL || 'info'
};
监控和告警
关键监控指标:
- 队列健康度:各队列积压任务数、处理速度
- 执行性能:平均执行时间、成功率、错误率
- 系统资源:CPU、内存、网络使用率
- 业务指标:用户并发数、定时任务完成率
告警规则: ```typescript const ALERT_RULES = { // 队列积压告警 queueBacklog: { threshold: 1000, message: '队列积压超过1000个任务' },
// 执行时间告警 executionTime: { threshold: 30000, // 30秒 message: '工作流平均执行时间超过30秒' },
// 错误率告警 errorRate: { threshold: 0.05, // 5% message: '工作流执行错误率超过5%' },
// 系统资源告警 resourceUsage: { cpu: 0.8, memory: 0.9, message: '系统资源使用率过高' } }; ```
第七阶段:渐进式迁移计划
迁移时间表
第1周:基础设施准备
- 搭建独立工作流引擎项目
- 配置Redis和MongoDB环境
- 实现基础队列系统
第2-3周:核心功能迁移
- 迁移
dispatchWorkFlow
核心逻辑 - 实现所有节点处理器
- 开发增强SSE功能
第4周:集成和测试
- 完成主系统接口改造
- 端到端功能测试
- 性能压力测试
第5周:灰度发布
- 10%流量切换到新系统
- 监控系统稳定性
- 问题修复和优化
第6周:全量发布
- 100%流量切换
- 移除旧代码
- 文档更新
风险控制措施
技术风险:
- 降级机制:新系统异常时自动切换到原系统
- 数据备份:执行状态双写,确保数据不丢失
- 监控告警:实时监控系统健康状态
业务风险:
- 灰度发布:逐步切换流量,控制影响范围
- 快速回滚:5分钟内可回滚到原系统
- 用户通知:异常时及时通知用户
成功标准
性能指标:
- 并发处理能力提升10倍(100 → 1000用户)
- 定时任务雪崩问题彻底解决
- 系统响应时间提升50%
- 可用性达到99.9%
业务指标:
- 用户投诉减少80%
- 系统故障时间减少90%
- 支撑业务规模扩展5倍
投入产出分析
开发投入
- 人力成本:6人月 × 8万元 = 48万元
- 基础设施:新增服务器和中间件 = 5000元/月
- 总投入:约50万元
预期收益
- 性能提升:支撑业务规模扩展带来的收入增长
- 成本节约:减少因系统不稳定导致的损失
- 技术债务:为后续发展奠定坚实基础
ROI预估:第一年收益约150万元,投资回报率200%
总结:这个方案在保持现有SSE机制的基础上,通过队列系统和状态持久化彻底解决了工作流执行的性能和扩展性问题。渐进式迁移策略确保了改造过程的安全性,是一个技术先进、风险可控、收益明确的解决方案。