多 Agent 协同体系优化:消息路由与状态一致性探究 多 Agent 协同体系优化消息路由与状态一致性探究多 Agent 协同框架的工程化挑战从“各自为政”到“统一指挥”前言本文我们团队做了个大模型客服系统用了三个 Agent订单 Agent、物流 Agent、售后 Agent。结果用户问一句“本文的订单怎么了”三个 Agent 各答各的。这就是多 Agent 协同的最大问题没有统一指挥。今天来聊聊多 Agent 协同的工程化挑战和解决方案。一、 底层原理1.1 多 Agent 协同的核心问题多 Agent 系统最大的问题是信息孤岛和决策冲突graph TD A[用户提问] -- B[路由模块] B -- C[订单 Agent] B -- D[物流 Agent] B -- E[售后 Agent] C -- F[各自回答] D -- F E -- F F -- G{答案一致} G --|不一致| H[用户困惑] G --|一致| I[满意] J[统一协调] -- K[共享上下文] J -- L[统一决策] J -- M[冲突解决]核心挑战Agent 间信息不互通决策可能冲突缺乏统一调度错误蔓延1.2 协同模式对比模式一致性效率复杂度独立运行低高低中心协调高中中级联传递中低高共享黑板高高高二、 快速上手2.1 基础协调器from typing import Dict, Any, List from dataclasses import dataclass dataclass class AgentResponse: agent: str answer: str confidence: float class Coordinator: def __init__(self): self.agents {} self.shared_context {} def register(self, name: str, handler, capabilities: List[str]): self.agents[name] { handler: handler, capabilities: capabilities } def process(self, query: str) - str: best_agent None best_score 0 for name, info in self.agents.items(): score self._match(query, info[capabilities]) if score best_score: best_score score best_agent name if best_agent: response self.agents[best_agent][handler](query, self.shared_context) self.shared_context[last_query] query return response[answer] return 无法处理 def _match(self, query: str, capabilities: List[str]) - float: return sum(1 for c in capabilities if c in query)三、 核心 API / 深水区3.1 协同组件速查组件职责实现要点路由器分发请求意图匹配协调器统一调度优先级排序上下文管理器共享信息全局/局部冲突解决器处理分歧投票/优先级3.2 共享上下文管理class SharedContext: def __init__(self): self.store {} self.history [] def update(self, key: str, value: Any, agent: str): self.store[key] { value: value, agent: agent, timestamp: time.time() } self.history.append((key, value, agent)) def get(self, key: str) - Any: item self.store.get(key) return item[value] if item else None def get_recent(self, n: int 5) - str: return \n.join( f{a} 更新 {k}: {v} for k, v, a in self.history[-n:] )3.3 冲突解决class ConflictResolver: def resolve(self, responses: List[AgentResponse]) - str: if not responses: return 无法获取答案 if len(responses) 1: return responses[0].answer # 按置信度排序 sorted_responses sorted( responses, keylambda r: r.confidence, reverseTrue ) return sorted_responses[0].answer四、 实战演练4.1 完整的多 Agent 协同系统from typing import Dict, Any, List from dataclasses import dataclass import time dataclass class AgentDef: name: str capabilities: List[str] handler: callable class MultiAgentSystem: def __init__(self): self.agents: Dict[str, AgentDef] {} self.context {} self.message_log [] def register(self, agent: AgentDef): self.agents[agent.name] agent def handle(self, query: str) - str: # 1. 路由 candidates self._route(query) if not candidates: return 暂无匹配的 Agent # 2. 执行 responses [] for agent_name in candidates[:2]: agent self.agents[agent_name] result agent.handler(query, self.context) responses.append(AgentResponse( agentagent_name, answerresult.get(answer, ), confidenceresult.get(confidence, 0.5) )) # 3. 解决冲突 final_answer self._resolve(responses) # 4. 更新上下文 self.context[last_query] query self.context[last_answer] final_answer self.message_log.append({query: query, answer: final_answer}) return final_answer def _route(self, query: str) - List[str]: scored [] for name, agent in self.agents.items(): score sum(1 for c in agent.capabilities if c in query) if score 0: scored.append((score, name)) scored.sort(reverseTrue) return [name for _, name in scored] def _resolve(self, responses: List[AgentResponse]) - str: if len(responses) 1: return responses[0].answer sorted_resp sorted(responses, keylambda r: r.confidence, reverseTrue) return sorted_resp[0].answer # Agent 定义 def order_handler(query, ctx): return {answer: f[订单] 已回答: {query}, confidence: 0.9} def logistics_handler(query, ctx): return {answer: f[物流] 已回答: {query}, confidence: 0.8} def after_sale_handler(query, ctx): return {answer: f[售后] 已回答: {query}, confidence: 0.7} system MultiAgentSystem() system.register(AgentDef(order, [订单], order_handler)) system.register(AgentDef(logistics, [物流], logistics_handler)) system.register(AgentDef(after_sale, [售后, 退款], after_sale_handler)) print(system.handle(本文的订单怎么了)) print(system.handle(物流到哪了)) print(system.handle(本文要退款))五、 避坑指南与最佳实践 **技巧共享上下文用字典简单高效不要用复杂数据结构。⚠️ **警告Agent 超时要有兜底一个 Agent 卡住不能拖垮整个系统。✅ **推荐关键信息强制同步用户 ID、订单号等所有 Agent 必须知道。六、 综合实战演示6.1 生产级多 Agent 协同框架from typing import Dict, Any, List, Optional from dataclasses import dataclass import json import time import threading dataclass class AgentInfo: name: str capabilities: List[str] handler: callable timeout: int 30 class ProductionCoordinator: def __init__(self): self.agents: Dict[str, AgentInfo] {} self.context_lock threading.Lock() self.shared { user_info: {}, session_data: {}, last_agent: None } def register(self, info: AgentInfo): self.agents[info.name] info def handle(self, user_id: str, query: str) - str: with self.context_lock: self.shared[last_query] query self.shared[user_info][id] user_id candidates self._rank_agents(query) if not candidates: return 暂无匹配服务 responses [] for name in candidates[:2]: agent self.agents[name] try: with self.context_lock: ctx_copy dict(self.shared) result agent.handler(query, ctx_copy) responses.append(result) with self.context_lock: self.shared[last_agent] name except Exception as e: responses.append({answer: f{name} 暂时不可用, confidence: 0}) return self._final_answer(responses) def _rank_agents(self, query: str) - List[str]: scored [] for name, info in self.agents.items(): score sum(1 for c in info.capabilities if c in query) if score 0: scored.append((score, name)) scored.sort(reverseTrue) return [name for _, name in scored] def _final_answer(self, responses: List[Dict]) - str: best max(responses, keylambda r: r.get(confidence, 0)) return best.get(answer, 处理失败) coord ProductionCoordinator() coord.register(AgentInfo(order, [订单], lambda q, c: {answer: 订单已查到, confidence: 0.9})) coord.register(AgentInfo(refund, [退款, 售后], lambda q, c: {answer: 退款已处理, confidence: 0.85})) print(coord.handle(user_001, 本文的订单退款了))总结多 Agent 协同的关键统一路由和调度共享上下文冲突解决机制超时和兜底