引言
大语言模型(LLM)驱动的Agent系统代表了人工智能从被动响应到主动规划的重要范式转变。本文档系统梳理当前主流的LLM Agent架构模式,分析其核心机制、设计原则与适用场景。
Agent系统核心要素
一个完整的LLM Agent系统通常包含以下核心组件:
| 组件 | 功能 | 关键挑战 |
|---|---|---|
| 推理引擎 | 基于LLM的决策与规划 | 推理可靠性、错误恢复 |
| 工具接口 | 与外部系统交互 | 工具描述、调用可靠性 |
| 记忆系统 | 存储历史与上下文 | 记忆组织、检索效率 |
| 状态管理 | 跟踪任务进度与环境 | 状态一致性、恢复机制 |
| 评估机制 | 判断动作质量与进度 | 自我纠正、多步规划 |
1. ReAct范式:推理与行动交织
1.1 核心思想
ReAct(Reason + Act)是由Yao等人于2022年提出的经典Agent范式,其核心理念是将推理过程显式化,使LLM能够交替进行:
- Thought(思考):分析当前状态,决定下一步行动
- Action(行动):执行具体操作(如调用工具)
- Observation(观察):获取行动结果,更新状态
# ReAct Agent核心循环伪代码
def react_agent(query, tools, max_iterations=10):
"""
ReAct范式Agent实现
Args:
query: 用户查询
tools: 可用工具列表
max_iterations: 最大迭代次数
"""
context = []
observation = None
for step in range(max_iterations):
# 1. 构建提示:包含历史、观察、可用工具
prompt = build_react_prompt(
query=query,
history=context,
observation=observation,
tools=tools
)
# 2. LLM生成Thought和Action
response = llm.generate(prompt)
thought = extract_thought(response)
action = extract_action(response)
# 3. 执行动作
if action.type == "finish":
return action.result
elif action.type == "tool_call":
observation = execute_tool(action.tool, action.args)
else:
observation = None # 空动作,继续推理
# 4. 更新上下文
context.append({
"thought": thought,
"action": action,
"observation": observation
})
return "Max iterations reached"1.2 ReAct提示模板
标准的ReAct提示通常采用以下格式:
你是一个帮助用户完成任务的AI助手。你需要按以下格式思考和行动:
## 可用工具
{tools_description}
## 历史记录
{history}
## 当前观察
{observation}
请按以下格式输出:
Thought: [分析当前状况,决定下一步行动]
Action: [要执行的动作,可以是工具调用或"finish"]1.3 ReAct的优势与局限
| 优势 | 局限 |
|---|---|
| 推理过程透明可追溯 | 推理token开销较大 |
| 易于调试和干预 | 长序列可能累积错误 |
| 适合工具使用场景 | 缺乏全局规划视角 |
| 可与外部系统无缝集成 | 对LLM推理能力依赖强 |
1.4 ReAct变体
ReAct with Self-Consistency
通过采样多个推理路径并投票选择最一致的答案:
def react_self_consistent(prompt, tools, n_samples=5):
"""ReAct + 自洽性:多路径推理投票"""
answers = []
for _ in range(n_samples):
answer = react_agent(prompt, tools, max_iterations=8)
answers.append(answer)
# 投票选择最常见的答案
return Counter(answers).most_common(1)[0][0]ReAct with Reflexion
在ReAct基础上增加自我反思机制:
def react_with_reflexion(query, tools, max_reflections=3):
"""ReAct + 反思:失败时重新规划"""
for reflection_round in range(max_reflections):
result = react_agent(query, tools)
# 评估结果质量
if evaluate_result(result, query):
return result
# 生成反思:为什么失败?
reflection_prompt = f"""
Previous attempt result: {result}
Original query: {query}
Analyze why this result is unsatisfactory and suggest improvements.
"""
reflection = llm.generate(reflection_prompt)
# 基于反思调整提示
query = f"{query}\n\nPrevious reflection: {reflection}"
return result2. Plan-and-Execute范式
2.1 核心思想
Plan-and-Execute(P&E)范式将Agent行为分为两个明确阶段:
- Plan(规划阶段):先生成完整的行动计划
- Execute(执行阶段):按计划逐步执行动作
这种设计符合人类的问题解决习惯:先想后做。
class PlanAndExecuteAgent:
"""Plan-and-Execute Agent实现"""
def __init__(self, llm, tools, planner_prompt=None):
self.llm = llm
self.tools = tools
self.planner_prompt = planner_prompt or self.default_planner_prompt()
def run(self, query):
# 阶段1:规划
plan = self.create_plan(query)
# 阶段2:执行
for step in plan.steps:
result = self.execute_step(step)
plan.update_state(result)
# 检查是否需要重新规划
if plan.needs_replanning(result):
plan = self.replan(query, plan, result)
return plan.final_result
def create_plan(self, query):
"""调用LLM生成行动计划"""
prompt = f"""
任务:{query}
可用工具:
{self.format_tools()}
请生成一个详细的行动计划,包括:
1. 任务分解的步骤
2. 每个步骤的预期输入输出
3. 步骤间的依赖关系
输出格式:
Step 1: [描述]
Step 2: [描述]
...
"""
response = self.llm.generate(prompt)
return Plan.parse(response)
def execute_step(self, step):
"""执行单个步骤"""
# 调用相应的工具
tool = self.match_tool(step)
args = self.extract_args(step)
return tool.execute(args)2.2 Plan-and-Execute的优势
| 优势 | 说明 |
|---|---|
| 全局视角 | 规划阶段可看到完整任务结构 |
| 效率优化 | 可跳过不必要的探索步骤 |
| 可解释性强 | 规划本身即是执行计划文档 |
| 错误恢复 | 可基于部分结果调整后续计划 |
2.3 Plan-and-Execute vs ReAct
| 维度 | Plan-and-Execute | ReAct |
|---|---|---|
| 推理模式 | 先规划后执行 | 边推理边执行 |
| _token开销 | 规划开销较高 | 单步开销较低 |
| 适应性 | 动态调整能力较弱 | 适应性强 |
| 适合场景 | 复杂多步任务 | 探索性任务 |
| 错误处理 | 需显式重规划 | 每步可自我纠正 |
2.4 混合策略:Replan-on-Drift
class AdaptivePlanExecuteAgent:
"""自适应Plan-and-Execute:偏离计划时重规划"""
def __init__(self, llm, tools, drift_threshold=0.3):
self.llm = llm
self.tools = tools
self.drift_threshold = drift_threshold
def run(self, query):
plan = self.create_plan(query)
for i, step in enumerate(plan.steps):
result = self.execute_step(step)
# 计算结果与预期的偏离度
drift = self.calculate_drift(result, step.expected)
if drift > self.drift_threshold:
# 偏离过大,重规划后续步骤
remaining_steps = plan.steps[i:]
new_plan = self.replan(
original_query=query,
completed_steps=plan.steps[:i],
current_result=result
)
plan.steps = plan.steps[:i] + new_plan.steps
plan.history.append({"step": step, "result": result})
return plan.final_result3. Reflexion自我反思机制
3.1 核心思想
Reflexion由Shinn等人于2023年提出,核心创新是引入语言化反馈机制:
- Actor(执行器):基于当前状态生成动作
- Evaluator(评估器):评估动作质量
- Self-Reflection(自反思):生成语言化反馈指导后续行动
3.2 三组件架构
class ReflexionAgent:
"""Reflexion Agent三组件架构"""
def __init__(self, llm, max_reflections=3):
self.llm = llm
self.max_reflections = max_reflections
# 三个核心组件
self.actor = Actor(llm) # 执行器
self.evaluator = Evaluator(llm) # 评估器
self.reflexor = Reflexor(llm) # 反思器
def run(self, task, max_steps=10):
memory = [] # 长期记忆(反思经验)
trajectory = [] # 当前任务轨迹
for step in range(max_steps):
# 1. 执行器生成动作
context = self.build_context(task, memory, trajectory)
action = self.actor.generate(context)
trajectory.append(action)
# 2. 评估器评估动作
evaluation = self.evaluator.evaluate(action, task)
if evaluation.success:
return action.result
# 3. 自反思生成反馈
reflection = self.reflexor.reflect(
trajectory=trajectory,
evaluation=evaluation,
task=task
)
# 4. 更新记忆
memory.append(reflection)
# 5. 基于反思调整后续动作
if step < max_steps - 1:
trajectory[-1] = self.actor.refine(
action=action,
reflection=reflection
)
return trajectory[-1].result
def build_context(self, task, memory, trajectory):
"""构建包含记忆和轨迹的上下文"""
return {
"task": task,
"memory": self.summarize(memory), # 可选的记忆压缩
"trajectory": trajectory,
"lessons": self.extract_lessons(memory) # 从历史反思中提取教训
}3.3 评估器设计
评估器可以是:
基于规则的评估器
class RuleBasedEvaluator:
"""基于规则的评估器"""
def evaluate(self, action, task):
criteria = {
"relevance": self.check_relevance(action, task),
"feasibility": self.check_feasibility(action),
"correctness": self.check_correctness(action)
}
# 加权评分
score = sum(w * v for w, v in zip(
[0.3, 0.3, 0.4],
list(criteria.values())
))
return Evaluation(
success=score > 0.7,
score=score,
feedback=self.generate_feedback(criteria)
)基于LLM的评估器
class LLM Evaluator:
"""基于LLM的评估器"""
def __init__(self, llm):
self.llm = llm
def evaluate(self, action, task):
prompt = f"""
任务:{task}
执行动作:{action.description}
执行结果:{action.result}
请评估:
1. 这个动作是否正确解决了任务?
2. 如果不正确,哪里出了问题?
3. 如何改进?
输出格式:
{{
"correct": true/false,
"score": 0-1,
"issues": ["问题1", "问题2"],
"suggestions": ["建议1", "建议2"]
}}
"""
response = self.llm.generate_json(prompt)
return Evaluation.from_dict(response)3.4 反思器设计
class Reflexor:
"""自反思生成器"""
def __init__(self, llm):
self.llm = llm
def reflect(self, trajectory, evaluation, task):
prompt = f"""
任务目标:{task}
执行轨迹:
{self.format_trajectory(trajectory)}
最终评估:{evaluation.feedback}
评分:{evaluation.score}
请基于以上信息,生成深刻的反思:
1. 分析导致当前结果的关键因素
2. 从成功和失败中提取可泛化的教训
3. 提出具体的改进方向
反思应该简洁但有洞察力,为后续尝试提供指导。
"""
reflection_text = self.llm.generate(prompt)
return Reflection(
trajectory=trajectory,
evaluation=evaluation,
insight=reflection_text,
lessons=self.extract_lessons(reflection_text)
)4. Supervisor/Hierarchical架构
4.1 层次化Agent动机
复杂任务通常具有天然的层次结构:
- 高层决策:制定战略、分解任务、分配资源
- 中层协调:管理子任务间的依赖和冲突
- 底层执行:完成具体操作
层次化架构通过分离这些关注点,提高系统的可扩展性和可维护性。
4.2 层级架构设计
class HierarchicalAgent:
"""层次化Agent架构"""
def __init__(self, llm, sub_agents):
self.llm = llm
self.sub_agents = sub_agents # 下层Agent
def run(self, task):
# Level 1: 任务分解与战略规划
plan = self.decompose_task(task)
# Level 2: 子任务分配与协调
results = self.coordinate_subtasks(plan)
# Level 3: 结果整合与验证
final_result = self.integrate_results(results)
return final_result
def decompose_task(self, task):
"""高层:任务分解"""
prompt = f"""
任务:{task}
请将任务分解为可并行执行的子任务。
每个子任务应该:
- 有明确的输入输出
- 可以独立执行
- 有清晰的完成标准
输出格式:
{{
"subtasks": [
{{"id": 1, "description": "...", "dependencies": []}},
{{"id": 2, "description": "...", "dependencies": [1]}}
],
"coordination_notes": "子任务间的协调要点"
}}
"""
return Plan.parse(self.llm.generate_json(prompt))
def coordinate_subtasks(self, plan):
"""中层:子任务协调"""
# 构建任务依赖图
dag = self.build_dag(plan.subtasks)
# 按拓扑顺序执行
results = {}
for batch in dag.topological_batches():
# 批次内可并行
batch_results = parallel_execute(
[self.sub_agents[task.agent_id] for task in batch],
[task for task in batch]
)
results.update(batch_results)
return results4.3 Supervisor模式
Supervisor模式中,一个主Agent负责任务分配和结果整合:
class SupervisorAgent:
"""Supervisor模式:主Agent协调多个专家Agent"""
def __init__(self, llm, experts):
"""
Args:
experts: Dict[str, ExpertAgent],专家Agent字典
"""
self.llm = llm
self.experts = experts
def run(self, task):
# 1. 判断任务类型,选择合适的专家
expert_selection = self.select_expert(task)
# 2. 构建专家调用指令
instruction = self.build_instruction(task, expert_selection)
# 3. 调用专家执行
result = self.experts[expert_selection.name].execute(instruction)
# 4. 验证和整合结果
if expert_selection.needs_verification:
result = self.verify_and_refine(result, task)
return result
def select_expert(self, task):
"""选择最合适的专家Agent"""
prompt = f"""
任务:{task}
可用专家:
{self.format_experts()}
请选择最适合完成此任务的专家,并说明理由。
"""
response = self.llm.generate_json(prompt)
return ExpertSelection(**response)4.4 层级间的信息传递
def build_hierarchical_context(high_level_plan, mid_level_status, low_level_results):
"""构建层级化的上下文信息"""
return {
# 高层信息:战略目标和约束
"strategy": {
"goal": high_level_plan.goal,
"constraints": high_level_plan.constraints,
"success_criteria": high_level_plan.criteria
},
# 中层信息:当前进度和资源状态
"progress": {
"completed": mid_level_status.completed_steps,
"in_progress": mid_level_status.active_steps,
"resources": mid_level_status.available_resources
},
# 底层信息:具体的执行结果
"results": {
"outputs": [r.output for r in low_level_results],
"metrics": [r.metrics for r in low_level_results],
"errors": [r.error for r in low_level_results if r.error]
},
# 跨层级的关键洞察
"cross_level_insights": extract_cross_level_insights(
high_level_plan, mid_level_status, low_level_results
)
}5. Tool-Augmented Agents设计
5.1 工具接口设计原则
Tool-Augmented Agents的核心挑战是设计可靠的工具接口:
| 原则 | 说明 | 示例 |
|---|---|---|
| 自包含 | 工具应包含所有必要的上下文 | SQL查询需包含数据库连接信息 |
| 幂等性 | 重复执行应产生一致结果 | GET请求是幂等的 |
| 可验证 | 结果应可被验证 | 返回结果应包含状态码或验证信息 |
| 可组合 | 工具输出可作为其他工具输入 | 返回结构化数据而非纯文本 |
5.2 工具描述格式
class Tool:
"""工具接口基类"""
def __init__(self, name, description, parameters):
self.name = name
self.description = description
self.parameters = parameters # JSON Schema格式
def to_langchain_format(self):
"""转换为LangChain格式"""
return {
"name": self.name,
"description": self.description,
"parameters": self.parameters
}
def to_openai_format(self):
"""转换为OpenAI Function Calling格式"""
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": self.parameters
}
}
# 示例:搜索工具定义
search_tool = Tool(
name="web_search",
description="在互联网上搜索信息,返回最相关的网页摘要。",
parameters={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索查询词"
},
"max_results": {
"type": "integer",
"description": "最大返回结果数",
"default": 5
}
},
"required": ["query"]
}
)5.3 工具选择策略
单一工具选择
def select_tool(llm, query, available_tools):
"""基于LLM的工具选择"""
prompt = f"""
用户查询:{query}
可用工具:
{format_tools(available_tools)}
请选择最合适的工具并给出调用参数。
如果不需要工具,请说明。
输出格式:
{{
"tool_name": "工具名或null",
"arguments": {{参数}},
"reasoning": "选择理由"
}}
"""
return llm.generate_json(prompt)工具链编排
class ToolChainOrchestrator:
"""工具链编排器"""
def __init__(self, llm, tools):
self.llm = llm
self.tool_registry = {t.name: t for t in tools}
def plan_tool_chain(self, goal):
"""规划工具调用链"""
prompt = f"""
目标:{goal}
可用工具:
{format_tools(self.tool_registry.values())}
如果目标需要多个工具配合,请规划调用顺序。
每个后续调用可以利用前一个调用的结果。
输出格式:
{{
"chain": [
{{"tool": "工具1", "args": {{...}}, "depends_on": null}},
{{"tool": "工具2", "args": {{...}}, "depends_on": "工具1"}}
]
}}
"""
plan = self.llm.generate_json(prompt)
return self.execute_chain(plan.chain)
def execute_chain(self, chain_plan):
"""执行工具链"""
results = {}
for step in chain_plan:
# 解析依赖
if step.depends_on:
deps = {k: results[k] for k in step.depends_on.split(',')}
args = self.resolve_args(step.args, deps)
else:
args = step.args
# 执行工具
tool = self.tool_registry[step.tool]
result = tool.execute(**args)
results[step.tool] = result
return results5.4 工具执行与错误处理
class RobustToolExecutor:
"""带错误处理和重试的工具执行器"""
def __init__(self, max_retries=3, backoff_factor=2):
self.max_retries = max_retries
self.backoff_factor = backoff_factor
def execute_with_retry(self, tool, args, context=None):
"""带重试的工具执行"""
last_error = None
for attempt in range(self.max_retries):
try:
result = tool.execute(**args)
# 验证结果质量
if self.validate_result(result):
return result
else:
# 结果质量不达标,尝试修正
args = self.refine_args(args, result, context)
except ToolExecutionError as e:
last_error = e
# 指数退避
time.sleep(self.backoff_factor ** attempt)
# 调整参数重试
args = self.adapt_args_for_retry(args, e)
# 所有重试都失败
return ToolResult(
success=False,
error=str(last_error),
fallback=self.generate_fallback(tool, args, context)
)
def validate_result(self, result):
"""验证工具执行结果"""
# 基本验证
if result is None:
return False
# 格式验证
if hasattr(result, 'is_valid') and not result.is_valid:
return False
# 内容验证
if self.is_meaningful(result):
return True
return False6. Memory系统设计
6.1 记忆类型
LLM Agent的记忆系统通常包含多个层次:
| 层次 | 容量 | 持久性 | 访问频率 | 内容 |
|---|---|---|---|---|
| 感知记忆 | 短 | 当前交互 | 极高 | 原始感知输入 |
| 工作记忆 | 中 | 会话内 | 高 | 当前任务相关上下文 |
| 情景记忆 | 大 | 会话间 | 中 | 历史交互经验 |
| 语义记忆 | 极大 | 永久 | 低 | 领域知识和常识 |
6.2 记忆存储架构
from dataclasses import dataclass
from typing import List, Dict, Any
from enum import Enum
class MemoryType(Enum):
EPISODIC = "episodic" # 情景记忆:具体事件
SEMANTIC = "semantic" # 语义记忆:抽象知识
PROCEDURAL = "procedural" # 程序记忆:技能和流程
@dataclass
class Memory:
"""记忆单元"""
content: str
memory_type: MemoryType
importance: float = 0.5 # 重要性评分 0-1
created_at: float = None
access_count: int = 0
last_accessed: float = None
embedding: List[float] = None # 用于向量检索
class MemorySystem:
"""多层次记忆系统"""
def __init__(self, vector_store=None, llm=None):
# 各层次记忆存储
self.episodic_store = [] # 情景记忆
self.semantic_store = {} # 语义记忆 (key-value)
self.procedural_store = {} # 程序记忆
# 向量数据库(用于语义检索)
self.vector_store = vector_store
# LLM(用于记忆压缩和摘要)
self.llm = llm
def add_memory(self, content: str, memory_type: MemoryType,
importance: float = 0.5, metadata: Dict = None):
"""添加新记忆"""
memory = Memory(
content=content,
memory_type=memory_type,
importance=importance,
created_at=time.time(),
metadata=metadata or {}
)
# 生成向量嵌入
if self.vector_store:
memory.embedding = self.embed(content)
# 存储
if memory_type == MemoryType.EPISODIC:
self.episodic_store.append(memory)
elif memory_type == MemoryType.SEMANTIC:
self.semantic_store[metadata.get('key', str(len(self.semantic_store)))] = memory
elif memory_type == MemoryType.PROCEDURAL:
self.procedural_store[metadata.get('procedure_id', str(uuid.uuid4()))] = memory
return memory
def retrieve(self, query: str, memory_type: MemoryType = None,
top_k: int = 5, threshold: float = 0.7) -> List[Memory]:
"""检索相关记忆"""
results = []
# 确定检索范围
stores = [self.episodic_store]
if memory_type == MemoryType.EPISODIC or memory_type is None:
stores.append(self.episodic_store)
if memory_type == MemoryType.SEMANTIC or memory_type is None:
stores.extend([self.semantic_store.values()])
if memory_type == MemoryType.PROCEDURAL or memory_type is None:
stores.extend([self.procedural_store.values()])
# 语义检索
if self.vector_store and self.llm:
query_embedding = self.embed(query)
for store in stores:
for memory in store:
if memory.embedding:
similarity = cosine_similarity(query_embedding, memory.embedding)
if similarity >= threshold:
results.append((similarity, memory))
# 关键词匹配(备选)
else:
query_keywords = set(query.lower().split())
for store in stores:
for memory in store:
memory_keywords = set(memory.content.lower().split())
overlap = len(query_keywords & memory_keywords)
if overlap > 0:
results.append((overlap / len(query_keywords), memory))
# 排序并返回top_k
results.sort(key=lambda x: x[0], reverse=True)
return [memory for _, memory in results[:top_k]]6.3 记忆压缩与摘要
class MemoryCompressor:
"""记忆压缩器:减少记忆占用同时保留关键信息"""
def __init__(self, llm):
self.llm = llm
def compress_episodic(self, memories: List[Memory]) -> List[Memory]:
"""压缩情景记忆"""
if len(memories) <= 5:
return memories
# 按时间分组
groups = self.group_by_time(memories, window=300) # 5分钟窗口
compressed = []
for group in groups:
if len(group) == 1:
compressed.append(group[0])
else:
# 生成组内摘要
summary = self.summarize_group(group)
compressed.append(Memory(
content=summary,
memory_type=MemoryType.EPISODIC,
importance=self.calculate_importance(group),
created_at=group[0].created_at,
metadata={"compressed": True, "original_count": len(group)}
))
return compressed
def summarize_group(self, memories: List[Memory]) -> str:
"""生成记忆组摘要"""
prompt = f"""
以下是一组相关的记忆片段,请生成简洁的摘要:
片段列表:
{chr(10).join([f"{i+1}. {m.content}" for i, m in enumerate(memories)])}
摘要应该:
1. 保留关键信息和决策
2. 移除冗余细节
3. 标注时间跨度(如果相关)
"""
return self.llm.generate(prompt)
def extract_lessons(self, memories: List[Memory]) -> str:
"""从记忆序列中提取教训和模式"""
prompt = f"""
从以下记忆序列中,提取可泛化的教训和模式:
{chr(10).join([m.content for m in memories])}
请分析:
1. 成功的模式是什么?
2. 失败的教训是什么?
3. 有什么可以改进的地方?
"""
return self.llm.generate(prompt)6.4 上下文窗口管理
class ContextWindowManager:
"""上下文窗口管理器:优化LLM输入"""
def __init__(self, max_tokens: int = 128000):
self.max_tokens = max_tokens
self.reserved_tokens = 2000 # 保留给系统提示和输出
def build_context(self, query: str, memories: List[Memory],
system_prompt: str = "") -> str:
"""构建最优的上下文"""
available_tokens = self.max_tokens - self.reserved_tokens
available_tokens -= self.count_tokens(system_prompt)
available_tokens -= self.count_tokens(query)
# 优先策略:按重要性和相关性排序
prioritized = self.prioritize(memories, query)
# 构建上下文
context_parts = []
current_tokens = 0
for memory in prioritized:
memory_tokens = self.count_tokens(memory.content)
if current_tokens + memory_tokens > available_tokens:
# 压缩剩余记忆
context_parts.append(self.summarize_for_context(memory))
break
context_parts.append(memory.content)
current_tokens += memory_tokens
return "\n\n".join(context_parts)
def prioritize(self, memories: List[Memory], query: str) -> List[Memory]:
"""记忆优先级排序"""
scored = []
for memory in memories:
# 综合评分:重要性 × 相关性 × 新近度
recency = math.exp(-0.1 * (time.time() - memory.created_at) / 86400)
relevance = self.calculate_relevance(memory, query)
score = (memory.importance * 0.4 +
relevance * 0.4 +
recency * 0.2)
scored.append((score, memory))
return [m for _, m in sorted(scored, reverse=True)]7. 多Agent协作模式
7.1 协作架构分类
| 模式 | 描述 | 适用场景 |
|---|---|---|
| 层级式 | 单一Supervisor分配任务 | 任务明确、流程固定 |
| 平等协作 | Agent间自由通信协作 | 复杂探索、创意任务 |
| 竞争式 | 多Agent提出方案竞争 | 方案选择、投票决策 |
| 管道式 | Agent串联处理数据 | 数据处理流水线 |
7.2 多Agent通信协议
from enum import Enum
from dataclasses import dataclass
from typing import Any, Dict, Optional
import json
class MessageType(Enum):
REQUEST = "request" # 请求
RESPONSE = "response" # 响应
QUERY = "query" # 查询
BROADCAST = "broadcast" # 广播
PROPOSE = "propose" # 提议
@dataclass
class Message:
"""Agent间通信消息"""
sender: str
receiver: str # "all" 表示广播
type: MessageType
content: Any
conversation_id: str
metadata: Dict = None
def to_json(self) -> str:
return json.dumps({
"sender": self.sender,
"receiver": self.receiver,
"type": self.type.value,
"content": self.content,
"conversation_id": self.conversation_id,
"metadata": self.metadata or {}
})
@classmethod
def from_json(cls, json_str: str) -> 'Message':
data = json.loads(json_str)
return cls(
sender=data["sender"],
receiver=data["receiver"],
type=MessageType(data["type"]),
content=data["content"],
conversation_id=data["conversation_id"],
metadata=data.get("metadata")
)
class AgentMessageBus:
"""Agent消息总线"""
def __init__(self):
self.agents = {} # agent_id -> Agent
self.message_queue = {} # agent_id -> List[Message]
self.topics = {} # topic -> List[subscribers]
def register(self, agent_id: str, agent):
"""注册Agent"""
self.agents[agent_id] = agent
self.message_queue[agent_id] = []
def send(self, message: Message):
"""发送消息"""
if message.receiver == "all":
# 广播
for agent_id in self.agents:
if agent_id != message.sender:
self.message_queue[agent_id].append(message)
else:
# 单播
if message.receiver in self.message_queue:
self.message_queue[message.receiver].append(message)
def receive(self, agent_id: str) -> List[Message]:
"""接收消息"""
messages = self.message_queue.get(agent_id, [])
self.message_queue[agent_id] = []
return messages
def subscribe(self, agent_id: str, topic: str):
"""订阅主题"""
if topic not in self.topics:
self.topics[topic] = []
if agent_id not in self.topics[topic]:
self.topics[topic].append(agent_id)
def publish(self, topic: str, content: Any, sender: str):
"""发布到主题"""
if topic in self.topics:
for agent_id in self.topics[topic]:
if agent_id != sender:
self.send(Message(
sender=sender,
receiver=agent_id,
type=MessageType.BROADCAST,
content={"topic": topic, "data": content},
conversation_id=topic
))7.3 协商式多Agent协作
class NegotiatingMultiAgent:
"""协商式多Agent系统"""
def __init__(self, agents: List[BaseAgent], consensus_threshold: float = 0.7):
self.agents = {a.id: a for a in agents}
self.consensus_threshold = consensus_threshold
self.message_bus = AgentMessageBus()
for agent in agents:
self.message_bus.register(agent.id, agent)
def solve_collaboratively(self, task: str) -> str:
"""协作解决问题"""
conversation_id = str(uuid.uuid4())
# 广播任务
for agent_id in self.agents:
self.message_bus.send(Message(
sender="coordinator",
receiver=agent_id,
type=MessageType.REQUEST,
content={"task": task, "conversation_id": conversation_id},
conversation_id=conversation_id
))
# 收集提案
proposals = []
for _ in range(len(self.agents)):
for agent_id, agent in self.agents.items():
messages = self.message_bus.receive(agent_id)
for msg in messages:
if msg.type == MessageType.PROPOSE:
proposals.append({
"agent": agent_id,
"proposal": msg.content
})
# 协商迭代
for iteration in range(5): # 最多5轮协商
# 评估提案
evaluations = self.evaluate_proposals(proposals)
# 检查是否达成共识
if self.check_consensus(evaluations):
best_proposal = self.select_best_proposal(proposals, evaluations)
return self.execute_proposal(best_proposal)
# 反馈给Agent进行改进
self.provide_feedback(proposals, evaluations)
# 等待更新提案
time.sleep(0.1) # 等待处理
# 未能达成共识,选择最佳提案
return self.select_best_proposal(proposals, evaluations)
def evaluate_proposals(self, proposals: List[Dict]) -> Dict[str, float]:
"""评估提案质量"""
evaluations = {}
for proposal in proposals:
agent_id = proposal["agent"]
# 获取Agent的自我评估
self.message_bus.send(Message(
sender="evaluator",
receiver=agent_id,
type=MessageType.QUERY,
content={"proposal": proposal["proposal"]},
conversation_id=proposal["conversation_id"]
))
messages = self.message_bus.receive(agent_id)
for msg in messages:
if msg.type == MessageType.RESPONSE:
evaluations[agent_id] = msg.content.get("score", 0.5)
return evaluations8. 架构选择指南
8.1 决策框架
任务复杂度
│
├─ 低 ──────────────────────────────────► ReAct
│
├─ 中 ──┬─ 需要全局规划 ─────────────────► Plan-and-Execute
│ │
│ └─ 需要自我改进 ─────────────────► Reflexion
│
└─ 高 ──┬─ 任务有明确层次 ────────────────► Hierarchical
│
├─ 需要多工具协调 ───────────────► Tool-Augmented
│
└─ 需要多专家协作 ────────────────► Multi-Agent
8.2 场景-架构映射
| 场景 | 推荐架构 | 关键配置 |
|---|---|---|
| 简单问答 | ReAct (单一工具) | max_iterations=3 |
| 数据分析 | Plan-and-Execute | 工具链规划 |
| 代码生成 | Reflexion | 编译器反馈循环 |
| 复杂研究 | Hierarchical | 3层架构 |
| 团队协作 | Multi-Agent | 3-5个专家Agent |
| 实时交互 | Reactive | 最小化推理开销 |
8.3 混合架构示例
class HybridAgent:
"""混合架构:结合多种模式"""
def __init__(self, llm, config):
self.config = config
# 根据任务类型选择策略
self.strategies = {
"simple": ReActStrategy(llm, config),
"complex": PlanAndExecuteStrategy(llm, config),
"exploratory": ReflexionStrategy(llm, config)
}
def run(self, task):
# 自动选择策略
task_type = self.classify_task(task)
strategy = self.strategies[task_type]
return strategy.execute(task)
def classify_task(self, task):
"""任务分类"""
prompt = f"""
任务:{task}
请判断任务类型:
- simple: 单一目标,可直接完成
- complex: 多步骤,需要规划
- exploratory: 探索性,需要迭代改进
输出:simple/complex/exploratory
"""
return self.llm.generate(prompt).strip().lower()9. 最佳实践与常见陷阱
9.1 最佳实践
1. 渐进式复杂化
# ✅ 好的做法:从简单开始,逐步增加复杂度
def build_agent(task_requirements):
# 基础Agent
agent = ReActAgent()
if task_requirements.need_planning:
agent = add_planning_module(agent)
if task_requirements.need_memory:
agent = add_memory_system(agent)
if task_requirements.multi_agent:
agent = convert_to_multi_agent(agent)
return agent
# ❌ 避免:从一开始就构建完整复杂系统2. 清晰的工具接口
# ✅ 好的工具接口
def search_web(query: str, max_results: int = 5) -> dict:
"""
搜索网页内容
Args:
query: 搜索关键词
max_results: 最大结果数
Returns:
{
"results": [{"title": "", "url": "", "snippet": ""}],
"total": int,
"query": str
}
"""
pass
# ❌ 避免:模糊的接口描述
def search(query):
"""搜索一下"""
pass3. 优雅的错误处理
def robust_execute(tool, args, max_retries=3):
for attempt in range(max_retries):
try:
result = tool.execute(**args)
if is_valid_result(result):
return result
except RetryableError as e:
logger.warning(f"Attempt {attempt+1} failed: {e}")
args = adapt_for_retry(args, e)
except FatalError as e:
logger.error(f"Fatal error: {e}")
return generate_fallback_result(args, reason=str(e))
return generate_fallback_result(args, reason="max_retries_exceeded")9.2 常见陷阱
| 陷阱 | 表现 | 解决方案 |
|---|---|---|
| 无限循环 | Agent反复执行相同动作 | 设置最大迭代次数、记录已尝试动作 |
| 工具依赖 | Agent过度依赖工具调用 | 平衡推理与工具使用 |
| 上下文溢出 | 历史记录过长导致丢失关键信息 | 实施记忆压缩和摘要 |
| 过早终止 | Agent过早认为任务完成 | 添加验证步骤 |
| 幻觉规划 | 生成不切实际的计划 | 基于可用资源约束规划 |
10. 总结
LLM Agent架构设计是一个不断演进的领域。本文档梳理的核心模式代表了当前的主流实践:
- ReAct:将推理显式化,适合工具使用场景
- Plan-and-Execute:先规划后执行,适合复杂多步任务
- Reflexion:引入自我反思机制,提高长期任务表现
- Hierarchical:分离关注点,适合大规模复杂系统
- Tool-Augmented:扩展LLM能力边界
- Memory-augmented:解决上下文限制
- Multi-Agent:实现复杂协作与分工
在实际应用中,这些模式通常需要组合使用,并根据具体任务特点进行调整。