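Overview
Continuous action spaces are an important setting in reinforcement learning, and they pose challenges and call for solutions that differ fundamentally from the discrete case. This topic gives a systematic introduction to Actor-Critic methods for continuous action spaces, covering the classic algorithms DDPG, TD3, and SAC, as well as the extension of PPO to continuous actions. [1]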
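Challenges of continuous action spaces
Policy parameterization
A policy over a continuous action space must output a distribution over actions rather than a choice among discrete options:
$$\pi_\theta(a \mid s) = \mathcal{N}\big(\mu_\theta(s),\, \sigma_\theta(s)^2\big)$$
or use a more general parameterization:
$$a = f_\theta(s, \epsilon)$$
where $f_\theta$ is a deterministic policy network and $\epsilon$ is random noise.
Gaussian policy parameterization: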
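    import torch
    import torch.nn as nn


    class GaussianPolicy(nn.Module):
        """Gaussian policy network that outputs a mean and a standard deviation."""

        def __init__(self, state_dim, action_dim, hidden_dim=256):
            super().__init__()
            # Mean network
            self.mean_net = nn.Sequential(
                nn.Linear(state_dim, hidden_dim),
                nn.ReLU(),
                nn.Linear(hidden_dim, hidden_dim),
                nn.ReLU(),
                nn.Linear(hidden_dim, action_dim),
                nn.Tanh()  # squash the mean into [-1, 1]
            )
            # Standard deviation: a state-independent learnable parameter here
            # (it could also be produced by a network)
            self.log_std = nn.Parameter(torch.zeros(action_dim))

        def forward(self, state):
            mean = self.mean_net(state)
            std = torch.exp(self.log_std)
            return mean, std

        def sample(self, state):
            """Sample a squashed action and its log-probability."""
            mean, std = self.forward(state)
            dist = torch.distributions.Normal(mean, std)
            raw_action = dist.rsample()  # reparameterized sample
            # Squash the action into the valid range (-1, 1)
            action = torch.tanh(raw_action)
            # Change-of-variables correction so log_prob matches the squashed action
            log_prob = dist.log_prob(raw_action) - torch.log(1 - action.pow(2) + 1e-6)
            return action, log_prob.sum(dim=-1)
Exploration-exploitation trade-off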
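Exploration in continuous action spaces differs markedly from exploration in discrete spaces:
1. Action-space noise
Add noise to the action output:
$$a_t = \mu_\theta(s_t) + \epsilon_t, \quad \epsilon_t \sim \mathcal{N}(0, \sigma^2 I)$$
2. Parameter-space noise
Add noise to the network parameters:
$$\tilde{\theta} = \theta + \epsilon, \quad \epsilon \sim \mathcal{N}(0, \sigma^2 I), \quad a_t = \mu_{\tilde{\theta}}(s_t)$$
3. Entropy regularization
Encourage randomness in the policy:
$$J(\pi) = \mathbb{E}_{\pi}\Big[\sum_t r(s_t, a_t) + \alpha\, \mathcal{H}\big(\pi(\cdot \mid s_t)\big)\Big]$$
Comparison of exploration methods: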
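| Method | Exploration type | Advantage | Drawback |
|---|---|---|---|
| $\epsilon$-greedy | Action noise | Simple | Unsuitable for continuous spaces |
| OU noise | Temporally correlated | Physically meaningful | Sensitive to hyperparameters |
| Gaussian noise | Independent noise | Simple | No temporal correlation |
| Entropy regularization | Policy randomness | Adaptive | Requires tuning $\alpha$ |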
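The difference between the OU and Gaussian rows of the table can be sketched in a few lines. The block below is a minimal illustration using only the standard library; the class name `OUNoise`, the helper `lag1_autocorr`, and the defaults `theta=0.15`, `sigma=0.2`, `dt=1e-2` are assumptions chosen for the example, not values prescribed by this article.

```python
import math
import random


class OUNoise:
    """Ornstein-Uhlenbeck process (hypothetical helper for illustration).

    Discretized as: x <- x + theta * (mu - x) * dt + sigma * sqrt(dt) * N(0, 1)
    """

    def __init__(self, dim, mu=0.0, theta=0.15, sigma=0.2, dt=1e-2, seed=None):
        self.dim = dim
        self.mu = mu
        self.theta = theta
        self.sigma = sigma
        self.dt = dt
        self.rng = random.Random(seed)
        self.reset()

    def reset(self):
        """Restart the process at its long-run mean (e.g. at episode start)."""
        self.x = [self.mu] * self.dim

    def sample(self):
        """Advance one step and return a copy of the new noise vector."""
        self.x = [
            x + self.theta * (self.mu - x) * self.dt
            + self.sigma * math.sqrt(self.dt) * self.rng.gauss(0.0, 1.0)
            for x in self.x
        ]
        return list(self.x)


def lag1_autocorr(series):
    """Sample lag-1 autocorrelation of a 1-D sequence."""
    mean = sum(series) / len(series)
    num = sum((a - mean) * (b - mean) for a, b in zip(series[:-1], series[1:]))
    den = sum((a - mean) ** 2 for a in series)
    return num / den


ou = OUNoise(dim=1, seed=0)
ou_series = [ou.sample()[0] for _ in range(5000)]

rng = random.Random(0)
gauss_series = [rng.gauss(0.0, 0.2) for _ in range(5000)]

print("OU lag-1 autocorrelation:      ", lag1_autocorr(ou_series))
print("Gaussian lag-1 autocorrelation:", lag1_autocorr(gauss_series))
```

Successive OU samples are strongly correlated, so the perturbation pushes the action in a consistent direction for many steps, while independent Gaussian samples have autocorrelation near zero. The same correlation is what makes OU noise sensitive to `theta`, `sigma`, and `dt`.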
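DDPG and TD3
DDPG: deterministic policy gradient
DDPG (Deep Deterministic Policy Gradient) is one of the earliest deep continuous-control algorithms; it combines Q-learning with policy gradients. [2]
Core ideas:
- Use a deterministic policy $a = \mu_\theta(s)$ in place of a stochastic policy
- Borrow experience replay and target networks from DQN
- Update the policy with the deterministic policy gradient
Deterministic policy gradient theorem:
$$\nabla_\theta J(\mu_\theta) = \mathbb{E}_{s \sim \rho^{\mu}}\Big[\nabla_\theta \mu_\theta(s)\, \nabla_a Q^{\mu}(s, a)\big|_{a = \mu_\theta(s)}\Big]$$
Relation to the stochastic policy gradient:
Stochastic policy gradient:
$$\nabla_\theta J(\pi_\theta) = \mathbb{E}_{s \sim \rho^{\pi},\, a \sim \pi_\theta}\big[\nabla_\theta \log \pi_\theta(a \mid s)\, Q^{\pi}(s, a)\big]$$
The deterministic policy gradient can be viewed as the limit of the stochastic policy gradient as the policy variance $\sigma \to 0$.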
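The chain-rule form of the deterministic policy gradient, $\nabla_\theta \mu_\theta(s)\, \nabla_a Q(s, a)$ evaluated at $a = \mu_\theta(s)$, can be checked numerically on a toy problem. The sketch below assumes a scalar linear policy $\mu_\theta(s) = \theta s$, a hand-written critic $Q(s, a) = -(a - 2s)^2$, and a fixed set of states; all of these are illustrative choices, not part of DDPG itself.

```python
def q_value(s, a):
    """Toy critic: the best action for state s is a = 2 * s."""
    return -(a - 2.0 * s) ** 2


def dq_da(s, a):
    """Analytic gradient of the toy critic with respect to the action."""
    return -2.0 * (a - 2.0 * s)


def policy(theta, s):
    """Toy deterministic policy mu_theta(s) = theta * s."""
    return theta * s


def dpolicy_dtheta(theta, s):
    """Gradient of the toy policy with respect to theta."""
    return s


def objective(theta, states):
    """J(theta): average of Q(s, mu_theta(s)) over a fixed set of states."""
    return sum(q_value(s, policy(theta, s)) for s in states) / len(states)


def dpg_gradient(theta, states):
    """Deterministic policy gradient: mean of dmu/dtheta * dQ/da at a = mu_theta(s)."""
    return sum(
        dpolicy_dtheta(theta, s) * dq_da(s, policy(theta, s)) for s in states
    ) / len(states)


states = [-1.0, 0.5, 2.0]
theta = 0.3
eps = 1e-6

# Central finite difference of J as an independent check
fd_gradient = (objective(theta + eps, states) - objective(theta - eps, states)) / (2 * eps)
grad = dpg_gradient(theta, states)

print("chain-rule gradient:       ", grad)
print("finite-difference gradient:", fd_gradient)
```

Both numbers agree, and the gradient is positive because $\theta = 0.3$ is below the optimum $\theta = 2$. In DDPG the same chain rule is obtained by backpropagating `-critic(states, actor(states)).mean()` through the critic into the actor.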
Full DDPG procedure:
    import numpy as np
    import torch
    import torch.nn.functional as F
    import torch.optim as optim

    # Actor(state_dim, action_dim, action_bound) and Critic(state_dim, action_dim)
    # are assumed to be nn.Module classes defined elsewhere.


    class DDPG:
        """Deep Deterministic Policy Gradient"""

        def __init__(self, state_dim, action_dim, action_bound,
                     actor_lr=1e-4, critic_lr=1e-3, gamma=0.99, tau=0.001):
            self.gamma = gamma
            self.tau = tau
            self.action_bound = action_bound
            # Actor networks
            self.actor = Actor(state_dim, action_dim, action_bound)
            self.actor_target = Actor(state_dim, action_dim, action_bound)
            self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=actor_lr)
            # Critic networks
            self.critic = Critic(state_dim, action_dim)
            self.critic_target = Critic(state_dim, action_dim)
            self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=critic_lr)
            # Initialize the target networks
            self._hard_update()

        def get_action(self, state, noise_scale=0.1):
            """Return an action with Gaussian exploration noise."""
            state = torch.FloatTensor(state).unsqueeze(0)
            with torch.no_grad():  # no gradient needed when acting
                action = self.actor(state).cpu().numpy()[0]
            # Add exploration noise
            noise = np.random.normal(0, noise_scale, size=action.shape)
            action = np.clip(action + noise, -self.action_bound, self.action_bound)
            return action

        def update(self, states, actions, rewards, next_states, dones):
            """Update the networks (rewards and dones are assumed to be (batch, 1))."""
            # ========== Critic update ==========
            with torch.no_grad():
                next_actions = self.actor_target(next_states)
                q_target = rewards + self.gamma * (1 - dones) * self.critic_target(next_states, next_actions)
            q_current = self.critic(states, actions)
            critic_loss = F.mse_loss(q_current, q_target)
            self.critic_optimizer.zero_grad()
            critic_loss.backward()
            self.critic_optimizer.step()
            # ========== Actor update ==========
            policy_actions = self.actor(states)
            # Policy gradient: maximize the Q-value
            policy_loss = -self.critic(states, policy_actions).mean()
            self.actor_optimizer.zero_grad()
            policy_loss.backward()
            self.actor_optimizer.step()
            # ========== Target network update ==========
            self._soft_update()
            return critic_loss.item(), policy_loss.item()

        def _soft_update(self):
            """Soft-update both target networks (actor and critic)."""
            for net, target in ((self.actor, self.actor_target),
                                (self.critic, self.critic_target)):
                for target_param, param in zip(target.parameters(), net.parameters()):
                    target_param.data.copy_(
                        self.tau * param.data + (1 - self.tau) * target_param.data
                    )

        def _hard_update(self):
            """Hard update (used at initialization)."""
            self.actor_target.load_state_dict(self.actor.state_dict())
            self.critic_target.load_state_dict(self.critic.state_dict())
TD3: Twin Delayed DDPG
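TD3 (Twin Delayed DDPG) is an important improvement over DDPG that addresses overestimation with three core techniques. See the detailed TD3 article for more.
Recap of the three core techniques:
- Clipped Double Q-Learning:
$$y = r + \gamma \min_{i=1,2} Q_{\phi_i'}(s', \tilde{a})$$
- Delayed Policy Updates:
The policy network is updated only once every $d$ steps (typically $d = 2$).
- Target Policy Smoothing:
$$\tilde{a} = \mathrm{clip}\big(\mu_{\theta'}(s') + \mathrm{clip}(\epsilon, -c, c),\, a_{\min},\, a_{\max}\big), \quad \epsilon \sim \mathcal{N}(0, \sigma^2)$$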
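The three techniques can be sketched together for a single transition with scalar actions. This is an illustration under stated assumptions rather than a full TD3 implementation: `actor_target`, `q1_target`, and `q2_target` are hypothetical plain callables standing in for the target networks, and the defaults `policy_noise=0.2`, `noise_clip=0.5`, and `policy_delay=2` are illustrative.

```python
import random


def clip(x, lo, hi):
    """Clamp x to the interval [lo, hi]."""
    return max(lo, min(hi, x))


def td3_target(reward, done, next_state, actor_target, q1_target, q2_target,
               gamma=0.99, policy_noise=0.2, noise_clip=0.5, max_action=1.0,
               rng=random):
    """TD target for one transition with a scalar action."""
    # Target policy smoothing: clipped Gaussian noise on the target action
    noise = clip(rng.gauss(0.0, policy_noise), -noise_clip, noise_clip)
    next_action = clip(actor_target(next_state) + noise, -max_action, max_action)
    # Clipped Double Q-learning: take the smaller of the two target critics
    q_min = min(q1_target(next_state, next_action),
                q2_target(next_state, next_action))
    return reward + gamma * (1.0 - done) * q_min


def should_update_actor(step, policy_delay=2):
    """Delayed policy updates: the actor is updated once every policy_delay steps."""
    return step % policy_delay == 0


# Toy stand-ins for the target networks (assumptions for the example)
toy_actor = lambda s: 0.5 * s
toy_q1 = lambda s, a: s + a
toy_q2 = lambda s, a: s + a + 1.0

# With the smoothing noise switched off the target is fully deterministic
y = td3_target(1.0, 0.0, 1.0, toy_actor, toy_q1, toy_q2, policy_noise=0.0)
y_terminal = td3_target(1.0, 1.0, 1.0, toy_actor, toy_q1, toy_q2, policy_noise=0.0)
print(y, y_terminal)
```

In a training loop the critics would be regressed toward this target at every step, while the actor and the target networks are updated only when `should_update_actor(step)` is true.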
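Clipped Double Q-learning
Clipped Double Q-learning is the core component of TD3:
$$y = r + \gamma \min\big(Q_{\phi_1'}(s', \tilde{a}),\, Q_{\phi_2'}(s', \tilde{a})\big)$$
Mathematical analysis:
Assume $Q_1$ and $Q_2$ are biased estimates of $Q^*$:
$$Q_i(s, a) = Q^*(s, a) + \epsilon_i(s, a), \quad i = 1, 2$$
Standard Q-learning uses $\max$:
$$\mathbb{E}\big[\max_a Q_1(s', a)\big] \ge \max_a \mathbb{E}\big[Q_1(s', a)\big]$$
Clipped Double Q uses $\min$:
$$\mathbb{E}\big[\min(Q_1, Q_2)\big] \le \min\big(\mathbb{E}[Q_1],\, \mathbb{E}[Q_2]\big)$$
Theorem (overestimation suppression):
Let $\epsilon_i = Q_i - Q^*$. Then, under the i.i.d. assumption:
$$\mathbb{E}\big[\min(\epsilon_1, \epsilon_2)\big] \le \mathbb{E}[\epsilon_1] = \mathbb{E}[\epsilon_2]$$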
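A small Monte Carlo experiment makes the overestimation argument tangible. The sketch below assumes the true value $Q^*$ is 0 for every action and that each critic adds independent zero-mean Gaussian noise; the function name `overestimation_demo` and all parameter values are hypothetical choices for the illustration.

```python
import random


def overestimation_demo(num_actions=10, noise_std=1.0, trials=5000, seed=0):
    """Compare the bias of max-based and clipped double-Q value estimates.

    The true Q* is 0 for every action, so any nonzero average is pure bias.
    """
    rng = random.Random(seed)
    single_total = 0.0
    clipped_total = 0.0
    for _ in range(trials):
        q1 = [rng.gauss(0.0, noise_std) for _ in range(num_actions)]
        q2 = [rng.gauss(0.0, noise_std) for _ in range(num_actions)]
        # Standard target: maximize over a single noisy estimate
        single_total += max(q1)
        # Clipped double Q: pick the action with Q1, evaluate with min(Q1, Q2)
        best = max(range(num_actions), key=q1.__getitem__)
        clipped_total += min(q1[best], q2[best])
    return single_total / trials, clipped_total / trials


single_bias, clipped_bias = overestimation_demo()
print("bias with max over one critic:", single_bias)
print("bias with clipped double Q:   ", clipped_bias)
```

The single-critic estimate is biased upward by more than one noise standard deviation, while the clipped estimate ends up close to zero and slightly negative. This illustrates the trade-off behind the technique: a large overestimation is exchanged for a mild underestimation.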
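SAC (Soft Actor-Critic)
Maximum-entropy RL framework
SAC (Soft Actor-Critic) brings the maximum-entropy framework into Actor-Critic and is among the state-of-the-art algorithms for continuous control. [3]
Maximum-entropy objective:
$$J(\pi) = \sum_{t} \mathbb{E}_{(s_t, a_t) \sim \rho_\pi}\big[r(s_t, a_t) + \alpha\, \mathcal{H}\big(\pi(\cdot \mid s_t)\big)\big]$$
What entropy regularization means in practice:
- Exploration: higher entropy $\Rightarrow$ more exploration
- Robustness: avoids overfitting the policy
- Generalization: prevents overconfidence in particular states
Soft value functions:
Define the soft Q-function and the soft value function:
$$Q^{\pi}(s_t, a_t) = r(s_t, a_t) + \mathbb{E}_{\pi}\Big[\sum_{l=1}^{\infty} \gamma^{l} \big(r(s_{t+l}, a_{t+l}) + \alpha\, \mathcal{H}(\pi(\cdot \mid s_{t+l}))\big)\Big]$$
$$V^{\pi}(s_t) = \mathbb{E}_{a_t \sim \pi}\big[Q^{\pi}(s_t, a_t) - \alpha \log \pi(a_t \mid s_t)\big]$$
Soft Bellman operator:
$$\mathcal{T}^{\pi} Q(s_t, a_t) = r(s_t, a_t) + \gamma\, \mathbb{E}_{s_{t+1} \sim p}\big[V(s_{t+1})\big]$$
where $V(s_t) = \mathbb{E}_{a_t \sim \pi}\big[Q(s_t, a_t) - \alpha \log \pi(a_t \mid s_t)\big]$.
Automatic entropy coefficient tuning
One of SAC's key innovations is the automatic tuning of the entropy coefficient $\alpha$.
Objective of automatic temperature tuning:
Keep the policy entropy from falling below a target entropy $\bar{\mathcal{H}}$:
$$\max_{\pi}\ \mathbb{E}\Big[\sum_t r(s_t, a_t)\Big] \quad \text{s.t.} \quad \mathbb{E}_{(s_t, a_t) \sim \rho_\pi}\big[-\log \pi(a_t \mid s_t)\big] \ge \bar{\mathcal{H}}$$
which leads to the temperature loss
$$J(\alpha) = \mathbb{E}_{a_t \sim \pi}\big[-\alpha \big(\log \pi(a_t \mid s_t) + \bar{\mathcal{H}}\big)\big]$$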
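The direction in which the temperature moves can be verified by hand. The sketch below assumes the `log_alpha` parameterization, with loss $-\log\alpha \cdot (\overline{\log \pi} + \bar{\mathcal{H}})$ averaged over a batch, and applies one plain gradient-descent step; the function name `alpha_step` and the learning rate are hypothetical.

```python
import math


def alpha_step(log_alpha, log_probs, target_entropy, lr=0.01):
    """One gradient-descent step on loss = -log_alpha * mean(log_prob + target_entropy)."""
    mean_log_prob = sum(log_probs) / len(log_probs)
    # d(loss) / d(log_alpha) = -(mean_log_prob + target_entropy)
    grad = -(mean_log_prob + target_entropy)
    return log_alpha - lr * grad


target_entropy = -2.0  # for example, -action_dim with action_dim = 2

# Case 1: policy too deterministic (entropy = -mean log_prob = -3.0 < target)
low_entropy_log_probs = [2.5, 3.0, 3.5]
log_alpha_up = alpha_step(0.0, low_entropy_log_probs, target_entropy)

# Case 2: policy more random than required (entropy = -0.5 > target)
high_entropy_log_probs = [0.0, 0.5, 1.0]
log_alpha_down = alpha_step(0.0, high_entropy_log_probs, target_entropy)

print("alpha after low-entropy batch: ", math.exp(log_alpha_up))
print("alpha after high-entropy batch:", math.exp(log_alpha_down))
```

When the measured entropy is below the target, $\alpha$ grows and the entropy bonus weighs more in the actor loss; when the policy is already more random than required, $\alpha$ shrinks. Optimizing `log_alpha` rather than $\alpha$ keeps the temperature positive without an explicit constraint.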
Implementation:
    import torch
    import torch.nn.functional as F
    import torch.optim as optim

    # ReparamGaussianPolicy and Critic are assumed to be defined elsewhere.
    # Assumed interface: actor.sample(state) -> (tanh-squashed action, log_prob)
    # and actor(state) -> (mean, std) of the pre-squash Gaussian.
    # Rewards, dones, log-probs, and Q-values are assumed to be shaped (batch, 1).


    class SAC:
        """Soft Actor-Critic with Automatic Temperature"""

        def __init__(self, state_dim, action_dim, action_bound,
                     lr=3e-4, gamma=0.99, tau=0.005,
                     target_entropy=None):
            self.gamma = gamma
            self.tau = tau
            self.action_bound = action_bound
            # Target entropy (defaults to -action_dim)
            if target_entropy is None:
                self.target_entropy = -action_dim
            else:
                self.target_entropy = target_entropy
            # Actor (reparameterized Gaussian policy)
            self.actor = ReparamGaussianPolicy(state_dim, action_dim)
            # Twin critics
            self.critic1 = Critic(state_dim, action_dim)
            self.critic2 = Critic(state_dim, action_dim)
            self.critic1_target = Critic(state_dim, action_dim)
            self.critic2_target = Critic(state_dim, action_dim)
            self.critic1_target.load_state_dict(self.critic1.state_dict())
            self.critic2_target.load_state_dict(self.critic2.state_dict())
            # Optimizers
            self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr)
            self.critic_optimizer = optim.Adam(
                list(self.critic1.parameters()) + list(self.critic2.parameters()),
                lr=lr
            )
            # Automatic entropy temperature (optimized in log space)
            self.log_alpha = torch.zeros(1, requires_grad=True)
            self.alpha_optimizer = optim.Adam([self.log_alpha], lr=lr)

        def get_action(self, state, deterministic=False):
            """Return an action; deterministic=True is meant for evaluation."""
            with torch.no_grad():
                state = torch.FloatTensor(state).unsqueeze(0)
                if deterministic:
                    # Use the squashed mean instead of sampling
                    mean, _ = self.actor(state)
                    action = torch.tanh(mean)
                else:
                    action, _ = self.actor.sample(state)
            return action.cpu().numpy()[0]

        def update(self, states, actions, rewards, next_states, dones):
            # Current temperature, detached so only alpha_loss trains log_alpha
            alpha = torch.exp(self.log_alpha).detach()
            # ========== Critic update ==========
            with torch.no_grad():
                next_actions, next_log_probs = self.actor.sample(next_states)
                q1_target = self.critic1_target(next_states, next_actions)
                q2_target = self.critic2_target(next_states, next_actions)
                q_target = torch.min(q1_target, q2_target)
                # Soft target
                next_value = q_target - alpha * next_log_probs
                q_target = rewards + self.gamma * (1 - dones) * next_value
            q1_current = self.critic1(states, actions)
            q2_current = self.critic2(states, actions)
            critic1_loss = F.mse_loss(q1_current, q_target)
            critic2_loss = F.mse_loss(q2_current, q_target)
            critic_loss = critic1_loss + critic2_loss
            self.critic_optimizer.zero_grad()
            critic_loss.backward()
            self.critic_optimizer.step()
            # ========== Actor update ==========
            new_actions, log_probs = self.actor.sample(states)
            q1_new = self.critic1(states, new_actions)
            q2_new = self.critic2(states, new_actions)
            q_new = torch.min(q1_new, q2_new)
            # Policy gradient plus entropy regularization
            actor_loss = (alpha * log_probs - q_new).mean()
            self.actor_optimizer.zero_grad()
            actor_loss.backward()
            self.actor_optimizer.step()
            # ========== Temperature update ==========
            # Goal: drive the policy entropy, -E[log_prob], toward target_entropy
            alpha_loss = -(self.log_alpha * (log_probs + self.target_entropy).detach()).mean()
            self.alpha_optimizer.zero_grad()
            alpha_loss.backward()
            self.alpha_optimizer.step()
            # ========== Soft update of the target networks ==========
            self._soft_update(self.critic1, self.critic1_target)
            self._soft_update(self.critic2, self.critic2_target)
            return {
                'critic_loss': critic_loss.item(),
                'actor_loss': actor_loss.item(),
                'alpha': alpha.item(),
                'alpha_loss': alpha_loss.item()
            }

        def _soft_update(self, source, target):
            for target_param, param in zip(target.parameters(), source.parameters()):
                target_param.data.copy_(
                    self.tau * param.data + (1 - self.tau) * target_param.data
                )
Theoretical analysis
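Theorem (convergence to the maximum-entropy policy):
Let $\pi^*$ be the optimal policy under the maximum-entropy framework. Then:
$$\pi^*(a \mid s) = \exp\Big(\frac{Q^*(s, a) - V^*(s)}{\alpha}\Big), \quad V^*(s) = \alpha \log \int \exp\Big(\frac{Q^*(s, a)}{\alpha}\Big)\, da$$
Lemma (relation between entropy and exploration):
Let $\mathcal{H}(\pi)$ be the policy entropy and $\mathrm{Regret}(T)$ the cumulative regret. Then the regret incurred by exploration admits a lower bound that depends on $\mathcal{H}(\pi)$.
Corollary: higher entropy $\Rightarrow$ a lower lower bound on exploration regret.
Relation between maximum-entropy RL and standard RL:
As $\alpha \to 0$, maximum-entropy RL reduces to standard RL:
$$\lim_{\alpha \to 0} J_{\text{MaxEnt}}(\pi) = \mathbb{E}_{\pi}\Big[\sum_t r(s_t, a_t)\Big]$$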
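The limit $\alpha \to 0$ can be illustrated on a discretized action set, where the soft value is $V = \alpha \log \sum_a \exp(Q(a)/\alpha)$ and the maximum-entropy policy is $\pi(a) = \exp((Q(a) - V)/\alpha)$. The Q-values and function names below are assumptions for the example; the continuous case replaces the sum with an integral.

```python
import math


def soft_value(q_values, alpha):
    """V = alpha * log(sum_a exp(Q(a) / alpha)), computed in a numerically stable way."""
    q_max = max(q_values)
    return q_max + alpha * math.log(
        sum(math.exp((q - q_max) / alpha) for q in q_values)
    )


def soft_policy(q_values, alpha):
    """Maximum-entropy policy pi(a) = exp((Q(a) - V) / alpha)."""
    v = soft_value(q_values, alpha)
    return [math.exp((q - v) / alpha) for q in q_values]


q_values = [1.0, 2.0, 0.5]  # toy Q-values for three candidate actions

for alpha in (1.0, 0.1, 0.01):
    v = soft_value(q_values, alpha)
    pi = soft_policy(q_values, alpha)
    print(f"alpha={alpha}: V={v:.4f}, pi={[round(p, 4) for p in pi]}")
```

As $\alpha$ shrinks, the soft value approaches $\max_a Q(a)$ and the policy concentrates on the greedy action, which is the sense in which maximum-entropy RL reduces to standard RL.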
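PPO in continuous action spaces
Gaussian policy parameterization
PPO (Proximal Policy Optimization) can likewise be extended to continuous action spaces. [4]
Gaussian-policy PPO:
$$\pi_\theta(a \mid s) = \mathcal{N}\big(a;\, \mu_\theta(s),\, \sigma_\theta^2\big)$$
Action probability ratio:
$$r_t(\theta) = \frac{\pi_\theta(a_t \mid s_t)}{\pi_{\theta_{\text{old}}}(a_t \mid s_t)}$$
For a Gaussian policy:
$$\log \pi_\theta(a \mid s) = -\sum_i \Big[\frac{(a_i - \mu_{\theta, i}(s))^2}{2\sigma_i^2} + \log \sigma_i + \tfrac{1}{2}\log 2\pi\Big], \quad r_t(\theta) = \exp\big(\log \pi_\theta(a_t \mid s_t) - \log \pi_{\theta_{\text{old}}}(a_t \mid s_t)\big)$$
PPO clipped objective:
$$L^{\text{CLIP}}(\theta) = \mathbb{E}_t\Big[\min\big(r_t(\theta)\, \hat{A}_t,\ \mathrm{clip}(r_t(\theta),\, 1 - \epsilon,\, 1 + \epsilon)\, \hat{A}_t\big)\Big]$$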
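The clipped objective is easiest to understand through its four sign cases. The sketch below evaluates the per-sample term $\min(r \hat{A},\ \mathrm{clip}(r, 1-\epsilon, 1+\epsilon)\hat{A})$ for scalar inputs; the function name and the example numbers are illustrative assumptions.

```python
import math


def ppo_clip_objective(log_prob_new, log_prob_old, advantage, clip_eps=0.2):
    """Per-sample PPO clipped surrogate (to be maximized)."""
    ratio = math.exp(log_prob_new - log_prob_old)
    clipped_ratio = max(1.0 - clip_eps, min(1.0 + clip_eps, ratio))
    return min(ratio * advantage, clipped_ratio * advantage)


log_ratio_up = math.log(1.5)    # new policy makes the action 1.5x as likely
log_ratio_down = math.log(0.5)  # new policy makes the action half as likely

cases = {
    "ratio 1.5, A=+1": ppo_clip_objective(log_ratio_up, 0.0, +1.0),
    "ratio 0.5, A=+1": ppo_clip_objective(log_ratio_down, 0.0, +1.0),
    "ratio 1.5, A=-1": ppo_clip_objective(log_ratio_up, 0.0, -1.0),
    "ratio 0.5, A=-1": ppo_clip_objective(log_ratio_down, 0.0, -1.0),
}
for name, value in cases.items():
    print(name, "->", round(value, 4))
```

Clipping only binds when the ratio has already moved in the direction the advantage rewards: with a positive advantage the gain is capped at $1 + \epsilon$, and with a negative advantage it is capped at $1 - \epsilon$. In the other two cases the unclipped term is the smaller one, so moves that make the objective worse are still penalized in full.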
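Action-space constraints
Continuous action spaces usually come with physical constraints:
$$a \in [a_{\min},\, a_{\max}]$$
Common ways to handle them, where $u$ denotes the unconstrained Gaussian sample:
1. Tanh squashing:
$$a = \tanh(u) \in (-1, 1)$$
2. Sigmoid squashing:
$$a = \mathrm{sigmoid}(u) \in (0, 1)$$
3. Affine rescaling:
$$a = a_{\min} + \frac{a_{\max} - a_{\min}}{2}\,\big(\tanh(u) + 1\big)$$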
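The squashing and rescaling steps compose naturally: squash the raw sample into $(-1, 1)$ with tanh, then map that interval affinely onto the environment's bounds. The helper names `scale_action` and `unscale_action` and the torque limits below are hypothetical and serve only as an illustration.

```python
import math


def scale_action(unit_action, low, high):
    """Map an action in [-1, 1] to the environment range [low, high]."""
    return low + 0.5 * (unit_action + 1.0) * (high - low)


def unscale_action(action, low, high):
    """Inverse map: from [low, high] back to [-1, 1]."""
    return 2.0 * (action - low) / (high - low) - 1.0


raw_sample = 0.8                      # unconstrained Gaussian sample u
unit_action = math.tanh(raw_sample)   # squashed into (-1, 1)
torque = scale_action(unit_action, low=-2.0, high=2.0)

print("squashed action:", unit_action)
print("env action:     ", torque)
print("round trip:     ", unscale_action(torque, -2.0, 2.0))
```

Because the affine map has a constant Jacobian, it only shifts every log-probability by the same constant. That constant cancels in the PPO probability ratio, so one common choice is to let the policy work in $[-1, 1]$ and rescale just before calling the environment.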
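Log-probability correction:
After tanh squashing with $a = \tanh(u)$, the log-probability must be corrected for the change of variables:
$$\log \pi(a \mid s) = \log \mathcal{N}\big(u;\, \mu, \sigma^2\big) - \sum_i \log\big(1 - \tanh^2(u_i)\big)$$
    import numpy as np
    import torch


    def get_log_prob(policy, states, actions):
        """Log-probability of tanh-squashed actions under a Gaussian policy."""
        mean, std = policy(states)
        # Invert the tanh squash (the clamp keeps atanh finite)
        pre_tanh = torch.atanh(torch.clamp(actions, -0.999, 0.999))
        # Log-density of the underlying Gaussian
        log_prob = -0.5 * ((pre_tanh - mean) / std) ** 2 \
            - torch.log(std) - 0.5 * np.log(2 * np.pi)
        # Tanh correction term: log |d tanh(u) / du| = log(1 - a^2)
        log_prob -= torch.log(1 - actions ** 2 + 1e-6)
        return log_prob.sum(dim=-1)
Comparison with discrete PPO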
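| Aspect | Discrete PPO | Continuous PPO |
|---|---|---|
| Action parameterization | Categorical distribution | Gaussian distribution |
| Action sampling | torch.distributions.Categorical | torch.distributions.Normal |
| Entropy | $-\sum_a \pi(a \mid s) \log \pi(a \mid s)$ | Gaussian entropy $\tfrac{1}{2}\log(2\pi e \sigma^2)$ per dimension |
| Policy gradient | $\nabla_\theta \log \pi_\theta(a \mid s)\, \hat{A}$ with softmax log-probabilities | $\nabla_\theta \log \pi_\theta(a \mid s)\, \hat{A}$ with the Gaussian log-density |
| Gradient variance | Usually lower | Can be higher |
| Use case | Discrete actions | Continuous control |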
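The entropy row of the table can be made concrete with the closed forms for both distributions. The sketch below assumes a diagonal Gaussian, so the entropy is the sum over action dimensions; the function names are hypothetical.

```python
import math


def categorical_entropy(probs):
    """Shannon entropy -sum_a p(a) * log p(a) of a discrete policy."""
    return -sum(p * math.log(p) for p in probs if p > 0.0)


def gaussian_entropy(stds):
    """Differential entropy of a diagonal Gaussian: sum_i 0.5 * log(2 * pi * e * sigma_i^2)."""
    return sum(0.5 * math.log(2.0 * math.pi * math.e * s * s) for s in stds)


print("uniform over 4 actions:", categorical_entropy([0.25] * 4))
print("Gaussian, sigma = 1.0: ", gaussian_entropy([1.0]))
print("Gaussian, sigma = 0.1: ", gaussian_entropy([0.1]))
```

Unlike the discrete entropy, which is never negative, the differential entropy of a Gaussian depends only on $\sigma$ and becomes negative for narrow distributions. This is why a negative target entropy such as `-action_dim` is meaningful for continuous policies.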
Continuous PPO implementation:
    import numpy as np
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    import torch.optim as optim

    # GaussianActor(state_dim, action_dim) -> (mean, std) and the state-value
    # network Critic(state_dim) -> V(s) are assumed to be defined elsewhere.


    class PPOContinuous:
        """PPO for continuous action spaces."""

        def __init__(self, state_dim, action_dim, action_bound,
                     lr=3e-4, gamma=0.99, lambd=0.95,
                     clip_eps=0.2, ent_coef=0.01,
                     vf_coef=0.5, max_grad_norm=0.5):
            self.gamma = gamma
            self.lambd = lambd
            self.clip_eps = clip_eps
            self.ent_coef = ent_coef
            self.vf_coef = vf_coef
            self.max_grad_norm = max_grad_norm
            self.action_bound = action_bound
            # Policy network
            self.actor = GaussianActor(state_dim, action_dim)
            # Value network
            self.critic = Critic(state_dim)
            # Optimizer shared by both networks
            self.optimizer = optim.Adam(
                list(self.actor.parameters()) + list(self.critic.parameters()),
                lr=lr
            )

        def get_action(self, state):
            """Sample a tanh-squashed action and its log-probability."""
            with torch.no_grad():
                mean, std = self.actor(torch.FloatTensor(state))
                dist = torch.distributions.Normal(mean, std)
                action_raw = dist.sample()
                action = torch.tanh(action_raw)
                log_prob = dist.log_prob(action_raw) - torch.log(1 - action.pow(2) + 1e-6)
            return action.numpy(), log_prob.sum(-1).numpy()

        def compute_gae(self, rewards, values, next_value, dones):
            """Generalized Advantage Estimation (GAE)."""
            advantages = []
            gae = 0.0
            for t in reversed(range(len(rewards))):
                value_next = next_value if t == len(rewards) - 1 else values[t + 1]
                delta = rewards[t] + self.gamma * value_next * (1 - dones[t]) - values[t]
                gae = delta + self.gamma * self.lambd * (1 - dones[t]) * gae
                advantages.insert(0, gae)
            return torch.tensor(advantages, dtype=torch.float32)

        def update(self, trajectories):
            """PPO update on one batch of transitions."""
            states = torch.FloatTensor(np.array([t['state'] for t in trajectories]))
            actions = torch.FloatTensor(np.array([t['action'] for t in trajectories]))
            rewards = torch.FloatTensor(np.array([t['reward'] for t in trajectories]))
            dones = torch.FloatTensor(np.array([t['done'] for t in trajectories]))
            # Log-probabilities under the data-collecting policy (stored at rollout time)
            old_log_probs = torch.FloatTensor(np.array([t['log_prob'] for t in trajectories]))
            # Compute GAE
            with torch.no_grad():
                values = self.critic(states).squeeze(-1)
            # The state after the last step is not stored, so V of the last state
            # stands in as the bootstrap value when the rollout is not terminal
            next_value = 0.0 if dones[-1] else values[-1].item()
            advantages = self.compute_gae(rewards.numpy(), values.numpy(), next_value, dones.numpy())
            returns = advantages + values
            # Normalize the advantages
            advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
            # Invert the tanh squash once; the stored actions do not change
            actions_raw = torch.atanh(torch.clamp(actions, -0.999, 0.999))
            # Multiple epochs over the same batch
            for _ in range(10):  # num_epochs
                # Action distribution under the current policy
                mean, std = self.actor(states)
                dist = torch.distributions.Normal(mean, std)
                # Current log-probabilities with the tanh correction
                log_probs = dist.log_prob(actions_raw)
                log_probs = (log_probs - torch.log(1 - actions.pow(2) + 1e-6)).sum(-1)
                # Probability ratio
                ratio = torch.exp(log_probs - old_log_probs)
                # PPO clipped loss
                surr1 = ratio * advantages
                surr2 = torch.clamp(ratio, 1 - self.clip_eps, 1 + self.clip_eps) * advantages
                policy_loss = -torch.min(surr1, surr2).mean()
                # Entropy bonus (entropy of the pre-squash Gaussian)
                entropy_loss = -self.ent_coef * dist.entropy().sum(-1).mean()
                # Value loss
                values_pred = self.critic(states).squeeze(-1)
                vf_loss = self.vf_coef * F.mse_loss(values_pred, returns)
                # Total loss
                loss = policy_loss + entropy_loss + vf_loss
                # Backpropagation with gradient clipping
                self.optimizer.zero_grad()
                loss.backward()
                nn.utils.clip_grad_norm_(self.actor.parameters(), self.max_grad_norm)
                nn.utils.clip_grad_norm_(self.critic.parameters(), self.max_grad_norm)
                self.optimizer.step()
            return loss.item(), policy_loss.item(), vf_loss.item()
Practical tips
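Target network updates
Target networks are a key component of continuous-control algorithms and serve to stabilize training.
Soft update (recommended):
$$\theta' \leftarrow \tau\, \theta + (1 - \tau)\, \theta'$$
Typical values: $\tau = 0.001$ to $0.005$ (the DDPG and SAC implementations above use 0.001 and 0.005 respectively).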
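The size of $\tau$ is easier to judge as a time scale. If the online parameters were held fixed, the gap between target and online parameters would shrink by a factor $(1 - \tau)$ per update, so it halves after about $\ln 0.5 / \ln(1 - \tau)$ updates. The sketch below checks this on a single scalar parameter; the function names are hypothetical.

```python
import math


def soft_update(target, source, tau):
    """Polyak averaging: target <- tau * source + (1 - tau) * target."""
    return [tau * s + (1.0 - tau) * t for t, s in zip(target, source)]


def updates_to_halve_gap(tau):
    """Number of soft updates after which the target has closed half the gap."""
    return math.log(0.5) / math.log(1.0 - tau)


tau = 0.005
source = [1.0]   # online parameter, held fixed for the illustration
target = [0.0]   # target parameter

history = []
for _ in range(139):
    target = soft_update(target, source, tau)
    history.append(target[0])

print("predicted half-life (updates):", updates_to_halve_gap(tau))
print("target after 138 updates:     ", history[137])
print("target after 139 updates:     ", history[138])
```

With $\tau = 0.005$ the target needs roughly 139 updates to close half the gap, which makes it a slowly moving average of recent online networks. A smaller $\tau$ gives more stable targets at the cost of slower propagation of new value estimates.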
Hard update:
Copy the parameters directly every $C$ steps. Typical values are in the thousands of steps (DQN, for example, uses $C = 10{,}000$).
    class TargetNetworkUpdater:
        """Manages target network updates."""

        def __init__(self, networks, target_networks, update_type='soft',
                     tau=0.005, update_freq=1, hard_update_freq=None):
            if update_type == 'hard' and not hard_update_freq:
                raise ValueError("hard_update_freq must be set for update_type='hard'")
            self.networks = networks
            self.target_networks = target_networks
            self.update_type = update_type
            self.tau = tau
            self.update_freq = update_freq
            self.hard_update_freq = hard_update_freq
            self.step_count = 0

        def update(self):
            """Perform one target network update step."""
            self.step_count += 1
            if self.update_type == 'soft':
                # Polyak averaging on every call
                for net, target_net in zip(self.networks, self.target_networks):
                    for tp, p in zip(target_net.parameters(), net.parameters()):
                        tp.data.copy_(self.tau * p.data + (1 - self.tau) * tp.data)
            elif self.update_type == 'hard':
                # Full copy every hard_update_freq calls
                if self.step_count % self.hard_update_freq == 0:
                    for net, target_net in zip(self.networks, self.target_networks):
                        target_net.load_state_dict(net.state_dict())
            elif self.update_type == 'delayed':
                # Delayed update: refresh the targets together with the policy update
                if self.step_count % self.update_freq == 0:
                    for net, target_net in zip(self.networks, self.target_networks):
                        target_net.load_state_dict(net.state_dict())
Experience replay design
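Experience replay for continuous-control tasks needs particular care.
Prioritized Experience Replay (PER):
The sampling priority is adjusted according to the TD error $\delta_i$:
$$P(i) = \frac{p_i^{\alpha}}{\sum_k p_k^{\alpha}}, \quad p_i = |\delta_i| + \epsilon, \quad w_i = \frac{\big(N \cdot P(i)\big)^{-\beta}}{\max_j w_j}$$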
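A worked example with three transitions shows how priorities become sampling probabilities and importance-sampling weights. The sketch assumes proportional prioritization with $p_i = |\delta_i| + \epsilon$; the function names and the TD errors are made up for the illustration.

```python
def per_probabilities(td_errors, alpha=0.6, eps=1e-6):
    """Sampling probabilities P(i) = p_i^alpha / sum_k p_k^alpha with p_i = |delta_i| + eps."""
    scaled = [(abs(d) + eps) ** alpha for d in td_errors]
    total = sum(scaled)
    return [p / total for p in scaled]


def importance_weights(probs, beta=0.4):
    """Importance-sampling weights w_i = (N * P(i))^(-beta), normalized by the largest weight."""
    n = len(probs)
    weights = [(n * p) ** (-beta) for p in probs]
    max_w = max(weights)
    return [w / max_w for w in weights]


td_errors = [0.1, 1.0, 2.0]
probs = per_probabilities(td_errors)
weights = importance_weights(probs)

for delta, p, w in zip(td_errors, probs, weights):
    print(f"|delta|={delta}: P={p:.3f}, weight={w:.3f}")

uniform = per_probabilities(td_errors, alpha=0.0)
print("alpha=0 gives uniform sampling:", uniform)
```

Transitions with a larger TD error are sampled more often but receive a smaller weight in the loss, which compensates for the bias the non-uniform sampling introduces. Setting `alpha=0` recovers uniform replay, and `beta=1` applies the full correction.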
    import numpy as np


    class PrioritizedReplayBuffer:
        """Prioritized experience replay."""

        def __init__(self, capacity, alpha=0.6, beta=0.4):
            self.capacity = capacity
            self.alpha = alpha  # priority exponent
            self.beta = beta    # importance-sampling exponent
            self.buffer = []
            self.priorities = np.zeros(capacity, dtype=np.float32)
            self.pos = 0

        def push(self, state, action, reward, next_state, done, td_error=None):
            """Add a transition; td_error is accepted but not used."""
            # New transitions get the current maximum priority so that each
            # one is likely to be sampled at least once
            max_priority = self.priorities.max() if self.buffer else 1.0
            if len(self.buffer) < self.capacity:
                self.buffer.append((state, action, reward, next_state, done))
            else:
                self.buffer[self.pos] = (state, action, reward, next_state, done)
            self.priorities[self.pos] = max_priority
            self.pos = (self.pos + 1) % self.capacity

        def sample(self, batch_size):
            """Sample a batch (batch_size must not exceed the number of stored transitions)."""
            if len(self.buffer) < self.capacity:
                probs = self.priorities[:len(self.buffer)]
            else:
                probs = self.priorities
            probs = probs ** self.alpha
            probs = probs / probs.sum()
            indices = np.random.choice(len(self.buffer), batch_size, p=probs, replace=False)
            # Importance-sampling weights
            weights = (len(self.buffer) * probs[indices]) ** (-self.beta)
            weights = weights / weights.max()
            batch = [self.buffer[i] for i in indices]
            # Materialize the columns as arrays instead of returning a lazy map object
            return tuple(map(np.array, zip(*batch))), indices, weights

        def update_priorities(self, indices, td_errors):
            """Update priorities from new TD errors."""
            for idx, error in zip(indices, td_errors):
                self.priorities[idx] = abs(error) + 1e-6
Choosing an exploration strategy
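| Algorithm | Recommended exploration strategy | Notes |
|---|---|---|
| DDPG | OU noise or Gaussian noise | Noise scale must be tuned by hand |
| TD3 | Gaussian noise (smaller scale) | TD3's built-in target policy smoothing reduces the need for exploration |
| SAC | Automatically tuned policy entropy | No additional exploration noise needed |
| PPO | Entropy regularization | Entropy coefficient needs tuning |
Suggested exploration schedules: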
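    import numpy as np


    def get_exploration_schedule(algorithm, total_steps):
        """
        Return an exploration schedule as a function of the training step.

        Common schedule types:
        1. Linear decay
        2. Exponential decay
        3. Cosine annealing
        """
        if algorithm == 'ddpg':
            # Linear decay of the noise scale, floored at 0.1
            def schedule(step):
                return max(0.1, 1.0 - step / total_steps)
        elif algorithm == 'td3':
            # Exponential decay of the noise scale
            def schedule(step):
                return 0.1 * np.exp(-5 * step / total_steps)
        elif algorithm == 'sac':
            # SAC needs no external exploration schedule
            def schedule(step):
                return None
        elif algorithm == 'ppo':
            # Entropy coefficient schedule, floored at 0.001
            def schedule(step):
                return max(0.001, 0.01 * (1 - step / total_steps))
        else:
            raise ValueError(f"unknown algorithm: {algorithm}")
        return schedule
Algorithm comparison and selection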
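Overall comparison
| Algorithm | Policy type | Exploration | Stability | Sample efficiency | Compute cost |
|---|---|---|---|---|---|
| DDPG | Deterministic | Action noise | Low | High | Low |
| TD3 | Deterministic | Action noise | Medium | High | Medium |
| SAC | Stochastic | Entropy regularization | High | High | Medium |
| PPO | Stochastic | Entropy regularization | High | Medium | Medium |
Selection guide by scenario
Choose SAC when:
- High stability and robustness are required
- Resources for hyperparameter tuning are limited
- Exploration and exploitation should be balanced automatically
Choose TD3 when:
- Compute resources are limited
- The highest sample efficiency is required
- Somewhat more involved tuning is acceptable
Choose PPO when:
- A PPO implementation is already in place
- Integration with an existing system is needed
- The stability of on-policy methods is preferred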
References
1. Lillicrap et al. (2015). Continuous Control with Deep Reinforcement Learning. ICLR 2016.
2. Silver et al. (2014). Deterministic Policy Gradient Algorithms. ICML 2014.
3. Haarnoja et al. (2018). Soft Actor-Critic: Off-Policy Maximum Entropy Deep Reinforcement Learning. ICML 2018.
4. Schulman et al. (2017). Proximal Policy Optimization Algorithms. arXiv.