Warum ein einzelner LLM-Agent in Produktion scheitert
Der monolithische Agent — ein LLM für Retrieval, Reasoning, Coding und Freigabe — lässt sich schnell demonstrieren, bricht aber beim Skalieren strukturell ein. Bis 2026 stießen die meisten Teams, die einen einzelnen Agenten hochskalierten, auf dieselben vier Grenzen.
Kontextfenster-Grenze: Zwischenergebnisse füllen das Fenster; die Qualität der Schlussfolgerungen sinkt bei langen Workflows stark.
Jack-of-all-trades-Verwässerung: Ein Agent für Retrieval, Generierung und Audit macht nichts davon wirklich gut.
Keine Parallelität: Sequentielle Schritte bedeuten: Gesamtlatenz = Summe aller Schritte.
Single Point of Failure: Ein fehlgeschlagener Modellaufruf oder Tool-Timeout stoppt die gesamte Pipeline.
Produktionsbelege: Googles interner Agent Bake-Off (MLflow 2026) verkürzte die Verarbeitungszeit von einer Stunde auf zehn Minuten — ein 6×-Gewinn nach Zerlegung in verteilte Agenten. AdaptOrch (2026) zeigte: Orchestrierungstopologie schlägt Modellwahl und bringt auf SWE-bench 12–23 % Zusatzgewinn.
Ein Multi-Agent-System (MAS) ist eine Sammlung unabhängiger Agenten, die über definierte Protokolle und Orchestrierung zusammenarbeiten, um Aufgaben zu erledigen, die kein einzelner Agent effizient bewältigen kann. Jeder Agent sollte single-responsibility, mit passenden Tools ausgestattet, zustandsisoliert und unabhängig austauschbar sein.
| Agent-Eigenschaft | Bedeutung in Produktion |
|---|---|
| Rollenfokus | Eine klar abgegrenzte Aufgabe: Retrieval, Reasoning, Generierung oder Validierung |
| Tool-Zugang | Nur die für die Rolle nötigen Tools über MCP Server |
| Zustandsisolation | Eigener Kontext und Speicher; keine Verunreinigung anderer Agenten |
| Austauschbarkeit | Einzelnen Worker-Agenten tauschen, ohne den Graphen neu zu verdrahten |
Drei Steuerungstopologien: Zentralisiert — Orchestrator routet alles (auditierbar, aber Engpass). Dezentralisiert — Peer-Netzwerk (resilient, schwer zu debuggen). Hierarchisch — Top-Orchestrator → Team Leads → Worker (Balance aus Kontrolle und Skalierung).
| Modus | Struktur | Vorteile | Nachteile |
|---|---|---|---|
| Zentralisiert | Orchestrator steuert A/B/C | Auditierbar, kontrollierbar | Engpass im Zentrum |
| Dezentralisiert | Peer-to-Peer-Mesh | Hohe Elastizität, geringe Latenz | Schwer debugbar, nicht deterministisch |
| Hierarchisch | Top-Orchestrator → Team Lead → Worker | Kontrolle und Skalierung balanciert | Höhere Designkomplexität |
Für Produktion ist Multi-Agent-Architektur fast immer richtig. Die harte Frage ist das Orchestrierungsmuster — nicht das Foundation-Modell.
LangGraph vs CrewAI vs AutoGen: Vergleich und Auswahl
Das richtige Framework spart Monate Eigenentwicklung. Die Matrix deckt Paradigma, Sprachen, Lernkurve, State, HITL, Observability, Produktionsreife, Prototypgeschwindigkeit, Azure und Szenarien ab.
| Dimension | LangGraph | CrewAI | AutoGen (Microsoft) |
|---|---|---|---|
| Paradigma | State-Machine-Graph | Rollenbasierte Crews | Konversations-Multi-Agent |
| Sprachen | Python / JS/TS | Python | Python / .NET |
| Lernkurve | Steil | Flach | Mittel |
| State | Nativ | Selbst bauen | Begrenzt |
| Human-in-the-Loop | Nativ interrupt() | Selbst bauen | Unterstützt |
| Observability | LangSmith | Begrenzt | Azure Monitor |
| Produktionsreife | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ |
| Schneller Prototyp | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
| Azure | ⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐⭐ |
| Ideal für | Komplexe Stateful Workflows | Rollenbasierte Content-Pipelines | Dialog-Kollaboration und Debatte |
Auswahlempfehlung:
LangGraph:Produktionszuverlässigkeit (Finanz, Gesundheit, Compliance), State-Persistenz, feines HITL, präzise Verzweigungen.
CrewAI:Idee in 1–2 Tagen validieren; Team denkt in Rollen; Content und Research.
AutoGen:Microsoft/Azure; mehrstufige Debatte; Forschung und Dialog-Experimente.
Orchestrierungstopologie prägt die Leistung stärker als das Modell — zuerst Muster und Framework, dann Modell.
Sechs Orchestrierungsmuster und MCP+A2A-Doppelprotokoll
Diese sechs Muster decken über 95 % der Produktionsszenarien ab. Muster 1 — Sequentielle Pipeline: Ausgabe von Agent A wird Eingabe von B; strikt linear; ideal für Content, Code-Review, feste Flows.
from langgraph.graph import StateGraph, START, END
from typing import TypedDict
class PipelineState(TypedDict):
query: str
retrieved_docs: str
analysis: str
final_report: str
def retrieval_agent(state: PipelineState):
docs = search_knowledge_base(state["query"])
return {"retrieved_docs": docs}
def analysis_agent(state: PipelineState):
result = llm.invoke(f"Analyze the following: {state['retrieved_docs']}")
return {"analysis": result.content}
def writer_agent(state: PipelineState):
report = llm.invoke(f"Write report from analysis: {state['analysis']}")
return {"final_report": report.content}
builder = StateGraph(PipelineState)
builder.add_node("retriever", retrieval_agent)
builder.add_node("analyzer", analysis_agent)
builder.add_node("writer", writer_agent)
builder.add_edge(START, "retriever")
builder.add_edge("retriever", "analyzer")
builder.add_edge("analyzer", "writer")
builder.add_edge("writer", END)
pipeline = builder.compile()
Muster 2 — Paralleles Fan-out/Fan-in: Unabhängige Subtasks parallel; Latenz = max(T1…Tn). LangGraph Send API mit Annotated[list, operator.add] aggregiert Ergebnisse.
from langgraph.types import Send
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
import operator
class ResearchState(TypedDict):
query: str
research_results: Annotated[list, operator.add]
final_synthesis: str
def supervisor(state: ResearchState):
subtasks = [
{"query": state["query"], "source": "academic"},
{"query": state["query"], "source": "industry"},
{"query": state["query"], "source": "news"},
]
return [Send("research_worker", task) for task in subtasks]
def research_worker(state: dict):
result = search_by_source(state["query"], state["source"])
return {"research_results": [result]}
def synthesizer(state: ResearchState):
combined = "\n".join(state["research_results"])
synthesis = llm.invoke(f"Synthesize research results: {combined}")
return {"final_synthesis": synthesis.content}
builder = StateGraph(ResearchState)
builder.add_node("research_worker", research_worker)
builder.add_node("synthesizer", synthesizer)
builder.add_conditional_edges(START, supervisor, ["research_worker"])
builder.add_edge("research_worker", "synthesizer")
builder.add_edge("synthesizer", END)
graph = builder.compile()
Muster 3 — Hierarchischer Supervisor-Worker: Supervisor routet Intent; Worker führt Spezialaufgaben aus. Empfohlen: Keyword-Fast-Path + LLM-Routing.
KEYWORD_ROUTING = {
"code": "code_agent", "code": "code_agent",
"search": "search_agent", "query": "search_agent",
"data": "data_agent",
}
def supervisor_with_fast_path(state):
query = state["query"].lower()
for keyword, agent_name in KEYWORD_ROUTING.items():
if keyword in query:
return {"next": agent_name}
routing_prompt = f"""
User request: {state['query']}
Available agents: code_agent, search_agent, data_agent
Return best agent name only.
"""
decision = llm.invoke(routing_prompt)
return {"next": decision.content.strip()}
Muster 4 — Swarm: Peer-to-Peer ohne Zentralorchestrator; Stopp per Terminierungsregeln. In Produktion nur mit hartem Round-Limit. AutoGen GroupChat + max_round:
import autogen
reviewer_1 = autogen.AssistantAgent(
name="SecurityReviewer",
system_message="Security expert focused on vulnerabilities."
)
reviewer_2 = autogen.AssistantAgent(
name="PerformanceReviewer",
system_message="Performance expert focused on efficiency."
)
human_proxy = autogen.UserProxyAgent(
name="CodeAuthor",
human_input_mode="NEVER",
max_consecutive_auto_reply=2,
is_termination_msg=lambda x: "APPROVED" in x.get("content", "")
)
groupchat = autogen.GroupChat(
agents=[human_proxy, reviewer_1, reviewer_2],
messages=[],
max_round=6
)
manager = autogen.GroupChatManager(groupchat=groupchat)
Muster 5 — Blackboard: Gemeinsamer strukturierter Workspace; stundenlange Async-Jobs. Muster 6 — Hybrid: Intent-Router + Supervisor + Fan-out + QA-Pipeline — typisch für Enterprise-Content.
MCP + A2A (Linux Foundation Agentic AI Foundation): MCP vertikal — Agent ↔ Tools/DB/API; A2A horizontal — Delegation und Discovery per Agent Card.
from mcp.server import Server
from mcp.types import Tool, TextContent
app = Server("data-agent-mcp")
@app.list_tools()
async def list_tools():
return [
Tool(
name="query_customer_db",
description="Query customer DB by id, name, or email",
inputSchema={
"type": "object",
"properties": {
"field": {"type": "string", "enum": ["id", "name", "email"]},
"value": {"type": "string"}
},
"required": ["field", "value"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "query_customer_db":
result = db.query(arguments["field"], arguments["value"])
return [TextContent(type="text", text=str(result))]
A2A Agent Card unter /.well-known/agent.json; Orchestrator delegiert per JSON-RPC 2.0:
import httpx
async def discover_and_delegate(agent_url: str, task: str):
card_response = await httpx.get(f"{agent_url}/.well-known/agent.json")
agent_card = card_response.json()
available_skills = [s["id"] for s in agent_card["skills"]]
if "web_research" not in available_skills:
raise ValueError(f"Agent {agent_card['name']} lacks web_research skill")
payload = {
"jsonrpc": "2.0",
"method": "message/send",
"id": "task-001",
"params": {
"message": {
"role": "user",
"parts": [{"type": "text", "text": task}]
}
}
}
response = await httpx.post(agent_card["url"], json=payload)
return response.json()
Weiterlesen: MCP Server von Grund auf; MCP als HTTP der KI-Ära.
Sechs-Schritte-Runbook für Produktions-Multi-Agent-Systeme
Auswahl und Topologie:Entscheidungsbaum für Pipeline/Fan-out/Hierarchie/Blackboard/Hybrid; 3–8 Agenten, kein Over-Engineering.
State-Persistenz:PostgresSaver für Checkpoints, Recovery und thread_id pro Session. DSGVO: Speicherort und Löschfristen für Checkpoint-Daten klären.
Human-in-the-Loop:Hochrisiko-Aktionen per interrupt() pausieren — Pflicht in regulierten Branchen und unter EU AI Act.
Circuit Breaker:Externe Agent-Aufrufe mit CircuitBreaker kapseln; OPEN bei Schwellwert.
Token-Budget:TokenBudgetManager vor jedem Aufruf; Kostenkontrolle.
Observability:OpenTelemetry traced_agent_call mit correlation_id; MONITORING_METRICS; LLM-as-Judge-Stichproben.
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.types import interrupt
with PostgresSaver.from_conn_string("postgresql://user:pass@localhost/agentdb") as checkpointer:
graph = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "user-session-12345"}}
result = graph.invoke({"query": "Analyze Q2 report"}, config)
def high_risk_action_agent(state):
proposed_action = plan_action(state)
human_decision = interrupt({
"proposed_action": proposed_action,
"risk_level": "HIGH",
"message": "This modifies production DB. Confirm?"
})
if human_decision["approved"]:
return execute_action(proposed_action)
return {"status": "cancelled", "reason": human_decision.get("reason")}
import time
from functools import wraps
class CircuitBreaker:
def __init__(self, failure_threshold=5, recovery_timeout=60):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.state = "CLOSED"
self.last_failure_time = None
def __call__(self, func):
@wraps(func)
async def wrapper(*args, **kwargs):
if self.state == "OPEN":
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = "HALF_OPEN"
else:
raise Exception("Circuit breaker OPEN - Agent temporarily unavailable")
try:
result = await func(*args, **kwargs)
if self.state == "HALF_OPEN":
self.state = "CLOSED"
self.failure_count = 0
return result
except Exception:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "OPEN"
raise
return wrapper
class TokenBudgetManager:
def __init__(self, total_budget: int = 100_000):
self.total_budget = total_budget
self.used_tokens = 0
self.agent_usage = {}
def check_budget(self, agent_name: str, estimated_tokens: int) -> bool:
remaining = self.total_budget - self.used_tokens
if estimated_tokens > remaining:
raise BudgetExceededException(
f"Agent {agent_name} requests {estimated_tokens} tokens, "
f"but only {remaining} tokens remain in budget"
)
return True
def record_usage(self, agent_name: str, actual_tokens: int):
self.used_tokens += actual_tokens
self.agent_usage[agent_name] = self.agent_usage.get(agent_name, 0) + actual_tokens
Observability-Daten, fünf Fallstricke und Trends 2026
MAST analysierte 1642 Multi-Agent-Traces — Fehlerverteilung:
| Fehlertyp | Anteil | Beschreibung |
|---|---|---|
| Systemdesign | 41.77% | Wiederholte Schritte, falsche Tools, Kontextoverflow, fehlende Terminierung |
| Agent-Misalignment | 36.94% | Kontextverlust bei Handoffs; Halluzinationen werden zur Wahrheit |
| Validierung | 21.30% | Vorzeitiger Abbruch, unvollständige Prüfung |
57 % vs 8 %:57 % der Organisationen betreiben Agenten in Produktion, nur 8 % haben LLM-Observability — HTTP 200 bei falschen Outputs.
Google Bake-Off 6×: Verteilte Multi-Agent-Architektur verkürzte die Verarbeitungszeit von einer Stunde auf zehn Minuten; Sub-Agenten sind unabhängig upgradebar.
AdaptOrch 12–23 %:Richtige Topologie bringt 12–23 % auf SWE-bench — besser als Modelltausch.
from opentelemetry import trace
import uuid
tracer = trace.get_tracer("multi-agent-system")
def traced_agent_call(agent_name: str, task: dict, correlation_id: str = None):
if not correlation_id:
correlation_id = str(uuid.uuid4())
with tracer.start_as_current_span(f"agent.{agent_name}") as span:
span.set_attribute("agent.name", agent_name)
span.set_attribute("correlation.id", correlation_id)
span.set_attribute("task.type", task.get("type", "unknown"))
try:
result = agent_registry[agent_name].run(task)
span.set_attribute("agent.tokens_used", result.get("tokens", 0))
span.set_attribute("agent.status", "success")
return result
except Exception as e:
span.set_attribute("agent.status", "error")
span.set_attribute("error.message", str(e))
raise
MONITORING_METRICS = {
"task_success_rate": "task_success_rate target >85%",
"e2e_latency_p95": "e2e_latency_p95 target <30s",
"total_cost_per_task": "avg token cost per task",
"agent_error_rate": "agent_error_rate target <5%",
"agent_retry_count": "retry count high needs investigation",
"tool_call_budget_usage": "tool call budget ratio",
"output_quality_score": "output quality score",
"goal_alignment_score": "goal alignment score",
"hallucination_rate": "hallucination rate",
}
def evaluate_agent_output(original_task: str, agent_output: str) -> dict:
evaluation_prompt = f"""
Original task: {original_task}
Agent output: {agent_output}
Rate completeness, accuracy, relevance, and hallucination (1-5 each).
Return JSON only: {{"completeness": x, "accuracy": x, "relevance": x,
"hallucination_detected": true/false, "comments": "..."}}
"""
evaluation = llm.invoke(evaluation_prompt)
return json.loads(evaluation.content)
Fünf Fallstricke und Gegenmaßnahmen:
Kontextverschmutzung:Halluzination von A an B/C. Fix: validate_agent_output mit Schema und Confidence an jedem Handoff.
Endlosschleifen:Retries/Tool-Calls explodieren. Fix: harte Caps für Iterationen, Tools, Tokens.
Over-Engineering:Zwei Schritte in acht Agenten splitten. Fix: mit Pipeline starten; 3–8 Agenten optimal.
Demo-Kluft:Demo glänzt, Edge-Inputs scheitern. Fix: ProductionGuardrails für Länge, Injection, PII — DSGVO-relevant.
Parallel-Sync:Supervisor startet vor langsamen Branches neu. Fix: LangGraph defer=True.
def validate_agent_output(output: dict, schema: dict) -> bool:
jsonschema.validate(output, schema)
if output.get("confidence_score", 1.0) < 0.7:
raise LowConfidenceError(f"Agent output confidence too low: {output['confidence_score']}")
required_fields = schema.get("required", [])
missing = [f for f in required_fields if not output.get(f)]
if missing:
raise MissingFieldsError(f"Missing required output fields: {missing}")
return True
MAX_ITERATIONS = 10
MAX_TOOL_CALLS_PER_AGENT = 20
MAX_TOTAL_TOKENS = 50_000
class ProductionGuardrails:
def validate_input(self, user_input: str) -> str:
if len(user_input) > 10000:
raise InputTooLongError("Input exceeds 10000 chars")
injection_patterns = ["ignore previous instructions", "forget everything"]
for pattern in injection_patterns:
if pattern.lower() in user_input.lower():
raise PromptInjectionError("Prompt injection detected")
return user_input.strip()
def validate_output(self, output: str) -> str:
output = self.pii_filter.redact(output)
if self.content_classifier.is_harmful(output):
raise HarmfulContentError("Harmful content in output")
return output
builder.add_node("supervisor", supervisor_node, defer=True)
Hat die Aufgabe klare lineare Abhängigkeiten?
├─ Ja → Können Subtasks parallel laufen?
│ ├─ Nein → [Sequentielle Pipeline]
│ └─ Ja → [Fan-out + Pipeline Hybrid]
└─ Nein → Gibt es einen Agent mit Entscheidungsautorität?
├─ Ja → Braucht es Sub-Teams?
│ ├─ Nein → [Supervisor-Worker]
│ └─ Ja → [Hierarchisch]
└─ Nein → Lang laufende Async-Aufgabe?
├─ Ja → [Blackboard]
└─ Nein → ≤ 5 Agenten?
├─ Ja → [Swarm mit Terminierung]
└─ Nein → [Hierarchisch splitten]
Fünf Kernpunkte:① Topologie > Modell; ② mit Pipeline starten; ③ MCP+A2A Standard; ④ Observability Pflicht (57 % vs 8 %); ⑤ 3–8 Agenten optimal.
Trends 2026:Föderierte Orchestrierung, multimodale Agenten, adaptive Topologie (AdaptOrch), EU AI Act und DSGVO verlangen vollständige Audit-Trails.
Hinweis: Multi-Agent-Orchestrierung auf dem Laptop scheitert an Sleep-Disconnects, RAM-Engpässen und Netzwerk-Jitter; günstige Linux-VPS laufen weder Xcode noch Apple-Silicon-optimierte Inferenz nativ. Multi-Agent-Architektur ist 2026 Kernkompetenz — Produktion braucht einen stabilen Host.
Für 7×24-Orchestrierung, PostgresSaver, MCP Server und Cursor/Claude Code ist MESHLAUNCH Mac Mini Cloud-Miete oft optimal: dediziertes Apple Silicon, flexible Abrechnung, portierbare Team-Assets. Siehe Mietpreise und Hilfezentrum.
State, HITL, Compliance → LangGraph; 1–2-Tage-Prototyp → CrewAI; Azure-Debatte → AutoGen. Hosting: Mietpreise-Seite.
MCP vertikal — Tools/DB/API einmal schreiben. A2A horizontal — Agent Cards und JSON-RPC 2.0. Beide unter Agentic AI Foundation; Greenfield: Doppelstack.
PostgreSQL (PostgresSaver), OpenTelemetry, Token-Budget, Circuit Breaker, 7×24-Host. Mac Mini Bare Metal für LangGraph, MCP und Vektorsuche. DSGVO bei Cloud-Traces. Deployment: Hilfezentrum.