概述

动态规划(Dynamic Programming,DP)方法是解决MDP最经典的方法,前提是环境模型已知(即转移概率 和奖励函数 已知)。1

核心思想是利用贝尔曼方程的递归结构,将复杂问题分解为子问题,逐步求解。

动态规划要求:

  1. 环境模型已知(基于模型)
  2. 状态空间有限(或可离散化)
  3. 马尔可夫性质

两种核心算法

算法思路适用场景
策略迭代交替进行策略评估和策略改进需要较少迭代即可收敛
价值迭代直接寻找最优价值函数状态空间大时更高效

策略迭代

算法流程

策略迭代 = 策略评估 + 策略改进

1. 初始化:
   - 随机初始化策略 π
   - 初始化价值函数 V(s) = 0

2. 迭代直到收敛:
   a) 策略评估:
      重复更新 V(s) 直到 ||V - V_new|| < θ
      V(s) = Σ_a π(a|s) Σ_{s'} P(s'|s,a)[R(s,a,s') + γ V(s')]
   
   b) 策略改进:
      对所有状态 s:
        对所有动作 a 计算 Q(s,a)
        选择最优动作:π'(s) = argmax_a Q(s,a)
      如果 π' == π,则收敛,停止
      否则 π = π',继续迭代

策略评估

给定策略 ,计算其状态价值函数

同步备份实现

def evaluate_policy(env, pi, theta=1e-6, gamma=0.9):
    """
    迭代策略评估
    
    参数:
        env: 环境对象
        pi: 策略,shape (n_states, n_actions)
        theta: 收敛阈值
        gamma: 折扣因子
    
    返回:
        V: 状态价值函数
    """
    n_states = env.n_states
    V = np.zeros(n_states)
    
    while True:
        delta = 0
        V_new = np.zeros(n_states)
        
        for s in range(n_states):
            v = 0
            for a in range(env.n_actions):
                # 获取转移信息
                for prob, next_s, reward in env.transitions[s, a]:
                    v += pi[s, a] * prob * (reward + gamma * V[next_s])
            V_new[s] = v
            delta = max(delta, abs(v - V[s]))
        
        V = V_new
        if delta < theta:
            break
    
    return V

策略改进

根据评估得到的价值函数,改进策略。

贪婪策略

def improve_policy(env, V, pi, gamma=0.9):
    """
    基于价值函数改进策略
    
    返回:
        improved: 是否改进了策略
        pi: 改进后的策略
    """
    n_states = env.n_states
    n_actions = env.n_actions
    improved = False
    
    for s in range(n_states):
        # 计算当前状态下所有动作的Q值
        q_values = []
        for a in range(n_actions):
            q = 0
            for prob, next_s, reward in env.transitions[s, a]:
                q += prob * (reward + gamma * V[next_s])
            q_values.append(q)
        
        # 贪婪选择最优动作
        best_action = np.argmax(q_values)
        
        # 如果当前策略不是最优,则更新
        if pi[s, best_action] != 1.0:
            pi[s] = 0
            pi[s, best_action] = 1.0
            improved = True
    
    return improved, pi

完整实现

def policy_iteration(env, theta=1e-6, gamma=0.9, max_iterations=1000):
    """
    策略迭代算法
    
    返回:
        V: 最优价值函数
        pi: 最优策略
    """
    n_states = env.n_states
    n_actions = env.n_actions
    
    # 1. 初始化随机策略
    pi = np.ones((n_states, n_actions)) / n_actions
    
    for iteration in range(max_iterations):
        # 2. 策略评估
        V = evaluate_policy(env, pi, theta, gamma)
        
        # 3. 策略改进
        improved, pi = improve_policy(env, V, pi, gamma)
        
        if not improved:
            print(f"策略迭代在第 {iteration+1} 次迭代后收敛")
            break
    
    return V, pi

收敛性证明

策略迭代保证在有限次迭代后收敛,因为:

  1. 策略改进定理:若 ,则
  2. 有限性:有限MDP只有有限个策略,改进过程单调,策略数量有限

价值迭代

核心思想

价值迭代直接利用贝尔曼最优方程,从任意初始价值函数开始迭代:

或等价形式:

其中 是贝尔曼最优算子。

算法流程

1. 初始化 V(s) = 0, ∀s

2. 迭代直到收敛:
   对每个状态 s:
     v_old = V(s)
     V(s) = max_a Σ_{s'} P(s'|s,a)[R(s,a,s') + γ V(s')]
     delta = max(delta, |v_old - V(s)|)
   如果 delta < θ,停止

3. 从收敛的V提取最优策略:
   π(s) = argmax_a Σ_{s'} P(s'|s,a)[R(s,a,s') + γ V(s')]

Python实现

def value_iteration(env, theta=1e-6, gamma=0.9, max_iterations=1000):
    """
    价值迭代算法
    
    返回:
        V: 最优价值函数
        pi: 最优策略
    """
    n_states = env.n_states
    n_actions = env.n_actions
    
    # 1. 初始化
    V = np.zeros(n_states)
    
    # 2. 迭代
    for iteration in range(max_iterations):
        delta = 0
        V_new = np.zeros(n_states)
        
        for s in range(n_states):
            v = float('-inf')
            for a in range(n_actions):
                q = 0
                for prob, next_s, reward in env.transitions[s, a]:
                    q += prob * (reward + gamma * V[next_s])
                v = max(v, q)
            V_new[s] = v
            delta = max(delta, abs(v - V[s]))
        
        V = V_new
        
        if delta < theta:
            print(f"价值迭代在第 {iteration+1} 次迭代后收敛")
            break
    
    # 3. 提取策略
    pi = np.zeros((n_states, n_actions))
    for s in range(n_states):
        q_values = []
        for a in range(n_actions):
            q = 0
            for prob, next_s, reward in env.transitions[s, a]:
                q += prob * (reward + gamma * V[next_s])
            q_values.append(q)
        best_action = np.argmax(q_values)
        pi[s, best_action] = 1.0
    
    return V, pi

收敛性证明

价值迭代的收敛性基于贝尔曼最优算子 压缩映射性质:

其中 ,因此 是压缩映射,存在唯一不动点 ,且迭代以几何级数速度收敛。

两种算法的对比

收敛速度

方面策略迭代价值迭代
每次迭代复杂度
迭代次数通常较少可能较多
收敛判断策略不再变化价值变化小于

何时选择

状态空间小,环境简单  ──→  策略迭代(可能只需几次迭代)
状态空间大,精确价值函数重要  ──→  价值迭代
需要完整价值函数  ──→  价值迭代
只需要最优策略  ──→  都可以

原地更新(In-place)

两种算法都可以使用原地(in-place)更新加速:

def value_iteration_inplace(env, theta=1e-6, gamma=0.9):
    V = np.zeros(env.n_states)
    
    while True:
        delta = 0
        for s in range(env.n_states):
            v_old = V[s]
            # 原地更新:使用当前已更新的值
            V[s] = max(
                sum(prob * (reward + gamma * V[next_s])
                    for prob, next_s, reward in env.transitions[s, a])
                for a in range(env.n_actions)
            )
            delta = max(delta, abs(v_old - V[s]))
        
        if delta < theta:
            break
    
    return V

原地更新通常收敛更快,但收敛性证明稍复杂。

示例:FrozenLake环境

FrozenLake是OpenAI Gym的经典环境,用于演示RL算法。

问题描述

SFFF       S = 起始安全区
FHFH       F = 冰面(安全)
FHHH       H = 洞穴(危险,落入重置)
HFFG       G = 目标
           F = 冰面(安全)

实现

import numpy as np
 
class FrozenLake:
    """FrozenLake 4x4 环境简化版"""
    
    def __init__(self):
        self.n_states = 16
        self.n_actions = 4
        self.gamma = 0.9
        
        # 状态布局
        # 0=S, 1=F, 2=H, 3=G
        self.layout = [
            [0, 2, 2, 2],
            [0, 0, 2, 2],
            [2, 0, 0, 2],
            [2, 2, 0, 3]
        ]
        
        # 转移函数
        self.P = self._build_transition()
    
    def _build_transition(self):
        """构建转移概率"""
        P = np.zeros((self.n_states, self.n_actions, self.n_states))
        dirs = [(-1, 0), (1, 0), (0, -1), (0, 1)]  # up, down, left, right
        
        for s in range(self.n_states):
            row, col = s // 4, s % 4
            
            for a, (dr, dc) in enumerate(dirs):
                nr, nc = row + dr, col + dc
                
                # 边界处理
                if 0 <= nr < 4 and 0 <= nc < 4:
                    next_s = nr * 4 + nc
                    cell = self.layout[nr][nc]
                else:
                    next_s = s
                    cell = self.layout[row][col]
                
                # 奖励
                if cell == 3:  # 目标
                    reward = 1.0
                    done = True
                elif cell == 2:  # 洞穴
                    reward = 0.0
                    done = True
                else:
                    reward = 0.0
                    done = False
                
                P[s, a, next_s] = 1.0
        
        return P
    
    def transitions(self, s, a):
        """返回转移列表"""
        transitions = []
        for next_s in range(self.n_states):
            prob = self.P[s, a, next_s]
            if prob > 0:
                row, col = next_s // 4, next_s % 4
                cell = self.layout[row][col]
                if cell == 3:
                    reward = 1.0
                elif cell == 2:
                    reward = 0.0
                else:
                    reward = 0.0
                transitions.append((prob, next_s, reward))
        return transitions
 
 
# 求解
env = FrozenLake()
 
# 策略迭代
V_pi, pi_pi = policy_iteration(env, theta=1e-8, gamma=0.9)
print("策略迭代结果:")
print(f"V = {V_pi.round(3)}")
 
# 价值迭代
V_vi, pi_vi = value_iteration(env, theta=1e-8, gamma=0.9)
print("\n价值迭代结果:")
print(f"V = {V_vi.round(3)}")

异步动态规划

全量同步更新效率低,异步DP每次只更新一个或少量状态:

就地价值迭代

每个状态顺序更新,使用最新的价值估计:

def async_value_iteration(env, n_updates=100000, gamma=0.9):
    V = np.zeros(env.n_states)
    
    for _ in range(n_updates):
        s = np.random.randint(env.n_states)  # 随机选择状态
        v_old = V[s]
        
        # 计算最优动作的Q值
        q_values = []
        for a in range(env.n_actions):
            q = sum(prob * (reward + gamma * V[next_s])
                   for prob, next_s, reward in env.transitions(s, a))
            q_values.append(q)
        
        V[s] = max(q_values)
    
    return V

优先级清扫

优先更新”变化大”的状态:

def prioritized_sweeping(env, theta=1e-5, gamma=0.9):
    V = np.zeros(env.n_states)
    Q = np.zeros((env.n_states, env.n_actions))
    
    # 计算初始Q值
    for s in range(env.n_states):
        for a in range(env.n_actions):
            Q[s, a] = sum(p * (r + gamma * np.max(Q[ns]))
                         for p, ns, r in env.transitions(s, a))
    
    # 优先队列
    heap = []
    for s in range(env.n_states):
        for a in range(env.n_actions):
            delta = abs(Q[s, a] - V[s])
            if delta > theta:
                heapq.heappush(heap, (-delta, s, a))
    
    while heap:
        neg_delta, s, a = heapq.heappop(heap)
        
        # 更新
        old_v = V[s]
        V[s] = Q[s, a]
        
        # 反向传播:更新前驱状态
        for prev_s in env.predecessors[s]:
            for prev_a in range(env.n_actions):
                q = sum(p * (r + gamma * np.max(Q[ns]))
                       for p, ns, r in env.transitions(prev_s, prev_a))
                delta = abs(q - Q[prev_s, prev_a])
                if delta > theta:
                    Q[prev_s, prev_a] = q
                    heapq.heappush(heap, (-delta, prev_s, prev_a))
    
    return V

局限性

动态规划方法有以下局限:

局限性原因解决方案
环境模型未知无法计算期望Q-Learning、SARSA
状态空间巨大存储/计算爆炸近似价值函数
连续状态/动作无法穷举函数近似
样本效率低需要遍历所有状态模型-free方法

详见 Q-LearningDQN

参考


后续主题

  • Q-Learning:无模型环境的价值迭代
  • DQN:深度Q网络处理高维状态
  • 策略梯度:直接优化策略的方法

Footnotes

  1. Sutton & Barto, “Reinforcement Learning: An Introduction”, 2nd Edition, 2018, Chapter 4