概述

策略梯度(Policy Gradient)方法直接对策略进行参数化,通过梯度上升优化累积奖励。与基于价值函数的方法(Q-Learning、DQN)不同,策略梯度直接输出动作概率分布,适合连续动作空间和高维动作空间。1

核心思想:用参数化的策略函数 $\pi_\theta(a \mid s)$ 代替价值表,通过梯度上升最大化期望回报 $J(\theta)$。

为什么需要策略梯度?

价值函数的局限

| 问题 | 描述 |
| --- | --- |
| 高维动作空间 | 连续动作无法枚举所有 Q(s,a) |
| 随机策略 | 有时随机策略比确定性策略更好 |
| 收敛性 | Q值估计可能不收敛 |

策略梯度的优势

| 优势 | 说明 |
| --- | --- |
| 自然处理连续动作 | 输出均值/方差,直接采样 |
| 收敛性更好 | 直接优化目标,振荡更少 |
| 随机策略学习 | 隐式探索,避免撞墙 |
| 端到端学习 | 直接从感知到动作 |

策略参数化

离散动作空间

使用softmax输出动作概率:

$$\pi_\theta(a \mid s) = \frac{\exp\bigl(\theta^\top \phi(s,a)\bigr)}{\sum_{b} \exp\bigl(\theta^\top \phi(s,b)\bigr)}$$

其中 $\phi(s,a)$ 是状态-动作对的特征向量。
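下面是一个线性 softmax 策略的最小示意代码(假设特征函数 phi(state, action) 由使用者提供,softmax_probs、grad_log_softmax 均为说明用的假想函数),同时给出得分函数 $\nabla_\theta \log \pi_\theta$ 的解析形式:

import numpy as np

def softmax_probs(theta, phi, state, actions):
    """π_θ(a|s) ∝ exp(θ^T φ(s,a)),对所有候选动作归一化"""
    logits = np.array([theta @ phi(state, a) for a in actions])
    logits -= logits.max()              # 数值稳定
    p = np.exp(logits)
    return p / p.sum()

def grad_log_softmax(theta, phi, state, actions, a_idx):
    """得分函数:∇_θ log π(a|s) = φ(s,a) - Σ_b π(b|s) φ(s,b)"""
    probs = softmax_probs(theta, phi, state, actions)
    feats = np.array([phi(state, b) for b in actions])
    return feats[a_idx] - probs @ feats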

连续动作空间

使用高斯分布:

$$\pi_\theta(a \mid s) = \frac{1}{\sqrt{2\pi}\,\sigma_\theta(s)} \exp\!\left(-\frac{\bigl(a - \mu_\theta(s)\bigr)^2}{2\sigma_\theta(s)^2}\right)$$

通常:均值 $\mu_\theta(s)$ 由网络输出,标准差 $\sigma$ 取固定值或作为可学习参数(如下文代码中的 log_std)。
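作为参考,高斯策略对均值与对数标准差的得分函数(对数似然梯度)有解析形式,REINFORCE 的更新即以它为权重:

$$\nabla_{\mu} \log \pi_\theta(a \mid s) = \frac{a - \mu_\theta(s)}{\sigma^2}, \qquad \nabla_{\log \sigma} \log \pi_\theta(a \mid s) = \frac{\bigl(a - \mu_\theta(s)\bigr)^2}{\sigma^2} - 1$$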

目标函数

定义

策略梯度的目标是最大化期望回报:

$$J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta}\bigl[R(\tau)\bigr], \qquad R(\tau) = \sum_{t=0}^{T-1} r_{t+1}$$

其中 $\tau = (s_0, a_0, r_1, s_1, a_1, r_2, \ldots, s_{T-1}, a_{T-1}, r_T)$ 是轨迹。

两种形式

| 形式 | 定义 | 适用场景 |
| --- | --- | --- |
| 平均价值 | $J(\theta) = \mathbb{E}_{s_0}\bigl[V^{\pi_\theta}(s_0)\bigr]$ | Episodic任务 |
| 平均奖励率 | $J(\theta) = \lim_{T \to \infty} \frac{1}{T}\, \mathbb{E}_{\pi_\theta}\!\left[\sum_{t=1}^{T} r_t\right]$ | Continuing任务 |

梯度推导

核心定理:策略梯度定理

对于可微的策略 $\pi_\theta(a \mid s)$,目标函数 $J(\theta)$ 的梯度为:

$$\nabla_\theta J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta}\!\left[\sum_{t=0}^{T-1} \nabla_\theta \log \pi_\theta(a_t \mid s_t)\, R(\tau)\right]$$

详细推导

1. 轨迹概率

在策略 $\pi_\theta$ 下,轨迹 $\tau$ 的概率为:

$$p_\theta(\tau) = \rho_0(s_0) \prod_{t=0}^{T-1} \pi_\theta(a_t \mid s_t)\, p(s_{t+1} \mid s_t, a_t)$$

2. 对数导数

$$\nabla_\theta \log p_\theta(\tau) = \sum_{t=0}^{T-1} \nabla_\theta \log \pi_\theta(a_t \mid s_t)$$

注意:初始状态分布 $\rho_0(s_0)$ 和环境转移概率 $p(s_{t+1} \mid s_t, a_t)$ 不含 $\theta$,对 $\theta$ 的梯度为0。

3. 期望梯度

利用对数导数技巧 $\nabla_\theta p_\theta(\tau) = p_\theta(\tau)\, \nabla_\theta \log p_\theta(\tau)$,交换梯度与积分:

$$\nabla_\theta J(\theta) = \nabla_\theta \int p_\theta(\tau)\, R(\tau)\, d\tau = \mathbb{E}_{\tau \sim \pi_\theta}\!\left[\nabla_\theta \log p_\theta(\tau)\, R(\tau)\right] = \mathbb{E}_{\tau \sim \pi_\theta}\!\left[\sum_{t=0}^{T-1} \nabla_\theta \log \pi_\theta(a_t \mid s_t)\, R(\tau)\right]$$

折扣因子的处理

如果考虑折扣因子 $\gamma$,通常用从 $t$ 时刻起的折扣回报 $G_t$ 代替整条轨迹的回报:

$$\nabla_\theta J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta}\!\left[\sum_{t=0}^{T-1} \nabla_\theta \log \pi_\theta(a_t \mid s_t)\, G_t\right], \qquad G_t = \sum_{k=t}^{T-1} \gamma^{k-t}\, r_{k+1}$$

梯度估计

蒙特卡洛估计

使用 $N$ 条采样轨迹估计期望:

$$\nabla_\theta J(\theta) \approx \frac{1}{N} \sum_{i=1}^{N} \sum_{t=0}^{T_i - 1} \nabla_\theta \log \pi_\theta\bigl(a_t^{(i)} \mid s_t^{(i)}\bigr)\, G_t^{(i)}$$

import numpy as np

def estimate_policy_gradient(env, policy, n_trajectories=100, gamma=0.99):
    """
    蒙特卡洛估计策略梯度
    
    返回:
        grad: 策略参数的梯度估计(对 n_trajectories 条轨迹取平均)
    """
    gradients = []
    
    for _ in range(n_trajectories):
        trajectory = []
        state = env.reset()
        done = False
        
        # 收集轨迹
        while not done:
            action = policy.sample_action(state)  # 随机采样
            next_state, reward, done, _ = env.step(action)
            trajectory.append((state, action, reward))
            state = next_state
        
        # 反向累积折扣回报 G_t = r_{t+1} + γ·G_{t+1}
        returns = []
        G = 0
        for (_, _, r) in reversed(trajectory):
            G = r + gamma * G
            returns.insert(0, G)
        
        # 该轨迹的梯度估计:Σ_t ∇θ log π(a_t|s_t) · G_t
        traj_grad = sum(policy.grad_log_pi(s, a) * G_t
                        for (s, a, _), G_t in zip(trajectory, returns))
        gradients.append(traj_grad)
    
    # 对轨迹取平均
    return np.mean(gradients, axis=0)
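在真实环境中很难直接检验梯度估计是否正确。下面用一个一维高斯分布的小实验做数值对照(示意代码,假设 $x \sim \mathcal{N}(\theta, 1)$、$f(x) = x^2$,解析梯度为 $2\theta$):

import numpy as np

# 验证对数导数技巧:∇_θ E_{x~N(θ,1)}[f(x)] ≈ E[f(x)·∇_θ log p(x;θ)]
rng = np.random.default_rng(0)
theta, n = 1.5, 200_000

def f(x):
    return x ** 2

z = rng.normal(0.0, 1.0, size=n)
x = theta + z                           # x ~ N(θ, 1)
score = x - theta                       # ∇_θ log N(x; θ, 1) = (x - θ)
grad_score = np.mean(f(x) * score)      # 得分函数(REINFORCE式)估计

eps = 1e-3                              # 有限差分(公共随机数)作为对照
grad_fd = (np.mean(f(theta + eps + z)) - np.mean(f(theta - eps + z))) / (2 * eps)

print(grad_score, grad_fd)              # 两者都应接近解析值 2θ = 3.0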

方差减小技术

1. 基线(Baseline)

减去基线函数 $b(s_t)$ 不改变梯度的期望,但可以减小方差:

$$\nabla_\theta J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta}\!\left[\sum_{t=0}^{T-1} \nabla_\theta \log \pi_\theta(a_t \mid s_t)\,\bigl(G_t - b(s_t)\bigr)\right]$$

常用基线:状态价值函数 $V^{\pi}(s_t)$,或一批轨迹回报的均值(常数基线)。

2. 优势函数

使用优势函数 $A^{\pi}(s_t, a_t) = Q^{\pi}(s_t, a_t) - V^{\pi}(s_t)$ 代替回报 $G_t$,相当于以 $V^{\pi}(s_t)$ 作为基线,可进一步减小方差(见下方示意代码):

$$\nabla_\theta J(\theta) = \mathbb{E}\!\left[\sum_{t=0}^{T-1} \nabla_\theta \log \pi_\theta(a_t \mid s_t)\, A^{\pi}(s_t, a_t)\right]$$
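一个示意性的实现片段(假设新增一个输出标量的 ValueNetwork 作为基线,用 $G_t - V(s_t)$ 近似优势;ValueNetwork、reinforce_with_baseline_loss 均为说明用的假想名称):

import torch.nn as nn

class ValueNetwork(nn.Module):
    """状态价值基线 V(s):输出一个标量"""
    def __init__(self, state_dim, hidden_dim=128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, 1),
        )

    def forward(self, x):
        return self.net(x).squeeze(-1)

def reinforce_with_baseline_loss(value_net, states, returns, log_probs):
    """states/returns/log_probs 为同一条轨迹的张量(log_probs 需保留计算图)"""
    values = value_net(states)
    advantages = (returns - values).detach()              # 基线不参与策略梯度的反向传播
    policy_loss = -(log_probs * advantages).sum()         # 以 G_t - V(s_t) 加权的策略梯度损失
    value_loss = nn.functional.mse_loss(values, returns)  # 基线通过回归拟合 G_t
    return policy_loss, value_loss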

REINFORCE算法

算法流程

1. 初始化策略参数 θ

2. 重复直到收敛:
   a) 用当前策略 π_θ 采集一个episode:
      τ = (s₀, a₀, r₁, ..., s_{T-1}, a_{T-1}, r_T)
   
   b) 对每个时间步 t = 0, 1, …, T-1:
      - 计算回报 G_t = Σ_{k=t}^{T-1} γ^{k-t} r_{k+1}
      - 计算策略梯度估计: g_t = G_t · ∇θ log π_θ(a_t|s_t)
   
   c) 更新策略: θ ← θ + α · Σ_t g_t

Python实现

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
 
class PolicyNetwork(nn.Module):
    """策略网络:输出动作概率"""
    
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
            nn.Softmax(dim=-1)
        )
    
    def forward(self, x):
        return self.net(x)
 
 
class REINFORCEAgent:
    """REINFORCE智能体"""
    
    def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99):
        self.gamma = gamma
        self.policy = PolicyNetwork(state_dim, action_dim)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
    
    def choose_action(self, state):
        """根据策略选择动作(随机采样)"""
        state = torch.FloatTensor(state).unsqueeze(0)
        probs = self.policy(state)
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()
        return action.item(), action_dist.log_prob(action)
    
    def update(self, log_probs, rewards):
        """
        REINFORCE更新
        
        参数:
            log_probs: 轨迹中每个动作的log概率
            rewards: 轨迹中每个时间步的奖励
        """
        T = len(rewards)
        
        # 计算折扣回报 G_t
        G = 0
        returns = []
        for t in reversed(range(T)):
            G = rewards[t] + self.gamma * G
            returns.insert(0, G)
        
        returns = torch.FloatTensor(returns)
        
        # 标准化回报(减小方差)
        if len(returns) > 1:
            returns = (returns - returns.mean()) / (returns.std() + 1e-8)
        
        # 计算策略梯度损失
        loss = 0
        for log_prob, G in zip(log_probs, returns):
            loss -= log_prob * G  # 梯度上升,所以用负号
        
        # 更新
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
        return loss.item()
 
 
def train_reinforce(env, agent, n_episodes=1000):
    """训练REINFORCE"""
    rewards_history = []
    
    for episode in range(n_episodes):
        state = env.reset()
        done = False
        
        log_probs = []
        rewards = []
        
        while not done:
            action, log_prob = agent.choose_action(state)
            next_state, reward, done, _ = env.step(action)
            
            log_probs.append(log_prob)
            rewards.append(reward)
            
            state = next_state
        
        # 更新
        loss = agent.update(log_probs, rewards)
        
        episode_reward = sum(rewards)
        rewards_history.append(episode_reward)
        
        if (episode + 1) % 100 == 0:
            avg_reward = np.mean(rewards_history[-100:])
            print(f"Episode {episode+1}, Avg Reward: {avg_reward:.2f}")
    
    return rewards_history
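一个示意性的调用方式(假设使用旧版 gym API,即 env.reset() 返回状态、env.step() 返回四元组,与上文代码一致;环境名仅作示例):

import gym

env = gym.make("CartPole-v1")
agent = REINFORCEAgent(state_dim=env.observation_space.shape[0],
                       action_dim=env.action_space.n)
history = train_reinforce(env, agent, n_episodes=500)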

连续动作空间的REINFORCE

高斯策略

class GaussianPolicy(nn.Module):
    """高斯策略:输出均值和标准差"""
    
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super().__init__()
        
        # 均值网络
        self.mean_net = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim)
        )
        
        # 对数标准差(可学习)
        self.log_std = nn.Parameter(torch.zeros(action_dim))
    
    def forward(self, state):
        mean = self.mean_net(state)
        std = torch.exp(self.log_std)
        return mean, std
    
    def sample(self, state):
        mean, std = self.forward(state)
        dist = torch.distributions.Normal(mean, std)
        action = dist.sample()
        log_prob = dist.log_prob(action).sum(dim=-1)  # 多维求和
        return action, log_prob
 
 
class ContinuousREINFORCE:
    """连续动作空间的REINFORCE"""
    
    def __init__(self, state_dim, action_dim, action_bound, lr=1e-3, gamma=0.99):
        self.gamma = gamma
        self.action_bound = action_bound
        self.policy = GaussianPolicy(state_dim, action_dim)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
    
    def choose_action(self, state, deterministic=False):
        state = torch.FloatTensor(state).unsqueeze(0)
        
        if deterministic:
            with torch.no_grad():
                mean, _ = self.policy(state)
                action = torch.tanh(mean) * self.action_bound
        else:
            action, log_prob = self.policy.sample(state)
            action = torch.tanh(action) * self.action_bound
        
        return action.squeeze(0).numpy()
    
    def update(self, states, actions, rewards):
        """策略梯度更新"""
        T = len(rewards)
        
        # 计算回报
        returns = []
        G = 0
        for r in reversed(rewards):
            G = r + self.gamma * G
            returns.insert(0, G)
        
        returns = torch.FloatTensor(returns)
        returns = (returns - returns.mean()) / (returns.std() + 1e-8)
        
        # 计算log概率
        states = torch.FloatTensor(states)
        actions = torch.FloatTensor(actions) / self.action_bound  # 归一化
        actions = torch.atanh(torch.clamp(actions, -0.999, 0.999))  # inverse tanh
        
        mean, std = self.policy(states)
        dist = torch.distributions.Normal(mean, std)
        log_probs = dist.log_prob(actions).sum(dim=-1)
        
        # 策略梯度损失
        loss = -(log_probs * returns).sum()
        
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
        return loss.item()
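配套的训练循环可以按如下方式组织(示意代码,假设旧版 gym 的 Pendulum-v1 环境,动作范围约为 [-2, 2]):

import gym
import numpy as np

env = gym.make("Pendulum-v1")
agent = ContinuousREINFORCE(state_dim=env.observation_space.shape[0],
                            action_dim=env.action_space.shape[0],
                            action_bound=float(env.action_space.high[0]))

for episode in range(500):
    state = env.reset()
    states, actions, rewards = [], [], []
    done = False
    while not done:
        action = agent.choose_action(state)
        next_state, reward, done, _ = env.step(action)
        states.append(state)
        actions.append(action)
        rewards.append(reward)
        state = next_state
    # 用整条轨迹做一次REINFORCE更新
    loss = agent.update(np.array(states), np.array(actions), rewards)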

方差与偏差分析

偏差(Bias)

REINFORCE是无偏的:蒙特卡洛回报 $G_t$ 是 $Q^{\pi_\theta}(s_t, a_t)$ 的无偏估计,因此梯度估计的期望等于真实策略梯度:

$$\mathbb{E}_{\tau \sim \pi_\theta}\!\left[\sum_{t=0}^{T-1} \nabla_\theta \log \pi_\theta(a_t \mid s_t)\, G_t\right] = \nabla_\theta J(\theta)$$

方差

REINFORCE的方差较高,原因:

  1. 蒙特卡洛估计:用少量完整轨迹的回报估计期望,采样噪声大
  2. 长轨迹累积:回报 $G_t$ 累加整条轨迹的奖励,随机性随轨迹长度增长

减小方差的方法

| 方法 | 说明 |
| --- | --- |
| 折扣因子 $\gamma$ | 减少远期不确定性 |
| 基线 $b(s)$ | 减少方差而不改变期望 |
| 优势函数 $A(s,a)$ | 更好的梯度估计 |
| Actor-Critic | 用函数近似代替蒙特卡洛回报 |

与Q-Learning的对比

| 方面 | 策略梯度 | Q-Learning |
| --- | --- | --- |
| 表示 | $\pi_\theta(a \mid s)$ | $Q(s, a)$ |
| 输出 | 动作概率分布 | Q值 |
| 动作空间 | 连续/离散 | 通常离散 |
| 探索 | 隐式(随机策略) | ε-greedy |
| 收敛性 | 更好 | 可能振荡 |
| 方差 | 高(蒙特卡洛) | 低(TD) |
| 样本效率 | 较低 | 中等 |

参考


后续主题

Footnotes

  1. Sutton, R. S., McAllester, D., Singh, S., Mansour, Y., “Policy Gradient Methods for Reinforcement Learning with Function Approximation”, NeurIPS, 2000