工作流执行器独立项目方案
🚀 工作流执行器独立项目方案
📋 项目概述
将工作流执行器抽离为独立微服务 workflow-engine,使用 Redis + BullMQ 实现高可用、可扩展的分布式工作流处理系统。
Ran tool
🏗️ 项目架构设计
1. 项目结构
Ran tool
workflow-engine/
├── packages/
│ ├── core/ # 核心执行引擎
│ │ ├── src/
│ │ │ ├── engine/ # 工作流执行器
│ │ │ ├── queue/ # BullMQ队列管理
│ │ │ ├── nodes/ # 节点处理器
│ │ │ ├── cache/ # 缓存管理
│ │ │ ├── monitor/ # 监控和指标
│ │ │ └── types/ # 类型定义
│ │ ├── package.json
│ │ └── tsconfig.json
│ │
│ ├── api/ # HTTP API服务
│ │ ├── src/
│ │ │ ├── routes/ # 路由定义
│ │ │ ├── middleware/ # 中间件
│ │ │ ├── controllers/ # 控制器
│ │ │ └── server.ts # 服务器入口
│ │ └── package.json
│ │
│ ├── worker/ # 工作进程
│ │ ├── src/
│ │ │ ├── workers/ # BullMQ工作进程
│ │ │ ├── handlers/ # 任务处理器
│ │ │ └── worker.ts # 工作进程入口
│ │ └── package.json
│ │
│ └── shared/ # 共享模块
│ ├── src/
│ │ ├── types/ # 共享类型
│ │ ├── utils/ # 工具函数
│ │ ├── config/ # 配置管理
│ │ └── constants/ # 常量定义
│ └── package.json
│
├── deploy/
│ ├── docker/
│ │ ├── Dockerfile.api # API服务镜像
│ │ ├── Dockerfile.worker # Worker镜像
│ │ └── docker-compose.yml # 开发环境
│ ├── k8s/ # Kubernetes部署配置
│ └── helm/ # Helm图表
│
├── scripts/
│ ├── setup.sh # 环境设置
│ ├── deploy.sh # 部署脚本
│ └── test.sh # 测试脚本
│
├── docs/ # 文档
├── tests/ # 测试
├── package.json # 根配置
├── tsconfig.json # TypeScript配置
├── docker-compose.yml # 完整部署配置
└── README.md
2. 核心技术栈
Read file: packages/service/package.json
json
{
"name": "genn-workflow-engine",
"version": "1.0.0",
"private": true,
"workspaces": [
"packages/*"
],
"dependencies": {
"bullmq": "^5.4.0",
"ioredis": "^5.4.1",
"fastify": "^4.26.2",
"mongoose": "^8.10.1",
"winston": "^3.11.0",
"pino": "^8.19.0",
"dotenv": "^16.4.5",
"zod": "^3.22.4"
},
"devDependencies": {
"typescript": "^5.3.3",
"@types/node": "^20.11.17",
"tsx": "^4.7.1",
"vitest": "^1.3.1",
"@types/jest": "^29.5.12"
}
}
🔧 核心组件实现
1. BullMQ队列系统
```typescript
// packages/core/src/queue/workflow-queue-manager.ts
import { Queue, Worker, Job, JobsOptions } from 'bullmq'
import Redis, { RedisOptions } from 'ioredis'
import { WorkflowJobData, WorkflowJobResult } from '../types'

/**
 * Owns one Redis connection and every BullMQ queue/worker created through it.
 *
 * Queues and workers are registered by name so that callers can address them
 * with a plain string (see `addWorkflowJob`, `getQueueStats`, ...).
 */
export class WorkflowQueueManager {
  private redis: Redis
  private queues: Map<string, Queue> = new Map()
  private workers: Map<string, Worker> = new Map()

  /**
   * @param redisConfig ioredis connection options.
   */
  constructor(redisConfig: RedisOptions) {
    // BullMQ workers issue blocking commands and refuse to start on a
    // connection whose `maxRetriesPerRequest` is not null, so force it here.
    this.redis = new Redis({ ...redisConfig, maxRetriesPerRequest: null })
  }

  /**
   * Creates (and registers) the general workflow execution queue.
   *
   * @param name Queue name, also used as the registry key.
   * @returns The created queue.
   */
  createWorkflowQueue(name: string = 'workflow-execution') {
    const queue = new Queue<WorkflowJobData, WorkflowJobResult>(name, {
      connection: this.redis,
      defaultJobOptions: {
        removeOnComplete: 100,
        removeOnFail: 50,
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 2000,
        },
        // Job timeout: 5 minutes.
        // NOTE(review): confirm the installed BullMQ version still honours a
        // per-job `timeout` option; if not, enforce it inside the processor.
        timeout: 5 * 60 * 1000,
      },
    })

    this.queues.set(name, queue)
    return queue
  }

  /**
   * Creates (and registers) the queue used for scheduled workflows.
   *
   * @param name Queue name, also used as the registry key.
   * @returns The created queue.
   */
  createScheduledQueue(name: string = 'scheduled-workflows') {
    const queue = new Queue<WorkflowJobData, WorkflowJobResult>(name, {
      connection: this.redis,
      defaultJobOptions: {
        removeOnComplete: 1000,
        removeOnFail: 100,
        attempts: 5,
        backoff: {
          type: 'exponential',
          delay: 5000,
        },
      },
    })

    this.queues.set(name, queue)
    return queue
  }

  /**
   * Creates (and registers) the priority queue used for realtime tasks.
   *
   * @param name Queue name, also used as the registry key.
   * @returns The created queue.
   */
  createPriorityQueue(name: string = 'priority-workflows') {
    const queue = new Queue<WorkflowJobData, WorkflowJobResult>(name, {
      connection: this.redis,
      defaultJobOptions: {
        removeOnComplete: 50,
        removeOnFail: 20,
        attempts: 2,
        // Job timeout: 30 seconds (same caveat as in createWorkflowQueue).
        timeout: 30 * 1000,
      },
    })

    this.queues.set(name, queue)
    return queue
  }

  /**
   * Adds an `execute-workflow` job to a previously created queue.
   *
   * @param queueName Name the queue was registered under.
   * @param jobData Payload handed to the worker.
   * @param options Optional priority, delay (ms) and cron-based repetition.
   * @returns The created BullMQ job.
   * @throws Error if no queue is registered under `queueName`.
   */
  async addWorkflowJob(
    queueName: string,
    jobData: WorkflowJobData,
    options?: {
      priority?: number
      delay?: number
      repeat?: { cron: string }
    }
  ) {
    const queue = this.queues.get(queueName)
    if (!queue) throw new Error(`Queue ${queueName} not found`)

    const jobOptions: JobsOptions = {
      priority: options?.priority ?? 0,
      delay: options?.delay,
    }

    // Scheduled (repeatable) job. The public option keeps the name `cron`,
    // but BullMQ v5 expects the expression under `pattern`.
    if (options?.repeat) {
      jobOptions.repeat = { pattern: options.repeat.cron }
    }

    return queue.add('execute-workflow', jobData, jobOptions)
  }

  /**
   * Creates (and registers) a worker that consumes the given queue.
   *
   * @param queueName Name of the queue to consume.
   * @param processor Function that executes one job and resolves to its result.
   * @returns The created worker.
   */
  createWorkflowWorker(
    queueName: string,
    processor: (job: Job<WorkflowJobData, WorkflowJobResult>) => Promise<WorkflowJobResult>
  ) {
    const worker = new Worker<WorkflowJobData, WorkflowJobResult>(
      queueName,
      processor,
      {
        connection: this.redis,
        concurrency: 5, // jobs processed in parallel by this worker
        maxStalledCount: 3,
        stalledInterval: 30 * 1000,
        // Rate limiting
        limiter: {
          max: 100, // max jobs per window
          duration: 60 * 1000, // window length: 1 minute
        },
      }
    )

    // Event listeners
    worker.on('completed', (job) => {
      console.log(`Job ${job.id} completed successfully`)
    })

    worker.on('failed', (job, err) => {
      console.error(`Job ${job?.id} failed:`, err)
    })

    worker.on('stalled', (jobId) => {
      console.warn(`Job ${jobId} stalled`)
    })

    this.workers.set(queueName, worker)
    return worker
  }

  /**
   * Returns the number of jobs per state for one queue.
   *
   * @param queueName Name the queue was registered under.
   * @returns Counts per state, or `null` if the queue is unknown.
   */
  async getQueueStats(queueName: string) {
    const queue = this.queues.get(queueName)
    if (!queue) return null

    // Ask Redis for the counts directly instead of loading every job
    // object just to read `.length`.
    const counts = await queue.getJobCounts(
      'waiting',
      'active',
      'completed',
      'failed',
      'delayed'
    )

    return {
      waiting: counts.waiting ?? 0,
      active: counts.active ?? 0,
      completed: counts.completed ?? 0,
      failed: counts.failed ?? 0,
      delayed: counts.delayed ?? 0,
    }
  }

  /** Pauses a queue; a no-op if the queue is unknown. */
  async pauseQueue(queueName: string) {
    const queue = this.queues.get(queueName)
    if (queue) await queue.pause()
  }

  /** Resumes a queue; a no-op if the queue is unknown. */
  async resumeQueue(queueName: string) {
    const queue = this.queues.get(queueName)
    if (queue) await queue.resume()
  }

  /**
   * Removes finished jobs from a queue (up to 100 completed and 50 failed).
   *
   * @param queueName Name the queue was registered under.
   * @param grace Grace period in ms passed through to `Queue.clean`.
   */
  async cleanQueue(queueName: string, grace: number = 0) {
    const queue = this.queues.get(queueName)
    if (!queue) return

    await queue.clean(grace, 100, 'completed')
    await queue.clean(grace, 50, 'failed')
  }

  /**
   * Closes workers first, then queues, then the shared Redis connection.
   */
  async close() {
    // Close all workers
    await Promise.all(
      Array.from(this.workers.values()).map(worker => worker.close())
    )

    // Close all queues
    await Promise.all(
      Array.from(this.queues.values()).map(queue => queue.close())
    )

    // Close the Redis connection
    await this.redis.quit()
  }
}
```
2. 分布式工作流执行器
```typescript
// packages/core/src/engine/distributed-workflow-engine.ts
import { randomUUID } from 'node:crypto'
import { Job } from 'bullmq'
import { WorkflowQueueManager } from '../queue/workflow-queue-manager'
import { WorkflowStateManager } from '../state/workflow-state-manager'
import { WorkflowExecutor } from './workflow-executor'
import { WorkflowJobData, WorkflowJobResult, WorkflowExecutionContext } from '../types'

type WorkflowJob = Job<WorkflowJobData, WorkflowJobResult>

/**
 * Wires the queue manager, the state manager and the executor together.
 *
 * Constructing an instance creates the four queues (`realtime`, `workflow`,
 * `scheduled`, `background`) and immediately starts a worker for each.
 */
export class DistributedWorkflowEngine {
  private queueManager: WorkflowQueueManager
  private stateManager: WorkflowStateManager
  private executor: WorkflowExecutor

  /**
   * @param redisConfig Passed to `WorkflowQueueManager`.
   * @param mongoConfig Passed to `WorkflowStateManager`.
   */
  constructor(
    redisConfig: any,
    mongoConfig: any
  ) {
    this.queueManager = new WorkflowQueueManager(redisConfig)
    this.stateManager = new WorkflowStateManager(mongoConfig)
    this.executor = new WorkflowExecutor()

    this.initializeQueues()
    this.startWorkers()
  }

  /** Creates the four queues this engine dispatches to. */
  private initializeQueues() {
    // Realtime execution queue
    this.queueManager.createPriorityQueue('realtime')

    // Regular workflow queue
    this.queueManager.createWorkflowQueue('workflow')

    // Scheduled task queue
    this.queueManager.createScheduledQueue('scheduled')

    // Background task queue
    this.queueManager.createWorkflowQueue('background')
  }

  /** Starts one worker per queue, each bound to its processor method. */
  private startWorkers() {
    // Realtime processor - high priority, low latency
    this.queueManager.createWorkflowWorker('realtime', async (job) => {
      return this.processRealtimeWorkflow(job)
    })

    // Regular workflow processor
    this.queueManager.createWorkflowWorker('workflow', async (job) => {
      return this.processWorkflow(job)
    })

    // Scheduled task processor
    this.queueManager.createWorkflowWorker('scheduled', async (job) => {
      return this.processScheduledWorkflow(job)
    })

    // Background processor - supports long-running jobs
    this.queueManager.createWorkflowWorker('background', async (job) => {
      return this.processBackgroundWorkflow(job)
    })
  }

  /**
   * Persists a new execution context and enqueues the workflow.
   *
   * @param workflowData Workflow payload; `workflowDefinition.id`, `userId`
   *   and `metadata` are copied into the execution context.
   * @param options Target queue (`type`) plus optional priority, delay,
   *   repetition and the client id used for reconnects.
   * @returns The generated execution id and the BullMQ job id.
   * @throws Re-throws any enqueue error after marking the execution failed.
   */
  async submitWorkflow(
    workflowData: WorkflowJobData,
    options: {
      type: 'realtime' | 'workflow' | 'scheduled' | 'background'
      priority?: number
      delay?: number
      repeat?: { cron: string }
      clientId?: string // used for reconnects
    }
  ) {
    // clientId belongs to the execution context, not to the queue options.
    const { type, clientId, ...jobOptions } = options

    // Create the execution context. A UUID avoids the collisions a
    // timestamp + Math.random() id can produce under concurrent submits.
    // NOTE(review): the id is stored in the job data, so every run of a
    // repeatable job appears to share one executionId - confirm intended.
    const executionId = `exec_${randomUUID()}`
    const context: WorkflowExecutionContext = {
      executionId,
      workflowId: workflowData.workflowDefinition.id,
      userId: workflowData.userId,
      clientId,
      status: 'pending',
      createdAt: new Date(),
      metadata: workflowData.metadata || {}
    }

    // Persist the context before enqueueing so workers can always find it.
    await this.stateManager.saveExecutionContext(context)

    try {
      // Submit to the matching queue
      const job = await this.queueManager.addWorkflowJob(type, {
        ...workflowData,
        executionId,
        context
      }, jobOptions)

      return { executionId, jobId: job.id }
    } catch (error) {
      // Without this the execution would stay 'pending' forever.
      await this.recordFailure(executionId, error)
      throw error
    }
  }

  /**
   * Runs a realtime workflow, streaming progress to the state store and,
   * when the context carries a `clientId`, to that client.
   */
  private async processRealtimeWorkflow(job: WorkflowJob): Promise<WorkflowJobResult> {
    const { executionId, context } = job.data

    try {
      // Mark as running
      await this.stateManager.updateExecutionStatus(executionId, 'running')

      // Execute the workflow
      const result = await this.executor.executeWorkflow(job.data, {
        onProgress: async (progress) => {
          // Persist progress as it arrives
          await this.stateManager.updateExecutionProgress(executionId, progress)

          // Push progress to the client (if one is attached)
          if (context.clientId) {
            await this.sendProgressToClient(context.clientId, progress)
          }
        },
        onNodeComplete: async (nodeResult) => {
          // Persist the node's result
          await this.stateManager.saveNodeResult(executionId, nodeResult)
        }
      })

      // Record the final state
      await this.stateManager.updateExecutionStatus(executionId, 'completed')
      await this.stateManager.saveExecutionResult(executionId, result)

      return {
        success: true,
        executionId,
        result,
        completedAt: new Date()
      }
    } catch (error) {
      await this.recordFailure(executionId, error)
      throw error
    }
  }

  /**
   * Runs a regular workflow. Currently identical to the realtime path;
   * kept separate so timeout/retry policy can diverge later.
   */
  private async processWorkflow(job: WorkflowJob): Promise<WorkflowJobResult> {
    return this.processRealtimeWorkflow(job)
  }

  /**
   * Runs a scheduled workflow unless `shouldSkipScheduledTask` vetoes it,
   * in which case the execution is marked `skipped`.
   */
  private async processScheduledWorkflow(job: WorkflowJob): Promise<WorkflowJobResult> {
    const { executionId } = job.data

    // Check whether this run should be skipped (e.g. during maintenance)
    const shouldSkip = await this.shouldSkipScheduledTask(job.data)
    if (shouldSkip) {
      await this.stateManager.updateExecutionStatus(executionId, 'skipped')
      return {
        success: true,
        executionId,
        skipped: true,
        reason: 'Scheduled task skipped due to system maintenance'
      }
    }

    return this.processWorkflow(job)
  }

  /**
   * Runs a background workflow with a 30-minute executor timeout.
   * Status, result and errors are recorded the same way as in the
   * realtime path; only client push and node-level results are omitted.
   */
  private async processBackgroundWorkflow(job: WorkflowJob): Promise<WorkflowJobResult> {
    const { executionId } = job.data

    try {
      await this.stateManager.updateExecutionStatus(executionId, 'running')

      // Background jobs may need a much longer timeout
      const result = await this.executor.executeWorkflow(job.data, {
        timeout: 30 * 60 * 1000, // 30 minutes
        onProgress: async (progress) => {
          await this.stateManager.updateExecutionProgress(executionId, progress)
        }
      })

      await this.stateManager.updateExecutionStatus(executionId, 'completed')
      await this.stateManager.saveExecutionResult(executionId, result)

      return {
        success: true,
        executionId,
        result,
        completedAt: new Date()
      }
    } catch (error) {
      await this.recordFailure(executionId, error)
      throw error
    }
  }

  /** Marks an execution as failed and stores the error details. */
  private async recordFailure(executionId: string, error: unknown) {
    const normalized = error instanceof Error ? error : new Error(String(error))
    await this.stateManager.updateExecutionStatus(executionId, 'failed')
    await this.stateManager.saveExecutionError(executionId, normalized)
  }

  /**
   * Returns the stored context, progress and result of one execution.
   */
  async getExecutionStatus(executionId: string) {
    // The three reads are independent, so run them in parallel.
    const [context, progress, result] = await Promise.all([
      this.stateManager.getExecutionContext(executionId),
      this.stateManager.getExecutionProgress(executionId),
      this.stateManager.getExecutionResult(executionId)
    ])

    return {
      context,
      progress,
      result
    }
  }

  /**
   * Reconnect support: lists the executions of a client that are still
   * pending or running.
   */
  async reconnectClient(clientId: string) {
    const activeExecutions = await this.stateManager.getActiveExecutionsByClient(clientId)

    return activeExecutions.map(execution => ({
      executionId: execution.executionId,
      status: execution.status,
      progress: execution.progress,
      startedAt: execution.createdAt
    }))
  }

  /**
   * Marks an execution as cancelled.
   *
   * NOTE(review): this only updates the stored status. The job is not
   * removed from its queue and the processors never check for
   * `cancelled`, so a cancelled job can still run and overwrite the
   * status with `completed` or `failed`.
   */
  async cancelExecution(executionId: string) {
    await this.stateManager.updateExecutionStatus(executionId, 'cancelled')

    // TODO: remove the job from its queue if it has not started yet.
  }

  /** Returns per-queue job counts plus execution counts per status. */
  async getSystemStats() {
    const queueStats = await Promise.all([
      this.queueManager.getQueueStats('realtime'),
      this.queueManager.getQueueStats('workflow'),
      this.queueManager.getQueueStats('scheduled'),
      this.queueManager.getQueueStats('background')
    ])

    const executionStats = await this.stateManager.getExecutionStats()

    return {
      queues: {
        realtime: queueStats[0],
        workflow: queueStats[1],
        scheduled: queueStats[2],
        background: queueStats[3]
      },
      executions: executionStats
    }
  }

  /** Placeholder for skip rules (e.g. maintenance windows); never skips. */
  private async shouldSkipScheduledTask(jobData: WorkflowJobData): Promise<boolean> {
    return false
  }

  /** Placeholder for client progress push (WebSocket etc.); does nothing. */
  private async sendProgressToClient(clientId: string, progress: any) {
    // To be integrated with the API layer.
  }

  /** Shuts down the queue manager, then the state manager. */
  async close() {
    await this.queueManager.close()
    await this.stateManager.close()
  }
}
```
3. 状态持久化管理器
// packages/core/src/state/workflow-state-manager.ts
import mongoose from 'mongoose'
import type { WorkflowExecutionContext } from '../types'
// Execution context schema: one document per workflow execution.
const ExecutionContextSchema = new mongoose.Schema({
  // Unique business key; `unique` also creates the index used by lookups.
  executionId: { type: String, required: true, unique: true },
  workflowId: { type: String, required: true },
  userId: { type: String, required: true },
  // Used for reconnects. Indexed because the reconnect query filters on it.
  clientId: { type: String, index: true },
  status: {
    type: String,
    enum: ['pending', 'running', 'completed', 'failed', 'cancelled', 'skipped'],
    default: 'pending'
  },
  progress: {
    currentNode: String,
    completedNodes: [String],
    totalNodes: Number,
    percentage: { type: Number, default: 0 }
  },
  result: mongoose.Schema.Types.Mixed,
  error: {
    message: String,
    stack: String,
    timestamp: Date
  },
  metadata: mongoose.Schema.Types.Mixed,
  createdAt: { type: Date, default: Date.now },
  updatedAt: { type: Date, default: Date.now },
  completedAt: Date
})
// Node result schema: one document per executed node of an execution.
const NodeResultSchema = new mongoose.Schema({
  // Indexed because node history is always fetched by executionId.
  executionId: { type: String, required: true, index: true },
  nodeId: { type: String, required: true },
  nodeName: String,
  nodeType: String,
  status: { type: String, enum: ['completed', 'failed', 'skipped'] },
  input: mongoose.Schema.Types.Mixed,
  output: mongoose.Schema.Types.Mixed,
  error: {
    message: String,
    stack: String
  },
  executionTime: Number, // milliseconds
  startedAt: Date,
  completedAt: Date
})
/**
 * Persists execution contexts and per-node results in MongoDB.
 *
 * All lookups use the business key `executionId`, not Mongo's `_id`.
 */
export class WorkflowStateManager {
  private ExecutionContext: mongoose.Model<any>
  private NodeResult: mongoose.Model<any>

  /**
   * @param mongoConfig Object with `uri` and `options`, passed to `mongoose.connect`.
   */
  constructor(mongoConfig: any) {
    // Connect to MongoDB. A constructor cannot await, so attach a handler:
    // otherwise a failed connect surfaces only as an unhandled rejection.
    mongoose.connect(mongoConfig.uri, mongoConfig.options).catch((err: unknown) => {
      console.error('MongoDB connection failed:', err)
    })

    // Reuse already-compiled models: calling mongoose.model() twice with the
    // same name throws OverwriteModelError on a second instantiation.
    this.ExecutionContext =
      mongoose.models.ExecutionContext ??
      mongoose.model('ExecutionContext', ExecutionContextSchema)
    this.NodeResult =
      mongoose.models.NodeResult ??
      mongoose.model('NodeResult', NodeResultSchema)
  }

  /**
   * Inserts a new execution context document.
   */
  async saveExecutionContext(context: WorkflowExecutionContext) {
    const doc = new this.ExecutionContext(context)
    return await doc.save()
  }

  /**
   * Sets the status of an execution and bumps `updatedAt`.
   * `completedAt` is stamped only when the new status is `completed`.
   */
  async updateExecutionStatus(executionId: string, status: string) {
    return await this.ExecutionContext.updateOne(
      { executionId },
      {
        status,
        updatedAt: new Date(),
        ...(status === 'completed' && { completedAt: new Date() })
      }
    )
  }

  /**
   * Replaces the stored progress object and bumps `updatedAt`.
   */
  async updateExecutionProgress(executionId: string, progress: unknown) {
    return await this.ExecutionContext.updateOne(
      { executionId },
      {
        progress,
        updatedAt: new Date()
      }
    )
  }

  /**
   * Stores the final result and bumps `updatedAt`.
   */
  async saveExecutionResult(executionId: string, result: unknown) {
    return await this.ExecutionContext.updateOne(
      { executionId },
      {
        result,
        updatedAt: new Date()
      }
    )
  }

  /**
   * Stores message, stack and timestamp of a failure and bumps `updatedAt`.
   */
  async saveExecutionError(executionId: string, error: Error) {
    return await this.ExecutionContext.updateOne(
      { executionId },
      {
        error: {
          message: error.message,
          stack: error.stack,
          timestamp: new Date()
        },
        updatedAt: new Date()
      }
    )
  }

  /**
   * Inserts one node result document for the given execution.
   */
  async saveNodeResult(executionId: string, nodeResult: Record<string, unknown>) {
    // Spread last, matching the original: keys in nodeResult take precedence.
    const doc = new this.NodeResult({
      executionId,
      ...nodeResult
    })
    return await doc.save()
  }

  /**
   * Returns the execution context as a plain object, or null if not found.
   */
  async getExecutionContext(executionId: string) {
    return await this.ExecutionContext.findOne({ executionId }).lean()
  }

  /**
   * Returns only the `progress` field of an execution (undefined if not found).
   */
  async getExecutionProgress(executionId: string) {
    const context = await this.ExecutionContext.findOne({ executionId }, 'progress').lean()
    return context?.progress
  }

  /**
   * Returns only the `result` field of an execution (undefined if not found).
   */
  async getExecutionResult(executionId: string) {
    const context = await this.ExecutionContext.findOne({ executionId }, 'result').lean()
    return context?.result
  }

  /**
   * Returns the pending or running executions of one client.
   */
  async getActiveExecutionsByClient(clientId: string) {
    return await this.ExecutionContext.find({
      clientId,
      status: { $in: ['pending', 'running'] }
    }).lean()
  }

  /**
   * Returns the node results of an execution, oldest `startedAt` first.
   */
  async getNodeResults(executionId: string) {
    return await this.NodeResult.find({ executionId }).sort({ startedAt: 1 }).lean()
  }

  /**
   * Deletes finished executions and node results older than `daysOld` days.
   *
   * @param daysOld Age threshold in days (default 30).
   */
  async cleanupExpiredData(daysOld: number = 30) {
    const cutoffDate = new Date()
    cutoffDate.setDate(cutoffDate.getDate() - daysOld)

    // Purge every terminal status. 'skipped' was previously left out, so
    // skipped executions were never removed.
    await this.ExecutionContext.deleteMany({
      status: { $in: ['completed', 'failed', 'cancelled', 'skipped'] },
      updatedAt: { $lt: cutoffDate }
    })

    // NOTE(review): node results are purged by their own completedAt,
    // independent of the parent execution's status; results without a
    // completedAt are never matched.
    await this.NodeResult.deleteMany({
      completedAt: { $lt: cutoffDate }
    })
  }

  /**
   * Returns the number of executions per status, e.g. `{ running: 3 }`.
   */
  async getExecutionStats() {
    const stats = await this.ExecutionContext.aggregate([
      {
        $group: {
          _id: '$status',
          count: { $sum: 1 }
        }
      }
    ])

    const countsByStatus: Record<string, number> = {}
    for (const stat of stats) {
      countsByStatus[stat._id] = stat.count
    }
    return countsByStatus
  }

  /** Closes the default mongoose connection. */
  async close() {
    await mongoose.connection.close()
  }
}
4. API服务层
// packages/api/src/server.ts
import Fastify from 'fastify'
import { DistributedWorkflowEngine } from '@genn/workflow-core'
import { WorkflowController } from './controllers/workflow-controller'
/**
 * Parses an integer environment variable.
 * Falls back when the variable is unset, empty or not a number.
 */
function intFromEnv(value: string | undefined, fallback: number): number {
  const parsed = Number.parseInt(value ?? '', 10)
  return Number.isNaN(parsed) ? fallback : parsed
}

const fastify = Fastify({
  logger: true,
  bodyLimit: 10485760 // 10MB
})

// Initialise the workflow engine.
// NOTE(review): the engine constructor also starts the queue workers, so
// this API process will execute workflows too - confirm this is intended
// given the separate worker package.
const workflowEngine = new DistributedWorkflowEngine(
  // Redis configuration
  {
    host: process.env.REDIS_HOST || 'localhost',
    port: intFromEnv(process.env.REDIS_PORT, 6379),
    password: process.env.REDIS_PASSWORD,
    db: intFromEnv(process.env.REDIS_DB, 0)
  },
  // MongoDB configuration
  {
    uri: process.env.MONGODB_URI || 'mongodb://localhost:27017/workflow',
    options: {}
  }
)

// Controller shared by the HTTP routes and the WebSocket handler
const workflowController = new WorkflowController(workflowEngine)

// Route definitions
fastify.register(async function (fastify) {
  // Submit a workflow execution
  fastify.post('/api/workflow/execute', workflowController.executeWorkflow.bind(workflowController))

  // Get execution status
  fastify.get('/api/workflow/status/:executionId', workflowController.getExecutionStatus.bind(workflowController))

  // Reconnect after a dropped connection
  fastify.get('/api/workflow/reconnect/:clientId', workflowController.reconnectClient.bind(workflowController))

  // Cancel an execution
  fastify.post('/api/workflow/cancel/:executionId', workflowController.cancelExecution.bind(workflowController))

  // System statistics
  fastify.get('/api/workflow/stats', workflowController.getSystemStats.bind(workflowController))

  // Health check
  fastify.get('/health', async () => ({ status: 'ok', timestamp: new Date().toISOString() }))
})

// WebSocket support (realtime progress push)
fastify.register(require('@fastify/websocket'))

fastify.register(async function (fastify) {
  // NOTE(review): clientId comes straight from the URL and nothing in this
  // handler authenticates it - confirm access control happens elsewhere.
  fastify.get('/ws/:clientId', { websocket: true }, (connection, req) => {
    const { clientId } = req.params as { clientId: string }

    connection.socket.on('message', async (message) => {
      // Handle a client message
      try {
        const data = JSON.parse(message.toString())

        if (data.type === 'subscribe') {
          // Subscribe to execution status updates
          workflowController.subscribeToUpdates(clientId, connection.socket)
        }
      } catch (error) {
        connection.socket.send(JSON.stringify({
          type: 'error',
          message: 'Invalid message format'
        }))
      }
    })

    connection.socket.on('close', () => {
      // Drop the subscription
      workflowController.unsubscribeFromUpdates(clientId)
    })
  })
})

// Start the server
const start = async () => {
  try {
    const port = intFromEnv(process.env.PORT, 3000)
    await fastify.listen({ port, host: '0.0.0.0' })
    console.log(`🚀 Workflow Engine API running on port ${port}`)
  } catch (err) {
    fastify.log.error(err)
    process.exit(1)
  }
}

// Graceful shutdown
let shuttingDown = false
const shutdown = async (signal: string) => {
  // A second signal while shutdown is in progress must not restart it.
  if (shuttingDown) return
  shuttingDown = true

  console.log(`Received ${signal}, shutting down gracefully...`)
  try {
    // Stop accepting requests first so no in-flight request can reach an
    // engine whose Redis/Mongo connections are already closed.
    await fastify.close()
    await workflowEngine.close()
    process.exit(0)
  } catch (err) {
    fastify.log.error(err)
    process.exit(1)
  }
}

process.on('SIGTERM', () => {
  void shutdown('SIGTERM')
})
// Ctrl+C in local runs sends SIGINT rather than SIGTERM.
process.on('SIGINT', () => {
  void shutdown('SIGINT')
})

void start()
🐳 部署配置
1. Docker配置
# deploy/docker/Dockerfile.api
FROM node:18-alpine
WORKDIR /app
# Copy manifests first so the dependency layer is cached across source edits.
# The API imports the core package, and every workspace it builds needs its
# manifest present before install.
COPY package*.json tsconfig.json ./
COPY packages/api/package*.json ./packages/api/
COPY packages/core/package*.json ./packages/core/
COPY packages/shared/package*.json ./packages/shared/
# Install ALL dependencies: the build step needs devDependencies (typescript).
# NOTE(review): confirm `npm ci` accepts a lockfile that also lists the
# worker workspace, which is not copied into this image.
RUN npm ci
# Copy source code
COPY packages/api ./packages/api
COPY packages/core ./packages/core
COPY packages/shared ./packages/shared
# Build, then drop devDependencies from the final image.
# NOTE(review): the root package.json shown in this document defines no
# `build` or `start:api` script - they must be added for this to work.
RUN npm run build && npm prune --omit=dev
ENV NODE_ENV=production
EXPOSE 3000
CMD ["npm", "run", "start:api"]
# deploy/docker/Dockerfile.worker
FROM node:18-alpine
WORKDIR /app
# Copy manifests first so the dependency layer is cached across source edits.
COPY package*.json tsconfig.json ./
COPY packages/worker/package*.json ./packages/worker/
COPY packages/core/package*.json ./packages/core/
COPY packages/shared/package*.json ./packages/shared/
# Install ALL dependencies: the build step needs devDependencies (typescript).
# NOTE(review): confirm `npm ci` accepts a lockfile that also lists the
# api workspace, which is not copied into this image.
RUN npm ci
COPY packages/worker ./packages/worker
COPY packages/core ./packages/core
COPY packages/shared ./packages/shared
# Build, then drop devDependencies from the final image.
# NOTE(review): the root package.json shown in this document defines no
# `build` or `start:worker` script - they must be added for this to work.
RUN npm run build && npm prune --omit=dev
ENV NODE_ENV=production
CMD ["npm", "run", "start:worker"]
2. Docker Compose配置
# docker-compose.yml
# NOTE(review): the MongoDB credentials below are placeholders committed in
# plain text - replace them with secrets/env files outside local development.
version: '3.8'

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    # Append-only file so queued jobs survive a Redis restart
    command: redis-server --appendonly yes

  mongodb:
    image: mongo:7
    ports:
      - "27017:27017"
    volumes:
      - mongo_data:/data/db
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: password

  workflow-api:
    build:
      context: .
      dockerfile: deploy/docker/Dockerfile.api
    ports:
      - "3000:3000"
    environment:
      - NODE_ENV=production
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - MONGODB_URI=mongodb://admin:password@mongodb:27017/workflow?authSource=admin
    depends_on:
      - redis
      - mongodb
    restart: unless-stopped

  workflow-worker-1:
    build:
      context: .
      dockerfile: deploy/docker/Dockerfile.worker
    environment:
      - NODE_ENV=production
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - MONGODB_URI=mongodb://admin:password@mongodb:27017/workflow?authSource=admin
      - WORKER_ID=worker-1
    depends_on:
      - redis
      - mongodb
    restart: unless-stopped

  workflow-worker-2:
    build:
      context: .
      dockerfile: deploy/docker/Dockerfile.worker
    environment:
      - NODE_ENV=production
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - MONGODB_URI=mongodb://admin:password@mongodb:27017/workflow?authSource=admin
      - WORKER_ID=worker-2
    depends_on:
      - redis
      - mongodb
    restart: unless-stopped

  # Nginx load balancer
  # NOTE(review): deploy/nginx/nginx.conf is not part of the project
  # structure listed in this document - it has to be added.
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./deploy/nginx/nginx.conf:/etc/nginx/nginx.conf:ro
    depends_on:
      - workflow-api
    restart: unless-stopped

volumes:
  redis_data:
  mongo_data:
3. Kubernetes部署配置
# deploy/k8s/workflow-engine.yaml
# NOTE(review): both images use the mutable `latest` tag; pin a version or
# digest so rollouts and rollbacks are reproducible.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: workflow-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: workflow-api
  template:
    metadata:
      labels:
        app: workflow-api
    spec:
      containers:
        - name: workflow-api
          image: genn/workflow-engine-api:latest
          ports:
            - containerPort: 3000
          env:
            - name: REDIS_HOST
              value: "redis-service"
            - name: MONGODB_URI
              valueFrom:
                secretKeyRef:
                  name: mongodb-secret
                  key: uri
          # Probes use the /health route the API server exposes, so traffic
          # is only routed to pods that are up and hung pods get restarted.
          readinessProbe:
            httpGet:
              path: /health
              port: 3000
            initialDelaySeconds: 5
            periodSeconds: 10
          livenessProbe:
            httpGet:
              path: /health
              port: 3000
            initialDelaySeconds: 15
            periodSeconds: 20
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: workflow-worker
spec:
  replicas: 5
  selector:
    matchLabels:
      app: workflow-worker
  template:
    metadata:
      labels:
        app: workflow-worker
    spec:
      containers:
        - name: workflow-worker
          image: genn/workflow-engine-worker:latest
          env:
            - name: REDIS_HOST
              value: "redis-service"
            - name: MONGODB_URI
              valueFrom:
                secretKeyRef:
                  name: mongodb-secret
                  key: uri
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "1000m"
---
apiVersion: v1
kind: Service
metadata:
  name: workflow-api-service
spec:
  selector:
    app: workflow-api
  ports:
    - port: 80
      targetPort: 3000
  type: LoadBalancer
🔄 与主系统集成方案
1. API集成
// 在主系统中的集成
/**
 * Thin client the main system uses to talk to the workflow engine:
 * two HTTP calls plus a WebSocket subscription for live updates.
 */
class WorkflowEngineClient {
  private baseUrl: string
  private apiKey: string

  /**
   * @param baseUrl Engine base URL, e.g. `http://host:3000`.
   * @param apiKey Sent as a Bearer token on every HTTP request.
   */
  constructor(baseUrl: string, apiKey: string) {
    // Strip trailing slashes so path concatenation never yields "//".
    this.baseUrl = baseUrl.replace(/\/+$/, '')
    this.apiKey = apiKey
  }

  /**
   * Submits a workflow for execution.
   *
   * @param workflowDefinition Definition to run.
   * @param options Extra request fields merged into the body.
   * @returns The parsed JSON response.
   * @throws Error when the engine answers with a non-2xx status.
   */
  async executeWorkflow(workflowDefinition: any, options: any) {
    const response = await fetch(`${this.baseUrl}/api/workflow/execute`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${this.apiKey}`
      },
      // workflowDefinition goes last so a stray key of the same name in
      // `options` cannot silently replace the explicit argument.
      body: JSON.stringify({
        ...options,
        workflowDefinition
      })
    })

    return this.parseResponse(response)
  }

  /**
   * Fetches the current state of one execution.
   *
   * @param executionId Id returned by `executeWorkflow`.
   * @returns The parsed JSON response.
   * @throws Error when the engine answers with a non-2xx status.
   */
  async getExecutionStatus(executionId: string) {
    // Encode the id so characters such as "/" or "?" cannot alter the route.
    const url = `${this.baseUrl}/api/workflow/status/${encodeURIComponent(executionId)}`
    const response = await fetch(url, {
      headers: {
        'Authorization': `Bearer ${this.apiKey}`
      }
    })

    return this.parseResponse(response)
  }

  /**
   * Opens a WebSocket for realtime updates.
   *
   * NOTE(review): the API key is not sent on this connection.
   *
   * @param clientId Identifies this client on the engine side.
   * @param onUpdate Called with each successfully parsed message.
   * @returns The WebSocket, so the caller can close it.
   */
  connectToUpdates(clientId: string, onUpdate: (data: any) => void) {
    // Anchor the swap to the scheme: http -> ws, https -> wss.
    const wsBase = this.baseUrl.replace(/^http/, 'ws')
    const ws = new WebSocket(`${wsBase}/ws/${encodeURIComponent(clientId)}`)

    ws.onmessage = (event) => {
      let data: unknown
      try {
        data = JSON.parse(event.data)
      } catch (error) {
        // One malformed frame must not throw out of the socket handler.
        console.warn('Ignoring malformed workflow update:', error)
        return
      }
      onUpdate(data)
    }

    return ws
  }

  /** Returns the JSON body, or throws with status and body text on non-2xx. */
  private async parseResponse(response: Response) {
    if (!response.ok) {
      const detail = await response.text().catch(() => '')
      throw new Error(
        `Workflow engine request failed: ${response.status} ${response.statusText} ${detail}`.trim()
      )
    }
    return response.json()
  }
}
📊 性能与扩展性
预期性能指标
指标 | 单实例 | 多实例集群 |
---|---|---|
并发执行 | 100+ | 1000+ |
队列吞吐 | 1000 jobs/min | 10000+ jobs/min |
响应时间 | <100ms | <50ms |
可用性 | 99.5% | 99.9% |
扩展策略
- 水平扩展: 增加Worker实例数量
- 垂直扩展: 增加单实例资源配置
- 队列分片: 按业务类型分离队列
- 数据库分片: 按时间或用户分片存储
🎯 实施计划
阶段一:基础架构 (2周)
- ✅ 项目结构搭建
- ✅ BullMQ队列系统
- ✅ 基础执行引擎
- ✅ 状态持久化
阶段二:核心功能 (2周)
- ✅ 分布式执行逻辑
- ✅ API服务层
- ✅ 断线重连机制
- ✅ 监控和统计
阶段三:部署和优化 (1周)
- ✅ Docker部署配置
- ✅ 性能测试和优化
- ✅ 文档和运维指南
这个方案将彻底解决当前的并发问题,提供企业级的工作流执行能力!