多Agent協作架構實戰
從設計模式到生產落地(2026)

六大編排模式 · LangGraph/CrewAI/AutoGen · MCP+A2A · 可觀測性與踩坑指南

多Agent協作架構實戰:從設計模式到生產落地(2026年完整指南)
若你把檢索、撰碼、審核全塞進單一 LLM Agent,規模化時必然撞上上下文溢位、串列延遲與單點故障。本稿面向 AI 工程師與架構師,依序涵蓋:① 單 Agent 四大瓶頸與 MAS 三種控制模式;② LangGraph/CrewAI/AutoGen 選型矩陣;③ 六大編排模式與 MCP+A2A 雙層協定完整程式碼;④ PostgresSaver、熔斷器、Token 預算等正式環境實踐;⑤ MAST 故障分佈數據、五大踩坑與六步生產 Runbook
01

為什麼單一 Agent 已不敷使用?MAS 核心概念拆解

2024–2025 年 Agent 從實驗室走向正式環境,許多團隊很快發現:把所有任務塞給一個 LLM Agent,系統會在規模化時崩潰。Google 內部 Agent Bake-Off 實驗顯示,採用分散式多 Agent 架構後,處理時間從 1 小時降至 10 分鐘(約 6 倍提升)。AdaptOrch(2026)進一步證明:編排拓撲的選擇對系統效能的影響比底層模型更大,在 SWE-bench 等基準上可帶來 12–23% 的效能提升。

01

上下文視窗瓶頸:複雜任務的中間結果會把上下文塞滿,後續推理品質驟降。

02

專業能力稀釋:一個 Agent 既檢索、又寫程式、又做審核,樣樣都做但樣樣不精(jack-of-all-trades)。

03

串列執行低效:所有子任務依序執行,總耗時是每步之和,無法並行。

04

單點故障(SPOF):一旦這個 Agent 出問題,整個流程全部停擺。

05

核心結論:多 Agent 協作架構(MAS)正是為解決上述問題而生——編排拓撲 > 模型選擇。

MAS 定義:多 Agent 協作系統(Multi-Agent System)是由多個獨立 AI Agent 組成的系統,這些 Agent 透過明確的通訊協定與編排機制協作,完成單一 Agent 無法高效處理的複雜任務。

特徵描述
角色專一只負責一個明確定義的子任務(檢索、推理、生成、驗證等)
工具存取擁有完成自身任務所需的特定工具集
狀態隔離維護自己的上下文與記憶體,不污染其他 Agent
可替換性可以獨立升級、替換,不影響整體系統

三種控制模式:

模式結構優點缺點
集中式(Centralized)Orchestrator 統一調度 A/B/C可稽核、可控單點瓶頸
分散式(Decentralized)Agent 點對點網狀通訊高彈性、低延遲除錯難、非確定性高
層級式(Hierarchical)頂層 Orchestrator → Team Lead → Worker平衡控制性與擴展性設計複雜度中等
02

LangGraph vs CrewAI vs AutoGen:框架對照與選型建議

選對框架能省掉大量自研編排與狀態管理程式碼。下表涵蓋架構範式、語言支援、學習曲線、狀態管理、HITL、可觀測性、正式環境就緒度、原型速度、Azure 整合與典型場景。

維度LangGraphCrewAIAutoGen(微軟)
架構範式狀態機圖角色制團隊對話式多 Agent
程式語言Python/JS/TSPythonPython/.NET
學習曲線較陡平緩中等
狀態管理原生支援需自實作有限支援
Human-in-the-Loop原生 interrupt()需自實作支援
可觀測性LangSmith(商業)有限Azure Monitor
正式環境就緒度⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
快速原型⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
Azure 整合⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
適合場景複雜有狀態工作流角色制內容流水線對話式協作與辯論

選型建議:

LG

選 LangGraph:正式環境級可靠性(金融、醫療、合規)、複雜狀態持久化、精細 HITL 控制、條件分支與迴圈的精確表達。

CR

選 CrewAI:1–2 天快速驗證 Idea、團隊用「角色」直覺理解 Agent、內容生成與研究報告等角色制場景。

AG

選 AutoGen:微軟/Azure 技術棧、需要 Agent 多輪辯論與迭代推理、研究或快速實驗不同對話模式。

編排拓撲的選擇對系統效能的影響,比底層模型的選擇更大——先定模式與框架,再選模型。

03

六大編排設計模式與 MCP+A2A 雙層通訊協定

以下六種模式涵蓋正式環境中 95% 以上的多 Agent 場景。模式一順序流水線:Agent A 輸出作為 B 輸入,嚴格線性執行,適合文章創作、程式碼審查等固定流程。

python
from langgraph.graph import StateGraph, START, END
from typing import TypedDict

class PipelineState(TypedDict):
    query: str
    retrieved_docs: str
    analysis: str
    final_report: str

def retrieval_agent(state: PipelineState):
    docs = search_knowledge_base(state["query"])
    return {"retrieved_docs": docs}

def analysis_agent(state: PipelineState):
    result = llm.invoke(f"分析以下內容:{state['retrieved_docs']}")
    return {"analysis": result.content}

def writer_agent(state: PipelineState):
    report = llm.invoke(f"根據分析撰寫報告:{state['analysis']}")
    return {"final_report": report.content}

builder = StateGraph(PipelineState)
builder.add_node("retriever", retrieval_agent)
builder.add_node("analyzer", analysis_agent)
builder.add_node("writer", writer_agent)
builder.add_edge(START, "retriever")
builder.add_edge("retriever", "analyzer")
builder.add_edge("analyzer", "writer")
builder.add_edge("writer", END)
pipeline = builder.compile()

模式二並行扇出/扇入:多個 Agent 同時處理獨立子任務,總耗時 = max(T1…Tn)。LangGraph Send API 配合 Annotated[list, operator.add] Reducer 自動彙整並行結果。

python
from langgraph.types import Send
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
import operator

class ResearchState(TypedDict):
    query: str
    research_results: Annotated[list, operator.add]
    final_synthesis: str

def supervisor(state: ResearchState):
    subtasks = [
        {"query": state["query"], "source": "academic"},
        {"query": state["query"], "source": "industry"},
        {"query": state["query"], "source": "news"},
    ]
    return [Send("research_worker", task) for task in subtasks]

def research_worker(state: dict):
    result = search_by_source(state["query"], state["source"])
    return {"research_results": [result]}

def synthesizer(state: ResearchState):
    combined = "\n".join(state["research_results"])
    synthesis = llm.invoke(f"綜合以下研究結果:{combined}")
    return {"final_synthesis": synthesis.content}

builder = StateGraph(ResearchState)
builder.add_node("research_worker", research_worker)
builder.add_node("synthesizer", synthesizer)
builder.add_conditional_edges(START, supervisor, ["research_worker"])
builder.add_edge("research_worker", "synthesizer")
builder.add_edge("synthesizer", END)
graph = builder.compile()

模式三層級主管-工人:Supervisor 做意圖識別與路由,Worker 執行專業子任務。建議採用關鍵字快速通道 + LLM 精確路由雙層最佳化。

python
KEYWORD_ROUTING = {
    "程式碼": "code_agent", "code": "code_agent",
    "搜尋": "search_agent", "查詢": "search_agent",
    "資料": "data_agent",
}

def supervisor_with_fast_path(state):
    query = state["query"].lower()
    for keyword, agent_name in KEYWORD_ROUTING.items():
        if keyword in query:
            return {"next": agent_name}
    routing_prompt = f"""
    使用者請求:{state['query']}
    可用 Agent:code_agent, search_agent, data_agent
    請回傳最合適的 Agent 名稱,只回傳名稱。
    """
    decision = llm.invoke(routing_prompt)
    return {"next": decision.content.strip()}

模式四群體協作(Swarm):Agent 點對點傳遞任務,無中央協調者,靠終止規則停止。正式環境須謹慎使用,並設定硬性輪數上限。AutoGen GroupChat + max_round 範例:

python
import autogen

reviewer_1 = autogen.AssistantAgent(
    name="SecurityReviewer",
    system_message="你是一位安全專家,專注於程式碼中的安全漏洞。"
)
reviewer_2 = autogen.AssistantAgent(
    name="PerformanceReviewer",
    system_message="你是一位效能專家,專注於程式碼的效率與資源使用。"
)
human_proxy = autogen.UserProxyAgent(
    name="CodeAuthor",
    human_input_mode="NEVER",
    max_consecutive_auto_reply=2,
    is_termination_msg=lambda x: "APPROVED" in x.get("content", "")
)
groupchat = autogen.GroupChat(
    agents=[human_proxy, reviewer_1, reviewer_2],
    messages=[],
    max_round=6
)
manager = autogen.GroupChatManager(groupchat=groupchat)

模式五黑板架構(Blackboard):所有 Agent 共享結構化工作空間,滿足前提條件時主動讀寫黑板,適合小時級非同步任務與異質服務協作。模式六混合模式(Hybrid):Intent 路由器 + Supervisor + 並行扇出 + 品質保障流水線組合,是企業內容生成系統的典型架構。

MCP + A2A 雙層協定(Linux Foundation Agentic AI Foundation 治理):MCP 垂直層統一 Agent ↔ 工具/資料庫/API;A2A 水平層標準化 Agent ↔ Agent 任務委託與能力發現。

python
from mcp.server import Server
from mcp.types import Tool, TextContent

app = Server("data-agent-mcp")

@app.list_tools()
async def list_tools():
    return [
        Tool(
            name="query_customer_db",
            description="查詢客戶資料庫,支援依 ID、姓名、電子郵件檢索",
            inputSchema={
                "type": "object",
                "properties": {
                    "field": {"type": "string", "enum": ["id", "name", "email"]},
                    "value": {"type": "string"}
                },
                "required": ["field", "value"]
            }
        )
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    if name == "query_customer_db":
        result = db.query(arguments["field"], arguments["value"])
        return [TextContent(type="text", text=str(result))]

A2A Agent Card(/.well-known/agent.json)宣告能力;Orchestrator 透過 JSON-RPC 2.0 委託任務:

python
import httpx

async def discover_and_delegate(agent_url: str, task: str):
    card_response = await httpx.get(f"{agent_url}/.well-known/agent.json")
    agent_card = card_response.json()
    available_skills = [s["id"] for s in agent_card["skills"]]
    if "web_research" not in available_skills:
        raise ValueError(f"Agent {agent_card['name']} 不支援 web_research 技能")
    payload = {
        "jsonrpc": "2.0",
        "method": "message/send",
        "id": "task-001",
        "params": {
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": task}]
            }
        }
    }
    response = await httpx.post(agent_card["url"], json=payload)
    return response.json()

延伸閱讀:工具接入細節見本站從 0 開發 MCP Server 教學;協定戰略視角見MCP 為何成為 AI 時代的 HTTP 協定

04

正式級多 Agent 系統六步 Runbook

01

選型與拓撲設計:用決策樹確定編排模式(流水線/扇出/層級/黑板/混合),Agent 數量控制在 3–8 個,避免過度工程。

02

狀態持久化:使用 PostgresSaver 做檢查點儲存,支援跨程序恢復與斷點續傳;每個工作階段綁定 thread_id

03

Human-in-the-Loop:高風險操作用 interrupt() 暫停等待人工確認,合規產業必備。

04

熔斷與重試:為外部 Agent 呼叫封裝 CircuitBreaker,失敗閾值觸發 OPEN 狀態,防止級聯故障。

05

Token 預算:部署 TokenBudgetManager,在每次 Agent 呼叫前檢查剩餘預算,防止費用失控。

06

可觀測性接入:OpenTelemetry traced_agent_call 攜帶 correlation_id;追蹤 MONITORING_METRICS 指標;抽樣執行 LLM-as-Judge 評估輸出品質。

python
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.types import interrupt

with PostgresSaver.from_conn_string("postgresql://user:pass@localhost/agentdb") as checkpointer:
    graph = builder.compile(checkpointer=checkpointer)
    config = {"configurable": {"thread_id": "user-session-12345"}}
    result = graph.invoke({"query": "分析 Q2 財報"}, config)

def high_risk_action_agent(state):
    proposed_action = plan_action(state)
    human_decision = interrupt({
        "proposed_action": proposed_action,
        "risk_level": "HIGH",
        "message": "此操作將修改正式環境資料庫,請確認是否執行"
    })
    if human_decision["approved"]:
        return execute_action(proposed_action)
    return {"status": "cancelled", "reason": human_decision.get("reason")}
python
import time
from functools import wraps

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = "CLOSED"
        self.last_failure_time = None

    def __call__(self, func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            if self.state == "OPEN":
                if time.time() - self.last_failure_time > self.recovery_timeout:
                    self.state = "HALF_OPEN"
                else:
                    raise Exception("Circuit breaker OPEN - Agent 暫時無法使用")
            try:
                result = await func(*args, **kwargs)
                if self.state == "HALF_OPEN":
                    self.state = "CLOSED"
                    self.failure_count = 0
                return result
            except Exception:
                self.failure_count += 1
                self.last_failure_time = time.time()
                if self.failure_count >= self.failure_threshold:
                    self.state = "OPEN"
                raise
        return wrapper

class TokenBudgetManager:
    def __init__(self, total_budget: int = 100_000):
        self.total_budget = total_budget
        self.used_tokens = 0
        self.agent_usage = {}

    def check_budget(self, agent_name: str, estimated_tokens: int) -> bool:
        remaining = self.total_budget - self.used_tokens
        if estimated_tokens > remaining:
            raise BudgetExceededException(
                f"Agent {agent_name} 請求 {estimated_tokens} tokens,"
                f"但剩餘預算僅 {remaining} tokens"
            )
        return True

    def record_usage(self, agent_name: str, actual_tokens: int):
        self.used_tokens += actual_tokens
        self.agent_usage[agent_name] = self.agent_usage.get(agent_name, 0) + actual_tokens
05

可觀測性硬數據、五大踩坑與 2026 趨勢

MAST 研究團隊對 1642 個多 Agent 執行追蹤的分析揭示了故障分佈:

故障類型佔比說明
系統設計問題41.77%步驟重複、工具選擇錯誤、上下文溢位、缺少終止條件
Agent 間不對齊36.94%交接時上下文遺失、幻覺成為下一個 Agent 的「事實」
任務驗證失敗21.30%過早終止、不完整驗證
A

57% vs 8%:57% 的組織已有 Agent 在正式環境運行,但僅 8% 完成了 LLM 可觀測性實施——大量錯誤以 HTTP 200 回傳,監控面板全綠但輸出錯誤。

B

Google Bake-Off 6×:分散式多 Agent 架構將處理時間從 1 小時降至 10 分鐘,子 Agent 可獨立升級。

C

AdaptOrch 12–23%:正確編排拓撲在 SWE-bench 等基準上帶來 12–23% 效能提升,優於單純換模型。

python
from opentelemetry import trace
import uuid

tracer = trace.get_tracer("multi-agent-system")

def traced_agent_call(agent_name: str, task: dict, correlation_id: str = None):
    if not correlation_id:
        correlation_id = str(uuid.uuid4())
    with tracer.start_as_current_span(f"agent.{agent_name}") as span:
        span.set_attribute("agent.name", agent_name)
        span.set_attribute("correlation.id", correlation_id)
        span.set_attribute("task.type", task.get("type", "unknown"))
        try:
            result = agent_registry[agent_name].run(task)
            span.set_attribute("agent.tokens_used", result.get("tokens", 0))
            span.set_attribute("agent.status", "success")
            return result
        except Exception as e:
            span.set_attribute("agent.status", "error")
            span.set_attribute("error.message", str(e))
            raise

MONITORING_METRICS = {
    "task_success_rate": "端到端任務完成率(目標:>85%)",
    "e2e_latency_p95": "P95 端到端延遲(目標:<30s)",
    "total_cost_per_task": "每次任務平均 Token 成本",
    "agent_error_rate": "各 Agent 錯誤率(目標:<5%)",
    "agent_retry_count": "重試次數(高重試 = 需要調查)",
    "tool_call_budget_usage": "工具呼叫次數/預算比",
    "output_quality_score": "輸出品質評分",
    "goal_alignment_score": "目標一致性評分",
    "hallucination_rate": "幻覺偵測率",
}

def evaluate_agent_output(original_task: str, agent_output: str) -> dict:
    evaluation_prompt = f"""
    原始任務:{original_task}
    Agent 輸出:{agent_output}
    請從任務完成度、準確性、相關性、幻覺偵測四個維度評分(1-5 分),
    以 JSON 回傳:{{"completeness": x, "accuracy": x, "relevance": x,
    "hallucination_detected": true/false, "comments": "..."}}
    """
    evaluation = llm.invoke(evaluation_prompt)
    return json.loads(evaluation.content)

五大踩坑與防坑:

坑1

上下文污染:Agent A 幻覺傳給 B/C,全鏈路基於錯誤前提。防坑:每個交接點用 validate_agent_output 做 Schema 與置信度校驗。

坑2

無限迴圈:重試或工具呼叫失控,Token 費用暴漲。防坑:設定 MAX_ITERATIONSMAX_TOOL_CALLS_PER_AGENTMAX_TOTAL_TOKENS 硬性上限。

坑3

過度工程:簡單兩步鏈拆成 8 個 Agent。防坑:先從順序流水線開始,正式環境最佳數量 3–8 個

坑4

Demo 鴻溝:內部 Demo 漂亮,上線後邊緣輸入頻繁失敗。防坑:ProductionGuardrails 做輸入長度、注入偵測與 PII 過濾。

坑5

並行同步:Send API 扇出後慢分支未完成就觸發 Supervisor 重跑。防坑:LangGraph defer=True 建立顯式同步屏障。

python
def validate_agent_output(output: dict, schema: dict) -> bool:
    jsonschema.validate(output, schema)
    if output.get("confidence_score", 1.0) < 0.7:
        raise LowConfidenceError(f"Agent 輸出置信度過低: {output['confidence_score']}")
    required_fields = schema.get("required", [])
    missing = [f for f in required_fields if not output.get(f)]
    if missing:
        raise MissingFieldsError(f"輸出缺少必填欄位: {missing}")
    return True

MAX_ITERATIONS = 10
MAX_TOOL_CALLS_PER_AGENT = 20
MAX_TOTAL_TOKENS = 50_000

class ProductionGuardrails:
    def validate_input(self, user_input: str) -> str:
        if len(user_input) > 10000:
            raise InputTooLongError("輸入超過 10000 字元限制")
        injection_patterns = ["ignore previous instructions", "forget everything"]
        for pattern in injection_patterns:
            if pattern.lower() in user_input.lower():
                raise PromptInjectionError("偵測到潛在的提示注入攻擊")
        return user_input.strip()

    def validate_output(self, output: str) -> str:
        output = self.pii_filter.redact(output)
        if self.content_classifier.is_harmful(output):
            raise HarmfulContentError("輸出包含有害內容")
        return output

builder.add_node("supervisor", supervisor_node, defer=True)
text
你的任務是否有明確的線性依賴步驟?
├─ 是 → 子任務是否可以並行執行?
│        ├─ 否 → 【順序流水線】
│        └─ 是 → 【並行扇出 + 順序流水線 混合】
└─ 否 → 是否有一個 Agent 具有決策權威?
         ├─ 是 → 規模是否足夠大需要子團隊?
         │        ├─ 否 → 【Supervisor-Worker 層級模式】
         │        └─ 是 → 【層級式(Supervisors of Supervisors)】
         └─ 否 → 任務是否是長時間非同步的?
                  ├─ 是 → 【黑板架構】
                  └─ 否 → Agent 數量是否 ≤ 5?
                           ├─ 是 → 【Swarm(注意設定終止條件)】
                           └─ 否 → 【考慮重新拆分為層級模式】

五條核心要點:① 編排拓撲 > 模型選擇;② 從順序流水線開始,有證據再增加 Agent;③ MCP + A2A 是新專案標準協定;④ 可觀測性不是可選項(57% vs 8% 差距);⑤ 正式環境 Agent 數量 3–8 個最佳。

2026 趨勢:聯邦編排(多團隊子編排器共享路由策略)、多模態多 Agent(視覺/音訊與文字混合協作)、自適應拓撲選擇(AdaptOrch 方向)、EU AI Act 要求完整決策稽核鏈。

注意:在筆電本機跑多 Agent 編排會遭遇休眠斷連、記憶體不足與網路抖動;廉價 Linux VPS 又無法原生執行 Xcode 與 Apple Silicon 最佳化推理。多 Agent 架構是 2026 年 AI 工程的核心競爭力,但正式環境需要穩定宿主。

對於需要 7×24 多 Agent 編排常駐、PostgresSaver 檢查點持久化、MCP Server 與 Cursor/Claude Code 協作的正式環境,MESHLAUNCH 的 Mac Mini 雲端租用通常是更優解:獨占 Apple Silicon、按天/週/月彈性下單,作為 LangGraph 工作流與本機向量檢索的穩定宿主,讓你的編排資產從「綁定特定供應商」變為團隊自有的可移植能力。

常見問題

需要複雜狀態管理、HITL 與合規稽核選 LangGraph;1–2 天快速原型與角色制內容流水線選 CrewAI;微軟/Azure 技術棧與多輪辯論協作選 AutoGen。多 Agent 編排宿主方案見租用價格頁

MCP 是垂直層,統一 Agent 存取工具、資料庫與 API(寫一次,到處用);A2A 是水平層,透過 Agent Card 與 JSON-RPC 2.0 實現 Agent 間任務委託與能力發現。兩者均由 Linux Foundation Agentic AI Foundation 治理,建議新專案直接採用雙層架構。

至少需要 PostgreSQL 檢查點持久化(PostgresSaver)、OpenTelemetry 分散式追蹤、Token 預算與熔斷器,以及 7×24 穩定宿主避免筆電休眠中斷長時工作階段。Mac Mini 裸金屬可同時執行 LangGraph 工作流、MCP Server 與本機向量檢索。部署與網路問題見幫助中心