Agent Architecture Patterns: Handoffs, Fan-out, and Supervisor
2025-03-22

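Introduction

Choosing the right architecture pattern is critical when building complex agent systems. This chapter covers three core patterns (Handoffs, Fan-out, and Supervisor) and then extends the discussion to choosing between a single agent and multiple agents, the reasoning architectures ReAct, Plan-and-Execute, and Reflection, the architectural differences between tool augmentation and knowledge augmentation, and memory architecture patterns.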

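1. The Handoffs Pattern

1.1 What Are Handoffs

Handoffs let an agent transfer a task to a more specialized agent. In the OpenAI Agents SDK, each handoff is exposed to the model as a tool; when the model calls it, control passes to the target agent, which continues the conversation from there.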

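```python
# Handoffs in the OpenAI Agents SDK
from agents import Agent, Runner, handoff

# handoff_description tells the orchestrator when to pick each specialist;
# replace "..." with real instructions
research_agent = Agent(name="researcher", instructions="...",
                       handoff_description="Investigates problems and gathers background")
coding_agent = Agent(name="coder", instructions="...",
                     handoff_description="Writes and fixes code")
review_agent = Agent(name="reviewer", instructions="...",
                     handoff_description="Reviews code for bugs and style issues")

# `handoffs` accepts Agent objects directly, or handoff(...) objects when the
# generated transfer tool needs customizing (name, description, callbacks)
orchestrator = Agent(
    name="orchestrator",
    instructions="Route each request to the most suitable specialist.",
    handoffs=[research_agent, handoff(coding_agent), review_agent],
)

result = Runner.run_sync(orchestrator, "Analyze this bug and propose a fix")
print(result.final_output)
```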

1.2 Handoff Execution Flow

```mermaid
sequenceDiagram
    participant User
    participant Orchestrator
    participant ResearchAgent
    participant CodingAgent
    User->>Orchestrator: Request analysis of a code issue
    Orchestrator->>ResearchAgent: Hand off research task
    ResearchAgent-->>Orchestrator: Return research findings
    Orchestrator->>CodingAgent: Hand off coding task
    CodingAgent-->>User: Return code solution
```

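1.3 When to Use Handoffs

| Scenario | Description |
|---|---|
| Specialization | A coding agent vs. a review agent |
| Routing | Pick a different agent based on the user's intent |
| Task decomposition | Hand parts of a complex task to specialists |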

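1.4 Things to Watch During Handoffs

```python
# A handoff must carry context along with it
def create_handoff_with_context(agent, context: str):
    """Attach key context information when handing off."""
    def handoff_fn(message: str):
        # Inject the shared context in front of the user's request
        enhanced_message = f"{context}\n\nUser request: {message}"
        # Return the enriched message together with the target agent;
        # returning only the agent would silently drop the context
        return agent, enhanced_message
    return handoff_fn
```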

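1.5 Context Management Strategies for Handoffs

Losing context during a handoff is the most common problem. The following strategies help manage it, starting with an explicit context container that travels with the task: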

```python
from dataclasses import dataclass, field
from typing import Any

@dataclass
class HandoffContext:
    """Container for the context passed along a handoff chain."""
    original_query: str
    accumulated_findings: list[str] = field(default_factory=list)
    tool_results: dict[str, Any] = field(default_factory=dict)
    metadata: dict[str, Any] = field(default_factory=dict)
    parent_agent: str = ""
    depth: int = 0  # guards against infinite recursive handoffs

    def add_finding(self, finding: str):
        self.accumulated_findings.append(finding)

    def to_prompt_segment(self) -> str:
        findings_text = "\n".join(f"- {f}" for f in self.accumulated_findings)
        return f"""
<handoff_context>
Original request: {self.original_query}
Findings so far:
{findings_text}
Depth: {self.depth}
</handoff_context>
"""
```

A depth limit prevents infinite handoff loops:

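```python
MAX_HANDOFF_DEPTH = 3

def safe_handoff(context: HandoffContext, target_agent: "Agent",
                 current_agent: str) -> Any:
    # `Agent` stands for any object with a run(context) method
    if context.depth >= MAX_HANDOFF_DEPTH:
        return "Maximum handoff depth reached; returning current results."
    new_context = HandoffContext(
        original_query=context.original_query,
        accumulated_findings=context.accumulated_findings.copy(),
        tool_results=dict(context.tool_results),  # keep tool results across the hop
        metadata=dict(context.metadata),
        parent_agent=current_agent,               # record who handed off
        depth=context.depth + 1,
    )
    return target_agent.run(new_context)
```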
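To see the depth limit at work, here is a minimal self-contained sketch in which two toy agents hand a task back and forth in a cycle. `ToyAgent`, `Ctx`, and their `run` method are hypothetical stand-ins, not an SDK API; only the depth check mirrors the idea behind `safe_handoff`.

```python
from dataclasses import dataclass, field

MAX_HANDOFF_DEPTH = 3

@dataclass
class Ctx:
    query: str
    findings: list[str] = field(default_factory=list)
    depth: int = 0

class ToyAgent:
    """Hypothetical agent: records a finding, then hands off to `peer`."""

    def __init__(self, name: str):
        self.name = name
        self.peer: "ToyAgent | None" = None

    def run(self, ctx: Ctx) -> Ctx:
        ctx.findings.append(f"{self.name}@depth{ctx.depth}")
        if self.peer is None or ctx.depth >= MAX_HANDOFF_DEPTH:
            # Depth limit reached (or nobody to hand off to): stop here
            return ctx
        # Hand off a copy with depth + 1 so downstream agents
        # cannot mutate the caller's findings list
        child = Ctx(ctx.query, ctx.findings.copy(), ctx.depth + 1)
        return self.peer.run(child)

a, b = ToyAgent("a"), ToyAgent("b")
a.peer, b.peer = b, a  # a cycle that would never end without the depth check
result = a.run(Ctx("why is the build failing?"))
print(result.findings)  # ['a@depth0', 'b@depth1', 'a@depth2', 'b@depth3']
```

Without the `ctx.depth >= MAX_HANDOFF_DEPTH` check, the `a -> b -> a` cycle would recurse until Python's recursion limit is hit.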

2. The Fan-out/Fan-in Parallel Pattern

2.1 How It Works

Fan-out distributes a task to several agents that run in parallel; fan-in collects and merges their results.

```python
import asyncio

# Fan-out/Fan-in example
async def parallel_research(query: str, sources: list[str]):
    # Fan-out: one research task per source; the coroutines run concurrently
    tasks = [research_agent.run(f"Research {query} using source: {source}") for source in sources]
    # Fan-in: wait for all of them, then merge the results
    results = await asyncio.gather(*tasks)
    synthesis = await synthesis_agent.run(f"Synthesize these findings: {results}")
    return synthesis
```

2.2 Execution Flow

```mermaid
graph TB
    A["User request"] --> B["Fan-out dispatch"]
    B --> C["Agent 1"]
    B --> D["Agent 2"]
    B --> E["Agent 3"]
    C --> F["Fan-in aggregation"]
    D --> F
    E --> F
    F --> G["Final answer"]
```

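2.3 Timeouts and Error Handling

```python
import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential

# Retry each source individually, so one flaky source does not restart the whole fan-out
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, max=10))
async def research_one(query: str, source: str):
    return await research_agent.run(query, source=source)

async def fan_out_with_timeout(query: str, sources: list[str], timeout: float = 30.0):
    tasks = [asyncio.create_task(research_one(query, s)) for s in sources]
    # asyncio.wait keeps the tasks that already finished when the timeout hits;
    # wrapping gather() in wait_for() would cancel everything and lose them
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    for task in pending:
        task.cancel()
    # Filter out failed tasks (including those that exhausted their retries)
    valid_results = [t.result() for t in done if t.exception() is None]
    if not valid_results:
        return "All research tasks failed or timed out."
    return await synthesis_agent.run(valid_results)
```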
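What survives a timeout depends on the primitive. The self-contained comparison below uses a hypothetical `fake_source` coroutine in place of real research agents (one fast, one failing, one too slow) and runs the same fan-out two ways: `asyncio.gather` wrapped in `asyncio.wait_for`, and `asyncio.wait` with a timeout.

```python
import asyncio

async def fake_source(name: str, delay: float, fail: bool) -> str:
    """Hypothetical research agent: sleeps, then returns or raises."""
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f"{name} failed")
    return f"findings from {name}"

SPECS = [("web", 0.01, False), ("news", 0.01, True), ("arxiv", 1.0, False)]

async def gather_with_wait_for(timeout: float) -> list[str]:
    coros = [fake_source(*spec) for spec in SPECS]
    try:
        results = await asyncio.wait_for(
            asyncio.gather(*coros, return_exceptions=True), timeout
        )
    except asyncio.TimeoutError:
        return []  # the whole gather was cancelled: the finished "web" result is lost too
    return [r for r in results if not isinstance(r, Exception)]

async def wait_keep_partial(timeout: float) -> list[str]:
    tasks = [asyncio.create_task(fake_source(*spec)) for spec in SPECS]
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    for task in pending:
        task.cancel()  # stop sources that missed the deadline
    # Keep successful results in source order
    return [t.result() for t in tasks if t in done and t.exception() is None]

print(asyncio.run(gather_with_wait_for(0.2)))  # []
print(asyncio.run(wait_keep_partial(0.2)))     # ['findings from web']
```

`wait_for` cancels the gather as a unit, so nothing comes back on timeout; `asyncio.wait` returns the `done` set, which is what makes "return what we have so far" possible.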

2.4 Fan-in Merge Strategies

Merging parallel results is more than concatenating them; pick a merge strategy that fits the task:

```python
from collections import Counter
from enum import Enum
from typing import Any

class MergeStrategy(Enum):
    VOTE = "vote"            # majority vote
    RANK = "rank"            # keep the highest-scoring result
    INTERSECT = "intersect"  # keep what every source agrees on
    UNION = "union"          # keep everything any source found
    SUMMARIZE = "summarize"  # let an LLM merge the results

async def fan_in(results: list[Any], strategy: MergeStrategy) -> Any:
    """Merge parallel results according to `strategy`.
    judge_quality, synthesis_llm and normalize are assumed helpers."""
    if not results:
        raise ValueError("no results to merge")
    if strategy == MergeStrategy.VOTE:
        # The answer most sources agree on wins
        counter = Counter(str(r) for r in results)
        return counter.most_common(1)[0][0]
    elif strategy == MergeStrategy.RANK:
        # Score each result with an LLM judge and keep the best one;
        # key= avoids comparing the results themselves when scores tie
        scored = [(await judge_quality(r), r) for r in results]
        return max(scored, key=lambda pair: pair[0])[1]
    elif strategy == MergeStrategy.SUMMARIZE:
        # Let an LLM merge the information from all sources
        combined = "\n\n---\n\n".join(str(r) for r in results)
        return await synthesis_llm.complete(
            f"Merge the following research results into one complete report:\n{combined}"
        )
    elif strategy == MergeStrategy.INTERSECT:
        # Keep only information that every source agrees on
        return set.intersection(*(set(normalize(r)) for r in results))
    elif strategy == MergeStrategy.UNION:
        # Collect the distinct information from all sources
        return set.union(*(set(normalize(r)) for r in results))
    raise ValueError(f"unknown strategy: {strategy}")
```
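The three deterministic strategies need no LLM, so they can be exercised directly. In this sketch `normalize` is a hypothetical helper that treats each `;`-separated clause as one fact; a real system would more likely extract facts with an LLM or match them by embedding similarity.

```python
from collections import Counter

def normalize(result: str) -> set[str]:
    """Hypothetical normalizer: one fact per ';'-separated clause, case-folded."""
    return {part.strip().lower() for part in result.split(";") if part.strip()}

def vote(results: list[str]) -> str:
    # Majority vote; most_common orders equal counts by first occurrence
    return Counter(r.strip().lower() for r in results).most_common(1)[0][0]

def intersect(results: list[str]) -> set[str]:
    # Keep only facts that every source reports
    return set.intersection(*(normalize(r) for r in results))

def union(results: list[str]) -> set[str]:
    # Collect every distinct fact reported by any source
    return set.union(*(normalize(r) for r in results))

answers = ["Paris", "paris", "Lyon"]
facts = ["python 3.12; uses asyncio", "uses asyncio; MIT license", "uses asyncio"]
print(vote(answers))         # paris
print(intersect(facts))      # {'uses asyncio'}
print(sorted(union(facts)))  # ['mit license', 'python 3.12', 'uses asyncio']
```

Intersection trades recall for precision and union does the opposite, which is why the choice depends on whether missing a fact or including an unverified one is worse for the task.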

3. The Supervisor Pattern

3.1 How It Works

In the Supervisor pattern, a coordinator decides which sub-agent to call:

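```python
class Supervisor:
    def __init__(self, supervisor_llm, agents: dict, default: str = "research"):
        # e.g. {"research": research_agent, "code": coding_agent, "review": review_agent}
        self.agents = agents
        self.supervisor_llm = supervisor_llm
        self.default = default

    async def route(self, message: str) -> str:
        """Let the LLM decide which agent handles the message."""
        options = ", ".join(self.agents)
        decision = await self.supervisor_llm.complete(
            f"Choose exactly one agent from [{options}] for the request below. "
            f"Reply with the agent name only.\n\nRequest: {message}"
        )
        choice = decision.strip().lower()
        # Guard against hallucinated names: fall back to the default agent
        return choice if choice in self.agents else self.default

    async def run(self, message: str):
        agent_name = await self.route(message)
        agent = self.agents[agent_name]
        return await agent.run(message)
```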
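Free-text routing replies are fragile: the model may answer with a sentence or name an agent that does not exist. One sturdier option, sketched here under the assumption that the supervisor prompt asks for a JSON object such as `{"agent": "...", "reason": "..."}`, is to parse the reply strictly and fall back to a default agent. `parse_routing_decision` is a hypothetical helper.

```python
import json

def parse_routing_decision(raw: str, allowed: set[str], default: str) -> tuple[str, str]:
    """Hypothetical helper: parse {"agent": ..., "reason": ...} from the supervisor LLM.

    Returns (agent, reason); falls back to `default` when the reply is not a
    JSON object or names an agent outside `allowed`.
    """
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return default, "unparseable reply"
    if not isinstance(data, dict):
        return default, "reply is not a JSON object"
    agent = str(data.get("agent", "")).strip().lower()
    if agent not in allowed:
        return default, f"unknown agent {agent!r}"
    return agent, str(data.get("reason", ""))

allowed = {"research", "code", "review"}
print(parse_routing_decision('{"agent": "Code", "reason": "bug fix"}', allowed, "research"))
# ('code', 'bug fix')
```

Logging the returned reason next to the chosen agent makes misroutes much easier to debug than a bare name.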

3.2 Flowchart

```mermaid
flowchart TD
    A["User request"] --> B["Supervisor LLM"]
    B --> C{"Decide"}
    C -->|"Research"| D["Research Agent"]
    C -->|"Coding"| E["Coding Agent"]
    C -->|"Review"| F["Review Agent"]
    D --> G["Return result"]
    E --> G
    F --> G
```

3.3 A Supervisor with State Tracking

In production, a supervisor needs to track task progress so that it neither calls agents redundantly nor skips steps:

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any

MAX_STEPS = 10  # upper bound on supervisor decisions per task

@dataclass
class TaskState:
    task_id: str
    original_query: str
    status: str = "pending"  # pending / in_progress / completed / failed
    assigned_agents: list[str] = field(default_factory=list)
    results: dict[str, Any] = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)

class StatefulSupervisor:
    def __init__(self, agents: dict[str, Any]):
        self.agents = agents
        self.active_tasks: dict[str, TaskState] = {}

    async def run(self, query: str) -> Any:
        task_id = uuid.uuid4().hex
        state = TaskState(task_id=task_id, original_query=query)
        self.active_tasks[task_id] = state
        state.status = "in_progress"
        # Supervisor decision loop; _decide and _compile_result are left abstract
        for _ in range(MAX_STEPS):
            state.updated_at = datetime.now()
            # Decide the next step from everything recorded so far
            next_action = await self._decide(state)
            if next_action.action == "complete":
                state.status = "completed"
                return self._compile_result(state)
            # Dispatch to a sub-agent
            agent_name = next_action.agent_name
            state.assigned_agents.append(agent_name)
            try:
                result = await self.agents[agent_name].run(
                    next_action.prompt, context=state.results
                )
                state.results[agent_name] = result
            except Exception as e:
                # Record the error and let the next decision react to it
                state.results[agent_name] = f"Error: {e}"
        state.status = "failed"
        return "Reached the maximum number of steps; task not completed."
```

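4. Pattern Comparison

| Pattern | Best for | Complexity | Parallelism |
|---|---|---|---|
| Handoffs | Clear division of expertise | | Sequential |
| Fan-out/Fan-in | Decomposable, independent subtasks | | Fully parallel |
| Supervisor | Intelligent routing | | Sequential |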

5. Combining Patterns in Practice

5.1 A Complex Workflow

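```python
async def complex_workflow(user_request: str):
    # 1. The supervisor decides the task type
    task_type = await supervisor.route(user_request)
    # 2. Pick a pattern based on the type
    if task_type == "research_report":
        # Fan-out research, then hand off to a report-writing specialist
        results = await fan_out_research(user_request)
        return await handoff_to_report_agent(results)
    elif task_type == "code_task":
        # Hand off to the coding agent
        return await handoff_to_coder(user_request)
    # Any other type: fall back to a general-purpose agent (assumed) instead of returning None
    return await general_agent.run(user_request)
```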

5.2 Multi-level Supervisors

```mermaid
graph TD
    A["Top-level Supervisor"] --> B["R&D Supervisor"]
    A --> C["Operations Supervisor"]
    B --> D["Frontend Agent"]
    B --> E["Backend Agent"]
    C --> F["Customer Service Agent"]
    C --> G["Analytics Agent"]
```

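5.3 Engineering Considerations for Combined Patterns

When combining several patterns, pay particular attention to concurrency limits, timeouts, retries, tracing, and cost budgets: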

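```python
import asyncio
from contextlib import nullcontext
from dataclasses import dataclass

from opentelemetry import trace

@dataclass
class WorkflowConfig:
    """Workflow configuration."""
    max_parallel_agents: int = 5          # max agents running concurrently
    handoff_timeout_seconds: float = 30   # timeout per handoff step
    supervisor_max_retries: int = 3       # attempts for the supervisor call
    enable_tracing: bool = True           # emit tracing spans
    cost_budget_usd: float = 1.0          # cost budget per workflow run

class HybridWorkflow:
    """Hybrid engine combining Handoffs + Fan-out + Supervisor.

    Assumes supervisor.classify() returns an object with type/subtasks/
    pipeline/agent fields (subtasks and pipeline are lists of
    {"agent", "prompt"} dicts), and agent.run() returns an object with
    `output` and `cost_usd` attributes.
    """

    def __init__(self, config: WorkflowConfig, supervisor, agents: dict):
        self.config = config
        self.supervisor = supervisor
        self.agents = agents
        self.semaphore = asyncio.Semaphore(config.max_parallel_agents)
        self.tracer = trace.get_tracer(__name__) if config.enable_tracing else None
        self.total_cost = 0.0

    def _trace(self, name: str):
        # A no-op context manager when tracing is disabled
        return self.tracer.start_as_current_span(name) if self.tracer else nullcontext()

    def _charge(self, result):
        # Accumulate the cost reported by each agent call
        self.total_cost += getattr(result, "cost_usd", 0.0)
        return result

    async def _classify(self, request: str):
        # Retry the supervisor call up to supervisor_max_retries times
        for attempt in range(1, self.config.supervisor_max_retries + 1):
            try:
                return await self.supervisor.classify(request)
            except Exception:
                if attempt == self.config.supervisor_max_retries:
                    raise

    async def execute(self, request: str) -> dict:
        self.total_cost = 0.0
        # Phase 1: the supervisor classifies the request
        with self._trace("supervisor_classify"):
            classification = await self._classify(request)
        # Phase 2: pick an execution strategy from the classification
        if classification.type == "parallel_research":
            results = await self._fan_out_phase(classification.subtasks)
        elif classification.type == "sequential_pipeline":
            results = await self._handoff_phase(classification.pipeline)
        else:
            agent = self.agents[classification.agent]
            results = [self._charge(await agent.run(request))]
        # Phase 3: validation (placeholder: drop results with empty output)
        with self._trace("validate"):
            validated = [r for r in results if getattr(r, "output", None)]
        return {"results": validated, "cost_usd": self.total_cost}

    async def _fan_out_phase(self, subtasks: list[dict]) -> list:
        async def run_with_semaphore(task: dict):
            # The semaphore caps how many agents run at the same time
            async with self.semaphore:
                agent = self.agents[task["agent"]]
                return self._charge(await agent.run(task["prompt"]))
        raw_results = await asyncio.gather(
            *(run_with_semaphore(t) for t in subtasks), return_exceptions=True
        )
        return [r for r in raw_results if not isinstance(r, Exception)]

    async def _handoff_phase(self, pipeline: list[dict]) -> list:
        results, context = [], ""
        for step in pipeline:
            if self.total_cost >= self.config.cost_budget_usd:
                break  # stop handing off once the budget is spent
            agent = self.agents[step["agent"]]
            result = await asyncio.wait_for(
                agent.run(step["prompt"], context=context),
                timeout=self.config.handoff_timeout_seconds,
            )
            self._charge(result)
            context += f"\n{result.output}"
            results.append(result)
        return results
```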
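A semaphore caps how many agents run at once, which is what `max_parallel_agents` is meant to enforce. This self-contained sketch measures the peak concurrency, with hypothetical sleep-based agents standing in for LLM calls.

```python
import asyncio

async def run_capped(n_tasks: int, limit: int) -> int:
    """Run n_tasks fake agents with at most `limit` in flight; return the observed peak."""
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def fake_agent(i: int) -> int:
        nonlocal in_flight, peak
        async with semaphore:          # waits while `limit` agents are already running
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for an LLM call
            in_flight -= 1
            return i

    await asyncio.gather(*(fake_agent(i) for i in range(n_tasks)))
    return peak

print(asyncio.run(run_capped(n_tasks=10, limit=3)))  # 3
```

All ten tasks are created immediately, but only three ever hold the semaphore at the same time; the rest queue inside `async with`, which keeps rate limits and per-run cost bounded.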

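6. A Decision Framework: Single Agent vs Multi-Agent

6.1 Decision Tree

Not every scenario needs multiple agents. A single agent suits simple tasks; a multi-agent setup suits complex collaboration. The following decision tree helps choose:

```mermaid
flowchart TD
    A["Task requirements"] --> B{"Needs several kinds of expertise?"}
    B -->|"No"| C{"Can the steps run in parallel?"}
    B -->|"Yes"| D{"Are the subtasks independent?"}
    C -->|"No"| E["Single Agent"]
    C -->|"Yes"| F["Fan-out/Fan-in"]
    D -->|"No"| G["Supervisor pattern"]
    D -->|"Yes"| H{"Must context pass between them?"}
    H -->|"Yes"| G
    H -->|"No"| F
```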

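6.2 Comparison

| Dimension | Single Agent | Multi-Agent |
|---|---|---|
| Development complexity | | |
| Debugging difficulty | | |
| Token cost | | High (multiple system prompts) |
| Task quality | Good on simple tasks, mediocre on complex ones | Good on complex tasks |
| Parallelism | | |
| Best for | Single domain, fewer than 5 steps | Multiple domains, collaboration needed |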

6.3 A Practical Selection Example

```python
class ArchitectureSelector:
    """Pick an architecture from task features (_analyze is left abstract)."""

    def select(self, task_description: str) -> str:
        analysis = self._analyze(task_description)
        # Rule 1: 3 steps or fewer in a single domain -> Single Agent
        if analysis.step_count <= 3 and analysis.domain_count == 1:
            return "single_agent"
        # Rule 2: needs research + coding + review -> Supervisor
        if analysis.requires_domains(["research", "coding", "review"]):
            return "supervisor"
        # Rule 3: several independent data sources -> Fan-out
        if analysis.parallel_data_sources > 1:
            return "fan_out_fan_in"
        # Rule 4: linear flow with well-defined steps -> Handoffs
        if analysis.is_sequential and analysis.step_count > 3:
            return "handoffs"
        return "supervisor"  # default to Supervisor
```
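Because `_analyze` is left abstract, the rules are hard to test as written. A minimal alternative, assuming a hypothetical `TaskAnalysis` record that some analyzer (an LLM call or heuristics) fills in, keeps the same four rules as an ordered table, so priority is explicit and rules can be added without touching the control flow.

```python
from dataclasses import dataclass, field
from typing import Callable

@dataclass
class TaskAnalysis:
    """Hypothetical analyzer output."""
    step_count: int
    domains: set[str] = field(default_factory=set)
    parallel_data_sources: int = 0
    is_sequential: bool = False

# Ordered rules: the first match wins, so list order encodes priority
RULES: list[tuple[str, Callable[[TaskAnalysis], bool]]] = [
    ("single_agent", lambda a: a.step_count <= 3 and len(a.domains) == 1),
    ("supervisor", lambda a: {"research", "coding", "review"} <= a.domains),
    ("fan_out_fan_in", lambda a: a.parallel_data_sources > 1),
    ("handoffs", lambda a: a.is_sequential and a.step_count > 3),
]

def select_architecture(analysis: TaskAnalysis, default: str = "supervisor") -> str:
    return next((name for name, matches in RULES if matches(analysis)), default)

print(select_architecture(TaskAnalysis(step_count=2, domains={"faq"})))  # single_agent
```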

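7. ReAct vs Plan-and-Execute vs Reflection

7.1 Comparing Three Core Reasoning Architectures

The patterns above (Handoffs, Fan-out, Supervisor) describe how agents communicate. This section adds three reasoning architectures, which determine how an agent thinks internally.

| Architecture | Core idea | Strengths | Weaknesses |
|---|---|---|---|
| ReAct | Think while acting, alternating reasoning and actions | Flexible, real-time | Can get stuck in loops |
| Plan-and-Execute | Plan first, then execute | Global view of the task | The plan can go stale |
| Reflection | Reflect on the result and improve it | Continuous improvement | Extra tokens |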

7.2 Implementing ReAct

```python
from typing import TypedDict

class ReActState(TypedDict):
    thought: str
    action: str
    action_input: dict
    observation: str

REACT_PROMPT = """You are an assistant that follows the ReAct paradigm.
Available tools:
{tool_descriptions}
Answer strictly in this format:
Thought: reason about the current situation and the next step
Action: tool name
Action Input: {{"param": "value"}}
Observation: (tool result)
... (Thought/Action/Observation may repeat)
Thought: I now know the final answer
Final Answer: the final answer
Note: take at most {max_steps} steps.
Question: {question}"""

async def react_loop(question: str, tools: dict, max_steps: int = 5) -> str:
    """ReAct reasoning loop; llm, format_tools, parse_action and
    parse_final_answer are assumed helpers."""
    messages = [
        {"role": "system", "content": REACT_PROMPT.format(
            tool_descriptions=format_tools(tools),
            max_steps=max_steps,
            question=question,
        )}
    ]
    for _ in range(max_steps):
        # Stop before "Observation:" so the model cannot invent tool results
        response = await llm.complete(messages, stop=["Observation:"])
        messages.append({"role": "assistant", "content": response})
        # Parse the action
        action, action_input = parse_action(response)
        if action == "Final Answer":
            return parse_final_answer(response)
        # Run the tool; report failures back to the model instead of crashing
        if action in tools:
            try:
                observation = await tools[action](**action_input)
            except Exception as e:
                observation = f"Error: tool {action} failed: {e}"
        else:
            observation = f"Error: tool {action} does not exist"
        messages.append({"role": "user", "content": f"Observation: {observation}"})
    return "Reached the step limit without completing the task."
```
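`parse_action` and `parse_final_answer` are assumed helpers. Here is a minimal regex-based sketch for the prompt format above, assuming `Action Input` is single-line JSON. Malformed replies yield an empty action or empty arguments rather than an exception, so the loop can report the problem back to the model.

```python
import json
import re

ACTION_RE = re.compile(r"^Action:[ \t]*(.+?)[ \t]*$", re.MULTILINE)
INPUT_RE = re.compile(r"^Action Input:[ \t]*(\{.*\})[ \t]*$", re.MULTILINE)
FINAL_RE = re.compile(r"^Final Answer:[ \t]*(.*)", re.MULTILINE | re.DOTALL)

def parse_action(text: str) -> tuple[str, dict]:
    """Hypothetical parser for the ReAct reply format; returns (action, arguments)."""
    if FINAL_RE.search(text):
        return "Final Answer", {}
    action = ACTION_RE.search(text)
    if action is None:
        return "", {}  # no Action line: the loop reports an unknown tool
    raw_input = INPUT_RE.search(text)
    try:
        args = json.loads(raw_input.group(1)) if raw_input else {}
    except json.JSONDecodeError:
        args = {}  # invalid JSON: fall back to no arguments
    return action.group(1), args

def parse_final_answer(text: str) -> str:
    match = FINAL_RE.search(text)
    return match.group(1).strip() if match else ""

step = 'Thought: need data\nAction: search\nAction Input: {"query": "LangGraph"}'
print(parse_action(step))  # ('search', {'query': 'LangGraph'})
```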

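7.3 Implementing Plan-and-Execute

```python
from dataclasses import dataclass
from typing import Any

MAX_REPLANS = 2  # cap on how many failures may trigger replanning

@dataclass
class PlanStep:
    step_id: int
    description: str
    tool: str | None = None
    tool_input: dict | None = None
    status: str = "pending"  # pending / running / done / failed
    result: Any = None

@dataclass
class Plan:
    goal: str
    steps: list[PlanStep]
    current_step: int = 0

PLAN_PROMPT = """Create an execution plan for the following task.
Task: {goal}
Available tools: {tools}
Output the plan as JSON:
{{
  "steps": [
    {{"description": "step description", "tool": "tool name", "tool_input": {{"param": "value"}}}},
    ...
  ]
}}"""

async def plan_and_execute(goal: str, tools: dict) -> str:
    """Plan-and-Execute runner; planner_llm, executor_llm, format_tools, parse_plan,
    get_previous_results, replan and synthesize are assumed helpers."""
    # Phase 1: make a plan
    plan_response = await planner_llm.complete(
        PLAN_PROMPT.format(goal=goal, tools=format_tools(tools))
    )
    plan = parse_plan(plan_response)
    replans = 0
    # Phase 2: execute step by step; the index loop makes it explicit that
    # steps appended by replanning also run
    while plan.current_step < len(plan.steps):
        step = plan.steps[plan.current_step]
        step.status = "running"
        try:
            if step.tool and step.tool in tools:
                step.result = await tools[step.tool](**(step.tool_input or {}))
            else:
                # Reasoning step that needs no tool
                step.result = await executor_llm.complete(
                    f"Task: {step.description}\nContext: {get_previous_results(plan)}"
                )
            step.status = "done"
        except Exception as e:
            step.status = "failed"
            step.result = str(e)
            # Replan around the failure, but only a bounded number of times
            if replans < MAX_REPLANS:
                replans += 1
                new_plan = await replan(plan, str(e))
                plan.steps.extend(new_plan.steps)
        plan.current_step += 1
    # Phase 3: synthesize the results
    return await synthesize(plan)
```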
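`parse_plan` is also left abstract. Here is a defensive sketch of it (a hypothetical helper with its own copies of `PlanStep` and `Plan` so it runs standalone). It tolerates a Markdown code fence around the JSON, which models often add, and numbers the steps from 1.

```python
from __future__ import annotations

import json
import re
from dataclasses import dataclass
from typing import Any

@dataclass
class PlanStep:
    step_id: int
    description: str
    tool: str | None = None
    tool_input: dict | None = None
    status: str = "pending"
    result: Any = None

@dataclass
class Plan:
    goal: str
    steps: list[PlanStep]
    current_step: int = 0

TICKS = "`" * 3  # a Markdown code-fence marker
FENCE_RE = re.compile(rf"^{TICKS}(?:json)?\s*|\s*{TICKS}$")

def parse_plan(response: str, goal: str = "") -> Plan:
    """Hypothetical parser for the JSON plan that PLAN_PROMPT asks for."""
    data = json.loads(FENCE_RE.sub("", response.strip()))  # drop an optional code fence
    steps = [
        PlanStep(
            step_id=i,
            description=s["description"],
            tool=s.get("tool") or None,  # "" or null means a pure reasoning step
            tool_input=s.get("tool_input") or {},
        )
        for i, s in enumerate(data["steps"], start=1)
    ]
    return Plan(goal=goal, steps=steps)
```

A `json.JSONDecodeError` or `KeyError` from this function is a good signal to re-prompt the planner rather than start executing a half-parsed plan.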

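7.4 The Reflection Architecture

```python
REFLECT_PROMPT = """You just completed the following task:
Task: {task}
Execution trace: {trajectory}
Final result: {result}
Reflect on:
1. Does the result fully and accurately complete the task?
2. Is there a more efficient approach?
3. Could any intermediate step be improved?
Give improvement suggestions and a score (0-10)."""

async def reflective_agent(task: str, max_iterations: int = 3) -> str:
    """Agent with self-reflection; execute_agent, reflector_llm,
    get_trajectory and parse_score are assumed helpers."""
    best_result = None
    best_score = float("-inf")  # so even a result scored 0 is kept
    reflection = None
    for _ in range(max_iterations):
        # Run the task, feeding back the previous round's critique
        result = await execute_agent(task, previous_reflection=reflection)
        # Reflect on and score the result
        reflection = await reflector_llm.complete(
            REFLECT_PROMPT.format(
                task=task,
                trajectory=get_trajectory(),
                result=result,
            )
        )
        score = parse_score(reflection)
        if score > best_score:
            best_score = score
            best_result = result
        # Stop early once the score is high enough
        if score >= 8.0:
            break
    return best_result
```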
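`parse_score` is left abstract too. The prompt asks for a 0-10 score, but models phrase it freely, so a tolerant sketch (hypothetical helper) takes the first number after the word "score" and clamps it to the 0-10 range:

```python
import re

SCORE_RE = re.compile(r"score\D{0,20}?(\d+(?:\.\d+)?)", re.IGNORECASE)

def parse_score(reflection: str, default: float = 0.0) -> float:
    """Hypothetical: first number after the word "score", clamped to 0-10."""
    match = SCORE_RE.search(reflection)
    if match is None:
        return default
    return min(max(float(match.group(1)), 0.0), 10.0)

print(parse_score("Overall score is 8.5 out of 10"))  # 8.5
```

A phrase such as "score for step 2 is 6" would be misread as 2; asking the model to end its reflection with a fixed line like `Score: N` makes parsing far more reliable.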

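7.5 Choosing an Architecture

```mermaid
flowchart TD
    A["Task characteristics"] --> B{"Can the task be planned in advance?"}
    B -->|"Yes"| C{"Does execution often go wrong?"}
    B -->|"No"| D["ReAct"]
    C -->|"Often"| E["Reflection"]
    C -->|"Usually smooth"| F["Plan-and-Execute"]
    D --> G["Search, customer support, real-time question answering"]
    F --> H["Report generation, data analysis"]
    E --> I["Code generation, complex reasoning"]
```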

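8. Tool-Augmented vs Knowledge-Augmented Architectures

8.1 Two Augmentation Paths

An agent's capabilities can be extended in two directions: tool augmentation and knowledge augmentation.

```mermaid
flowchart LR
    subgraph ToolAugmented["Tool-augmented architecture"]
        T1["LLM"] --> T2["Search engine"]
        T1 --> T3["Code executor"]
        T1 --> T4["API calls"]
        T1 --> T5["Database"]
    end
    subgraph KnowledgeAugmented["Knowledge-augmented architecture"]
        K1["LLM"] --> K2["Vector database"]
        K1 --> K3["Knowledge graph"]
        K1 --> K4["Document retrieval"]
        K1 --> K5["FAQ store"]
    end
```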

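8.2 Tool Augmentation in Detail

Tool augmentation lets an agent take real actions, which suits scenarios that require interacting with external systems:

```python
from langchain_core.tools import tool
from langchain.agents import AgentExecutor, create_react_agent

# web_search_api, sandbox, db, email_service, llm and prompt are your own objects

@tool
def search_web(query: str) -> str:
    """Search the web for up-to-date information."""
    return web_search_api(query)

@tool
def execute_python(code: str) -> str:
    """Run Python code and return its output."""
    return sandbox.run(code, timeout=30)

@tool
def query_database(sql: str) -> str:
    """Run a read-only SQL query."""
    statement = sql.strip().rstrip(";")
    # Cheap guard only: allow a single SELECT statement. Real protection
    # should come from a read-only database user, not string checks.
    if not statement.upper().startswith("SELECT") or ";" in statement:
        return "Error: only single SELECT queries are allowed"
    return str(db.execute(statement))

@tool
def send_email(to: str, subject: str, body: str) -> str:
    """Send an email."""
    return email_service.send(to, subject, body)

# Assemble the tool-augmented agent; `prompt` must provide the
# {tools}, {tool_names} and {agent_scratchpad} variables
tools = [search_web, execute_python, query_database, send_email]
tool_agent = create_react_agent(llm, tools, prompt)
executor = AgentExecutor(agent=tool_agent, tools=tools)
```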

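8.3 Knowledge Augmentation in Detail

Knowledge augmentation lets an agent draw on private data, which suits scenarios that need specialized domain knowledge:

```python
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings

class KnowledgeAugmentedAgent:
    """Knowledge-augmented agent; load_documents, format_documents
    and llm are assumed helpers."""

    def __init__(self, knowledge_sources: list[str]):
        # Build the vector index
        self.vectorstore = Chroma.from_documents(
            documents=load_documents(knowledge_sources),
            embedding=OpenAIEmbeddings(),
        )

    async def answer(self, query: str) -> str:
        # 1. Retrieve relevant knowledge
        docs = self.vectorstore.similarity_search(query, k=5)
        context = format_documents(docs)
        # 2. Answer grounded in the retrieved knowledge
        response = await llm.complete(f"""
Answer the question using the knowledge below. If the knowledge does not contain the answer, say so explicitly.
Knowledge:
{context}
Question: {query}
""")
        return response
```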

8.4 Hybrid: Tools + Knowledge

Production systems usually combine the two:

```python
class HybridAgent:
    """Tool + knowledge hybrid; search, calculator, format_documents and
    assess_coverage are assumed helpers."""

    def __init__(self):
        self.vectorstore = Chroma(...)     # knowledge base
        self.tools = [search, calculator]  # tool set

    async def process(self, query: str) -> str:
        # Step 1: check the knowledge base first
        relevant_docs = self.vectorstore.similarity_search(query, k=3)
        knowledge_context = format_documents(relevant_docs)
        # Step 2: decide whether the knowledge base covers the question
        coverage = await assess_coverage(query, knowledge_context)
        if coverage.score >= 0.8:
            # Covered: answer straight from the knowledge base
            return await self._answer_from_knowledge(query, knowledge_context)
        # Not enough knowledge: supplement it with tools
        return await self._answer_with_tools(query, knowledge_context)
```
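`assess_coverage` is left abstract and would typically be an LLM judgment. As a cheap, swapped-in heuristic (a first filter, not a replacement for an LLM judge), keyword overlap can estimate coverage: the fraction of the query's keywords that also appear in the retrieved text. The tiny stopword list is a placeholder.

```python
import re

# Placeholder stopword list; a real system would use a proper list or an LLM judge
STOPWORDS = {"the", "a", "an", "is", "are", "of", "to", "in", "for", "what", "how", "do", "i", "my"}

def keywords(text: str) -> set[str]:
    return {w for w in re.findall(r"[a-z0-9]+", text.lower()) if w not in STOPWORDS}

def coverage_score(query: str, context: str) -> float:
    """Fraction of the query's keywords that also occur in the retrieved context."""
    wanted = keywords(query)
    if not wanted:
        return 1.0  # nothing specific to look for
    return len(wanted & keywords(context)) / len(wanted)

def choose_path(query: str, context: str, threshold: float = 0.8) -> str:
    # Same 0.8 cut-off as HybridAgent: answer from knowledge, otherwise call tools
    return "knowledge" if coverage_score(query, context) >= threshold else "tools"

q = "What is the refund window for annual plans?"
print(choose_path(q, "Annual plans have a 30-day refund window."))  # knowledge
print(choose_path(q, "Monthly plans can be cancelled anytime."))    # tools
```

Lexical overlap misses paraphrases, so it works best for deciding when retrieval clearly failed; borderline scores can still be escalated to an LLM check.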

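8.5 Selection Guide

| Scenario | Augmentation | Typical implementation |
|---|---|---|
| Customer support Q&A | Knowledge | RAG + FAQ store |
| Data analysis | Tools | SQL + Python executor |
| Research assistant | Hybrid | Search + literature library |
| Coding assistant | Tools | Code execution + Git operations |
| Compliance review | Knowledge | Regulation vector store + rules engine |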

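9. Memory Architecture Patterns

9.1 A Three-Tier Memory Architecture

An agent's memory system is usually split into three tiers, each with its own storage medium and access pattern:

```mermaid
flowchart TB
    A["Working Memory"] --> B["Short-term Memory"]
    B --> C["Long-term Memory"]
    A --> A1["Current conversation context<br/>Storage: memory / token window"]
    B --> B1["Session-level information<br/>Storage: Redis / session"]
    C --> C1["Persistent knowledge<br/>Storage: vector database"]
    style A fill:#90EE90
    style B fill:#87CEEB
    style C fill:#DDA0DD
```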

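9.2 Implementing Working Memory

Working memory is the context window of the current conversation, placed directly in the LLM prompt:

```python
class WorkingMemory:
    """Working memory: manages the current conversation's context window.
    count_tokens and summarize_messages are assumed helpers."""

    def __init__(self, max_tokens: int = 4000, keep_last: int = 6):
        self.messages: list[dict] = []
        self.summary = ""            # rolling summary of evicted messages
        self.max_tokens = max_tokens
        self.keep_last = keep_last   # 6 messages = the last 3 rounds

    def add(self, role: str, content: str):
        self.messages.append({"role": role, "content": content})
        self._compress_if_needed()

    def get_context(self) -> list[dict]:
        system = [m for m in self.messages if m["role"] == "system"]
        rest = [m for m in self.messages if m["role"] != "system"]
        summary = ([{"role": "system", "content": f"Conversation summary: {self.summary}"}]
                   if self.summary else [])
        return system + summary + rest

    def _compress_if_needed(self):
        """Compress history once the token count exceeds the limit."""
        if count_tokens(self.get_context()) <= self.max_tokens:
            return
        # Keep system prompts + the last N messages + a summary of the rest.
        # Split by position, not with `in`, so duplicate messages are handled correctly.
        system = [m for m in self.messages if m["role"] == "system"]
        rest = [m for m in self.messages if m["role"] != "system"]
        old, recent = rest[:-self.keep_last], rest[-self.keep_last:]
        if old:
            # Fold the previous summary in, so summaries do not pile up
            previous = [{"role": "system", "content": self.summary}] if self.summary else []
            self.summary = summarize_messages(previous + old)
            self.messages = system + recent
```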

9.3 Implementing Short-Term Memory

Short-term memory keeps information across conversation turns in external storage:

```python
import json
from typing import Any

import redis

class ShortTermMemory:
    """Short-term memory: session-scoped storage in Redis with a TTL."""

    def __init__(self, session_id: str, ttl: int = 3600):
        self.client = redis.Redis()
        self.session_id = session_id
        self.ttl = ttl  # seconds; every write resets the expiry

    def _key(self, key: str) -> str:
        return f"session:{self.session_id}:{key}"

    def save(self, key: str, value: Any):
        """Store session-level information as JSON."""
        self.client.setex(self._key(key), self.ttl, json.dumps(value))

    def load(self, key: str) -> Any | None:
        """Read session-level information; None if missing or expired."""
        data = self.client.get(self._key(key))
        return json.loads(data) if data else None

    def save_user_preference(self, preference: dict):
        """Merge preferences the user expressed during this session."""
        existing = self.load("preferences") or {}
        existing.update(preference)
        self.save("preferences", existing)
```
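For unit tests, a stand-in with the same `save`/`load` semantics avoids running Redis. This sketch (a hypothetical `InMemorySessionStore`) mirrors `SETEX` behavior, where every write resets the key's TTL, and accepts an injectable clock so expiry can be tested without sleeping.

```python
from __future__ import annotations

import json
import time
from typing import Any, Callable

class InMemorySessionStore:
    """Dict-backed stand-in for a Redis session store; entries expire `ttl` seconds after the last write."""

    def __init__(self, session_id: str, ttl: int = 3600,
                 clock: Callable[[], float] = time.monotonic):
        self.session_id = session_id
        self.ttl = ttl
        self.clock = clock
        self._data: dict[str, tuple[float, str]] = {}  # key -> (expires_at, json)

    def _key(self, key: str) -> str:
        return f"session:{self.session_id}:{key}"

    def save(self, key: str, value: Any) -> None:
        # Like SETEX: store JSON and (re)set the expiry on every write
        self._data[self._key(key)] = (self.clock() + self.ttl, json.dumps(value))

    def load(self, key: str) -> Any | None:
        entry = self._data.get(self._key(key))
        if entry is None or entry[0] <= self.clock():
            self._data.pop(self._key(key), None)  # lazily evict expired entries
            return None
        return json.loads(entry[1])

    def save_user_preference(self, preference: dict) -> None:
        existing = self.load("preferences") or {}
        existing.update(preference)
        self.save("preferences", existing)
```

Like the Redis version, the read-modify-write in `save_user_preference` is not atomic; with concurrent writers to the same session, Redis would need a transaction or a hash with `HSET` instead.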

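9.4 Implementing Long-Term Memory

Long-term memory is stored in a vector database and supports semantic retrieval:

```python
from datetime import datetime, timedelta

from langchain_core.documents import Document

class LongTermMemory:
    """Long-term memory: persistent knowledge and experience.

    Assumes a vector store exposing add_documents, similarity_search and
    delete(ids), plus store-specific get_all and update_metadata methods;
    compute_similarity and merge_memories are assumed helpers.
    """

    def __init__(self, vectorstore):
        self.vectorstore = vectorstore

    async def store(self, content: str, metadata: dict | None = None):
        """Store one long-term memory."""
        doc = Document(
            page_content=content,
            metadata={
                "timestamp": datetime.now().isoformat(),
                "access_count": 0,
                **(metadata or {}),
            },
        )
        self.vectorstore.add_documents([doc])

    async def recall(self, query: str, k: int = 5) -> list[Document]:
        """Semantically retrieve related memories."""
        results = self.vectorstore.similarity_search(query, k=k)
        # Bump access counts (frequently used memories matter more); the
        # returned documents are copies, so the change must be written back
        for doc in results:
            doc.metadata["access_count"] = doc.metadata.get("access_count", 0) + 1
            doc.metadata["last_accessed"] = datetime.now().isoformat()
            self.vectorstore.update_metadata(doc.id, doc.metadata)
        return results

    async def consolidate(self):
        """Consolidation: merge near-duplicate memories, delete stale ones."""
        all_memories = self.vectorstore.get_all()
        merged: set[int] = set()
        for i, m1 in enumerate(all_memories):
            if i in merged:
                continue
            for j in range(i + 1, len(all_memories)):
                m2 = all_memories[j]
                if j in merged or compute_similarity(m1, m2) <= 0.95:
                    continue
                # Replace the pair with one stronger memory
                combined = merge_memories(m1, m2)
                await self.store(combined.page_content, combined.metadata)
                self.vectorstore.delete([m1.id, m2.id])
                merged.update({i, j})
                break  # m1 is gone; move on to the next memory
        # Delete stale memories (older than 90 days and never accessed)
        cutoff = datetime.now() - timedelta(days=90)
        for idx, memory in enumerate(all_memories):
            if idx in merged:
                continue  # already deleted during merging
            ts = datetime.fromisoformat(memory.metadata["timestamp"])
            if ts < cutoff and memory.metadata.get("access_count", 0) == 0:
                self.vectorstore.delete([memory.id])
```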
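Here is a self-contained sketch of the two consolidation rules, run on plain dicts with toy embedding vectors. All names are hypothetical, and instead of calling an LLM to merge text it keeps the older entry of a near-duplicate pair (cosine similarity above 0.95) and adds up the access counts; it then drops entries older than 90 days that were never accessed.

```python
import math
from datetime import datetime, timedelta

def cosine(a: list[float], b: list[float]) -> float:
    # Assumes non-zero vectors
    return sum(x * y for x, y in zip(a, b)) / (math.hypot(*a) * math.hypot(*b))

def consolidate(memories: list[dict], now: datetime,
                threshold: float = 0.95, max_age_days: int = 90) -> list[dict]:
    """Merge near-duplicates into the older entry, then drop stale, never-accessed ones."""
    kept: list[dict] = []
    for mem in sorted(memories, key=lambda m: m["timestamp"]):  # oldest first
        twin = next((k for k in kept if cosine(k["vector"], mem["vector"]) > threshold), None)
        if twin is not None:
            twin["access_count"] += mem["access_count"]  # fold usage into the survivor
            continue
        kept.append(dict(mem))  # shallow copy so the caller's dicts are not mutated
    cutoff = now - timedelta(days=max_age_days)
    return [m for m in kept if not (m["timestamp"] < cutoff and m["access_count"] == 0)]

now = datetime(2025, 3, 22)
mems = [
    {"id": 1, "vector": [1.0, 0.0], "timestamp": now - timedelta(days=10), "access_count": 2},
    {"id": 2, "vector": [0.99, 0.01], "timestamp": now - timedelta(days=5), "access_count": 1},
    {"id": 3, "vector": [0.0, 1.0], "timestamp": now - timedelta(days=200), "access_count": 0},
    {"id": 4, "vector": [0.6, 0.8], "timestamp": now - timedelta(days=200), "access_count": 3},
]
print([m["id"] for m in consolidate(mems, now)])  # [4, 1]
```

Memory 2 is folded into memory 1 (their vectors are almost parallel), and memory 3 is dropped as old and unused, while the equally old memory 4 survives because it has been accessed.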

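9.5 Episodic Memory

Episodic memory stores specific events and experiences and supports retrieval by time and situation:

```python
from dataclasses import dataclass
from datetime import datetime

from langchain_core.documents import Document

@dataclass
class EpisodicMemory:
    """Episodic memory: a record of one concrete event."""
    event: str                # what happened
    timestamp: datetime       # when it happened
    context: str              # surrounding context
    outcome: str              # result
    emotional_valence: float  # valence from -1 (bad) to 1 (good)
    lessons: list[str]        # lessons learned

class EpisodicMemoryStore:
    def __init__(self, vectorstore):
        self.vectorstore = vectorstore

    async def record_episode(self, event: EpisodicMemory):
        """Record one experience."""
        doc = Document(
            page_content=f"{event.event} -> {event.outcome}",
            metadata={
                "timestamp": event.timestamp.isoformat(),
                "context": event.context,
                # Many vector stores only accept scalar metadata, so join the list
                "lessons": "\n".join(event.lessons),
                "valence": event.emotional_valence,
            },
        )
        self.vectorstore.add_documents([doc])

    async def recall_similar_episodes(self, current_situation: str, k: int = 3):
        """Recall similar past experiences to avoid repeating mistakes."""
        results = self.vectorstore.similarity_search(current_situation, k=k)
        # Put negative experiences first: their lessons help avoid the same mistakes
        results.sort(key=lambda doc: doc.metadata.get("valence", 0))
        return results
```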

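10. Real-World Architecture Examples

10.1 Customer Support System

```mermaid
flowchart TD
    U["User message"] --> R["Routing layer<br/>intent classification"]
    R -->|"Inquiry"| KB["Knowledge base agent<br/>RAG + FAQ"]
    R -->|"Operation"| OP["Operations agent<br/>tickets / refunds"]
    R -->|"Complaint"| SP["Complaint agent<br/>escalation flow"]
    KB --> V["Result validation"]
    OP --> V
    SP --> V
    V --> U2["Reply to user"]
    KB -.->|"Unresolved"| H["Human agent"]
    SP -.->|"Severe"| H
```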

```python
class CustomerSupportArchitecture:
    """Customer support system; every agent, store and queue is an assumed component."""

    def __init__(self):
        self.router = IntentRouter()
        self.kb_agent = KnowledgeBaseAgent(vectorstore=faq_store)
        self.ops_agent = OperationsAgent(systems=[ticket_system, refund_api])
        self.complaint_agent = ComplaintAgent(escalation=human_queue)

    async def handle(self, message: str, user_id: str) -> str:
        # Route by intent
        intent = await self.router.classify(message)
        if intent.type == "operation":
            result = await self.ops_agent.process(message, user_id)
        elif intent.type == "complaint":
            result = await self.complaint_agent.handle(message, user_id)
        else:
            # "inquiry" and anything unrecognized go to the knowledge base
            result = await self.kb_agent.answer(message, user_id)
        # Validate: hand low-confidence answers to a human
        if result.confidence < 0.7:
            return "Let me transfer you to a human agent..."
        return result.response
```

10.2 Research Assistant

```python
import asyncio

class ResearchAssistantArchitecture:
    """Research assistant: a Fan-out + Supervisor hybrid (all agents are assumed components)."""

    def __init__(self):
        self.supervisor = ResearchSupervisor()
        self.search_agent = SearchAgent()
        self.arxiv_agent = ArxivAgent()
        self.analysis_agent = AnalysisAgent()
        self.writing_agent = WritingAgent()

    async def research(self, topic: str, depth: str = "medium") -> str:
        # Phase 1: fan out searches across sources
        search_tasks = [
            self.search_agent.search(topic, source="web"),
            self.search_agent.search(topic, source="news"),
            self.arxiv_agent.search(topic),
        ]
        # return_exceptions=True: one failing source should not sink the others
        raw_results = await asyncio.gather(*search_tasks, return_exceptions=True)
        raw_results = [r for r in raw_results if not isinstance(r, Exception)]
        # Phase 2: the supervisor filters for relevance
        filtered = await self.supervisor.filter_relevant(raw_results, threshold=0.6)
        # Phase 3: analysis
        analysis = await self.analysis_agent.analyze(filtered, topic)
        # Phase 4: write the report
        report = await self.writing_agent.write_report(
            topic=topic,
            sources=filtered,
            analysis=analysis,
        )
        return report
```

10.3 Coding Assistant

```python
class CodingAssistantArchitecture:
    """Coding assistant: a sequential Handoffs pipeline (all agents are assumed components)."""

    def __init__(self):
        self.understand_agent = CodeUnderstandingAgent()
        self.plan_agent = CodePlanningAgent()
        self.code_agent = CodeGenerationAgent()
        self.review_agent = CodeReviewAgent()
        self.test_agent = TestGenerationAgent()

    async def implement(self, requirement: str, codebase: dict) -> dict:
        # Step 1: understand the requirement and the codebase
        context = await self.understand_agent.analyze(requirement, codebase)
        # Step 2: draft an implementation plan
        plan = await self.plan_agent.create_plan(requirement, context)
        # Step 3: write the code
        code = await self.code_agent.generate(plan, context)
        # Step 4: code review (one fix-up round)
        review = await self.review_agent.review(code, plan)
        if review.has_issues:
            code = await self.code_agent.fix(code, review.issues)
        # Step 5: generate tests
        tests = await self.test_agent.generate(code, plan)
        return {
            "code": code,
            "tests": tests,
            "review": review.summary,
        }
```

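11. Summary

| Pattern | Core strength | Typical framework |
|---|---|---|
| Handoffs | Specialization | OpenAI Agents SDK |
| Fan-out/Fan-in | Parallel speedup | LangChain |
| Supervisor | Intelligent routing | AutoGen |

11.1 Quick Selection Guide

| Requirement | Recommended architecture |
|---|---|
| Simple Q&A | Single Agent + RAG |
| Collaboration across several domains | Supervisor |
| Parallel research over many data sources | Fan-out/Fan-in |
| Pipeline-style task processing | Handoffs |
| Repeated trial and error | Reflection |
| Complex multi-step tasks | Plan-and-Execute |
| Complex hybrid systems | Supervisor + Fan-out |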

Choosing the right pattern keeps an agent system clear and efficient.


Original post: https://blog.souloss.com/posts/machine-learning/agent-guide/agent-architecture-patterns/
Author: Souloss
Published: 2025-03-22
License: CC BY-NC-SA 4.0
