## 概述
本文详细介绍Agentic RAG系统的架构设计与核心组件实现,涵盖检索智能体、规划模块、记忆系统、工具集成以及生产级部署最佳实践。
## 1. 系统架构概览
### 1.1 整体架构
```text
┌────────────────────────────────────────────────────────────────────────┐
│                            Agentic RAG 架构                            │
├────────────────────────────────────────────────────────────────────────┤
│                                                                        │
│  ┌──────────────────────────────────────────────────────────────────┐  │
│  │                            用户交互层                            │  │
│  │            (Query Input, Response Display, Feedback)             │  │
│  └──────────────────────────────────────────────────────────────────┘  │
│                                   ↓                                    │
│  ┌──────────────────────────────────────────────────────────────────┐  │
│  │                       协调器 (Coordinator)                       │  │
│  │        - 任务分解   - 流程控制   - 结果整合   - 异常处理         │  │
│  └──────────────────────────────────────────────────────────────────┘  │
│                                  ↓  ↑                                  │
│  ┌─────────────────┐        ┌─────────────────┐        ┌────────────┐  │
│  │    检索Agent    │ ←────→ │    生成Agent    │ ←────→ │ 工具Agent  │  │
│  │   (Retrieval)   │        │  (Generation)   │        │  (Tools)   │  │
│  └─────────────────┘        └─────────────────┘        └────────────┘  │
│                                                                        │
│  ┌──────────────────────────────────────────────────────────────────┐  │
│  │                        记忆系统 (Memory)                         │  │
│  │           短期记忆    长期记忆    工作记忆    经验记忆           │  │
│  └──────────────────────────────────────────────────────────────────┘  │
│                                                                        │
│  ┌──────────────────────────────────────────────────────────────────┐  │
│  │                       数据层 (Data Layer)                        │  │
│  │            向量存储    知识图谱    结构化DB    缓存层            │  │
│  └──────────────────────────────────────────────────────────────────┘  │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘
```
### 1.2 核心组件职责
| 组件 | 职责 | 关键能力 |
|---|---|---|
| 协调器 | 全局调度与决策 | 任务分解、流程控制、异常恢复 |
| 检索Agent | 多源信息获取 | 向量检索、图谱检索、混合检索 |
| 生成Agent | 答案构建与验证 | 草稿生成、答案修订、质量评估 |
| 工具Agent | 扩展系统能力 | 搜索引擎、计算器、API调用 |
| 记忆系统 | 状态与经验管理 | 上下文维护、经验存储 |
| 数据层 | 知识存储与索引 | 索引管理、查询优化 |
## 2. 检索智能体设计
### 2.1 多策略检索Agent
class RetrievalAgent:
    """Multi-strategy retrieval agent.

    Routes a query to dense (vector), sparse, hybrid or knowledge-graph
    retrieval, and can pick the strategy itself from features of the query.

    NOTE(review): helpers such as ``_default_config``, ``_embed_query``,
    ``_format_result``, ``_get_full_result``, ``_deduplicate``,
    ``_extract_entities``, ``_adaptive_search`` and the query-feature
    predicates are called but not defined in this listing; they are assumed
    to be supplied elsewhere (e.g. by a subclass or mixin).
    """

    def __init__(self, vector_db, kg_store, config=None):
        """Store the backends and resolve the configuration.

        Args:
            vector_db: Backend on which ``search`` and ``sparse_search`` are called.
            kg_store: Backend on which ``get_entity`` and ``get_neighbors`` are called.
            config: Optional settings mapping; ``_default_config()`` is used
                when it is ``None`` or empty.
        """
        self.vector_db = vector_db
        self.kg_store = kg_store
        self.config = config or self._default_config()
        # Per-strategy weights (not read anywhere in this listing).
        self.strategy_weights = {
            "dense": 0.4,
            "sparse": 0.2,
            "graph": 0.3,
            "keyword": 0.1,
        }

    def search(self, query, strategy="adaptive", top_k=10, context=None):
        """Run a retrieval with the requested or an automatically chosen strategy.

        Args:
            query: The retrieval query.
            strategy: One of ``"adaptive"``, ``"dense"``, ``"sparse"``,
                ``"hybrid"`` or ``"graph"``. ``"adaptive"`` first asks
                ``_select_strategy`` for a concrete strategy.
            top_k: Number of results to return.
            context: Extra context forwarded to strategy selection and to the
                graph / adaptive searches.

        Returns:
            Whatever the selected ``_*_search`` method returns.
        """
        if strategy == "adaptive":
            strategy = self._select_strategy(query, context)

        if strategy == "dense":
            return self._dense_search(query, top_k)
        if strategy == "sparse":
            return self._sparse_search(query, top_k)
        if strategy == "hybrid":
            return self._hybrid_search(query, top_k)
        if strategy == "graph":
            return self._graph_search(query, top_k, context)
        # Reached when _select_strategy keeps "adaptive" (complex queries) and
        # for any unrecognised strategy name.
        return self._adaptive_search(query, top_k, context)

    def _select_strategy(self, query, context):
        """Pick a strategy name from features of the query.

        Returns:
            ``"graph"`` when the query has both entities and relations,
            ``"hybrid"`` when it is comparative, ``"adaptive"`` when the
            estimated complexity exceeds 0.7, otherwise ``"dense"``.
        """
        # All features are computed up front, exactly as callers may rely on.
        query_features = {
            "has_entity": self._contains_entities(query),
            "has_relation": self._contains_relations(query),
            "is_comparative": self._is_comparative(query),
            "is_temporal": self._is_temporal(query),
            "complexity": self._estimate_complexity(query),
        }

        if query_features["has_entity"] and query_features["has_relation"]:
            return "graph"  # relation questions go to the knowledge graph
        if query_features["is_comparative"]:
            return "hybrid"  # comparisons use dense + sparse fusion
        if query_features["complexity"] > 0.7:
            return "adaptive"  # complex queries stay adaptive
        return "dense"  # default: vector search

    def _dense_search(self, query, top_k):
        """Embed the query and search the vector backend, applying config filters."""
        query_embedding = self._embed_query(query)
        results = self.vector_db.search(
            query_embedding,
            top_k=top_k,
            filters=self.config.get("filters"),
        )
        return [self._format_result(r, "dense") for r in results]

    def _sparse_search(self, query, top_k):
        """Run the backend's sparse search on the raw query text."""
        results = self.vector_db.sparse_search(query, top_k=top_k)
        return [self._format_result(r, "sparse") for r in results]

    def _hybrid_search(self, query, top_k):
        """Fuse dense and sparse rankings with reciprocal rank fusion.

        Each retriever is asked for ``top_k * 2`` candidates so the fusion has
        a wider pool; the fused list is then cut back to ``top_k``.
        """
        # The two searches run one after the other, not concurrently.
        dense_results = self._dense_search(query, top_k * 2)
        sparse_results = self._sparse_search(query, top_k * 2)
        # _reciprocal_rank_fusion takes ONE list of rankings; passing the two
        # rankings positionally would collide with the ``k`` keyword.
        fused = self._reciprocal_rank_fusion(
            [dense_results, sparse_results],
            k=60,
        )
        return fused[:top_k]

    def _graph_search(self, query, top_k, context=None):
        """Collect each recognised entity and its neighbourhood from the graph.

        Falls back to ``_dense_search`` when no entity is extracted from the
        query. ``context`` is accepted for signature parity but not used here.
        """
        entities = self._extract_entities(query)
        if not entities:
            return self._dense_search(query, top_k)

        graph_results = []
        for entity in entities:
            node_info = self.kg_store.get_entity(entity)
            if node_info:
                graph_results.append(node_info)
            # Neighbours are requested with depth=2.
            neighbors = self.kg_store.get_neighbors(entity, depth=2)
            graph_results.extend(neighbors)

        unique_results = self._deduplicate(graph_results)
        return [self._format_result(r, "graph") for r in unique_results[:top_k]]

    def _reciprocal_rank_fusion(self, results_list, k=60):
        """Merge several rankings with reciprocal rank fusion (RRF).

        RRF(d) = sum over rankings of 1 / (k + rank(d)), ranks starting at 1.

        Args:
            results_list: Iterable of rankings; every result needs a ``"doc_id"`` key.
            k: RRF smoothing constant.

        Returns:
            One result per document (fetched through ``_get_full_result``),
            annotated with ``"fused_score"`` and ``"sources"`` (the number of
            times the document appeared), sorted by descending fused score.
        """
        from collections import defaultdict

        scores = defaultdict(float)
        appearances = defaultdict(int)
        for results in results_list:
            for rank, result in enumerate(results, 1):
                doc_id = result["doc_id"]
                scores[doc_id] += 1 / (k + rank)
                appearances[doc_id] += 1

        fused = []
        for doc_id, fused_score in scores.items():
            result = self._get_full_result(doc_id)
            result["fused_score"] = fused_score
            result["sources"] = appearances[doc_id]
            fused.append(result)

        fused.sort(key=lambda x: x["fused_score"], reverse=True)
        return fused


### 2.2 迭代式检索增强
class IterativeRetrieval:
    """Multi-round retrieval: search, estimate recall, then search for what is missing.

    NOTE(review): ``_filter_new_results`` and ``_update_context`` are called
    but not defined in this listing.
    """

    def __init__(self, retrieval_agent, evaluator):
        """Keep the retrieval agent and the evaluator used to judge each round."""
        self.evaluator = evaluator
        self.retrieval_agent = retrieval_agent

    def iterative_search(self, query, max_iterations=3, target_recall=0.8):
        """Retrieve in rounds until the estimated recall reaches the target.

        Args:
            query: The user query.
            max_iterations: Upper bound on the number of rounds.
            target_recall: Estimated recall at which the loop stops early.

        Returns:
            The accumulated results of all rounds.
        """
        collected = []
        context = {}

        for round_no in range(max_iterations):
            if round_no:
                # Later rounds: one top-5 search per follow-up query.
                batch = []
                follow_ups = self._generate_sub_queries(query, collected, context)
                for follow_up in follow_ups:
                    batch.extend(
                        self.retrieval_agent.search(follow_up, top_k=5, context=context)
                    )
            else:
                # First round: the original query, top 10.
                batch = self.retrieval_agent.search(query, top_k=10)

            # Recall is estimated before this round's results are merged in.
            recall = self.evaluator.estimate_recall(query, batch, collected)

            collected.extend(self._filter_new_results(batch, collected))
            context = self._update_context(query, collected, context)

            if recall >= target_recall:
                break

        return collected

    def _generate_sub_queries(self, original_query, previous_results, context):
        """Turn the evaluator's gap analysis into follow-up queries.

        Gaps of type ``entity_detail``, ``relation`` and ``background`` each
        yield one query; other types are skipped. When nothing is produced the
        original query is returned as the only follow-up.
        """
        report = self.evaluator.analyze_gaps(original_query, previous_results, context)

        follow_ups = []
        for gap in report.get("gaps", []):
            kind = gap["type"]
            if kind == "entity_detail":
                text = f"关于 {gap['entity']} 的详细信息"
            elif kind == "relation":
                text = f"{gap['entity1']} 和 {gap['entity2']} 的关系"
            elif kind == "background":
                text = f"{gap['topic']} 的背景和基础知识"
            else:
                continue
            follow_ups.append(text)

        return follow_ups or [original_query]


## 3. 规划模块实现
### 3.1 任务规划器
class TaskPlanner:
    """Task planner: breaks a query down and produces an execution plan.

    NOTE(review): ``parse_json`` and helpers such as ``_load_templates``,
    ``_select_tool``, ``_estimate_step_resources``, ``_estimate_cost``,
    ``_estimate_quality``, ``_identify_parallel_opportunities``,
    ``_reorder_phases``, ``_check_constraints`` and ``_simplify_plan`` are
    used but not defined in this listing.
    """

    def __init__(self, llm):
        """Keep the LLM client and load the task templates."""
        self.llm = llm
        self.task_templates = self._load_templates()

    def plan(self, query, available_tools, constraints):
        """Build an execution plan for a query.

        Args:
            query: The user query.
            available_tools: Tools the plan may use.
            constraints: Constraints such as time or cost.

        Returns:
            A dict with ``task_type``, ``analysis``, ``plan``,
            ``estimated_cost`` and ``expected_quality``.
        """
        query_analysis = self._analyze_query(query)
        task_type = self._classify_task(query_analysis)
        template = self._select_template(task_type, query_analysis)
        plan = self._instantiate_plan(
            template,
            query_analysis,
            available_tools,
            constraints,
        )
        optimized_plan = self._optimize_plan(plan, constraints)

        return {
            "task_type": task_type,
            "analysis": query_analysis,
            "plan": optimized_plan,
            "estimated_cost": self._estimate_cost(optimized_plan),
            "expected_quality": self._estimate_quality(optimized_plan),
        }

    def _analyze_query(self, query):
        """Ask the LLM for a structured analysis of the query and parse it as JSON."""
        # The prompt is sent verbatim, so its lines stay flush left.
        # NOTE(review): the prompt names its dimensions in Chinese while
        # _classify_task reads the keys "intent_type" / "complexity" with
        # English values — confirm the model is steered to that schema.
        analysis_prompt = f"""
详细分析以下查询的各个维度:
查询:{query}
分析维度:
1. 意图类型:事实查询、解释性、比较性、分析性、生成性
2. 所需信息类型:具体事实、概念解释、过程描述、数据分析
3. 复杂度:简单/中等/复杂/多跳
4. 时间敏感性:最新信息/历史信息/无要求
5. 准确性要求:高/中/低
6. 潜在知识缺口:查询中隐含但未明确的信息需求
输出JSON格式的完整分析。
"""
        response = self.llm.generate(analysis_prompt)
        return parse_json(response)

    def _classify_task(self, analysis):
        """Map the analysis onto a task type.

        Reads ``intent_type`` (default ``"unknown"``) and ``complexity``
        (default ``"simple"``). Branch order matters: a complex or analytical
        query is classified before the generative and multi-hop checks.
        """
        intent = analysis.get("intent_type", "unknown")
        complexity = analysis.get("complexity", "simple")

        if complexity == "simple" and intent in ["fact", "explanatory"]:
            return "simple_retrieval"
        if complexity == "complex" or intent == "analytical":
            return "complex_analysis"
        if intent == "generative":
            return "creative_generation"
        if complexity == "multi_hop":
            return "multi_hop_reasoning"
        return "general_qa"

    def _select_template(self, task_type, analysis):
        """Return the predefined step template for a task type.

        Task types without a template of their own (for example
        ``"creative_generation"``) fall back to the ``"general_qa"`` template.
        ``analysis`` is accepted for signature parity but not used.
        """
        templates = {
            "simple_retrieval": {
                "steps": [
                    {"type": "retrieve", "target": "primary_info", "priority": 1},
                    {"type": "generate", "priority": 2},
                ],
                "expected_tools": ["vector_search"],
            },
            "complex_analysis": {
                "steps": [
                    {"type": "retrieve", "target": "background", "priority": 1},
                    {"type": "retrieve", "target": "specific", "priority": 2},
                    {"type": "analyze", "priority": 3},
                    {"type": "verify", "priority": 4},
                    {"type": "generate", "priority": 5},
                ],
                "expected_tools": ["vector_search", "kg_search", "calculator"],
            },
            "multi_hop_reasoning": {
                "steps": [
                    {"type": "retrieve", "target": "entity_1", "priority": 1},
                    {"type": "retrieve", "target": "entity_2", "priority": 1},
                    {"type": "analyze", "target": "relation", "priority": 2, "depends_on": [1, 2]},
                    {"type": "retrieve", "target": "supporting", "priority": 3, "depends_on": [2]},
                    {"type": "synthesize", "priority": 4, "depends_on": [3]},
                ],
                "expected_tools": ["vector_search", "kg_search", "web_search"],
            },
            # Fallback template. Without this entry the default argument
            # below raised KeyError on every call, even for known task types.
            "general_qa": {
                "steps": [
                    {"type": "retrieve", "target": "primary_info", "priority": 1},
                    {"type": "generate", "priority": 2},
                ],
                "expected_tools": ["vector_search"],
            },
        }
        return templates.get(task_type, templates["general_qa"])

    def _instantiate_plan(self, template, analysis, available_tools, constraints):
        """Turn a template into a concrete plan with numbered phases.

        Each template step becomes a phase with a 1-based ``phase_id``.
        ``depends_on`` entries are 1-based step numbers and must refer to
        steps that appear earlier in the template.
        """
        plan = {
            "phases": [],
            "estimated_steps": len(template["steps"]),
            "required_capabilities": template.get("expected_tools", []),
        }

        for step in template["steps"]:
            # Only retrieval steps pick a tool; other steps use their own type.
            if step["type"] == "retrieve":
                tool = self._select_tool(step["target"], available_tools)
            else:
                tool = step["type"]

            # Translate step numbers into the ids of already-built phases.
            depends_on = [
                plan["phases"][dep - 1]["phase_id"]
                for dep in step.get("depends_on", [])
            ]

            resource_estimate = self._estimate_step_resources(
                step, analysis, constraints
            )

            plan["phases"].append({
                "phase_id": len(plan["phases"]) + 1,
                "type": step["type"],
                "target": step.get("target"),
                "tool": tool,
                "depends_on": depends_on,
                "resources": resource_estimate,
                "quality_weight": step.get("priority", 5) / 10,
            })

        return plan

    def _optimize_plan(self, plan, constraints):
        """Reorder phases for parallelism and simplify when constraints are violated.

        Returns:
            The plan with reordered ``phases`` and ``parallel_groups`` set, or
            the result of ``_simplify_plan`` when the constraints are not met.
        """
        parallel_groups = self._identify_parallel_opportunities(plan)
        optimized_phases = self._reorder_phases(plan["phases"], parallel_groups)

        if not self._check_constraints(optimized_phases, constraints):
            # NOTE(review): _simplify_plan receives the phase list, so this
            # branch may return a different shape than the dict returned
            # below — confirm against the helper.
            simplified = self._simplify_plan(optimized_phases, constraints)
            return simplified

        plan["phases"] = optimized_phases
        plan["parallel_groups"] = parallel_groups
        return plan


## 4. 记忆系统实现
### 4.1 多层次记忆架构
from datetime import datetime


class MultiLevelMemory:
    """Multi-level memory system.

    - short-term memory: recent interactions of the current session
    - working memory: state of the tasks currently being processed
    - long-term memory: persisted entries in the vector store, mirrored in
      the in-process ``episodic`` list
    - semantic memory: structured knowledge

    NOTE(review): ``cosine_similarity`` and the helpers ``_search_working``,
    ``_search_semantic``, ``_deduplicate_results`` and
    ``_evict_least_used_working`` are used but not defined in this listing.
    """

    def __init__(self, vector_store, config=None):
        """Create empty memories and read the size limits from ``config``.

        Args:
            vector_store: Backend on which ``embed``, ``search`` and
                ``insert`` are called.
            config: Optional mapping with ``max_short_term`` (default 20),
                ``max_working`` (default 100) and ``consolidation_threshold``
                (default 5).
        """
        self.vector_store = vector_store
        self.config = config or {}

        self.short_term = []  # recent interactions
        self.working = {}     # current task state, keyed by task id
        self.episodic = []    # in-process index of long-term entries
        self.semantic = {}    # semantic memory

        self.max_short_term = self.config.get("max_short_term", 20)
        self.max_working = self.config.get("max_working", 100)
        self.consolidation_threshold = self.config.get("consolidation_threshold", 5)

    def store_interaction(self, interaction):
        """Append an interaction to short-term memory, consolidating on overflow."""
        self.short_term.append({
            "content": interaction,
            "timestamp": datetime.now(),
            "access_count": 0,
        })

        if len(self.short_term) > self.max_short_term:
            self._consolidate_oldest()

    def store_working_state(self, task_id, state):
        """Store the state of a task, bumping its version on every write."""
        self.working[task_id] = {
            "state": state,
            "updated_at": datetime.now(),
            "version": self.working.get(task_id, {}).get("version", 0) + 1,
        }

        if len(self.working) > self.max_working:
            self._evict_least_used_working()

    def retrieve_relevant(self, query, memory_type="all", top_k=5):
        """Search one or all memory levels and return the most relevant hits.

        Args:
            query: The query.
            memory_type: ``"short"``, ``"working"``, ``"long"``,
                ``"semantic"`` or ``"all"``; any other value searches nothing.
            top_k: Maximum hits per level and in the merged result.

        Returns:
            Deduplicated hits sorted by descending ``"relevance"``.
        """
        results = []
        if memory_type in ["short", "all"]:
            results.extend(self._search_short_term(query, top_k))
        if memory_type in ["working", "all"]:
            results.extend(self._search_working(query, top_k))
        if memory_type in ["long", "all"]:
            results.extend(self._search_long_term(query, top_k))
        if memory_type in ["semantic", "all"]:
            results.extend(self._search_semantic(query, top_k))

        unique_results = self._deduplicate_results(results)
        unique_results.sort(key=lambda x: x["relevance"], reverse=True)
        return unique_results[:top_k]

    def _search_short_term(self, query, top_k):
        """Rank short-term memories by embedding similarity to the query.

        Only memories with similarity above 0.5 are returned; each returned
        memory has its ``access_count`` incremented.
        """
        query_embedding = self.vector_store.embed(query)
        results = []
        for memory in self.short_term:
            memory_embedding = self.vector_store.embed(memory["content"])
            similarity = cosine_similarity(query_embedding, memory_embedding)
            if similarity > 0.5:
                results.append({
                    "content": memory["content"],
                    "type": "short_term",
                    "relevance": similarity,
                    "timestamp": memory["timestamp"],
                })
                memory["access_count"] += 1

        return sorted(results, key=lambda x: x["relevance"], reverse=True)[:top_k]

    def _search_long_term(self, query, top_k):
        """Search the ``long_term_memory`` collection of the vector store."""
        results = self.vector_store.search(
            query,
            collection="long_term_memory",
            top_k=top_k,
        )
        return [
            {
                "content": r["content"],
                "type": "long_term",
                "relevance": r["score"],
                "timestamp": r.get("timestamp"),
            }
            for r in results
        ]

    def _consolidate_oldest(self):
        """Move one short-term memory into long-term memory.

        Does nothing while short-term memory holds fewer than
        ``consolidation_threshold`` entries.
        """
        if len(self.short_term) < self.consolidation_threshold:
            return

        # Evict the least-accessed entry, breaking ties by oldest timestamp.
        candidate = min(
            self.short_term,
            key=lambda x: (x["access_count"], x["timestamp"]),
        )
        self._store_long_term(candidate["content"])
        self.short_term.remove(candidate)

    def _store_long_term(self, content):
        """Embed ``content``, insert it into the vector store and index it in ``episodic``."""
        embedding = self.vector_store.embed(content)
        # One timestamp for both records, so the persisted entry and its
        # episodic index entry can be matched.
        stored_at = datetime.now()
        self.vector_store.insert(
            collection="long_term_memory",
            vector=embedding,
            content=content,
            timestamp=stored_at,
        )
        self.episodic.append({
            "content": content,
            "timestamp": stored_at,
        })


### 4.2 经验学习模块
import hashlib


class ExperienceLearner:
    """Experience learning: derive reusable patterns from past interactions.

    NOTE(review): ``parse_json`` and the helpers ``_classify_query_type``,
    ``_update_retrieval_strategy``, ``_update_planning_strategy`` and
    ``_merge_patterns`` are used but not defined in this listing.
    """

    def __init__(self, memory, llm):
        """Keep the memory system and LLM client; start with no learned patterns."""
        self.memory = memory
        self.llm = llm
        self.learned_patterns = {}

    def learn_from_interaction(self, query, plan, result, feedback=None):
        """Learn from one interaction.

        An interaction counts as a failure when ``result`` has no truthy
        ``"success"`` entry or the feedback rating is below 3. Failures are
        stored as failure patterns only; successes are stored as success
        patterns and also used to update the retrieval and planning
        strategies.

        Args:
            query: The original query.
            plan: The execution plan that was used.
            result: The execution result.
            feedback: Optional user feedback mapping with a ``"rating"``.
        """
        failed = not result.get("success") or (
            feedback and feedback.get("rating", 5) < 3
        )

        if failed:
            # A failed run must not be recorded as a success pattern or be
            # used to reinforce the current strategies.
            failure_pattern = self._extract_pattern(query, plan, result, success=False)
            self._store_pattern(failure_pattern, pattern_type="failure")
            return

        success_pattern = self._extract_pattern(query, plan, result, success=True)
        self._store_pattern(success_pattern, pattern_type="success")
        self._update_retrieval_strategy(success_pattern)
        self._update_planning_strategy(success_pattern)

    def _extract_pattern(self, query, plan, result, success):
        """Ask the LLM to describe the interaction as a pattern.

        Returns:
            The parsed pattern with ``"success"`` and ``"query_type"`` set.
        """
        # The prompt is sent verbatim, so its lines stay flush left.
        extraction_prompt = f"""
从以下交互中提取可学习的模式。
查询:{query}
执行计划:{plan}
执行结果:{result}
成功标记:{success}
请提取:
1. 查询类型和特征
2. 有效的策略组合
3. 关键的成功/失败因素
4. 可泛化的规则
输出JSON格式的模式描述。
"""
        response = self.llm.generate(extraction_prompt)
        pattern = parse_json(response)
        pattern["success"] = success
        pattern["query_type"] = self._classify_query_type(query)
        return pattern

    def _store_pattern(self, pattern, pattern_type):
        """Record a pattern in its bucket and persist it to long-term memory.

        Patterns are bucketed by query type and pattern type; the bucket id
        is the MD5 hex digest of ``"<query_type>_<pattern_type>"``.
        """
        pattern_id = hashlib.md5(
            f"{pattern['query_type']}_{pattern_type}".encode()
        ).hexdigest()

        bucket = self.learned_patterns.setdefault(pattern_id, {
            "type": pattern_type,
            "count": 0,
            "patterns": [],
        })
        bucket["count"] += 1
        bucket["patterns"].append(pattern)

        self.memory._store_long_term({
            "pattern_id": pattern_id,
            "pattern": pattern,
            "type": pattern_type,
        })

    def get_learned_guidance(self, query):
        """Return merged guidance from patterns that match the query's type.

        Returns:
            The result of ``_merge_patterns`` over every bucket containing a
            pattern of the same query type, or ``None`` when none matches.
        """
        query_type = self._classify_query_type(query)

        relevant_patterns = []
        for data in self.learned_patterns.values():
            if any(p.get("query_type") == query_type for p in data["patterns"]):
                relevant_patterns.extend(data["patterns"])

        if not relevant_patterns:
            return None
        return self._merge_patterns(relevant_patterns)


## 5. 生产部署指南
### 5.1 系统配置
# Production configuration, grouped by subsystem.
PRODUCTION_CONFIG = {
    # LLM client settings
    "llm": {
        "provider": "openai",
        "model": "gpt-4-turbo",
        "temperature": 0.7,
        "max_tokens": 4096,
        "retry_config": {
            "max_retries": 3,
            "backoff_factor": 2,
        },
    },

    # Vector database settings
    "vector_db": {
        "provider": "qdrant",
        "collection": "agentic_rag",
        "vector_size": 1536,
        "hnsw_config": {
            "m": 16,
            "ef_construct": 200,
        },
    },

    # Retrieval settings
    "retrieval": {
        "default_top_k": 10,
        "max_context_docs": 20,
        "reranker": {
            "enabled": True,
            "model": "cohere/rerank-multilingual-v3.0",
            "top_n": 5,
        },
    },

    # Agent loop settings
    "agents": {
        "max_iterations": 5,
        "early_stopping": True,
        "confidence_threshold": 0.8,
        "timeout_seconds": 120,
    },

    # Memory settings
    "memory": {
        "max_short_term": 50,
        "max_working": 200,
        "consolidation_threshold": 10,
        "semantic_index_enabled": True,
    },

    # Cache settings
    "cache": {
        "enabled": True,
        "ttl_seconds": 3600,
        "max_entries": 10000,
    },

    # Monitoring settings
    "monitoring": {
        "enable_tracing": True,
        "log_level": "INFO",
        "metrics_enabled": True,
    },
}


### 5.2 部署架构
# docker-compose.yml
version: '3.8'

services:
  # API service
  # NOTE(review): a fixed host port combined with replicas > 1 presumably
  # relies on Swarm's routing mesh; confirm for plain "docker compose up".
  api:
    build: ./api
    ports:
      - "8000:8000"
    environment:
      - REDIS_URL=redis://redis:6379
      - QDRANT_URL=http://qdrant:6333
      - LLM_API_KEY=${LLM_API_KEY}
    depends_on:
      - redis
      - qdrant
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 4G

  # Worker service
  worker:
    build: ./worker
    environment:
      - REDIS_URL=redis://redis:6379
      - QDRANT_URL=http://qdrant:6333
    depends_on:
      - redis
      - qdrant
    deploy:
      replicas: 5

  # Redis cache (append-only persistence enabled)
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes

  # Qdrant vector database
  qdrant:
    image: qdrant/qdrant
    ports:
      - "6333:6333"
      - "6334:6334"
    volumes:
      - qdrant_data:/qdrant/storage

  # Prometheus monitoring
  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

volumes:
  redis_data:
  qdrant_data:

### 5.3 性能监控
from datetime import datetime

import numpy as np


class AgenticRAGMonitor:
    """In-process metrics collector for an Agentic RAG system.

    NOTE(review): ``_calculate_error_rate`` is called by
    ``get_health_metrics`` but not defined in this listing. The metric lists
    grow without bound; nothing here prunes them.
    """

    def __init__(self):
        """Start with empty metric series and zeroed rates."""
        self.metrics = {
            "requests": [],
            "retrieval_latency": [],
            "generation_latency": [],
            "total_latency": [],
            "cache_hit_rate": 0,
            "error_rate": 0,
            "iteration_distribution": [],
        }

    def record_request(self, request_data):
        """Record one request.

        Args:
            request_data: Mapping with optional ``query``, ``task_type``,
                ``iterations`` (default 1) and ``success`` (default True).
                Only the first 100 characters of the query are kept; a
                missing or ``None`` query is stored as an empty string.
        """
        self.metrics["requests"].append({
            "timestamp": datetime.now(),
            "query": (request_data.get("query") or "")[:100],
            "task_type": request_data.get("task_type"),
            "iterations": request_data.get("iterations", 1),
            "success": request_data.get("success", True),
        })

    def record_latency(self, phase, latency_ms):
        """Append a latency sample to the ``"<phase>_latency"`` series, creating it if needed."""
        key = f"{phase}_latency"
        self.metrics.setdefault(key, []).append({
            "timestamp": datetime.now(),
            "latency_ms": latency_ms,
        })

    def get_health_metrics(self):
        """Summarise request volume, latency and iteration counts.

        Request-based figures cover the last hour; latency figures cover
        every recorded ``total_latency`` sample. Averages and the percentile
        are reported as ``0.0`` when there is no data to aggregate.
        """
        now = datetime.now()
        # total_seconds() covers the whole interval; ``.seconds`` drops whole
        # days, which made day-old requests look recent.
        last_hour = [
            r for r in self.metrics["requests"]
            if (now - r["timestamp"]).total_seconds() < 3600
        ]
        total_latencies = [
            m["latency_ms"] for m in self.metrics.get("total_latency", [])
        ]
        iterations = [r["iterations"] for r in last_hour]

        return {
            "requests_per_minute": len(last_hour) / 60,
            "avg_latency": float(np.mean(total_latencies)) if total_latencies else 0.0,
            "p95_latency": (
                float(np.percentile(total_latencies, 95)) if total_latencies else 0.0
            ),
            "cache_hit_rate": self.metrics["cache_hit_rate"],
            "error_rate": self._calculate_error_rate(),
            "avg_iterations": float(np.mean(iterations)) if iterations else 0.0,
        }


## 6. 总结
Agentic RAG的架构设计体现了以下核心原则:
| 设计原则 | 实现方式 | 价值 |
|---|---|---|
| 模块化 | 独立Agent组件 | 易于维护和扩展 |
| 自适应性 | 动态策略选择 | 匹配不同查询需求 |
| 记忆增强 | 多层次记忆系统 | 利用历史经验 |
| 容错性 | 迭代验证与纠正 | 提高答案质量 |
| 可观测性 | 完整监控体系 | 便于优化调试 |
| 可扩展性 | 工具集成框架 | 持续增强能力 |
这些架构组件共同构成了一个生产级的Agentic RAG系统,能够高效处理各类复杂查询并持续学习和改进。