基于模型的强化学习
基于模型的强化学习(Model-Based Reinforcement Learning, MBRL)通过学习环境模型来进行决策和规划。本文档系统介绍MBRL的理论基础、核心算法和实践方法。
基本概念
形式化定义
在 MDP 框架下,MBRL 学习一个环境模型:
该模型用于:
- 规划:生成合成经验
- 预测:评估动作价值
- 想象:模拟未来轨迹
与无模型 RL 的对比
| 维度 | 无模型 (Model-Free) | 基于模型 (Model-Based) |
|---|---|---|
| 学习目标 | 价值函数 / 策略 | 转移概率 |
| 样本来源 | 真实环境交互 | 真实 + 模型生成 |
| 样本效率 | 低 | 高 |
| 最优性保证 | 无 | 理论上可证 |
| 模型误差 | 无 | 累积误差问题 |
核心优势
样本效率对比示意图:
▲ 渐近性能
│
│ ╭───────── Model-Free
│ ╱
│ ╱
│ ╱ ╭─────── Model-Based
│ ╱ ╱
│ ╱╱
│╱
└──────────────────────────────▶
↑ 样本数量
↑
真实环境交互
世界模型学习
模型类型
1. 观测模型 (Observational Model)
直接预测下一个观测:
优点:训练简单
缺点:可能忽略关键状态信息
2. 潜在模型 (Latent Model)
学习紧凑潜在表示:
优点:更好的泛化
缺点:表示学习困难
3. 奖励模型
预测即时奖励:
训练目标
规划算法
1. 模型预测控制 (MPC)
MPC 在每个时间步执行以下步骤:
Algorithm MPC:
for t = 0, 1, 2, ... do
1. 获取当前状态 s_t
2. 采样 N 条候选动作序列 {a_t^{(i)}, ..., a_{t+H-1}^{(i)}}_{i=1}^N
3. 使用模型评估每条序列:
for i = 1 to N do
for k = 0 to H-1 do
ŝ_{t+k+1}^{(i)} = f(ŝ_{t+k}^{(i)}, a_{t+k}^{(i)})
r̂_{t+k}^{(i)} = r(ŝ_{t+k}^{(i)}, a_{t+k}^{(i)})
J^{(i)} = Σ_{k=0}^{H-1} γ^k r̂_{t+k}^{(i)}
end for
4. 选择最优动作: a_t* = argmax_i J^{(i)}
5. 执行 a_t*,观察 r_t, s_{t+1}
6. 更新模型 (可选)
end for
2. 交叉熵方法 (CEM)
CEM 是一种用于轨迹优化的进化算法:
def cem_planning(model, s0, horizon, n_samples=100, n_elites=10, n_iters=5):
"""
交叉熵方法进行轨迹优化
"""
mean = torch.zeros(horizon, action_dim)
std = torch.ones(horizon, action_dim)
for _ in range(n_iters):
# 1. 采样动作序列
actions = torch.randn(n_samples, horizon, action_dim) * std + mean
# 2. 评估轨迹
returns = []
s = s0
for i in range(n_samples):
G = 0
for t in range(horizon):
r = model.predict_reward(s, actions[i, t])
s = model.predict_next_state(s, actions[i, t])
G += r * (gamma ** t)
returns.append(G)
returns = torch.stack(returns)
# 3. 选择精英样本
elite_idx = returns.topk(n_elites).indices
elite_actions = actions[elite_idx]
# 4. 更新分布
mean = elite_actions.mean(dim=0)
std = elite_actions.std(dim=0) + 1e-6
return mean[0] # 返回第一个动作3. 随机 MPC
使用随机策略进行规划:
class StochasticMPC:
def __init__(self, model, horizon=10, n_samples=50):
self.model = model
self.horizon = horizon
self.n_samples = n_samples
def act(self, state):
best_action = None
best_value = float('-inf')
for _ in range(self.n_samples):
# 随机采样动作序列
actions = self.sample_action_sequence()
# 计算轨迹价值
value = self.evaluate_trajectory(state, actions)
if value > best_value:
best_value = value
best_action = actions[0]
return best_action
def evaluate_trajectory(self, state, actions):
G = 0
s = state
gamma = 0.99
for t, a in enumerate(actions):
r = self.model.predict_reward(s, a)
s = self.model.predict_next(s, a)
G += (gamma ** t) * r
return G4. LQG (Linear Quadratic Gaussian)
假设系统动力学是线性的,奖励是二次的:
优点:解析可解
缺点:仅适用于线性系统
5. 冥想规划 (Dreaming)
Dreamer 等方法在潜在空间进行规划:
def imagine_rollout(model, z0, policy, horizon):
"""
在潜在空间进行想象 Rollout
"""
trajectory = []
z = z0
returns = 0
gamma = 0.99
for t in range(horizon):
# 策略选择动作
action = policy(z)
# 模型预测下一个状态和奖励
z_next = model.predict_next(z, action)
reward = model.predict_reward(z, action)
trajectory.append((z, action, reward, z_next))
returns += (gamma ** t) * reward
z = z_next
return trajectory, returns不确定性感知规划
为什么要考虑不确定性?
复合误差问题:
真实: s₀ ──▶ s₁ ──▶ s₂ ──▶ s₃ ──▶ ...
│ │ │
模型: s₀ ──▶ ŝ₁' ──▶ ŝ₂' ──▶ ŝ₃' ──▶ ...
↓ ↓ ↓
误差ε₁ 误差ε₂ 误差ε₃
累积误差: ε₁ + ε₂ + ε₃ + ... 越来越大
1. Aleatoric vs Epistemic 不确定性
| 类型 | 含义 | 来源 | 解决方案 |
|---|---|---|---|
| Aleatoric | 固有随机性 | 环境本质随机 | 积分/期望 |
| Epistemic | 模型不确定性 | 数据不足 | Bayesian/Ensemble |
2. Ensemble 方法
使用多个模型估计不确定性:
class EnsembleModel:
def __init__(self, models):
self.models = models # N 个独立模型
def predict_mean(self, state, action):
preds = [m.predict(state, action) for m in self.models]
return torch.stack(preds).mean(dim=0)
def predict_uncertainty(self, state, action):
preds = [m.predict(state, action) for m in self.models]
preds = torch.stack(preds)
mean = preds.mean(dim=0)
std = preds.std(dim=0) # Epistemic 不确定性
return mean, std
def select_ensemble_action(self, state, policy, planning_horizon):
"""
使用高不确定性惩罚的规划
"""
# ... 规划逻辑,使用 std 作为惩罚项
return action3. Bayesian 世界模型
使用变分推断建模不确定性:
class BayesianWorldModel:
def __init__(self):
# 变分分布参数
self.prior_params = None
self.posterior_params = None
def forward(self, z_prev, action, obs=None):
if obs is not None:
# 后验分布
q_params = self.encoder(z_prev, action, obs)
z, kl = self.sample_posterior(q_params)
else:
# 先验分布
z, kl = self.sample_prior(z_prev, action)
return z, kl
def sample_posterior(self, params):
# 从后验分布采样
mu, logvar = params.chunk(2, dim=-1)
std = torch.exp(0.5 * logvar)
z = mu + std * torch.randn_like(std)
return z, self.kl_divergence(mu, logvar)4. 信息增益探索
奖励不确定性高的动作可以促进学习:
模型学习策略
1. 数据收集策略
| 策略 | 描述 | 优势 | 劣势 |
|---|---|---|---|
| 随机收集 | 随机采样动作 | 简单 | 样本效率低 |
| 基于不确定性 | 收集模型不确定区域 | 高效 | 可能不稳定 |
| Dyna风格 | 混合真实和想象数据 | 平衡 | 实现复杂 |
| TSDE | 时序差分估计 | 时序一致性 | 计算开销 |
2. 模型正则化
防止模型过拟合训练数据:
class RegularizedModel:
def __init__(self, model, lambda_reg=0.01):
self.model = model
self.lambda_reg = lambda_reg
def loss(self, batch):
# 标准预测损失
pred_next, pred_reward = self.model(batch.state, batch.action)
pred_loss = F.mse_loss(pred_next, batch.next_state)
reward_loss = F.mse_loss(pred_reward, batch.reward)
# 奖励平坦化正则化
reward_reg = self.lambda_reg * batch.reward.std()
# 动力学平滑正则化
dynamics_reg = self.lambda_reg * self.smoothness_reg(batch.state, batch.action)
return pred_loss + reward_loss + reward_reg + dynamics_reg
def smoothness_reg(self, states, actions):
"""
相邻状态的预测应该平滑变化
"""
delta_s = states[:, 1:] - states[:, :-1]
delta_a = actions[:, 1:] - actions[:, :-1]
return (delta_s.abs() * delta_a.abs()).mean()3. 模型错误检测与处理
class ModelErrorDetector:
def __init__(self, model, threshold=2.0):
self.model = model
self.threshold = threshold
self.error_buffer = deque(maxlen=100)
def detect_error(self, state, action, next_state):
pred_next = self.model.predict_next(state, action)
error = (pred_next - next_state).abs().mean()
self.error_buffer.append(error)
# 检测异常高的错误
mean_error = np.mean(self.error_buffer)
if error > mean_error * self.threshold:
return True
return False
def should_trust_model(self, state):
"""
根据历史误差决定是否信任模型
"""
if len(self.error_buffer) < 10:
return True
recent_errors = list(self.error_buffer)[-10:]
return np.mean(recent_errors) < self.threshold规划-执行策略
1. Replanning
每次执行前重新规划:
class ReplanningController:
def __init__(self, model, planner, horizon=10):
self.model = model
self.planner = planner
self.horizon = horizon
def act(self, state):
# 使用当前模型规划
action = self.planner.plan(state, self.horizon)
return action # 仅执行第一个动作2. Model Predictive Path Integral Control (MPPI)
class MPPI:
def __init__(self, model, horizon=20, n_samples=200, temperature=1.0):
self.model = model
self.horizon = horizon
self.n_samples = n_samples
self.temperature = temperature
def act(self, state):
# 采样动作序列
noise = torch.randn(self.n_samples, self.horizon, self.action_dim)
noise = noise * self.temperature
# 评估所有序列
returns = self.evaluate_sequences(state, noise)
# 加权平均(softmax)
weights = F.softmax(returns / self.temperature, dim=0)
action = (noise[:, 0] * weights).sum(dim=0)
return action
def evaluate_sequences(self, state, noise):
returns = torch.zeros(self.n_samples)
s = state
for t in range(self.horizon):
# 添加噪声的动作
action = self.policy_mean + noise[:, t]
action = torch.clamp(action, self.action_min, self.action_max)
# 累积奖励
r = self.model.predict_reward(s, action)
returns += r * (self.gamma ** t)
# 更新状态
s = self.model.predict_next(s, action)
return returns与 Dreamer 的结合
Dreamer 等方法将 MBRL 和 MFRL 结合:
| 组件 | Dreamer | 无模型方法 |
|---|---|---|
| 探索 | 模型生成数据 | 内在动机 |
| 规划 | 潜在空间想象 | Q值/策略 |
| 更新 | 端到端梯度 | 通常分开 |
Dreamer 的独特设计
- 潜在空间规划:在压缩空间中进行想象
- 动态模型主导:大部分计算用于改进模型
- 策略学习通过反传:梯度通过整个想象轨迹
实践建议
1. 模型选择
| 环境特性 | 推荐模型 |
|---|---|
| 低维连续 | Gaussian Process |
| 高维图像 | Neural Network (RSSM) |
| 离散 | Tabular / NN |
| 多模态 | VAE / GAN |
2. 训练技巧
- 使用更大的 batch size 训练模型
- 集成多个模型估计不确定性
- 定期重新训练模型
- 使用数据增强
3. 调试清单
- 模型预测误差是否随 horizon 增长?
- 不确定性估计是否合理?
- 规划 horizon 是否合适?
- 模型和策略是否同步更新?