概述
Agentic RAG系统的核心是自主决策能力——在复杂的检索-推理环境中,智能体需要基于不完全信息做出最优决策。本文档使用**部分可观测马尔可夫决策过程(POMDP)**框架形式化描述Agentic RAG的决策机制,为系统设计提供理论支撑。
1. POMDP基础回顾
1.1 标准POMDP定义
一个有限时域POMDP由七元组 $(S, A, O, T, Z, R, \gamma)$ 定义:
| 符号 | 含义 | Agentic RAG中的对应 |
|---|---|---|
| $S$ | 状态空间 | 工作记忆+检索上下文+对话历史 |
| $A$ | 动作空间 | 检索、读取、生成、细化查询、停止 |
| $O$ | 观测空间 | 检索结果、生成输出、工具响应 |
| $T$ | 转移函数 | $T(s' \mid s, a)$ |
| $Z$ | 观测函数 | $Z(o \mid s', a)$ |
| $R$ | 奖励函数 | 答案质量、检索效率、成本 |
| $\gamma$ | 折扣因子 | 长期vs短期权衡 |
1.2 信念状态
在POMDP中,智能体维护信念状态 $b_t$ 表示对当前状态的概率估计:

$$b_t(s) = P(s_t = s \mid h_t)$$

其中 $h_t = (a_1, o_1, a_2, o_2, \dots, a_t, o_t)$ 是历史轨迹。
2. Agentic RAG的POMDP建模
2.1 状态空间
Agentic RAG的状态 $s_t$ 是一个复合结构:
| 组件 | 含义 | 数据结构 |
|---|---|---|
| `working_memory` | 工作记忆 | 短期上下文向量 |
| `semantic_memory` | 语义记忆 | 检索到的文档嵌入 |
| `episodic_memory` | 情景记忆 | 历史交互轨迹 |
| `task_state` | 当前任务状态 | 任务分解状态 |
| `query_representation` | 查询表示 | 用户查询的内部表示 |
@dataclass
class AgenticRAGState:
    """Composite state of one Agentic RAG episode.

    Groups the three memories, the task progress and the query
    representation that together form a single POMDP state.
    """

    # Working memory: the current reasoning context.
    working_memory: List[Message] = field(default_factory=list)
    # Semantic memory: knowledge retrieved so far.
    semantic_memory: List[RetrievedChunk] = field(default_factory=list)
    # Episodic memory: past interaction history.
    episodic_memory: List[Interaction] = field(default_factory=list)
    # Task state: current execution progress (None until one is assigned).
    task_state: Optional[TaskState] = None
    # Internal representation of the user query (None until one is set).
    query_representation: Optional[QueryEmbedding] = None

    def to_belief_vector(self) -> np.ndarray:
        """Flatten the state into one vector for the policy network.

        Returns:
            The working-memory, semantic-memory and task-state encodings
            concatenated in that order. Episodic memory and the query
            representation are not part of the vector.

        NOTE(review): ``aggregate_working_memory``,
        ``aggregate_semantic_memory`` and ``encode_task_state`` are not
        defined in the class as shown here; they must be supplied elsewhere.
        """
        wm_vec = self.aggregate_working_memory()
        sm_vec = self.aggregate_semantic_memory()
        task_vec = self.encode_task_state()
        return np.concatenate([wm_vec, sm_vec, task_vec])


# 2.2 动作空间
Agentic RAG的动作空间包含多层次操作:
动作分类层次
┌─────────────────────────────────────────────────────────────────────────┐
│ Agentic RAG 动作空间 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ Level 1: 原子动作 │
│ ├── retrieve(query, source, k) 检索动作 │
│ │ ├── source: {vector, keyword, kg, web} │
│ │ └── k: 检索数量 │
│ ├── read(chunk_id, detail) 读取动作 │
│ │ ├── detail: {summary, paragraph, sentence, entity} │
│ │ └── 粒度由粗到细 │
│ ├── generate(prompt, strategy) 生成动作 │
│ │ ├── strategy: {direct, cot, refactor, expand} │
│ │ └── 影响生成风格 │
│ └── stop() 停止动作 │
│ │
│ Level 2: 复合动作(宏动作) │
│ ├── search_and_read(query, depth) 检索+读取 │
│ ├── verify_and_refine(result) 验证+细化 │
│ ├── decompose_and_execute(task) 分解+执行 │
│ └── plan_step() 规划下一步 │
│ │
│ Level 3: 元动作 │
│ ├── update_memory(content) 更新记忆 │
│ ├── update_plan(feedback) 更新计划 │
│ └── escalate_to_human() 升级人工 │
│ │
└─────────────────────────────────────────────────────────────────────────┘
动作编码
class ActionSpace:
    """Definition and one-hot encoding of the Agentic RAG action space."""

    ATOMIC_ACTIONS = {
        "retrieve_vector": {"type": "retrieve", "source": "vector", "params": {"k": 5}},
        "retrieve_keyword": {"type": "retrieve", "source": "keyword", "params": {"k": 10}},
        "retrieve_kg": {"type": "retrieve", "source": "knowledge_graph", "params": {"depth": 2}},
        "read_summary": {"type": "read", "detail": "summary"},
        "read_paragraph": {"type": "read", "detail": "paragraph"},
        "read_entity": {"type": "read", "detail": "entity"},
        "generate_direct": {"type": "generate", "strategy": "direct"},
        "generate_cot": {"type": "generate", "strategy": "chain_of_thought"},
        "generate_refine": {"type": "generate", "strategy": "self_refine"},
        "stop": {"type": "stop"},
    }

    # Registry of macro (composite) actions. The original code read this
    # attribute without defining it, which raised AttributeError; it is
    # declared empty here so the class is usable as-is.
    MACRO_ACTIONS: Dict[str, Dict] = {}

    # Number of vector slots reserved per registered macro action.
    # Placeholder value: the original never defined it. It has no effect
    # while MACRO_ACTIONS is empty.
    MAX_MACRO_LEN: int = 1

    @classmethod
    def get_action_dim(cls) -> int:
        """Return the length of an encoded action vector.

        One slot per atomic action plus ``MAX_MACRO_LEN`` slots per macro
        action.
        """
        return len(cls.ATOMIC_ACTIONS) + len(cls.MACRO_ACTIONS) * cls.MAX_MACRO_LEN

    @classmethod
    def encode_action(cls, action: Dict) -> np.ndarray:
        """Encode an action as a one-hot vector.

        Args:
            action: Mapping with a ``"name"`` key naming an entry of
                ``ATOMIC_ACTIONS``.

        Returns:
            A vector of length ``get_action_dim()``. The slot of the named
            atomic action is 1.0; the index is the action's position in
            ``ATOMIC_ACTIONS``. Unknown or missing names yield an all-zero
            vector.
        """
        action_vec = np.zeros(cls.get_action_dim())
        name = action.get("name")
        if name in cls.ATOMIC_ACTIONS:
            # Dicts preserve insertion order, so the key position is a
            # stable index (the entries carry no explicit index field).
            idx = list(cls.ATOMIC_ACTIONS).index(name)
            action_vec[idx] = 1.0
        return action_vec


# 2.3 观测空间
观测是智能体接收的外部信息:
@dataclass
class Observation:
    """Everything the agent observes after executing one action."""

    # Retrieval observation.
    retrieved_chunks: List[Chunk] = field(default_factory=list)
    retrieval_metadata: Optional[RetrievalMetadata] = None
    # Generation observation.
    generation_output: Optional[Generation] = None
    generation_confidence: float = 0.0
    # Tool response.
    tool_response: Optional[ToolResponse] = None
    tool_execution_time: float = 0.0
    # User feedback.
    user_feedback: Optional[Feedback] = None
    user_satisfaction: float = 0.0

    def to_observation_vector(self) -> np.ndarray:
        """Summarise the observation as a five-element float vector.

        Returns:
            ``[number of retrieved chunks, average retrieval score,
            generation confidence, generated text length,
            tool execution time]``. The score and the text length are 0 when
            the retrieval metadata or the generation output is absent. The
            user-feedback fields are not included.
        """
        avg_score = self.retrieval_metadata.avg_score if self.retrieval_metadata else 0
        text_length = len(self.generation_output.text) if self.generation_output else 0
        return np.array(
            [
                len(self.retrieved_chunks),
                avg_score,
                self.generation_confidence,
                text_length,
                self.tool_execution_time,
            ],
            dtype=float,
        )


# 2.4 转移函数
状态转移描述执行动作后的状态变化:
def state_transition(state: AgenticRAGState, action: Action,
                     observation: Observation) -> AgenticRAGState:
    """Compute the successor state s' = T(s, a, o).

    The input state is deep-copied first, so it is never mutated.

    Args:
        state: State before the action.
        action: Executed action; ``action.type`` selects the update rule.
        observation: Observation produced by the action.

    Returns:
        The successor state.
    """
    successor = copy.deepcopy(state)
    kind = action.type

    if kind == "retrieve":
        # Retrieved chunks accumulate in semantic memory.
        successor.semantic_memory.extend(observation.retrieved_chunks)
    elif kind == "read":
        # Content that was read becomes context in working memory.
        context_entry = {
            "role": "context",
            "content": observation.tool_response.content,
            "source": action.source,
        }
        successor.working_memory.append(context_entry)
    elif kind == "generate":
        # Generated text is recorded together with its confidence.
        assistant_entry = {
            "role": "assistant",
            "content": observation.generation_output.text,
            "confidence": observation.generation_confidence,
        }
        successor.working_memory.append(assistant_entry)
    elif kind == "stop":
        # Stopping consolidates the episode into episodic memory.
        successor = consolidate_to_episodic(successor)

    # NOTE(review): the source lost its indentation; this update is assumed
    # to run for every action type, not only inside the "stop" branch.
    successor.task_state = update_task_progress(successor.task_state, action)
    return successor


# 2.5 观测函数
观测函数 $Z(o \mid s', a)$ 描述在执行动作 $a$ 并到达状态 $s'$ 后观测到 $o$ 的概率:
class ObservationFunction:
    """Observation model Z(o | s', a).

    Scores an observation given the resulting state and the action that
    produced it.
    """

    def __init__(self, retrieval_model, generation_model):
        """Keep references to the retrieval and generation models."""
        self.retrieval_model = retrieval_model
        self.generation_model = generation_model

    def probability(self, observation: Observation,
                    next_state: AgenticRAGState,
                    action: Action) -> float:
        """Return P(o | s', a).

        ``retrieve`` and ``generate`` actions have dedicated scorers; every
        other action type gets the default value 0.5.
        """
        scorers = {
            "retrieve": self._retrieval_probability,
            "generate": self._generation_probability,
        }
        scorer = scorers.get(action.type)
        if scorer is None:
            return 0.5
        return scorer(observation, next_state)

    def _retrieval_probability(self, observation: Observation,
                               state: AgenticRAGState) -> float:
        """Score a retrieval observation.

        An empty result scores 0.1. Otherwise the score is
        0.3 * mean chunk relevance to the query + 0.7 * chunk diversity.
        """
        chunks = observation.retrieved_chunks
        if not chunks:
            return 0.1

        per_chunk_relevance = [
            item.relevance_to(state.query_representation) for item in chunks
        ]
        mean_relevance = np.mean(per_chunk_relevance)
        spread = calculate_diversity(chunks)
        return 0.3 * mean_relevance + 0.7 * spread

    def _generation_probability(self, observation: Observation,
                                state: AgenticRAGState) -> float:
        """Score a generation observation.

        The score is 0.6 * generation confidence + 0.4 * coherence of the
        output with working memory.
        """
        reported_confidence = observation.generation_confidence
        context_fit = calculate_coherence(
            observation.generation_output, state.working_memory
        )
        return 0.6 * reported_confidence + 0.4 * context_fit


# 2.6 奖励函数
奖励函数指导学习过程:
class RewardFunction:
    """Scalar reward for Agentic RAG decisions.

    R(s, a, s') = w_quality * R_quality
                  + w_efficiency * R_efficiency
                  - w_cost * R_cost

    NOTE(review): ``_calculate_completeness``, ``_estimate_confidence``,
    ``_is_satisfied`` and ``_is_redundant`` are called below but are not
    defined in the class as shown here; they must be supplied elsewhere
    (for example by a subclass).
    """

    DEFAULT_WEIGHTS = {
        "quality": 0.6,
        "efficiency": 0.3,
        "cost": 0.1,
    }

    # Per-action cost, keyed by action name.
    ACTION_COSTS = {
        "retrieve_vector": 0.05,
        "retrieve_keyword": 0.03,
        "retrieve_kg": 0.08,
        "read_summary": 0.02,
        "read_paragraph": 0.05,
        "read_entity": 0.03,
        "generate_direct": 0.10,
        "generate_cot": 0.15,
        "generate_refine": 0.12,
        "stop": 0.0,
    }
    # Cost charged for an action name missing from ACTION_COSTS.
    DEFAULT_ACTION_COST = 0.05
    # Efficiency penalty applied to every ordinary step.
    STEP_COST = 0.1

    def __init__(self, weights: Optional[Dict[str, float]] = None,
                 fact_checker=None):
        """Configure the reward.

        Args:
            weights: Optional overrides for the "quality", "efficiency" and
                "cost" weights. Missing keys fall back to ``DEFAULT_WEIGHTS``
                so a partial dict no longer fails later in ``compute``.
            fact_checker: Object exposing ``verify(output, evidence)``. It is
                needed only when a final output is scored.
        """
        self.weights = {**self.DEFAULT_WEIGHTS, **(weights or {})}
        # The original never set this attribute although _quality_reward
        # reads it; it is now always present.
        self.fact_checker = fact_checker

    def compute(self, state: AgenticRAGState, action: Action,
                next_state: AgenticRAGState,
                final_output: Optional[str] = None) -> float:
        """Return the weighted reward for one transition.

        Args:
            state: State before the action.
            action: Executed action.
            next_state: State after the action.
            final_output: Final answer text, or None for intermediate steps.
        """
        r_quality = self._quality_reward(next_state, final_output)
        r_efficiency = self._efficiency_reward(action, state, next_state)
        r_cost = self._cost_reward(action)
        return (
            self.weights["quality"] * r_quality
            + self.weights["efficiency"] * r_efficiency
            - self.weights["cost"] * r_cost
        )

    def _quality_reward(self, state: AgenticRAGState,
                        final_output: Optional[str]) -> float:
        """Return the quality term.

        Intermediate steps (``final_output is None``) score 0.0. A final
        output scores the mean of the fact-check score, the completeness
        score and the confidence score.

        Raises:
            RuntimeError: If a final output is scored without a fact checker.
        """
        if final_output is None:
            return 0.0
        if self.fact_checker is None:
            raise RuntimeError(
                "RewardFunction needs a fact_checker to score a final output"
            )

        factual_score = self.fact_checker.verify(final_output, state.semantic_memory)
        completeness = self._calculate_completeness(state)
        confidence = self._estimate_confidence(state)
        return (factual_score + completeness + confidence) / 3

    def _efficiency_reward(self, action: Action,
                           state: AgenticRAGState,
                           next_state: AgenticRAGState) -> float:
        """Return the efficiency term.

        A stop taken when ``_is_satisfied(state)`` holds earns 0.5, a
        retrieval judged redundant costs 0.2, and any other step costs
        ``STEP_COST``.
        """
        if action.type == "stop" and self._is_satisfied(state):
            return 0.5
        if action.type == "retrieve" and self._is_redundant(action, state):
            return -0.2
        return -self.STEP_COST

    def _cost_reward(self, action: Action) -> float:
        """Return the cost of the action, looked up by ``action.name``."""
        return self.ACTION_COSTS.get(action.name, self.DEFAULT_ACTION_COST)


# 3. 信念状态更新
3.1 贝叶斯更新
信念状态通过贝叶斯更新:

$$b'(s') = \eta \, Z(o \mid s', a) \sum_{s} T(s' \mid s, a) \, b(s)$$

其中 $\eta$ 是归一化常数。
class BeliefState:
    """Discrete belief b(s) over states, with an exact Bayesian update."""

    def __init__(self, prior: np.ndarray):
        """Start from a prior distribution.

        Args:
            prior: Prior P(s), one entry per state. It is copied as a float
                array so later updates neither alias the caller's array nor
                truncate to an integer dtype.
        """
        self.belief = np.asarray(prior, dtype=float).copy()  # b(s)
        self.history = []  # one record per update, for tracing

    def to_belief_vector(self) -> np.ndarray:
        """Return a copy of the current belief as a vector."""
        return self.belief.copy()

    def update(self, action: Action, observation: Observation,
               transition_fn: Callable, observation_fn: Callable):
        """Apply one Bayesian belief update.

        b'(s') = eta * Z(o | s', a) * sum_s T(s' | s, a) * b(s)

        Args:
            action: Executed action.
            observation: Observation received.
            transition_fn: Called as ``transition_fn(s_prime, s, action)``;
                returns T(s' | s, a).
            observation_fn: Called as
                ``observation_fn(observation, s_prime, action)``; returns
                Z(o | s', a).

        If the unnormalised posterior has no positive, finite mass, the
        previous belief is kept. Dividing by a padded sum, as the original
        did, would leave an all-zero vector that is not a distribution.
        """
        n_states = len(self.belief)
        posterior = np.zeros_like(self.belief)

        for s_prime in range(n_states):
            predicted = 0.0
            for s in range(n_states):
                predicted += transition_fn(s_prime, s, action) * self.belief[s]
            posterior[s_prime] = observation_fn(observation, s_prime, action) * predicted

        total = posterior.sum()
        if np.isfinite(total) and total > 0:
            self.belief = posterior / total

        self.history.append({
            "action": action,
            "observation": observation,
            "belief": self.belief.copy()
        })

    def get_action_distribution(self, policy: np.ndarray) -> np.ndarray:
        """Marginalise a per-state policy over the current belief.

        pi(a | b) = sum_s b(s) * pi(a | s)

        Args:
            policy: Array whose first axis indexes states, e.g. shape
                ``(n_states, n_actions)``.

        Returns:
            ``policy`` contracted with the belief over the state axis; for a
            2-D policy this is a vector of length ``n_actions``.
        """
        # The original broadcast belief[:, None, None] against a 2-D policy,
        # giving (S, S, A) and returning the policy itself after the sum.
        # Contracting the state axis is correct for any policy rank.
        return np.tensordot(self.belief, policy, axes=(0, 0))


# 3.2 近似推断方法
精确的信念更新在状态空间大时不可行,采用近似方法:
class ApproximateBeliefUpdate:
    """Approximate alternatives to the exact belief update."""

    @staticmethod
    def particle_filter(belief: BeliefState,
                        action: Action,
                        observation: Observation,
                        n_particles: int = 100) -> BeliefState:
        """Approximate the belief update with importance resampling.

        Steps: sample particles from the current belief, weight each by its
        observation likelihood, resample in proportion to the weights, and
        rebuild the belief as the histogram of the resampled particle ids.

        Args:
            belief: Belief to update in place. It must provide
                ``sample(n)``; each particle must expose ``id`` and
                ``likelihood(action, observation)``.
            action: Executed action.
            observation: Observation received.
            n_particles: Number of particles to draw and to resample.

        Returns:
            The same ``belief`` object with ``belief.belief`` replaced.
        """
        particles = belief.sample(n_particles)
        if not particles:
            return belief

        # Particles are drawn from the belief, so the prior is already in
        # the proposal and the weight is the likelihood alone. The original
        # also multiplied by belief.belief[particle], which double-counted
        # the prior and indexed the array with a particle object.
        weights = np.array(
            [p.likelihood(action, observation) for p in particles],
            dtype=float,
        )
        total = weights.sum()
        if np.isfinite(total) and total > 0:
            weights = weights / total
        else:
            # No particle explains the observation: resample uniformly
            # rather than dividing by zero.
            weights = np.full(len(particles), 1.0 / len(particles))

        indices = np.random.choice(len(particles), size=n_particles, p=weights)

        new_belief = np.zeros_like(belief.belief)
        for i in indices:
            new_belief[particles[i].id] += 1.0 / n_particles
        belief.belief = new_belief
        return belief

    @staticmethod
    def point_estimate(belief: BeliefState,
                       method: str = "max") -> int:
        """Collapse the belief to a single state index.

        Args:
            belief: Belief whose ``belief`` array is read.
            method: ``"max"`` for the MAP state, ``"mean"`` for the expected
                state index rounded to the nearest integer.

        Raises:
            ValueError: If ``method`` is neither ``"max"`` nor ``"mean"``
                (the original silently returned None).
        """
        if method == "max":
            return int(np.argmax(belief.belief))
        if method == "mean":
            expected_index = np.sum(belief.belief * np.arange(len(belief.belief)))
            return int(np.round(expected_index))
        raise ValueError(f"unknown point-estimate method: {method!r}")


# 4. 策略学习
4.1 策略表示
策略 $\pi(a \mid b; \theta)$ 是从信念状态到动作的映射:
class PolicyNetwork(nn.Module):
    """Belief-conditioned policy network pi(a | b; theta)."""

    def __init__(self, belief_dim: int, action_dim: int, hidden_dim: int = 256):
        """Build a two-layer belief encoder and a linear action head.

        Args:
            belief_dim: Size of the belief vector.
            action_dim: Number of actions.
            hidden_dim: Width of the hidden layers.
        """
        super().__init__()
        # Belief encoder.
        self.belief_encoder = nn.Sequential(
            nn.Linear(belief_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU()
        )
        # Action output.
        self.action_head = nn.Linear(hidden_dim, action_dim)

    def forward(self, belief: torch.Tensor) -> torch.Tensor:
        """Return action logits.

        Args:
            belief: Tensor of shape (batch_size, belief_dim).

        Returns:
            Logits of shape (batch_size, action_dim).
        """
        encoded = self.belief_encoder(belief)
        return self.action_head(encoded)

    def get_action(self, belief: np.ndarray,
                   deterministic: bool = False) -> Tuple[int, float]:
        """Select one action for a single belief.

        Args:
            belief: One belief vector, of shape (belief_dim,) or
                (1, belief_dim). A NumPy array or a tensor is accepted.
            deterministic: Take the most probable action instead of
                sampling.

        Returns:
            (action index, probability of that action).
        """
        # Accept NumPy input and add the batch axis that nn.Linear and the
        # probs[0, action] lookup below rely on.
        belief_t = torch.as_tensor(belief, dtype=torch.float32)
        if belief_t.dim() == 1:
            belief_t = belief_t.unsqueeze(0)

        logits = self.forward(belief_t)
        # Computed for both branches: the original built probs only when
        # sampling, so the deterministic path raised NameError on return.
        probs = F.softmax(logits, dim=-1)

        if deterministic:
            action = torch.argmax(probs[0])
        else:
            action = torch.distributions.Categorical(probs[0]).sample()
        return action.item(), probs[0, action].item()


# 4.2 强化学习训练
class AgenticRAGTrainer:
    """Policy-gradient trainer for the Agentic RAG policy."""

    def __init__(self, env: AgenticRAGEnv, policy: PolicyNetwork,
                 value_network=None):
        """Set up the trainer.

        Args:
            env: Environment to roll out in.
            policy: Policy to optimise (Adam, learning rate 1e-4).
            value_network: Callable mapping a belief vector to a scalar
                value estimate. ``compute_advantages`` requires it; the
                original read ``self.value_network`` without ever setting
                it.
        """
        self.env = env
        self.policy = policy
        self.value_network = value_network
        self.optimizer = torch.optim.Adam(policy.parameters(), lr=1e-4)

    def collect_trajectories(self, n_episodes: int) -> List[List[Dict]]:
        """Roll out the policy and record one trajectory per episode.

        Returns:
            A list of trajectories. Each trajectory is a list of step dicts
            with keys "belief", "action", "action_prob", "reward" and
            "next_belief".

        NOTE(review): ``self._get_prior`` is not defined in the class as
        shown here, and ``to_belief_vector`` must exist on the belief
        object.
        """
        trajectories = []
        for _ in range(n_episodes):
            trajectory = []
            state = self.env.reset()
            belief = BeliefState(prior=self._get_prior(state))
            done = False
            while not done:
                # Choose an action conditioned on the current belief.
                belief_vec = belief.to_belief_vector()
                action, prob = self.policy.get_action(belief_vec)
                # Execute it; the observation is read from `info`.
                next_state, reward, done, info = self.env.step(action)
                observation = info["observation"]
                # Update the belief with the new evidence.
                belief.update(
                    action=action,
                    observation=observation,
                    transition_fn=self.env.transition,
                    observation_fn=self.env.observation
                )
                trajectory.append({
                    "belief": belief_vec,
                    "action": action,
                    "action_prob": prob,
                    "reward": reward,
                    "next_belief": belief.to_belief_vector()
                })
                state = next_state
            trajectories.append(trajectory)
        return trajectories

    def compute_advantages(self, trajectories: List[Dict],
                           gamma: float = 0.99,
                           lambda_: float = 0.95) -> np.ndarray:
        """Compute GAE advantages for every step of every trajectory.

        A_t = sum_{l=0}^{T-t-1} (gamma * lambda)^l * delta_{t+l}
        delta_t = r_t + gamma * V(b_{t+1}) - V(b_t), with V = 0 after the
        last step.

        Args:
            trajectories: List of trajectories as returned by
                ``collect_trajectories``.
            gamma: Discount factor.
            lambda_: GAE parameter.

        Returns:
            Advantages of all trajectories concatenated in input order.

        Raises:
            RuntimeError: If no value network is configured.
        """
        if self.value_network is None:
            raise RuntimeError("value_network must be set before computing advantages")

        all_advantages = []
        for traj in trajectories:
            # float() also unwraps single-element tensors from a torch
            # value network.
            values = [float(self.value_network(step["belief"])) for step in traj]
            advantages = [0.0] * len(traj)
            running = 0.0
            for t in reversed(range(len(traj))):
                next_value = values[t + 1] if t + 1 < len(traj) else 0.0
                delta = traj[t]["reward"] + gamma * next_value - values[t]
                running = delta + gamma * lambda_ * running
                advantages[t] = running
            all_advantages.extend(advantages)
        return np.array(all_advantages, dtype=float)

    @staticmethod
    def _flatten_steps(trajectories) -> List[Dict]:
        """Return all step dicts in order, from nested or flat input."""
        steps = []
        for item in trajectories:
            if isinstance(item, dict):
                steps.append(item)
            else:
                steps.extend(item)
        return steps

    def update(self, trajectories: List[Dict], advantages: np.ndarray):
        """Take one advantage-weighted policy-gradient step.

        Args:
            trajectories: List of trajectories (as produced by
                ``collect_trajectories``) or an already flat list of step
                dicts.
            advantages: One advantage per step, in the same order as
                ``compute_advantages`` returns them.

        Returns:
            The loss value as a float.

        Raises:
            ValueError: If there are no steps or the number of advantages
                does not match the number of steps.
        """
        # compute_advantages flattens across trajectories, so the steps are
        # flattened the same way here. The original indexed each trajectory
        # as if it were a step.
        steps = self._flatten_steps(trajectories)
        if not steps:
            raise ValueError("no steps to train on")

        beliefs = torch.as_tensor(
            np.stack([np.asarray(step["belief"], dtype=np.float32) for step in steps]),
            dtype=torch.float32
        )
        actions = torch.tensor(
            [step["action"] for step in steps],
            dtype=torch.long
        )
        advantage_t = torch.as_tensor(np.asarray(advantages), dtype=torch.float32)
        if advantage_t.shape[0] != len(steps):
            raise ValueError("advantages and steps differ in length")

        logits = self.policy(beliefs)
        log_probs = F.log_softmax(logits, dim=-1)
        # squeeze(1) keeps the batch axis even when there is one step.
        action_log_probs = log_probs.gather(1, actions.unsqueeze(1)).squeeze(1)

        # Plain policy-gradient surrogate; there is no PPO ratio or clipping.
        loss = -(action_log_probs * advantage_t).mean()
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()


# 5. 决策过程可视化
5.1 完整决策循环
┌─────────────────────────────────────────────────────────────────────────┐
│ Agentic RAG POMDP决策循环 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ │
│ │ 初始化状态 │ │
│ │ s₀ = (∅, ∅, ∅) │ │
│ └────────┬────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ 信念更新 (Belief Update) │ │
│ │ │ │
│ │ b_t(s) = P(s | h_t) │ │
│ │ │ │
│ │ h_t = (a₁, o₁, a₂, o₂, ..., a_t, o_t) │ │
│ └────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ 策略执行 (Policy Execution) │ │
│ │ │ │
│ │ a_t ~ π(a | b_t) │ │
│ │ │ │
│ │ 可选动作: │ │
│ │ - retrieve(vector, k=5) │ │
│ │ - read(chunk_id, detail="paragraph") │ │
│ │ - generate(strategy="cot") │ │
│ │ - stop() │ │
│ └────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ 执行动作 a_t │ │
│ │ 获得观测 o_t │ │
│ └────────┬────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ 状态转移 │ │
│ │ s_{t+1} = T │ │
│ │ (s_t, a_t) │ │
│ └────────┬────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ 计算奖励 │ │
│ │ R(s_t, a_t) │ │
│ └────────┬────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────┐ │
│ │ 决策: 继续 / 停止 / 重规划 │ │
│ └───────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
5.2 停止决策
class StoppingCriteria:
    """Stopping decision for the agent.

    NOTE(review): ``_estimate_marginal_benefit``, ``_calculate_coverage``,
    ``_calculate_consistency`` and ``_calculate_freshness`` are called below
    but are not defined in the class as shown here; they must be supplied
    elsewhere.
    """

    def __init__(self, value_network, threshold: float = 0.9,
                 max_steps: int = 10):
        """Configure the criteria.

        Args:
            value_network: Callable mapping a belief vector to a value
                estimate.
            threshold: Minimum value estimate for a quality-based stop.
            max_steps: Maximum number of belief updates before a forced
                stop. ``should_stop`` read ``self.max_steps`` but the
                original never set it; the default lies in the 5-15 range
                suggested by the tuning table in this document.
        """
        self.value_network = value_network
        self.threshold = threshold
        self.max_steps = max_steps

    def should_stop(self, belief: BeliefState,
                    state: AgenticRAGState) -> Tuple[bool, str]:
        """Decide whether to stop, and say why.

        Checks are made in this order:
          1. value estimate >= threshold and evidence sufficiency >= 0.8
          2. estimated marginal benefit < 0.1
          3. number of belief updates >= max_steps

        Returns:
            (stop flag, reason). The reason is one of
            "quality_threshold_met", "diminishing_returns",
            "max_steps_exceeded" or "continue".
        """
        # Value estimate of the current belief.
        belief_vec = belief.to_belief_vector()
        value = self.value_network(belief_vec)
        # Evidence sufficiency.
        sufficiency = self._check_evidence_sufficiency(state)
        # Marginal benefit of continuing.
        marginal_benefit = self._estimate_marginal_benefit(belief, state)

        if value >= self.threshold and sufficiency >= 0.8:
            return True, "quality_threshold_met"
        if marginal_benefit < 0.1:
            return True, "diminishing_returns"
        if len(belief.history) >= self.max_steps:
            return True, "max_steps_exceeded"
        return False, "continue"

    def _check_evidence_sufficiency(self, state: AgenticRAGState) -> float:
        """Score how sufficient the retrieved evidence is.

        Returns 0.0 when semantic memory is empty; otherwise
        0.4 * coverage + 0.3 * consistency + 0.3 * freshness.
        """
        if not state.semantic_memory:
            return 0.0
        coverage = self._calculate_coverage(state)
        consistency = self._calculate_consistency(state)
        freshness = self._calculate_freshness(state)
        return 0.4 * coverage + 0.3 * consistency + 0.3 * freshness


# 6. 实践应用
6.1 系统实现架构
class POMDPBasedAgenticRAG:
    """Agentic RAG system driven by the POMDP decision loop.

    NOTE(review): ``_get_uniform_prior``, ``_init_state``,
    ``_decode_action``, ``_execute_action``, ``_generate_final_answer``,
    ``transition`` and ``observation`` are used below but are not defined in
    the class as shown here; they must be supplied elsewhere.
    """

    def __init__(self, config: Dict):
        """Build all components from a configuration dict.

        Required keys: "belief_dim", "action_dim", "retriever", "generator",
        "tools".
        Optional keys: "reward_weights" (defaults come from RewardFunction)
        and "max_steps" (default 10).
        """
        # Step budget for query(). The loop read self.max_steps but the
        # original never set it.
        self.max_steps = config.get("max_steps", 10)

        # Decision components.
        self.state_encoder = StateEncoder()
        self.belief_tracker = BeliefState(prior=self._get_uniform_prior())
        self.policy_network = PolicyNetwork(
            belief_dim=config["belief_dim"],
            action_dim=config["action_dim"]
        )
        self.value_network = ValueNetwork(config["belief_dim"])
        # A missing "reward_weights" key now falls back to the defaults
        # instead of raising KeyError.
        self.reward_function = RewardFunction(config.get("reward_weights"))

        # Environment components.
        self.retriever = create_retriever(config["retriever"])
        self.generator = create_generator(config["generator"])
        self.tools = ToolRegistry(config["tools"])

    def query(self, user_query: str) -> str:
        """Answer a user query by running the decision loop.

        Each step reads the belief vector, picks an action, executes it,
        transitions the state, computes the reward, updates the belief and
        then checks the stopping condition. The loop runs for at most
        ``self.max_steps`` steps.

        NOTE(review): the belief tracker is not reset between calls, so
        belief and history carry over from one query to the next; confirm
        this is intended.
        """
        state = self._init_state(user_query)
        trajectory = []

        for step in range(self.max_steps):
            # Read the current belief.
            belief_vec = self.belief_tracker.to_belief_vector()
            # Policy decision.
            action_idx, action_prob = self.policy_network.get_action(belief_vec)
            action = self._decode_action(action_idx)
            # Execute the action.
            observation = self._execute_action(action, state)
            # State transition.
            next_state = state_transition(state, action, observation)
            # Reward.
            reward = self.reward_function.compute(state, action, next_state)
            # Belief update.
            self.belief_tracker.update(action, observation,
                                       self.transition, self.observation)
            # Trajectory record.
            trajectory.append({
                "step": step,
                "action": action,
                "reward": reward,
                "state": state
            })
            state = next_state
            if self._should_stop(state):
                break

        return self._generate_final_answer(state)

    def _should_stop(self, state: AgenticRAGState) -> bool:
        """Return whether the stopping criteria ask to stop; the reason is dropped."""
        stopping = StoppingCriteria(self.value_network)
        should_stop, _reason = stopping.should_stop(
            self.belief_tracker, state
        )
        return should_stop


# 6.2 调参建议
| 参数 | 建议范围 | 影响 |
|---|---|---|
| $\gamma$(折扣因子) | 0.9-0.99 | 长期vs短期权衡 |
| $\lambda$(GAE参数) | 0.9-0.98 | 偏差-方差平衡 |
| 信念维度 | 256-1024 | 表示精度 |
| 动作空间大小 | 10-30 | 决策复杂度 |
| 最大步数 | 5-15 | 推理深度 |