Model Predictive Control (MPC)

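1. Overview

Model predictive control is a model-based, closed-loop control strategy that generates control actions by solving a finite-horizon optimization problem at every time step. [1] In the reinforcement learning setting, MPC is an important class of **model-based reinforcement learning (Model-Based RL)** methods.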

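1.1 MPC vs. Model-Free RL

| Aspect | Model-free RL | MPC (model-based RL) |
| --- | --- | --- |
| Data efficiency | Low | High |
| Sample requirement | Many | Few |
| Data collection | Online / offline | Online |
| Generalization | | Weak (affected by model error) |
| Computational cost | Low at inference time | Online optimization |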

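1.2 Core Idea of MPC

At each control step, MPC:

  1. Predicts the states and rewards over the next H steps
  2. Optimizes the control sequence to maximize the cumulative reward
  3. Executes only the first control action
  4. Rolls forward to the next time step and repeats the process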

Time t:
┌──────────────────────────────────────────────────────────────────────┐
│  Prediction horizon H                                                │
│  ├── s_t ──→ s_{t+1} ──→ ... ──→ s_{t+H}                             │
│  └── a_t ──→ a_{t+1} ──→ ... ──→ a_{t+H-1}                           │
│                                                                      │
│  Optimize: max_{a_t,...,a_{t+H-1}} Σ_{i=0}^{H-1} r(s_{t+i},a_{t+i})  │
│  Execute:  a_t only                                                  │
└──────────────────────────────────────────────────────────────────────┘
          ↓
Time t+1: re-predict and re-optimize

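2. MPC Fundamentals

2.1 Problem Formulation

Finite-horizon MPC problem

$$
\max_{a_t,\dots,a_{t+H-1}} \; \sum_{i=0}^{H-1} r(s_{t+i}, a_{t+i}) + V_f(s_{t+H})
\quad \text{s.t.} \quad s_{t+i+1} = f(s_{t+i}, a_{t+i}), \quad a_{t+i} \in \mathcal{A}
$$

where:

  • $H$: prediction horizon
  • $r$: reward function
  • $f$: system dynamics model
  • $V_f$: terminal cost (it enters this reward-maximization form as a terminal value term)
  • $\mathcal{A}$: action constraint set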

2.2 Receding-Horizon Control

MPC uses a receding-horizon strategy:

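def mpc_controller(env, dynamics_model, optimize_sequence, horizon=10, num_samples=1000):
    """Run one episode of receding-horizon control.

    `optimize_sequence` is a planner callable (for example random shooting or
    CEM) that returns an action sequence of length `horizon`.
    """
    state, _ = env.reset()
    done = False
    trajectory = []

    while not done:
        # 1. Optimize the action sequence from the current state
        best_actions = optimize_sequence(state, dynamics_model, horizon, num_samples)

        # 2. Execute only the first action, then re-plan at the next step
        action = best_actions[0]
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        trajectory.append((state, action, reward, next_state))
        state = next_state

    return trajectory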
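
The `optimize_sequence` step can be as simple as random shooting. The sketch below is one minimal, assumed implementation and not necessarily the planner intended above: `random_shooting`, `toy_dynamics`, and `toy_reward` are hypothetical names, the dynamics and reward are batched callables supplied by the caller, and candidate actions are drawn uniformly from a box. Its signature differs from the call above because the reward function and action bounds are passed explicitly.

```python
import numpy as np

def random_shooting(state, dynamics_fn, reward_fn, horizon=10, num_samples=1000,
                    action_dim=1, action_low=-1.0, action_high=1.0, rng=None):
    """Return the best of `num_samples` random action sequences.

    dynamics_fn(states, actions) -> next_states and reward_fn(states, actions)
    -> rewards are assumed to operate on batches of shape (num_samples, dim).
    """
    rng = np.random.default_rng() if rng is None else rng
    # Candidate sequences: (num_samples, horizon, action_dim)
    actions = rng.uniform(action_low, action_high,
                          size=(num_samples, horizon, action_dim))
    states = np.repeat(np.asarray(state, dtype=float)[None, :], num_samples, axis=0)
    returns = np.zeros(num_samples)
    for t in range(horizon):
        returns += reward_fn(states, actions[:, t])
        states = dynamics_fn(states, actions[:, t])
    return actions[np.argmax(returns)]  # shape (horizon, action_dim)


# Hypothetical toy problem: a 1-D point that should be driven to the origin.
def toy_dynamics(states, actions):
    return states + actions

def toy_reward(states, actions):
    # Penalize the squared position reached after applying the action
    return -np.sum((states + actions) ** 2, axis=1)

plan = random_shooting(np.array([0.5]), toy_dynamics, toy_reward,
                       horizon=5, rng=np.random.default_rng(0))
first_action = plan[0]  # only this action would be executed before re-planning
```

Random shooting needs no gradients and parallelizes trivially, but its sample requirement grows quickly with the horizon and action dimension, which is one reason iterative schemes such as CEM are often preferred.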

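2.3 Role of the Terminal Cost

The terminal cost approximates the infinite-horizon value function:

  • Without a terminal cost: behavior beyond step H is not accounted for by the optimization
  • With the exact terminal cost: equivalent to the infinite-horizon optimum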
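
A small worked example makes the two cases concrete. The chain environment, `GOAL`, and all function names below are hypothetical and chosen only for illustration: an agent on states 0..10 pays -1 per step until it reaches state 10, so the exact infinite-horizon value of state s is -(10 - s). With a horizon of 2 and no terminal term, every plan from state 0 scores the same; adding the exact value as the terminal term makes the short-horizon planner move toward the goal.

```python
from itertools import product

GOAL = 10  # hypothetical goal position on a 1-D chain of states 0..GOAL

def step(s, a):
    """Deterministic chain dynamics: move left or right, clipped to [0, GOAL]."""
    return min(max(s + a, 0), GOAL)

def reward(s, a):
    """-1 per step until the goal is reached, 0 once at the goal."""
    return 0.0 if s == GOAL else -1.0

def optimal_value(s):
    """Exact infinite-horizon value for this chain: minus the distance to the goal."""
    return -float(GOAL - s)

def score(s, seq, terminal_value=None):
    """H-step return of an action sequence, plus an optional terminal term."""
    total = 0.0
    for a in seq:
        total += reward(s, a)
        s = step(s, a)
    if terminal_value is not None:
        total += terminal_value(s)
    return total

def plan(s, horizon, terminal_value=None):
    """Exhaustive search over all {-1, +1} sequences of length `horizon`."""
    return max(product((-1, 1), repeat=horizon),
               key=lambda seq: score(s, seq, terminal_value))

scores_without = {seq: score(0, seq) for seq in product((-1, 1), repeat=2)}
best_with = plan(0, 2, terminal_value=optimal_value)
```

Here `scores_without` contains four identical values, so the planner has no basis for choosing a direction, while `best_with` is the sequence that moves right twice.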

3. Learning the Dynamics Model

3.1 Model Architecture

Neural network dynamics model

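import numpy as np
import torch
import torch.nn as nn

class DynamicsModel(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super().__init__()
        # The network outputs the state increment (delta), not the next state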
        self.net = nn.Sequential(
            nn.Linear(state_dim + action_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, state_dim)
        )
    
    def forward(self, state, action):
        x = torch.cat([state, action], dim=1)
        delta_state = self.net(x)
        return state + delta_state  # predicted next state
    
    def predict(self, state, action):
        with torch.no_grad():
            state_t = torch.FloatTensor(state).unsqueeze(0)
            action_t = torch.FloatTensor(action).unsqueeze(0)
            return self.forward(state_t, action_t).numpy()[0]
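
The same state-increment idea can be tried without a neural network. As a minimal sketch, assuming the true dynamics are linear and noise-free, an ordinary least-squares fit of the increment s' - s recovers the model up to numerical precision. `fit_linear_delta_model`, `predict_next`, and the synthetic system `W_true` are illustrative names and values, not part of the code above.

```python
import numpy as np

def fit_linear_delta_model(states, actions, next_states):
    """Least-squares fit of delta = [s, a, 1] @ W, where delta = s' - s."""
    X = np.hstack([states, actions, np.ones((len(states), 1))])
    Y = next_states - states
    W, *_ = np.linalg.lstsq(X, Y, rcond=None)
    return W

def predict_next(W, state, action):
    """Predict the next state as the current state plus the fitted increment."""
    x = np.concatenate([state, action, [1.0]])
    return state + x @ W

# Synthetic transitions from a made-up linear system (2-D state, 1-D action)
rng = np.random.default_rng(0)
W_true = np.array([[-0.05, 0.00],
                   [0.10, -0.05],
                   [0.00, 0.20],
                   [0.01, 0.00]])
S = rng.normal(size=(200, 2))
A = rng.normal(size=(200, 1))
S_next = S + np.hstack([S, A, np.ones((200, 1))]) @ W_true

W = fit_linear_delta_model(S, A, S_next)
```

Predicting the increment rather than the next state keeps the regression targets small and centered near zero when consecutive states are close, which is the usual motivation for the `state + delta_state` form.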

3.2 Probabilistic Dynamics Model

Uncertainty estimation is essential for MPC:

class ProbabilisticDynamicsModel(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=256, output_dim=None):
        super().__init__()
        self.output_dim = output_dim or state_dim
        
        # Separate networks for the mean and the log-variance
        self.mean_net = nn.Sequential(
            nn.Linear(state_dim + action_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, self.output_dim)
        )
        
        self.log_var_net = nn.Sequential(
            nn.Linear(state_dim + action_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, self.output_dim)
        )
    
    def forward(self, state, action):
        x = torch.cat([state, action], dim=1)
        mean = self.mean_net(x)
        log_var = torch.clamp(self.log_var_net(x), -5, 2)
        var = log_var.exp()
        return mean, var
    
    def sample(self, state, action):
        mean, var = self.forward(state, action)
        std = var.sqrt()
        return torch.distributions.Normal(mean, std).sample()
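
A model that outputs a mean and a variance is usually fit by maximizing the Gaussian likelihood rather than by minimizing plain squared error, since squared error gives no training signal to the variance head. The NumPy sketch below shows the per-batch negative log-likelihood up to an additive constant. `gaussian_nll` is an illustrative helper, not a function from the code above, and the numbers are made up.

```python
import numpy as np

def gaussian_nll(mean, log_var, target):
    """Mean Gaussian negative log-likelihood, dropping the constant 0.5*log(2*pi)."""
    mean, log_var, target = (np.asarray(x, dtype=float) for x in (mean, log_var, target))
    sq_err = (target - mean) ** 2
    return 0.5 * np.mean(log_var + sq_err * np.exp(-log_var))

target = np.array([[1.0, 2.0]])
unit_log_var = np.zeros((1, 2))

# Perfect prediction with unit variance
perfect = gaussian_nll(target, unit_log_var, target)
# Prediction off by 3 while claiming unit variance: heavily penalized
confident_wrong = gaussian_nll(target + 3.0, unit_log_var, target)
# Same error, but the model reports variance 9: the penalty is much smaller
hedged_wrong = gaussian_nll(target + 3.0, np.log(9.0) * np.ones((1, 2)), target)
```

The comparison between `confident_wrong` and `hedged_wrong` shows why this loss pushes the predicted variance toward the size of the actual squared error.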

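3.3 Model Uncertainty

Two types of uncertainty

| Type | Description | Handling |
| --- | --- | --- |
| Aleatoric uncertainty | Noise inherent in the data | Predicted variance |
| Epistemic uncertainty | Uncertainty about the model itself | Bayesian methods / ensembles |

Ensemble method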

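class EnsembleDynamicsModel:
    def __init__(self, state_dim, action_dim, n_models=5):
        self.models = [
            ProbabilisticDynamicsModel(state_dim, action_dim)
            for _ in range(n_models)
        ]
    
    def predict(self, state, action):
        # ProbabilisticDynamicsModel has no predict(); call forward() on batched tensors
        state_t = torch.as_tensor(state, dtype=torch.float32).unsqueeze(0)
        action_t = torch.as_tensor(action, dtype=torch.float32).unsqueeze(0)
        with torch.no_grad():
            means = [model(state_t, action_t)[0].numpy()[0] for model in self.models]
        # Ensemble mean of the member predictions
        mean = np.mean(means, axis=0)
        # Variance across members (epistemic uncertainty)
        variance = np.var(means, axis=0)
        return mean, variance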
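
When each ensemble member also predicts its own variance, the two kinds of uncertainty can be separated with the law of total variance: the average of the member variances is the aleatoric part, and the variance of the member means is the epistemic part. The helper below is a minimal sketch of that decomposition for an equally weighted ensemble; `decompose_uncertainty` and the example numbers are illustrative assumptions.

```python
import numpy as np

def decompose_uncertainty(means, variances):
    """Split ensemble uncertainty for inputs of shape (n_models, state_dim)."""
    means = np.asarray(means, dtype=float)
    variances = np.asarray(variances, dtype=float)
    mixture_mean = means.mean(axis=0)
    aleatoric = variances.mean(axis=0)  # average predicted noise
    epistemic = means.var(axis=0)       # disagreement between members
    return mixture_mean, aleatoric, epistemic, aleatoric + epistemic

# Two hypothetical members predicting a 1-D next state
member_means = np.array([[0.0], [2.0]])
member_vars = np.array([[1.0], [3.0]])
mixture_mean, aleatoric, epistemic, total = decompose_uncertainty(member_means, member_vars)
```

Collecting more data should shrink the epistemic term as the members come to agree, while the aleatoric term reflects noise that more data cannot remove.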

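4. The PETS Algorithm

4.1 Algorithm Overview

PETS (Probabilistic Ensembles with Trajectory Sampling) [2] combines:

  1. An ensemble of probabilistic dynamics models
  2. Trajectory sampling for uncertainty propagation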

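4.2 Core Steps

Algorithm: PETS

1. Train the probabilistic ensemble:
   - Train several dynamics models on the data collected so far
   - Each model predicts a mean and a variance

2. MPC control loop:
   for each step:
       for each CEM iteration:
           sample candidate action sequences
           evaluate each candidate by sampling trajectories through the ensemble
           select the top-k candidates
           refit the sampling distribution to the top-k
       
       execute the first action of the best sequence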
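
Sampling trajectories from the ensemble can be sketched as follows. This is an illustration under simplifying assumptions rather than the reference implementation: each ensemble member is a hypothetical callable returning a batched `(mean, var)`, and `propagate_particles` and `make_toy_member` are made-up names. With `resample_each_step=False` each particle keeps one member for the whole rollout; with `True` the member is redrawn at every step. These correspond to the two trajectory-sampling variants described in the PETS paper.

```python
import numpy as np

def propagate_particles(models, state, actions, n_particles=20,
                        resample_each_step=False, rng=None):
    """Roll particles through an ensemble; returns (horizon + 1, n_particles, state_dim)."""
    rng = np.random.default_rng() if rng is None else rng
    states = np.repeat(np.asarray(state, dtype=float)[None, :], n_particles, axis=0)
    # Assign one ensemble member to each particle
    member = rng.integers(len(models), size=n_particles)
    trajectory = [states]
    for a in actions:
        if resample_each_step:
            member = rng.integers(len(models), size=n_particles)
        a_batch = np.repeat(np.asarray(a, dtype=float)[None, :], n_particles, axis=0)
        next_states = np.empty_like(states)
        for k, model in enumerate(models):
            idx = member == k
            if idx.any():
                mean, var = model(states[idx], a_batch[idx])
                # Sample from this member's predictive Gaussian
                next_states[idx] = mean + np.sqrt(var) * rng.standard_normal(mean.shape)
        states = next_states
        trajectory.append(states)
    return np.stack(trajectory)

def make_toy_member(gain, noise_var=0.0):
    """Hypothetical ensemble member: s' = s + gain * a with constant variance."""
    return lambda s, a: (s + gain * a, np.full_like(s, noise_var))

ensemble = [make_toy_member(1.0), make_toy_member(2.0)]
actions = np.ones((2, 1))  # two steps of action +1
particles = propagate_particles(ensemble, [0.0], actions, n_particles=50,
                                rng=np.random.default_rng(0))
```

In this noise-free toy case every particle ends at 2.0 or 4.0 depending on its member, so the spread of the final particles reflects only the disagreement between models. A candidate action sequence would then be scored by averaging the reward over particles.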

4.3 Cross-Entropy Method (CEM)

CEM is a sampling-based, evolutionary-style optimizer, used here to optimize action sequences:

import numpy as np

class CrossEntropyMethod:
    def __init__(self, horizon, action_dim, n_samples=100, n_elites=10, n_iterations=5):
        self.horizon = horizon
        self.action_dim = action_dim
        self.n_samples = n_samples
        self.n_elites = n_elites
        self.n_iterations = n_iterations
    
    def optimize(self, dynamics_model, state, reward_fn):
        # Initialize the sampling distribution: zero-mean, unit-variance Gaussian
        mean = np.zeros((self.horizon, self.action_dim))
        std = np.ones((self.horizon, self.action_dim))
        
        for _ in range(self.n_iterations):
            # 1. Sample candidate action sequences
            actions = np.random.normal(mean, std, 
                                     (self.n_samples, self.horizon, self.action_dim))
            
            # 2. Evaluate each candidate with a model rollout
            returns = []
            for i in range(self.n_samples):
                return_i = self._evaluate(dynamics_model, state, actions[i], reward_fn)
                returns.append(return_i)
            
            # 3. Keep the elites (the candidates with the highest returns)
            elites = actions[np.argsort(returns)[-self.n_elites:]]
            
            # 4. Refit the sampling distribution to the elites
            mean = np.mean(elites, axis=0)
            std = np.std(elites, axis=0) + 1e-6
        
        return mean[0]  # first action of the final mean sequence
    
    def _evaluate(self, model, state, actions, reward_fn):
        total_reward = 0.0
        s = np.array(state, dtype=float)
        
        for a in actions:
            pred = model.predict(s, a)
            # Ensemble models return (mean, variance); plain models return the next state
            s_next = pred[0] if isinstance(pred, tuple) else pred
            total_reward += reward_fn(s, a)
            s = s_next
        
        return total_reward

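5. Stochastic MPC Strategies

5.1 Two Sampling Strategies

| Strategy | Description | Advantage |
| --- | --- | --- |
| Deterministic propagation | Uses the predicted mean | Simple |
| Stochastic sampling | Samples from the predicted distribution | Propagates uncertainty |

5.2 CVaR Optimization

Conditional value at risk (CVaR) optimization focuses on the worst-case tail of the outcomes: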

import numpy as np

def cvar_optimize(trajectories, returns, alpha=0.1):
    """
    Select the worst alpha fraction of sampled trajectories, i.e. the tail
    over which CVaR averages the return.
    """
    returns = np.asarray(returns)
    n = len(returns)
    sorted_indices = np.argsort(returns)
    cutoff = max(1, int(n * alpha))  # keep at least one sample
    
    # Restrict attention to the worst alpha fraction of samples
    worst_returns = returns[sorted_indices[:cutoff]]
    worst_trajectories = [trajectories[i] for i in sorted_indices[:cutoff]]
    
    return worst_trajectories, worst_returns
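
To use the tail as a planning objective, each candidate plan can be scored by the mean of its worst sampled returns, and the plan with the best such score is chosen. The snippet below is a minimal sketch of that scoring. `cvar` is an illustrative helper and the two candidate plans are made-up numbers.

```python
import numpy as np

def cvar(returns, alpha=0.1):
    """Mean of the worst ceil(alpha * n) returns (the lower tail)."""
    returns = np.sort(np.asarray(returns, dtype=float))
    k = max(1, int(np.ceil(alpha * len(returns))))
    return returns[:k].mean()

# Two hypothetical candidate plans, each evaluated on four sampled rollouts
risky_plan = [10.0, 10.0, 10.0, -20.0]  # higher mean, heavy lower tail
safe_plan = [2.0, 2.0, 2.0, 1.0]        # lower mean, mild lower tail

scores = {"risky": cvar(risky_plan, alpha=0.25), "safe": cvar(safe_plan, alpha=0.25)}
best = max(scores, key=scores.get)
```

The risky plan has the higher average return (2.5 versus 1.75), yet the CVaR score prefers the safe plan because its worst rollout is far better.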

6. Full MPC Implementation

6.1 MPC Controller

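import random
from collections import deque

import numpy as np
import torch
import torch.nn.functional as F

class MPCController:
    def __init__(self, env, dynamics_model, 
                 horizon=10, n_samples=1000, 
                 optimization='cem', beta=0.9):
        self.env = env
        self.model = dynamics_model
        self.horizon = horizon
        self.n_samples = n_samples
        self.optimization = optimization  # only 'cem' is implemented here
        self.beta = beta  # CVaR weight (not used by the CEM planner below)
        
        self.optimizer = CrossEntropyMethod(
            horizon=horizon,
            action_dim=env.action_space.shape[0],
            n_samples=n_samples,
            n_elites=n_samples // 10,
            n_iterations=5
        )
    
    def get_action(self, state):
        # Plan with CEM. Assumes the environment exposes a
        # compute_reward(state, action) method, which is not part of the
        # standard Gym/Gymnasium Env interface.
        best_action = self.optimizer.optimize(
            self.model, 
            state,
            self.env.compute_reward
        )
        return best_action
    
    def train_dynamics(self, dataset, epochs=50, batch_size=256):
        """Train the dynamics model (assumes a single nn.Module such as DynamicsModel)."""
        optimizer = torch.optim.Adam(self.model.parameters(), lr=1e-3)
        
        for epoch in range(epochs):
            # ReplayBuffer.sample returns (states, actions, next_states, rewards, dones)
            states, actions, next_states, _, _ = dataset.sample(batch_size)
            
            # Predict the next state
            pred_next = self.model(states, actions)
            
            # MSE loss
            loss = F.mse_loss(pred_next, next_states)
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()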
 
 
class ReplayBuffer:
    """Buffer of collected transitions."""
    def __init__(self, capacity=100000):
        self.buffer = deque(maxlen=capacity)
    
    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))
    
    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        # Note the return order: next_states comes before rewards and dones
        return (
            torch.FloatTensor(np.array(states)),
            torch.FloatTensor(np.array(actions)),
            torch.FloatTensor(np.array(next_states)),
            torch.FloatTensor(np.array(rewards, dtype=np.float32)).unsqueeze(1),
            torch.FloatTensor(np.array(dones, dtype=np.float32)).unsqueeze(1)
        )
 
 
def collect_data(env, policy, buffer, n_steps=1000):
    """Collect transitions with the given policy."""
    state, _ = env.reset()
    for _ in range(n_steps):
        action = policy.get_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        
        buffer.push(state, action, reward, next_state, done)
        
        state = next_state
        if done:
            state, _ = env.reset()

6.2 Full Training Loop

class RandomPolicy:
    """Uniform random policy used for the initial data collection."""
    def __init__(self, action_space):
        self.action_space = action_space
    
    def get_action(self, state):
        return self.action_space.sample()


def train_mpc(env, num_iterations=100, n_steps_per_iter=1000):
    """Full MPC training procedure."""
    
    # Setup
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.shape[0]
    
    model = EnsembleDynamicsModel(state_dim, action_dim, n_models=5)
    # One optimizer per ensemble member
    optimizers = [torch.optim.Adam(m.parameters(), lr=1e-3) for m in model.models]
    mpc = MPCController(env, model)
    buffer = ReplayBuffer(capacity=100000)
    
    # Collect initial data with a random policy
    print("Collecting initial data...")
    collect_data(env, RandomPolicy(env.action_space), buffer, n_steps=5000)
    
    for iteration in range(num_iterations):
        # 1. Train the dynamics models
        print(f"Iteration {iteration}: Training dynamics model...")
        for _ in range(100):  # gradient steps per iteration
            for member, opt in zip(model.models, optimizers):
                # Each member draws its own batch, which keeps the members diverse
                states, actions, next_states, _, _ = buffer.sample(batch_size=256)
                mean, var = member(states, actions)
                # Gaussian NLL (instead of plain MSE) so the variance head is trained too
                loss = F.gaussian_nll_loss(mean, next_states, var)
                opt.zero_grad()
                loss.backward()
                opt.step()
        
        # 2. Collect data with MPC
        print("Collecting data with MPC...")
        state, _ = env.reset()
        total_reward = 0.0
        
        for step in range(n_steps_per_iter):
            action = mpc.get_action(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            
            buffer.push(state, action, reward, next_state, done)
            total_reward += reward
            
            state = next_state
            if done:
                state, _ = env.reset()
        
        print(f"Total reward over {n_steps_per_iter} steps: {total_reward:.2f}")
    
    return model, mpc

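7. Theoretical Foundations

7.1 Convergence Analysis

Theorem: if the dynamics model is accurate, the optimization problem is convex, and the terminal cost is chosen suitably, the MPC controller is asymptotically stable.

Effect of prediction error

With an imperfect model, the guarantee typically weakens to a bound of the form

$$
\lVert s_{t+1} - s^* \rVert \le \rho \, \lVert s_t - s^* \rVert + \epsilon
$$

where $s^*$ is the target state, $\rho \in [0, 1)$ is the contraction coefficient, and $\epsilon$ is the model error.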
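
To see what a bound with a contraction coefficient and a model-error term implies, assume (as an illustration, not a result taken from the source) that the tracking error $e_t = \lVert s_t - s^* \rVert$ satisfies $e_{t+1} \le \rho\, e_t + \epsilon$ with $0 \le \rho < 1$. Unrolling the recursion gives:

```latex
\begin{aligned}
e_t &\le \rho^{t} e_0 + \epsilon \sum_{k=0}^{t-1} \rho^{k}
     = \rho^{t} e_0 + \epsilon \, \frac{1 - \rho^{t}}{1 - \rho}, \\
\limsup_{t \to \infty} e_t &\le \frac{\epsilon}{1 - \rho}.
\end{aligned}
```

The initial error decays geometrically, while the model error leaves a residual of at most $\epsilon / (1 - \rho)$. For example, $\rho = 0.9$ and $\epsilon = 0.01$ give a residual bound of 0.1.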

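7.2 Sample Complexity

Theorem: the sample complexity of learning the dynamics model has the PAC form

$$
N = O\!\left(\mathrm{poly}\!\left(\tfrac{1}{\epsilon}\right) \log \tfrac{1}{\delta}\right)
$$

where $\epsilon$ is the target error and $\delta$ is the failure probability.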


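8. Comparison with Model-Free RL

8.1 Advantages

| Aspect | MPC |
| --- | --- |
| Sample efficiency | High (each sample can be reused many times) |
| Safety | Constraints can be added |
| Interpretability | Planning is explicit |
| Gray-box model | The model can be inspected |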
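
One simple way to add constraints in a sampling-based planner is to discard candidate sequences whose predicted rollouts leave the allowed region. The helper below is a hypothetical sketch of that idea, assuming rollouts are stored as an array of shape (n_candidates, horizon, state_dim) and that box limits on the state are given. `mask_infeasible` and the example values are not from the code above.

```python
import numpy as np

def mask_infeasible(returns, rollouts, state_low, state_high):
    """Set the return of any rollout that violates the state box to -inf."""
    rollouts = np.asarray(rollouts, dtype=float)
    violated = ((rollouts < state_low) | (rollouts > state_high)).any(axis=(1, 2))
    return np.where(violated, -np.inf, np.asarray(returns, dtype=float))

# Two hypothetical candidates over a 3-step horizon with a 1-D state
rollouts = np.array([[[0.1], [0.4], [0.8]],    # stays inside [-1, 1]
                     [[0.5], [1.2], [0.9]]])   # leaves the box at step 2
returns = np.array([1.0, 5.0])

feasible_returns = mask_infeasible(returns, rollouts, -1.0, 1.0)
best = int(np.argmax(feasible_returns))
```

Because feasibility is judged on model predictions, this enforces the constraint only as well as the learned model is accurate.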

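8.2 Disadvantages

| Aspect | MPC |
| --- | --- |
| Model error | Can degrade performance |
| Computational cost | Online optimization |
| Generalization | Limited by the model |

8.3 Hybrid Methods

Dyna-style methods combine the two: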

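import random

def dyna_mpc(env, state, mpc, model, policy, buffer, reward_fn, n_sim=10):
    """
    One step of a Dyna-style hybrid of MPC and model-free learning.
    `policy.update` and `reward_fn` are placeholders for a model-free
    learner and a known reward function.
    """
    # 1. Choose an action with MPC
    mpc_action = mpc.get_action(state)
    
    # 2. Act in the real environment and store the real transition
    next_state, reward, terminated, truncated, _ = env.step(mpc_action)
    done = terminated or truncated
    buffer.push(state, mpc_action, reward, next_state, done)
    
    # 3. Generate simulated transitions with the model
    for _ in range(n_sim):
        sim_state, sim_action, _, _, _ = random.choice(buffer.buffer)
        pred = model.predict(sim_state, sim_action)
        # Ensemble models return (mean, variance); plain models return the next state
        sim_next = pred[0] if isinstance(pred, tuple) else pred
        buffer.push(sim_state, sim_action, reward_fn(sim_state, sim_action), sim_next, False)
    
    # 4. Model-free update on real and simulated data
    policy.update(buffer)
    return next_state, done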

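9. Applications

9.1 Robot Control

  • Low-level motion control
  • Manipulation tasks
  • Motion planning

9.2 Autonomous Driving

  • Trajectory planning
  • Vehicle control
  • Safety constraints

9.3 Industrial Processes

  • Process control
  • Resource optimization
  • Constraint satisfaction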

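10. Summary

MPC is a powerful model-based control method:

  1. High sample efficiency: every data point can be used to learn the model
  2. Safe and controllable: hard constraints can be added
  3. Interpretable: the planning process is explicit
  4. Challenges: model error and online computation

Best practices

  • Use probabilistic models to capture uncertainty
  • Use the cross-entropy method for optimization
  • Retrain the model regularly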

Footnotes

  1. Rawlings, J. B., & Mayne, D. Q. (2009). Model Predictive Control: Theory and Design. Nob Hill Publishing.

  2. Chua, K., Calandra, R., McAllister, R., & Levine, S. (2018). Deep Reinforcement Learning in a Handful of Trials using Probabilistic Dynamics Models. NeurIPS.