Overview

DQN (Deep Q-Network) was proposed by Mnih et al. in 2013 and refined in the 2015 Nature paper; it stands as a landmark of deep reinforcement learning.¹

Core contribution: use a convolutional neural network to process high-dimensional image input and learn a policy end to end from raw pixels.

Background

Limitations of tabular Q-learning:

  1. Curse of dimensionality: a Q-table must store a value for every state-action pair
  2. High-dimensional input: raw sensory data such as images or audio cannot be handled
  3. Poor generalization: values do not generalize to unseen states

Atari example

Input: 210×160×3 image (raw pixels)
Actions: 18 control buttons
State space: roughly 10^600 possibilities (far more than the number of atoms in the universe)

Core idea of DQN

Solution

Approximate the Q function with a deep neural network:

Q(s, a; θ) ≈ Q*(s, a)

Two key techniques

| Technique | Role | Problem addressed |
| --- | --- | --- |
| Experience replay | Store past transitions and sample them uniformly at random | Data correlation, non-stationary distribution |
| Target network | Hold the target Q value fixed for a while | Training instability |

Algorithm details

Loss function

L(θ) = E_{(s,a,r,s') ~ U(D)} [ ( r + γ max_{a'} Q(s', a'; θ⁻) − Q(s, a; θ) )² ]

where θ⁻ are the target-network parameters, synchronized from θ every C steps.
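
As a quick numerical illustration (values chosen only for this example): with r = 1, γ = 0.99 and max_{a'} Q(s', a'; θ⁻) = 2.5, the target is y = 1 + 0.99 × 2.5 = 3.475, and the loss is the squared difference between y and the online network's Q(s, a; θ).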

Full algorithm

1. Initialize:
   - Q-network: Q(s,a;θ)
   - Target network: Q(s,a;θ⁻) ← θ
   - Replay buffer: D = ∅
   - Exploration rate: ε

2. For each episode:
   a) Initialize state s

   b) For each step t = 1, ..., T:
      - Select an action with ε-greedy:
        A_t = argmax_a Q(s,a;θ)  with prob 1-ε
        random action           with prob ε
      - Execute the action, observe r, s'
      - Store (s, A_t, r, s') in D

      - Sample a minibatch from D:
        (s_j, a_j, r_j, s'_j) ~ U(D)

      - Compute the target:
        y_j = r_j + γ max_{a'} Q(s'_j, a'; θ⁻)  if not terminal
             = r_j                                 if terminal

      - Gradient step on the Q-network:
        θ ← θ - α ∇_θ L(θ)

      - Every C steps:
        θ⁻ ← θ

      - s ← s'

Python implementation

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque
 
class DQN(nn.Module):
    """Deep Q-network (MLP value approximator)"""
    
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super(DQN, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim)
        )
    
    def forward(self, x):
        return self.net(x)
 
 
class ReplayBuffer:
    """Experience replay buffer"""
    
    def __init__(self, capacity=10000):
        self.buffer = deque(maxlen=capacity)
    
    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))
    
    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return (
            np.array(states),
            np.array(actions),
            np.array(rewards),
            np.array(next_states),
            np.array(dones)
        )
    
    def __len__(self):
        return len(self.buffer)
 
 
class DQNAgent:
    """DQN agent"""
    
    def __init__(
        self,
        state_dim,
        action_dim,
        hidden_dim=128,
        lr=1e-3,
        gamma=0.99,
        epsilon=1.0,
        epsilon_min=0.01,
        epsilon_decay=0.995,
        target_update=10,
        buffer_size=10000,
        batch_size=64
    ):
        self.action_dim = action_dim
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.target_update = target_update
        self.batch_size = batch_size
        self.update_count = 0
        
        # Online Q-network and target network
        self.q_net = DQN(state_dim, action_dim, hidden_dim)
        self.target_net = DQN(state_dim, action_dim, hidden_dim)
        self.target_net.load_state_dict(self.q_net.state_dict())
        
        self.optimizer = optim.Adam(self.q_net.parameters(), lr=lr)
        self.replay_buffer = ReplayBuffer(buffer_size)
    
    def choose_action(self, state, training=True):
        """ε-greedy action selection"""
        if training and random.random() < self.epsilon:
            return random.randint(0, self.action_dim - 1)
        else:
            with torch.no_grad():
                state = torch.FloatTensor(state).unsqueeze(0)
                q_values = self.q_net(state)
                return q_values.argmax().item()
    
    def update(self):
        """Sample a minibatch from the replay buffer and take one gradient step"""
        if len(self.replay_buffer) < self.batch_size:
            return
        
        # Sample a minibatch
        states, actions, rewards, next_states, dones = \
            self.replay_buffer.sample(self.batch_size)
        
        states = torch.FloatTensor(states)
        actions = torch.LongTensor(actions)
        rewards = torch.FloatTensor(rewards)
        next_states = torch.FloatTensor(next_states)
        dones = torch.FloatTensor(dones)
        
        # Current Q-values of the actions actually taken
        q_values = self.q_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)
        
        # Target Q-values, computed with the target network
        with torch.no_grad():
            next_q_values = self.target_net(next_states).max(1)[0]
            target_q_values = rewards + self.gamma * next_q_values * (1 - dones)
        
        # Compute the loss
        loss = nn.MSELoss()(q_values, target_q_values)
        
        # Gradient step with gradient clipping
        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.q_net.parameters(), 1.0)
        self.optimizer.step()
        
        # Periodically sync the target network
        self.update_count += 1
        if self.update_count % self.target_update == 0:
            self.target_net.load_state_dict(self.q_net.state_dict())
        
        # Decay ε
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
        
        return loss.item()
    
    def store(self, state, action, reward, next_state, done):
        self.replay_buffer.push(state, action, reward, next_state, done)
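
A minimal usage sketch for the agent above on CartPole-v1 (assumes gym is installed and uses the pre-0.26 gym API, as in the Atari example later; the episode count is arbitrary):

import gym

env = gym.make('CartPole-v1')
agent = DQNAgent(state_dim=env.observation_space.shape[0],
                 action_dim=env.action_space.n)

for episode in range(200):  # episode count chosen arbitrarily for illustration
    state = env.reset()
    episode_reward, done = 0.0, False
    while not done:
        action = agent.choose_action(state)
        next_state, reward, done, _ = env.step(action)
        agent.store(state, action, reward, next_state, done)
        agent.update()  # one gradient step per environment step
        state = next_state
        episode_reward += reward
    print(f"Episode {episode + 1}: reward = {episode_reward:.1f}")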

Key techniques in detail

1. Experience replay

Why is it needed?

Training directly on consecutive transitions causes:

  1. Data correlation: consecutive frames are highly correlated
  2. Non-stationary distribution: as the policy changes, so does the data distribution

Solution

Store transitions in a buffer and sample them at random to break the correlation:

# Store a transition
self.replay_buffer.push(state, action, reward, next_state, done)
 
# Sample uniformly at random
batch = random.sample(self.buffer, batch_size)

Prioritized experience replay (PER)

Sample transitions with large TD errors more often:

class PrioritizedReplayBuffer:
    """Prioritized experience replay buffer"""
    
    def __init__(self, capacity=10000, alpha=0.6):
        self.capacity = capacity
        self.alpha = alpha
        self.priorities = np.zeros(capacity)
        self.buffer = []  # plain list so buffer indices stay aligned with priorities
        self.position = 0
    
    def push(self, state, action, reward, next_state, done):
        # New transitions get the current maximum priority so they are sampled at least once
        max_priority = self.priorities.max() if self.buffer else 1.0
        if len(self.buffer) < self.capacity:
            self.buffer.append((state, action, reward, next_state, done))
        else:
            self.buffer[self.position] = (state, action, reward, next_state, done)
        self.priorities[self.position] = max_priority
        self.position = (self.position + 1) % self.capacity
    
    def sample(self, batch_size, beta=0.4):
        # Sampling probability proportional to priority^alpha
        probs = self.priorities[:len(self.buffer)] ** self.alpha
        probs /= probs.sum()
        
        # Weighted sampling and importance-sampling weights
        indices = np.random.choice(len(self.buffer), batch_size, p=probs)
        weights = (len(self.buffer) * probs[indices]) ** (-beta)
        weights /= weights.max()
        
        batch = [self.buffer[i] for i in indices]
        return zip(*batch), indices, weights
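
After each learning step, the priorities of the sampled transitions should be refreshed with the new TD errors. The helper below is a sketch of a method that could be added to the class above (update_priorities is not part of the original code):

    def update_priorities(self, indices, td_errors, eps=1e-6):
        """Hypothetical helper: store |TD error| as the new priority of each sampled transition"""
        for i, td in zip(indices, td_errors):
            self.priorities[i] = abs(td) + eps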

2. Target network

Problem

Computing the target value directly with the current Q-network causes:

  • The target shifts as training proceeds, which destabilizes learning
  • It is akin to learning against a moving target

Solution

Use a target network that is updated only after a delay:

# Periodically synchronize
if self.update_count % self.target_update == 0:
    self.target_net.load_state_dict(self.q_net.state_dict())

3. Gradient clipping

torch.nn.utils.clip_grad_norm_(self.q_net.parameters(), max_norm=1.0)

This prevents exploding gradients and stabilizes training.

DQN variants

1. Double DQN

Addresses the Q-value overestimation problem:

# Standard DQN
y_j = r_j + γ * max_a' Q_target(s'_j, a')
 
# Double DQN
a_max = argmax_a Q_online(s'_j, a)  # select the action with the online network
y_j = r_j + γ * Q_target(s'_j, a_max)  # evaluate it with the target network
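
A minimal PyTorch sketch of what this change looks like inside the update() method of the DQNAgent above (variable names follow that method; this is an illustrative modification, not the original code):

# Double DQN target: replaces the target computation in DQNAgent.update()
with torch.no_grad():
    # Online network selects the greedy next action
    next_actions = self.q_net(next_states).argmax(dim=1, keepdim=True)
    # Target network evaluates that action
    next_q_values = self.target_net(next_states).gather(1, next_actions).squeeze(1)
    target_q_values = rewards + self.gamma * next_q_values * (1 - dones)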

2. Dueling DQN

Separate the state value from the action advantages; Q is recombined as V + (A − mean(A)), where subtracting the mean keeps V and A identifiable (a constant could otherwise be shifted between them without changing Q):

class DuelingDQN(nn.Module):
    """Dueling DQN architecture"""
    
    def __init__(self, state_dim, action_dim):
        super().__init__()
        
        # Shared feature layers
        self.feature = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU()
        )
        
        # State-value branch
        self.value = nn.Sequential(
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )
        
        # Advantage branch
        self.advantage = nn.Sequential(
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, action_dim)
        )
    
    def forward(self, x):
        features = self.feature(x)
        v = self.value(features)
        a = self.advantage(features)
        
        # Q = V + (A - mean(A))
        q = v + a - a.mean(dim=1, keepdim=True)
        return q

3. Noisy DQN

Replace ε-greedy exploration with noisy networks:

class NoisyLinear(nn.Module):
    """Noisy linear layer (independent Gaussian noise on weights and biases)"""
    
    def __init__(self, in_dim, out_dim, sigma_init=0.5):
        super().__init__()
        self.in_dim = in_dim
        self.out_dim = out_dim
        
        # Learnable parameters: means and noise scales
        self.weight_mu = nn.Parameter(torch.FloatTensor(out_dim, in_dim))
        self.weight_sigma = nn.Parameter(torch.FloatTensor(out_dim, in_dim))
        self.bias_mu = nn.Parameter(torch.FloatTensor(out_dim))
        self.bias_sigma = nn.Parameter(torch.FloatTensor(out_dim))
        
        self.sigma_init = sigma_init  # must be set before reset_parameters() reads it
        self.reset_parameters()
    
    def reset_parameters(self):
        mu_init = 1.0 / np.sqrt(self.in_dim)
        self.weight_mu.data.uniform_(-mu_init, mu_init)
        self.bias_mu.data.uniform_(-mu_init, mu_init)
        self.weight_sigma.data.fill_(self.sigma_init)
        self.bias_sigma.data.fill_(self.sigma_init)
    
    def forward(self, x):
        weight = self.weight_mu + self.weight_sigma * torch.randn_like(self.weight_mu)
        bias = self.bias_mu + self.bias_sigma * torch.randn_like(self.bias_mu)
        return x @ weight.t() + bias
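
A sketch of how NoisyLinear might replace the output layers of a Q-network so that exploration comes from the learned noise instead of ε-greedy (the NoisyDQN class name is introduced here only for illustration):

class NoisyDQN(nn.Module):
    """Hypothetical Q-network whose head uses NoisyLinear layers; ε-greedy can then be dropped"""
    
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super().__init__()
        self.feature = nn.Sequential(nn.Linear(state_dim, hidden_dim), nn.ReLU())
        self.noisy1 = NoisyLinear(hidden_dim, hidden_dim)
        self.noisy2 = NoisyLinear(hidden_dim, action_dim)
    
    def forward(self, x):
        x = self.feature(x)
        x = torch.relu(self.noisy1(x))  # noise is resampled on every forward pass
        return self.noisy2(x)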

4. Rainbow DQN

Rainbow combines several improvements into one agent:

| Component | Improvement |
| --- | --- |
| Double DQN | Fixes overestimation |
| Dueling DQN | Separates V and A |
| Prioritized Replay | Priority-based sampling |
| Noisy Nets | Exploration strategy |
| Distributional RL | Learns a distribution over Q values |
| N-step TD | Multi-step TD targets |

Full Atari implementation

import gym
import torch
import numpy as np
from collections import deque
 
class AtariPreprocessor:
    """Atari frame preprocessing and frame stacking"""
    
    def __init__(self, frame_stack=4):
        self.frame_stack = frame_stack
        self.frames = deque(maxlen=frame_stack)
    
    def preprocess(self, obs):
        """Preprocess a single frame"""
        obs = np.mean(obs, axis=2)  # convert to grayscale
        obs = obs[34:194]           # crop the playing field
        obs = obs[::2, ::2]         # downsample the 160x160 crop to 80x80
        return obs.astype(np.uint8)
    
    def reset(self, obs):
        """Reset the frame stack with the first observation"""
        obs = self.preprocess(obs)
        self.frames = deque([obs] * self.frame_stack, maxlen=self.frame_stack)
        return self.get_state()
    
    def step(self, obs):
        """Append a new frame to the stack"""
        obs = self.preprocess(obs)
        self.frames.append(obs)
        return self.get_state()
    
    def get_state(self):
        # Flatten the 4x80x80 stack into a vector and scale to [0, 1] for the MLP agent above
        return np.stack(self.frames, axis=0).astype(np.float32).flatten() / 255.0
 
 
def train_dqn_atari():
    """Train DQN on Atari Breakout"""
    env = gym.make('Breakout-v0')
    state_dim = 80 * 80 * 4  # 4 stacked frames, flattened
    action_dim = env.action_space.n
    
    agent = DQNAgent(state_dim, action_dim, lr=1e-4)
    preprocessor = AtariPreprocessor()
    
    n_episodes = 10000
    rewards_history = []
    
    for episode in range(n_episodes):
        obs = env.reset()
        state = preprocessor.reset(obs)
        episode_reward = 0
        done = False
        
        while not done:
            action = agent.choose_action(state)
            obs, reward, done, _ = env.step(action)
            next_state = preprocessor.step(obs) if not done else state
            
            agent.store(state, action, reward, next_state, done)
            agent.update()
            
            state = next_state
            episode_reward += reward
        
        rewards_history.append(episode_reward)
        
        if (episode + 1) % 10 == 0:
            avg_reward = np.mean(rewards_history[-100:])
            print(f"Episode {episode+1}, Avg Reward: {avg_reward:.2f}, Epsilon: {agent.epsilon:.3f}")
 
 
if __name__ == "__main__":
    train_dqn_atari()

Hyperparameter choices

| Hyperparameter | Typical value | Notes |
| --- | --- | --- |
| Learning rate | 1e-4 (Adam) | Needs tuning |
| Discount factor | 0.99 | Long-horizon planning |
| Initial exploration rate | 1.0 | Fully random at the start |
| Exploration decay | 0.995 / 0.999 | Shift gradually toward exploitation |
| Minimum exploration rate | 0.01–0.1 | Keep some exploration |
| Batch size | 32–128 | Trade off variance and efficiency |
| Target-network update interval | 10000 steps | Rule of thumb |
| Replay buffer size | 100,000–1,000,000 | Limited by memory |
| Gradient clipping | 10.0 | Prevent exploding gradients |
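
A sketch of how these typical values map onto the DQNAgent constructor defined above (state_dim and action_dim are placeholders that depend on the environment; note that target_update in this implementation counts update() calls rather than environment steps):

agent = DQNAgent(
    state_dim=4,             # placeholder: environment-dependent
    action_dim=2,            # placeholder: environment-dependent
    lr=1e-4,                 # learning rate
    gamma=0.99,              # discount factor
    epsilon=1.0,             # initial exploration rate
    epsilon_min=0.05,        # minimum exploration rate
    epsilon_decay=0.995,     # exploration decay
    target_update=10_000,    # target-network update interval (in update() calls)
    buffer_size=100_000,     # replay buffer size
    batch_size=64            # batch size
)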

Limitations and directions for improvement

Limitations of DQN

| Problem | Description | Remedy |
| --- | --- | --- |
| Overestimation | The max operator systematically biases Q values upward | Double DQN |
| Low sample efficiency | Requires a large number of interactions | PER, HER |
| Unstable training | The target changes substantially during training | Target network, gradient clipping |
| Continuous actions | Cannot be handled directly | DDPG, SAC |
| Stochastic policies | No support for stochastic action selection | Noisy DQN |

Later developments

| Algorithm | Highlights |
| --- | --- |
| DDPG | Continuous action spaces |
| A3C | Asynchronous, parallel workers |
| PPO | Stable, sample-efficient |
| Rainbow | Integrates multiple improvements |
| DrQ | Data-efficient |

Further topics

  • Policy gradients: methods that optimize the policy directly
  • PPO: proximal policy optimization
  • RLHF: reinforcement learning from human feedback

Footnotes

  1. Mnih et al., “Human-level control through deep reinforcement learning”, Nature, 2015