概述

扩散概率模型(Diffusion Probabilistic Models)已成为生成式AI的核心技术之一。然而,其理论收敛性质长期缺乏严格的数学分析。近年来,研究者在扩散模型的最优传输理论框架下取得了突破性进展,建立了级别的收敛率保证。1

本文系统梳理扩散模型收敛理论与最优传输的核心内容:

  • 最优传输理论基础
  • 扩散模型作为Wasserstein梯度流
  • 收敛率证明
  • 采样步数优化策略

最优传输理论基础

Wasserstein距离

Wasserstein距离是概率分布间距离的一种度量,比KL散度更适合描述分布间的”几何”差异。

对于 -Wasserstein距离定义为:

其中:

  • 是所有以 为源分布、 为目标分布的联合分布(传输计划)
  • 是底层度量(如欧氏距离)

1-Wasserstein距离的性质

1-Wasserstein距离有重要的对偶表示(Kantorovich对偶):

其中 是Lipschitz常数。

2-Wasserstein距离与黎曼几何

的欧氏度量下,2-Wasserstein距离有显式公式:

其中 是最优传输映射(当 满足某些正则性条件时存在)。

熵正则化最优传输:Sinkhorn

熵正则化最优传输定义为:

其中 是成本矩阵, 是熵,正则化参数

Sinkhorn算法通过交替投影求解:

import torch
import numpy as np
 
def sinkhorn(a, b, C, epsilon, num_iters=100):
    """
    Sinkhorn算法求解熵正则化最优传输
    
    参数:
        a: 源分布 (n,)
        b: 目标分布 (m,)
        C: 成本矩阵 (n, m)
        epsilon: 正则化参数
        num_iters: 迭代次数
    
    返回:
        gamma: 最优传输计划 (n, m)
    """
    n, m = len(a), len(b)
    
    # Kernel矩阵
    K = torch.exp(-C / epsilon)
    
    # 缩放因子初始化
    u = torch.ones(n) / n
    v = torch.ones(m) / m
    
    # 交替投影
    for _ in range(num_iters):
        u_new = a / (K @ v + 1e-8)
        v_new = b / (K.T @ u_new + 1e-8)
        
        u = u_new
        v = v_new
    
    # 构建传输计划
    gamma = u.view(-1, 1) * K * v.view(1, -1)
    
    return gamma
 
 
def compute_wasserstein_distance_2d(mu_samples, nu_samples):
    """
    经验2-Wasserstein距离计算(使用scipy的emd)
    """
    from scipy.stats import wasserstein_distance
    
    # 简化为1D情况
    if mu_samples.ndim == 1 and nu_samples.ndim == 1:
        return wasserstein_distance(mu_samples, nu_samples, 
                                     u_weights=None, v_weights=None)
    
    # 高维情况:使用scipy的emd2d
    from scipy.stats import transport_problem
    n, m = len(mu_samples), len(nu_samples)
    weights_mu = np.ones(n) / n
    weights_nu = np.ones(m) / m
    
    # 计算成本矩阵
    C = np.zeros((n, m))
    for i in range(n):
        for j in range(m):
            C[i, j] = np.sum((mu_samples[i] - nu_samples[j]) ** 2)
    
    # 求解最优传输问题
    from scipy.optimize import linear_sum_assignment
    row_ind, col_ind = linear_sum_assignment(C)
    
    return np.sqrt(C[row_ind, col_ind].sum() / n)

扩散模型的最优传输视角

去噪扩散作为最优传输

考虑前向扩散过程 和反向生成过程

前向过程(已知):

反向过程(学习):

得分匹配与Wasserstein梯度流

去噪得分匹配(DSM)目标为:

其中 是学习的得分函数。

核心洞察:去噪过程等价于在Wasserstein空间中的梯度流

其中速度场 由得分函数决定。

半软最优传输框架

研究者提出了扩散模型的半软最优传输(Semi-Relaxed Optimal Transport, SROT)解释:

为数据分布, 为生成分布。SROT目标为:

其中 是生成器, 是投影, 是熵正则项。


收敛率分析

主要定理

定理(De Bortoli et al., 2024):设数据分布 满足 ,前向过程方差 。则存在常数 ,使得对于任意

证明框架

步骤1:建立前向-反向过程的传输映射

是前向过程, 是反向过程。构造耦合 使得边缘分布正确。

步骤2:伊藤引理分析

,定义 。应用伊藤引理:

其中漂移项和扩散项满足特定条件。

步骤3:Bismut-Elworthy-Li公式

使用Bismut-Elworthy-Li公式估计Wasserstein距离:

关键引理

引理1(方差估计):对于高斯过渡核 ,有:

引理2(梯度有界):若得分函数满足 Lipschitz 条件:

则收敛率 可达。

采样复杂度

推论:为达到 -近似(),所需采样步数:

这与直觉一致:高维空间 ( 大) 需要更多步数,精度要求高 ( 小) 也需要更多步数。


采样步数优化

自适应步长策略

传统均匀步长 可能非最优。自适应策略考虑局部曲率:

import torch
import torch.nn.functional as F
 
class AdaptiveDiffusionScheduler:
    """
    自适应扩散调度器:根据局部曲率调整步长
    
    理论依据:收敛率$O(d/T)$的理论允许非均匀步长
    """
    def __init__(self, T, d_model, schedule_type='cosine'):
        self.T = T
        self.d_model = d_model
        self.schedule_type = schedule_type
        
        # 计算基础调度
        self.betas = self._compute_base_schedule()
        
        # 计算累积SNR(Signal-to-Noise Ratio)
        self.alphas = 1 - self.betas
        self.alphas_cumprod = torch.cumprod(self.alphas, dim=0)
        self.snr = self.alphas_cumprod / (1 - self.alphas_cumprod)
    
    def _compute_base_schedule(self):
        """计算基础噪声调度"""
        if self.schedule_type == 'linear':
            return torch.linspace(0.0001, 0.02, self.T)
        elif self.schedule_type == 'cosine':
            # cosine调度:更平滑的噪声增加
            steps = self.T + 1
            x = torch.linspace(0, self.T, steps)
            alphas_cumprod = torch.cos(((x / self.T) + 0.008) / 1.008 * torch.pi * 0.5) ** 2
            alphas_cumprod = alphas_cumprod / alphas_cumprod[0]
            betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1])
            return torch.clip(betas, 0.0001, 0.02)
        else:
            raise ValueError(f"Unknown schedule: {self.schedule_type}")
    
    def compute_adaptive_step_sizes(self, curvature_estimates):
        """
        根据曲率估计调整步长
        
        参数:
            curvature_estimates: 每个时间步的曲率估计 (T,)
        
        返回:
            adjusted_betas: 调整后的噪声调度 (T,)
        """
        # 曲率越大,步长应越小
        curvature_normalized = curvature_estimates / curvature_estimates.mean()
        
        # 调整:曲率大的时间步用更小的beta
        adjusted_betas = self.betas / (1 + 0.5 * (curvature_normalized - 1))
        
        return adjusted_betas
    
    def get_schedule(self, model_curvature=None):
        """
        获取最终调度
        """
        if model_curvature is not None:
            return self.compute_adaptive_step_sizes(model_curvature)
        return self.betas
 
 
def estimate_local_curvature(model, x_noisy, t, noise):
    """
    估计损失 landscape 的局部曲率
    
    使用Hessian-vector products近似
    """
    model.eval()
    
    # 前向传播
    output = model(x_noisy, t)
    loss = F.mse_loss(output, noise)
    
    # 计算梯度
    grad = torch.autograd.grad(loss, model.parameters(), create_graph=True)
    
    # Hessian-vector product近似
    v = [torch.randn_like(g) for g in grad]
    hvp = torch.autograd.grad(grad, model.parameters(), grad_outputs=v)
    
    # 计算有效曲率(梯度范数/HVP范数比)
    grad_norm = sum(g.norm() for g in grad)
    hvp_norm = sum(h.norm() for h in hvp if h is not None)
    
    curvature = grad_norm / (hvp_norm + 1e-8)
    
    return curvature

快速采样策略

DDIM(Denoising Diffusion Implicit Models)

DDIM通过非马尔可夫反向过程加速采样:

def ddim_step(x_t, t, t_prev, pred_noise, eta=0.0):
    """
    DDIM单步采样
    
    参数:
        x_t: 当前含噪图像
        t: 当前时间步
        t_prev: 前一时间步
        pred_noise: 预测的噪声
        eta: 随机性参数 (eta=0为确定性)
    """
    # 计算预测的清洁图像
    alpha_t = compute_alpha(t)
    alpha_t_prev = compute_alpha(t_prev)
    
    # 预测方向
    pred_x0 = (x_t - pred_noise * torch.sqrt(1 - alpha_t)) / torch.sqrt(alpha_t)
    
    # 方向向量
    dir_xt = torch.sqrt(1 - alpha_t_prev) * pred_noise
    
    # 随机噪声项
    if eta > 0:
        sigma_t = eta * torch.sqrt((1 - alpha_t_prev) / (1 - alpha_t)) * torch.sqrt(1 - alpha_t / alpha_t_prev)
        noise = torch.randn_like(x_t)
    else:
        sigma_t = 0
        noise = 0
    
    # DDIM更新
    x_t_prev = torch.sqrt(alpha_t_prev) * pred_x0 + dir_xt + sigma_t * noise
    
    return x_t_prev
 
 
def compute_alpha(t, schedule='cosine'):
    """计算累积alpha"""
    if schedule == 'cosine':
        # cosine schedule
        return torch.cos((t + 0.008) / 1.008 * torch.pi * 0.5) ** 2
    else:
        # linear schedule
        return 1 - 0.02 * t

DPM-Solver

DPM-Solver使用高阶ODE求解器:

class DPMSolver:
    """
    DPM-Solver:扩散模型的常微分方程求解器
    
    理论上保证$O(1/T)$的收敛率
    """
    def __init__(self, model, alphas_cumprod):
        self.model = model
        self.alphas_cumprod = alphas_cumprod
    
    def ddim_step(self, x, t, dt, pred_noise):
        """
        DDIM风格单步(用于DPM-Solver)
        """
        alpha_t = self.alphas_cumprod[t]
        alpha_s = self.alphas_cumprod[t - dt]
        
        # 预测x0
        pred_x0 = (x - pred_noise * torch.sqrt(1 - alpha_t)) / torch.sqrt(alpha_t)
        
        # 更新
        x_s = torch.sqrt(alpha_s) * pred_x0 + \
              torch.sqrt(1 - alpha_s) * pred_noise
        
        return x_s
    
    def multistep_dpm_solver(self, x_T, timesteps, order=2):
        """
        多步DPM-Solver
        
        参数:
            x_T: 初始噪声
            timesteps: 时间步序列
            order: 求解器阶数 (1, 2, 或 3)
        """
        x = x_T
        
        for i in range(len(timesteps)):
            t = timesteps[i]
            dt = 1.0 / len(timesteps)
            
            # 预测噪声
            pred_noise = self.model(x, t * torch.ones(x.shape[0]))
            
            if order == 1:
                # 一阶:DDIM
                x = self.ddim_step(x, t, dt, pred_noise)
            
            elif order == 2:
                # 二阶:使用两个噪声估计
                if i < len(timesteps) - 1:
                    # 预演一步
                    x_temp = self.ddim_step(x, t, dt, pred_noise)
                    pred_noise_2 = self.model(x_temp, (t - dt) * torch.ones(x.shape[0]))
                    
                    # Heun方法
                    x = self.ddim_step(x, t, dt, (pred_noise + pred_noise_2) / 2)
                else:
                    x = self.ddim_step(x, t, dt, pred_noise)
            
            elif order == 3:
                # 三阶:经典Runge-Kutta
                # 实现省略,原理类似
                x = self.ddim_step(x, t, dt, pred_noise)
        
        return x

与Flow Matching的关系

Flow Matching框架

Flow Matching提出了一种统一生成模型的框架,扩散模型是其特例。

定义速度场 ,概率路径 满足:

最优速度场通过Flow Matching目标学习:

连续归一化流的Wasserstein梯度流

归一化流满足确定性ODE:

其解轨迹在Wasserstein空间中沿损失函数的梯度流演化。

扩散与Flow Matching的统一

定理:设 为扩散模型诱导的概率路径, 为Flow Matching路径。则:

这解释了为什么两者在实践中表现相似。


实践建议

采样步数选择

目标质量最小步数 推荐步数方法
快速预览10-20DDIM确定性,少量伪影
标准质量50-100DPM-Solver-2平衡速度与质量
高质量200-500DPM-Solver-3 / 原生DDPM最佳质量
超高质量1000+原生DDPM最慢,质量上限

调度策略选择

def choose_schedule(dataset_size, compute_budget, target_quality):
    """
    根据资源选择最优调度策略
    """
    if compute_budget < 100:
        return 'ddim_20steps'
    elif compute_budget < 500:
        if target_quality == 'high':
            return 'dpm_solver_2_100steps'
        else:
            return 'ddim_50steps'
    elif compute_budget < 2000:
        return 'dpm_solver_3_200steps'
    else:
        return 'ddpm_1000steps'

数学附录

核心公式速查

概念公式说明
Wasserstein-2距离平方积分最优传输
熵正则化OTSinkhorn目标
得分函数对数密度梯度
DDPM损失去噪匹配
Flow Matching速度场匹配

收敛率比较

方法收敛率步数复杂度备注
原生DDPM最基础
DDIM确定性加速
DPM-Solver-2二阶收敛
DPM-Solver-3三阶收敛
理论最优基于保证

参考文献


相关主题

Footnotes

  1. De Bortoli, V., et al. (2024). Diffusion models with convergence rate. NeurIPS 2024.