概述
动态规划(Dynamic Programming,DP)方法是解决MDP最经典的方法,前提是环境模型已知(即转移概率 和奖励函数 已知)。1
核心思想是利用贝尔曼方程的递归结构,将复杂问题分解为子问题,逐步求解。
动态规划要求:
- 环境模型已知(基于模型)
- 状态空间有限(或可离散化)
- 马尔可夫性质
两种核心算法
| 算法 | 思路 | 适用场景 |
|---|---|---|
| 策略迭代 | 交替进行策略评估和策略改进 | 需要较少迭代即可收敛 |
| 价值迭代 | 直接寻找最优价值函数 | 状态空间大时更高效 |
策略迭代
算法流程
策略迭代 = 策略评估 + 策略改进
1. 初始化:
- 随机初始化策略 π
- 初始化价值函数 V(s) = 0
2. 迭代直到收敛:
a) 策略评估:
重复更新 V(s) 直到 ||V - V_new|| < θ
V(s) = Σ_a π(a|s) Σ_{s'} P(s'|s,a)[R(s,a,s') + γ V(s')]
b) 策略改进:
对所有状态 s:
对所有动作 a 计算 Q(s,a)
选择最优动作:π'(s) = argmax_a Q(s,a)
如果 π' == π,则收敛,停止
否则 π = π',继续迭代
策略评估
给定策略 ,计算其状态价值函数 。
同步备份实现:
def evaluate_policy(env, pi, theta=1e-6, gamma=0.9):
"""
迭代策略评估
参数:
env: 环境对象
pi: 策略,shape (n_states, n_actions)
theta: 收敛阈值
gamma: 折扣因子
返回:
V: 状态价值函数
"""
n_states = env.n_states
V = np.zeros(n_states)
while True:
delta = 0
V_new = np.zeros(n_states)
for s in range(n_states):
v = 0
for a in range(env.n_actions):
# 获取转移信息
for prob, next_s, reward in env.transitions[s, a]:
v += pi[s, a] * prob * (reward + gamma * V[next_s])
V_new[s] = v
delta = max(delta, abs(v - V[s]))
V = V_new
if delta < theta:
break
return V策略改进
根据评估得到的价值函数,改进策略。
贪婪策略:
def improve_policy(env, V, pi, gamma=0.9):
"""
基于价值函数改进策略
返回:
improved: 是否改进了策略
pi: 改进后的策略
"""
n_states = env.n_states
n_actions = env.n_actions
improved = False
for s in range(n_states):
# 计算当前状态下所有动作的Q值
q_values = []
for a in range(n_actions):
q = 0
for prob, next_s, reward in env.transitions[s, a]:
q += prob * (reward + gamma * V[next_s])
q_values.append(q)
# 贪婪选择最优动作
best_action = np.argmax(q_values)
# 如果当前策略不是最优,则更新
if pi[s, best_action] != 1.0:
pi[s] = 0
pi[s, best_action] = 1.0
improved = True
return improved, pi完整实现
def policy_iteration(env, theta=1e-6, gamma=0.9, max_iterations=1000):
"""
策略迭代算法
返回:
V: 最优价值函数
pi: 最优策略
"""
n_states = env.n_states
n_actions = env.n_actions
# 1. 初始化随机策略
pi = np.ones((n_states, n_actions)) / n_actions
for iteration in range(max_iterations):
# 2. 策略评估
V = evaluate_policy(env, pi, theta, gamma)
# 3. 策略改进
improved, pi = improve_policy(env, V, pi, gamma)
if not improved:
print(f"策略迭代在第 {iteration+1} 次迭代后收敛")
break
return V, pi收敛性证明
策略迭代保证在有限次迭代后收敛,因为:
- 策略改进定理:若 ,则
- 有限性:有限MDP只有有限个策略,改进过程单调,策略数量有限
价值迭代
核心思想
价值迭代直接利用贝尔曼最优方程,从任意初始价值函数开始迭代:
或等价形式:
其中 是贝尔曼最优算子。
算法流程
1. 初始化 V(s) = 0, ∀s
2. 迭代直到收敛:
对每个状态 s:
v_old = V(s)
V(s) = max_a Σ_{s'} P(s'|s,a)[R(s,a,s') + γ V(s')]
delta = max(delta, |v_old - V(s)|)
如果 delta < θ,停止
3. 从收敛的V提取最优策略:
π(s) = argmax_a Σ_{s'} P(s'|s,a)[R(s,a,s') + γ V(s')]
Python实现
def value_iteration(env, theta=1e-6, gamma=0.9, max_iterations=1000):
"""
价值迭代算法
返回:
V: 最优价值函数
pi: 最优策略
"""
n_states = env.n_states
n_actions = env.n_actions
# 1. 初始化
V = np.zeros(n_states)
# 2. 迭代
for iteration in range(max_iterations):
delta = 0
V_new = np.zeros(n_states)
for s in range(n_states):
v = float('-inf')
for a in range(n_actions):
q = 0
for prob, next_s, reward in env.transitions[s, a]:
q += prob * (reward + gamma * V[next_s])
v = max(v, q)
V_new[s] = v
delta = max(delta, abs(v - V[s]))
V = V_new
if delta < theta:
print(f"价值迭代在第 {iteration+1} 次迭代后收敛")
break
# 3. 提取策略
pi = np.zeros((n_states, n_actions))
for s in range(n_states):
q_values = []
for a in range(n_actions):
q = 0
for prob, next_s, reward in env.transitions[s, a]:
q += prob * (reward + gamma * V[next_s])
q_values.append(q)
best_action = np.argmax(q_values)
pi[s, best_action] = 1.0
return V, pi收敛性证明
价值迭代的收敛性基于贝尔曼最优算子 的压缩映射性质:
其中 ,因此 是压缩映射,存在唯一不动点 ,且迭代以几何级数速度收敛。
两种算法的对比
收敛速度
| 方面 | 策略迭代 | 价值迭代 |
|---|---|---|
| 每次迭代复杂度 | 或 | |
| 迭代次数 | 通常较少 | 可能较多 |
| 收敛判断 | 策略不再变化 | 价值变化小于 |
何时选择
状态空间小,环境简单 ──→ 策略迭代(可能只需几次迭代)
状态空间大,精确价值函数重要 ──→ 价值迭代
需要完整价值函数 ──→ 价值迭代
只需要最优策略 ──→ 都可以
原地更新(In-place)
两种算法都可以使用原地(in-place)更新加速:
def value_iteration_inplace(env, theta=1e-6, gamma=0.9):
V = np.zeros(env.n_states)
while True:
delta = 0
for s in range(env.n_states):
v_old = V[s]
# 原地更新:使用当前已更新的值
V[s] = max(
sum(prob * (reward + gamma * V[next_s])
for prob, next_s, reward in env.transitions[s, a])
for a in range(env.n_actions)
)
delta = max(delta, abs(v_old - V[s]))
if delta < theta:
break
return V原地更新通常收敛更快,但收敛性证明稍复杂。
示例:FrozenLake环境
FrozenLake是OpenAI Gym的经典环境,用于演示RL算法。
问题描述
SFFF S = 起始安全区
FHFH F = 冰面(安全)
FHHH H = 洞穴(危险,落入重置)
HFFG G = 目标
F = 冰面(安全)
实现
import numpy as np
class FrozenLake:
"""FrozenLake 4x4 环境简化版"""
def __init__(self):
self.n_states = 16
self.n_actions = 4
self.gamma = 0.9
# 状态布局
# 0=S, 1=F, 2=H, 3=G
self.layout = [
[0, 2, 2, 2],
[0, 0, 2, 2],
[2, 0, 0, 2],
[2, 2, 0, 3]
]
# 转移函数
self.P = self._build_transition()
def _build_transition(self):
"""构建转移概率"""
P = np.zeros((self.n_states, self.n_actions, self.n_states))
dirs = [(-1, 0), (1, 0), (0, -1), (0, 1)] # up, down, left, right
for s in range(self.n_states):
row, col = s // 4, s % 4
for a, (dr, dc) in enumerate(dirs):
nr, nc = row + dr, col + dc
# 边界处理
if 0 <= nr < 4 and 0 <= nc < 4:
next_s = nr * 4 + nc
cell = self.layout[nr][nc]
else:
next_s = s
cell = self.layout[row][col]
# 奖励
if cell == 3: # 目标
reward = 1.0
done = True
elif cell == 2: # 洞穴
reward = 0.0
done = True
else:
reward = 0.0
done = False
P[s, a, next_s] = 1.0
return P
def transitions(self, s, a):
"""返回转移列表"""
transitions = []
for next_s in range(self.n_states):
prob = self.P[s, a, next_s]
if prob > 0:
row, col = next_s // 4, next_s % 4
cell = self.layout[row][col]
if cell == 3:
reward = 1.0
elif cell == 2:
reward = 0.0
else:
reward = 0.0
transitions.append((prob, next_s, reward))
return transitions
# 求解
env = FrozenLake()
# 策略迭代
V_pi, pi_pi = policy_iteration(env, theta=1e-8, gamma=0.9)
print("策略迭代结果:")
print(f"V = {V_pi.round(3)}")
# 价值迭代
V_vi, pi_vi = value_iteration(env, theta=1e-8, gamma=0.9)
print("\n价值迭代结果:")
print(f"V = {V_vi.round(3)}")异步动态规划
全量同步更新效率低,异步DP每次只更新一个或少量状态:
就地价值迭代
每个状态顺序更新,使用最新的价值估计:
def async_value_iteration(env, n_updates=100000, gamma=0.9):
V = np.zeros(env.n_states)
for _ in range(n_updates):
s = np.random.randint(env.n_states) # 随机选择状态
v_old = V[s]
# 计算最优动作的Q值
q_values = []
for a in range(env.n_actions):
q = sum(prob * (reward + gamma * V[next_s])
for prob, next_s, reward in env.transitions(s, a))
q_values.append(q)
V[s] = max(q_values)
return V优先级清扫
优先更新”变化大”的状态:
def prioritized_sweeping(env, theta=1e-5, gamma=0.9):
V = np.zeros(env.n_states)
Q = np.zeros((env.n_states, env.n_actions))
# 计算初始Q值
for s in range(env.n_states):
for a in range(env.n_actions):
Q[s, a] = sum(p * (r + gamma * np.max(Q[ns]))
for p, ns, r in env.transitions(s, a))
# 优先队列
heap = []
for s in range(env.n_states):
for a in range(env.n_actions):
delta = abs(Q[s, a] - V[s])
if delta > theta:
heapq.heappush(heap, (-delta, s, a))
while heap:
neg_delta, s, a = heapq.heappop(heap)
# 更新
old_v = V[s]
V[s] = Q[s, a]
# 反向传播:更新前驱状态
for prev_s in env.predecessors[s]:
for prev_a in range(env.n_actions):
q = sum(p * (r + gamma * np.max(Q[ns]))
for p, ns, r in env.transitions(prev_s, prev_a))
delta = abs(q - Q[prev_s, prev_a])
if delta > theta:
Q[prev_s, prev_a] = q
heapq.heappush(heap, (-delta, prev_s, prev_a))
return V局限性
动态规划方法有以下局限:
| 局限性 | 原因 | 解决方案 |
|---|---|---|
| 环境模型未知 | 无法计算期望 | Q-Learning、SARSA |
| 状态空间巨大 | 存储/计算爆炸 | 近似价值函数 |
| 连续状态/动作 | 无法穷举 | 函数近似 |
| 样本效率低 | 需要遍历所有状态 | 模型-free方法 |
详见 Q-Learning 和 DQN。
参考
后续主题
- Q-Learning:无模型环境的价值迭代
- DQN:深度Q网络处理高维状态
- 策略梯度:直接优化策略的方法
Footnotes
-
Sutton & Barto, “Reinforcement Learning: An Introduction”, 2nd Edition, 2018, Chapter 4 ↩