单个 Agent 能力有限,复杂任务需要多个 Agent 协同作战。
就像一个公司不能只靠一个人:产品经理理解需求、架构师设计方案、工程师实现代码、测试工程师验证质量。每个人专注自己的领域,通过协作完成复杂项目。
Multi-Agent 系统正是这种协作模式的技术实现。
本文要点
- 为什么需要 Multi-Agent 系统
- Agent 间通信模式:同步/异步、消息队列
- 协作架构:星型、链式、网状、层次化
- 任务分解与整合策略
- 冲突处理与共识机制
- 案例分析:AutoGPT、CrewAI、LangChain Agents
- A2A(Agent-to-Agent)协议解析
- 论文解读:Generative Agents
一、为什么需要 Multi-Agent
1.1 单 Agent 的局限性
核心问题:
- 角色混淆:一个 Agent 扮演多个角色,提示词冲突
- 注意力分散:任务复杂时容易顾此失彼
- 错误传播:一步出错,后续全错
- 扩展困难:增加能力需要修改整体逻辑
1.2 Multi-Agent 的优势
┌─────────────────────────────────────────────────────────────┐│ Multi-Agent 优势 │├─────────────────────────────────────────────────────────────┤│ ││ 1. 专家分工 ││ ├── 每个 Agent 专注一个领域 ││ ├── 提示词更精准、输出更稳定 ││ └── 类似人类团队的专业分工 ││ ││ 2. 相互审核 ││ ├── 一个 Agent 输出,另一个 Agent 检查 ││ ├── 减少错误、提升质量 ││ └── 类似 Code Review 机制 ││ ││ 3. 并行处理 ││ ├── 多个 Agent 同时工作 ││ ├── 缩短整体执行时间 ││ └── 适合可并行化的子任务 ││ ││ 4. 灵活扩展 ││ ├── 新增能力只需添加新 Agent ││ ├── 不影响现有 Agent ││ └── 模块化、可维护性强 ││ │└─────────────────────────────────────────────────────────────┘二、Agent 间通信模式
2.1 同步通信
发送方等待接收方响应后继续执行。
特点:
- 实现简单,逻辑清晰
- 阻塞等待,可能造成延迟
- 适合需要立即结果的场景
代码示例:
class SyncCommunication: def __init__(self, agents: dict): self.agents = agents
def call(self, agent_name: str, message: str) -> str: """同步调用其他 Agent""" agent = self.agents[agent_name] # 阻塞等待返回 response = agent.process(message) return response
# 使用示例comm = SyncCommunication({ "researcher": ResearcherAgent(), "writer": WriterAgent()})
# 同步调用:等待 researcher 返回后再继续research_result = comm.call("researcher", "研究 AI Agent 发展")final_output = comm.call("writer", f"基于以下内容写文章:{research_result}")2.2 异步通信
发送方发送消息后不等待,继续执行其他任务。
特点:
- 非阻塞,提高并发效率
- 实现复杂,需要消息队列支持
- 适合长时间任务、批量处理
代码示例:
import asynciofrom dataclasses import dataclassfrom typing import Callable, Awaitable
@dataclassclass Message: sender: str receiver: str content: str callback: Callable[[str], Awaitable[None]]
class AsyncMessageQueue: def __init__(self): self.queue = asyncio.Queue() self.handlers = {}
def register(self, agent_name: str, handler): self.handlers[agent_name] = handler
async def send(self, message: Message): await self.queue.put(message)
async def start_processing(self): while True: message = await self.queue.get() handler = self.handlers.get(message.receiver) if handler: result = await handler(message.content) if message.callback: await message.callback(result)
# 使用示例async def main(): mq = AsyncMessageQueue() mq.register("researcher", researcher_handler) mq.register("writer", writer_handler)
# 发送消息后立即返回,不等待结果 await mq.send(Message( sender="coordinator", receiver="researcher", content="研究 AI Agent 发展", callback=lambda r: print(f"收到结果:{r}") ))2.3 消息队列设计
消息格式标准化:
@dataclassclass AgentMessage: """标准消息格式""" # 元信息 message_id: str # 消息唯一标识 timestamp: float # 时间戳
# 发送方信息 sender: str # 发送方 Agent ID sender_role: str # 发送方角色
# 接收方信息 receiver: str # 接收方 Agent ID(或 "broadcast")
# 消息内容 message_type: str # 消息类型:request/response/notification content: str # 消息内容 metadata: dict # 额外元数据
# 任务关联 task_id: str # 关联的任务 ID parent_message_id: str # 父消息 ID(用于追踪对话链)三、协作架构模式
3.1 星型架构(Star)
一个中心 Agent 协调多个工作 Agent。
特点:
- 中心化控制,协调简单
- 单点故障风险
- 适合任务分发和结果汇总场景
实现示例:
class StarArchitecture: def __init__(self, coordinator, workers: dict): self.coordinator = coordinator self.workers = workers
def execute(self, task: str) -> str: # 1. 协调者分解任务 subtasks = self.coordinator.decompose(task)
# 2. 分发给各个 Worker results = {} for subtask in subtasks: worker_name = subtask.assigned_worker worker = self.workers[worker_name] results[worker_name] = worker.execute(subtask)
# 3. 协调者整合结果 final_result = self.coordinator.integrate(results) return final_result3.2 链式架构(Chain)
Agent 按顺序执行,前一个的输出是后一个的输入。
特点:
- 流程清晰,适合流水线任务
- 环环相扣,一处阻塞全链等待
- 适合软件开发、内容生产等流程化场景
实现示例:
class ChainArchitecture: def __init__(self, agents: list): self.chain = agents # 按顺序排列的 Agent 列表
def execute(self, input_data: str) -> str: data = input_data for agent in self.chain: data = agent.process(data) # 可选:检查是否需要提前终止 if agent.should_stop(): break return data
# 软件开发流水线dev_pipeline = ChainArchitecture([ RequirementAgent(), # 需求分析 ArchitectAgent(), # 架构设计 DeveloperAgent(), # 代码实现 QAAgent(), # 测试验证 DeployerAgent() # 部署发布])
result = dev_pipeline.execute("开发一个用户登录功能")3.3 网状架构(Mesh)
所有 Agent 可直接通信,无中心节点。
特点:
- 高度灵活,任意 Agent 可协作
- 协调复杂,可能出现消息风暴
- 适合需要自由协商的场景
实现示例:
class MeshArchitecture: def __init__(self, agents: dict): self.agents = agents # 每个 Agent 持有对其他 Agent 的引用 for agent in agents.values(): agent.set_peers(agents)
def broadcast(self, sender: str, message: str): """广播消息给所有 Agent""" for name, agent in self.agents.items(): if name != sender: agent.receive(sender, message)
def direct_message(self, sender: str, receiver: str, message: str): """点对点通信""" self.agents[receiver].receive(sender, message)3.4 层次化架构(Hierarchical)
多层管理结构,上级管理下级。
特点:
- 层级分明,职责清晰
- 可扩展性强,易于管理大规模 Agent
- 适合企业级复杂系统
实现示例:
class HierarchicalAgent: def __init__(self, name: str, role: str): self.name = name self.role = role self.subordinates = [] self.superior = None
def add_subordinate(self, agent): agent.superior = self self.subordinates.append(agent)
def delegate(self, task: str): """将任务分配给下属""" results = [] for subordinate in self.subordinates: result = subordinate.execute(task) results.append(result) return self.aggregate(results)
def escalate(self, issue: str): """向上级汇报问题""" if self.superior: return self.superior.handle_escalation(self.name, issue)3.5 架构选择指南
| 架构 | 协调复杂度 | 扩展性 | 容错性 | 适用场景 |
|---|---|---|---|---|
| 星型 | 低 | 中 | 低 | 任务分发、结果汇总 |
| 链式 | 低 | 低 | 低 | 流水线处理 |
| 网状 | 高 | 高 | 高 | 自由协商、去中心化 |
| 层次化 | 中 | 高 | 中 | 企业级复杂系统 |
四、任务分解与整合
4.1 任务分解策略
分解原则:
1. 单一职责 - 每个子任务目标明确 - 避免一个任务包含多个不相关目标
2. 粒度适中 - 太粗:Agent 负担过重 - 太细:协调开销过大
3. 边界清晰 - 输入输出定义明确 - 减少 Agent 间的耦合
4. 可验证性 - 每个子任务有明确的完成标准 - 便于质量检查代码示例:
class TaskDecomposer: def __init__(self, llm): self.llm = llm
def decompose(self, task: str, agents: list) -> list: """使用 LLM 分解任务""" prompt = f""" 任务:{task}
可用的 Agent 角色: {self._format_agents(agents)}
请将任务分解为子任务,每个子任务指定: 1. 子任务描述 2. 负责的 Agent 3. 输入要求 4. 输出格式 5. 前置依赖 """
response = self.llm.generate(prompt) subtasks = self._parse_response(response) return subtasks
def _format_agents(self, agents): return "\n".join([ f"- {a.name}: {a.capabilities}" for a in agents ])4.2 结果整合策略
代码示例:
class ResultIntegrator: def sequential_merge(self, results: list) -> str: """顺序合并""" return "\n\n".join(results)
def voting_decision(self, results: list, voters: list) -> str: """投票决策""" votes = {} for voter in voters: chosen = voter.vote(results) votes[chosen] = votes.get(chosen, 0) + 1 return max(votes, key=votes.get)
def weighted_merge(self, results: list, weights: dict) -> str: """权重融合""" total_weight = sum(weights.values()) weighted_result = "" for agent_name, result in results.items(): weight = weights.get(agent_name, 1) # 根据权重决定内容的比重 weighted_result += result * weight return weighted_result
def llm_synthesis(self, results: dict, llm) -> str: """LLM 综合整合""" prompt = f""" 多个 Agent 的输出结果如下: {self._format_results(results)}
请综合以上信息,生成一个完整、连贯的最终结果。 """ return llm.generate(prompt)五、冲突处理与共识机制
5.1 常见冲突类型
5.2 冲突解决策略
1. 优先级机制
class PriorityResolver: def __init__(self, priorities: dict): self.priorities = priorities # Agent 优先级配置
def resolve(self, conflict): """按优先级解决冲突""" agent_a, agent_b = conflict.parties priority_a = self.priorities.get(agent_a, 0) priority_b = self.priorities.get(agent_b, 0)
winner = agent_a if priority_a >= priority_b else agent_b return conflict.proposals[winner]2. 投票机制
class VotingResolver: def resolve(self, proposals: dict, voters: list) -> str: """投票解决冲突""" votes = {k: 0 for k in proposals.keys()}
for voter in voters: # 每个 Agent 根据自己的判断投票 choice = voter.evaluate(proposals) votes[choice] += 1
return max(votes, key=votes.get)3. 辩论机制
class DebateResolver: def __init__(self, judge_agent): self.judge = judge_agent
def resolve(self, proposals: dict, rounds: int = 3): """辩论解决冲突""" arguments = {k: [] for k in proposals.keys()}
for _ in range(rounds): for agent_name, proposal in proposals.items(): # 每个 Agent 提出论据 arg = agent.generate_argument( proposal, arguments # 可以参考其他 Agent 的论据 ) arguments[agent_name].append(arg)
# 裁决者做出最终决定 decision = self.judge.decide(arguments) return decision4. 共识协议
┌─────────────────────────────────────────────────────────────┐│ 共识协议流程 │├─────────────────────────────────────────────────────────────┤│ ││ 1. 提案阶段 ││ Agent A 提出方案 → 广播给所有 Agent ││ ││ 2. 预投票阶段 ││ 各 Agent 表达初步态度(赞成/反对/弃权) ││ ││ 3. 讨论阶段 ││ 反对者提出修改建议 → 提案者修改方案 ││ ││ 4. 正式投票 ││ 达到多数同意(如 2/3)→ 方案通过 ││ ││ 5. 执行阶段 ││ 所有 Agent 按共识结果执行 ││ │└─────────────────────────────────────────────────────────────┘六、案例分析
6.1 AutoGPT
AutoGPT 是最早引起广泛关注的自主 Agent 项目之一。
核心特点:
- 自主目标分解:自动将大目标拆分为子任务
- 任务队列管理:维护待执行任务列表
- 自我反思:评估执行结果,调整策略
- 长期记忆:使用向量数据库存储历史信息
局限性:
- 容易陷入循环
- 成本难以控制
- 复杂任务成功率较低
6.2 CrewAI
CrewAI 专注于 Multi-Agent 角色扮演协作。
from crewai import Agent, Task, Crew, Process
# 定义 Agent 角色researcher = Agent( role="研究员", goal="深入研究主题,收集全面信息", backstory="你是一位经验丰富的研究员,擅长信息收集和分析", allow_delegation=False, verbose=True)
writer = Agent( role="作家", goal="撰写高质量、引人入胜的文章", backstory="你是一位专业作家,擅长将复杂信息转化为易懂的文章", allow_delegation=True, verbose=True)
editor = Agent( role="编辑", goal="审核文章质量,确保准确性和可读性", backstory="你是一位严谨的编辑,注重细节和质量", allow_delegation=False, verbose=True)
# 定义任务research_task = Task( description="研究 AI Agent 的最新发展趋势", expected_output="详细的研究报告", agent=researcher)
write_task = Task( description="基于研究报告撰写一篇技术文章", expected_output="结构完整的技术文章", agent=writer, context=[research_task] # 依赖研究任务)
edit_task = Task( description="审核并改进文章", expected_output="最终版本的技术文章", agent=editor, context=[write_task] # 依赖写作任务)
# 组建团队crew = Crew( agents=[researcher, writer, editor], tasks=[research_task, write_task, edit_task], process=Process.sequential # 顺序执行)
# 执行result = crew.kickoff()核心特点:
- 角色定义清晰:每个 Agent 有明确的角色、目标和背景
- 任务依赖管理:支持任务间的上下文传递
- 协作模式支持:顺序执行或层级协作
- 人类反馈集成:支持 Human-in-the-loop
6.3 LangGraph
LangGraph 提供基于状态图的 Agent 编排能力。
from langgraph.graph import StateGraph, ENDfrom typing import TypedDict, Annotatedimport operator
class State(TypedDict): messages: Annotated[list, operator.add] next_agent: str
def researcher_node(state: State) -> dict: # 研究员处理 result = researcher_agent.invoke(state["messages"]) return {"messages": [result], "next_agent": "writer"}
def writer_node(state: State) -> dict: # 作家处理 result = writer_agent.invoke(state["messages"]) return {"messages": [result], "next_agent": "reviewer"}
def reviewer_node(state: State) -> dict: # 审核员处理 result = reviewer_agent.invoke(state["messages"])
# 判断是否需要修改 if needs_revision(result): return {"messages": [result], "next_agent": "writer"} return {"messages": [result], "next_agent": END}
# 构建状态图workflow = StateGraph(State)workflow.add_node("researcher", researcher_node)workflow.add_node("writer", writer_node)workflow.add_node("reviewer", reviewer_node)
# 定义边workflow.set_entry_point("researcher")workflow.add_edge("researcher", "writer")workflow.add_edge("writer", "reviewer")workflow.add_conditional_edges( "reviewer", lambda s: s["next_agent"], {"writer": "writer", END: END})
# 编译并执行app = workflow.compile()result = app.invoke({"messages": ["写一篇关于 AI Agent 的文章"], "next_agent": "researcher"})核心特点:
- 状态机模型:清晰地定义 Agent 间的流转逻辑
- 条件分支:根据状态决定下一步行动
- 循环支持:可以实现迭代优化流程
- 可视化:可生成执行流程图
七、A2A 协议
7.1 什么是 A2A
A2A(Agent-to-Agent)是 Google 在 2025 年提出的 Agent 间通信标准协议。
7.2 A2A 与 MCP 的关系
| 协议 | 连接对象 | 解决问题 |
|---|---|---|
| MCP | Agent ↔ 工具/数据源 | AI 与外部资源的标准化连接 |
| A2A | Agent ↔ Agent | Agent 间的标准化协作 |
7.3 A2A 核心概念
┌─────────────────────────────────────────────────────────────┐│ A2A 协议核心概念 │├─────────────────────────────────────────────────────────────┤│ ││ 1. Agent Card(Agent 名片) ││ - 描述 Agent 的能力、接口、认证方式 ││ - 类似 OpenAPI 规范,但针对 Agent ││ ││ 2. Message(消息) ││ - 标准化的消息格式 ││ - 支持文本、文件、结构化数据 ││ ││ 3. Task(任务) ││ - 任务状态管理 ││ - 状态:pending → running → completed/failed ││ ││ 4. Artifact(产物) ││ - 任务执行产生的输出 ││ - 支持多种格式 ││ │└─────────────────────────────────────────────────────────────┘7.4 A2A 消息格式示例
{ "jsonrpc": "2.0", "method": "tasks/send", "params": { "task": { "id": "task-001", "status": { "state": "running" }, "history": [ { "role": "user", "parts": [{ "text": "分析最近的销售数据" }] } ] } }}7.5 实现示例
from a2a import A2AClient, AgentCard
# 定义 Agent 名片my_agent_card = AgentCard( name="SalesAnalyzer", description="分析销售数据的专家 Agent", capabilities=["data_analysis", "report_generation"], endpoints={ "task": "https://my-agent.example.com/a2a/task" })
# 创建 A2A 客户端client = A2AClient()
# 发现其他 Agentother_agent = client.discover("ReportGenerator")
# 发送任务请求response = client.send_task( agent=other_agent, task={ "type": "generate_report", "data": sales_data, "format": "pdf" })八、论文解读:Generative Agents
8.1 论文概述
Generative Agents: Interactive Simulacra of Human Behavior(Park et al., 2023)
斯坦福大学的研究,在虚拟小镇中部署 25 个 AI Agent,观察它们的社交行为和记忆形成。
8.2 核心架构
8.3 关键创新
1. 记忆流(Memory Stream)
class MemoryStream: def __init__(self): self.memories = [] # 按时间顺序存储
def add(self, event: str, importance: float): """添加新记忆""" memory = { "content": event, "timestamp": time.now(), "importance": importance, "access_count": 0 } self.memories.append(memory)
def retrieve(self, query: str, top_k: int = 10): """检索相关记忆""" scores = [] for mem in self.memories: # 综合评分:相关性 + 时近性 + 重要性 recency = self._recency_score(mem) importance = mem["importance"] relevance = self._relevance_score(query, mem) score = 0.5 * relevance + 0.3 * recency + 0.2 * importance scores.append((mem, score))
return sorted(scores, key=lambda x: -x[1])[:top_k]2. 反思机制(Reflection)
Agent 定期反思,形成更高层次的认知。
def reflect(agent, memory_stream): """生成反思""" # 1. 检索最近的记忆 recent_memories = memory_stream.retrieve_recent(n=100)
# 2. 提取关键问题 questions = llm.generate_questions(recent_memories)
# 3. 对每个问题生成洞察 insights = [] for q in questions: relevant = memory_stream.retrieve(q) insight = llm.synthesize(q, relevant) insights.append(insight)
# 4. 存储反思结果 for insight in insights: memory_stream.add(insight, importance=0.9)3. 规划系统(Planning)
规划层级:┌─────────────────────────────────────┐│ 长期计划(年/月) ││ "成为一名受尊敬的教授" │├─────────────────────────────────────┤│ 中期计划(周/天) ││ "完成论文初稿" │├─────────────────────────────────────┤│ 短期计划(小时) ││ "下午 2 点去图书馆查资料" │├─────────────────────────────────────┤│ 即时行动 ││ "走向图书馆" │└─────────────────────────────────────┘8.4 实验发现
- Agent 能够形成稳定的社交关系
- 记忆和反思机制让行为更连贯
- Agent 会主动组织活动(如生日派对)
- 信息传播模式与真实社会类似
8.5 对 Multi-Agent 系统的启示
1. 记忆架构的重要性 - 单一记忆不足以支撑复杂行为 - 需要分层记忆系统
2. 反思提升智能 - 不只是反应式执行 - 定期总结形成更高层次认知
3. 规划需要层级化 - 从长期目标到即时行动 - 动态调整计划
4. 社交行为的涌现 - 简单规则可以产生复杂行为 - Agent 间交互是智能的重要来源九、最佳实践
9.1 设计原则
1. 最小化 Agent 数量 - 从最少 Agent 开始 - 只在必要时增加
2. 明确职责边界 - 每个 Agent 只做一件事 - 避免职责重叠
3. 标准化通信 - 使用统一的消息格式 - 定义清晰的接口
4. 可观测性 - 记录所有 Agent 间通信 - 支持执行链追踪
5. 容错设计 - 单个 Agent 失败不影响整体 - 设置超时和重试机制9.2 常见陷阱
常见问题 FAQ
Q1:什么情况下应该使用 Multi-Agent?
A:当任务满足以下条件时考虑使用:
- 需要多种专业技能(如研究+写作+审核)
- 任务可以分解为相对独立的子任务
- 单 Agent 的输出质量不稳定
- 需要并行处理以提升效率
Q2:如何决定协作架构?
A:
- 任务分发场景 → 星型架构
- 流水线处理 → 链式架构
- 需要自由协商 → 网状架构
- 大规模复杂系统 → 层次化架构
Q3:如何防止 Agent 间通信风暴?
A:
- 限制广播频率
- 使用消息去重
- 设置消息 TTL(过期自动丢弃)
- 实现背压机制(队列满时阻塞生产者)
Q4:如何评估 Multi-Agent 系统效果?
A:
- 任务完成率
- 输出质量(人工或自动评估)
- 执行效率(时间、步数)
- 成本(Token 消耗)
- Agent 间通信效率
Q5:A2A 协议会成为标准吗?
A:A2A 仍处于早期阶段,但方向正确。类似 MCP 标准化 Agent 与工具的连接,A2A 有望标准化 Agent 间的连接。建议关注但不必急于全面采用。
小结
Multi-Agent 系统将复杂任务分解,让多个专业 Agent 协同完成。核心要素:
- 通信模式:同步/异步、消息队列
- 协作架构:星型、链式、网状、层次化
- 任务管理:分解策略、结果整合
- 冲突处理:优先级、投票、辩论、共识
- 标准协议:MCP 连接工具,A2A 连接 Agent
设计 Multi-Agent 系统时,遵循「最小必要」原则:从最简单的架构和最少的 Agent 开始,按需扩展。
下篇预告
Multi-Agent 系统需要处理大量数据、维护状态、协调执行。如何高效实现?
参考资料
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时






