模型预测控制 (Model Predictive Control, MPC)
1. 概述
模型预测控制是一种基于模型的闭环控制策略,通过在每个时刻求解有限时域的优化问题来生成控制动作。1 在强化学习语境下,MPC是一类重要的**基于模型的强化学习(Model-Based RL)**方法。
1.1 MPC vs 无模型RL
| 方面 | 无模型RL | MPC (基于模型RL) |
|---|---|---|
| 数据效率 | 低 | 高 |
| 样本需求 | 大 | 小 |
| 样本采集 | 在线/离线 | 在线 |
| 泛化能力 | 强 | 弱(受模型误差影响) |
| 计算成本 | 推理时低 | 在线优化 |
1.2 MPC核心思想
MPC在每个控制时刻:
- 预测未来 步的状态和奖励
- 优化控制序列使累积奖励最大化
- 仅执行第一个控制动作
- 滚动到下一时刻,重复上述过程
时刻 t:
┌─────────────────────────────────────────────────────────────┐
│ 预测视野 H │
│ ├── s_t ──→ s_{t+1} ──→ ... ──→ s_{t+H} │
│ └── a_t ──→ a_{t+1} ──→ ... ──→ a_{t+H-1} │
│ │
│ 优化: min_{a_t,...,a_{t+H-1}} Σ_{i=0}^{H-1} r(s_{t+i},a_{t+i})│
│ 执行: 仅 a_t │
└─────────────────────────────────────────────────────────────┘
↓
时刻 t+1: 重新预测和优化
2. MPC基础理论
2.1 问题 formulation
有限时域MPC问题:
其中:
- :预测视野(Horizon)
- :奖励函数
- :系统动力学模型
- :终端成本
- :动作约束集
2.2 滚动时域控制
MPC采用滚动时域策略:
def mpc_controller(env, dynamics_model, horizon=10, num_samples=1000):
state = env.reset()
while not done:
# 1. 优化控制序列
best_actions = optimize_sequence(state, dynamics_model, horizon, num_samples)
# 2. 执行第一个动作
action = best_actions[0]
state, reward, done, _ = env.step(action)
return trajectory2.3 终端成本的作用
终端成本 近似无限视野价值函数:
- 无终端成本: 步后的行为不可控
- 正确终端成本:等效于无限视野最优
3. 动力学模型学习
3.1 模型架构
神经网络动力学模型:
class DynamicsModel(nn.Module):
def __init__(self, state_dim, action_dim, hidden_dim=256):
super().__init__()
# 输出状态增量
self.net = nn.Sequential(
nn.Linear(state_dim + action_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, state_dim)
)
def forward(self, state, action):
x = torch.cat([state, action], dim=1)
delta_state = self.net(x)
return state + delta_state # 预测下一状态
def predict(self, state, action):
with torch.no_grad():
state_t = torch.FloatTensor(state).unsqueeze(0)
action_t = torch.FloatTensor(action).unsqueeze(0)
return self.forward(state_t, action_t).numpy()[0]3.2 概率动力学模型
不确定性估计对于MPC至关重要:
class ProbabilisticDynamicsModel(nn.Module):
def __init__(self, state_dim, action_dim, hidden_dim=256, output_dim=None):
super().__init__()
self.output_dim = output_dim or state_dim
# 均值和方差网络
self.mean_net = nn.Sequential(
nn.Linear(state_dim + action_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, self.output_dim)
)
self.log_var_net = nn.Sequential(
nn.Linear(state_dim + action_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, self.output_dim)
)
def forward(self, state, action):
x = torch.cat([state, action], dim=1)
mean = self.mean_net(x)
log_var = torch.clamp(self.log_var_net(x), -5, 2)
var = log_var.exp()
return mean, var
def sample(self, state, action):
mean, var = self.forward(state, action)
std = var.sqrt()
return torch.distributions.Normal(mean, std).sample()3.3 模型不确定性
两种不确定性:
| 类型 | 描述 | 处理方法 |
|---|---|---|
| 偶然不确定性 (Aleatoric) | 数据固有噪声 | 预测方差 |
| 认知不确定性 (Epistemic) | 模型不确定性 | 贝叶斯/集成 |
集成方法:
class EnsembleDynamicsModel:
def __init__(self, state_dim, action_dim, n_models=5):
self.models = [
ProbabilisticDynamicsModel(state_dim, action_dim)
for _ in range(n_models)
]
def predict(self, state, action):
predictions = [model.predict(state, action) for model in self.models]
# 预测均值
mean = np.mean(predictions, axis=0)
# 预测方差(集成不确定性)
variance = np.var(predictions, axis=0)
return mean, variance4. PETS算法
4.1 算法概述
PETS (Probabilistic Ensembles with Trajectory Sampling)2 结合了:
- 概率动力学模型集成
- 轨迹采样用于不确定性传播
4.2 核心步骤
Algorithm: PETS
1. 训练概率集成模型:
- 使用历史数据训练多个动力学模型
- 每个模型预测均值和方差
2. MPC控制循环:
for each step:
for each candidate action sequence:
# 交叉熵方法优化
采样多条轨迹
选择top-k轨迹
拟合top-k分布
重复优化
执行最优序列的第一个动作
4.3 交叉熵方法 (CEM)
CEM是一种用于优化序列决策的进化算法:
class CrossEntropyMethod:
def __init__(self, horizon, action_dim, n_samples=100, n_elites=10, n_iterations=5):
self.horizon = horizon
self.action_dim = action_dim
self.n_samples = n_samples
self.n_elites = n_elites
self.n_iterations = n_iterations
def optimize(self, dynamics_model, state, reward_fn):
# 初始化:均匀分布
mean = np.zeros((self.horizon, self.action_dim))
std = np.ones((self.horizon, self.action_dim))
for _ in range(self.n_iterations):
# 1. 采样
actions = np.random.normal(mean, std,
(self.n_samples, self.horizon, self.action_dim))
# 2. 评估轨迹
returns = []
for i in range(self.n_samples):
return_i = self._evaluate(dynamics_model, state, actions[i], reward_fn)
returns.append(return_i)
# 3. 选择精英
elites = actions[np.argsort(returns)[-self.n_elites:]]
# 4. 更新分布
mean = np.mean(elites, axis=0)
std = np.std(elites, axis=0) + 1e-6
return mean[0] # 返回第一个动作
def _evaluate(self, model, state, actions, reward_fn):
total_reward = 0
s = state.copy()
for a in actions:
s_next = model.predict(s, a)
total_reward += reward_fn(s, a)
s = s_next
return total_reward5. 随机MPC策略
5.1 两种采样策略
| 策略 | 描述 | 优势 |
|---|---|---|
| 确定性传播 | 使用预测均值 | 简单 |
| 随机采样 | 从预测分布采样 | 不确定性传播 |
5.2 CVaR优化
条件风险价值 (CVaR) 优化关注最坏情况:
def cvar_optimize(trajectories, returns, alpha=0.1):
"""
CVaR optimization: optimize expected return in the worst alpha cases
"""
n = len(returns)
sorted_indices = np.argsort(returns)
cutoff = int(n * alpha)
# 在最差的alpha比例样本中优化
worst_returns = returns[sorted_indices[:cutoff]]
worst_trajectories = [trajectories[i] for i in sorted_indices[:cutoff]]
return worst_trajectories, worst_returns6. 完整MPC实现
6.1 MPC控制器
class MPCController:
def __init__(self, env, dynamics_model,
horizon=10, n_samples=1000,
optimization='cem', beta=0.9):
self.env = env
self.model = dynamics_model
self.horizon = horizon
self.n_samples = n_samples
self.beta = beta # CVaR weight
self.optimizer = CrossEntropyMethod(
horizon=horizon,
action_dim=env.action_space.shape[0],
n_samples=n_samples,
n_elites=n_samples // 10,
n_iterations=5
)
def get_action(self, state):
# 使用CEM优化
best_action = self.optimizer.optimize(
self.model,
state,
self.env.compute_reward
)
return best_action
def train_dynamics(self, dataset, epochs=50, batch_size=256):
"""训练动力学模型"""
optimizer = torch.optim.Adam(self.model.parameters(), lr=1e-3)
for epoch in range(epochs):
for batch in dataset.sample(batch_size):
states, actions, next_states = batch
# 预测
pred_next = self.model(states, actions)
# MSE损失
loss = F.mse_loss(pred_next, next_states)
optimizer.zero_grad()
loss.backward()
optimizer.step()
class ReplayBuffer:
"""数据收集缓冲区"""
def __init__(self, capacity=100000):
self.buffer = deque(maxlen=capacity)
def push(self, state, action, reward, next_state, done):
self.buffer.append((state, action, reward, next_state, done))
def sample(self, batch_size):
batch = random.sample(self.buffer, batch_size)
states, actions, rewards, next_states, dones = zip(*batch)
return (
torch.FloatTensor(np.array(states)),
torch.FloatTensor(np.array(actions)),
torch.FloatTensor(np.array(next_states)),
torch.FloatTensor(rewards).unsqueeze(1),
torch.FloatTensor(dones).unsqueeze(1)
)
def collect_data(env, policy, buffer, n_steps=1000):
"""使用给定策略收集数据"""
state, _ = env.reset()
for _ in range(n_steps):
action = policy.get_action(state)
next_state, reward, terminated, truncated, _ = env.step(action)
done = terminated or truncated
buffer.push(state, action, reward, next_state, done)
state = next_state
if done:
state, _ = env.reset()6.2 完整训练循环
def train_mpc(env, num_iterations=100, n_steps_per_iter=1000):
"""MPC完整训练流程"""
# 初始化
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
model = EnsembleDynamicsModel(state_dim, action_dim, n_models=5)
mpc = MPCController(env, model)
buffer = ReplayBuffer(capacity=100000)
# 随机初始化收集数据
print("Collecting initial data...")
random_policy = lambda s: env.action_space.sample()
collect_data(env, type('Obj', (), {'get_action': random_policy})(),
buffer, n_steps=5000)
for iteration in range(num_iterations):
# 1. 训练动力学模型
print(f"Iteration {iteration}: Training dynamics model...")
for _ in range(100): # 多个epoch
batch = buffer.sample(batch_size=256)
states, actions, next_states, _, _ = batch
pred = model(states, actions)
loss = F.mse_loss(pred, next_states)
model.optimizer.zero_grad()
loss.backward()
model.optimizer.step()
# 2. 使用MPC收集数据
print(f"Collecting data with MPC...")
state, _ = env.reset()
episode_reward = 0
for step in range(n_steps_per_iter):
action = mpc.get_action(state)
next_state, reward, terminated, truncated, _ = env.step(action)
done = terminated or truncated
buffer.push(state, action, reward, next_state, done)
episode_reward += reward
state = next_state
if done:
state, _ = env.reset()
print(f"Episode reward: {episode_reward:.2f}")
return model, mpc7. 理论基础
7.1 收敛性分析
定理:如果动力学模型准确且优化是凸的,MPC控制器渐近稳定。
预测误差的影响:
其中 是收缩系数, 是模型误差。
7.2 样本复杂度
定理:学习动力学模型的样本复杂度为:
其中 是目标误差, 是失败概率。
8. 与无模型RL的对比
8.1 优势
| 方面 | MPC |
|---|---|
| 样本效率 | 高(每个样本可多次使用) |
| 安全性 | 可添加约束 |
| 可解释性 | 显式规划 |
| 棕盒 | 模型可解释 |
8.2 劣势
| 方面 | MPC |
|---|---|
| 模型误差 | 可能导致性能下降 |
| 计算成本 | 在线优化 |
| 泛化能力 | 受模型限制 |
8.3 混合方法
Dyna-style方法结合两者:
def dyna_mpc(env, model, policy, buffer):
"""
混合MPC和无模型学习
"""
# 1. 使用MPC更新策略
mpc_action = mpc.get_action(state)
# 2. 使用真实环境更新(无模型)
real_reward = env.step(mpc_action)
buffer.push(...)
# 3. 使用模型生成模拟数据
for _ in range(n_sim):
sim_state, sim_action = buffer.sample(1)
sim_next = model.predict(sim_state, sim_action)
buffer.push(sim_state, sim_action, reward_fn(sim_state, sim_action), sim_next)
# 4. 无模型更新
policy.update(buffer)9. 应用场景
9.1 机器人控制
- 低级运动控制
- 操纵任务
- 运动规划
9.2 自动驾驶
- 轨迹规划
- 车辆控制
- 安全约束
9.3 工业过程
- 过程控制
- 资源优化
- 约束满足
10. 总结
MPC是强大的基于模型控制方法:
- 高样本效率:每个数据点可用于学习模型
- 安全可控:可添加硬约束
- 可解释:显式规划过程
- 挑战:模型误差、在线计算
最佳实践:
- 使用概率模型捕获不确定性
- 结合交叉熵方法优化
- 定期重新训练模型
参考资料
相关主题
- soft-actor-critic — 无模型RL对比
- td3-twin-delayed-ddpg — 无模型RL对比
- dqn — 无模型RL基础
- exploration-exploitation-rl — MPC中的探索策略