前言
在构建复杂 Agent 系统时,选择合适的架构模式至关重要。本章深入讲解三种核心架构模式:Handoffs、Fan-out 和 Supervisor,并扩展讨论 Single Agent vs Multi-Agent 的选型决策、工具增强与知识增强的架构差异,以及记忆系统的架构模式。
一、Handoffs 交接模式
1.1 什么是 Handoffs
Handoffs 允许 Agent 将任务转交给更专业的 Agent:
# OpenAI Agents SDK 中的 Handoffs 示例from agents import Agent, handoff
research_agent = Agent(name="researcher", ...)coding_agent = Agent(name="coder", ...)review_agent = Agent(name="reviewer", ...)
# 定义交接函数def research_handoff(message: str): return research_agent
def code_handoff(message: str): return coding_agent
# 使用 handofforchestrator = Agent( name="orchestrator", handoffs=[research_handoff, code_handoff])1.2 交接执行流程
1.3 Handoffs 的使用场景
| 场景 | 说明 |
|---|---|
| 专业分工 | 编码 Agent vs 审查 Agent |
| 路由选择 | 根据意图选择不同 Agent |
| 任务分解 | 复杂任务交给专业处理 |
1.4 交接注意事项
# Handoffs 需要传递上下文def create_handoff_with_context(agent, context): """交接时带上关键上下文信息""" def handoff_fn(message: str): # 将上下文注入消息 enhanced_message = f"{context}\n\n用户请求: {message}" return agent return handoff_fn1.5 Handoffs 的上下文管理策略
交接过程中上下文丢失是最常见的问题。以下是几种实用的上下文管理策略:
from dataclasses import dataclass, fieldfrom typing import Any
@dataclassclass HandoffContext: """交接上下文容器""" original_query: str accumulated_findings: list[str] = field(default_factory=list) tool_results: dict[str, Any] = field(default_factory=dict) metadata: dict[str, Any] = field(default_factory=dict) parent_agent: str = "" depth: int = 0 # 防止无限递归交接
def add_finding(self, finding: str): self.accumulated_findings.append(finding)
def to_prompt_segment(self) -> str: findings_text = "\n".join(f"- {f}" for f in self.accumulated_findings) return f"""<交接上下文>原始请求: {self.original_query}已有发现:{findings_text}深度: {self.depth}</交接上下文>"""深度限制防止无限交接循环:
MAX_HANDOFF_DEPTH = 3
def safe_handoff(context: HandoffContext, target_agent: Agent) -> Any: if context.depth >= MAX_HANDOFF_DEPTH: return "已达到最大交接深度,返回当前结果。"
new_context = HandoffContext( original_query=context.original_query, accumulated_findings=context.accumulated_findings.copy(), depth=context.depth + 1, parent_agent=context.parent_agent, ) return target_agent.run(new_context)二、Fan-out/Fan-in 并行模式
2.1 原理
Fan-out 将任务分发给多个 Agent,Fan-in 将结果汇总:
# Fan-out/Fan-in 示例async def parallel_research(query: str, sources: list): # Fan-out: 分发任务给多个研究 Agent tasks = [] for source in sources: task = research_agent.run(f"研究 {source} 的 {query}") tasks.append(task)
# Fan-in: 汇总结果 results = await asyncio.gather(*tasks) synthesis = synthesis_agent.run(f"汇总: {results}")
return synthesis2.2 执行流程
2.3 超时与错误处理
import asynciofrom tenacity import retry, stop_after_attempt
@retry(stop=stop_after_attempt(3))async def fan_out_with_retry(query: str, sources: list): try: tasks = [ research_agent.run(query, source=source) for source in sources ] results = await asyncio.wait_for( asyncio.gather(*tasks, return_exceptions=True), timeout=30.0 ) # 过滤错误结果 valid_results = [r for r in results if not isinstance(r, Exception)] return synthesis_agent.run(valid_results) except asyncio.TimeoutError: return "部分研究超时,返回已有结果"2.4 Fan-in 的汇总策略
多路结果汇总不只是简单拼接,需要根据任务特点选择不同的合并策略:
from enum import Enum
from typing import Any


class MergeStrategy(Enum):
    """Strategies for merging the results of parallel agent runs."""

    VOTE = "vote"  # majority vote
    RANK = "rank"  # rank and keep the best
    INTERSECT = "intersect"  # intersection
    UNION = "union"  # union
    SUMMARIZE = "summarize"  # merge via an LLM summary
async def fan_in(results: list[Any], strategy: MergeStrategy) -> Any: """根据策略合并并行结果""" if strategy == MergeStrategy.VOTE: # 投票策略:多数一致的答案胜出 from collections import Counter counter = Counter(str(r) for r in results) return counter.most_common(1)[0][0]
elif strategy == MergeStrategy.RANK: # 排序策略:用 LLM 评估每个结果质量 scored = [] for r in results: score = await judge_quality(r) scored.append((score, r)) scored.sort(reverse=True) return scored[0][1]
elif strategy == MergeStrategy.SUMMARIZE: # 摘要策略:让 LLM 合并多个来源的信息 combined = "\n\n---\n\n".join(str(r) for r in results) return await synthesis_llm.complete( f"将以下多路研究结果合并为一份完整报告:\n{combined}" )
elif strategy == MergeStrategy.INTERSECT: # 交集策略:只保留所有来源一致认可的信息 sets = [set(normalize(r)) for r in results] return set.intersection(*sets)
elif strategy == MergeStrategy.UNION: # 并集策略:汇总所有来源的独特信息 sets = [set(normalize(r)) for r in results] return set.union(*sets)三、Supervisor 监督模式
3.1 原理
Supervisor 模式中,一个协调者决定调用哪个子 Agent:
from enum import Enum
class Supervisor: def __init__(self): self.agents = { "research": research_agent, "code": coding_agent, "review": review_agent } self.supervisor_llm = supervisor_llm
async def route(self, message: str) -> str: """LLM 决定调用哪个 Agent""" decision = await self.supervisor_llm.complete( f"决定使用哪个 Agent: {message}" ) return decision.agent_choice
async def run(self, message: str): agent_name = await self.route(message) agent = self.agents[agent_name] return await agent.run(message)3.2 流程图
3.3 带状态追踪的 Supervisor
生产环境中 Supervisor 需要追踪任务进度,防止重复调用和遗漏步骤:
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any


@dataclass
class TaskState:
    """Progress record for one task handled by a supervisor."""

    task_id: str
    original_query: str
    status: str = "pending"  # pending / in_progress / completed / failed
    assigned_agents: list[str] = field(default_factory=list)
    results: dict[str, Any] = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)
class StatefulSupervisor: def __init__(self): self.agents: dict[str, Agent] = {} self.active_tasks: dict[str, TaskState] = {}
async def run(self, query: str) -> Any: task_id = generate_task_id() state = TaskState(task_id=task_id, original_query=query) self.active_tasks[task_id] = state
# Supervisor 决策循环 for step in range(MAX_STEPS): state.status = "in_progress" state.updated_at = datetime.now()
# 决定下一步 next_action = await self._decide(state)
if next_action.action == "complete": state.status = "completed" return self._compile_result(state)
# 分配给子 Agent agent_name = next_action.agent_name state.assigned_agents.append(agent_name)
try: result = await self.agents[agent_name].run( next_action.prompt, context=state.results ) state.results[agent_name] = result except Exception as e: state.results[agent_name] = f"Error: {str(e)}"
state.status = "failed" return "达到最大步骤数,任务未完成。"四、模式对比
| 模式 | 适用场景 | 复杂度 | 并行度 |
|---|---|---|---|
| Handoffs | 专业分工明确 | 中 | 串行 |
| Fan-out/Fan-in | 任务可分解、独立 | 高 | 完全并行 |
| Supervisor | 需要智能路由 | 中 | 串行 |
五、组合模式实战
5.1 复杂工作流
async def complex_workflow(user_request: str): # 1. Supervisor 决定任务类型 task_type = await supervisor.route(user_request)
# 2. 根据类型选择模式 if task_type == "research_report": # Fan-out 研究 + Handoffs 给不同专家 results = await fan_out_research(user_request) return await handoff_to_report_agent(results)
elif task_type == "code_task": # Handoffs 给编码 Agent return await handoff_to_coder(user_request)5.2 多级 Supervisor
5.3 组合模式的工程化考量
组合多种模式时,需要特别注意以下工程细节:
@dataclass
class WorkflowConfig:
    """Workflow configuration."""

    max_parallel_agents: int = 5  # maximum number of agents run in parallel
    handoff_timeout_seconds: float = 30  # handoff timeout
    supervisor_max_retries: int = 3  # supervisor retry count
    enable_tracing: bool = True  # whether tracing is enabled
    cost_budget_usd: float = 1.0  # cost budget for a single workflow run
class HybridWorkflow: """混合工作流引擎:组合 Handoffs + Fan-out + Supervisor"""
def __init__(self, config: WorkflowConfig): self.config = config self.semaphore = asyncio.Semaphore(config.max_parallel_agents) self.tracer = get_tracer() if config.enable_tracing else None
async def execute(self, request: str) -> dict: total_cost = 0.0
# Phase 1: Supervisor 分类 with self._trace("supervisor_classify"): classification = await supervisor.classify(request)
# Phase 2: 根据分类选择执行策略 if classification.type == "parallel_research": results = await self._fan_out_phase(classification.subtasks) elif classification.type == "sequential_pipeline": results = await self._handoff_phase(classification.pipeline) else: results = await self._single_agent_phase(classification.agent, request)
# Phase 3: 结果验证 with self._trace("validate"): validated = await self._validate_results(results)
return {"results": validated, "cost_usd": total_cost}
async def _fan_out_phase(self, subtasks: list) -> list: async def run_with_semaphore(task): async with self.semaphore: return await agent.run(task)
tasks = [run_with_semaphore(t) for t in subtasks] raw_results = await asyncio.gather(*tasks, return_exceptions=True) return [r for r in raw_results if not isinstance(r, Exception)]
async def _handoff_phase(self, pipeline: list) -> list: results = [] context = "" for step in pipeline: agent = self.agents[step["agent"]] result = await agent.run(step["prompt"], context=context) context += f"\n{result}" results.append(result) return results六、Single Agent vs Multi-Agent 决策框架
6.1 决策树
不是所有场景都需要 Multi-Agent。Single Agent 适合简单任务,Multi-Agent 适合复杂协作。以下是决策框架:
6.2 选型对比
| 维度 | Single Agent | Multi-Agent |
|---|---|---|
| 开发复杂度 | 低 | 高 |
| 调试难度 | 低 | 高 |
| Token 成本 | 低 | 高(多次系统提示词) |
| 任务完成质量 | 简单任务好,复杂一般 | 复杂任务好 |
| 并行能力 | 无 | 有 |
| 适用场景 | 单一领域、步骤少于 5 | 多领域、需要协作 |
6.3 实际选型示例
class ArchitectureSelector: """根据任务特征自动选择架构"""
def select(self, task_description: str) -> str: analysis = self._analyze(task_description)
# 规则 1:步骤少于 3 且领域单一 → Single Agent if analysis.step_count <= 3 and analysis.domain_count == 1: return "single_agent"
# 规则 2:需要搜索+编码+审查 → Supervisor if analysis.requires_domains(["research", "coding", "review"]): return "supervisor"
# 规则 3:多个独立数据源 → Fan-out if analysis.parallel_data_sources > 1: return "fan_out_fan_in"
# 规则 4:线性流程且步骤明确 → Handoffs if analysis.is_sequential and analysis.step_count > 3: return "handoffs"
return "supervisor" # 默认用 Supervisor七、ReAct vs Plan-and-Execute vs Reflection 架构
7.1 三种核心推理架构对比
前面介绍了通信模式(Handoffs、Fan-out、Supervisor),这里补充三种推理架构。它们决定了 Agent 内部的思考方式。
| 架构 | 核心思想 | 优势 | 劣势 |
|---|---|---|---|
| ReAct | 边想边做,交替推理行动 | 灵活、实时 | 可能陷入循环 |
| Plan-Execute | 先规划再执行 | 全局视野 | 计划可能过时 |
| Reflection | 执行后自我反思改进 | 持续优化 | 额外 Token |
7.2 ReAct 架构实现
from typing import TypedDict
class ReActState(TypedDict):
    """Typed dict holding one thought / action / observation step."""

    thought: str
    action: str
    action_input: dict
    observation: str
# Prompt template for the ReAct loop; placeholders are tool_descriptions,
# max_steps and question. Doubled braces are literal braces under .format().
REACT_PROMPT = """你是一个使用 ReAct 范式的智能助手。

可用工具:
{tool_descriptions}

严格按照以下格式回答:
Thought: 思考当前状况和下一步
Action: 工具名称
Action Input: {{"param": "value"}}
Observation: (工具返回结果)
... (可以重复多次 Thought/Action/Observation)
Thought: 我现在知道最终答案了
Final Answer: 最终答案

注意: 最多执行 {max_steps} 步。

Question: {question}"""
async def react_loop(question: str, tools: dict, max_steps: int = 5) -> str: """ReAct 推理循环""" messages = [ {"role": "system", "content": REACT_PROMPT.format( tool_descriptions=format_tools(tools), max_steps=max_steps, question=question )} ]
for step in range(max_steps): response = await llm.complete(messages) messages.append({"role": "assistant", "content": response})
# 解析 Action action, action_input = parse_action(response)
if action == "Final Answer": return parse_final_answer(response)
# 执行工具 if action in tools: observation = await tools[action](**action_input) else: observation = f"错误: 工具 {action} 不存在"
messages.append({"role": "user", "content": f"Observation: {observation}"})
return "达到最大步数限制,未能完成任务。"7.3 Plan-and-Execute 架构实现
@dataclassclass PlanStep: step_id: int description: str tool: str | None = None tool_input: dict | None = None status: str = "pending" # pending / running / done / failed result: Any = None
@dataclass
class Plan:
    """A goal together with the ordered steps meant to achieve it."""

    goal: str
    steps: list[PlanStep]
    current_step: int = 0
# Prompt template for the planner; placeholders are goal and tools.
# Doubled braces are literal braces under .format().
PLAN_PROMPT = """为以下任务制定执行计划。

任务: {goal}

可用工具: {tools}

输出 JSON 格式的计划:
{{
  "steps": [
    {{"description": "步骤描述", "tool": "工具名", "tool_input": {{"param": "value"}}}},
    ...
  ]
}}"""
async def plan_and_execute(goal: str, tools: dict) -> str: """Plan-and-Execute 执行器"""
# Phase 1: 制定计划 plan_response = await planner_llm.complete( PLAN_PROMPT.format(goal=goal, tools=format_tools(tools)) ) plan = parse_plan(plan_response)
# Phase 2: 逐步执行 for step in plan.steps: step.status = "running"
try: if step.tool and step.tool in tools: step.result = await tools[step.tool](**step.tool_input) else: # 无工具的推理步骤 step.result = await executor_llm.complete( f"任务: {step.description}\n上下文: {get_previous_results(plan)}" ) step.status = "done" except Exception as e: step.status = "failed" step.result = str(e)
# 重新规划 new_plan = await replan(plan, str(e)) plan.steps.extend(new_plan.steps)
# Phase 3: 汇总结果 return await synthesize(plan)7.4 Reflection(自我反思)架构
# Prompt template for the reflection step; placeholders are task,
# trajectory and result.
REFLECT_PROMPT = """你刚完成了以下任务:

任务: {task}
执行过程: {trajectory}
最终结果: {result}

请反思:
1. 结果是否完整准确地完成了任务?
2. 有没有更高效的方法?
3. 中间步骤有没有可以优化的地方?

给出改进建议和评分 (0-10)。"""
async def reflective_agent(task: str, max_iterations: int = 3) -> str: """带反思的 Agent""" best_result = None best_score = 0
for i in range(max_iterations): # 执行任务 result = await execute_agent(task, previous_reflection=best_result)
# 反思评估 reflection = await reflector_llm.complete( REFLECT_PROMPT.format( task=task, trajectory=get_trajectory(), result=result ) ) score = parse_score(reflection)
if score > best_score: best_score = score best_result = result
# 如果分数够高,提前结束 if score >= 8.0: break
return best_result7.5 架构选择流程
八、工具增强 vs 知识增强架构
8.1 两种增强路径
Agent 的能力增强有两个方向:工具增强(Tool-augmented)和知识增强(Knowledge-augmented)。
8.2 工具增强架构详解
工具增强让 Agent 能够执行实际操作,适合需要与外部系统交互的场景:
from langchain.tools import toolfrom langchain.agents import create_react_agent
@tooldef search_web(query: str) -> str: """搜索互联网获取最新信息""" return web_search_api(query)
@tooldef execute_python(code: str) -> str: """执行 Python 代码并返回结果""" return sandbox.run(code, timeout=30)
@tooldef query_database(sql: str) -> str: """查询数据库""" # 安全检查:只允许 SELECT if not sql.strip().upper().startswith("SELECT"): return "错误: 只允许 SELECT 查询" return db.execute(sql)
@tooldef send_email(to: str, subject: str, body: str) -> str: """发送邮件""" return email_service.send(to, subject, body)
# 组合为工具增强 Agenttools = [search_web, execute_python, query_database, send_email]tool_agent = create_react_agent(llm, tools, prompt)8.3 知识增强架构详解
知识增强让 Agent 能够利用私有数据,适合需要专业领域知识的场景:
from langchain.vectorstores import Chromafrom langchain.embeddings import OpenAIEmbeddings
class KnowledgeAugmentedAgent: """知识增强 Agent"""
def __init__(self, knowledge_sources: list[str]): # 构建向量索引 self.vectorstore = Chroma.from_documents( documents=load_documents(knowledge_sources), embedding=OpenAIEmbeddings() )
async def answer(self, query: str) -> str: # 1. 检索相关知识 docs = self.vectorstore.similarity_search(query, k=5) context = format_documents(docs)
# 2. 基于知识生成回答 response = await llm.complete(f"""基于以下知识回答问题。如果知识中没有相关信息,请明确说明。
知识:{context}
问题: {query}""") return response8.4 混合架构:工具 + 知识
生产环境通常需要两者结合:
class HybridAgent: """工具 + 知识混合增强"""
def __init__(self): self.vectorstore = Chroma(...) # 知识库 self.tools = [search, calculator] # 工具集
async def process(self, query: str) -> str: # Step 1: 先查知识库 relevant_docs = self.vectorstore.similarity_search(query, k=3) knowledge_context = format_documents(relevant_docs)
# Step 2: 检查知识库是否足够 coverage = await assess_coverage(query, knowledge_context)
if coverage.score >= 0.8: # 知识库已覆盖,直接回答 return await self._answer_from_knowledge(query, knowledge_context) else: # 知识不足,使用工具补充 return await self._answer_with_tools(query, knowledge_context)8.5 选型建议
| 场景 | 增强方式 | 典型实现 |
|---|---|---|
| 客服问答 | 知识增强 | RAG + FAQ 库 |
| 数据分析 | 工具增强 | SQL + Python 执行器 |
| 研究助手 | 混合 | 搜索 + 文献库 |
| 代码助手 | 工具增强 | 代码执行 + Git 操作 |
| 合规审查 | 知识增强 | 法规向量库 + 规则引擎 |
九、记忆架构模式
9.1 三层记忆架构
Agent 的记忆系统通常分为三层,每层有不同的存储介质和访问模式:
9.2 工作记忆实现
工作记忆就是当前对话的上下文窗口,直接放在 LLM 的 prompt 中:
class WorkingMemory: """工作记忆:管理当前对话的上下文窗口"""
def __init__(self, max_tokens: int = 4000): self.messages: list[dict] = [] self.max_tokens = max_tokens
def add(self, role: str, content: str): self.messages.append({"role": role, "content": content}) self._compress_if_needed()
def get_context(self) -> list[dict]: return self.messages
def _compress_if_needed(self): """当 Token 超限时压缩历史""" total = count_tokens(self.messages) if total <= self.max_tokens: return
# 策略:保留 system + 最近 N 轮 + 旧消息摘要 system = [m for m in self.messages if m["role"] == "system"] recent = self.messages[-6:] # 最近 3 轮
old = [m for m in self.messages if m not in system and m not in recent] if old: summary = summarize_messages(old) self.messages = system + [{"role": "system", "content": f"历史摘要: {summary}"}] + recent9.3 短期记忆实现
短期记忆跨对话轮次保持信息,使用外部存储:
import redisimport json
class ShortTermMemory: """短期记忆:会话级别的信息存储"""
def __init__(self, session_id: str, ttl: int = 3600): self.client = redis.Redis() self.session_id = session_id self.ttl = ttl
def save(self, key: str, value: Any): """保存会话级信息""" full_key = f"session:{self.session_id}:{key}" self.client.setex(full_key, self.ttl, json.dumps(value))
def load(self, key: str) -> Any | None: """读取会话级信息""" full_key = f"session:{self.session_id}:{key}" data = self.client.get(full_key) return json.loads(data) if data else None
def save_user_preference(self, preference: dict): """保存用户在当前会话中表达的偏好""" existing = self.load("preferences") or {} existing.update(preference) self.save("preferences", existing)9.4 长期记忆实现
长期记忆使用向量数据库存储,支持语义检索:
from datetime import datetime
class LongTermMemory: """长期记忆:持久化的知识和经验"""
def __init__(self, vectorstore): self.vectorstore = vectorstore
async def store(self, content: str, metadata: dict | None = None): """存储一条长期记忆""" memory_entry = { "content": content, "timestamp": datetime.now().isoformat(), "access_count": 0, **(metadata or {}) } self.vectorstore.add_documents([memory_entry])
async def recall(self, query: str, k: int = 5) -> list[dict]: """语义检索相关记忆""" results = self.vectorstore.similarity_search(query, k=k) # 更新访问计数(越常访问的记忆越重要) for doc in results: doc.metadata["access_count"] += 1 doc.metadata["last_accessed"] = datetime.now().isoformat() return results
async def consolidate(self): """记忆巩固:合并相似记忆,删除过时记忆""" all_memories = self.vectorstore.get_all()
# 找出相似度很高的记忆对 merged = set() for i, m1 in enumerate(all_memories): if i in merged: continue for j, m2 in enumerate(all_memories[i+1:], i+1): if j in merged: continue similarity = compute_similarity(m1, m2) if similarity > 0.95: # 合并为一条更强的记忆 combined = merge_memories(m1, m2) await self.store(combined.content, combined.metadata) merged.add(i) merged.add(j)
# 删除过时记忆(超过 90 天且访问次数为 0) cutoff = datetime.now() - timedelta(days=90) for memory in all_memories: if (parse_date(memory.metadata.get("timestamp")) < cutoff and memory.metadata.get("access_count", 0) == 0): self.vectorstore.delete(memory.id)9.5 情景记忆模式
情景记忆存储具体的事件和经历,支持按时间和情境检索:
@dataclass
class EpisodicMemory:
    """Episodic memory: the record of one concrete event."""

    event: str  # description of the event
    timestamp: datetime  # when it happened
    context: str  # surrounding context
    outcome: str  # how it turned out
    emotional_valence: float  # sentiment value (-1 to 1)
    lessons: list[str]  # lessons learned from it
class EpisodicMemoryStore: def __init__(self, vectorstore): self.vectorstore = vectorstore
async def record_episode(self, event: EpisodicMemory): """记录一次经历""" doc = Document( page_content=f"{event.event} -> {event.outcome}", metadata={ "timestamp": event.timestamp.isoformat(), "lessons": event.lessons, "valence": event.emotional_valence, } ) self.vectorstore.add_documents([doc])
async def recall_similar_episodes(self, current_situation: str, k: int = 3): """回忆类似的经历,避免重蹈覆辙""" results = self.vectorstore.similarity_search( current_situation, k=k ) # 优先返回负面经历的教训(避免犯同样错误) results.sort(key=lambda x: x.metadata.get("valence", 0)) return results十、真实场景架构示例
10.1 客服系统架构
class CustomerSupportArchitecture: """客服系统架构"""
def __init__(self): self.router = IntentRouter() self.kb_agent = KnowledgeBaseAgent(vectorstore=faq_store) self.ops_agent = OperationsAgent(systems=[ticket_system, refund_api]) self.complaint_agent = ComplaintAgent(escalation=human_queue)
async def handle(self, message: str, user_id: str) -> str: # 路由 intent = await self.router.classify(message)
if intent.type == "inquiry": result = await self.kb_agent.answer(message, user_id) elif intent.type == "operation": result = await self.ops_agent.process(message, user_id) elif intent.type == "complaint": result = await self.complaint_agent.handle(message, user_id) else: result = await self.kb_agent.answer(message, user_id)
# 验证 if result.confidence < 0.7: return "让我帮您转接人工客服..." return result.response10.2 研究助手架构
class ResearchAssistantArchitecture: """研究助手:Fan-out + Supervisor 混合架构"""
def __init__(self): self.supervisor = ResearchSupervisor() self.search_agent = SearchAgent() self.arxiv_agent = ArxivAgent() self.analysis_agent = AnalysisAgent() self.writing_agent = WritingAgent()
async def research(self, topic: str, depth: str = "medium") -> str: # Phase 1: Fan-out 搜索 search_tasks = [ self.search_agent.search(topic, source="web"), self.search_agent.search(topic, source="news"), self.arxiv_agent.search(topic), ] raw_results = await asyncio.gather(*search_tasks)
# Phase 2: Supervisor 筛选 filtered = await self.supervisor.filter_relevant(raw_results, threshold=0.6)
# Phase 3: 分析 analysis = await self.analysis_agent.analyze(filtered, topic)
# Phase 4: 生成报告 report = await self.writing_agent.write_report( topic=topic, sources=filtered, analysis=analysis )
return report10.3 编程助手架构
class CodingAssistantArchitecture: """编程助手:Handoffs 串行流水线"""
def __init__(self): self.understand_agent = CodeUnderstandingAgent() self.plan_agent = CodePlanningAgent() self.code_agent = CodeGenerationAgent() self.review_agent = CodeReviewAgent() self.test_agent = TestGenerationAgent()
async def implement(self, requirement: str, codebase: dict) -> dict: # Step 1: 理解需求和代码库 context = await self.understand_agent.analyze(requirement, codebase)
# Step 2: 制定实现计划 plan = await self.plan_agent.create_plan(requirement, context)
# Step 3: 编写代码 code = await self.code_agent.generate(plan, context)
# Step 4: 代码审查 review = await self.review_agent.review(code, plan) if review.has_issues: code = await self.code_agent.fix(code, review.issues)
# Step 5: 生成测试 tests = await self.test_agent.generate(code, plan)
return { "code": code, "tests": tests, "review": review.summary, }十一、总结
| 模式 | 核心优势 | 典型框架 |
|---|---|---|
| Handoffs | 专业化分工 | OpenAI Agents SDK |
| Fan-out/in | 并行加速 | LangChain |
| Supervisor | 智能路由 | AutoGen |
11.1 架构选型速查
| 场景需求 | 推荐架构 |
|---|---|
| 简单问答 | Single Agent + RAG |
| 多专业领域协作 | Supervisor |
| 多源数据并行研究 | Fan-out/Fan-in |
| 流水线式任务处理 | Handoffs |
| 需要反复试错 | Reflection |
| 复杂多步任务 | Plan-and-Execute |
| 混合型复杂系统 | Supervisor + Fan-out |
选择合适模式让 Agent 系统更清晰、高效!
参考资料
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时






