Workflow Executor Standalone Project Proposal

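📋 Project Overview

Extract the workflow executor into a standalone microservice, workflow-engine, that uses Redis and BullMQ to provide a highly available, scalable, distributed workflow processing system.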

🏗️ Project Architecture

1. Project Structure

workflow-engine/
├── packages/
│   ├── core/                      # Core execution engine
│   │   ├── src/
│   │   │   ├── engine/            # Workflow executor
│   │   │   ├── queue/             # BullMQ queue management
│   │   │   ├── state/             # State persistence
│   │   │   ├── nodes/             # Node handlers
│   │   │   ├── cache/             # Cache management
│   │   │   ├── monitor/           # Monitoring and metrics
│   │   │   └── types/             # Type definitions
│   │   ├── package.json
│   │   └── tsconfig.json
│   │
│   ├── api/                       # HTTP API service
│   │   ├── src/
│   │   │   ├── routes/            # Route definitions
│   │   │   ├── middleware/        # Middleware
│   │   │   ├── controllers/       # Controllers
│   │   │   └── server.ts          # Server entry point
│   │   └── package.json
│   │
│   ├── worker/                    # Worker processes
│   │   ├── src/
│   │   │   ├── workers/           # BullMQ workers
│   │   │   ├── handlers/          # Job handlers
│   │   │   └── worker.ts          # Worker entry point
│   │   └── package.json
│   │
│   └── shared/                    # Shared modules
│       ├── src/
│       │   ├── types/             # Shared types
│       │   ├── utils/             # Utility functions
│       │   ├── config/            # Configuration management
│       │   └── constants/         # Constants
│       └── package.json
│
├── deploy/
│   ├── docker/
│   │   ├── Dockerfile.api         # API service image
│   │   ├── Dockerfile.worker      # Worker image
│   │   └── docker-compose.yml     # Development environment
│   ├── k8s/                       # Kubernetes manifests
│   └── helm/                      # Helm charts
│
├── scripts/
│   ├── setup.sh                   # Environment setup
│   ├── deploy.sh                  # Deployment script
│   └── test.sh                    # Test script
│
├── docs/                          # Documentation
├── tests/                         # Tests
├── package.json                   # Root configuration
├── tsconfig.json                  # TypeScript configuration
├── docker-compose.yml             # Full deployment configuration
└── README.md

2. Core Technology Stack

{
  "name": "genn-workflow-engine",
  "version": "1.0.0",
  "private": true,
  "workspaces": [
    "packages/*"
  ],
  "dependencies": {
    "bullmq": "^5.4.0",
    "ioredis": "^5.4.1",
    "fastify": "^4.26.2",
    "mongoose": "^8.10.1",
    "winston": "^3.11.0",
    "pino": "^8.19.0",
    "dotenv": "^16.4.5",
    "zod": "^3.22.4"
  },
  "devDependencies": {
    "typescript": "^5.3.3",
    "@types/node": "^20.11.17",
    "tsx": "^4.7.1",
    "vitest": "^1.3.1",
    "@types/jest": "^29.5.12"
  }
}

🔧 Core Component Implementation

1. BullMQ Queue System

```typescript
// packages/core/src/queue/workflow-queue-manager.ts
import { Queue, Worker, Job, JobsOptions } from 'bullmq'
import Redis, { RedisOptions } from 'ioredis'
import { WorkflowJobData, WorkflowJobResult } from '../types'

export class WorkflowQueueManager {
  private redis: Redis
  private queues: Map<string, Queue> = new Map()
  private workers: Map<string, Worker> = new Map()

  constructor(redisConfig: RedisOptions) {
    // BullMQ workers require maxRetriesPerRequest to be null on their connection
    this.redis = new Redis({ ...redisConfig, maxRetriesPerRequest: null })
  }

  /**
   * Create the workflow execution queue
   */
  createWorkflowQueue(name: string = 'workflow-execution') {
    const queue = new Queue<WorkflowJobData, WorkflowJobResult>(name, {
      connection: this.redis,
      defaultJobOptions: {
        removeOnComplete: 100,
        removeOnFail: 50,
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 2000,
        },
        // Job timeout (check that the installed BullMQ version honors this option;
        // otherwise enforce the limit inside the processor)
        timeout: 5 * 60 * 1000, // 5 minutes
      }
    })

    this.queues.set(name, queue)
    return queue
  }

  /**
   * Create the scheduled-job queue
   */
  createScheduledQueue(name: string = 'scheduled-workflows') {
    const queue = new Queue<WorkflowJobData, WorkflowJobResult>(name, {
      connection: this.redis,
      defaultJobOptions: {
        removeOnComplete: 1000,
        removeOnFail: 100,
        attempts: 5,
        backoff: {
          type: 'exponential',
          delay: 5000,
        },
      }
    })

    this.queues.set(name, queue)
    return queue
  }

  /**
   * Create the priority queue (for real-time jobs)
   */
  createPriorityQueue(name: string = 'priority-workflows') {
    const queue = new Queue<WorkflowJobData, WorkflowJobResult>(name, {
      connection: this.redis,
      defaultJobOptions: {
        removeOnComplete: 50,
        removeOnFail: 20,
        attempts: 2,
        timeout: 30 * 1000, // 30 seconds (same caveat as above)
      }
    })

    this.queues.set(name, queue)
    return queue
  }

  /**
   * Add a workflow job
   */
  async addWorkflowJob(
    queueName: string,
    jobData: WorkflowJobData,
    options?: {
      priority?: number
      delay?: number
      repeat?: { cron: string }
    }
  ) {
    const queue = this.queues.get(queueName)
    if (!queue) throw new Error(`Queue ${queueName} not found`)

    const jobOptions: JobsOptions = {
      priority: options?.priority || 0,
      delay: options?.delay,
    }

    // Scheduled (repeatable) job configuration.
    // BullMQ 5 names the cron expression field `pattern`.
    if (options?.repeat) {
      jobOptions.repeat = { pattern: options.repeat.cron }
    }

    const job = await queue.add('execute-workflow', jobData, jobOptions)
    return job
  }

  /**
   * Create a workflow worker
   */
  createWorkflowWorker(
    queueName: string,
    processor: (job: Job<WorkflowJobData, WorkflowJobResult>) => Promise<WorkflowJobResult>
  ) {
    const worker = new Worker<WorkflowJobData, WorkflowJobResult>(
      queueName,
      processor,
      {
        connection: this.redis,
        concurrency: 5, // concurrent jobs per Worker
        maxStalledCount: 3,
        stalledInterval: 30 * 1000,
        // Rate limiting
        limiter: {
          max: 100,            // maximum jobs per time window
          duration: 60 * 1000, // time window: 1 minute
        }
      }
    )

    // Event listeners
    worker.on('completed', (job) => {
      console.log(`Job ${job.id} completed successfully`)
    })

    worker.on('failed', (job, err) => {
      console.error(`Job ${job?.id} failed:`, err)
    })

    worker.on('stalled', (jobId) => {
      console.warn(`Job ${jobId} stalled`)
    })

    this.workers.set(queueName, worker)
    return worker
  }

  /**
   * Get queue statistics
   */
  async getQueueStats(queueName: string) {
    const queue = this.queues.get(queueName)
    if (!queue) return null

    // getJobCounts returns the counts without loading every job into memory
    const counts = await queue.getJobCounts('waiting', 'active', 'completed', 'failed', 'delayed')

    return {
      waiting: counts.waiting,
      active: counts.active,
      completed: counts.completed,
      failed: counts.failed,
      delayed: counts.delayed,
    }
  }

  /**
   * Pause / resume a queue
   */
  async pauseQueue(queueName: string) {
    const queue = this.queues.get(queueName)
    if (queue) await queue.pause()
  }

  async resumeQueue(queueName: string) {
    const queue = this.queues.get(queueName)
    if (queue) await queue.resume()
  }

  /**
   * Clean a queue
   */
  async cleanQueue(queueName: string, grace: number = 0) {
    const queue = this.queues.get(queueName)
    if (!queue) return

    await queue.clean(grace, 100, 'completed')
    await queue.clean(grace, 50, 'failed')
  }

  /**
   * Close all connections
   */
  async close() {
    // Close all workers
    await Promise.all(
      Array.from(this.workers.values()).map(worker => worker.close())
    )

    // Close all queues
    await Promise.all(
      Array.from(this.queues.values()).map(queue => queue.close())
    )

    // Close the Redis connection
    await this.redis.quit()
  }
}
```
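The retry settings above (`attempts` plus an exponential `backoff`) determine how long a failed job waits before each retry. As a rough sketch, assuming BullMQ's documented built-in exponential strategy of `2^(retry - 1) * delay` milliseconds, the wait times can be tabulated with a small helper. `retryDelays` is a hypothetical name used only for illustration and is not part of BullMQ.

```typescript
// Hypothetical helper: lists the wait (in ms) before each retry of a failed job,
// assuming the exponential formula 2^(retry - 1) * baseDelayMs.
function retryDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = []
  // `attempts` counts the first run too, so there are at most attempts - 1 retries
  for (let retry = 1; retry < attempts; retry++) {
    delays.push(Math.pow(2, retry - 1) * baseDelayMs)
  }
  return delays
}

// Settings taken from the workflow and scheduled queues above
const workflowQueueDelays = retryDelays(3, 2000)
const scheduledQueueDelays = retryDelays(5, 5000)
console.log(workflowQueueDelays, scheduledQueueDelays)
```

Under these assumptions, a scheduled job that keeps failing spends about 75 seconds in backoff in total before it is given up on, not counting the time each attempt takes to run.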

2. Distributed Workflow Engine

```typescript
// packages/core/src/engine/distributed-workflow-engine.ts
import { Job } from 'bullmq'
import { WorkflowQueueManager } from '../queue/workflow-queue-manager'
import { WorkflowStateManager } from '../state/workflow-state-manager'
import { WorkflowExecutor } from './workflow-executor'
import { WorkflowJobData, WorkflowJobResult, WorkflowExecutionContext } from '../types'

export class DistributedWorkflowEngine {
  private queueManager: WorkflowQueueManager
  private stateManager: WorkflowStateManager
  private executor: WorkflowExecutor

  constructor(redisConfig: any, mongoConfig: any) {
    this.queueManager = new WorkflowQueueManager(redisConfig)
    this.stateManager = new WorkflowStateManager(mongoConfig)
    this.executor = new WorkflowExecutor()

    this.initializeQueues()
    this.startWorkers()
  }

  /**
   * Initialize the queues
   */
  private initializeQueues() {
    // Real-time execution queue
    this.queueManager.createPriorityQueue('realtime')

    // Regular workflow queue
    this.queueManager.createWorkflowQueue('workflow')

    // Scheduled-job queue
    this.queueManager.createScheduledQueue('scheduled')

    // Background-job queue
    this.queueManager.createWorkflowQueue('background')
  }

  /**
   * Start the workers
   */
  private startWorkers() {
    // Real-time jobs: high priority, low latency
    this.queueManager.createWorkflowWorker('realtime', async (job) => {
      return this.processRealtimeWorkflow(job)
    })

    // Regular workflows
    this.queueManager.createWorkflowWorker('workflow', async (job) => {
      return this.processWorkflow(job)
    })

    // Scheduled jobs
    this.queueManager.createWorkflowWorker('scheduled', async (job) => {
      return this.processScheduledWorkflow(job)
    })

    // Background jobs: long-running work is allowed
    this.queueManager.createWorkflowWorker('background', async (job) => {
      return this.processBackgroundWorkflow(job)
    })
  }

  /**
   * Submit a workflow job
   */
  async submitWorkflow(
    workflowData: WorkflowJobData,
    options: {
      type: 'realtime' | 'workflow' | 'scheduled' | 'background'
      priority?: number
      delay?: number
      repeat?: { cron: string }
      clientId?: string // used for reconnecting after a disconnect
    }
  ) {
    // clientId belongs to the execution context, not to the queue options
    const { type, clientId, ...jobOptions } = options

    // Create the execution context
    const executionId = `exec_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`
    const context: WorkflowExecutionContext = {
      executionId,
      workflowId: workflowData.workflowDefinition.id,
      userId: workflowData.userId,
      clientId,
      status: 'pending',
      createdAt: new Date(),
      metadata: workflowData.metadata || {}
    }

    // Persist the execution context through the state manager
    await this.stateManager.saveExecutionContext(context)

    // Submit to the matching queue
    const job = await this.queueManager.addWorkflowJob(type, {
      ...workflowData,
      executionId,
      context
    }, jobOptions)

    return { executionId, jobId: job.id }
  }

  /**
   * Process a real-time workflow
   */
  private async processRealtimeWorkflow(job: Job): Promise<WorkflowJobResult> {
    const { executionId, context } = job.data

    try {
      // Mark the execution as running
      await this.stateManager.updateExecutionStatus(executionId, 'running')

      // Execute the workflow
      const result = await this.executor.executeWorkflow(job.data, {
        onProgress: async (progress) => {
          // Persist progress as it happens
          await this.stateManager.updateExecutionProgress(executionId, progress)

          // Push progress to the client (if connected)
          if (context.clientId) {
            await this.sendProgressToClient(context.clientId, progress)
          }
        },
        onNodeComplete: async (nodeResult) => {
          // Save the node execution result
          await this.stateManager.saveNodeResult(executionId, nodeResult)
        }
      })

      // Record the final state
      await this.stateManager.updateExecutionStatus(executionId, 'completed')
      await this.stateManager.saveExecutionResult(executionId, result)

      return { success: true, executionId, result, completedAt: new Date() }
    } catch (error) {
      // Record the error
      await this.stateManager.updateExecutionStatus(executionId, 'failed')
      await this.stateManager.saveExecutionError(executionId, error as Error)

      throw error
    }
  }

  /**
   * Process a regular workflow
   */
  private async processWorkflow(job: Job): Promise<WorkflowJobResult> {
    // Same as the real-time path, but may use different timeout and retry policies
    return this.processRealtimeWorkflow(job)
  }

  /**
   * Process a scheduled workflow
   */
  private async processScheduledWorkflow(job: Job): Promise<WorkflowJobResult> {
    const { executionId } = job.data

    // Check whether this run should be skipped (for example, during system maintenance)
    const shouldSkip = await this.shouldSkipScheduledTask(job.data)
    if (shouldSkip) {
      await this.stateManager.updateExecutionStatus(executionId, 'skipped')
      return {
        success: true,
        executionId,
        skipped: true,
        reason: 'Scheduled task skipped due to system maintenance'
      }
    }

    return this.processWorkflow(job)
  }

  /**
   * Process a background workflow
   */
  private async processBackgroundWorkflow(job: Job): Promise<WorkflowJobResult> {
    const { executionId } = job.data

    try {
      // Background jobs may need a longer timeout
      const result = await this.executor.executeWorkflow(job.data, {
        timeout: 30 * 60 * 1000, // 30-minute timeout
        onProgress: async (progress) => {
          await this.stateManager.updateExecutionProgress(executionId, progress)
        }
      })

      await this.stateManager.updateExecutionStatus(executionId, 'completed')
      return { success: true, executionId, result, completedAt: new Date() }
    } catch (error) {
      await this.stateManager.updateExecutionStatus(executionId, 'failed')
      throw error
    }
  }

  /**
   * Get the status of a workflow execution
   */
  async getExecutionStatus(executionId: string) {
    const context = await this.stateManager.getExecutionContext(executionId)
    const progress = await this.stateManager.getExecutionProgress(executionId)
    const result = await this.stateManager.getExecutionResult(executionId)

    return { context, progress, result }
  }

  /**
   * Reconnection support
   */
  async reconnectClient(clientId: string) {
    // Fetch all active executions that belong to this client
    const activeExecutions = await this.stateManager.getActiveExecutionsByClient(clientId)

    return activeExecutions.map(execution => ({
      executionId: execution.executionId,
      status: execution.status,
      progress: execution.progress,
      startedAt: execution.createdAt
    }))
  }

  /**
   * Cancel a workflow execution
   */
  async cancelExecution(executionId: string) {
    await this.stateManager.updateExecutionStatus(executionId, 'cancelled')

    // Try to remove the job from the queue (if it is still queued).
    // How to do this depends on the concrete queue implementation.
  }

  /**
   * Get system statistics
   */
  async getSystemStats() {
    const queueStats = await Promise.all([
      this.queueManager.getQueueStats('realtime'),
      this.queueManager.getQueueStats('workflow'),
      this.queueManager.getQueueStats('scheduled'),
      this.queueManager.getQueueStats('background')
    ])

    const executionStats = await this.stateManager.getExecutionStats()

    return {
      queues: {
        realtime: queueStats[0],
        workflow: queueStats[1],
        scheduled: queueStats[2],
        background: queueStats[3]
      },
      executions: executionStats
    }
  }

  private async shouldSkipScheduledTask(jobData: WorkflowJobData): Promise<boolean> {
    // Implement the skip logic here, for example during system maintenance
    return false
  }

  private async sendProgressToClient(clientId: string, progress: any) {
    // Implement client progress push here (WebSocket, etc.)
    // This can be wired into the API layer
  }

  /**
   * Shut down the engine
   */
  async close() {
    await this.queueManager.close()
    await this.stateManager.close()
  }
}
```
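`cancelExecution` above only records the `cancelled` status; a job that a worker has already picked up keeps running. One common way to close that gap is cooperative cancellation, where the executor checks a cancellation flag between nodes. The following is a minimal synchronous sketch of that idea, not the engine's actual executor: `runNodes`, `NodeStep`, and `isCancelled` are hypothetical names, and a real implementation would be asynchronous and would read the status from the state manager.

```typescript
// Hypothetical shape of one workflow node for this sketch
type NodeStep = { id: string; run: () => void }

interface RunOutcome {
  status: 'completed' | 'cancelled'
  executed: string[] // ids of the nodes that actually ran
}

// Runs the nodes in order, checking the cancellation flag before each one
function runNodes(steps: NodeStep[], isCancelled: () => boolean): RunOutcome {
  const executed: string[] = []
  for (const step of steps) {
    if (isCancelled()) return { status: 'cancelled', executed }
    step.run()
    executed.push(step.id)
  }
  return { status: 'completed', executed }
}

// Example: the flag is raised while the second node runs, so the third never starts
let cancelRequested = false
const outcome = runNodes(
  [
    { id: 'fetch', run: () => {} },
    { id: 'transform', run: () => { cancelRequested = true } },
    { id: 'notify', run: () => {} },
  ],
  () => cancelRequested
)
console.log(outcome.status, outcome.executed)
```

The node that is running when cancellation is requested still finishes; only nodes that have not started are skipped. That is usually the simplest behavior to reason about.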

3. State Persistence Manager

// packages/core/src/state/workflow-state-manager.ts
import mongoose from 'mongoose'
import { WorkflowExecutionContext } from '../types'

// Execution context schema
const ExecutionContextSchema = new mongoose.Schema({
  executionId: { type: String, required: true, unique: true },
  workflowId: { type: String, required: true },
  userId: { type: String, required: true },
  clientId: { type: String }, // used for reconnecting after a disconnect
  status: { 
    type: String, 
    enum: ['pending', 'running', 'completed', 'failed', 'cancelled', 'skipped'],
    default: 'pending'
  },
  progress: {
    currentNode: String,
    completedNodes: [String],
    totalNodes: Number,
    percentage: { type: Number, default: 0 }
  },
  result: mongoose.Schema.Types.Mixed,
  error: {
    message: String,
    stack: String,
    timestamp: Date
  },
  metadata: mongoose.Schema.Types.Mixed,
  createdAt: { type: Date, default: Date.now },
  updatedAt: { type: Date, default: Date.now },
  completedAt: Date
})

// Node execution result schema
const NodeResultSchema = new mongoose.Schema({
  executionId: { type: String, required: true },
  nodeId: { type: String, required: true },
  nodeName: String,
  nodeType: String,
  status: { type: String, enum: ['completed', 'failed', 'skipped'] },
  input: mongoose.Schema.Types.Mixed,
  output: mongoose.Schema.Types.Mixed,
  error: {
    message: String,
    stack: String
  },
  executionTime: Number, // milliseconds
  startedAt: Date,
  completedAt: Date
})

export class WorkflowStateManager {
  private ExecutionContext: mongoose.Model<any>
  private NodeResult: mongoose.Model<any>

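  constructor(mongoConfig: any) {
    // Connect to MongoDB; log failures instead of leaving an unhandled rejection
    mongoose.connect(mongoConfig.uri, mongoConfig.options).catch((err) => {
      console.error('MongoDB connection failed:', err)
    })

    // Reuse already-registered models so a second instance does not throw OverwriteModelError
    this.ExecutionContext =
      mongoose.models.ExecutionContext ?? mongoose.model('ExecutionContext', ExecutionContextSchema)
    this.NodeResult =
      mongoose.models.NodeResult ?? mongoose.model('NodeResult', NodeResultSchema)
  }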

  /**
   * Save the execution context
   */
  async saveExecutionContext(context: WorkflowExecutionContext) {
    const doc = new this.ExecutionContext(context)
    return await doc.save()
  }

  /**
   * Update the execution status
   */
  async updateExecutionStatus(executionId: string, status: string) {
    return await this.ExecutionContext.updateOne(
      { executionId },
      { 
        status, 
        updatedAt: new Date(),
        ...(status === 'completed' && { completedAt: new Date() })
      }
    )
  }

  /**
   * Update the execution progress
   */
  async updateExecutionProgress(executionId: string, progress: any) {
    return await this.ExecutionContext.updateOne(
      { executionId },
      { 
        progress, 
        updatedAt: new Date() 
      }
    )
  }

  /**
   * Save the execution result
   */
  async saveExecutionResult(executionId: string, result: any) {
    return await this.ExecutionContext.updateOne(
      { executionId },
      { 
        result, 
        updatedAt: new Date() 
      }
    )
  }

  /**
   * Save the execution error
   */
  async saveExecutionError(executionId: string, error: Error) {
    return await this.ExecutionContext.updateOne(
      { executionId },
      { 
        error: {
          message: error.message,
          stack: error.stack,
          timestamp: new Date()
        },
        updatedAt: new Date() 
      }
    )
  }

  /**
   * Save a node execution result
   */
  async saveNodeResult(executionId: string, nodeResult: any) {
    const doc = new this.NodeResult({
      executionId,
      ...nodeResult
    })
    return await doc.save()
  }

  /**
   * Get the execution context
   */
  async getExecutionContext(executionId: string) {
    return await this.ExecutionContext.findOne({ executionId }).lean()
  }

  /**
   * Get the execution progress
   */
  async getExecutionProgress(executionId: string) {
    const context = await this.ExecutionContext.findOne({ executionId }, 'progress').lean()
    return context?.progress
  }

  /**
   * Get the execution result
   */
  async getExecutionResult(executionId: string) {
    const context = await this.ExecutionContext.findOne({ executionId }, 'result').lean()
    return context?.result
  }

  /**
   * Get the active executions for a client
   */
  async getActiveExecutionsByClient(clientId: string) {
    return await this.ExecutionContext.find({
      clientId,
      status: { $in: ['pending', 'running'] }
    }).lean()
  }

  /**
   * Get the node execution history
   */
  async getNodeResults(executionId: string) {
    return await this.NodeResult.find({ executionId }).sort({ startedAt: 1 }).lean()
  }

  /**
   * Clean up expired data
   */
  async cleanupExpiredData(daysOld: number = 30) {
    const cutoffDate = new Date()
    cutoffDate.setDate(cutoffDate.getDate() - daysOld)

    // Remove finished execution contexts
    await this.ExecutionContext.deleteMany({
      status: { $in: ['completed', 'failed', 'cancelled'] },
      updatedAt: { $lt: cutoffDate }
    })

    // Remove node results
    await this.NodeResult.deleteMany({
      completedAt: { $lt: cutoffDate }
    })
  }

  /**
   * Get execution statistics
   */
  async getExecutionStats() {
    const stats = await this.ExecutionContext.aggregate([
      {
        $group: {
          _id: '$status',
          count: { $sum: 1 }
        }
      }
    ])

    return stats.reduce((acc, stat) => {
      acc[stat._id] = stat.count
      return acc
    }, {})
  }

  async close() {
    await mongoose.connection.close()
  }
}
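
`updateExecutionStatus` writes whatever status it is given, so a late `completed` from a worker could overwrite a `cancelled` set through the API. A small transition table is one way to guard against that. The sketch below uses the status values from the schema above; the allowed transitions themselves are an assumption made for illustration, and `canTransition` is a hypothetical helper rather than part of the state manager.

```typescript
type ExecutionStatus =
  | 'pending' | 'running' | 'completed' | 'failed' | 'cancelled' | 'skipped'

// Assumed transition rules: terminal states stay terminal, except that a failed
// execution may run again because the queue retries failed jobs.
const ALLOWED_TRANSITIONS: Record<ExecutionStatus, ExecutionStatus[]> = {
  pending: ['running', 'cancelled', 'skipped'],
  running: ['completed', 'failed', 'cancelled'],
  failed: ['running'],
  completed: [],
  cancelled: [],
  skipped: [],
}

function canTransition(from: ExecutionStatus, to: ExecutionStatus): boolean {
  return ALLOWED_TRANSITIONS[from].includes(to)
}

console.log(canTransition('pending', 'running'))     // allowed
console.log(canTransition('cancelled', 'completed')) // rejected
```

With Mongoose, the same rule could be folded into the update filter, for example by also matching on `status: { $in: [...] }` for the allowed source states, so that the check and the write happen in one operation.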

4. API Service Layer

// packages/api/src/server.ts
import Fastify from 'fastify'
import { DistributedWorkflowEngine } from '@genn/workflow-core'
import { WorkflowController } from './controllers/workflow-controller'

const fastify = Fastify({ 
  logger: true,
  bodyLimit: 10485760 // 10MB
})

// Initialize the workflow engine
const workflowEngine = new DistributedWorkflowEngine(
  // Redis configuration
  {
    host: process.env.REDIS_HOST || 'localhost',
    port: parseInt(process.env.REDIS_PORT || '6379', 10),
    password: process.env.REDIS_PASSWORD,
    db: parseInt(process.env.REDIS_DB || '0', 10)
  },
  // MongoDB configuration
  {
    uri: process.env.MONGODB_URI || 'mongodb://localhost:27017/workflow',
    options: {}
  }
)

// Register the controller
const workflowController = new WorkflowController(workflowEngine)

// Route definitions
fastify.register(async function (fastify) {
  // Submit a workflow for execution
  fastify.post('/api/workflow/execute', workflowController.executeWorkflow.bind(workflowController))
  
  // Get the execution status
  fastify.get('/api/workflow/status/:executionId', workflowController.getExecutionStatus.bind(workflowController))
  
  // Reconnect after a disconnect
  fastify.get('/api/workflow/reconnect/:clientId', workflowController.reconnectClient.bind(workflowController))
  
  // Cancel an execution
  fastify.post('/api/workflow/cancel/:executionId', workflowController.cancelExecution.bind(workflowController))
  
  // System statistics
  fastify.get('/api/workflow/stats', workflowController.getSystemStats.bind(workflowController))
  
  // Health check
  fastify.get('/health', async () => ({ status: 'ok', timestamp: new Date().toISOString() }))
})

// WebSocket support (for real-time progress push)
fastify.register(require('@fastify/websocket'))

fastify.register(async function (fastify) {
  fastify.get('/ws/:clientId', { websocket: true }, (connection, req) => {
    const clientId = (req.params as any).clientId
    
    connection.socket.on('message', async (message) => {
      // Handle a client message
      try {
        const data = JSON.parse(message.toString())
        
        if (data.type === 'subscribe') {
          // Subscribe to execution status updates
          workflowController.subscribeToUpdates(clientId, connection.socket)
        }
      } catch (error) {
        connection.socket.send(JSON.stringify({
          type: 'error',
          message: 'Invalid message format'
        }))
      }
    })
    
    connection.socket.on('close', () => {
      // Clean up the subscription
      workflowController.unsubscribeFromUpdates(clientId)
    })
  })
})

// Start the server
const start = async () => {
  try {
    const port = parseInt(process.env.PORT || '3000')
    await fastify.listen({ port, host: '0.0.0.0' })
    console.log(`🚀 Workflow Engine API running on port ${port}`)
  } catch (err) {
    fastify.log.error(err)
    process.exit(1)
  }
}

// Graceful shutdown
process.on('SIGTERM', async () => {
  console.log('Received SIGTERM, shutting down gracefully...')
  await workflowEngine.close()
  await fastify.close()
  process.exit(0)
})

start()
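
The server reads its connection settings directly from `process.env` with `parseInt`, which silently yields `NaN` for a malformed value. A small loader that validates the values at startup makes such mistakes fail fast. This is a sketch that assumes the same variable names as above; `loadConfig`, `parseIntStrict`, and `EngineConfig` are hypothetical, and the project could equally express this with `zod`, which is already in the dependency list.

```typescript
interface EngineConfig {
  redis: { host: string; port: number; password?: string; db: number }
  mongoUri: string
  port: number
}

// Parses a non-negative integer, falling back when the variable is unset
function parseIntStrict(name: string, raw: string | undefined, fallback: number): number {
  if (raw === undefined || raw === '') return fallback
  const value = Number(raw)
  if (!Number.isInteger(value) || value < 0) {
    throw new Error(`${name} must be a non-negative integer, got "${raw}"`)
  }
  return value
}

// Takes the environment as a plain object so it can be tested without process.env
function loadConfig(env: Record<string, string | undefined>): EngineConfig {
  return {
    redis: {
      host: env.REDIS_HOST || 'localhost',
      port: parseIntStrict('REDIS_PORT', env.REDIS_PORT, 6379),
      password: env.REDIS_PASSWORD,
      db: parseIntStrict('REDIS_DB', env.REDIS_DB, 0),
    },
    mongoUri: env.MONGODB_URI || 'mongodb://localhost:27017/workflow',
    port: parseIntStrict('PORT', env.PORT, 3000),
  }
}

const exampleConfig = loadConfig({ REDIS_HOST: 'redis', REDIS_PORT: '6380' })
console.log(exampleConfig.redis.host, exampleConfig.redis.port, exampleConfig.port)
```

In `server.ts` this would be called once as `loadConfig(process.env)` before the engine is constructed.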

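🐳 Deployment Configuration

1. Docker Configuration

# deploy/docker/Dockerfile.api
FROM node:18-alpine

WORKDIR /app

# Copy the package manifests and install dependencies
# (devDependencies such as typescript are needed for the build step)
COPY package*.json tsconfig.json ./
COPY packages/api/package*.json ./packages/api/
COPY packages/core/package*.json ./packages/core/
COPY packages/shared/package*.json ./packages/shared/
RUN npm ci

# Copy the source code (the API imports the core engine)
COPY packages/api ./packages/api
COPY packages/core ./packages/core
COPY packages/shared ./packages/shared

# Build, then drop devDependencies from the image
RUN npm run build && npm prune --omit=dev

EXPOSE 3000

CMD ["npm", "run", "start:api"]

# deploy/docker/Dockerfile.worker
FROM node:18-alpine

WORKDIR /app

# Copy the package manifests and install dependencies
COPY package*.json tsconfig.json ./
COPY packages/worker/package*.json ./packages/worker/
COPY packages/core/package*.json ./packages/core/
COPY packages/shared/package*.json ./packages/shared/
RUN npm ci

# Copy the source code
COPY packages/worker ./packages/worker
COPY packages/core ./packages/core
COPY packages/shared ./packages/shared

# Build, then drop devDependencies from the image
RUN npm run build && npm prune --omit=dev

CMD ["npm", "run", "start:worker"]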

2. Docker Compose Configuration

# docker-compose.yml
version: '3.8'

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes
    
  mongodb:
    image: mongo:7
    ports:
      - "27017:27017"
    volumes:
      - mongo_data:/data/db
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: password
      
  workflow-api:
    build:
      context: .
      dockerfile: deploy/docker/Dockerfile.api
    ports:
      - "3000:3000"
    environment:
      - NODE_ENV=production
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - MONGODB_URI=mongodb://admin:password@mongodb:27017/workflow?authSource=admin
    depends_on:
      - redis
      - mongodb
    restart: unless-stopped
    
  workflow-worker-1:
    build:
      context: .
      dockerfile: deploy/docker/Dockerfile.worker
    environment:
      - NODE_ENV=production
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - MONGODB_URI=mongodb://admin:password@mongodb:27017/workflow?authSource=admin
      - WORKER_ID=worker-1
    depends_on:
      - redis
      - mongodb
    restart: unless-stopped
    
  workflow-worker-2:
    build:
      context: .
      dockerfile: deploy/docker/Dockerfile.worker
    environment:
      - NODE_ENV=production
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - MONGODB_URI=mongodb://admin:password@mongodb:27017/workflow?authSource=admin
      - WORKER_ID=worker-2
    depends_on:
      - redis
      - mongodb
    restart: unless-stopped

  # Nginx load balancer
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./deploy/nginx/nginx.conf:/etc/nginx/nginx.conf:ro
    depends_on:
      - workflow-api
    restart: unless-stopped

volumes:
  redis_data:
  mongo_data:

3. Kubernetes Deployment Configuration

# deploy/k8s/workflow-engine.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: workflow-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: workflow-api
  template:
    metadata:
      labels:
        app: workflow-api
    spec:
      containers:
      - name: workflow-api
        image: genn/workflow-engine-api:latest
        ports:
        - containerPort: 3000
        env:
        - name: REDIS_HOST
          value: "redis-service"
        - name: MONGODB_URI
          valueFrom:
            secretKeyRef:
              name: mongodb-secret
              key: uri
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: workflow-worker
spec:
  replicas: 5
  selector:
    matchLabels:
      app: workflow-worker
  template:
    metadata:
      labels:
        app: workflow-worker
    spec:
      containers:
      - name: workflow-worker
        image: genn/workflow-engine-worker:latest
        env:
        - name: REDIS_HOST
          value: "redis-service"
        - name: MONGODB_URI
          valueFrom:
            secretKeyRef:
              name: mongodb-secret
              key: uri
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
---
apiVersion: v1
kind: Service
metadata:
  name: workflow-api-service
spec:
  selector:
    app: workflow-api
  ports:
  - port: 80
    targetPort: 3000
  type: LoadBalancer

🔄 Integration with the Main System

1. API Integration

// Integration inside the main system
class WorkflowEngineClient {
  private baseUrl: string
  private apiKey: string

  constructor(baseUrl: string, apiKey: string) {
    this.baseUrl = baseUrl
    this.apiKey = apiKey
  }

  async executeWorkflow(workflowDefinition: any, options: any) {
    const response = await fetch(`${this.baseUrl}/api/workflow/execute`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${this.apiKey}`
      },
      body: JSON.stringify({
        workflowDefinition,
        ...options
      })
    })

    return response.json()
  }

  async getExecutionStatus(executionId: string) {
    const response = await fetch(`${this.baseUrl}/api/workflow/status/${executionId}`, {
      headers: {
        'Authorization': `Bearer ${this.apiKey}`
      }
    })

    return response.json()
  }

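  // WebSocket connection for real-time updates
  connectToUpdates(clientId: string, onUpdate: (data: unknown) => void) {
    const ws = new WebSocket(`${this.baseUrl.replace(/^http/, 'ws')}/ws/${clientId}`)

    // The server only starts pushing updates after it receives a subscribe message
    ws.onopen = () => {
      ws.send(JSON.stringify({ type: 'subscribe' }))
    }

    ws.onmessage = (event) => {
      const data = JSON.parse(event.data)
      onUpdate(data)
    }

    return ws
  }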
}
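
`connectToUpdates` derives the WebSocket address from the HTTP base URL. A slightly more defensive version of that conversion, shown here as a sketch, maps the scheme explicitly, tolerates a trailing slash, and escapes the client ID. `toWebSocketUrl` is a hypothetical helper; the `/ws/:clientId` path is the one registered in `server.ts`, and `engine.example.com` is a placeholder host.

```typescript
// Converts an http(s) base URL into the ws(s) URL of the /ws/:clientId endpoint
function toWebSocketUrl(baseUrl: string, clientId: string): string {
  let wsBase: string
  if (baseUrl.startsWith('https://')) {
    wsBase = 'wss://' + baseUrl.slice('https://'.length)
  } else if (baseUrl.startsWith('http://')) {
    wsBase = 'ws://' + baseUrl.slice('http://'.length)
  } else {
    throw new Error(`Unsupported base URL scheme: ${baseUrl}`)
  }
  // Drop trailing slashes so the result never contains "//ws"
  wsBase = wsBase.replace(/\/+$/, '')
  return `${wsBase}/ws/${encodeURIComponent(clientId)}`
}

const secureUrl = toWebSocketUrl('https://engine.example.com/', 'client 42')
const localUrl = toWebSocketUrl('http://localhost:3000', 'abc')
console.log(secureUrl, localUrl)
```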

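📊 Performance and Scalability

Expected Performance Targets

| Metric | Single instance | Multi-instance cluster |
| --- | --- | --- |
| Concurrent executions | 100+ | 1000+ |
| Queue throughput | 1000 jobs/min | 10000+ jobs/min |
| Response time | <100 ms | <50 ms |
| Availability | 99.5% | 99.9% |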
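
These targets depend on how many jobs can run at once and how long each one takes. A back-of-the-envelope estimate, offered only as a sketch, multiplies worker processes by per-worker concurrency and divides by the average job duration. The 5 worker replicas, the concurrency of 5, and the limiter of 100 jobs per minute come from the configuration earlier in this document. The 3-second average job duration is a made-up input, and the sketch assumes the limiter applies per queue across all workers, which is how BullMQ documents it. `estimateJobsPerMinute` is a hypothetical helper.

```typescript
interface CapacityInput {
  workerProcesses: number      // e.g. Kubernetes worker replicas
  concurrencyPerWorker: number // BullMQ Worker `concurrency`
  avgJobSeconds: number        // assumed average job duration
  rateLimitPerMinute?: number  // optional queue-wide limiter
}

// Rough per-queue throughput: parallel slots times jobs each slot finishes per minute
function estimateJobsPerMinute(input: CapacityInput): number {
  const slots = input.workerProcesses * input.concurrencyPerWorker
  const unlimited = (slots * 60) / input.avgJobSeconds
  return input.rateLimitPerMinute === undefined
    ? unlimited
    : Math.min(unlimited, input.rateLimitPerMinute)
}

const withoutLimit = estimateJobsPerMinute({
  workerProcesses: 5, concurrencyPerWorker: 5, avgJobSeconds: 3,
})
const withLimit = estimateJobsPerMinute({
  workerProcesses: 5, concurrencyPerWorker: 5, avgJobSeconds: 3, rateLimitPerMinute: 100,
})
console.log(withoutLimit, withLimit)
```

With these inputs a single queue tops out at about 500 jobs/min before rate limiting and at 100 jobs/min once the limiter applies, so reaching the 1000 jobs/min target would need shorter jobs, more concurrency, or a higher limit.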

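Scaling Strategies

  • Horizontal scaling: add more Worker instances
  • Vertical scaling: give each instance more resources
  • Queue partitioning: separate queues by business type
  • Database sharding: shard storage by time or by user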

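🎯 Implementation Plan

Phase 1: Infrastructure (2 weeks)

  1. ✅ Project scaffolding
  2. ✅ BullMQ queue system
  3. ✅ Basic execution engine
  4. ✅ State persistence

Phase 2: Core Features (2 weeks)

  1. ✅ Distributed execution logic
  2. ✅ API service layer
  3. ✅ Reconnection mechanism
  4. ✅ Monitoring and statistics

Phase 3: Deployment and Optimization (1 week)

  1. ✅ Docker deployment configuration
  2. ✅ Performance testing and tuning
  3. ✅ Documentation and operations guide

This design is intended to resolve the current concurrency problems and provide enterprise-grade workflow execution.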
