AI Agent 编排系统从线性流程到事件驱动架构的演进一、从单 Agent 到多 Agent 协作复杂任务的编排困境在 AI 产品的早期阶段我们使用简单的单 Agent 架构就能满足需求。用户提出一个问题Agent 调用一次 LLM 就给出答案。但随着功能不断丰富我们发现越来越多的任务需要多个步骤协同完成数据分析需要先查询数据库再进行计算最后生成图表客户支持需要先理解意图检索知识库再调用 CRM 系统。最初我们用简单的 Python 脚本串联这些步骤但很快就遇到了问题流程硬编码在代码中修改需要重新部署错误处理逻辑分散在各个步骤中难以维护没有可视化的流程设计工具产品经理无法参与流程设计。这些问题并非我们独有几乎所有从简单对话机器人向复杂工作流演进的团队都会遇到。技术如果不服务于真实的业务灵活性那只是僵硬的代码堆砌。我们需要一套可编排、可监控、可调试的 Agent 协作框架。二、事件驱动的 Agent 编排架构设计解耦与弹性的核心flowchart TB subgraph 接入层 A[用户请求] -- B[API Gateway] B -- C[事件总线] end subgraph 编排层 C -- D[Workflow Engine] D -- E[状态管理器] E -- F[任务调度器] end subgraph 执行层 F -- G[Agent 1] F -- H[Agent 2] F -- I[Agent 3] G -- J[工具调用] H -- J I -- J end subgraph 持久化层 K[状态存储] L[执行日志] M[监控指标] end E -- K G -- L H -- L I -- L G -- M H -- M I -- M2.1 工作流定义与 DSL 设计我们设计了一套简单的领域特定语言DSL来定义 Agent 工作流让产品经理和开发者都能参与流程设计。工作流由任务、条件分支、循环等基本元素组成from dataclasses import dataclass, field from typing import List, Dict, Any, Optional, Callable from enum import Enum import json class TaskType(Enum): AGENT agent TOOL tool CONDITION condition PARALLEL parallel LOOP loop dataclass class Task: id: str type: TaskType name: str config: Dict[str, Any] field(default_factorydict) dependencies: List[str] field(default_factorylist) on_error: Optional[str] None # 错误处理策略 dataclass class Workflow: id: str name: str tasks: List[Task] entry_point: str version: str 1.0 class WorkflowDefinitionLoader: 工作流定义加载器 staticmethod def from_dict(data: Dict[str, Any]) - Workflow: 从字典加载工作流定义 tasks [ Task( idt[id], typeTaskType(t[type]), namet[name], configt.get(config, {}), dependenciest.get(dependencies, []), on_errort.get(on_error) ) for t in data[tasks] ] return Workflow( iddata[id], namedata[name], taskstasks, entry_pointdata[entry_point], versiondata.get(version, 1.0) ) staticmethod def from_json(json_str: str) - Workflow: 从 JSON 字符串加载 return WorkflowDefinitionLoader.from_dict(json.loads(json_str)) # 工作流定义示例 workflow_def { id: data_analysis, name: 数据分析工作流, entry_point: query_data, tasks: [ { id: query_data, type: tool, name: 查询数据库, config: {tool: database_query}, dependencies: [] }, { id: analyze_data, type: agent, name: 数据分析 Agent, config: {agent_id: data_analyst}, dependencies: [query_data] }, { id: generate_chart, type: tool, name: 生成图表, config: {tool: chart_generator}, dependencies: [analyze_data] }, { id: generate_report, type: agent, name: 报告生成 Agent, config: {agent_id: report_writer}, dependencies: [generate_chart] } ] } workflow WorkflowDefinitionLoader.from_dict(workflow_def)2.2 状态机与持久化设计工作流执行是一个状态机每个任务都有明确的生命周期待执行、执行中、成功、失败、已取消。我们使用 Redis 缓存当前状态使用 PostgreSQL 持久化完整的执行历史import uuid from datetime import datetime from enum import Enum from dataclasses import dataclass, field from typing import Dict, Any, Optional class TaskStatus(Enum): PENDING pending RUNNING running SUCCESS success FAILED failed CANCELLED cancelled dataclass class TaskExecution: id: str task_id: str workflow_id: str execution_id: str status: TaskStatus input_data: Dict[str, Any] output_data: Optional[Dict[str, Any]] None error: Optional[str] None started_at: Optional[datetime] None completed_at: Optional[datetime] None dataclass class WorkflowExecution: id: str workflow_id: str status: TaskStatus tasks: Dict[str, TaskExecution] field(default_factorydict) input_data: Dict[str, Any] field(default_factorydict) output_data: Optional[Dict[str, Any]] None started_at: Optional[datetime] None completed_at: Optional[datetime] None class StateManager: 状态管理器负责工作流和任务状态的持久化 def __init__(self, redis_client, db_connection): self.redis redis_client self.db db_connection def create_workflow_execution(self, workflow_id: str, input_data: Dict[str, Any]) - str: 创建工作流执行实例 execution_id str(uuid.uuid4()) execution WorkflowExecution( idexecution_id, workflow_idworkflow_id, statusTaskStatus.PENDING, input_datainput_data, started_atdatetime.now() ) # 保存到 Redis 缓存 cache_key fworkflow:{execution_id} self.redis.setex(cache_key, 3600, json.dumps({ id: execution.id, workflow_id: execution.workflow_id, status: execution.status.value, started_at: execution.started_at.isoformat() })) return execution_id def update_task_status(self, execution_id: str, task_id: str, status: TaskStatus, output: Optional[Dict] None, error: Optional[str] None): 更新任务状态 # 更新缓存 cache_key fworkflow:{execution_id}:task:{task_id} update_data { status: status.value, updated_at: datetime.now().isoformat() } if output: update_data[output] output if error: update_data[error] error self.redis.hset(cache_key, mappingupdate_data) def get_ready_tasks(self, execution_id: str, workflow: Workflow) - List[Task]: 获取可以执行的任务所有依赖已完成 ready_tasks [] for task in workflow.tasks: # 检查任务是否已处理过 cache_key fworkflow:{execution_id}:task:{task.id} if self.redis.exists(cache_key): task_data self.redis.hgetall(cache_key) status TaskStatus(task_data.get(bstatus, bpending).decode()) if status ! TaskStatus.PENDING: continue # 检查所有依赖是否已成功完成 all_deps_ready True for dep_id in task.dependencies: dep_key fworkflow:{execution_id}:task:{dep_id} if not self.redis.exists(dep_key): all_deps_ready False break dep_data self.redis.hgetall(dep_key) dep_status TaskStatus(dep_data.get(bstatus, bpending).decode()) if dep_status ! TaskStatus.SUCCESS: all_deps_ready False break if all_deps_ready: ready_tasks.append(task) return ready_tasks2.3 事件总线与任务调度我们使用事件总线来解耦工作流引擎和执行器import asyncio from typing import Dict, Any, Callable from abc import ABC, abstractmethod class Event(ABC): 事件基类 abstractmethod def to_dict(self) - Dict[str, Any]: pass class TaskScheduledEvent(Event): 任务调度事件 def __init__(self, execution_id: str, task: Task, input_data: Dict[str, Any]): self.execution_id execution_id self.task task self.input_data input_data def to_dict(self) - Dict[str, Any]: return { type: task_scheduled, execution_id: self.execution_id, task_id: self.task.id, task_type: self.task.type.value, input_data: self.input_data } class TaskCompletedEvent(Event): 任务完成事件 def __init__(self, execution_id: str, task_id: str, output_data: Dict[str, Any]): self.execution_id execution_id self.task_id task_id self.output_data output_data def to_dict(self) - Dict[str, Any]: return { type: task_completed, execution_id: self.execution_id, task_id: self.task_id, output_data: self.output_data } class EventBus: 事件总线 def __init__(self): self.subscribers: Dict[str, List[Callable]] {} def subscribe(self, event_type: str, handler: Callable): 订阅事件 if event_type not in self.subscribers: self.subscribers[event_type] [] self.subscribers[event_type].append(handler) async def publish(self, event: Event): 发布事件 event_dict event.to_dict() event_type event_dict[type] if event_type in self.subscribers: for handler in self.subscribers[event_type]: await handler(event_dict) class WorkflowEngine: 工作流引擎 def __init__(self, event_bus: EventBus, state_manager: StateManager): self.event_bus event_bus self.state_manager state_manager # 订阅任务完成事件 self.event_bus.subscribe(task_completed, self.on_task_completed) async def start_workflow(self, workflow: Workflow, input_data: Dict[str, Any]) - str: 启动工作流 execution_id self.state_manager.create_workflow_execution( workflow.id, input_data ) # 调度初始任务 await self.schedule_ready_tasks(execution_id, workflow, input_data) return execution_id async def schedule_ready_tasks(self, execution_id: str, workflow: Workflow, context: Dict[str, Any]): 调度可执行的任务 ready_tasks self.state_manager.get_ready_tasks(execution_id, workflow) for task in ready_tasks: # 更新任务状态为运行中 self.state_manager.update_task_status( execution_id, task.id, TaskStatus.RUNNING ) # 发布任务调度事件 event TaskScheduledEvent(execution_id, task, context) await self.event_bus.publish(event) async def on_task_completed(self, event_dict: Dict[str, Any]): 处理任务完成事件 execution_id event_dict[execution_id] task_id event_dict[task_id] output_data event_dict[output_data] # 更新任务状态 self.state_manager.update_task_status( execution_id, task_id, TaskStatus.SUCCESS, outputoutput_data ) # 继续调度后续任务 # 这里需要加载工作流定义并获取上下文 # 省略具体实现三、条件分支与并行执行复杂流程的灵活控制3.1 条件分支实现条件分支让工作流可以根据中间结果选择不同的执行路径class ConditionEvaluator: 条件评估器 staticmethod def evaluate(condition: str, context: Dict[str, Any]) - bool: 评估条件表达式 支持简单的比较运算: , !, , , , 支持逻辑运算: and, or, not # 这里使用安全的表达式评估 # 生产环境中应该使用更严格的解析器 try: # 创建安全的全局命名空间 safe_globals { __builtins__: {}, True: True, False: False, None: None } # 将上下文注入局部命名空间 safe_locals context.copy() # 安全评估 result eval(condition, safe_globals, safe_locals) return bool(result) except Exception as e: print(f条件评估失败: {e}) return False # 使用示例 context {data_count: 1500, has_error: False} condition data_count 1000 and not has_error result ConditionEvaluator.evaluate(condition, context) print(f条件 {condition} 评估结果: {result})3.2 并行任务执行对于可以并行处理的任务我们支持并行执行以提高效率class ParallelTaskExecutor: 并行任务执行器 def __init__(self, max_concurrency: int 5): self.max_concurrency max_concurrency self.semaphore asyncio.Semaphore(max_concurrency) async def execute_task(self, task: Task, input_data: Dict[str, Any]) - Dict[str, Any]: 执行单个任务带并发控制 async with self.semaphore: # 实际任务执行逻辑 print(f执行任务: {task.name}) await asyncio.sleep(1) # 模拟执行时间 return {result: f{task.name} 执行完成} async def execute_parallel(self, tasks: List[Task], input_data: Dict[str, Any]) - Dict[str, Dict[str, Any]]: 并行执行多个任务 coroutines [ self.execute_task(task, input_data) for task in tasks ] results await asyncio.gather(*coroutines, return_exceptionsTrue) # 整理结果 output {} for task, result in zip(tasks, results): if isinstance(result, Exception): output[task.id] {error: str(result)} else: output[task.id] result return output四、可观测性与调试工具生产环境的必备设施4.1 完整的执行日志与追踪每个工作流执行都会生成详细的日志包括任务的输入输出、执行时间、错误信息等import logging from datetime import datetime class WorkflowLogger: 工作流执行日志记录器 def __init__(self, log_directory: str ./workflow_logs): self.log_directory log_directory import os os.makedirs(log_directory, exist_okTrue) # 配置日志 self.logger logging.getLogger(workflow_engine) self.logger.setLevel(logging.DEBUG) # 文件处理器 file_handler logging.FileHandler( f{log_directory}/workflow_{datetime.now().strftime(%Y%m%d)}.log ) file_handler.setLevel(logging.DEBUG) # 控制台处理器 console_handler logging.StreamHandler() console_handler.setLevel(logging.INFO) # 格式化 formatter logging.Formatter( %(asctime)s - %(name)s - %(levelname)s - %(message)s ) file_handler.setFormatter(formatter) console_handler.setFormatter(formatter) self.logger.addHandler(file_handler) self.logger.addHandler(console_handler) def log_task_start(self, execution_id: str, task_id: str, task_name: str, input_data: Dict): 记录任务开始 self.logger.info( f[Execution: {execution_id}] Task {task_name} ({task_id}) started ) self.logger.debug(fInput data: {json.dumps(input_data, ensure_asciiFalse)}) def log_task_complete(self, execution_id: str, task_id: str, task_name: str, output_data: Dict, duration_seconds: float): 记录任务完成 self.logger.info( f[Execution: {execution_id}] Task {task_name} ({task_id}) fcompleted in {duration_seconds:.2f}s ) self.logger.debug(fOutput data: {json.dumps(output_data, ensure_asciiFalse)}) def log_task_error(self, execution_id: str, task_id: str, task_name: str, error: Exception): 记录任务错误 self.logger.error( f[Execution: {execution_id}] Task {task_name} ({task_id}) ffailed: {str(error)} )4.2 可视化的执行追踪界面为了帮助开发者调试工作流我们构建了可视化界面可以查看工作流的执行进度检查每个任务的输入输出回放执行过程分析性能瓶颈五、架构权衡与适用边界编排系统的选型考量架构模式优点缺点适用场景线性脚本简单直接难以维护、扩展性差简单、固定流程事件驱动解耦、高弹性复杂度高复杂、动态流程状态机清晰、可控状态定义繁琐有明确状态转换的场景4.1 何时需要编排系统构建编排系统有一定的复杂度不是所有场景都需要。建议在以下情况下考虑工作流步骤超过 5 个需要支持条件分支和并行执行工作流需要频繁变更希望非技术人员能参与需要完整的执行历史和监控如果只是简单的 2-3 个步骤用普通的函数调用可能更合适。4.2 与现有框架的对比目前有一些开源的 Agent 编排框架如 LangGraph、AutoGen 等。我们最终选择自己构建主要是因为需要与现有的基础设施深度集成对性能和延迟有较高要求需要支持特定的业务规则但对于大多数团队来说先评估现有框架是否满足需求是更高效的选择。五、总结AI Agent 编排系统是复杂业务流程的核心基础设施。从简单的线性脚本到事件驱动架构我们经历了多次迭代最终构建了一套灵活、可靠、可观测的编排系统。这套系统的核心要素包括声明式的工作流定义 DSL、基于状态机的执行引擎、事件驱动的任务调度、完整的可观测性工具。在选择技术方案时要根据业务复杂度和团队能力做出合适的权衡不要过度设计。对于 AI 创业公司来说灵活的编排系统是快速迭代产品的关键。它让我们可以将复杂的 AI 能力封装成可重用的模块通过可视化的方式组合出新的业务流程大大加速了产品创新的速度。