MCMC Methods

Markov Chain Monte Carlo (MCMC) is a powerful family of methods that sample from probability distributions by constructing a Markov chain. In Bayesian inference, MCMC is used to draw samples from complex posterior distributions and is a core tool of statistical inference.[^1]

1. Background and Motivation

Why We Need MCMC

In Bayesian inference we care about the posterior distribution:

$$p(\theta \mid D) = \frac{p(D \mid \theta)\, p(\theta)}{p(D)}, \qquad p(D) = \int p(D \mid \theta)\, p(\theta)\, d\theta$$

Problems

  • The normalizing constant $p(D)$ usually cannot be computed analytically
  • Direct sampling is difficult when the parameters are high-dimensional and the distribution is complex

The core idea of MCMC: construct a Markov chain whose stationary distribution is exactly the target distribution we want to sample from.

Monte Carlo Refresher

Monte Carlo methods estimate expectations through random sampling:

$$\mathbb{E}_{p}[f(x)] \approx \frac{1}{N} \sum_{i=1}^{N} f(x_i), \qquad x_i \sim p(x)$$
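As a quick sanity check of this estimator, here is a minimal sketch (variable names are illustrative) that estimates $\mathbb{E}[X^2] = 1$ for $X \sim \mathcal{N}(0, 1)$:

```python
import numpy as np

rng = np.random.default_rng(0)

# Draw N samples from the target and average f(x) = x^2 over them
x = rng.standard_normal(100_000)
est = np.mean(x**2)  # Monte Carlo estimate of E[X^2], which equals 1 for N(0, 1)
```

The estimator's standard error shrinks as $1/\sqrt{N}$, so with $10^5$ samples the estimate is accurate to roughly two decimal places.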

Simple sampling methods

  • Inverse-transform sampling
  • Rejection sampling
  • Importance sampling

See: Information Theory
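The first two of these methods are short enough to sketch here (a minimal illustration with our own function names; the rejection-sampling envelope constant is an assumption chosen for this toy target):

```python
import numpy as np

rng = np.random.default_rng(1)

# Inverse-transform sampling: for Exp(lam), the inverse CDF is F^{-1}(u) = -ln(1-u)/lam
def sample_exponential(lam, n):
    u = rng.random(n)
    return -np.log(1.0 - u) / lam

# Rejection sampling: target p(x) ∝ exp(-x^2/2) on [-5, 5], proposal Uniform(-5, 5)
def rejection_sample_std_normal(n):
    out = []
    M = 10.0  # envelope: unnormalized p(x) <= 1 = M * q(x) with q(x) = 1/10 on [-5, 5]
    while len(out) < n:
        x = rng.uniform(-5, 5)
        # Accept with probability p(x) / (M * q(x))
        if rng.random() < np.exp(-0.5 * x**2) / (M * (1 / 10)):
            out.append(x)
    return np.array(out)

exp_samples = sample_exponential(2.0, 100_000)   # mean should approach 1/2
norm_samples = rejection_sample_std_normal(20_000)  # mean ~0, std ~1
```

Rejection sampling wastes proposals (here roughly 75% are rejected), which is one of the reasons MCMC is preferred in higher dimensions.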

2. Markov Chain Basics

Definition of a Markov Chain

A discrete-time Markov chain is described by a state space $\mathcal{X}$ and a transition kernel $T$:

$$P(X_{t+1} = y \mid X_t = x) = T(x, y)$$

Markov property: the future depends only on the current state, not on the history:

$$P(X_{t+1} \mid X_t, X_{t-1}, \ldots, X_0) = P(X_{t+1} \mid X_t)$$

Stationary Distribution

A distribution $\pi$ is a stationary distribution of the Markov chain if it satisfies:

$$\pi(y) = \sum_{x} \pi(x)\, T(x, y) \quad \text{for all } y,$$

or, in matrix form (with $\pi$ a row vector): $\pi = \pi T$.
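For a concrete 3-state chain (the transition matrix below is a made-up example), iterating $\mu \leftarrow \mu T$ from any starting distribution converges to the stationary $\pi$:

```python
import numpy as np

# A 3-state transition matrix (each row sums to 1)
T = np.array([[0.5, 0.3, 0.2],
              [0.2, 0.6, 0.2],
              [0.1, 0.3, 0.6]])

# Iterate the chain's distribution from an arbitrary start
mu = np.array([1.0, 0.0, 0.0])
for _ in range(200):
    mu = mu @ T

pi = mu  # fixed point: pi @ T == pi
```

This "power iteration" view is exactly what running an MCMC chain does implicitly: the marginal distribution of the state converges to $\pi$.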

Detailed Balance

The detailed balance condition is a sufficient condition for stationarity:

$$\pi(x)\, T(x, y) = \pi(y)\, T(y, x) \quad \text{for all } x, y$$

Theorem: if the transition kernel satisfies detailed balance with respect to $\pi$, then $\pi$ is a stationary distribution. (Proof sketch: sum both sides over $x$ to get $\sum_x \pi(x)\, T(x, y) = \pi(y) \sum_x T(y, x) = \pi(y)$.)
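The theorem can be checked numerically on a small reversible chain (our own example: a birth-death chain on three states, which moves only between neighbours and is therefore reversible):

```python
import numpy as np

# Birth-death chain on {0, 1, 2}: transitions only between neighbouring states
T = np.array([[0.7, 0.3, 0.0],
              [0.2, 0.5, 0.3],
              [0.0, 0.4, 0.6]])

# Stationary pi: left eigenvector of T for eigenvalue 1, normalized to sum to 1
vals, vecs = np.linalg.eig(T.T)
pi = np.real(vecs[:, np.argmin(np.abs(vals - 1.0))])
pi = pi / pi.sum()

# Detailed balance: the "probability flow" matrix pi_x * T(x, y) should be symmetric
flows = pi[:, None] * T
```

Symmetry of `flows` is detailed balance; stationarity ($\pi T = \pi$) then follows automatically, as the theorem states.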

Random Walk

The simplest Markov chain is the symmetric random walk:

$$x_{t+1} = x_t + \epsilon_t, \qquad \epsilon_t \sim \mathcal{N}(0, \sigma^2)$$

Symmetry: $T(x, y) = T(y, x)$. On a finite state space a symmetric kernel is doubly stochastic, so the uniform distribution is stationary (an unrestricted random walk on $\mathbb{R}$ has no proper stationary distribution).

3. The Metropolis-Hastings Algorithm

Algorithm Principle

The Metropolis-Hastings (MH) algorithm constructs a chain satisfying detailed balance through an accept-reject mechanism.

Core idea

  1. Draw a candidate state $x' \sim q(x' \mid x)$ from a proposal distribution
  2. Accept $x'$ with probability $\alpha(x, x')$; otherwise stay at $x$
  3. The acceptance probability is chosen so that detailed balance holds

Deriving the Acceptance Probability

Target: detailed balance with respect to $p$,

$$p(x)\, T(x, x') = p(x')\, T(x', x)$$

Substitute $T(x, x') = q(x' \mid x)\, \alpha(x, x')$ for $x' \neq x$:

$$p(x)\, q(x' \mid x)\, \alpha(x, x') = p(x')\, q(x \mid x')\, \alpha(x', x)$$

Solving, with the larger of the two acceptance probabilities set to 1, gives:

$$\alpha(x, x') = \min\!\left(1,\ \frac{p(x')\, q(x \mid x')}{p(x)\, q(x' \mid x)}\right)$$

Algorithm

import numpy as np

def metropolis_hastings(p_log_pdf, q_sample, q_log_pdf, x0, n_samples, burn_in=1000):
    """
    Metropolis-Hastings sampler.

    Args:
        p_log_pdf: log-density of the target (need not be normalized)
        q_sample: q_sample(x) draws a candidate from the proposal q(. | x)
        q_log_pdf: q_log_pdf(a, b) returns log q(a | b)
        x0: initial state
        n_samples: number of samples to keep
        burn_in: number of initial samples to discard

    Returns:
        samples: array of kept samples
    """
    samples = []
    x = x0

    for i in range(n_samples + burn_in):
        # 1. Draw a candidate state from the proposal
        x_proposed = q_sample(x)

        # 2. Log acceptance probability
        log_accept_prob = (
            p_log_pdf(x_proposed) + q_log_pdf(x, x_proposed) -
            p_log_pdf(x) - q_log_pdf(x_proposed, x)
        )
        log_accept_prob = min(0, log_accept_prob)  # cap at log(1) = 0

        # 3. Accept or reject
        if np.log(np.random.random()) < log_accept_prob:
            x = x_proposed

        # 4. Keep the sample once past burn-in
        if i >= burn_in:
            samples.append(x)

    return np.array(samples)
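As a usage sketch (our own toy setup, inlining the same accept/reject rule as above so the snippet is self-contained), sampling a standard normal with a Gaussian random-walk proposal:

```python
import numpy as np

rng = np.random.default_rng(42)

# Target: standard normal, log-density up to an additive constant
def p_log_pdf(x):
    return -0.5 * x**2

# Gaussian random-walk proposal; q_log_pdf(a, b) = log q(a | b) up to a constant
def q_sample(x):
    return x + rng.normal(scale=1.0)

def q_log_pdf(a, b):
    return -0.5 * (a - b)**2

x, samples = 0.0, []
for i in range(12_000):
    x_prop = q_sample(x)
    log_alpha = p_log_pdf(x_prop) + q_log_pdf(x, x_prop) - p_log_pdf(x) - q_log_pdf(x_prop, x)
    if np.log(rng.random()) < min(0.0, log_alpha):
        x = x_prop
    if i >= 2_000:  # discard burn-in
        samples.append(x)

samples = np.array(samples)
```

The kept samples should have mean near 0 and standard deviation near 1; because the proposal here is symmetric, the two `q_log_pdf` terms cancel and the rule reduces to the Metropolis case below.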

Choosing the Proposal Distribution

Symmetric proposal: the Metropolis algorithm

If $q(x' \mid x) = q(x \mid x')$ (symmetric), the acceptance probability simplifies to:

$$\alpha(x, x') = \min\!\left(1,\ \frac{p(x')}{p(x)}\right)$$

class RandomWalkMetropolis:
    """Random-walk Metropolis algorithm."""

    def __init__(self, target_log_pdf, proposal_std=1.0):
        self.target_log_pdf = target_log_pdf
        self.proposal_std = proposal_std

    def sample(self, x0, n_samples, burn_in=1000):
        samples = []
        x = x0

        for _ in range(n_samples + burn_in):
            # Proposal: isotropic Gaussian centred at the current state
            x_proposed = x + np.random.randn(len(x)) * self.proposal_std

            # Acceptance probability (symmetric proposal, so q terms cancel)
            log_alpha = self.target_log_pdf(x_proposed) - self.target_log_pdf(x)
            log_alpha = min(0, log_alpha)

            if np.log(np.random.random()) < log_alpha:
                x = x_proposed

            samples.append(x)

        return np.array(samples[burn_in:])

Asymmetric proposals: general MH

class MixtureProposalMH:
    """MH with a mixture of proposal kernels, each applied with its own MH correction."""

    def __init__(self, target_log_pdf, proposal_fns, proposal_probs):
        """
        Args:
            proposal_fns: list of proposal objects; each is callable as fn(x) to draw
                a candidate and exposes fn.log_pdf(a, b) = log q(a | b)
            proposal_probs: selection probability of each proposal kernel
        """
        self.target_log_pdf = target_log_pdf
        self.proposal_fns = proposal_fns
        self.proposal_probs = proposal_probs

    def sample(self, x0, n_samples, burn_in=1000):
        samples = []
        x = x0

        for _ in range(n_samples + burn_in):
            # Pick a proposal kernel
            k = np.random.choice(len(self.proposal_fns), p=self.proposal_probs)

            # Draw a candidate
            x_proposed = self.proposal_fns[k](x)

            # Acceptance probability (uses the chosen kernel's log-density)
            log_alpha = (
                self.target_log_pdf(x_proposed) +
                self.proposal_fns[k].log_pdf(x, x_proposed) -
                self.target_log_pdf(x) -
                self.proposal_fns[k].log_pdf(x_proposed, x)
            )
            log_alpha = min(0, log_alpha)

            if np.log(np.random.random()) < log_alpha:
                x = x_proposed

            samples.append(x)

        return np.array(samples[burn_in:])

Convergence Diagnostics

Autocorrelation and Effective Sample Size

def compute_autocorrelation(samples, max_lag=50):
    """Empirical autocorrelation function (ACF)."""
    mean = np.mean(samples)
    var = np.var(samples)

    acf = []
    for lag in range(max_lag):
        if lag == 0:
            acf.append(1.0)
        else:
            cov = np.mean((samples[:-lag] - mean) * (samples[lag:] - mean))
            acf.append(cov / var)

    return np.array(acf)
 
def effective_sample_size(samples):
    """
    Effective sample size (ESS):

        ESS = N / (1 + 2 * sum_k rho_k)

    summing the ACF up to the first lag at which it turns negative.
    """
    n = len(samples)
    acf = compute_autocorrelation(samples, max_lag=n // 2)

    # Truncate at the first negative autocorrelation
    negative = np.where(acf < 0)[0]
    cutoff = negative[0] if len(negative) > 0 else len(acf)

    rho_sum = np.sum(acf[1:cutoff])
    ess = n / (1 + 2 * rho_sum)

    return int(ess)

Gelman-Rubin Diagnostic

def gelman_rubin_diagnostic(chains):
    """
    Gelman-Rubin R statistic.

    R ≈ 1 indicates convergence.

    Args:
        chains: list of numpy arrays, one array of samples per chain
    """
    m = len(chains)      # number of chains
    n = len(chains[0])   # samples per chain

    # Between-chain variance
    chain_means = np.array([np.mean(c) for c in chains])
    B = n * np.var(chain_means, ddof=1)

    # Within-chain variance
    W = np.mean([np.var(c, ddof=1) for c in chains])

    # Pooled posterior variance estimate
    var_hat = (1 - 1/n) * W + (1/n) * B

    # R statistic
    R = np.sqrt(var_hat / W)

    return R

4. Gibbs Sampling

Algorithm Principle

Gibbs sampling is a special case of the MH algorithm for situations where the full conditional distributions are easy to sample from.

Core idea: in a high-dimensional space, sample each coordinate in turn from its conditional distribution:

$$x_i^{(t+1)} \sim p\!\left(x_i \,\middle|\, x_1^{(t+1)}, \ldots, x_{i-1}^{(t+1)}, x_{i+1}^{(t)}, \ldots, x_d^{(t)}\right)$$

Why Gibbs Sampling Works

The target factorizes as $p(x) = p(x_i \mid x_{-i})\, p(x_{-i})$, where $x_{-i}$ denotes all variables except $x_i$.

The conditional distribution as the proposal

Take $q(x' \mid x) = p(x_i' \mid x_{-i})$, i.e. only coordinate $i$ changes and $x'_{-i} = x_{-i}$.

Acceptance probability

$$\frac{p(x')\, q(x \mid x')}{p(x)\, q(x' \mid x)}
= \frac{p(x_i' \mid x_{-i})\, p(x_{-i}) \cdot p(x_i \mid x_{-i})}{p(x_i \mid x_{-i})\, p(x_{-i}) \cdot p(x_i' \mid x_{-i})} = 1$$

Because the proposal and the target conditional coincide, the acceptance probability is always 1.

Implementation

class GibbsSampler:
    """A generic Gibbs sampler."""

    def __init__(self, conditional_samplers, initial_state):
        """
        Args:
            conditional_samplers: dict {variable name: conditional sampling function},
                each called as fn(state, var)
            initial_state: dict, initial state
        """
        self.cond_samplers = conditional_samplers
        self.state = initial_state.copy()

    def sample(self, n_samples, variables=None):
        """
        Run Gibbs sampling.

        Args:
            n_samples: number of sweeps
            variables: variables to update (None means all of them)

        Returns:
            samples: {variable name: array of samples}
        """
        if variables is None:
            variables = list(self.cond_samplers.keys())

        samples = {var: [] for var in variables}

        for _ in range(n_samples):
            for var in variables:
                # Draw from the full conditional
                new_value = self.cond_samplers[var](self.state, var)
                self.state[var] = new_value
                samples[var].append(new_value)

        return {var: np.array(vals) for var, vals in samples.items()}

Bivariate Gaussian Example

def gaussian_gibbs_sampler(mean, cov, n_samples, burn_in=1000):
    """
    Gibbs sampling for a bivariate Gaussian.

    Target: N(mu, Sigma), with
    Sigma = [[s1^2,      rho*s1*s2],
             [rho*s1*s2, s2^2     ]]
    """
    mu1, mu2 = mean
    s1, s2 = np.sqrt(cov[0, 0]), np.sqrt(cov[1, 1])
    rho = cov[0, 1] / (s1 * s2)

    samples_x = []
    samples_y = []
    x, y = 0.0, 0.0

    for i in range(n_samples + burn_in):
        # P(x | y) = N(mu1 + rho*(s1/s2)*(y - mu2), s1^2*(1 - rho^2))
        var_x_given_y = s1**2 * (1 - rho**2)
        mean_x_given_y = mu1 + rho * (s1 / s2) * (y - mu2)
        x = np.random.normal(mean_x_given_y, np.sqrt(var_x_given_y))

        # P(y | x) = N(mu2 + rho*(s2/s1)*(x - mu1), s2^2*(1 - rho^2))
        var_y_given_x = s2**2 * (1 - rho**2)
        mean_y_given_x = mu2 + rho * (s2 / s1) * (x - mu1)
        y = np.random.normal(mean_y_given_x, np.sqrt(var_y_given_x))

        if i >= burn_in:
            samples_x.append(x)
            samples_y.append(y)

    return np.array(samples_x), np.array(samples_y)
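A quick check of these two conditional updates against the known moments (the parameter values below are illustrative; the loop inlines the same updates so the snippet is self-contained):

```python
import numpy as np

rng = np.random.default_rng(7)
mu1, mu2, s1, s2, rho = 1.0, -1.0, 1.0, 2.0, 0.8

x, y = 0.0, 0.0
xs, ys = [], []
for i in range(30_000):
    # x | y ~ N(mu1 + rho*(s1/s2)*(y - mu2), s1^2*(1 - rho^2))
    x = rng.normal(mu1 + rho * (s1 / s2) * (y - mu2), s1 * np.sqrt(1 - rho**2))
    # y | x ~ N(mu2 + rho*(s2/s1)*(x - mu1), s2^2*(1 - rho^2))
    y = rng.normal(mu2 + rho * (s2 / s1) * (x - mu1), s2 * np.sqrt(1 - rho**2))
    if i >= 5_000:  # discard burn-in
        xs.append(x)
        ys.append(y)

xs, ys = np.array(xs), np.array(ys)
```

The sample means should be near (1, -1) and the sample correlation near 0.8; note that as $\rho \to 1$ the coordinate-wise updates become tiny and Gibbs mixes very slowly.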

Mixture Model Example

class MixtureModelGibbs:
    """
    Gibbs sampling for a Gaussian mixture model.

    Model:
    - z_i ~ Cat(pi): latent assignment
    - x_i | z_i=k ~ N(mu_k, sigma_k^2 I): observation
    """

    def __init__(self, X, K, alpha=1.0, prior_mu=0, prior_sigma=1):
        self.X = X
        self.n, self.d = X.shape
        self.K = K
        self.alpha = alpha

        # Prior parameters
        self.prior_mu = prior_mu
        self.prior_sigma = prior_sigma

        # Initialization
        self.z = np.random.randint(0, K, self.n)  # latent assignments
        self.mu = np.random.randn(K, self.d) * prior_sigma + prior_mu
        self.sigma = np.ones(K) * prior_sigma  # held fixed; a full sampler would also draw sigma
        self.pi = np.ones(K) / K  # mixture weights

    def sample_pi(self):
        """Sample the mixture weights pi from their Dirichlet conditional."""
        counts = np.bincount(self.z, minlength=self.K)
        return np.random.dirichlet(counts + self.alpha)

    def sample_mu(self):
        """Sample the component means mu from their Gaussian conditionals."""
        for k in range(self.K):
            n_k = np.sum(self.z == k)
            if n_k == 0:
                # Empty component: draw from the prior
                self.mu[k] = np.random.randn(self.d) * self.prior_sigma + self.prior_mu
            else:
                x_k = self.X[self.z == k]
                posterior_var = 1 / (n_k / self.sigma[k]**2 + 1 / self.prior_sigma**2)
                posterior_mean = posterior_var * (n_k * x_k.mean(axis=0) / self.sigma[k]**2 +
                                                  self.prior_mu / self.prior_sigma**2)
                self.mu[k] = np.random.randn(self.d) * np.sqrt(posterior_var) + posterior_mean

    def sample_z(self):
        """Sample the latent assignments z."""
        for i in range(self.n):
            # P(z_i = k | x_i, pi, mu, sigma) ∝ pi_k * N(x_i | mu_k, sigma_k^2 I)
            log_probs = np.log(self.pi)
            for k in range(self.K):
                log_probs[k] += (-0.5 * np.sum((self.X[i] - self.mu[k])**2) / self.sigma[k]**2
                                 - self.d * np.log(self.sigma[k]))  # normalization matters when the sigma_k differ
            log_probs -= np.max(log_probs)  # numerical stability
            probs = np.exp(log_probs)
            probs /= probs.sum()
            self.z[i] = np.random.choice(self.K, p=probs)

    def sample(self, n_iterations):
        """Run the Gibbs sampler."""
        for it in range(n_iterations):
            self.pi = self.sample_pi()  # sample_pi returns the draw; assign it
            self.sample_mu()
            self.sample_z()

            if it % 100 == 0:
                print(f"Iteration {it}: "
                      f"pi={self.pi[:3].round(3)}, "
                      f"mu={self.mu[:3].round(3)}")

5. Hamiltonian Monte Carlo

Basic Idea

Hamiltonian Monte Carlo (HMC) introduces an auxiliary momentum variable to build a more efficient sampling scheme.

Physical analogy

  • The parameters $q$ play the role of a particle's position
  • The momentum $p$ plays the role of the particle's momentum
  • The Hamiltonian $H(q, p) = U(q) + K(p)$ plays the role of total energy

where the potential energy is $U(q) = -\log p(q)$ and the kinetic energy is $K(p) = \tfrac{1}{2}\, p^{\top} M^{-1} p$.

Algorithm

import numpy as np
from autograd import grad  # gradients via autograd; write log_prob with autograd.numpy

class HamiltonianMC:
    """
    Hamiltonian Monte Carlo.

    Approximates the Hamiltonian dynamics with the leapfrog integrator.
    """

    def __init__(self, log_prob, mass=None, step_size=0.1, n_steps=10):
        self.log_prob = log_prob
        self.grad_log_prob = grad(log_prob)
        self.step_size = step_size
        self.n_steps = n_steps
        self.mass = mass  # if None, the identity mass matrix is used (set in sample)

    def leapfrog(self, q, p):
        """
        One leapfrog step; with U(q) = -log p(q), so ∇U = -∇log p:

        1. p(t + ε/2) = p(t) - (ε/2) * ∇U(q(t))
        2. q(t + ε)   = q(t) + ε * p(t + ε/2) / M
        3. p(t + ε)   = p(t + ε/2) - (ε/2) * ∇U(q(t + ε))
        """
        # Half step for the momentum (∇U = -∇log p, hence the plus sign)
        p = p + 0.5 * self.step_size * self.grad_log_prob(q)

        # Full step for the position
        q = q + self.step_size * p / self.mass

        # Another half step for the momentum
        p = p + 0.5 * self.step_size * self.grad_log_prob(q)

        return q, p

    def sample(self, q0, n_samples, burn_in=1000):
        samples = []
        q = np.asarray(q0, dtype=float)
        if self.mass is None:
            self.mass = np.ones(len(q))

        for i in range(n_samples + burn_in):
            # Draw an initial momentum p ~ N(0, M)
            p0 = np.random.randn(len(q)) * np.sqrt(self.mass)

            # Simulate the Hamiltonian dynamics with leapfrog steps
            q_new, p_new = q.copy(), p0.copy()
            for _ in range(self.n_steps):
                q_new, p_new = self.leapfrog(q_new, p_new)

            # Metropolis correction: compare -H at the end of the trajectory
            # (with the final momentum) against -H at the start (with p0)
            log_h_new = self.log_prob(q_new) - 0.5 * np.sum(p_new**2 / self.mass)
            log_h_old = self.log_prob(q) - 0.5 * np.sum(p0**2 / self.mass)

            if np.log(np.random.random()) < log_h_new - log_h_old:
                q = q_new  # accept
            # otherwise reject and keep q unchanged

            if i >= burn_in:
                samples.append(q)

        return np.array(samples)

HMC vs. Random-Walk MH

| Property | Random-Walk MH | HMC |
| --- | --- | --- |
| Proposal | Local (Gaussian) | Global (Hamiltonian trajectory) |
| Acceptance rate | Often low | Often high |
| Efficiency | Slow (random-walk behavior) | Fast (exploits gradient information) |
| Scaling with dimension | Poor | Considerably better |

6. MCMC in Bayesian Inference

Bayesian Linear Regression

class BayesianLinearRegressionMCMC:
    """
    Bayesian linear regression via Gibbs sampling.

    Model:
    y ~ N(X beta, sigma^2 I)
    beta ~ N(0, tau^2 I)
    sigma^2 ~ Inverse-Gamma(a, b)
    """

    def __init__(self, X, y, prior_tau=10, prior_a=0.01, prior_b=0.01):
        self.X = X
        self.y = y
        self.n, self.d = X.shape
        self.prior_tau = prior_tau
        self.prior_a = prior_a
        self.prior_b = prior_b

    def sample_beta_given_sigma(self, sigma2):
        """P(beta | sigma^2, y) = N(mu_n, Sigma_n)"""
        precision = np.eye(self.d) / self.prior_tau**2 + self.X.T @ self.X / sigma2
        cov = np.linalg.inv(precision)
        mean = cov @ (self.X.T @ self.y / sigma2)
        return np.random.multivariate_normal(mean, cov)

    def sample_sigma2_given_beta(self, beta):
        """P(sigma^2 | beta, y) = Inverse-Gamma(a_n, b_n)"""
        residuals = self.y - self.X @ beta
        a_n = self.prior_a + self.n / 2
        b_n = self.prior_b + 0.5 * np.sum(residuals**2)
        return 1 / np.random.gamma(a_n, 1 / b_n)

    def sample(self, n_iterations):
        """Gibbs sampling: alternate the two conditionals."""
        beta_samples = []
        sigma2_samples = []

        beta = np.zeros(self.d)
        sigma2 = 1.0

        for _ in range(n_iterations):
            beta = self.sample_beta_given_sigma(sigma2)
            sigma2 = self.sample_sigma2_given_beta(beta)

            beta_samples.append(beta)
            sigma2_samples.append(sigma2)

        return np.array(beta_samples), np.array(sigma2_samples)
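A compact end-to-end sketch on synthetic data (our own toy setup; it inlines the same two conditional draws so the snippet is self-contained, with tau = 10 and a = b = 0.01 as in the class defaults):

```python
import numpy as np

rng = np.random.default_rng(3)

# Synthetic data: y = X @ [2, -1] + noise, noise sd = 0.5
n, d = 200, 2
X = rng.standard_normal((n, d))
beta_true = np.array([2.0, -1.0])
y = X @ beta_true + rng.normal(scale=0.5, size=n)

tau, a0, b0 = 10.0, 0.01, 0.01
beta, sigma2 = np.zeros(d), 1.0
betas, sigma2s = [], []
for i in range(3_000):
    # beta | sigma^2, y
    prec = np.eye(d) / tau**2 + X.T @ X / sigma2
    cov = np.linalg.inv(prec)
    beta = rng.multivariate_normal(cov @ (X.T @ y / sigma2), cov)
    # sigma^2 | beta, y
    r = y - X @ beta
    sigma2 = 1.0 / rng.gamma(a0 + n / 2, 1.0 / (b0 + 0.5 * r @ r))
    if i >= 500:  # discard burn-in
        betas.append(beta)
        sigma2s.append(sigma2)

betas = np.array(betas)
```

With 200 observations and a weak prior, the posterior means of beta should sit close to (2, -1) and the posterior mean of sigma^2 close to 0.25.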

Variational Inference vs. MCMC

| Property | Variational inference | MCMC |
| --- | --- | --- |
| Output | An approximate distribution | Samples from the true posterior |
| Speed | Fast (optimization) | Slow (sampling) |
| Error | Deterministic (bias) | Stochastic (variance) |
| Judging convergence | Easy | Hard |
| Implementation complexity | Moderate | Higher |
| Scaling to large data | Easy (stochastic gradients) | Hard |

See: Variational Inference

7. MCMC Libraries

PyMC

import pymc as pm
import arviz as az

# Define the model (X, y: your design matrix and observed targets)
with pm.Model() as model:
    # Priors
    alpha = pm.Normal('alpha', mu=0, sigma=10)
    beta = pm.Normal('beta', mu=0, sigma=10, shape=2)
    sigma = pm.HalfNormal('sigma', sigma=1)

    # Likelihood
    mu = alpha + beta[0] * X[:, 0] + beta[1] * X[:, 1]
    y_obs = pm.Normal('y_obs', mu=mu, sigma=sigma, observed=y)

    # Sample
    trace = pm.sample(2000, tune=1000, return_inferencedata=True)

# Analyze
az.plot_trace(trace)
az.summary(trace)

NumPyro

import jax.numpy as jnp
from jax import random
import numpyro
import numpyro.distributions as dist
from numpyro.infer import MCMC, NUTS
 
def model(X, y=None):
    alpha = numpyro.sample('alpha', dist.Normal(0, 10))
    beta = numpyro.sample('beta', dist.Normal(0, 10), sample_shape=(2,))
    sigma = numpyro.sample('sigma', dist.HalfNormal(1))
    
    mu = alpha + jnp.dot(X, beta)
    numpyro.sample('y', dist.Normal(mu, sigma), obs=y)
 
# Run the NUTS sampler
mcmc = MCMC(NUTS(model), num_warmup=1000, num_samples=2000)
mcmc.run(random.PRNGKey(0), X, y)
mcmc.print_summary()

8. Advanced Topics

Slice Sampling

def slice_sample(log_pdf, x0, width=1.0, n_samples=1000):
    """One-dimensional slice sampling."""
    samples = []
    x = x0

    for _ in range(n_samples):
        # Draw the auxiliary height: y ~ Uniform(0, p(x)), in log space
        log_y = log_pdf(x) + np.log(np.random.random())

        # Place an interval of the given width randomly around x
        interval_l = x - width * np.random.random()
        interval_r = interval_l + width

        # Sample uniformly from the interval, shrinking it on rejection
        while True:
            x_new = interval_l + np.random.random() * (interval_r - interval_l)
            if log_pdf(x_new) > log_y:
                x = x_new
                break

            # Shrink the interval towards x
            if x_new < x:
                interval_l = x_new
            else:
                interval_r = x_new

        samples.append(x)

    return np.array(samples)

Multimodal Distributions

A random-walk Metropolis chain with a narrow proposal easily gets trapped in a single mode. Widening the proposal is a crude remedy (at the cost of a lower acceptance rate); dedicated methods such as parallel tempering are usually needed for strongly multimodal targets.

def wide_random_walk_metropolis(target_pdf, x0, n_samples, proposal_width=1.0):
    """Random-walk Metropolis; a larger proposal_width makes mode-hopping more likely."""
    samples = []
    x = x0

    for _ in range(n_samples):
        # Random-walk proposal around the current state
        x_proposed = x + np.random.randn() * proposal_width

        # Acceptance probability (symmetric proposal)
        alpha = min(1, target_pdf(x_proposed) / target_pdf(x))

        if np.random.random() < alpha:
            x = x_proposed

        samples.append(x)

    return np.array(samples)

9. Convergence Diagnostics and Practical Advice

Convergence Checklist

  1. Between-chain convergence: Gelman-Rubin statistic across several independent chains
  2. Within-chain stability: sample statistics stay stable after removing burn-in
  3. Effective sample size: ESS large enough (typically > 1000)
  4. Autocorrelation: low, and decaying quickly
  5. Geweke test: no significant difference between the means of early and late segments

Practical Advice

def run_mcmc_diagnostics(samples, true_mean=None):
    """A complete set of MCMC diagnostics."""
    print("=" * 50)
    print("MCMC Diagnostic Report")
    print("=" * 50)

    # Basic statistics
    print(f"\nNumber of samples: {len(samples)}")
    print(f"Mean: {np.mean(samples):.4f}")
    print(f"Std: {np.std(samples):.4f}")
    print(f"Median: {np.median(samples):.4f}")

    # Effective sample size
    ess = effective_sample_size(samples)
    print(f"\nEffective sample size (ESS): {ess}")
    print(f"ESS/N: {ess/len(samples):.4f}")

    # Autocorrelation
    acf = compute_autocorrelation(samples, max_lag=20)
    print(f"\nAutocorrelation at lags 1-5: {acf[1:6].round(3)}")

    # Geweke test: compare the first 10% against the last 50% of the chain
    from scipy import stats
    n = len(samples)
    first = samples[:int(n*0.1)]
    last = samples[int(n*0.5):]
    z_score = (np.mean(first) - np.mean(last)) / np.sqrt(
        np.var(first)/len(first) + np.var(last)/len(last))
    print(f"\nGeweke z-score: {z_score:.4f}")
    print(f"p-value: {2 * (1 - stats.norm.cdf(abs(z_score))):.4f}")

    # Compare against the true value, if known
    if true_mean is not None:
        print(f"\nTrue value: {true_mean:.4f}")
        print(f"Bias: {np.mean(samples) - true_mean:.4f}")

    print("\n" + "=" * 50)

Choosing the Burn-in

import matplotlib.pyplot as plt

def plot_trace_and_autocorrelation(samples, true_mean=None):
    """Plot the trace and the autocorrelation function."""
    fig, axes = plt.subplots(2, 1, figsize=(12, 8))

    # Trace plot
    axes[0].plot(samples, alpha=0.7)
    axes[0].axhline(np.mean(samples), color='r', linestyle='--', label='Mean')
    if true_mean is not None:
        axes[0].axhline(true_mean, color='g', linestyle='-', label='True')
    axes[0].set_xlabel('Iteration')
    axes[0].set_ylabel('Value')
    axes[0].set_title('Trace Plot')
    axes[0].legend()

    # Autocorrelation
    acf = compute_autocorrelation(samples, max_lag=50)
    axes[1].bar(range(len(acf)), acf)
    axes[1].axhline(0, color='k')
    axes[1].axhline(1.96/np.sqrt(len(samples)), color='r', linestyle='--',
                    label='95% CI')
    axes[1].axhline(-1.96/np.sqrt(len(samples)), color='r', linestyle='--')
    axes[1].set_xlabel('Lag')
    axes[1].set_ylabel('Autocorrelation')
    axes[1].set_title('Autocorrelation Function')
    axes[1].legend()

    plt.tight_layout()
    plt.show()


Footnotes

[^1]: Robert, C. P., & Casella, G. (2004). *Monte Carlo Statistical Methods*. Springer.