基于模型的强化学习

基于模型的强化学习(Model-Based Reinforcement Learning, MBRL)通过学习环境模型来进行决策和规划。本文档系统介绍MBRL的理论基础、核心算法和实践方法。

基本概念

形式化定义

MDP 框架下,MBRL 学习一个环境模型:

该模型用于:

  1. 规划:生成合成经验
  2. 预测:评估动作价值
  3. 想象:模拟未来轨迹

与无模型 RL 的对比

维度无模型 (Model-Free)基于模型 (Model-Based)
学习目标价值函数 / 策略 转移概率
样本来源真实环境交互真实 + 模型生成
样本效率
最优性保证理论上可证
模型误差累积误差问题

核心优势

样本效率对比示意图:

     ▲ 渐近性能
     │
     │      ╭───────── Model-Free
     │     ╱
     │    ╱
     │   ╱  ╭─────── Model-Based
     │  ╱ ╱
     │ ╱╱
     │╱
     └──────────────────────────────▶
       ↑              样本数量
       ↑
    真实环境交互

世界模型学习

模型类型

1. 观测模型 (Observational Model)

直接预测下一个观测:

优点:训练简单
缺点:可能忽略关键状态信息

2. 潜在模型 (Latent Model)

学习紧凑潜在表示:

优点:更好的泛化
缺点:表示学习困难

3. 奖励模型

预测即时奖励:

训练目标

规划算法

1. 模型预测控制 (MPC)

MPC 在每个时间步执行以下步骤:

Algorithm MPC:
  for t = 0, 1, 2, ... do
    1. 获取当前状态 s_t
    2. 采样 N 条候选动作序列 {a_t^{(i)}, ..., a_{t+H-1}^{(i)}}_{i=1}^N
    3. 使用模型评估每条序列:
       for i = 1 to N do
         for k = 0 to H-1 do
           ŝ_{t+k+1}^{(i)} = f(ŝ_{t+k}^{(i)}, a_{t+k}^{(i)})
           r̂_{t+k}^{(i)} = r(ŝ_{t+k}^{(i)}, a_{t+k}^{(i)})
         J^{(i)} = Σ_{k=0}^{H-1} γ^k r̂_{t+k}^{(i)}
       end for
    4. 选择最优动作: a_t* = argmax_i J^{(i)}
    5. 执行 a_t*,观察 r_t, s_{t+1}
    6. 更新模型 (可选)
  end for

2. 交叉熵方法 (CEM)

CEM 是一种用于轨迹优化的进化算法:

def cem_planning(model, s0, horizon, n_samples=100, n_elites=10, n_iters=5):
    """
    交叉熵方法进行轨迹优化
    """
    mean = torch.zeros(horizon, action_dim)
    std = torch.ones(horizon, action_dim)
    
    for _ in range(n_iters):
        # 1. 采样动作序列
        actions = torch.randn(n_samples, horizon, action_dim) * std + mean
        
        # 2. 评估轨迹
        returns = []
        s = s0
        for i in range(n_samples):
            G = 0
            for t in range(horizon):
                r = model.predict_reward(s, actions[i, t])
                s = model.predict_next_state(s, actions[i, t])
                G += r * (gamma ** t)
            returns.append(G)
        
        returns = torch.stack(returns)
        
        # 3. 选择精英样本
        elite_idx = returns.topk(n_elites).indices
        elite_actions = actions[elite_idx]
        
        # 4. 更新分布
        mean = elite_actions.mean(dim=0)
        std = elite_actions.std(dim=0) + 1e-6
    
    return mean[0]  # 返回第一个动作

3. 随机 MPC

使用随机策略进行规划:

class StochasticMPC:
    def __init__(self, model, horizon=10, n_samples=50):
        self.model = model
        self.horizon = horizon
        self.n_samples = n_samples
    
    def act(self, state):
        best_action = None
        best_value = float('-inf')
        
        for _ in range(self.n_samples):
            # 随机采样动作序列
            actions = self.sample_action_sequence()
            
            # 计算轨迹价值
            value = self.evaluate_trajectory(state, actions)
            
            if value > best_value:
                best_value = value
                best_action = actions[0]
        
        return best_action
    
    def evaluate_trajectory(self, state, actions):
        G = 0
        s = state
        gamma = 0.99
        
        for t, a in enumerate(actions):
            r = self.model.predict_reward(s, a)
            s = self.model.predict_next(s, a)
            G += (gamma ** t) * r
        
        return G

4. LQG (Linear Quadratic Gaussian)

假设系统动力学是线性的,奖励是二次的:

优点:解析可解
缺点:仅适用于线性系统

5. 冥想规划 (Dreaming)

Dreamer 等方法在潜在空间进行规划:

def imagine_rollout(model, z0, policy, horizon):
    """
    在潜在空间进行想象 Rollout
    """
    trajectory = []
    z = z0
    returns = 0
    gamma = 0.99
    
    for t in range(horizon):
        # 策略选择动作
        action = policy(z)
        
        # 模型预测下一个状态和奖励
        z_next = model.predict_next(z, action)
        reward = model.predict_reward(z, action)
        
        trajectory.append((z, action, reward, z_next))
        returns += (gamma ** t) * reward
        z = z_next
    
    return trajectory, returns

不确定性感知规划

为什么要考虑不确定性?

复合误差问题:

真实:  s₀ ──▶ s₁ ──▶ s₂ ──▶ s₃ ──▶ ...
                │       │       │
模型:  s₀ ──▶ ŝ₁' ──▶ ŝ₂' ──▶ ŝ₃' ──▶ ...
                ↓       ↓       ↓
             误差ε₁   误差ε₂   误差ε₃
              
累积误差: ε₁ + ε₂ + ε₃ + ... 越来越大

1. Aleatoric vs Epistemic 不确定性

类型含义来源解决方案
Aleatoric固有随机性环境本质随机积分/期望
Epistemic模型不确定性数据不足Bayesian/Ensemble

2. Ensemble 方法

使用多个模型估计不确定性:

class EnsembleModel:
    def __init__(self, models):
        self.models = models  # N 个独立模型
    
    def predict_mean(self, state, action):
        preds = [m.predict(state, action) for m in self.models]
        return torch.stack(preds).mean(dim=0)
    
    def predict_uncertainty(self, state, action):
        preds = [m.predict(state, action) for m in self.models]
        preds = torch.stack(preds)
        
        mean = preds.mean(dim=0)
        std = preds.std(dim=0)  # Epistemic 不确定性
        
        return mean, std
    
    def select_ensemble_action(self, state, policy, planning_horizon):
        """
        使用高不确定性惩罚的规划
        """
        # ... 规划逻辑,使用 std 作为惩罚项
        return action

3. Bayesian 世界模型

使用变分推断建模不确定性:

class BayesianWorldModel:
    def __init__(self):
        # 变分分布参数
        self.prior_params = None
        self.posterior_params = None
    
    def forward(self, z_prev, action, obs=None):
        if obs is not None:
            # 后验分布
            q_params = self.encoder(z_prev, action, obs)
            z, kl = self.sample_posterior(q_params)
        else:
            # 先验分布
            z, kl = self.sample_prior(z_prev, action)
        
        return z, kl
    
    def sample_posterior(self, params):
        # 从后验分布采样
        mu, logvar = params.chunk(2, dim=-1)
        std = torch.exp(0.5 * logvar)
        z = mu + std * torch.randn_like(std)
        return z, self.kl_divergence(mu, logvar)

4. 信息增益探索

奖励不确定性高的动作可以促进学习:

模型学习策略

1. 数据收集策略

策略描述优势劣势
随机收集随机采样动作简单样本效率低
基于不确定性收集模型不确定区域高效可能不稳定
Dyna风格混合真实和想象数据平衡实现复杂
TSDE时序差分估计时序一致性计算开销

2. 模型正则化

防止模型过拟合训练数据:

class RegularizedModel:
    def __init__(self, model, lambda_reg=0.01):
        self.model = model
        self.lambda_reg = lambda_reg
    
    def loss(self, batch):
        # 标准预测损失
        pred_next, pred_reward = self.model(batch.state, batch.action)
        pred_loss = F.mse_loss(pred_next, batch.next_state)
        reward_loss = F.mse_loss(pred_reward, batch.reward)
        
        # 奖励平坦化正则化
        reward_reg = self.lambda_reg * batch.reward.std()
        
        # 动力学平滑正则化
        dynamics_reg = self.lambda_reg * self.smoothness_reg(batch.state, batch.action)
        
        return pred_loss + reward_loss + reward_reg + dynamics_reg
    
    def smoothness_reg(self, states, actions):
        """
        相邻状态的预测应该平滑变化
        """
        delta_s = states[:, 1:] - states[:, :-1]
        delta_a = actions[:, 1:] - actions[:, :-1]
        return (delta_s.abs() * delta_a.abs()).mean()

3. 模型错误检测与处理

class ModelErrorDetector:
    def __init__(self, model, threshold=2.0):
        self.model = model
        self.threshold = threshold
        self.error_buffer = deque(maxlen=100)
    
    def detect_error(self, state, action, next_state):
        pred_next = self.model.predict_next(state, action)
        error = (pred_next - next_state).abs().mean()
        
        self.error_buffer.append(error)
        
        # 检测异常高的错误
        mean_error = np.mean(self.error_buffer)
        if error > mean_error * self.threshold:
            return True
        return False
    
    def should_trust_model(self, state):
        """
        根据历史误差决定是否信任模型
        """
        if len(self.error_buffer) < 10:
            return True
        
        recent_errors = list(self.error_buffer)[-10:]
        return np.mean(recent_errors) < self.threshold

规划-执行策略

1. Replanning

每次执行前重新规划:

class ReplanningController:
    def __init__(self, model, planner, horizon=10):
        self.model = model
        self.planner = planner
        self.horizon = horizon
    
    def act(self, state):
        # 使用当前模型规划
        action = self.planner.plan(state, self.horizon)
        return action  # 仅执行第一个动作

2. Model Predictive Path Integral Control (MPPI)

class MPPI:
    def __init__(self, model, horizon=20, n_samples=200, temperature=1.0):
        self.model = model
        self.horizon = horizon
        self.n_samples = n_samples
        self.temperature = temperature
    
    def act(self, state):
        # 采样动作序列
        noise = torch.randn(self.n_samples, self.horizon, self.action_dim)
        noise = noise * self.temperature
        
        # 评估所有序列
        returns = self.evaluate_sequences(state, noise)
        
        # 加权平均(softmax)
        weights = F.softmax(returns / self.temperature, dim=0)
        action = (noise[:, 0] * weights).sum(dim=0)
        
        return action
    
    def evaluate_sequences(self, state, noise):
        returns = torch.zeros(self.n_samples)
        s = state
        
        for t in range(self.horizon):
            # 添加噪声的动作
            action = self.policy_mean + noise[:, t]
            action = torch.clamp(action, self.action_min, self.action_max)
            
            # 累积奖励
            r = self.model.predict_reward(s, action)
            returns += r * (self.gamma ** t)
            
            # 更新状态
            s = self.model.predict_next(s, action)
        
        return returns

与 Dreamer 的结合

Dreamer 等方法将 MBRL 和 MFRL 结合:

组件Dreamer无模型方法
探索模型生成数据内在动机
规划潜在空间想象Q值/策略
更新端到端梯度通常分开

Dreamer 的独特设计

  1. 潜在空间规划:在压缩空间中进行想象
  2. 动态模型主导:大部分计算用于改进模型
  3. 策略学习通过反传:梯度通过整个想象轨迹

实践建议

1. 模型选择

环境特性推荐模型
低维连续Gaussian Process
高维图像Neural Network (RSSM)
离散Tabular / NN
多模态VAE / GAN

2. 训练技巧

  • 使用更大的 batch size 训练模型
  • 集成多个模型估计不确定性
  • 定期重新训练模型
  • 使用数据增强

3. 调试清单

  • 模型预测误差是否随 horizon 增长?
  • 不确定性估计是否合理?
  • 规划 horizon 是否合适?
  • 模型和策略是否同步更新?

参考文献

相关主题