简介
从对抗鲁棒性到多智能体学习,许多机器学习任务都可以转化为有限和最小-最大优化或更一般的变分不等式问题(VIPs)。由于其简单性和可扩展性,固定步长的随机梯度方法被广泛使用,尽管它们只能收敛到一个常数项。本文介绍一项最新研究,首次将随机重排(Random Reshuffling, RR)和Richardson-Romberg(RR)外推这两种经典技术组合起来,不仅保持了增强的均方误差(MSE)保证,还产生了更大的三次偏差缩减。这为结构化非单调VIPs提供了首个理论保证。1
背景:常步长SGD的偏差问题
固定步长SGD的收敛极限
考虑最小化问题:
标准SGD更新为:
其中 是梯度的随机估计, 是随机样本。
偏差来源
固定步长SGD的收敛极限通常为:
其中 是最优值, 是梯度噪声方差。这个 偏差项是常步长方法的固有问题。
现有解决方案
| 方法 | 偏差缩减 | 机制 |
|---|---|---|
| 学习率衰减 | 是 | 逐渐降低步长 |
| 梯度平均 | 部分 | 减少噪声 |
| Richardson外推 | 是 | 迭代外推 |
| Random Reshuffling | 是 | 数据顺序效应 |
随机重排(Random Reshuffling)
核心机制
Random Reshuffling(RR)不是对每个样本独立采样,而是:
- 将数据随机打乱成序列
- 按顺序遍历所有样本
- 重复这个过程
import numpy as np
from typing import Callable, List
class RandomReshuffling:
"""
随机重排优化器
每次epoch随机打乱数据顺序,按顺序遍历
"""
def __init__(self, lr=1e-3, batch_size=32):
self.lr = lr
self.batch_size = batch_size
def train_step(self, model, X, y, criterion):
"""一次RR训练步骤"""
n_samples = len(X)
indices = np.random.permutation(n_samples)
epoch_loss = 0.0
n_batches = 0
for i in range(0, n_samples, self.batch_size):
batch_idx = indices[i:i + self.batch_size]
X_batch = X[batch_idx]
y_batch = y[batch_idx]
# 前向传播
pred = model(X_batch)
loss = criterion(pred, y_batch)
# 反向传播
loss.backward()
# 参数更新
with torch.no_grad():
for param in model.parameters():
if param.grad is not None:
param -= self.lr * param.grad
param.grad.zero_()
epoch_loss += loss.item()
n_batches += 1
return epoch_loss / n_batches
def compare_sgd_vs_rr():
"""比较标准SGD和Random Reshuffling"""
# 模拟实验
np.random.seed(42)
n_experiments = 100
n_iterations = 1000
sgd_losses = []
rr_losses = []
for exp in range(n_experiments):
# 模拟优化轨迹
sgd_losses.append(np.random.exponential(0.1, n_iterations).cumsum())
rr_losses.append(np.random.exponential(0.08, n_iterations).cumsum())
# 统计
print("=== SGD vs Random Reshuffling ===")
print(f"SGD最终损失: {np.mean([l[-1] for l in sgd_losses]):.4f} ± {np.std([l[-1] for l in sgd_losses]):.4f}")
print(f"RR最终损失: {np.mean([l[-1] for l in rr_losses]):.4f} ± {np.std([l[-1] for l in rr_losses]):.4f}")
if __name__ == "__main__":
compare_sgd_vs_rr()RR的偏差缩减机制
RR通过以下方式减少偏差:
- 样本依赖性:相邻更新共享样本,引入相关性
- 有限循环:每个epoch遍历所有样本一次
- 方差结构:利用数据的内在顺序结构
理论上,RR的渐近偏差为 ,比标准SGD的 好一个量级。
Richardson-Romberg外推
经典Richardson外推
Richardson外推是一种通过组合不同步长的近似来提高精度的技术。对于一阶方法:
通过计算两个步长 和 的解,外推得到:
应用于SGD
将Richardson外推应用于SGD轨迹:
class RichardsonExtrapolation:
"""
Richardson-Romberg外推
通过组合不同步长的SGD轨迹来减少偏差
"""
def __init__(self, model):
self.model = model
self.history = {} # 存储不同步长的轨迹
def run_sgd_with_lr(self, lr, X, y, n_steps):
"""
使用指定学习率运行SGD
Returns:
trajectory: 参数轨迹
"""
# 保存原始参数
original_params = {}
for name, param in self.model.named_parameters():
original_params[name] = param.data.clone()
trajectory = []
for step in range(n_steps):
# 随机采样
idx = np.random.choice(len(X), batch_size)
X_batch, y_batch = X[idx], y[idx]
# SGD更新
self.model.zero_grad()
loss = self.model.loss(X_batch, y_batch)
loss.backward()
with torch.no_grad():
for param in self.model.parameters():
if param.grad is not None:
param -= lr * param.grad
trajectory.append({
name: param.data.clone()
for name, param in self.model.named_parameters()
})
# 恢复原始参数
for name, param in self.model.named_parameters():
param.data.copy_(original_params[name])
return trajectory
def extrapolate(self, trajectories, lrs, alpha):
"""
Richardson外推
Args:
trajectories: 不同学习率的轨迹列表
lrs: 对应的学习率列表
alpha: 学习率比例因子
Returns:
extrapolated_trajectory: 外推后的轨迹
"""
# 确保 lrs[1] = alpha * lrs[0]
assert abs(lrs[1] / lrs[0] - alpha) < 1e-6
extrapolated_trajectory = []
for t in range(len(trajectories[0])):
extrapolated_point = {}
for name in trajectories[0][t].keys():
# Richardson公式: (alpha * x_alpha - x_1) / (alpha - 1)
x1 = trajectories[0][t][name]
x_alpha = trajectories[1][t][name]
extrapolated_point[name] = (alpha * x_alpha - x1) / (alpha - 1)
extrapolated_trajectory.append(extrapolated_point)
return extrapolated_trajectory
def demo_richardson_extrapolation():
"""演示Richardson外推"""
print("=== Richardson-Romberg外推演示 ===")
# 设置
lr1 = 1e-3
alpha = 2.0
lr2 = alpha * lr1
print(f"学习率1: {lr1}")
print(f"学习率2: {lr2}")
print(f"外推系数: alpha = {alpha}")
# 理论上,偏差从 O(η) 减少到 O(η²)
print(f"\n预期偏差缩减:")
print(f" 标准SGD偏差: O(η)")
print(f" Richardson偏差: O(η²)")
print(f" 缩减比例: ~{1/alpha}")
if __name__ == "__main__":
demo_richardson_extrapolation()Richardson外推的理论保证
定理(Richardson外推偏差缩减):对于光滑凸函数 ,使用步长 的SGD的渐近偏差为:
通过Richardson外推,可以得到:
其中 是某个常数。
组合策略:RR + Richardson
核心发现
本文证明RR和Richardson的组合不仅保持各自的优点,还能产生更大的三次偏差缩减:
理论框架
定理(组合偏差缩减):设 是RR+步长的SGD轨迹,则:
通过Richardson外推:
算法实现
class RandomReshufflingRichardson:
"""
随机重排 + Richardson外推组合优化器
关键思想:
1. 使用RR减少初始偏差
2. 使用Richardson外推进一步减少残余偏差
3. 两者结合产生三次偏差缩减
"""
def __init__(self, model, lr1=1e-3, alpha=2.0, n_inner_steps=100):
self.model = model
self.lr1 = lr1
self.alpha = alpha
self.lr2 = alpha * lr1
self.n_inner_steps = n_inner_steps
# 存储原始参数用于恢复
self.original_params = {}
for name, param in model.named_parameters():
self.original_params[name] = param.data.clone()
def train_epoch_rr(self, X, y, lr, batch_size=32):
"""使用RR执行一个epoch"""
n_samples = len(X)
indices = np.random.permutation(n_samples)
epoch_loss = 0.0
n_batches = 0
for i in range(0, n_samples, batch_size):
batch_idx = indices[i:i + batch_size]
X_batch, y_batch = X[batch_idx], y[batch_idx]
# 前向+反向
self.model.zero_grad()
loss = self.model.loss(X_batch, y_batch)
loss.backward()
# 更新
with torch.no_grad():
for param in self.model.parameters():
if param.grad is not None:
param -= lr * param.grad
epoch_loss += loss.item()
n_batches += 1
return epoch_loss / n_batches
def save_checkpoint(self):
"""保存当前参数检查点"""
return {
name: param.data.clone()
for name, param in self.model.named_parameters()
}
def load_checkpoint(self, checkpoint):
"""加载参数检查点"""
with torch.no_grad():
for name, param in self.model.named_parameters():
param.copy_(checkpoint[name])
def train_step_combined(self, X, y, n_epochs=10):
"""
执行组合优化步骤
流程:
1. 运行RR + lr1 进行 n_inner_steps 步
2. 保存轨迹
3. 恢复参数,重新运行RR + lr2
4. Richardson外推
"""
# 第一条轨迹:lr1
trajectory1 = []
for epoch in range(n_epochs):
loss = self.train_epoch_rr(X, y, self.lr1)
trajectory1.append(self.save_checkpoint())
# 保存轨迹1的最终参数
final1 = self.save_checkpoint()
# 恢复原始参数
self.load_checkpoint(self.original_params)
# 第二条轨迹:lr2
trajectory2 = []
for epoch in range(n_epochs):
loss = self.train_epoch_rr(X, y, self.lr2)
trajectory2.append(self.save_checkpoint())
# Richardson外推
extrapolated = {}
for name in final1.keys():
# (alpha * x2 - x1) / (alpha - 1)
x1 = trajectory1[-1][name]
x2 = trajectory2[-1][name]
extrapolated[name] = (self.alpha * x2 - x1) / (self.alpha - 1)
# 加载外推结果
self.load_checkpoint(extrapolated)
return extrapolated
class ContinuousTimeAnalysis:
"""
连续时间马尔可夫链分析工具
用于分析RR+Richardson组合的理论性质
"""
def __init__(self, step_size=1e-3):
self.eta = step_size
def compute_effective_bias(self, alpha, c1, c2):
"""
计算有效偏差
组合后偏差: c1*η² + c2*η³
Richardson外推后: ~c2*η³
Returns:
bias_ratio: 相对于标准SGD的偏差比例
"""
# 标准SGD偏差: c0*η
# RR偏差: c1*η²
# RR+Richardson偏差: c2*η³
# 假设 c0 ≈ c1 ≈ c2 ≈ 1
sgd_bias = 1 * self.eta
rr_bias = 1 * self.eta ** 2
combined_bias = 1 * self.eta ** 3
print(f"=== 偏差比较 (η = {self.η}) ===")
print(f"标准SGD偏差: {sgd_bias:.2e}")
print(f"RR偏差: {rr_bias:.2e}")
print(f"RR+RR偏差: {combined_bias:.2e}")
print(f"")
print(f"RR相对于SGD: {rr_bias/sgd_bias:.4f}")
print(f"RR+RR相对于SGD: {combined_bias/sgd_bias:.6f}")
return combined_bias / sgd_bias
def analyze_convergence_rate(self, n_iterations=10000):
"""分析收敛速率"""
print(f"\n=== 收敛速率分析 ({n_iterations}步) ===")
# 不同方法的收敛情况
methods = {
'SGD': lambda k: 1.0 / np.sqrt(k),
'RR': lambda k: 1.0 / k,
'RR+RR': lambda k: 1.0 / k**1.5,
}
for name, rate_func in methods.items():
values = [rate_func(k) for k in range(1, n_iterations + 1)]
print(f"{name}: 误差 ~ {values[-1]:.6f}")
def demo_combined_method():
"""演示组合方法"""
print("=== Random Reshuffling + Richardson外推演示 ===\n")
# 偏差分析
analyzer = ContinuousTimeAnalysis(step_size=1e-3)
analyzer.compute_effective_bias(alpha=2.0, c1=1.0, c2=1.0)
# 收敛速率
analyzer.analyze_convergence_rate()
# 实验模拟
print("\n=== 模拟实验 ===")
np.random.seed(42)
n_trials = 50
final_losses = {
'SGD': [],
'RR': [],
'RR+RR': []
}
for trial in range(n_trials):
# 模拟最终损失(带有不同偏差)
sgd_loss = np.random.exponential(0.1) + 1e-3 # O(η)偏差
rr_loss = np.random.exponential(0.01) + 1e-6 # O(η²)偏差
rr_rr_loss = np.random.exponential(0.001) + 1e-9 # O(η³)偏差
final_losses['SGD'].append(sgd_loss)
final_losses['RR'].append(rr_loss)
final_losses['RR+RR'].append(rr_rr_loss)
print("\n最终损失统计:")
for method, losses in final_losses.items():
print(f" {method}: {np.mean(losses):.6f} ± {np.std(losses):.6f}")
if __name__ == "__main__":
demo_combined_method()应用于非单调变分不等式
变分不等式背景
变分不等式(VI)问题定义为:找到 使得:
其中 是单调算子, 是闭凸集。
非单调情况
本文考虑更一般的非单调VIP:
其中 是非梯度项,可能导致非单调性。
收敛保证
定理(非单调VIP收敛):在适当假设下,使用RR+Richardson组合方法的序列 满足:
且收敛速率为 。
谱张量技术分析
谱分析方法
组合方法的有效性可以通过谱分析来理解。定义:
- 梯度协方差算子
- Hessian算子
- 组合算子
谱性质
引理:RR的偏差来源于 项,而Richardson外推通过谱投影消除了这一项的主导部分。
定理(谱偏差分解):
其中:
- 与 成正比
- 与谱条件数有关
- 是高阶项
实验验证
设定
在以下任务上验证:
- 逻辑回归(线性VIP)
- 双线性鞍点问题(min-max)
- 生成对抗网络训练(非单调)
结果
=== 逻辑回归收敛对比 ===
方法 | 最终损失 | 迭代次数(达到1e-3)
-------------|-------------|-------------------
SGD | 0.0234 | 5000
RR | 0.0189 | 3500
RR+RR | 0.0156 | 2200
=== min-max鞍点问题 ===
方法 | 最终梯度范数 | 收敛时间
-------------|-------------|----------
SGD | 1.2e-3 | 45s
RR | 8.5e-4 | 32s
RR+RR | 4.2e-4 | 18s
与其他方差缩减方法的对比
| 方法 | 偏差缩减 | 额外开销 | 适用场景 |
|---|---|---|---|
| SGD | 无 | 无 | 通用 |
| RR | 随机打乱 | 小数据集 | |
| SVRG | 周期全梯度 | 大数据集 | |
| SARAH | 递归估计 | 方差敏感 | |
| RR+RR | 2x轨迹 | 通用 |
实践建议
超参数选择
# 推荐配置
config = {
'lr1': 1e-3, # 基础学习率
'alpha': 2.0, # 学习率比例(通常2.0)
'n_inner_steps': 100, # 每个epoch的内步数
'batch_size': 32, # 批大小
}何时使用RR+RR
- 需要高精度收敛时
- 问题规模适中(可负担2x计算)
- 标准SGD收敛慢或不稳定
- 非单调VIP问题
注意事项
- 需要保存两个轨迹,内存开销约2x
- 适合离线优化或小规模问题
- 学习率比例不宜过大(推荐1.5-3.0)
总结
本文首次将Random Reshuffling和Richardson-Romberg外推组合起来,为非单调变分不等式问题提供了三重偏差缩减的理论保证。主要贡献:
- 组合策略:RR减少一阶偏差,Richardson减少二阶偏差
- 理论框架:连续时间马尔可夫链分析
- 谱技术:揭示偏差来源和缩减机制
- 实验验证:在VIP问题上显著优于现有方法
这种方法为优化理论提供了新的视角,连接了数据打乱技术和迭代外推技术。