MCMC方法
马尔可夫链蒙特卡洛(Markov Chain Monte Carlo, MCMC)是一类通过构造马尔可夫链来进行概率采样的强大方法。在贝叶斯推断中,MCMC用于从复杂后验分布中采样,是统计推断的核心工具。[^1]
一、背景与动机
为什么需要MCMC
在贝叶斯推断中,我们关心后验分布:
问题:
- 归一化常数 通常无法解析计算
- 当参数维度高、分布复杂时,直接采样困难
MCMC的核心思想:构造一个马尔可夫链,使其平稳分布恰好是我们想要采样的目标分布。
蒙特卡洛基础回顾
蒙特卡洛方法通过随机采样估计期望:
简单采样:
- 逆变换采样
- 拒绝采样
- 重要性采样
详见 信息论。
二、马尔可夫链基础
马尔可夫链定义
离散时间马尔可夫链由状态空间 和转移核 描述:
马尔可夫性:未来只依赖于当前状态,与历史无关。
平稳分布
分布 $\pi$ 是马尔可夫链的平稳分布,若满足:

$$\pi(y) = \sum_{x} \pi(x)\, P(x, y), \quad \forall y$$

或矩阵形式:$\pi^{\top} P = \pi^{\top}$
细致平稳条件
细致平稳条件(Detailed Balance)是证明平稳分布的充分条件:

$$\pi(x)\, P(x, y) = \pi(y)\, P(y, x), \quad \forall x, y$$

定理:若转移核满足细致平稳条件,则 $\pi$ 是平稳分布。
随机游走
最简单的马尔可夫链:对称随机游走
对称性:,所以任意分布都是其平稳分布。
三、Metropolis-Hastings算法
算法原理
Metropolis-Hastings (MH) 算法通过接受-拒绝机制构造满足细致平稳条件的链。
核心思想:
- 从提议分布 采样候选状态
- 以概率 接受, 拒绝
- 接受率确保细致平稳条件
接受率推导:
目标:
令 ,代入:
解得接受率:
算法流程
def metropolis_hastings(p_log_pdf, q_sample, q_log_pdf, x0, n_samples, burn_in=1000):
    """
    Metropolis-Hastings sampling.

    Args:
        p_log_pdf: log density of the target distribution (need not be
            normalized).
        q_sample: proposal sampler, q_sample(x) -> candidate x'.
        q_log_pdf: log density of the proposal, q_log_pdf(x', x) = log q(x' | x).
        x0: initial state.
        n_samples: number of samples to keep.
        burn_in: number of initial iterations to discard.

    Returns:
        samples: the retained draws.
    """
    samples = []
    x = x0
    for i in range(n_samples + burn_in):
        # 1. Draw a candidate state from the proposal.
        x_proposed = q_sample(x)
        # 2. Log acceptance ratio:
        #    log[p(x') q(x | x')] - log[p(x) q(x' | x)]
        log_accept_prob = (
            p_log_pdf(x_proposed) + q_log_pdf(x, x_proposed) -
            p_log_pdf(x) - q_log_pdf(x_proposed, x)
        )
        log_accept_prob = min(0, log_accept_prob)  # clip at log(1) = 0
        # 3. Accept or reject.
        if np.log(np.random.random()) < log_accept_prob:
            x = x_proposed
        # 4. Keep the sample once past burn-in.
        if i >= burn_in:
            samples.append(x)
    return np.array(samples)

提议分布选择
对称提议:Metropolis算法
若 (对称),接受率简化为:
class RandomWalkMetropolis:
    """Random-walk Metropolis sampler with an isotropic Gaussian proposal."""

    def __init__(self, target_log_pdf, proposal_std=1.0):
        # target_log_pdf: log density of the target (up to a constant).
        # proposal_std: std-dev of the Gaussian random-walk step.
        self.target_log_pdf = target_log_pdf
        self.proposal_std = proposal_std

    def sample(self, x0, n_samples, burn_in=1000):
        """Run the chain from x0; burn-in draws are dropped from the output."""
        samples = []
        x = x0
        for _ in range(n_samples + burn_in):
            # Proposal: isotropic Gaussian centered at the current state.
            x_proposed = x + np.random.randn(len(x)) * self.proposal_std
            # Symmetric proposal => the q-terms cancel, leaving p(x')/p(x).
            log_alpha = self.target_log_pdf(x_proposed) - self.target_log_pdf(x)
            log_alpha = min(0, log_alpha)
            if np.log(np.random.random()) < log_alpha:
                x = x_proposed
            samples.append(x)
        return np.array(samples[burn_in:])

非对称提议:通用MH
class AdaptiveMH:
    """MH sampler that randomly mixes several proposal distributions."""

    def __init__(self, target_log_pdf, proposal_fns, proposal_probs):
        """
        Args:
            target_log_pdf: log density of the target (up to a constant).
            proposal_fns: list of proposal samplers; each must also expose
                a .log_pdf(x_new, x_old) method (used in sample()).
            proposal_probs: selection probability of each proposal.
        """
        self.target_log_pdf = target_log_pdf
        self.proposal_fns = proposal_fns
        self.proposal_probs = proposal_probs

    def sample(self, x0, n_samples, burn_in=1000):
        samples = []
        x = x0
        for _ in range(n_samples + burn_in):
            # Pick one proposal distribution at random.
            k = np.random.choice(len(self.proposal_fns), p=self.proposal_probs)
            # Draw a candidate from the chosen proposal.
            x_proposed = self.proposal_fns[k](x)
            # Full MH ratio (asymmetric proposal: the q-terms do not cancel,
            # so the proposal's log density is required).
            log_alpha = (
                self.target_log_pdf(x_proposed) +
                self.proposal_fns[k].log_pdf(x, x_proposed) -
                self.target_log_pdf(x) -
                self.proposal_fns[k].log_pdf(x_proposed, x)
            )
            log_alpha = min(0, log_alpha)
            if np.log(np.random.random()) < log_alpha:
                x = x_proposed
            samples.append(x)
        return np.array(samples[burn_in:])

收敛诊断
自相关与有效样本量
def compute_autocorrelation(samples, max_lag=50):
    """
    Empirical autocorrelation function (ACF) of a 1-D chain of draws.

    Args:
        samples: 1-D array of MCMC draws.
        max_lag: number of lags to evaluate (lags 0 .. max_lag - 1).

    Returns:
        np.ndarray of length max_lag; entry k is the lag-k autocorrelation
        (entry 0 is always 1.0).
    """
    centered = samples - np.mean(samples)
    variance = np.var(samples)
    acf = [1.0]  # lag 0 by definition
    for k in range(1, max_lag):
        # Lag-k autocovariance, normalized by the variance.
        lag_cov = np.mean(centered[:-k] * centered[k:])
        acf.append(lag_cov / variance)
    return np.array(acf)
def effective_sample_size(samples):
    """
    Effective sample size (ESS) of an autocorrelated chain.

    ESS = N / (1 + 2 * Σ_k ρ_k), where the sum over lag-k
    autocorrelations ρ_k is truncated at the first negative ACF value
    (initial positive sequence truncation).

    Args:
        samples: 1-D array of MCMC draws.
    """
    n = len(samples)
    acf = compute_autocorrelation(samples, max_lag=n//2)
    # Truncate at the first negative autocorrelation; if the ACF never
    # goes negative within the window, keep all computed lags.
    # Bug fix: the original used a bare `except:` around the index lookup,
    # which also swallows SystemExit/KeyboardInterrupt; test for emptiness
    # explicitly instead.
    negative_lags = np.where(acf < 0)[0]
    cutoff = negative_lags[0] if negative_lags.size > 0 else len(acf)
    rho_sum = np.sum(acf[1:cutoff])
    ess = n / (1 + 2 * rho_sum)
    return int(ess)

Gelman-Rubin诊断
def gelman_rubin_diagnostic(chains, eps=1e-6):
    """
    Gelman-Rubin R-hat statistic across multiple chains.

    R ≈ 1 indicates convergence.

    Args:
        chains: list of 1-D numpy arrays, one per chain (assumed equal
            length; only len(chains[0]) is used).
        eps: unused in this implementation; kept for call compatibility.
    """
    m = len(chains)  # number of chains
    n = len(chains[0])  # draws per chain
    # Between-chain variance B (n times the variance of the chain means).
    chain_means = np.array([np.mean(c) for c in chains])
    B = n * np.var(chain_means, ddof=1)
    # Within-chain variance W.
    W = np.mean([np.var(c, ddof=1) for c in chains])
    # Pooled estimate of the posterior variance.
    var_hat = (1 - 1/n) * W + (1/m) * B
    # R-hat statistic.
    R = np.sqrt(var_hat / W)
    return R

四、Gibbs采样
算法原理
Gibbs采样是MH算法的特例,适用于条件分布容易采样的情形。
核心思想:在高维空间中,依次对每个维度条件采样:
为什么Gibbs采样有效
令 ,其中 表示除 外的所有变量。
条件分布作为提议:
接受率:
因为 (都是条件分布),所以接受率恒为1。
算法实现
class GibbsSampler:
    """Generic Gibbs sampler driven by user-supplied conditional samplers."""

    def __init__(self, conditional_samplers, initial_state):
        """
        Args:
            conditional_samplers: dict mapping variable name -> function
                (state, var) -> new value drawn from the full conditional.
            initial_state: dict with an initial value for every variable.
        """
        self.cond_samplers = conditional_samplers
        self.state = initial_state.copy()

    def sample(self, n_samples, variables=None):
        """
        Run Gibbs sweeps.

        Args:
            n_samples: number of full sweeps.
            variables: variables to update (None means all of them).

        Returns:
            dict mapping variable name -> array of draws.
        """
        if variables is None:
            variables = list(self.cond_samplers.keys())
        samples = {var: [] for var in variables}
        for _ in range(n_samples):
            for var in variables:
                # Draw from the full conditional given the current state,
                # then update the state in place before the next variable.
                new_value = self.cond_samplers[var](self.state, var)
                self.state[var] = new_value
                samples[var].append(new_value)
        return {var: np.array(vals) for var, vals in samples.items()}

二维高斯示例
def gaussian_gibbs_sampler(mean, cov, n_samples, burn_in=1000):
    """
    Gibbs sampling for a 2-D Gaussian N(μ, Σ) with
        Σ = [[σ₁²,    ρσ₁σ₂],
             [ρσ₁σ₂,  σ₂²  ]]

    Args:
        mean: length-2 mean vector (μ₁, μ₂).
        cov: 2x2 covariance matrix.
        n_samples: draws to keep after burn-in.
        burn_in: initial sweeps to discard.

    Returns:
        (samples_x, samples_y) arrays.
    """
    mu1, mu2 = mean
    s1, s2 = np.sqrt(cov[0, 0]), np.sqrt(cov[1, 1])
    rho = cov[0, 1] / (s1 * s2)
    samples_x = []
    samples_y = []
    x, y = 0.0, 0.0
    for _ in range(n_samples + burn_in):
        # P(x | y) = N(μ₁ + ρ(σ₁/σ₂)(y - μ₂), σ₁²(1-ρ²))
        var_x_given_y = s1**2 * (1 - rho**2)
        mean_x_given_y = mu1 + rho * (s1 / s2) * (y - mu2)
        x = np.random.normal(mean_x_given_y, np.sqrt(var_x_given_y))
        # P(y | x) = N(μ₂ + ρ(σ₂/σ₁)(x - μ₁), σ₂²(1-ρ²))
        var_y_given_x = s2**2 * (1 - rho**2)
        mean_y_given_x = mu2 + rho * (s2 / s1) * (x - mu1)
        y = np.random.normal(mean_y_given_x, np.sqrt(var_y_given_x))
        if _ >= burn_in:  # note: `_` is (unconventionally) the loop counter
            samples_x.append(x)
            samples_y.append(y)
    return np.array(samples_x), np.array(samples_y)

混合模型示例
class MixtureModelGibbs:
    """
    Gibbs sampling for a Gaussian mixture model.

    Model:
        - z_i ~ Cat(π): latent assignment
        - x_i | z_i = k ~ N(μ_k, σ_k²): observation
    """
    def __init__(self, X, K, alpha=1.0, prior_mu=0, prior_sigma=1):
        # X: (n, d) data matrix; K: number of mixture components.
        # alpha: symmetric Dirichlet concentration for π.
        # prior_mu / prior_sigma: Gaussian prior on each component mean.
        self.X = X
        self.n, self.d = X.shape
        self.K = K
        self.alpha = alpha
        # Prior parameters.
        self.prior_mu = prior_mu
        self.prior_sigma = prior_sigma
        # Initialization.
        self.z = np.random.randint(0, K, self.n)  # latent assignments
        self.mu = np.random.randn(K, self.d) * prior_sigma + prior_mu
        # NOTE(review): sigma is fixed at its initial value — no sample_sigma
        # step exists below, so component variances are never resampled.
        self.sigma = np.ones(K) * prior_sigma
        self.pi = np.ones(K) / K  # mixture weights

    def sample_pi(self):
        """Draw mixture weights π | z from the Dirichlet full conditional."""
        counts = np.bincount(self.z, minlength=self.K)
        return np.random.dirichlet(counts + self.alpha)

    def sample_mu(self):
        """Draw each component mean μ_k | z, X (conjugate Gaussian update)."""
        for k in range(self.K):
            n_k = np.sum(self.z == k)
            if n_k == 0:
                # Empty component: fall back to a draw from the prior.
                self.mu[k] = np.random.randn(self.d) * self.prior_sigma + self.prior_mu
            else:
                x_k = self.X[self.z == k]
                posterior_var = 1 / (n_k / self.sigma[k]**2 + 1 / self.prior_sigma**2)
                posterior_mean = posterior_var * (n_k * x_k.mean(axis=0) / self.sigma[k]**2 +
                                                  self.prior_mu / self.prior_sigma**2)
                self.mu[k] = np.random.randn(self.d) * np.sqrt(posterior_var) + posterior_mean

    def sample_z(self):
        """Draw each assignment z_i | x_i, π, μ, σ."""
        for i in range(self.n):
            # P(z_i = k | x_i, π, μ, σ) ∝ π_k · N(x_i | μ_k, σ_k²)
            log_probs = np.log(self.pi)
            for k in range(self.K):
                log_probs[k] += -0.5 * np.sum((self.X[i] - self.mu[k])**2) / self.sigma[k]**2
            log_probs -= np.max(log_probs)  # numerical stability
            probs = np.exp(log_probs)
            probs /= probs.sum()
            self.z[i] = np.random.choice(self.K, p=probs)

    def sample(self, n_iterations):
        """Run the Gibbs sweeps."""
        for _ in range(n_iterations):
            # NOTE(review): sample_pi() RETURNS the new weights but the result
            # is discarded here, so self.pi is never updated — this looks like
            # a bug; presumably it should read `self.pi = self.sample_pi()`.
            self.sample_pi()
            self.sample_mu()
            self.sample_z()
            if _ % 100 == 0:
                print(f"Iteration {_}: "
                      f"pi={self.pi[:3].round(3)}, "
                      f"mu={self.mu[:3].round(3)}")

五、Hamiltonian Monte Carlo
基本思想
Hamiltonian Monte Carlo (HMC) 通过引入辅助动量变量构造更高效的采样方案。
物理类比:
- 参数 类似于粒子位置
- 动量 类似于粒子动量
- Hamiltonian 类似于总能量
其中势能 ,动能 。
算法流程
class HamiltonianMC:
    """
    Hamiltonian Monte Carlo.

    Augments the target with Gaussian momentum and simulates Hamiltonian
    dynamics with the leapfrog integrator, giving distant proposals with
    high acceptance.  H(q, p) = U(q) + K(p) with U(q) = -log p(q) and
    K(p) = Σ p_i² / (2 m_i).
    """
    def __init__(self, log_prob, mass=None, step_size=0.1, n_steps=10):
        """
        Args:
            log_prob: log density of the target (up to a constant).
            mass: per-dimension mass vector M; None means an identity mass
                inferred from the initial state at sampling time.
            step_size: leapfrog step size ε.
            n_steps: leapfrog steps per proposal.
        """
        self.log_prob = log_prob
        self.grad_log_prob = grad(log_prob)  # requires autograd
        self.step_size = step_size
        self.n_steps = n_steps
        # Bug fix: the original evaluated np.ones(self.d) with self.d = None
        # whenever mass was omitted, which cannot build a valid mass vector.
        # The mass is now resolved lazily from q0 inside sample().
        self.mass = np.asarray(mass, dtype=float) if mass is not None else None
        self.d = len(self.mass) if self.mass is not None else None

    def leapfrog(self, q, p):
        """
        One leapfrog step for H(q, p) with U(q) = -log p(q):
            1. p(t + ε/2) = p(t) - (ε/2) ∇U(q(t)) = p(t) + (ε/2) ∇log p(q(t))
            2. q(t + ε)   = q(t) + ε M⁻¹ p(t + ε/2)
            3. p(t + ε)   = p(t + ε/2) - (ε/2) ∇U(q(t + ε))
        """
        # Bug fix: ∇U = -∇log p, so the momentum half-steps must ADD the
        # gradient of the log density.  The original subtracted it and
        # therefore pushed the trajectory toward low-probability regions.
        p = p + 0.5 * self.step_size * self.grad_log_prob(q)
        # Full position step.
        q = q + self.step_size * p / self.mass
        # Second momentum half-step.
        p = p + 0.5 * self.step_size * self.grad_log_prob(q)
        return q, p

    def sample(self, q0, n_samples, burn_in=1000):
        """Run HMC from q0 and return the post-burn-in positions."""
        if self.mass is None:
            # Identity mass matrix sized from the initial state.
            self.d = len(q0)
            self.mass = np.ones(self.d)
        samples = []
        q = np.asarray(q0, dtype=float)
        for i in range(n_samples + burn_in):
            # Resample momentum: p ~ N(0, M).
            p0 = np.random.randn(self.d) * np.sqrt(self.mass)
            q_old = q.copy()
            # Simulate the Hamiltonian trajectory.
            q_new, p_new = q, p0
            for _ in range(self.n_steps):
                q_new, p_new = self.leapfrog(q_new, p_new)
            # Metropolis correction: compare -H at the proposal vs. current.
            # Bug fix: the original evaluated BOTH states with the final
            # momentum; the current state must use the initial momentum p0.
            log_h_new = self.log_prob(q_new) - 0.5 * np.sum(p_new**2 / self.mass)
            log_h_old = self.log_prob(q_old) - 0.5 * np.sum(p0**2 / self.mass)
            if np.log(np.random.random()) < log_h_new - log_h_old:
                q = q_new   # accept
            else:
                q = q_old   # reject
            if i >= burn_in:
                samples.append(q)
        return np.array(samples)

HMC vs 简单MH
| 特性 | Random Walk MH | HMC |
|---|---|---|
| 提议分布 | 局部(高斯) | 全局(Hamiltonian轨迹) |
| 接受率 | 通常较低 | 通常较高 |
| 效率 | 慢(随机游走) | 快(利用梯度信息) |
| 维度扩展 | 困难 | 好 |
六、MCMC在贝叶斯推断中的应用
贝叶斯线性回归
class BayesianLinearRegressionMCMC:
    """
    Bayesian linear regression via Gibbs sampling.

    Model:
        y ~ N(Xβ, σ²I)
        β ~ N(0, τ²I)
        σ² ~ Inverse-Gamma(a, b)
    """
    def __init__(self, X, y, prior_tau=10, prior_a=0.01, prior_b=0.01):
        # X: (n, d) design matrix; y: length-n targets.
        # prior_tau: prior std τ of the coefficients.
        # prior_a, prior_b: Inverse-Gamma hyper-parameters for σ².
        self.X = X
        self.y = y
        self.n, self.d = X.shape
        self.prior_tau = prior_tau
        self.prior_a = prior_a
        self.prior_b = prior_b

    def sample_beta_given_sigma(self, sigma2):
        """P(β | σ², y) = N(μ_n, Σ_n) — conjugate Gaussian full conditional."""
        precision = np.eye(self.d) / self.prior_tau**2 + self.X.T @ self.X / sigma2
        cov = np.linalg.inv(precision)
        mean = cov @ (self.X.T @ self.y / sigma2)
        return np.random.multivariate_normal(mean, cov)

    def sample_sigma2_given_beta(self, beta):
        """P(σ² | β, y) = Inverse-Gamma(a_n, b_n) — conjugate update."""
        residuals = self.y - self.X @ beta
        a_n = self.prior_a + self.n / 2
        b_n = self.prior_b + 0.5 * np.sum(residuals**2)
        # Inverse-Gamma draw via the reciprocal of Gamma(a_n, scale=1/b_n).
        return 1 / np.random.gamma(a_n, 1 / b_n)

    def sample(self, n_iterations):
        """Alternate the two full conditionals (Gibbs sampling)."""
        beta_samples = []
        sigma2_samples = []
        beta = np.zeros(self.d)
        sigma2 = 1.0
        for _ in range(n_iterations):
            beta = self.sample_beta_given_sigma(sigma2)
            sigma2 = self.sample_sigma2_given_beta(beta)
            beta_samples.append(beta)
            sigma2_samples.append(sigma2)
        return np.array(beta_samples), np.array(sigma2_samples)

变分推断 vs MCMC
| 特性 | 变分推断 | MCMC |
|---|---|---|
| 结果 | 近似分布 | 真实后验样本 |
| 速度 | 快(优化) | 慢(采样) |
| 方差 | 确定 | 随机 |
| 收敛判断 | 简单 | 困难 |
| 实现复杂度 | 中等 | 较高 |
| 扩展到大数据 | 易(随机梯度) | 困难 |
详见 变分推断。
七、MCMC的实现库
PyMC
import pymc as pm
import arviz as az

# Define the model.
with pm.Model() as model:
    # Priors.
    alpha = pm.Normal('alpha', mu=0, sigma=10)
    beta = pm.Normal('beta', mu=0, sigma=10, shape=2)
    sigma = pm.HalfNormal('sigma', sigma=1)
    # Likelihood.  NOTE(review): X and y must already exist in scope.
    mu = alpha + beta[0] * X[:, 0] + beta[1] * X[:, 1]
    y_obs = pm.Normal('y_obs', mu=mu, sigma=sigma, observed=y)
    # Sample.
    trace = pm.sample(2000, tune=1000, return_inferencedata=True)

# Analyze.
az.plot_trace(trace)
az.summary(trace)

NumPyro
import jax.numpy as jnp
from jax import random
import numpyro
import numpyro.distributions as dist
from numpyro.infer import MCMC, NUTS

def model(X, y=None):
    """NumPyro model: Bayesian linear regression with Gaussian priors."""
    alpha = numpyro.sample('alpha', dist.Normal(0, 10))
    beta = numpyro.sample('beta', dist.Normal(0, 10), sample_shape=(2,))
    sigma = numpyro.sample('sigma', dist.HalfNormal(1))
    mu = alpha + jnp.dot(X, beta)
    numpyro.sample('y', dist.Normal(mu, sigma), obs=y)

# Run the NUTS sampler.  NOTE(review): X and y must already exist in scope.
mcmc = MCMC(NUTS(model), num_warmup=1000, num_samples=2000)
mcmc.run(random.PRNGKey(0), X, y)
mcmc.print_summary()

八、高级主题
切片采样
def slice_sample(log_pdf, x0, width=1.0, n_samples=1000):
    """
    Univariate slice sampling.

    Args:
        log_pdf: log density of the target (up to a constant).
        x0: initial state (scalar).
        width: width of the initial slice bracket.
        n_samples: number of draws (no burn-in is discarded).

    Returns:
        array of draws.
    """
    samples = []
    x = x0
    for _ in range(n_samples):
        # Draw the slice height uniformly under the density at x.
        log_p_x = log_pdf(x)
        log_y = log_p_x + np.log(np.random.random())  # slice threshold
        # Place an initial bracket of the given width randomly around x.
        interval_l = x - width * np.random.random()
        interval_r = interval_l + width
        # Sample within the bracket, shrinking it until a point on the
        # slice (log_pdf > threshold) is accepted.
        while True:
            x_new = interval_l + np.random.random() * (interval_r - interval_l)
            if log_pdf(x_new) > log_y:
                x = x_new
                break
            # Shrink the bracket toward the current point.
            if x_new < x:
                interval_l = x_new
            else:
                interval_r = x_new
        samples.append(x)
    return np.array(samples)

蛇形采样器
处理多模态分布:
def 蛇形采样器(target_pdf, x0, n_samples, proposal_width=1.0):
    """
    Random-walk Metropolis sampler in density-ratio form (used here for
    multi-modal targets).

    Bug fix: the original read `def蛇形采样器(...)` — the missing space
    after `def` is a syntax error.

    Args:
        target_pdf: unnormalized target density (NOT the log density).
        x0: initial state (scalar).
        n_samples: number of iterations (no burn-in is discarded).
        proposal_width: std-dev of the Gaussian random-walk proposal.

    Returns:
        array of the visited states, one per iteration.
    """
    samples = []
    x = x0
    for _ in range(n_samples):
        # Symmetric Gaussian random walk around the current state.
        x_proposed = x + np.random.randn() * proposal_width
        # Metropolis acceptance: the proposal is symmetric, so only the
        # density ratio remains.
        alpha = min(1, target_pdf(x_proposed) / target_pdf(x))
        if np.random.random() < alpha:
            x = x_proposed
        samples.append(x)
    return np.array(samples)

九、收敛诊断与实践建议
收敛诊断检查清单
- 链间收敛:多条独立链的Gelman-Rubin统计量
- 链内稳定:去除burn-in后的样本统计量稳定
- 有效样本量:ESS足够大(通常 > 1000)
- 自相关:低自相关,快速衰减
- Geweke检验:前后段样本均值无显著差异
实践建议
def run_mcmc_diagnostics(samples, true_mean=None):
    """
    Print a diagnostic report for a 1-D chain of MCMC draws.

    Reports basic statistics, effective sample size, leading
    autocorrelations, a Geweke z-test, and (optionally) the bias against
    a known true mean.

    Args:
        samples: 1-D array of MCMC draws.
        true_mean: optional known mean for bias reporting.
    """
    print("=" * 50)
    print("MCMC 诊断报告")
    print("=" * 50)
    # Basic statistics.
    print(f"\n样本数: {len(samples)}")
    print(f"均值: {np.mean(samples):.4f}")
    print(f"标准差: {np.std(samples):.4f}")
    print(f"中位数: {np.median(samples):.4f}")
    # Effective sample size.
    ess = effective_sample_size(samples)
    print(f"\n有效样本量 (ESS): {ess}")
    print(f"ESS/N: {ess/len(samples):.4f}")
    # Autocorrelation.
    acf = compute_autocorrelation(samples, max_lag=20)
    print(f"\n前5个滞后自相关: {acf[1:6].round(3)}")
    # Geweke test: compare the mean of the first 10% of the chain against
    # the last 50% with a two-sample z-score.
    from scipy import stats
    n = len(samples)
    first = samples[:int(n*0.1)]
    last = samples[int(n*0.5):]
    z_score = (np.mean(first) - np.mean(last)) / np.sqrt(
        np.var(first)/len(first) + np.var(last)/len(last))
    print(f"\nGeweke z-score: {z_score:.4f}")
    print(f"p-value: {2 * (1 - stats.norm.cdf(abs(z_score))):.4f}")
    # Compare against the true value, if provided.
    if true_mean is not None:
        print(f"\n真值: {true_mean:.4f}")
        print(f"偏差: {np.mean(samples) - true_mean:.4f}")
    print("\n" + "=" * 50)

Burn-in选择
def plot_trace_and_autocorrelation(samples, true_mean=None):
    """
    Plot the trace and autocorrelation function of a chain.

    NOTE(review): relies on `plt` (matplotlib.pyplot), which is not
    imported in this file — confirm the import at the call site.

    Args:
        samples: 1-D array of MCMC draws.
        true_mean: optional known mean drawn as a reference line.
    """
    fig, axes = plt.subplots(2, 1, figsize=(12, 8))
    # Trace plot with the sample mean (and optional true mean) overlaid.
    axes[0].plot(samples, alpha=0.7)
    axes[0].axhline(np.mean(samples), color='r', linestyle='--', label='Mean')
    if true_mean is not None:
        axes[0].axhline(true_mean, color='g', linestyle='-', label='True')
    axes[0].set_xlabel('Iteration')
    axes[0].set_ylabel('Value')
    axes[0].set_title('Trace Plot')
    axes[0].legend()
    # Autocorrelation bars with an approximate 95% confidence band
    # (±1.96 / sqrt(N) for white noise).
    acf = compute_autocorrelation(samples, max_lag=50)
    axes[1].bar(range(len(acf)), acf)
    axes[1].axhline(0, color='k')
    axes[1].axhline(1.96/np.sqrt(len(samples)), color='r', linestyle='--',
                    label='95% CI')
    axes[1].axhline(-1.96/np.sqrt(len(samples)), color='r', linestyle='--')
    axes[1].set_xlabel('Lag')
    axes[1].set_ylabel('Autocorrelation')
    axes[1].set_title('Autocorrelation Function')
    axes[1].legend()
    plt.tight_layout()
    plt.show()

参考
相关阅读
Footnotes
[^1]: Robert, C.P., & Casella, G. (2004). *Monte Carlo Statistical Methods*. Springer.