概述
Agent架构模式定义了智能体如何组织其推理、规划和执行过程。本文档详细解析当前主流的Agent架构模式及其适用场景。
1. ReAct (Reasoning + Acting)
核心思想
ReAct将推理(Reasoning)与行动(Acting)交替进行,使语言模型能够:
- 在推理过程中生成具体任务规划
- 通过执行动作与环境交互
- 利用观察结果更新推理
架构图
┌─────────────────────────────────────────────────────────┐
│ LLM (ReAct Loop) │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Thought │ → │ Action │ → │Observe- │ │
│ │ (推理) │ │ (行动) │ │ ation │ │
│ │ │ │ │ │ (观察) │ │
│ └─────────┘ └────┬────┘ └────┬────┘ │
│ ↓ ↓ │
│ ┌─────────────────────┐ │
│ │ Tool Execution │ │
│ │ (工具执行环境) │ │
│ └─────────────────────┘ │
└─────────────────────────────────────────────────────────┘
实现代码
import ast
import asyncio
import json
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List
class ActionStatus(Enum):
    # Outcome recorded for each ReAct step (see ReActStep.status).
    SUCCESS = "success"
    FAILURE = "failure"
    RUNNING = "running"
@dataclass
class ReActStep:
    # One iteration of the ReAct loop: the model's reasoning, the tool call
    # it chose, and what came back from executing that call.
    thought: str                  # reasoning text emitted by the LLM
    action: str                   # tool name selected for this step
    action_input: Dict[str, Any]  # keyword arguments passed to the tool
    observation: str              # formatted result (or error) of the call
    status: ActionStatus          # SUCCESS/FAILURE judged from the observation text
class ReActAgent:
    """ReAct-style agent: interleaves LLM reasoning ("thought") with tool
    calls ("action") and feeds each result ("observation") back into the
    prompt until the model emits a finish action or max_steps is reached.
    """

    def __init__(self, llm, tools, max_steps=10):
        self.llm = llm                # async LLM client exposing generate()
        self.tools = tools            # mapping: tool name -> tool object
        self.max_steps = max_steps    # hard cap on loop iterations
        self.prompt_template = """你是一个智能助手,可以使用以下工具来回答问题。
工具列表:
{tools}
请按照以下格式逐步推理:
问题: {input}
{scratchpad}
现在请执行下一步:"""

    def format_tools(self) -> str:
        """Render the tool registry as "name(params): description" lines."""
        tool_schemas = []
        for name, tool in self.tools.items():
            desc = tool.description
            params = ", ".join(tool.parameters.keys())
            tool_schemas.append(f"{name}({params}): {desc}")
        return "\n".join(tool_schemas)

    async def run(self, task: str) -> str:
        """Drive the ReAct loop for *task*.

        Returns the model's final answer, or a max-steps message if the
        loop never reaches a finish action.
        """
        scratchpad = ""
        history = []
        for step in range(self.max_steps):
            # Build the prompt including the accumulated thought/action trace.
            prompt = self.prompt_template.format(
                input=task,
                tools=self.format_tools(),
                scratchpad=scratchpad
            )
            response = await self.llm.generate(prompt)
            parsed = self.parse_response(response)
            if parsed["type"] == "finish":
                return parsed["answer"]
            if parsed["type"] == "action":
                action_name = parsed["action"]
                action_input = parsed["action_input"]
                # Execution errors (including an unknown tool name, which
                # raises KeyError here) become observations rather than
                # aborting the loop.
                try:
                    observation = await self.tools[action_name].execute(**action_input)
                    observation_str = f"观察结果: {observation}"
                except Exception as e:
                    observation_str = f"执行错误: {str(e)}"
                # Append this step to the scratchpad so the next LLM call
                # sees the full trace.
                scratchpad += f"\n第{step+1}步:\n"
                scratchpad += f"思考: {parsed['thought']}\n"
                scratchpad += f"行动: {action_name}({action_input})\n"
                scratchpad += f"{observation_str}\n"
                history.append(ReActStep(
                    thought=parsed["thought"],
                    action=action_name,
                    action_input=action_input,
                    observation=observation_str,
                    status=ActionStatus.SUCCESS if "错误" not in observation_str else ActionStatus.FAILURE
                ))
        return "达到最大步数限制"

    def parse_response(self, response: str) -> Dict:
        """Parse an LLM response into thought / action / action_input.

        Returns a dict with "type" == "finish" (plus "answer") when the
        model emitted finish[...]; otherwise "type" == "action".
        """
        lines = response.strip().split("\n")
        result = {"type": "action", "thought": "", "action": None, "action_input": {}}
        for line in lines:
            if line.startswith("思考:"):
                result["thought"] = line[3:].strip()
            elif line.startswith("行动:"):
                action_part = line[3:].strip()
                if action_part.startswith("finish"):
                    result["type"] = "finish"
                    # Strip only the outer finish[...] brackets; the old
                    # chained str.replace removed every "]" inside the
                    # answer as well.
                    answer = action_part[len("finish"):].strip()
                    if answer.startswith("[") and answer.endswith("]"):
                        answer = answer[1:-1]
                    result["answer"] = answer
                else:
                    result["action"] = action_part
            elif line.startswith("参数:"):
                # SECURITY fix: the original ran eval() on model output,
                # which executes arbitrary code. literal_eval accepts only
                # Python literals; malformed input degrades to empty args.
                try:
                    result["action_input"] = ast.literal_eval(line[3:].strip())
                except (ValueError, SyntaxError):
                    result["action_input"] = {}
        return result
ReAct变体

ReAct-Perceiver:增强感知能力
class ReActPerceiver(ReActAgent):
    """ReAct variant that runs an extra perception stage up front."""

    def __init__(self, *args, perception_model=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.perception = perception_model

    async def run(self, task: str) -> str:
        # Enrich the raw task description before entering the ReAct loop.
        enhanced = await self.perception.enhance(task)
        return await super().run(enhanced)
2. Plan-and-Execute

核心思想
Plan-and-Execute模式将规划与执行分离:
- 规划阶段:生成完整的任务分解计划
- 执行阶段:按顺序执行计划中的每个步骤
这种分离使得:
- 规划过程可以进行更深入的推理
- 执行过程更加高效
- 更容易实现并行执行
架构图
┌─────────────────────────────────────────────────────────┐
│ Plan-and-Execute 架构 │
│ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ PLANNER (规划器) │ │
│ │ 输入: 任务目标 │ │
│ │ 输出: 有序的任务列表 │ │
│ │ 方法: LLM + 结构化输出 │ │
│ └────────────────────┬────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ EXECUTOR (执行器) │ │
│ │ │ │
│ │ 步骤1 ─→ 步骤2 ─→ 步骤3 ─→ ... ─→ 步骤N │ │
│ │ ↓ ↓ ↓ ↓ │ │
│ │ 执行 执行 执行 执行 │ │
│ │ │ │
│ └─────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
实现代码
from typing import List, Optional
from dataclasses import dataclass, field
from enum import Enum
class TaskStatus(Enum):
    # Lifecycle states for a SubTask within an ExecutionPlan.
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
@dataclass
class SubTask:
    # One node of an execution plan produced by the planner LLM.
    id: int                                  # unique task number within the plan
    description: str                         # natural-language instruction for the executor
    status: TaskStatus = TaskStatus.PENDING  # current lifecycle state
    result: Optional[str] = None             # output, set once COMPLETED
    dependencies: List[int] = field(default_factory=list)  # ids that must finish first
@dataclass
class ExecutionPlan:
    tasks: List[SubTask]
    final_goal: str

    def get_ready_tasks(self) -> List[SubTask]:
        """Return every pending task whose dependencies have all completed."""
        finished = {t.id for t in self.tasks if t.status == TaskStatus.COMPLETED}
        return [
            t
            for t in self.tasks
            if t.status == TaskStatus.PENDING
            and all(dep in finished for dep in t.dependencies)
        ]
class PlanAndExecuteAgent:
    """Two-phase agent: an LLM planner decomposes the task into a dependency
    graph of sub-tasks, then an execution loop runs all ready tasks
    concurrently until the plan completes, replanning on deadlock/failure.
    """

    def __init__(self, llm, executor, max_retries=2):
        self.llm = llm                  # async LLM client
        self.executor = executor        # NOTE(review): unused in this listing — confirm intended role
        self.max_retries = max_retries  # per-task retry budget

    async def plan(self, task: str) -> ExecutionPlan:
        """Ask the LLM for a JSON task decomposition and build an ExecutionPlan."""
        planning_prompt = f"""将以下复杂任务分解为可执行的子任务:
任务: {task}
请以JSON格式输出任务分解,每个子任务需要包含:
- id: 任务编号
- description: 任务描述
- dependencies: 依赖的任务ID列表(前置任务)
确保:
1. 任务分解粒度适中(5-15个子任务)
2. 每个任务可以独立执行
3. 明确标注任务间的依赖关系
JSON格式:
{{"tasks": [...], "final_goal": "..."}}
"""
        response = await self.llm.generate(planning_prompt, format="json")
        plan_data = json.loads(response)
        tasks = [SubTask(**t) for t in plan_data["tasks"]]
        return ExecutionPlan(tasks=tasks, final_goal=plan_data["final_goal"])

    async def execute_task(self, task: SubTask, context: Dict) -> str:
        """Execute one sub-task via the LLM, retrying up to max_retries times.

        Raises:
            RuntimeError: when every attempt fails, chained to the last
                underlying error. (The original raised an undefined
                ``ExecutionError``, which itself crashed with NameError.)
        """
        execution_prompt = f"""执行以下任务:
任务: {task.description}
上下文: {context}
请执行任务并报告结果。"""
        last_error = None
        for attempt in range(self.max_retries):
            try:
                return await self.llm.generate(execution_prompt)
            except Exception as e:
                last_error = e
        raise RuntimeError(
            f"Task {task.id} failed after {self.max_retries} attempts"
        ) from last_error

    async def run(self, task: str) -> str:
        """Plan, then execute until all tasks complete; replan on deadlock."""
        # Phase 1: planning.
        plan = await self.plan(task)
        # Phase 2: execution. Shared context accumulates each task's result.
        context = {"task": task}
        while True:
            ready_tasks = plan.get_ready_tasks()
            if not ready_tasks:
                if all(t.status == TaskStatus.COMPLETED for t in plan.tasks):
                    break
                # Unfinished plan with nothing runnable: failed tasks, a
                # dependency cycle, or a deadlock — ask for a repair plan.
                await self.replan(plan)
                continue
            # Run every ready task concurrently (one task is just the
            # degenerate case). Mark RUNNING first so an executing task is
            # never re-selected as PENDING.
            for t in ready_tasks:
                t.status = TaskStatus.RUNNING
            outcomes = await asyncio.gather(
                *[self.execute_task(t, context) for t in ready_tasks],
                return_exceptions=True,
            )
            for t, outcome in zip(ready_tasks, outcomes):
                if isinstance(outcome, BaseException):
                    # Record the failure instead of crashing the loop;
                    # the deadlock branch above will then trigger replan.
                    # (Originally the exception propagated out of run().)
                    t.status = TaskStatus.FAILED
                else:
                    t.status = TaskStatus.COMPLETED
                    t.result = outcome
                    context[f"task_{t.id}"] = outcome
        # NOTE(review): final_goal is a description string, not a context
        # key, so this almost always returns the fallback — confirm the
        # intended lookup.
        return context.get(plan.final_goal, "任务完成")

    async def replan(self, plan: ExecutionPlan):
        """Ask the planner to repair failed tasks and append the new sub-tasks."""
        failed_tasks = [t for t in plan.tasks if t.status == TaskStatus.FAILED]
        new_plan = await self.plan(
            f"修复以下失败的任务: {[t.description for t in failed_tasks]}"
        )
        # Merge the repair plan (simplified: new ids may collide with old ones).
        plan.tasks.extend(new_plan.tasks)
3. Reflexion

核心思想
Reflexion通过自我反思机制来改进决策:
- 执行一个动作
- 获得观察结果
- 反思:分析成功/失败原因
- 更新内部记忆/策略
- 决定下一步行动
与ReAct的关键区别
| 维度 | ReAct | Reflexion |
|---|---|---|
| 记忆类型 | 简单的观察历史 | 结构化经验总结 |
| 反思机制 | 无 | 有(显式自我评估) |
| 策略更新 | 静态 | 动态(基于反思改进) |
| 适用场景 | 简单任务 | 需要学习的复杂任务 |
实现代码
@dataclass
class Experience:
    # One recorded outcome of a Reflexion step.
    observation: str      # transcript of state + action + result
    reward: float         # scalar score from evaluate_outcome()
    reflection: str = ""  # LLM self-critique, filled in by reflect()
class ReflexionAgent:
    """Agent that augments each step with an explicit self-reflection and
    stores the reflections so later decisions can learn from them.
    """

    def __init__(self, llm, tools, memory_system, max_steps=10):
        self.llm = llm
        self.tools = tools
        self.memory = memory_system
        # Fix: run() reads self.max_steps, but the original __init__ never
        # set it (AttributeError on first use). The defaulted parameter
        # keeps existing callers working.
        self.max_steps = max_steps
        self.experiences: List[Experience] = []

    async def reflect(self, experience: Experience) -> str:
        """Generate a self-reflection for *experience* and attach it."""
        reflection_prompt = f"""分析以下执行经验,识别成功和失败的原因:
观察: {experience.observation}
奖励: {experience.reward}
请从以下角度进行反思:
1. 这次执行的主要成功点是什么?
2. 主要的错误或不足是什么?
3. 如何改进可以避免类似问题?
4. 这些经验对未来的决策有什么指导意义?
请用简洁的语言总结你的反思(2-3句话)。"""
        reflection = await self.llm.generate(reflection_prompt)
        experience.reflection = reflection
        return reflection

    async def decide_next_action(self, state: str, reflection: str = "") -> Dict:
        """Pick the next action from the state plus recent reflections."""
        history_context = ""
        if self.experiences:
            # Surface only the three most recent reflections to keep the
            # prompt short.
            history_context = "历史经验总结:\n"
            for i, exp in enumerate(self.experiences[-3:], 1):
                history_context += f"{i}. {exp.reflection}\n"
        decision_prompt = f"""当前状态: {state}
{history_context}
{f"本次反思: {reflection}" if reflection else ""}
基于以上信息,决定下一步行动。
"""
        return await self.llm.structured_generate(decision_prompt, format="json")

    async def run(self, task: str) -> str:
        """Decide / act / evaluate / reflect loop.

        Returns the final answer, or a step-limit message after max_steps.
        """
        state = task
        for step in range(self.max_steps):
            # Decide.
            decision = await self.decide_next_action(state)
            if decision["type"] == "finish":
                return decision["answer"]
            # Act.
            if decision["type"] == "tool":
                result = await self.tools[decision["tool"]].execute(**decision["args"])
            else:
                result = decision["answer"]
            # Score the outcome.
            reward = self.evaluate_outcome(state, result)
            experience = Experience(
                observation=f"{state}\n行动: {decision}\n结果: {result}",
                reward=reward
            )
            # Reflect, then persist to both the in-process list and the
            # external memory system.
            reflection = await self.reflect(experience)
            self.experiences.append(experience)
            self.memory.add({
                "task": task,
                "reflection": reflection,
                "reward": reward
            })
            # The action's result becomes the next state.
            state = result
        return "达到最大步数"

    def evaluate_outcome(self, state: str, result: str) -> float:
        """Keyword heuristic: -1 on error/failure, +1 on success, else 0."""
        if "错误" in result or "失败" in result:
            return -1.0
        elif "成功" in result or "完成" in result:
            return 1.0
        return 0.0
4. Supervisor/Hierarchical架构

核心思想
层次化架构使用多个级别的Agent:
- Supervisor:高层规划与协调
- Sub-agents:执行具体子任务
- 消息传递:层级间通信
架构图
┌─────────────────────────────────────────────────────────┐
│ SUPERVISOR (总指挥) │
│ 任务理解、全局规划、资源分配 │
└──────────────────────┬──────────────────────────────────┘
│
┌──────────────┼──────────────┐
↓ ↓ ↓
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ Sub-Agent A │ │ Sub-Agent B │ │ Sub-Agent C │
│ (数据分析) │ │ (代码编写) │ │ (测试验证) │
└───────┬───────┘ └───────┬───────┘ └───────┬───────┘
│ │ │
└──────────────────┼──────────────────┘
↓
┌────────────────────────┐
│ 共享上下文/结果聚合 │
└────────────────────────┘
实现代码
from abc import ABC, abstractmethod
from typing import Dict, List, Any
import asyncio
class SubAgent(ABC):
    """Base class for the specialist sub-agents driven by the supervisor."""

    @abstractmethod
    async def execute(self, instruction: str) -> str:
        """Carry out *instruction* and return a textual result."""
        pass

    @abstractmethod
    def can_handle(self, task: str) -> float:
        """Return this agent's suitability for *task* on a 0-1 scale."""
        pass
class SupervisorAgent:
    """Hierarchical coordinator: decomposes a task with the LLM, routes each
    sub-task to a named specialist SubAgent, runs independent sub-tasks
    concurrently, and aggregates the results into a final answer.
    """

    def __init__(self, llm, sub_agents: List[SubAgent]):
        self.llm = llm
        # NOTE(review): relies on each agent exposing .name here and
        # .description in format_agents — neither attribute is declared on
        # the SubAgent ABC; confirm the concrete agents provide them.
        self.sub_agents = {agent.name: agent for agent in sub_agents}
        self.shared_context = {}

    async def decompose_task(self, task: str) -> List[Dict]:
        """Ask the LLM to split *task* into agent-assigned sub-tasks (JSON)."""
        decomposition_prompt = f"""将以下任务分解并分配给专业智能体:
任务: {task}
可用智能体:
{self.format_agents()}
对于每个子任务,请指定:
1. 任务描述
2. 负责的智能体名称
3. 依赖关系(哪些任务必须先完成)
以JSON格式输出。"""
        response = await self.llm.generate(decomposition_prompt, format="json")
        plan = json.loads(response)
        return plan["subtasks"]

    def format_agents(self) -> str:
        """Render the roster as "- name: description" lines for the prompt."""
        return "\n".join([
            f"- {name}: {agent.description}"
            for name, agent in self.sub_agents.items()
        ])

    async def run(self, task: str) -> str:
        """Decompose, execute respecting dependencies, then aggregate."""
        subtasks = await self.decompose_task(task)
        # Build the dependency graph (build_graph is defined elsewhere in
        # the project).
        task_graph = self.build_graph(subtasks)
        results = {}
        while task_graph.pending():
            ready = task_graph.get_ready_tasks()
            if not ready:
                # Unfinished graph with nothing runnable: dependency conflict.
                await self.handle_deadlock(task_graph)
                continue
            # Execute every ready sub-task concurrently (one is just the
            # degenerate case).
            batch_results = await asyncio.gather(*[
                self.execute_subtask(t, results) for t in ready
            ])
            for t, result in zip(ready, batch_results):
                results[t["id"]] = result
                task_graph.complete(t["id"])
        # Fix: aggregate_results awaits the LLM and is therefore async; the
        # original called it without await and returned a bare coroutine.
        return await self.aggregate_results(results)

    async def execute_subtask(self, task: Dict, context: Dict) -> str:
        """Run one sub-task on its assigned agent, inlining dependency results."""
        agent_name = task["agent"]
        agent = self.sub_agents[agent_name]
        # Include the outputs of prerequisite tasks in the instruction.
        instruction = task["description"]
        if task.get("dependencies"):
            instruction += "\n\n相关任务结果:\n"
            for dep_id in task["dependencies"]:
                if dep_id in context:
                    instruction += f"- {dep_id}: {context[dep_id]}\n"
        result = await agent.execute(instruction)
        # Publish to the shared context for later sub-tasks / inspection.
        self.shared_context[task["id"]] = result
        return result

    async def aggregate_results(self, results: Dict[str, str]) -> str:
        """Have the LLM merge all sub-task results into one final answer."""
        aggregation_prompt = f"""聚合以下子任务的结果,形成最终答案:
{chr(10).join([f'{k}: {v}' for k, v in results.items()])}
请提供最终的综合答案。"""
        return await self.llm.generate(aggregation_prompt)
5. Tool-Augmented Agents

核心思想
Tool-Augmented Agents通过调用外部工具扩展能力边界:
class ToolRegistry:
    """Registry that tracks tools alongside LLM-consumable schemas."""

    def __init__(self):
        self.tools: Dict[str, Tool] = {}
        self.tool_schemas: Dict[str, Dict] = {}

    def register(self, name: str, tool: Tool):
        """Add *tool* under *name* and record its schema for the LLM."""
        schema = {
            "name": name,
            "description": tool.description,
            "parameters": tool.get_parameters_schema(),
        }
        self.tools[name] = tool
        self.tool_schemas[name] = schema

    def get_schema(self) -> str:
        """Serialize all registered schemas as pretty-printed JSON."""
        return json.dumps(self.tool_schemas, ensure_ascii=False, indent=2)

    def call(self, name: str, **kwargs) -> Any:
        """Invoke the named tool with keyword arguments."""
        tool = self.tools[name]
        return tool.execute(**kwargs)
class ToolAugmentedAgent:
    """Agent whose single decision step either invokes a registered tool or
    returns a direct answer, driven by a JSON decision from the LLM."""

    def __init__(self, llm, tool_registry: ToolRegistry):
        self.llm = llm
        self.tools = tool_registry

    async def decide_and_execute(self, state: str) -> str:
        """One decision step: a tool call (returns its output) or the answer."""
        prompt = f"""当前状态:
{state}
可用工具:
{self.tools.get_schema()}
决定下一步:
1. 如果需要外部信息或操作,使用工具
2. 如果可以直接回答,返回最终答案
格式:
- 如果使用工具: {{"use_tool": true, "tool": "工具名", "args": {{"参数": "值"}}}}
- 如果完成: {{"use_tool": false, "answer": "最终答案"}}
"""
        # Consistency fix: generate() returns a JSON string (every other
        # call site in this document parses it with json.loads); the
        # original indexed the raw string as if it were already a dict.
        decision = json.loads(await self.llm.generate(prompt, format="json"))
        if decision["use_tool"]:
            # NOTE(review): ToolRegistry.call is synchronous here, yet tool
            # .execute is awaited in the ReAct agent — confirm whether this
            # result needs awaiting.
            result = self.tools.call(decision["tool"], **decision["args"])
            return f"工具 {decision['tool']} 返回: {result}"
        else:
            return decision["answer"]
架构选择指南

| 场景 | 推荐架构 | 原因 |
|---|---|---|
| 简单问答+工具调用 | ReAct | 简洁、高效 |
| 复杂多步骤任务 | Plan-and-Execute | 规划清晰、可解释 |
| 需要学习的任务 | Reflexion | 自我改进能力 |
| 多领域协作 | Supervisor | 专业化分工 |
| 工具密集型 | Tool-Augmented | 工具集成友好 |