概述

多智能体系统(Multi-Agent Systems, MAS)通过多个智能体的协作来解决复杂问题。与单一智能体相比,多智能体系统能够:

  • 分工协作:不同智能体专注于不同子领域
  • 并行处理:同时处理多个子任务提高效率
  • 知识互补:结合不同智能体的专业知识
  • 涌现智能:通过交互产生超越个体的能力

协作模式分类

1. 层级协作 (Hierarchical)

特点

  • Supervisor-Agent负责全局规划和协调
  • Worker-Agents负责执行具体任务
  • 信息从上向下流动,汇报从下向上反馈
┌─────────────────────────────────────────────────────┐
│                    SUPERVISOR                        │
│              (任务分解 + 结果聚合)                     │
└──────────────────────────┬──────────────────────────┘
                           │
         ┌─────────────────┼─────────────────┐
         ↓                 ↓                 ↓
   ┌───────────┐    ┌───────────┐    ┌───────────┐
   │  Writer   │    │ Researcher│    │  Reviewer │
   │  Agent    │    │   Agent   │    │   Agent   │
   └─────┬─────┘    └─────┬─────┘    └─────┬─────┘
         │                │                │
         └────────────────┼────────────────┘
                          ↓
              ┌───────────────────────┐
              │    结果聚合 + 最终输出   │
              └───────────────────────┘

2. 平等协作 (Peer-to-Peer)

特点

  • 所有智能体地位平等
  • 通过协商达成共识
  • 适合需要多视角评估的任务
┌─────────┐      ┌─────────┐      ┌─────────┐
│ Agent A │ ←──→ │ Agent B │ ←──→ │ Agent C │
└────┬────┘      └────┬────┘      └────┬────┘
     │                 │                 │
     └─────────────────┼─────────────────┘
                      ↓
            ┌─────────────────────┐
            │    共享知识库/上下文   │
            └─────────────────────┘

3. 竞争协作 (Competitive)

特点

  • 多个智能体竞争同一目标
  • 通过评估选择最优结果
  • 适合创意生成、方案评估等场景
┌─────────┐
│         │ ←── 方案A
│         │
│  Judge  │ ←── 方案B (竞争)
│  Agent  │
│         │ ←── 方案C
│         │
└────┬────┘
     │ 最终选择
     ↓
  最佳方案

4. 管道协作 (Pipeline)

特点

  • 智能体形成处理管道
  • 每个智能体处理特定阶段
  • 输出直接传递给下一阶段
输入 → Agent1 → Agent2 → Agent3 → ... → AgentN → 输出
      (解析)   (处理)   (验证)        (格式化)

通信协议设计

消息类型

from enum import Enum
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
from datetime import datetime
 
class MessageType(Enum):
    REQUEST = "request"           # 请求
    RESPONSE = "response"         # 响应
    QUERY = "query"               # 查询
    BROADCAST = "broadcast"       # 广播
    PROPOSE = "propose"           # 提议
    ACCEPT = "accept"             # 接受
    REJECT = "reject"             # 拒绝
    NEGOTIATE = "negotiate"       # 协商
    UPDATE = "update"             # 状态更新
 
@dataclass
class AgentMessage:
    id: str
    sender: str
    receivers: list[str]  # 空列表表示广播
    type: MessageType
    content: Dict[str, Any]
    timestamp: datetime = field(default_factory=datetime.now)
    conversation_id: str = ""
    reply_to: Optional[str] = None  # 关联的消息ID
    
    def to_dict(self) -> Dict:
        return {
            "id": self.id,
            "sender": self.sender,
            "receivers": self.receivers if self.receivers else ["broadcast"],
            "type": self.type.value,
            "content": self.content,
            "timestamp": self.timestamp.isoformat(),
            "conversation_id": self.conversation_id,
            "reply_to": self.reply_to
        }

消息传递系统

import asyncio
from typing import Callable, Dict, List
from collections import defaultdict
 
class MessageBus:
    """智能体间消息总线"""
    
    def __init__(self):
        self.subscribers: Dict[str, List[Callable]] = defaultdict(list)
        self.message_queue: asyncio.Queue = asyncio.Queue()
        self.message_history: List[AgentMessage] = []
    
    def subscribe(self, agent_id: str, callback: Callable):
        """订阅消息"""
        self.subscribers[agent_id].append(callback)
    
    def unsubscribe(self, agent_id: str, callback: Callable):
        """取消订阅"""
        if agent_id in self.subscribers:
            self.subscribers[agent_id].remove(callback)
    
    async def publish(self, message: AgentMessage):
        """发布消息"""
        self.message_history.append(message)
        
        # 发送给特定接收者
        if message.receivers:
            for receiver in message.receivers:
                if receiver in self.subscribers:
                    for callback in self.subscribers[receiver]:
                        await callback(message)
        
        # 广播
        if not message.receivers:
            for agent_id, callbacks in self.subscribers.items():
                if agent_id != message.sender:  # 不发送给自己
                    for callback in callbacks:
                        await callback(message)
    
    async def send_message(
        self,
        sender: str,
        receivers: List[str],
        msg_type: MessageType,
        content: Dict[str, Any]
    ) -> AgentMessage:
        """发送消息"""
        message = AgentMessage(
            id=self.generate_id(),
            sender=sender,
            receivers=receivers,
            type=msg_type,
            content=content,
            conversation_id=f"{sender}_{receivers[0]}" if receivers else "broadcast"
        )
        
        await self.publish(message)
        return message
    
    def generate_id(self) -> str:
        return f"msg_{len(self.message_history)}_{datetime.now().timestamp()}"
    
    def get_conversation(self, conversation_id: str) -> List[AgentMessage]:
        """获取对话历史"""
        return [m for m in self.message_history 
                if m.conversation_id == conversation_id]

Agent间通信实现

class BaseAgent:
    """智能体基类"""
    
    def __init__(self, agent_id: str, message_bus: MessageBus):
        self.agent_id = agent_id
        self.bus = message_bus
        self.pending_messages: asyncio.Queue = asyncio.Queue()
        self.message_bus.subscribe(self.agent_id, self.receive_message)
    
    async def receive_message(self, message: AgentMessage):
        """接收消息"""
        await self.pending_messages.put(message)
    
    async def send(
        self, 
        receivers: List[str], 
        msg_type: MessageType, 
        content: Dict
    ) -> AgentMessage:
        """发送消息"""
        return await self.bus.send_message(
            self.agent_id, receivers, msg_type, content
        )
    
    async def broadcast(self, msg_type: MessageType, content: Dict):
        """广播消息"""
        return await self.bus.send_message(
            self.agent_id, [], msg_type, content
        )
    
    async def process_messages(self):
        """消息处理循环"""
        while True:
            message = await self.pending_messages.get()
            await self.handle_message(message)

任务分解与分配

任务分解策略

@dataclass
class Task:
    id: str
    description: str
    requirements: List[str] = field(default_factory=list)
    priority: int = 0
    estimated_complexity: float = 1.0
 
class TaskDecomposer:
    """任务分解器"""
    
    def __init__(self, llm):
        self.llm = llm
    
    async def decompose(self, task: str, num_agents: int = 3) -> List[Task]:
        """将复杂任务分解为子任务"""
        prompt = f"""将以下任务分解为 {num_agents} 个可并行执行的子任务:
 
任务: {task}
 
要求:
1. 每个子任务应该可以独立执行
2. 子任务之间应有明确的接口定义
3. 考虑任务的依赖关系
 
以JSON格式输出:
{{"tasks": [
    {{"id": "task_1", "description": "...", "requirements": [], "priority": 1}},
    ...
]}}
"""
        response = await self.llm.generate(prompt, format="json")
        tasks_data = json.loads(response)["tasks"]
        return [Task(**t) for t in tasks_data]
 
class TaskAllocator:
    """任务分配器"""
    
    def __init__(self, agents: Dict[str, BaseAgent], capability_registry: Dict):
        self.agents = agents
        self.capabilities = capability_registry  # agent_id -> [capabilities]
    
    def allocate(self, task: Task) -> str:
        """将任务分配给最合适的智能体"""
        best_agent = None
        best_score = -1
        
        for agent_id, capabilities in self.capabilities.items():
            score = self.calculate_match_score(task, capabilities)
            if score > best_score:
                best_score = score
                best_agent = agent_id
        
        return best_agent
    
    def calculate_match_score(self, task: Task, capabilities: List[str]) -> float:
        """计算匹配分数"""
        if not task.requirements:
            return 1.0  # 无特殊要求,均匀分配
        
        match_count = sum(1 for req in task.requirements if req in capabilities)
        return match_count / len(task.requirements)

负载均衡

class LoadBalancer:
    """智能体负载均衡"""
    
    def __init__(self):
        self.agent_loads: Dict[str, int] = defaultdict(int)
        self.max_load = 10
    
    def allocate(self, tasks: List[Task], agents: List[str]) -> Dict[str, List[Task]]:
        """分配任务,实现负载均衡"""
        allocation = {agent_id: [] for agent_id in agents}
        
        # 按优先级排序
        sorted_tasks = sorted(tasks, key=lambda t: -t.priority)
        
        for task in sorted_tasks:
            # 选择负载最轻的智能体
            min_load_agent = min(
                agents,
                key=lambda a: self.agent_loads[a]
            )
            
            if self.agent_loads[min_load_agent] < self.max_load:
                allocation[min_load_agent].append(task)
                self.agent_loads[min_load_agent] += 1
        
        return allocation
    
    def update_load(self, agent_id: str, delta: int):
        """更新负载"""
        self.agent_loads[agent_id] += delta

协调机制

1. 共识达成

class ConsensusProtocol:
    """共识协议"""
    
    async def reach_consensus(
        self,
        agents: List[str],
        topic: str,
        options: List[str]
    ) -> str:
        """多轮投票达成共识"""
        
        for round_num in range(3):  # 最多3轮
            # 第一轮:各智能体独立提案
            proposals = {}
            for agent_id in agents:
                proposal = await self.query_agent(agent_id, topic, options)
                proposals[agent_id] = proposal
            
            # 统计票数
            votes = Counter(proposals.values())
            
            # 检查是否达成共识
            if len(votes) == 1:
                return list(votes.keys())[0]
            
            # 多数票
            majority = votes.most_common(1)[0][0]
            majority_ratio = votes[majority] / len(agents)
            
            if majority_ratio >= 0.6:  # 60%阈值
                return majority
            
            # 否则进入下一轮,提供多数票信息
            options = list(votes.keys())
        
        # 最终让步给多数票
        return votes.most_common(1)[0][0]
    
    async def query_agent(self, agent_id: str, topic: str, options: List[str]) -> str:
        """查询智能体意见"""
        # 实现具体的查询逻辑
        pass

2. 冲突解决

class ConflictResolver:
    """冲突解决器"""
    
    async def resolve(
        self,
        conflicts: List[Dict],
        context: Dict
    ) -> Dict:
        """解决多智能体间的冲突"""
        
        for conflict in conflicts:
            conflict_type = conflict["type"]
            
            if conflict_type == "resource":
                result = await self.resolve_resource_conflict(conflict)
            elif conflict_type == "opinion":
                result = await self.resolve_opinion_conflict(conflict, context)
            elif conflict_type == "execution":
                result = await self.resolve_execution_conflict(conflict)
            
            conflict["resolution"] = result
        
        return conflicts
    
    async def resolve_resource_conflict(self, conflict: Dict) -> Dict:
        """解决资源冲突"""
        agents = conflict["agents"]
        resource = conflict["resource"]
        
        # 基于优先级分配
        priorities = [(a["priority"], a["id"]) for a in agents]
        winner = max(priorities)[1]
        
        return {
            "strategy": "priority_based",
            "winner": winner,
            "others": [a["id"] for a in agents if a["id"] != winner]
        }
    
    async def resolve_opinion_conflict(self, conflict: Dict, context: Dict) -> Dict:
        """解决观点冲突"""
        options = conflict["options"]
        
        # 使用仲裁智能体或投票
        prompt = f"""以下智能体对同一问题有不同观点:
 
{chr(10).join([f"- {opt}" for opt in options])}
 
上下文: {context}
 
请分析各观点的优缺点,选择或综合出最佳方案。"""
        
        resolution = await self.llm.generate(prompt)
        return {"strategy": "arbitration", "resolution": resolution}

3. 状态同步

class StateSynchronizer:
    """状态同步器"""
    
    def __init__(self, message_bus: MessageBus):
        self.bus = message_bus
        self.global_state: Dict[str, Any] = {}
        self.agent_states: Dict[str, Dict] = {}
    
    async def sync_state(self, agent_id: str, local_state: Dict):
        """同步智能体状态"""
        self.agent_states[agent_id] = local_state
        
        # 广播状态更新
        await self.bus.broadcast(
            MessageType.UPDATE,
            {
                "agent_id": agent_id,
                "state": local_state,
                "timestamp": datetime.now().isoformat()
            }
        )
    
    def get_global_view(self) -> Dict:
        """获取全局视图"""
        return {
            "agents": self.agent_states,
            "shared": self.global_state,
            "last_update": datetime.now().isoformat()
        }

多智能体框架实现

Coordinator Agent

class CoordinatorAgent(BaseAgent):
    """协调器智能体"""
    
    def __init__(
        self, 
        agent_id: str, 
        message_bus: MessageBus,
        llm,
        workers: List[BaseAgent]
    ):
        super().__init__(agent_id, message_bus)
        self.llm = llm
        self.workers = {w.agent_id: w for w in workers}
        self.task_queue: asyncio.Queue = asyncio.Queue()
        self.results: Dict[str, Any] = {}
    
    async def handle_message(self, message: AgentMessage):
        """处理收到的消息"""
        if message.type == MessageType.REQUEST:
            task = message.content["task"]
            await self.coordinate_task(task)
        
        elif message.type == MessageType.RESPONSE:
            self.results[message.sender] = message.content
            await self.check_completion()
    
    async def coordinate_task(self, task: str):
        """协调任务执行"""
        # 分解任务
        subtasks = await self.decompose(task)
        
        # 分配任务
        allocations = self.allocate_tasks(subtasks)
        
        # 发送任务给workers
        for worker_id, task in allocations.items():
            await self.send(
                [worker_id],
                MessageType.REQUEST,
                {"task": task, "task_id": f"{self.agent_id}_{worker_id}"}
            )
    
    async def check_completion(self):
        """检查是否所有任务完成"""
        if len(self.results) == len(self.workers):
            # 聚合结果
            final_result = await self.aggregate_results(self.results)
            
            # 返回最终结果
            await self.bus.publish(AgentMessage(
                id=self.bus.generate_id(),
                sender=self.agent_id,
                receivers=[],
                type=MessageType.RESPONSE,
                content={"result": final_result}
            ))
    
    async def aggregate_results(self, results: Dict[str, Any]) -> str:
        """聚合多个智能体的结果"""
        prompt = f"""将以下智能体的结果聚合成最终输出:
 
{chr(10).join([f"智能体 {aid}: {r}" for aid, r in results.items()])}
 
请提供连贯、完整的最终结果。"""
        
        return await self.llm.generate(prompt)

Worker Agent

class WorkerAgent(BaseAgent):
    """工作智能体"""
    
    def __init__(self, agent_id: str, message_bus: MessageBus, llm, capabilities: List[str]):
        super().__init__(agent_id, message_bus)
        self.llm = llm
        self.capabilities = capabilities
    
    async def handle_message(self, message: AgentMessage):
        """处理任务请求"""
        if message.type == MessageType.REQUEST:
            task = message.content["task"]
            result = await self.execute_task(task)
            
            await self.send(
                [message.sender],
                MessageType.RESPONSE,
                {"result": result, "task_id": message.content.get("task_id")}
            )
    
    async def execute_task(self, task: str) -> str:
        """执行具体任务"""
        prompt = f"执行以下任务:{task}"
        return await self.llm.generate(prompt)

多智能体系统启动

class MultiAgentSystem:
    """多智能体系统"""
    
    def __init__(self, llm):
        self.llm = llm
        self.message_bus = MessageBus()
        self.agents: Dict[str, BaseAgent] = {}
    
    def add_agent(self, agent: BaseAgent):
        """添加智能体"""
        self.agents[agent.agent_id] = agent
    
    async def setup_team(
        self,
        team_config: Dict
    ) -> CoordinatorAgent:
        """设置团队"""
        # 创建workers
        workers = []
        for worker_config in team_config["workers"]:
            worker = WorkerAgent(
                agent_id=worker_config["id"],
                message_bus=self.message_bus,
                llm=self.llm,
                capabilities=worker_config["capabilities"]
            )
            self.add_agent(worker)
            workers.append(worker)
        
        # 创建协调器
        coordinator = CoordinatorAgent(
            agent_id="coordinator",
            message_bus=self.message_bus,
            llm=self.llm,
            workers=workers
        )
        self.add_agent(coordinator)
        
        return coordinator
    
    async def run_task(self, task: str) -> str:
        """运行任务"""
        coordinator = self.agents["coordinator"]
        await coordinator.send(
            ["coordinator"],
            MessageType.REQUEST,
            {"task": task}
        )
        
        # 等待结果
        result = await coordinator.task_queue.get()
        return result

实际应用示例

1. 软件开发团队

class SoftwareDevelopmentTeam:
    """软件开发多智能体团队"""
    
    def __init__(self, llm):
        self.agents = {
            "architect": ArchitectAgent("architect", llm),
            "coder": CoderAgent("coder", llm),
            "reviewer": ReviewerAgent("reviewer", llm),
            "tester": TesterAgent("tester", llm)
        }
        self.coordinator = Coordinator(
            list(self.agents.values()),
            llm
        )
    
    async def develop_feature(self, feature_spec: str) -> Dict:
        """开发功能"""
        return await self.coordinator.execute(
            f"开发功能: {feature_spec}"
        )

2. 研究团队

class ResearchTeam:
    """研究多智能体团队"""
    
    def __init__(self, llm):
        self.agents = {
            "literature_searcher": SearcherAgent(llm),
            "analyst": AnalystAgent(llm),
            "writer": WriterAgent(llm),
            "critic": CriticAgent(llm)
        }
    
    async def conduct_research(self, topic: str) -> str:
        """进行研究"""
        # 并行搜索和分析
        search_task = self.agents["literature_searcher"].search(topic)
        analysis_task = self.agents["analyst"].analyze(topic)
        
        search_results, analysis = await asyncio.gather(
            search_task, analysis_task
        )
        
        # 综合写作
        draft = await self.agents["writer"].write(
            topic, search_results, analysis
        )
        
        # 批评改进
        feedback = await self.agents["critic"].critique(draft)
        
        return self.agents["writer"].revise(draft, feedback)

参考文献