为什么单个 Agent 不够用了?MAS 核心概念拆解
2024–2025 年 Agent 从实验室走向生产,但很多团队很快发现:把所有任务塞给一个 LLM Agent,系统会在规模化时崩溃。Google 内部 Agent Bake-Off 实验显示,采用分布式多 Agent 架构后处理时间从 1 小时降至 10 分钟(约 6 倍提升)。AdaptOrch(2026)进一步证明:编排拓扑的选择对系统性能的影响比底层模型更大,在 SWE-bench 等基准上可带来 12–23% 的性能提升。
上下文窗口瓶颈:复杂任务的中间结果会把上下文塞满,后续推理质量骤降。
专业能力稀释:一个 Agent 既检索、又写代码、又做审核,样样都做但样样不精(jack-of-all-trades)。
串行执行低效:所有子任务顺序执行,总耗时是每步之和,无法并发。
单点故障(SPOF):一旦这个 Agent 出问题,整个流程全部停摆。
核心结论:多 Agent 协作架构(MAS)正是为解决上述问题而生——编排拓扑 > 模型选择。
MAS 定义:多 Agent 协作系统(Multi-Agent System)是由多个独立 AI Agent 组成的系统,这些 Agent 通过明确的通信协议和编排机制协作完成单个 Agent 无法高效完成的复杂任务。
| 特征 | 描述 |
|---|---|
| 角色专一 | 只负责一个明确定义的子任务(检索、推理、生成、验证等) |
| 工具访问 | 拥有完成自身任务所需的特定工具集 |
| 状态隔离 | 维护自己的上下文和内存,不污染其他 Agent |
| 可替换性 | 可以独立升级、替换,不影响整体系统 |
三种控制模式:
| 模式 | 结构 | 优点 | 缺点 |
|---|---|---|---|
| 集中式(Centralized) | Orchestrator 统一调度 A/B/C | 可审计、可控 | 单点瓶颈 |
| 分散式(Decentralized) | Agent 点对点网状通信 | 高弹性、低延迟 | 调试难、非确定性高 |
| 层级式(Hierarchical) | 顶层 Orchestrator → Team Lead → Worker | 平衡控制性与扩展性 | 设计复杂度中等 |
LangGraph vs CrewAI vs AutoGen:框架对比与选型建议
选对框架能省掉大量自研编排与状态管理代码。下表覆盖架构范式、语言支持、学习曲线、状态管理、HITL、可观测性、生产就绪度、原型速度、Azure 集成与典型场景。
| 维度 | LangGraph | CrewAI | AutoGen(微软) |
|---|---|---|---|
| 架构范式 | 状态机图 | 角色制团队 | 对话式多 Agent |
| 编程语言 | Python / JS/TS | Python | Python / .NET |
| 学习曲线 | 较陡 | 平缓 | 中等 |
| 状态管理 | 原生支持 | 需自实现 | 有限支持 |
| Human-in-the-Loop | 原生 interrupt() | 需自实现 | 支持 |
| 可观测性 | LangSmith(商业) | 有限 | Azure Monitor |
| 生产就绪度 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ |
| 快速原型 | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
| Azure 集成 | ⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐⭐ |
| 适合场景 | 复杂有状态工作流 | 角色制内容流水线 | 对话式协作与辩论 |
选型建议:
选 LangGraph:生产级可靠性(金融、医疗、合规)、复杂状态持久化、精细 HITL 控制、条件分支与循环的精确表达。
选 CrewAI:1–2 天快速验证 Idea、团队用「角色」直觉理解 Agent、内容生成与研究报告等角色制场景。
选 AutoGen:微软/Azure 技术栈、需要 Agent 多轮辩论与迭代推理、研究或快速实验不同对话模式。
编排拓扑的选择对系统性能的影响比底层模型的选择更大——先定模式与框架,再选模型。
六大编排设计模式与 MCP+A2A 双层通信协议
以下六种模式覆盖生产中 95% 以上的多 Agent 场景。模式一顺序流水线:Agent A 输出作为 B 输入,严格线性执行,适合文章创作、代码审查等固定流程。
from langgraph.graph import StateGraph, START, END
from typing import TypedDict
class PipelineState(TypedDict):
query: str
retrieved_docs: str
analysis: str
final_report: str
def retrieval_agent(state: PipelineState):
docs = search_knowledge_base(state["query"])
return {"retrieved_docs": docs}
def analysis_agent(state: PipelineState):
result = llm.invoke(f"分析以下内容:{state['retrieved_docs']}")
return {"analysis": result.content}
def writer_agent(state: PipelineState):
report = llm.invoke(f"根据分析撰写报告:{state['analysis']}")
return {"final_report": report.content}
builder = StateGraph(PipelineState)
builder.add_node("retriever", retrieval_agent)
builder.add_node("analyzer", analysis_agent)
builder.add_node("writer", writer_agent)
builder.add_edge(START, "retriever")
builder.add_edge("retriever", "analyzer")
builder.add_edge("analyzer", "writer")
builder.add_edge("writer", END)
pipeline = builder.compile()
模式二并行扇出/扇入:多个 Agent 同时处理独立子任务,总耗时 = max(T1…Tn)。LangGraph Send API 配合 Annotated[list, operator.add] Reducer 自动聚合并行结果。
from langgraph.types import Send
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
import operator
class ResearchState(TypedDict):
query: str
research_results: Annotated[list, operator.add]
final_synthesis: str
def supervisor(state: ResearchState):
subtasks = [
{"query": state["query"], "source": "academic"},
{"query": state["query"], "source": "industry"},
{"query": state["query"], "source": "news"},
]
return [Send("research_worker", task) for task in subtasks]
def research_worker(state: dict):
result = search_by_source(state["query"], state["source"])
return {"research_results": [result]}
def synthesizer(state: ResearchState):
combined = "\n".join(state["research_results"])
synthesis = llm.invoke(f"综合以下研究结果:{combined}")
return {"final_synthesis": synthesis.content}
builder = StateGraph(ResearchState)
builder.add_node("research_worker", research_worker)
builder.add_node("synthesizer", synthesizer)
builder.add_conditional_edges(START, supervisor, ["research_worker"])
builder.add_edge("research_worker", "synthesizer")
builder.add_edge("synthesizer", END)
graph = builder.compile()
模式三层级主管-工人:Supervisor 做意图识别与路由,Worker 执行专业子任务。推荐关键字快速通道 + LLM 精确路由双层优化。
KEYWORD_ROUTING = {
"代码": "code_agent", "code": "code_agent",
"搜索": "search_agent", "查询": "search_agent",
"数据": "data_agent",
}
def supervisor_with_fast_path(state):
query = state["query"].lower()
for keyword, agent_name in KEYWORD_ROUTING.items():
if keyword in query:
return {"next": agent_name}
routing_prompt = f"""
用户请求:{state['query']}
可用Agent:code_agent, search_agent, data_agent
请返回最合适的Agent名称,只返回名称。
"""
decision = llm.invoke(routing_prompt)
return {"next": decision.content.strip()}
模式四群体协作(Swarm):Agent 点对点传递任务,无中央协调者,靠终止规则停止。生产环境慎用,须设硬性轮数上限。AutoGen GroupChat + max_round 示例:
import autogen
reviewer_1 = autogen.AssistantAgent(
name="SecurityReviewer",
system_message="你是一位安全专家,专注于代码中的安全漏洞。"
)
reviewer_2 = autogen.AssistantAgent(
name="PerformanceReviewer",
system_message="你是一位性能专家,专注于代码的效率和资源使用。"
)
human_proxy = autogen.UserProxyAgent(
name="CodeAuthor",
human_input_mode="NEVER",
max_consecutive_auto_reply=2,
is_termination_msg=lambda x: "APPROVED" in x.get("content", "")
)
groupchat = autogen.GroupChat(
agents=[human_proxy, reviewer_1, reviewer_2],
messages=[],
max_round=6
)
manager = autogen.GroupChatManager(groupchat=groupchat)
模式五黑板架构(Blackboard):所有 Agent 共享结构化工作空间,满足前提条件时主动读写黑板,适合小时级异步任务与异构服务协作。模式六混合模式(Hybrid):Intent 路由器 + Supervisor + 并行扇出 + 质量保障流水线组合,是企业内容生成系统的典型架构。
MCP + A2A 双层协议(Linux Foundation Agentic AI Foundation 治理):MCP 垂直层统一 Agent ↔ 工具/数据库/API;A2A 水平层标准化 Agent ↔ Agent 任务委托与能力发现。
from mcp.server import Server
from mcp.types import Tool, TextContent
app = Server("data-agent-mcp")
@app.list_tools()
async def list_tools():
return [
Tool(
name="query_customer_db",
description="查询客户数据库,支持按ID、姓名、邮箱检索",
inputSchema={
"type": "object",
"properties": {
"field": {"type": "string", "enum": ["id", "name", "email"]},
"value": {"type": "string"}
},
"required": ["field", "value"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "query_customer_db":
result = db.query(arguments["field"], arguments["value"])
return [TextContent(type="text", text=str(result))]
A2A Agent Card(/.well-known/agent.json)声明能力;Orchestrator 通过 JSON-RPC 2.0 委托任务:
import httpx
async def discover_and_delegate(agent_url: str, task: str):
card_response = await httpx.get(f"{agent_url}/.well-known/agent.json")
agent_card = card_response.json()
available_skills = [s["id"] for s in agent_card["skills"]]
if "web_research" not in available_skills:
raise ValueError(f"Agent {agent_card['name']} 不支持 web_research 技能")
payload = {
"jsonrpc": "2.0",
"method": "message/send",
"id": "task-001",
"params": {
"message": {
"role": "user",
"parts": [{"type": "text", "text": task}]
}
}
}
response = await httpx.post(agent_card["url"], json=payload)
return response.json()
延伸阅读:工具接入细节见本站从 0 开发 MCP Server 教程;协议战略视角见MCP 为何成为 AI 时代的 HTTP 协议。
生产级多 Agent 系统六步 Runbook
选型与拓扑设计:用决策树确定编排模式(流水线 / 扇出 / 层级 / 黑板 / 混合),Agent 数量控制在 3–8 个,避免过度工程。
状态持久化:使用 PostgresSaver 做检查点存储,支持跨进程恢复与断点续传;每个会话绑定 thread_id。
Human-in-the-Loop:高风险操作用 interrupt() 暂停等待人工确认,合规行业必备。
熔断与重试:为外部 Agent 调用封装 CircuitBreaker,失败阈值触发 OPEN 状态,防止级联故障。
Token 预算:部署 TokenBudgetManager,在每次 Agent 调用前检查剩余预算,防止费用失控。
可观测性接入:OpenTelemetry traced_agent_call 携带 correlation_id;追踪 MONITORING_METRICS 指标;抽样运行 LLM-as-Judge 评估输出质量。
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.types import interrupt
with PostgresSaver.from_conn_string("postgresql://user:pass@localhost/agentdb") as checkpointer:
graph = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "user-session-12345"}}
result = graph.invoke({"query": "分析Q2财报"}, config)
def high_risk_action_agent(state):
proposed_action = plan_action(state)
human_decision = interrupt({
"proposed_action": proposed_action,
"risk_level": "HIGH",
"message": "此操作将修改生产数据库,请确认是否执行"
})
if human_decision["approved"]:
return execute_action(proposed_action)
return {"status": "cancelled", "reason": human_decision.get("reason")}
import time
from functools import wraps
class CircuitBreaker:
def __init__(self, failure_threshold=5, recovery_timeout=60):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.state = "CLOSED"
self.last_failure_time = None
def __call__(self, func):
@wraps(func)
async def wrapper(*args, **kwargs):
if self.state == "OPEN":
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = "HALF_OPEN"
else:
raise Exception("Circuit breaker OPEN - Agent 暂时不可用")
try:
result = await func(*args, **kwargs)
if self.state == "HALF_OPEN":
self.state = "CLOSED"
self.failure_count = 0
return result
except Exception:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "OPEN"
raise
return wrapper
class TokenBudgetManager:
def __init__(self, total_budget: int = 100_000):
self.total_budget = total_budget
self.used_tokens = 0
self.agent_usage = {}
def check_budget(self, agent_name: str, estimated_tokens: int) -> bool:
remaining = self.total_budget - self.used_tokens
if estimated_tokens > remaining:
raise BudgetExceededException(
f"Agent {agent_name} 请求 {estimated_tokens} tokens,"
f"但剩余预算仅 {remaining} tokens"
)
return True
def record_usage(self, agent_name: str, actual_tokens: int):
self.used_tokens += actual_tokens
self.agent_usage[agent_name] = self.agent_usage.get(agent_name, 0) + actual_tokens
可观测性硬数据、四大踩坑与 2026 趋势
MAST 研究团队对 1642 个多 Agent 执行追踪的分析揭示了故障分布:
| 故障类型 | 占比 | 说明 |
|---|---|---|
| 系统设计问题 | 41.77% | 步骤重复、工具选择错误、上下文溢出、缺少终止条件 |
| Agent 间不对齐 | 36.94% | 交接时上下文丢失、幻觉成为下一个 Agent 的「事实」 |
| 任务验证失败 | 21.30% | 过早终止、不完整验证 |
57% vs 8%:57% 的组织已有 Agent 在生产运行,但仅 8% 完成了 LLM 可观测性实施——大量错误以 HTTP 200 返回,监控面板全绿但输出错误。
Google Bake-Off 6×:分布式多 Agent 架构将处理时间从 1 小时降至 10 分钟,子 Agent 可独立升级。
AdaptOrch 12–23%:正确编排拓扑在 SWE-bench 等基准上带来 12–23% 性能提升,优于单纯换模型。
from opentelemetry import trace
import uuid
tracer = trace.get_tracer("multi-agent-system")
def traced_agent_call(agent_name: str, task: dict, correlation_id: str = None):
if not correlation_id:
correlation_id = str(uuid.uuid4())
with tracer.start_as_current_span(f"agent.{agent_name}") as span:
span.set_attribute("agent.name", agent_name)
span.set_attribute("correlation.id", correlation_id)
span.set_attribute("task.type", task.get("type", "unknown"))
try:
result = agent_registry[agent_name].run(task)
span.set_attribute("agent.tokens_used", result.get("tokens", 0))
span.set_attribute("agent.status", "success")
return result
except Exception as e:
span.set_attribute("agent.status", "error")
span.set_attribute("error.message", str(e))
raise
MONITORING_METRICS = {
"task_success_rate": "端到端任务完成率(目标:>85%)",
"e2e_latency_p95": "P95端到端延迟(目标:<30s)",
"total_cost_per_task": "每次任务平均Token成本",
"agent_error_rate": "各Agent错误率(目标:<5%)",
"agent_retry_count": "重试次数(高重试 = 需要调查)",
"tool_call_budget_usage": "工具调用次数/预算比",
"output_quality_score": "输出质量评分",
"goal_alignment_score": "目标一致性评分",
"hallucination_rate": "幻觉检测率",
}
def evaluate_agent_output(original_task: str, agent_output: str) -> dict:
evaluation_prompt = f"""
原始任务:{original_task}
Agent输出:{agent_output}
请从任务完成度、准确性、相关性、幻觉检测四个维度评分(1-5分),
以JSON返回:{{"completeness": x, "accuracy": x, "relevance": x,
"hallucination_detected": true/false, "comments": "..."}}
"""
evaluation = llm.invoke(evaluation_prompt)
return json.loads(evaluation.content)
四大踩坑与防坑:
上下文污染:Agent A 幻觉传给 B/C,全链路基于错误前提。防坑:每个交接点用 validate_agent_output 做 Schema 与置信度校验。
无限循环:重试或工具调用失控,Token 费用暴涨。防坑:设置 MAX_ITERATIONS、MAX_TOOL_CALLS_PER_AGENT、MAX_TOTAL_TOKENS 硬性上限。
过度工程:简单两步链拆成 8 个 Agent。防坑:先从顺序流水线开始,生产最佳数量 3–8 个。
Demo 鸿沟:内部 Demo 漂亮,上线后边缘输入频繁失败。防坑:ProductionGuardrails 做输入长度、注入检测与 PII 过滤。
并行同步:Send API 扇出后慢分支未完成就触发 Supervisor 重跑。防坑:LangGraph defer=True 创建显式同步屏障。
def validate_agent_output(output: dict, schema: dict) -> bool:
jsonschema.validate(output, schema)
if output.get("confidence_score", 1.0) < 0.7:
raise LowConfidenceError(f"Agent 输出置信度过低: {output['confidence_score']}")
required_fields = schema.get("required", [])
missing = [f for f in required_fields if not output.get(f)]
if missing:
raise MissingFieldsError(f"输出缺少必填字段: {missing}")
return True
MAX_ITERATIONS = 10
MAX_TOOL_CALLS_PER_AGENT = 20
MAX_TOTAL_TOKENS = 50_000
class ProductionGuardrails:
def validate_input(self, user_input: str) -> str:
if len(user_input) > 10000:
raise InputTooLongError("输入超过10000字符限制")
injection_patterns = ["ignore previous instructions", "forget everything"]
for pattern in injection_patterns:
if pattern.lower() in user_input.lower():
raise PromptInjectionError("检测到潜在的提示注入攻击")
return user_input.strip()
def validate_output(self, output: str) -> str:
output = self.pii_filter.redact(output)
if self.content_classifier.is_harmful(output):
raise HarmfulContentError("输出包含有害内容")
return output
builder.add_node("supervisor", supervisor_node, defer=True)
你的任务是否有明确的线性依赖步骤?
├─ 是 → 子任务是否可以并发执行?
│ ├─ 否 → 【顺序流水线】
│ └─ 是 → 【并行扇出 + 顺序流水线 混合】
└─ 否 → 是否有一个 Agent 具有决策权威?
├─ 是 → 规模是否足够大需要子团队?
│ ├─ 否 → 【Supervisor-Worker 层级模式】
│ └─ 是 → 【层级式(Supervisors of Supervisors)】
└─ 否 → 任务是否是长时间异步的?
├─ 是 → 【黑板架构】
└─ 否 → Agent 数量是否 ≤ 5?
├─ 是 → 【Swarm(注意设置终止条件)】
└─ 否 → 【考虑重新拆分为层级模式】
五条核心要点:① 编排拓扑 > 模型选择;② 从顺序流水线开始,有证据再增加 Agent;③ MCP + A2A 是新项目标准协议;④ 可观测性不是可选项(57% vs 8% 差距);⑤ 生产 Agent 数量 3–8 个最佳。
2026 趋势:联邦编排(多团队子编排器共享路由策略)、多模态多 Agent(视觉/音频与文本混合协作)、自适应拓扑选择(AdaptOrch 方向)、EU AI Act 要求完整决策审计链。
注意:在笔记本本地跑多 Agent 编排会遭遇休眠断连、内存不足与网络抖动;廉价 Linux VPS 又无法原生运行 Xcode 与 Apple Silicon 优化推理。多 Agent 架构是 2026 年 AI 工程的核心竞争力,但生产环境需要稳定宿主。
对于需要 7×24 多 Agent 编排常驻、PostgresSaver 检查点持久化、MCP Server 与 Cursor / Claude Code 协作的生产环境,MESHLAUNCH 的 Mac Mini 云端租赁通常是更优解:独占 Apple Silicon、按天/周/月弹性下单,作为 LangGraph 工作流与本地向量检索的稳定宿主,让你的编排资产从「绑定特定供应商」变为团队自有的可移植能力。
需要复杂状态管理、HITL 与合规审计选 LangGraph;1–2 天快速原型与角色制内容流水线选 CrewAI;微软/Azure 技术栈与多轮辩论协作选 AutoGen。多 Agent 编排宿主方案见租赁价格页。
MCP 是垂直层,统一 Agent 访问工具、数据库与 API(写一次,到处用);A2A 是水平层,通过 Agent Card 与 JSON-RPC 2.0 实现 Agent 间任务委托与能力发现。两者均由 Linux Foundation Agentic AI Foundation 治理,建议新项目直接采用双层架构。
至少需要 PostgreSQL 检查点持久化(PostgresSaver)、OpenTelemetry 分布式追踪、Token 预算与熔断器,以及 7×24 稳定宿主避免笔记本休眠中断长时会话。Mac Mini 裸金属可同时运行 LangGraph 工作流、MCP Server 与本地向量检索。部署与网络问题见帮助中心。