概述

Agent架构模式定义了智能体如何组织其推理、规划和执行过程。本文档详细解析当前主流的Agent架构模式及其适用场景。


1. ReAct (Reasoning + Acting)

核心思想

ReAct将推理(Reasoning)与行动(Acting)交替进行,使语言模型能够:

  • 在推理过程中生成具体任务规划
  • 通过执行动作与环境交互
  • 利用观察结果更新推理

架构图

┌─────────────────────────────────────────────────────────┐
│                      LLM (ReAct Loop)                   │
│                                                         │
│   ┌─────────┐    ┌─────────┐    ┌─────────┐             │
│   │ Thought │ →  │ Action  │ →  │Observe- │             │
│   │  (推理)  │    │  (行动)  │    │  ation  │             │
│   │         │    │         │    │ (观察)   │             │
│   └─────────┘    └────┬────┘    └────┬────┘             │
│                       ↓               ↓                  │
│                  ┌─────────────────────┐                │
│                  │   Tool Execution    │                │
│                  │   (工具执行环境)     │                │
│                  └─────────────────────┘                │
└─────────────────────────────────────────────────────────┘

实现代码

import asyncio
from typing import List, Dict, Any
from dataclasses import dataclass
from enum import Enum
 
class ActionStatus(Enum):
    SUCCESS = "success"
    FAILURE = "failure"
    RUNNING = "running"
 
@dataclass
class ReActStep:
    thought: str
    action: str
    action_input: Dict[str, Any]
    observation: str
    status: ActionStatus
 
class ReActAgent:
    def __init__(self, llm, tools, max_steps=10):
        self.llm = llm
        self.tools = tools
        self.max_steps = max_steps
        self.prompt_template = """你是一个智能助手,可以使用以下工具来回答问题。
 
工具列表:
{tools}
 
请按照以下格式逐步推理:
 
问题: {input}
{scratchpad}
 
现在请执行下一步:"""
    
    def format_tools(self) -> str:
        tool_schemas = []
        for name, tool in self.tools.items():
            desc = tool.description
            params = ", ".join(tool.parameters.keys())
            tool_schemas.append(f"{name}({params}): {desc}")
        return "\n".join(tool_schemas)
    
    async def run(self, task: str) -> str:
        scratchpad = ""
        history = []
        
        for step in range(self.max_steps):
            # 构建提示
            prompt = self.prompt_template.format(
                input=task,
                tools=self.format_tools(),
                scratchpad=scratchpad
            )
            
            # 获取LLM响应
            response = await self.llm.generate(prompt)
            
            # 解析响应
            parsed = self.parse_response(response)
            
            if parsed["type"] == "finish":
                return parsed["answer"]
            
            # 执行动作
            if parsed["type"] == "action":
                action_name = parsed["action"]
                action_input = parsed["action_input"]
                
                try:
                    observation = await self.tools[action_name].execute(**action_input)
                    observation_str = f"观察结果: {observation}"
                except Exception as e:
                    observation_str = f"执行错误: {str(e)}"
                
                # 更新思考链
                scratchpad += f"\n{step+1}步:\n"
                scratchpad += f"思考: {parsed['thought']}\n"
                scratchpad += f"行动: {action_name}({action_input})\n"
                scratchpad += f"{observation_str}\n"
                
                history.append(ReActStep(
                    thought=parsed["thought"],
                    action=action_name,
                    action_input=action_input,
                    observation=observation_str,
                    status=ActionStatus.SUCCESS if "错误" not in observation_str else ActionStatus.FAILURE
                ))
        
        return "达到最大步数限制"
    
    def parse_response(self, response: str) -> Dict:
        """解析LLM响应,提取thought、action和action_input"""
        # 简化解析逻辑
        lines = response.strip().split("\n")
        result = {"type": "action", "thought": "", "action": None, "action_input": {}}
        
        for line in lines:
            if line.startswith("思考:"):
                result["thought"] = line[3:].strip()
            elif line.startswith("行动:"):
                # 解析动作
                action_part = line[3:].strip()
                if action_part.startswith("finish"):
                    result["type"] = "finish"
                    result["answer"] = action_part.replace("finish[", "").replace("]", "")
                else:
                    result["action"] = action_part
            elif line.startswith("参数:"):
                result["action_input"] = eval(line[3:].strip())
        
        return result

ReAct变体

ReAct-Perceiver:增强感知能力

class ReActPerceiver(ReActAgent):
    """带有增强感知能力的ReAct变体"""
    def __init__(self, *args, perception_model=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.perception = perception_model
    
    async def run(self, task: str) -> str:
        # 预处理:感知增强
        perceived_task = await self.perception.enhance(task)
        return await super().run(perceived_task)

2. Plan-and-Execute

核心思想

Plan-and-Execute模式将规划与执行分离:

  1. 规划阶段:生成完整的任务分解计划
  2. 执行阶段:按顺序执行计划中的每个步骤

这种分离使得:

  • 规划过程可以进行更深入的推理
  • 执行过程更加高效
  • 更容易实现并行执行

架构图

┌─────────────────────────────────────────────────────────┐
│  Plan-and-Execute 架构                                   │
│                                                         │
│  ┌─────────────────────────────────────────────────┐    │
│  │              PLANNER (规划器)                     │    │
│  │  输入: 任务目标                                    │    │
│  │  输出: 有序的任务列表                              │    │
│  │  方法: LLM + 结构化输出                           │    │
│  └────────────────────┬────────────────────────────┘    │
│                       ↓                                  │
│  ┌─────────────────────────────────────────────────┐    │
│  │              EXECUTOR (执行器)                     │    │
│  │                                                  │    │
│  │   步骤1 ─→ 步骤2 ─→ 步骤3 ─→ ... ─→ 步骤N        │    │
│  │     ↓        ↓        ↓              ↓           │    │
│  │   执行      执行      执行            执行         │    │
│  │                                                  │    │
│  └─────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────┘

实现代码

from typing import List, Optional
from dataclasses import dataclass, field
from enum import Enum
 
class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
 
@dataclass
class SubTask:
    id: int
    description: str
    status: TaskStatus = TaskStatus.PENDING
    result: Optional[str] = None
    dependencies: List[int] = field(default_factory=list)
 
@dataclass
class ExecutionPlan:
    tasks: List[SubTask]
    final_goal: str
    
    def get_ready_tasks(self) -> List[SubTask]:
        """获取所有依赖已满足且待执行的任务"""
        completed_ids = {t.id for t in self.tasks if t.status == TaskStatus.COMPLETED}
        ready = []
        for task in self.tasks:
            if task.status == TaskStatus.PENDING:
                if all(dep in completed_ids for dep in task.dependencies):
                    ready.append(task)
        return ready
 
class PlanAndExecuteAgent:
    def __init__(self, llm, executor, max_retries=2):
        self.llm = llm
        self.executor = executor
        self.max_retries = max_retries
    
    async def plan(self, task: str) -> ExecutionPlan:
        """使用LLM生成任务分解计划"""
        planning_prompt = f"""将以下复杂任务分解为可执行的子任务:
 
任务: {task}
 
请以JSON格式输出任务分解,每个子任务需要包含:
- id: 任务编号
- description: 任务描述
- dependencies: 依赖的任务ID列表(前置任务)
 
确保:
1. 任务分解粒度适中(5-15个子任务)
2. 每个任务可以独立执行
3. 明确标注任务间的依赖关系
 
JSON格式:
{{"tasks": [...], "final_goal": "..."}}
"""
        response = await self.llm.generate(planning_prompt, format="json")
        plan_data = json.loads(response)
        
        tasks = [SubTask(**t) for t in plan_data["tasks"]]
        return ExecutionPlan(tasks=tasks, final_goal=plan_data["final_goal"])
    
    async def execute_task(self, task: SubTask, context: Dict) -> str:
        """执行单个任务"""
        execution_prompt = f"""执行以下任务:
 
任务: {task.description}
上下文: {context}
 
请执行任务并报告结果。"""
        
        for attempt in range(self.max_retries):
            try:
                result = await self.llm.generate(execution_prompt)
                return result
            except Exception as e:
                if attempt == self.max_retries - 1:
                    raise ExecutionError(f"Task {task.id} failed after {self.max_retries} attempts")
        
    async def run(self, task: str) -> str:
        # 阶段1: 规划
        plan = await self.plan(task)
        
        # 阶段2: 执行
        context = {"task": task}
        
        while True:
            ready_tasks = plan.get_ready_tasks()
            
            if not ready_tasks:
                if all(t.status == TaskStatus.COMPLETED for t in plan.tasks):
                    break
                else:
                    # 检测到循环依赖或死锁
                    await self.replan(plan)
                    continue
            
            # 执行就绪任务(可并行)
            if len(ready_tasks) > 1:
                # 并行执行
                results = await asyncio.gather(*[
                    self.execute_task(t, context) for t in ready_tasks
                ])
                for task, result in zip(ready_tasks, results):
                    task.status = TaskStatus.COMPLETED
                    task.result = result
                    context[f"task_{task.id}"] = result
            else:
                # 串行执行
                task = ready_tasks[0]
                task.status = TaskStatus.RUNNING
                result = await self.execute_task(task, context)
                task.status = TaskStatus.COMPLETED
                task.result = result
                context[f"task_{task.id}"] = result
        
        # 返回最终结果
        return context.get(plan.final_goal, "任务完成")
    
    async def replan(self, plan: ExecutionPlan):
        """重新规划"""
        failed_tasks = [t for t in plan.tasks if t.status == TaskStatus.FAILED]
        new_plan = await self.plan(
            f"修复以下失败的任务: {[t.description for t in failed_tasks]}"
        )
        # 合并新计划(简化处理)
        plan.tasks.extend(new_plan.tasks)

3. Reflexion

核心思想

Reflexion通过自我反思机制来改进决策:

  1. 执行一个动作
  2. 获得观察结果
  3. 反思:分析成功/失败原因
  4. 更新内部记忆/策略
  5. 决定下一步行动

与ReAct的关键区别

维度ReActReflexion
记忆类型简单的观察历史结构化经验总结
反思机制有(显式自我评估)
策略更新静态动态(基于反思改进)
适用场景简单任务需要学习的复杂任务

实现代码

@dataclass
class Experience:
    observation: str
    reward: float
    reflection: str = ""
    
class ReflexionAgent:
    def __init__(self, llm, tools, memory_system):
        self.llm = llm
        self.tools = tools
        self.memory = memory_system
        self.experiences: List[Experience] = []
    
    async def reflect(self, experience: Experience) -> str:
        """生成反思"""
        reflection_prompt = f"""分析以下执行经验,识别成功和失败的原因:
 
观察: {experience.observation}
奖励: {experience.reward}
 
请从以下角度进行反思:
1. 这次执行的主要成功点是什么?
2. 主要的错误或不足是什么?
3. 如何改进可以避免类似问题?
4. 这些经验对未来的决策有什么指导意义?
 
请用简洁的语言总结你的反思(2-3句话)。"""
        
        reflection = await self.llm.generate(reflection_prompt)
        experience.reflection = reflection
        return reflection
    
    async def decide_next_action(self, state: str, reflection: str = "") -> Dict:
        """基于当前状态和历史反思决定下一步动作"""
        history_context = ""
        if self.experiences:
            history_context = "历史经验总结:\n"
            for i, exp in enumerate(self.experiences[-3:], 1):
                history_context += f"{i}. {exp.reflection}\n"
        
        decision_prompt = f"""当前状态: {state}
 
{history_context}
 
{f"本次反思: {reflection}" if reflection else ""}
 
基于以上信息,决定下一步行动。
"""
        return await self.llm.structured_generate(decision_prompt, format="json")
    
    async def run(self, task: str) -> str:
        state = task
        
        for step in range(self.max_steps):
            # 决策
            decision = await self.decide_next_action(state)
            
            if decision["type"] == "finish":
                return decision["answer"]
            
            # 执行
            if decision["type"] == "tool":
                result = await self.tools[decision["tool"]].execute(**decision["args"])
            else:
                result = decision["answer"]
            
            # 评估奖励
            reward = self.evaluate_outcome(state, result)
            
            # 创建经验
            experience = Experience(
                observation=f"{state}\n行动: {decision}\n结果: {result}",
                reward=reward
            )
            
            # 反思
            reflection = await self.reflect(experience)
            
            # 存储经验
            self.experiences.append(experience)
            self.memory.add({
                "task": task,
                "reflection": reflection,
                "reward": reward
            })
            
            # 更新状态
            state = result
        
        return "达到最大步数"
    
    def evaluate_outcome(self, state: str, result: str) -> float:
        """评估行动结果"""
        # 简化的奖励函数
        if "错误" in result or "失败" in result:
            return -1.0
        elif "成功" in result or "完成" in result:
            return 1.0
        return 0.0

4. Supervisor/Hierarchical架构

核心思想

层次化架构使用多个级别的Agent:

  • Supervisor:高层规划与协调
  • Sub-agents:执行具体子任务
  • 消息传递:层级间通信

架构图

┌─────────────────────────────────────────────────────────┐
│                    SUPERVISOR (总指挥)                    │
│                  任务理解、全局规划、资源分配                │
└──────────────────────┬──────────────────────────────────┘
                       │
        ┌──────────────┼──────────────┐
        ↓              ↓              ↓
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│  Sub-Agent A  │ │  Sub-Agent B  │ │  Sub-Agent C  │
│  (数据分析)   │ │  (代码编写)   │ │  (测试验证)   │
└───────┬───────┘ └───────┬───────┘ └───────┬───────┘
        │                  │                  │
        └──────────────────┼──────────────────┘
                           ↓
              ┌────────────────────────┐
              │    共享上下文/结果聚合    │
              └────────────────────────┘

实现代码

from abc import ABC, abstractmethod
from typing import Dict, List, Any
import asyncio
 
class SubAgent(ABC):
    """子智能体基类"""
    
    @abstractmethod
    async def execute(self, instruction: str) -> str:
        pass
    
    @abstractmethod
    def can_handle(self, task: str) -> float:
        """返回对该任务的适合程度 (0-1)"""
        pass
 
class SupervisorAgent:
    def __init__(self, llm, sub_agents: List[SubAgent]):
        self.llm = llm
        self.sub_agents = {agent.name: agent for agent in sub_agents}
        self.shared_context = {}
    
    async def decompose_task(self, task: str) -> List[Dict]:
        """分解任务并分配给子智能体"""
        decomposition_prompt = f"""将以下任务分解并分配给专业智能体:
 
任务: {task}
 
可用智能体:
{self.format_agents()}
 
对于每个子任务,请指定:
1. 任务描述
2. 负责的智能体名称
3. 依赖关系(哪些任务必须先完成)
 
以JSON格式输出。"""
        
        response = await self.llm.generate(decomposition_prompt, format="json")
        plan = json.loads(response)
        return plan["subtasks"]
    
    def format_agents(self) -> str:
        return "\n".join([
            f"- {name}: {agent.description}" 
            for name, agent in self.sub_agents.items()
        ])
    
    async def run(self, task: str) -> str:
        # 分解任务
        subtasks = await self.decompose_task(task)
        
        # 构建任务依赖图
        task_graph = self.build_graph(subtasks)
        
        # 按依赖顺序执行
        results = {}
        
        while task_graph.pending():
            # 获取所有可执行的任务
            ready = task_graph.get_ready_tasks()
            
            if not ready:
                # 处理依赖冲突
                await self.handle_deadlock(task_graph)
                continue
            
            # 并行执行可执行任务
            if len(ready) > 1:
                batch_results = await asyncio.gather(*[
                    self.execute_subtask(t, results) for t in ready
                ])
                for t, result in zip(ready, batch_results):
                    results[t["id"]] = result
                    task_graph.complete(t["id"])
            else:
                result = await self.execute_subtask(ready[0], results)
                results[ready[0]["id"]] = result
                task_graph.complete(ready[0]["id"])
        
        # 聚合最终结果
        return self.aggregate_results(results)
    
    async def execute_subtask(self, task: Dict, context: Dict) -> str:
        agent_name = task["agent"]
        agent = self.sub_agents[agent_name]
        
        # 准备指令(包含依赖任务的结果)
        instruction = task["description"]
        if task.get("dependencies"):
            instruction += "\n\n相关任务结果:\n"
            for dep_id in task["dependencies"]:
                if dep_id in context:
                    instruction += f"- {dep_id}: {context[dep_id]}\n"
        
        # 执行
        result = await agent.execute(instruction)
        
        # 更新共享上下文
        self.shared_context[task["id"]] = result
        
        return result
    
    def aggregate_results(self, results: Dict[str, str]) -> str:
        """聚合所有子任务结果"""
        aggregation_prompt = f"""聚合以下子任务的结果,形成最终答案:
 
{chr(10).join([f'{k}: {v}' for k, v in results.items()])}
 
请提供最终的综合答案。"""
        
        return self.llm.generate(aggregation_prompt)

5. Tool-Augmented Agents

核心思想

Tool-Augmented Agents通过调用外部工具扩展能力边界:

class ToolRegistry:
    """工具注册表"""
    
    def __init__(self):
        self.tools: Dict[str, Tool] = {}
        self.tool_schemas: Dict[str, Dict] = {}
    
    def register(self, name: str, tool: Tool):
        self.tools[name] = tool
        self.tool_schemas[name] = {
            "name": name,
            "description": tool.description,
            "parameters": tool.get_parameters_schema()
        }
    
    def get_schema(self) -> str:
        """生成工具schema供LLM使用"""
        return json.dumps(self.tool_schemas, ensure_ascii=False, indent=2)
    
    def call(self, name: str, **kwargs) -> Any:
        return self.tools[name].execute(**kwargs)
 
class ToolAugmentedAgent:
    def __init__(self, llm, tool_registry: ToolRegistry):
        self.llm = llm
        self.tools = tool_registry
    
    async def decide_and_execute(self, state: str) -> str:
        """决策是否使用工具或直接回答"""
        prompt = f"""当前状态:
{state}
 
可用工具:
{self.tools.get_schema()}
 
决定下一步:
1. 如果需要外部信息或操作,使用工具
2. 如果可以直接回答,返回最终答案
 
格式:
- 如果使用工具: {{"use_tool": true, "tool": "工具名", "args": {{"参数": "值"}}}}
- 如果完成: {{"use_tool": false, "answer": "最终答案"}}
"""
        
        decision = await self.llm.generate(prompt, format="json")
        
        if decision["use_tool"]:
            result = self.tools.call(decision["tool"], **decision["args"])
            return f"工具 {decision['tool']} 返回: {result}"
        else:
            return decision["answer"]

架构选择指南

场景推荐架构原因
简单问答+工具调用ReAct简洁、高效
复杂多步骤任务Plan-and-Execute规划清晰、可解释
需要学习的任务Reflexion自我改进能力
多领域协作Supervisor专业化分工
工具密集型Tool-Augmented工具集成友好

参考文献