概述
多智能体系统(Multi-Agent Systems, MAS)通过多个智能体的协作来解决复杂问题。与单一智能体相比,多智能体系统能够:
- 分工协作:不同智能体专注于不同子领域
- 并行处理:同时处理多个子任务提高效率
- 知识互补:结合不同智能体的专业知识
- 涌现智能:通过交互产生超越个体的能力
协作模式分类
1. 层级协作 (Hierarchical)
特点:
- Supervisor-Agent负责全局规划和协调
- Worker-Agents负责执行具体任务
- 信息从上向下流动,汇报从下向上反馈
┌─────────────────────────────────────────────────────┐
│ SUPERVISOR │
│ (任务分解 + 结果聚合) │
└──────────────────────────┬──────────────────────────┘
│
┌─────────────────┼─────────────────┐
↓ ↓ ↓
┌───────────┐ ┌───────────┐ ┌───────────┐
│ Writer │ │ Researcher│ │ Reviewer │
│ Agent │ │ Agent │ │ Agent │
└─────┬─────┘ └─────┬─────┘ └─────┬─────┘
│ │ │
└────────────────┼────────────────┘
↓
┌───────────────────────┐
│ 结果聚合 + 最终输出 │
└───────────────────────┘
2. 平等协作 (Peer-to-Peer)
特点:
- 所有智能体地位平等
- 通过协商达成共识
- 适合需要多视角评估的任务
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Agent A │ ←──→ │ Agent B │ ←──→ │ Agent C │
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
└─────────────────┼─────────────────┘
↓
┌─────────────────────┐
│ 共享知识库/上下文 │
└─────────────────────┘
3. 竞争协作 (Competitive)
特点:
- 多个智能体竞争同一目标
- 通过评估选择最优结果
- 适合创意生成、方案评估等场景
┌─────────┐
│ │ ←── 方案A
│ │
│ Judge │ ←── 方案B (竞争)
│ Agent │
│ │ ←── 方案C
│ │
└────┬────┘
│ 最终选择
↓
最佳方案
4. 管道协作 (Pipeline)
特点:
- 智能体形成处理管道
- 每个智能体处理特定阶段
- 输出直接传递给下一阶段
输入 → Agent1 → Agent2 → Agent3 → ... → AgentN → 输出
(解析) (处理) (验证) (格式化)
通信协议设计
消息类型
from enum import Enum
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
from datetime import datetime
class MessageType(Enum):
REQUEST = "request" # 请求
RESPONSE = "response" # 响应
QUERY = "query" # 查询
BROADCAST = "broadcast" # 广播
PROPOSE = "propose" # 提议
ACCEPT = "accept" # 接受
REJECT = "reject" # 拒绝
NEGOTIATE = "negotiate" # 协商
UPDATE = "update" # 状态更新
@dataclass
class AgentMessage:
id: str
sender: str
receivers: list[str] # 空列表表示广播
type: MessageType
content: Dict[str, Any]
timestamp: datetime = field(default_factory=datetime.now)
conversation_id: str = ""
reply_to: Optional[str] = None # 关联的消息ID
def to_dict(self) -> Dict:
return {
"id": self.id,
"sender": self.sender,
"receivers": self.receivers if self.receivers else ["broadcast"],
"type": self.type.value,
"content": self.content,
"timestamp": self.timestamp.isoformat(),
"conversation_id": self.conversation_id,
"reply_to": self.reply_to
}消息传递系统
import asyncio
from typing import Callable, Dict, List
from collections import defaultdict
class MessageBus:
"""智能体间消息总线"""
def __init__(self):
self.subscribers: Dict[str, List[Callable]] = defaultdict(list)
self.message_queue: asyncio.Queue = asyncio.Queue()
self.message_history: List[AgentMessage] = []
def subscribe(self, agent_id: str, callback: Callable):
"""订阅消息"""
self.subscribers[agent_id].append(callback)
def unsubscribe(self, agent_id: str, callback: Callable):
"""取消订阅"""
if agent_id in self.subscribers:
self.subscribers[agent_id].remove(callback)
async def publish(self, message: AgentMessage):
"""发布消息"""
self.message_history.append(message)
# 发送给特定接收者
if message.receivers:
for receiver in message.receivers:
if receiver in self.subscribers:
for callback in self.subscribers[receiver]:
await callback(message)
# 广播
if not message.receivers:
for agent_id, callbacks in self.subscribers.items():
if agent_id != message.sender: # 不发送给自己
for callback in callbacks:
await callback(message)
async def send_message(
self,
sender: str,
receivers: List[str],
msg_type: MessageType,
content: Dict[str, Any]
) -> AgentMessage:
"""发送消息"""
message = AgentMessage(
id=self.generate_id(),
sender=sender,
receivers=receivers,
type=msg_type,
content=content,
conversation_id=f"{sender}_{receivers[0]}" if receivers else "broadcast"
)
await self.publish(message)
return message
def generate_id(self) -> str:
return f"msg_{len(self.message_history)}_{datetime.now().timestamp()}"
def get_conversation(self, conversation_id: str) -> List[AgentMessage]:
"""获取对话历史"""
return [m for m in self.message_history
if m.conversation_id == conversation_id]Agent间通信实现
class BaseAgent:
"""智能体基类"""
def __init__(self, agent_id: str, message_bus: MessageBus):
self.agent_id = agent_id
self.bus = message_bus
self.pending_messages: asyncio.Queue = asyncio.Queue()
self.message_bus.subscribe(self.agent_id, self.receive_message)
async def receive_message(self, message: AgentMessage):
"""接收消息"""
await self.pending_messages.put(message)
async def send(
self,
receivers: List[str],
msg_type: MessageType,
content: Dict
) -> AgentMessage:
"""发送消息"""
return await self.bus.send_message(
self.agent_id, receivers, msg_type, content
)
async def broadcast(self, msg_type: MessageType, content: Dict):
"""广播消息"""
return await self.bus.send_message(
self.agent_id, [], msg_type, content
)
async def process_messages(self):
"""消息处理循环"""
while True:
message = await self.pending_messages.get()
await self.handle_message(message)任务分解与分配
任务分解策略
@dataclass
class Task:
id: str
description: str
requirements: List[str] = field(default_factory=list)
priority: int = 0
estimated_complexity: float = 1.0
class TaskDecomposer:
"""任务分解器"""
def __init__(self, llm):
self.llm = llm
async def decompose(self, task: str, num_agents: int = 3) -> List[Task]:
"""将复杂任务分解为子任务"""
prompt = f"""将以下任务分解为 {num_agents} 个可并行执行的子任务:
任务: {task}
要求:
1. 每个子任务应该可以独立执行
2. 子任务之间应有明确的接口定义
3. 考虑任务的依赖关系
以JSON格式输出:
{{"tasks": [
{{"id": "task_1", "description": "...", "requirements": [], "priority": 1}},
...
]}}
"""
response = await self.llm.generate(prompt, format="json")
tasks_data = json.loads(response)["tasks"]
return [Task(**t) for t in tasks_data]
class TaskAllocator:
"""任务分配器"""
def __init__(self, agents: Dict[str, BaseAgent], capability_registry: Dict):
self.agents = agents
self.capabilities = capability_registry # agent_id -> [capabilities]
def allocate(self, task: Task) -> str:
"""将任务分配给最合适的智能体"""
best_agent = None
best_score = -1
for agent_id, capabilities in self.capabilities.items():
score = self.calculate_match_score(task, capabilities)
if score > best_score:
best_score = score
best_agent = agent_id
return best_agent
def calculate_match_score(self, task: Task, capabilities: List[str]) -> float:
"""计算匹配分数"""
if not task.requirements:
return 1.0 # 无特殊要求,均匀分配
match_count = sum(1 for req in task.requirements if req in capabilities)
return match_count / len(task.requirements)负载均衡
class LoadBalancer:
"""智能体负载均衡"""
def __init__(self):
self.agent_loads: Dict[str, int] = defaultdict(int)
self.max_load = 10
def allocate(self, tasks: List[Task], agents: List[str]) -> Dict[str, List[Task]]:
"""分配任务,实现负载均衡"""
allocation = {agent_id: [] for agent_id in agents}
# 按优先级排序
sorted_tasks = sorted(tasks, key=lambda t: -t.priority)
for task in sorted_tasks:
# 选择负载最轻的智能体
min_load_agent = min(
agents,
key=lambda a: self.agent_loads[a]
)
if self.agent_loads[min_load_agent] < self.max_load:
allocation[min_load_agent].append(task)
self.agent_loads[min_load_agent] += 1
return allocation
def update_load(self, agent_id: str, delta: int):
"""更新负载"""
self.agent_loads[agent_id] += delta协调机制
1. 共识达成
class ConsensusProtocol:
"""共识协议"""
async def reach_consensus(
self,
agents: List[str],
topic: str,
options: List[str]
) -> str:
"""多轮投票达成共识"""
for round_num in range(3): # 最多3轮
# 第一轮:各智能体独立提案
proposals = {}
for agent_id in agents:
proposal = await self.query_agent(agent_id, topic, options)
proposals[agent_id] = proposal
# 统计票数
votes = Counter(proposals.values())
# 检查是否达成共识
if len(votes) == 1:
return list(votes.keys())[0]
# 多数票
majority = votes.most_common(1)[0][0]
majority_ratio = votes[majority] / len(agents)
if majority_ratio >= 0.6: # 60%阈值
return majority
# 否则进入下一轮,提供多数票信息
options = list(votes.keys())
# 最终让步给多数票
return votes.most_common(1)[0][0]
async def query_agent(self, agent_id: str, topic: str, options: List[str]) -> str:
"""查询智能体意见"""
# 实现具体的查询逻辑
pass2. 冲突解决
class ConflictResolver:
"""冲突解决器"""
async def resolve(
self,
conflicts: List[Dict],
context: Dict
) -> Dict:
"""解决多智能体间的冲突"""
for conflict in conflicts:
conflict_type = conflict["type"]
if conflict_type == "resource":
result = await self.resolve_resource_conflict(conflict)
elif conflict_type == "opinion":
result = await self.resolve_opinion_conflict(conflict, context)
elif conflict_type == "execution":
result = await self.resolve_execution_conflict(conflict)
conflict["resolution"] = result
return conflicts
async def resolve_resource_conflict(self, conflict: Dict) -> Dict:
"""解决资源冲突"""
agents = conflict["agents"]
resource = conflict["resource"]
# 基于优先级分配
priorities = [(a["priority"], a["id"]) for a in agents]
winner = max(priorities)[1]
return {
"strategy": "priority_based",
"winner": winner,
"others": [a["id"] for a in agents if a["id"] != winner]
}
async def resolve_opinion_conflict(self, conflict: Dict, context: Dict) -> Dict:
"""解决观点冲突"""
options = conflict["options"]
# 使用仲裁智能体或投票
prompt = f"""以下智能体对同一问题有不同观点:
{chr(10).join([f"- {opt}" for opt in options])}
上下文: {context}
请分析各观点的优缺点,选择或综合出最佳方案。"""
resolution = await self.llm.generate(prompt)
return {"strategy": "arbitration", "resolution": resolution}3. 状态同步
class StateSynchronizer:
"""状态同步器"""
def __init__(self, message_bus: MessageBus):
self.bus = message_bus
self.global_state: Dict[str, Any] = {}
self.agent_states: Dict[str, Dict] = {}
async def sync_state(self, agent_id: str, local_state: Dict):
"""同步智能体状态"""
self.agent_states[agent_id] = local_state
# 广播状态更新
await self.bus.broadcast(
MessageType.UPDATE,
{
"agent_id": agent_id,
"state": local_state,
"timestamp": datetime.now().isoformat()
}
)
def get_global_view(self) -> Dict:
"""获取全局视图"""
return {
"agents": self.agent_states,
"shared": self.global_state,
"last_update": datetime.now().isoformat()
}多智能体框架实现
Coordinator Agent
class CoordinatorAgent(BaseAgent):
"""协调器智能体"""
def __init__(
self,
agent_id: str,
message_bus: MessageBus,
llm,
workers: List[BaseAgent]
):
super().__init__(agent_id, message_bus)
self.llm = llm
self.workers = {w.agent_id: w for w in workers}
self.task_queue: asyncio.Queue = asyncio.Queue()
self.results: Dict[str, Any] = {}
async def handle_message(self, message: AgentMessage):
"""处理收到的消息"""
if message.type == MessageType.REQUEST:
task = message.content["task"]
await self.coordinate_task(task)
elif message.type == MessageType.RESPONSE:
self.results[message.sender] = message.content
await self.check_completion()
async def coordinate_task(self, task: str):
"""协调任务执行"""
# 分解任务
subtasks = await self.decompose(task)
# 分配任务
allocations = self.allocate_tasks(subtasks)
# 发送任务给workers
for worker_id, task in allocations.items():
await self.send(
[worker_id],
MessageType.REQUEST,
{"task": task, "task_id": f"{self.agent_id}_{worker_id}"}
)
async def check_completion(self):
"""检查是否所有任务完成"""
if len(self.results) == len(self.workers):
# 聚合结果
final_result = await self.aggregate_results(self.results)
# 返回最终结果
await self.bus.publish(AgentMessage(
id=self.bus.generate_id(),
sender=self.agent_id,
receivers=[],
type=MessageType.RESPONSE,
content={"result": final_result}
))
async def aggregate_results(self, results: Dict[str, Any]) -> str:
"""聚合多个智能体的结果"""
prompt = f"""将以下智能体的结果聚合成最终输出:
{chr(10).join([f"智能体 {aid}: {r}" for aid, r in results.items()])}
请提供连贯、完整的最终结果。"""
return await self.llm.generate(prompt)Worker Agent
class WorkerAgent(BaseAgent):
"""工作智能体"""
def __init__(self, agent_id: str, message_bus: MessageBus, llm, capabilities: List[str]):
super().__init__(agent_id, message_bus)
self.llm = llm
self.capabilities = capabilities
async def handle_message(self, message: AgentMessage):
"""处理任务请求"""
if message.type == MessageType.REQUEST:
task = message.content["task"]
result = await self.execute_task(task)
await self.send(
[message.sender],
MessageType.RESPONSE,
{"result": result, "task_id": message.content.get("task_id")}
)
async def execute_task(self, task: str) -> str:
"""执行具体任务"""
prompt = f"执行以下任务:{task}"
return await self.llm.generate(prompt)多智能体系统启动
class MultiAgentSystem:
"""多智能体系统"""
def __init__(self, llm):
self.llm = llm
self.message_bus = MessageBus()
self.agents: Dict[str, BaseAgent] = {}
def add_agent(self, agent: BaseAgent):
"""添加智能体"""
self.agents[agent.agent_id] = agent
async def setup_team(
self,
team_config: Dict
) -> CoordinatorAgent:
"""设置团队"""
# 创建workers
workers = []
for worker_config in team_config["workers"]:
worker = WorkerAgent(
agent_id=worker_config["id"],
message_bus=self.message_bus,
llm=self.llm,
capabilities=worker_config["capabilities"]
)
self.add_agent(worker)
workers.append(worker)
# 创建协调器
coordinator = CoordinatorAgent(
agent_id="coordinator",
message_bus=self.message_bus,
llm=self.llm,
workers=workers
)
self.add_agent(coordinator)
return coordinator
async def run_task(self, task: str) -> str:
"""运行任务"""
coordinator = self.agents["coordinator"]
await coordinator.send(
["coordinator"],
MessageType.REQUEST,
{"task": task}
)
# 等待结果
result = await coordinator.task_queue.get()
return result实际应用示例
1. 软件开发团队
class SoftwareDevelopmentTeam:
"""软件开发多智能体团队"""
def __init__(self, llm):
self.agents = {
"architect": ArchitectAgent("architect", llm),
"coder": CoderAgent("coder", llm),
"reviewer": ReviewerAgent("reviewer", llm),
"tester": TesterAgent("tester", llm)
}
self.coordinator = Coordinator(
list(self.agents.values()),
llm
)
async def develop_feature(self, feature_spec: str) -> Dict:
"""开发功能"""
return await self.coordinator.execute(
f"开发功能: {feature_spec}"
)2. 研究团队
class ResearchTeam:
"""研究多智能体团队"""
def __init__(self, llm):
self.agents = {
"literature_searcher": SearcherAgent(llm),
"analyst": AnalystAgent(llm),
"writer": WriterAgent(llm),
"critic": CriticAgent(llm)
}
async def conduct_research(self, topic: str) -> str:
"""进行研究"""
# 并行搜索和分析
search_task = self.agents["literature_searcher"].search(topic)
analysis_task = self.agents["analyst"].analyze(topic)
search_results, analysis = await asyncio.gather(
search_task, analysis_task
)
# 综合写作
draft = await self.agents["writer"].write(
topic, search_results, analysis
)
# 批评改进
feedback = await self.agents["critic"].critique(draft)
return self.agents["writer"].revise(draft, feedback)