概述

本文详细介绍Agentic RAG系统的架构设计与核心组件实现,涵盖检索智能体、规划模块、记忆系统、工具集成以及生产级部署最佳实践。


1. 系统架构概览

1.1 整体架构

┌─────────────────────────────────────────────────────────────────────────┐
│                          Agentic RAG 架构                                │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  ┌──────────────────────────────────────────────────────────────────┐  │
│  │                         用户交互层                                 │  │
│  │         (Query Input, Response Display, Feedback)                  │  │
│  └──────────────────────────────────────────────────────────────────┘  │
│                                    ↓                                    │
│  ┌──────────────────────────────────────────────────────────────────┐  │
│  │                         协调器 (Coordinator)                      │  │
│  │     - 任务分解    - 流程控制    - 结果整合    - 异常处理         │  │
│  └──────────────────────────────────────────────────────────────────┘  │
│                    ↓                      ↑                              │
│  ┌─────────────────┐        ┌─────────────────┐        ┌────────────┐  │
│  │   检索Agent    │ ←────→ │   生成Agent    │ ←────→ │  工具Agent │  │
│  │   (Retrieval)  │        │   (Generation) │        │  (Tools)   │  │
│  └─────────────────┘        └─────────────────┘        └────────────┘  │
│                                                                          │
│  ┌──────────────────────────────────────────────────────────────────┐  │
│  │                         记忆系统 (Memory)                         │  │
│  │   短期记忆    长期记忆    工作记忆    经验记忆                     │  │
│  └──────────────────────────────────────────────────────────────────┘  │
│                                                                          │
│  ┌──────────────────────────────────────────────────────────────────┐  │
│  │                         数据层 (Data Layer)                       │  │
│  │      向量存储      知识图谱      结构化DB      缓存层            │  │
│  └──────────────────────────────────────────────────────────────────┘  │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘

1.2 核心组件职责

组件职责关键能力
协调器全局调度与决策任务分解、流程控制、异常恢复
检索Agent多源信息获取向量检索、图谱检索、混合检索
生成Agent答案构建与验证草稿生成、答案修订、质量评估
工具Agent扩展系统能力搜索引擎、计算器、API调用
记忆系统状态与经验管理上下文维护、经验存储
数据层知识存储与索引索引管理、查询优化

2. 检索智能体设计

2.1 多策略检索Agent

class RetrievalAgent:
    """
    多策略检索Agent
    支持向量检索、关键词检索、知识图谱检索等多种策略
    """
    
    def __init__(self, vector_db, kg_store, config=None):
        self.vector_db = vector_db
        self.kg_store = kg_store
        self.config = config or self._default_config()
        
        # 检索策略权重
        self.strategy_weights = {
            "dense": 0.4,
            "sparse": 0.2,
            "graph": 0.3,
            "keyword": 0.1
        }
    
    def search(self, query, strategy="adaptive", top_k=10, context=None):
        """
        自适应检索主方法
        
        Args:
            query: 检索查询
            strategy: 检索策略 ("adaptive", "dense", "hybrid", "graph")
            top_k: 返回结果数量
            context: 额外的上下文信息
        """
        if strategy == "adaptive":
            strategy = self._select_strategy(query, context)
        
        if strategy == "dense":
            return self._dense_search(query, top_k)
        elif strategy == "sparse":
            return self._sparse_search(query, top_k)
        elif strategy == "hybrid":
            return self._hybrid_search(query, top_k)
        elif strategy == "graph":
            return self._graph_search(query, top_k, context)
        else:
            return self._adaptive_search(query, top_k, context)
    
    def _select_strategy(self, query, context):
        """
        根据查询特征选择最佳检索策略
        """
        # 分析查询特征
        query_features = {
            "has_entity": self._contains_entities(query),
            "has_relation": self._contains_relations(query),
            "is_comparative": self._is_comparative(query),
            "is_temporal": self._is_temporal(query),
            "complexity": self._estimate_complexity(query)
        }
        
        # 基于特征的策略选择
        if query_features["has_entity"] and query_features["has_relation"]:
            return "graph"  # 关系查询使用图谱检索
        elif query_features["is_comparative"]:
            return "hybrid"  # 比较查询使用混合检索
        elif query_features["complexity"] > 0.7:
            return "adaptive"  # 复杂查询使用自适应
        else:
            return "dense"  # 默认使用向量检索
    
    def _dense_search(self, query, top_k):
        """向量检索"""
        query_embedding = self._embed_query(query)
        results = self.vector_db.search(
            query_embedding,
            top_k=top_k,
            filters=self.config.get("filters")
        )
        return [self._format_result(r, "dense") for r in results]
    
    def _sparse_search(self, query, top_k):
        """稀疏检索(BM25)"""
        results = self.vector_db.sparse_search(
            query,
            top_k=top_k
        )
        return [self._format_result(r, "sparse") for r in results]
    
    def _hybrid_search(self, query, top_k):
        """混合检索"""
        # 并行执行向量和稀疏检索
        dense_results = self._dense_search(query, top_k * 2)
        sparse_results = self._sparse_search(query, top_k * 2)
        
        # RRF融合
        fused = self._reciprocal_rank_fusion(
            dense_results,
            sparse_results,
            k=60
        )
        
        return fused[:top_k]
    
    def _graph_search(self, query, top_k, context=None):
        """知识图谱检索"""
        # 识别查询中的实体
        entities = self._extract_entities(query)
        
        if not entities:
            # 如果没有识别到实体,回退到向量检索
            return self._dense_search(query, top_k)
        
        # 在图谱中检索实体及其邻域
        graph_results = []
        for entity in entities:
            # 获取实体信息
            node_info = self.kg_store.get_entity(entity)
            if node_info:
                graph_results.append(node_info)
            
            # 获取1-2跳邻居
            neighbors = self.kg_store.get_neighbors(
                entity,
                depth=2
            )
            graph_results.extend(neighbors)
        
        # 去重和排序
        unique_results = self._deduplicate(graph_results)
        
        return [self._format_result(r, "graph") for r in unique_results[:top_k]]
    
    def _reciprocal_rank_fusion(self, results_list, k=60):
        """
        倒数排名融合(RRF)
        
        RRF公式:RRF(d) = Σ 1/(k + rank(d))
        """
        from collections import defaultdict
        
        scores = defaultdict(float)
        doc_scores = defaultdict(list)
        
        for results in results_list:
            for rank, result in enumerate(results, 1):
                doc_id = result["doc_id"]
                score = 1 / (k + rank)
                scores[doc_id] += score
                doc_scores[doc_id].append(score)
        
        # 合并结果
        fused = []
        for doc_id in scores:
            result = self._get_full_result(doc_id)
            result["fused_score"] = scores[doc_id]
            result["sources"] = len(doc_scores[doc_id])
            fused.append(result)
        
        # 排序
        fused.sort(key=lambda x: x["fused_score"], reverse=True)
        
        return fused

2.2 迭代式检索增强

class IterativeRetrieval:
    """
    迭代式检索:支持多轮检索-评估-补充
    """
    
    def __init__(self, retrieval_agent, evaluator):
        self.retrieval_agent = retrieval_agent
        self.evaluator = evaluator
    
    def iterative_search(self, query, max_iterations=3, target_recall=0.8):
        """
        迭代式检索直到达到目标召回率
        
        Args:
            query: 查询
            max_iterations: 最大迭代次数
            target_recall: 目标召回率
        """
        all_results = []
        current_context = {}
        
        for iteration in range(max_iterations):
            # 执行检索
            if iteration == 0:
                results = self.retrieval_agent.search(query, top_k=10)
            else:
                # 基于前一轮结果生成补充查询
                sub_queries = self._generate_sub_queries(
                    query, all_results, current_context
                )
                
                results = []
                for sq in sub_queries:
                    sq_results = self.retrieval_agent.search(
                        sq, 
                        top_k=5,
                        context=current_context
                    )
                    results.extend(sq_results)
            
            # 评估召回率
            recall = self.evaluator.estimate_recall(
                query, results, all_results
            )
            
            # 添加新结果
            new_results = self._filter_new_results(results, all_results)
            all_results.extend(new_results)
            
            # 更新上下文
            current_context = self._update_context(
                query, all_results, current_context
            )
            
            # 检查是否达到目标
            if recall >= target_recall:
                break
        
        return all_results
    
    def _generate_sub_queries(self, original_query, previous_results, context):
        """
        基于当前上下文生成补充查询
        """
        # 分析已检索内容中的知识缺口
        gap_analysis = self.evaluator.analyze_gaps(
            original_query,
            previous_results,
            context
        )
        
        # 生成针对性查询
        sub_queries = []
        
        for gap in gap_analysis.get("gaps", []):
            if gap["type"] == "entity_detail":
                # 实体细节查询
                sub_queries.append(f"关于 {gap['entity']} 的详细信息")
            elif gap["type"] == "relation":
                # 关系查询
                sub_queries.append(f"{gap['entity1']}{gap['entity2']} 的关系")
            elif gap["type"] == "background":
                # 背景信息查询
                sub_queries.append(f"{gap['topic']} 的背景和基础知识")
        
        return sub_queries if sub_queries else [original_query]

3. 规划模块实现

3.1 任务规划器

class TaskPlanner:
    """
    任务规划器:分解复杂任务并制定执行计划
    """
    
    def __init__(self, llm):
        self.llm = llm
        self.task_templates = self._load_templates()
    
    def plan(self, query, available_tools, constraints):
        """
        为查询制定执行计划
        
        Args:
            query: 用户查询
            available_tools: 可用工具列表
            constraints: 约束条件(时间、成本等)
        
        Returns:
            执行计划
        """
        # 分析查询
        query_analysis = self._analyze_query(query)
        
        # 确定任务类型
        task_type = self._classify_task(query_analysis)
        
        # 选择任务模板
        template = self._select_template(task_type, query_analysis)
        
        # 实例化计划
        plan = self._instantiate_plan(
            template,
            query_analysis,
            available_tools,
            constraints
        )
        
        # 优化计划
        optimized_plan = self._optimize_plan(plan, constraints)
        
        return {
            "task_type": task_type,
            "analysis": query_analysis,
            "plan": optimized_plan,
            "estimated_cost": self._estimate_cost(optimized_plan),
            "expected_quality": self._estimate_quality(optimized_plan)
        }
    
    def _analyze_query(self, query):
        """深度分析查询"""
        analysis_prompt = f"""
详细分析以下查询的各个维度:
 
查询:{query}
 
分析维度:
1. 意图类型:事实查询、解释性、比较性、分析性、生成性
2. 所需信息类型:具体事实、概念解释、过程描述、数据分析
3. 复杂度:简单/中等/复杂/多跳
4. 时间敏感性:最新信息/历史信息/无要求
5. 准确性要求:高/中/低
6. 潜在知识缺口:查询中隐含但未明确的信息需求
 
输出JSON格式的完整分析。
"""
        response = self.llm.generate(analysis_prompt)
        return parse_json(response)
    
    def _classify_task(self, analysis):
        """分类任务类型"""
        intent = analysis.get("intent_type", "unknown")
        complexity = analysis.get("complexity", "simple")
        
        if complexity == "simple" and intent in ["fact", "explanatory"]:
            return "simple_retrieval"
        elif complexity == "complex" or intent == "analytical":
            return "complex_analysis"
        elif intent == "generative":
            return "creative_generation"
        elif complexity == "multi_hop":
            return "multi_hop_reasoning"
        else:
            return "general_qa"
    
    def _select_template(self, task_type, analysis):
        """选择任务模板"""
        # 根据任务类型选择预定义模板
        templates = {
            "simple_retrieval": {
                "steps": [
                    {"type": "retrieve", "target": "primary_info", "priority": 1},
                    {"type": "generate", "priority": 2}
                ],
                "expected_tools": ["vector_search"]
            },
            "complex_analysis": {
                "steps": [
                    {"type": "retrieve", "target": "background", "priority": 1},
                    {"type": "retrieve", "target": "specific", "priority": 2},
                    {"type": "analyze", "priority": 3},
                    {"type": "verify", "priority": 4},
                    {"type": "generate", "priority": 5}
                ],
                "expected_tools": ["vector_search", "kg_search", "calculator"]
            },
            "multi_hop_reasoning": {
                "steps": [
                    {"type": "retrieve", "target": "entity_1", "priority": 1},
                    {"type": "retrieve", "target": "entity_2", "priority": 1},
                    {"type": "analyze", "target": "relation", "priority": 2, "depends_on": [1, 2]},
                    {"type": "retrieve", "target": "supporting", "priority": 3, "depends_on": [2]},
                    {"type": "synthesize", "priority": 4, "depends_on": [3]}
                ],
                "expected_tools": ["vector_search", "kg_search", "web_search"]
            }
        }
        
        return templates.get(task_type, templates["general_qa"])
    
    def _instantiate_plan(self, template, analysis, available_tools, constraints):
        """实例化具体执行计划"""
        plan = {
            "phases": [],
            "estimated_steps": len(template["steps"]),
            "required_capabilities": template.get("expected_tools", [])
        }
        
        for step in template["steps"]:
            # 检查工具可用性
            if step["type"] == "retrieve":
                tool = self._select_tool(step["target"], available_tools)
            else:
                tool = step["type"]
            
            # 检查依赖
            depends_on = [
                plan["phases"][dep - 1]["phase_id"]
                for dep in step.get("depends_on", [])
            ]
            
            # 估计资源需求
            resource_estimate = self._estimate_step_resources(
                step, analysis, constraints
            )
            
            plan["phases"].append({
                "phase_id": len(plan["phases"]) + 1,
                "type": step["type"],
                "target": step.get("target"),
                "tool": tool,
                "depends_on": depends_on,
                "resources": resource_estimate,
                "quality_weight": step.get("priority", 5) / 10
            })
        
        return plan
    
    def _optimize_plan(self, plan, constraints):
        """优化计划以满足约束"""
        # 识别可并行化的步骤
        parallel_groups = self._identify_parallel_opportunities(plan)
        
        # 重排以优化执行顺序
        optimized_phases = self._reorder_phases(plan["phases"], parallel_groups)
        
        # 检查是否满足约束
        if not self._check_constraints(optimized_phases, constraints):
            # 如果不满足,简化计划
            simplified = self._simplify_plan(optimized_phases, constraints)
            return simplified
        
        plan["phases"] = optimized_phases
        plan["parallel_groups"] = parallel_groups
        
        return plan

4. 记忆系统实现

4.1 多层次记忆架构

class MultiLevelMemory:
    """
    多层次记忆系统
    - 短期记忆:当前会话上下文
    - 工作记忆:当前任务处理状态
    - 长期记忆:持久化经验
    - 语义记忆:结构化知识
    """
    
    def __init__(self, vector_store, config=None):
        self.vector_store = vector_store
        self.config = config or {}
        
        # 内存存储
        self.short_term = []  # 最近交互
        self.working = {}     # 当前任务状态
        self.episodic = []    # 情节记忆
        self.semantic = {}     # 语义记忆
        
        # 配置参数
        self.max_short_term = self.config.get("max_short_term", 20)
        self.max_working = self.config.get("max_working", 100)
        self.consolidation_threshold = self.config.get("consolidation_threshold", 5)
    
    def store_interaction(self, interaction):
        """存储一次交互"""
        # 添加到短期记忆
        self.short_term.append({
            "content": interaction,
            "timestamp": datetime.now(),
            "access_count": 0
        })
        
        # 修剪短期记忆
        if len(self.short_term) > self.max_short_term:
            self._consolidate_oldest()
    
    def store_working_state(self, task_id, state):
        """存储工作状态"""
        self.working[task_id] = {
            "state": state,
            "updated_at": datetime.now(),
            "version": self.working.get(task_id, {}).get("version", 0) + 1
        }
        
        # 修剪工作记忆
        if len(self.working) > self.max_working:
            self._evict_least_used_working()
    
    def retrieve_relevant(self, query, memory_type="all", top_k=5):
        """
        检索相关记忆
        
        Args:
            query: 查询
            memory_type: 记忆类型 ("short", "working", "long", "semantic", "all")
            top_k: 返回数量
        """
        results = []
        
        if memory_type in ["short", "all"]:
            results.extend(self._search_short_term(query, top_k))
        
        if memory_type in ["working", "all"]:
            results.extend(self._search_working(query, top_k))
        
        if memory_type in ["long", "all"]:
            results.extend(self._search_long_term(query, top_k))
        
        if memory_type in ["semantic", "all"]:
            results.extend(self._search_semantic(query, top_k))
        
        # 去重和排序
        unique_results = self._deduplicate_results(results)
        unique_results.sort(key=lambda x: x["relevance"], reverse=True)
        
        return unique_results[:top_k]
    
    def _search_short_term(self, query, top_k):
        """搜索短期记忆"""
        query_embedding = self.vector_store.embed(query)
        
        results = []
        for memory in self.short_term:
            # 计算相关性
            memory_embedding = self.vector_store.embed(memory["content"])
            similarity = cosine_similarity(query_embedding, memory_embedding)
            
            if similarity > 0.5:
                results.append({
                    "content": memory["content"],
                    "type": "short_term",
                    "relevance": similarity,
                    "timestamp": memory["timestamp"]
                })
                memory["access_count"] += 1
        
        return sorted(results, key=lambda x: x["relevance"], reverse=True)[:top_k]
    
    def _search_long_term(self, query, top_k):
        """搜索长期记忆(情节记忆)"""
        # 使用向量搜索
        results = self.vector_store.search(
            query,
            collection="long_term_memory",
            top_k=top_k
        )
        
        return [
            {
                "content": r["content"],
                "type": "long_term",
                "relevance": r["score"],
                "timestamp": r.get("timestamp")
            }
            for r in results
        ]
    
    def _consolidate_oldest(self):
        """将旧的短期记忆整合到长期记忆"""
        if len(self.short_term) < self.consolidation_threshold:
            return
        
        # 选择高访问或高价值的记忆
        candidate = min(
            self.short_term,
            key=lambda x: (x["access_count"], x["timestamp"])
        )
        
        # 存储到长期记忆
        self._store_long_term(candidate["content"])
        
        # 从短期记忆移除
        self.short_term.remove(candidate)
    
    def _store_long_term(self, content):
        """存储到长期记忆"""
        # 生成嵌入
        embedding = self.vector_store.embed(content)
        
        # 存储到向量数据库
        self.vector_store.insert(
            collection="long_term_memory",
            vector=embedding,
            content=content,
            timestamp=datetime.now()
        )
        
        # 添加情节索引
        self.episodic.append({
            "content": content,
            "timestamp": datetime.now()
        })

4.2 经验学习模块

class ExperienceLearner:
    """
    经验学习:从历史交互中学习并改进
    """
    
    def __init__(self, memory, llm):
        self.memory = memory
        self.llm = llm
        self.learned_patterns = {}
    
    def learn_from_interaction(self, query, plan, result, feedback=None):
        """
        从一次交互中学习
        
        Args:
            query: 原始查询
            plan: 使用的执行计划
            result: 执行结果
            feedback: 用户反馈(可选)
        """
        # 提取成功的模式
        success_pattern = self._extract_pattern(query, plan, result, success=True)
        
        # 提取失败模式
        if not result.get("success") or (feedback and feedback.get("rating", 5) < 3):
            failure_pattern = self._extract_pattern(query, plan, result, success=False)
            self._store_pattern(failure_pattern, pattern_type="failure")
        
        # 存储成功模式
        self._store_pattern(success_pattern, pattern_type="success")
        
        # 更新检索策略
        self._update_retrieval_strategy(success_pattern)
        
        # 更新计划策略
        self._update_planning_strategy(success_pattern)
    
    def _extract_pattern(self, query, plan, result, success):
        """提取交互模式"""
        extraction_prompt = f"""
从以下交互中提取可学习的模式。
 
查询:{query}
执行计划:{plan}
执行结果:{result}
成功标记:{success}
 
请提取:
1. 查询类型和特征
2. 有效的策略组合
3. 关键的成功/失败因素
4. 可泛化的规则
 
输出JSON格式的模式描述。
"""
        response = self.llm.generate(extraction_prompt)
        pattern = parse_json(response)
        pattern["success"] = success
        pattern["query_type"] = self._classify_query_type(query)
        
        return pattern
    
    def _store_pattern(self, pattern, pattern_type):
        """存储学习到的模式"""
        pattern_id = hashlib.md5(
            f"{pattern['query_type']}_{pattern_type}".encode()
        ).hexdigest()
        
        if pattern_id not in self.learned_patterns:
            self.learned_patterns[pattern_id] = {
                "type": pattern_type,
                "count": 0,
                "patterns": []
            }
        
        self.learned_patterns[pattern_id]["count"] += 1
        self.learned_patterns[pattern_id]["patterns"].append(pattern)
        
        # 更新到记忆
        self.memory._store_long_term({
            "pattern_id": pattern_id,
            "pattern": pattern,
            "type": pattern_type
        })
    
    def get_learned_guidance(self, query):
        """
        获取基于历史学习的指导
        """
        query_type = self._classify_query_type(query)
        
        # 查找相关模式
        relevant_patterns = []
        for pattern_id, data in self.learned_patterns.items():
            if any(
                p.get("query_type") == query_type 
                for p in data["patterns"]
            ):
                relevant_patterns.extend(data["patterns"])
        
        if not relevant_patterns:
            return None
        
        # 合并模式
        guidance = self._merge_patterns(relevant_patterns)
        
        return guidance

5. 生产部署指南

5.1 系统配置

# 生产配置
PRODUCTION_CONFIG = {
    # LLM配置
    "llm": {
        "provider": "openai",
        "model": "gpt-4-turbo",
        "temperature": 0.7,
        "max_tokens": 4096,
        "retry_config": {
            "max_retries": 3,
            "backoff_factor": 2
        }
    },
    
    # 向量数据库配置
    "vector_db": {
        "provider": "qdrant",
        "collection": "agentic_rag",
        "vector_size": 1536,
        "hnsw_config": {
            "m": 16,
            "ef_construct": 200
        }
    },
    
    # 检索配置
    "retrieval": {
        "default_top_k": 10,
        "max_context_docs": 20,
        "reranker": {
            "enabled": True,
            "model": "cohere/rerank-multilingual-v3.0",
            "top_n": 5
        }
    },
    
    # Agent配置
    "agents": {
        "max_iterations": 5,
        "early_stopping": True,
        "confidence_threshold": 0.8,
        "timeout_seconds": 120
    },
    
    # 记忆配置
    "memory": {
        "max_short_term": 50,
        "max_working": 200,
        "consolidation_threshold": 10,
        "semantic_index_enabled": True
    },
    
    # 缓存配置
    "cache": {
        "enabled": True,
        "ttl_seconds": 3600,
        "max_entries": 10000
    },
    
    # 监控配置
    "monitoring": {
        "enable_tracing": True,
        "log_level": "INFO",
        "metrics_enabled": True
    }
}

5.2 部署架构

# docker-compose.yml
version: '3.8'
 
services:
  # API服务
  api:
    build: ./api
    ports:
      - "8000:8000"
    environment:
      - REDIS_URL=redis://redis:6379
      - QDRANT_URL=http://qdrant:6333
      - LLM_API_KEY=${LLM_API_KEY}
    depends_on:
      - redis
      - qdrant
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 4G
 
  # Worker服务
  worker:
    build: ./worker
    environment:
      - REDIS_URL=redis://redis:6379
      - QDRANT_URL=http://qdrant:6333
    depends_on:
      - redis
      - qdrant
    deploy:
      replicas: 5
 
  # Redis缓存
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes
 
  # Qdrant向量数据库
  qdrant:
    image: qdrant/qdrant
    ports:
      - "6333:6333"
      - "6334:6334"
    volumes:
      - qdrant_data:/qdrant/storage
 
  # Prometheus监控
  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
 
volumes:
  redis_data:
  qdrant_data:

5.3 性能监控

class AgenticRAGMonitor:
    """
    Agentic RAG系统监控
    """
    
    def __init__(self):
        self.metrics = {
            "requests": [],
            "retrieval_latency": [],
            "generation_latency": [],
            "total_latency": [],
            "cache_hit_rate": 0,
            "error_rate": 0,
            "iteration_distribution": []
        }
    
    def record_request(self, request_data):
        """记录请求"""
        self.metrics["requests"].append({
            "timestamp": datetime.now(),
            "query": request_data.get("query", "")[:100],
            "task_type": request_data.get("task_type"),
            "iterations": request_data.get("iterations", 1),
            "success": request_data.get("success", True)
        })
    
    def record_latency(self, phase, latency_ms):
        """记录延迟"""
        key = f"{phase}_latency"
        if key not in self.metrics:
            self.metrics[key] = []
        self.metrics[key].append({
            "timestamp": datetime.now(),
            "latency_ms": latency_ms
        })
    
    def get_health_metrics(self):
        """获取健康指标"""
        now = datetime.now()
        last_hour = [r for r in self.metrics["requests"] 
                    if (now - r["timestamp"]).seconds < 3600]
        
        return {
            "requests_per_minute": len(last_hour) / 60,
            "avg_latency": np.mean([
                m["latency_ms"] 
                for m in self.metrics.get("total_latency", [])
            ]),
            "p95_latency": np.percentile([
                m["latency_ms"] 
                for m in self.metrics.get("total_latency", [])
            ], 95),
            "cache_hit_rate": self.metrics["cache_hit_rate"],
            "error_rate": self._calculate_error_rate(),
            "avg_iterations": np.mean([
                r["iterations"] 
                for r in last_hour
            ])
        }

6. 总结

Agentic RAG的架构设计体现了以下核心原则:

设计原则实现方式价值
模块化独立Agent组件易于维护和扩展
自适应性动态策略选择匹配不同查询需求
记忆增强多层次记忆系统利用历史经验
容错性迭代验证与纠正提高答案质量
可观测性完整监控体系便于优化调试
可扩展性工具集成框架持续增强能力

这些架构组件共同构成了一个生产级的Agentic RAG系统,能够高效处理各类复杂查询并持续学习和改进。


参考资料