层次贝叶斯模型

概述

层次贝叶斯模型(Hierarchical Bayesian Models)是贝叶斯统计中处理多层次、嵌套或分组数据结构的核心框架12。它通过引入超先验层次,自然地实现部分池化(Partial Pooling),在完全个体化和完全聚合之间取得平衡。

核心思想:不同组之间共享信息,同时保持组间差异。


1. 层次结构的动机

1.1 经典问题:学校成绩预测

问题:预测8所学校的教育项目效果。

学校估计效果样本量
A+28n=150
B+8n=60
C-3n=30

极端选项

  • 完全个体化:只相信每个学校的估计(小样本不可靠)
  • 完全聚合:所有学校使用相同估计(忽视组间差异)

1.2 部分池化方案

层次模型提供一个加权平均

其中 是池化系数,取决于组内样本量和组间方差。

1.3 层次结构的三层表示

第一层(数据层):    y_i | θ_j(i) ~ P(y_i | θ_j(i))
第二层(组层):      θ_j ~ N(μ, τ²)
第三层(超参数层):   μ ~ P(μ), τ² ~ P(τ²)

2. 层次模型的数学框架

2.1 三层层次模型

第一层:似然函数

其中 是第 组第 个观测, 是第 组的效应。

第二层:组先验

第三层:超先验

2.2 边缘似然

观测数据的边缘分布:

2.3 后验分布


3. shrinkage估计

3.1 Empirical Bayes估计

经验贝叶斯方法

  1. 首先估计超参数 (通过边缘化或矩估计)
  2. 然后计算每个 的后验

收缩公式

3.2 收缩系数

定义收缩系数

  • 小(样本少):,向 收缩
  • 大(样本多):,信任

3.3 shrinkage的可视化

import numpy as np
import matplotlib.pyplot as plt
 
def shrinkage_visualization():
    """
    展示shrinkage效果
    """
    np.random.seed(42)
    
    # 8所学校的数据
    n = np.array([150, 60, 30, 80, 40, 100, 45, 70])
    y_bar = np.array([28, 8, -3, 15, 5, 20, 10, 12])
    sigma = 15 / np.sqrt(n)  # 标准误差
    
    # 真实效应(模拟)
    true_theta = np.random.normal(10, 8, 8)
    y_obs = y_bar  # 使用观测值
    
    # 超参数估计
    tau_hat = 10  # 假设已知
    
    # 收缩估计
    shrinkage_factor = tau_hat**2 / (tau_hat**2 + sigma**2)
    theta_shrunk = shrinkage_factor * y_obs + (1 - shrinkage_factor) * np.mean(y_obs)
    
    # 绘图
    plt.figure(figsize=(10, 6))
    x = np.arange(8)
    width = 0.35
    
    plt.bar(x - width/2, y_obs, width, label='观测均值', alpha=0.7)
    plt.bar(x + width/2, theta_shrunk, width, label='收缩估计', alpha=0.7)
    plt.axhline(y=np.mean(y_obs), color='k', linestyle='--', label='总体均值')
    
    plt.xlabel('学校编号')
    plt.ylabel('效果估计')
    plt.title('Shrinkage估计效果')
    plt.legend()
    plt.savefig('shrinkage_effect.png', dpi=150)
    plt.close()
    
    return shrinkage_factor

4. 完全贝叶斯推断

4.1 MCMC采样

import numpy as np
from scipy import stats
 
class HierarchicalBayesianModel:
    """层次贝叶斯模型"""
    
    def __init__(self, y, groups, sigma=None):
        """
        参数:
            y: 观测数据 [N]
            groups: 组标签 [N]
            sigma: 已知标准差(可选)
        """
        self.y = y
        self.groups = groups
        self.n_groups = len(np.unique(groups))
        
        if sigma is None:
            # 从数据估计
            self.sigma = np.array([
                np.std(y[groups == g]) / np.sqrt(np.sum(groups == g))
                for g in range(self.n_groups)
            ])
        else:
            self.sigma = sigma
    
    def metropolis_hastings(self, n_samples=10000, burn_in=1000):
        """
        Metropolis-Hastings采样
        """
        # 初始化
        mu = np.mean(self.y)  # 超参数
        tau = np.std(self.y)  # 组间标准差
        theta = np.array([
            np.mean(self.y[self.groups == g])
            for g in range(self.n_groups)
        ])
        
        # 存储
        samples = {
            'mu': [], 'tau': [], 'theta': [[] for _ in range(self.n_groups)]
        }
        
        for i in range(n_samples + burn_in):
            # 更新每个theta_j
            for j in range(self.n_groups):
                # 似然和先验
                post_var = 1 / (1/self.sigma[j]**2 + 1/tau**2)
                post_mean = post_var * (self.y[self.groups == j].mean()/self.sigma[j]**2 + mu/tau**2)
                
                theta[j] = np.random.normal(post_mean, np.sqrt(post_var))
            
            # 更新mu
            mu_var = 1 / (self.n_groups/tau**2 + 1/100**2)  # 先验方差100
            mu_mean = mu_var * (np.sum(theta)/tau**2)
            mu = np.random.normal(mu_mean, np.sqrt(mu_var))
            
            # 更新tau(用MH步)
            tau_prop = max(0.1, tau + np.random.normal(0, 0.5))
            log_alpha = self._log_posterior_tau(tau_prop, theta, mu) - \
                       self._log_posterior_tau(tau, theta, mu)
            
            if np.log(np.random.random()) < log_alpha:
                tau = tau_prop
            
            # 存储(去除burn-in)
            if i >= burn_in:
                samples['mu'].append(mu)
                samples['tau'].append(tau)
                for j in range(self.n_groups):
                    samples['theta'][j].append(theta[j])
        
        return samples
    
    def _log_posterior_tau(self, tau, theta, mu):
        """tau的对数后验(up to常数)"""
        log_prior = np.sum(stats.norm.logpdf(theta, mu, tau))
        log_prior += stats.halfnorm.logpdf(tau, scale=20)
        return log_prior

4.2 Gibbs采样

def gibbs_sampler(y, groups, n_iter=10000, burn_in=1000):
    """
    Gibbs采样器
    
    数据结构: y[ij] ~ N(theta[j], sigma²)
              theta[j] ~ N(mu, tau²)
              mu ~ N(mu0, sigma0²)
              tau² ~ IG(alpha, beta)
    """
    n_groups = len(np.unique(groups))
    
    # 初始值
    theta = np.array([np.mean(y[groups == g]) for g in range(n_groups)])
    mu = np.mean(theta)
    tau_sq = np.var(theta)
    
    # 预计算
    sigma_sq = np.array([np.var(y[groups == g]) for g in range(n_groups)])
    n_j = np.array([np.sum(groups == g) for g in range(n_groups)])
    
    # 存储
    samples = {'theta': [], 'mu': [], 'tau_sq': []}
    
    for _ in range(n_iter + burn_in):
        # 更新theta
        for j in range(n_groups):
            post_var = 1 / (n_j[j]/sigma_sq[j] + 1/tau_sq)
            post_mean = post_var * (n_j[j]*np.mean(y[groups == j])/sigma_sq[j] + mu/tau_sq)
            theta[j] = np.random.normal(post_mean, np.sqrt(post_var))
        
        # 更新mu
        mu_var = tau_sq / n_groups
        mu_mean = np.mean(theta)
        mu = np.random.normal(mu_mean, np.sqrt(mu_var))
        
        # 更新tau_sq
        alpha_post = 0.5 * n_groups + 0.001
        beta_post = 0.5 * np.sum((theta - mu)**2) + 0.001
        tau_sq = 1 / np.random.gamma(alpha_post, 1/beta_post)
        
        if _ >= burn_in:
            samples['theta'].append(theta.copy())
            samples['mu'].append(mu)
            samples['tau_sq'].append(tau_sq)
    
    return samples

5. 典型应用场景

5.1 多中心临床试验

数据结构

  • 第一层:每个医院的患者
  • 第二层:医院效应
  • 第三层:医院间差异的超参数
def clinical_trial_analysis():
    """
    多中心临床试验分析
    
    医院效应模型:
    y_ij = μ + θ_hospital + ε_ij
    
    其中 θ_hospital ~ N(0, τ²)
    """
    # 数据: 每家医院的治疗效果
    hospital_data = {
        'A': {'n': 100, 'mean': 5.2, 'std': 2.1},
        'B': {'n': 80, 'mean': 4.8, 'std': 2.3},
        'C': {'n': 60, 'mean': 3.1, 'std': 1.9},
        # ...
    }
    
    # 层次模型分析
    # ... (实现同上)
    
    return hierarchical_results

5.2 工厂-机器-零件方差分析

嵌套结构

  • :工厂效应
  • :机器效应(嵌套于工厂)
  • :零件误差

5.3 生态学:种群估计

捕获-再捕获模型


6. 经验贝叶斯 vs 完全贝叶斯

6.1 对比

特性经验贝叶斯完全贝叶斯
超参数处理点估计完全积分
计算复杂度较低较高(MCMC)
不确定性量化忽略完全考虑
先验信息
渐近性质可能欠正则一致

6.2 收缩效果的比较

经验贝叶斯的收缩:

完全贝叶斯的后验均值:


7. 层次模型中的计算挑战

7.1 维度灾难

当组数 很大时:

  • 计算量增加
  • 收敛诊断更复杂
  • 需要更多的样本

7.2 收敛诊断

def convergence_diagnosis(samples, threshold=1.2):
    """
    收敛诊断(使用R-hat)
    """
    # 对多个链比较
    n_chains = len(samples['mu'])
    
    # 计算R-hat
    for param in ['mu', 'tau']:
        chain_means = [np.mean(samples[param][c]) for c in range(n_chains)]
        chain_vars = [np.var(samples[param][c]) for c in range(n_chains)]
        
        # 链间方差
        B = np.var(chain_means) * len(samples[param][0])
        
        # 链内方差平均
        W = np.mean(chain_vars)
        
        # R-hat
        var_hat = (len(samples[param][0]) - 1) / len(samples[param][0]) * W + B / len(samples[param][0])
        R_hat = np.sqrt(var_hat / W)
        
        print(f"{param}: R-hat = {R_hat:.3f}", 
              "✓ Converged" if R_hat < threshold else "⚠ Not converged")
    
    # 有效样本量
    for param in ['mu', 'tau']:
        ess = compute_ess(samples[param])
        print(f"{param}: ESS = {ess:.0f}")

7.3 参数化技巧

非中心参数化(Non-centered parameterization):

def non_centered_hierarchical():
    """
    非中心参数化
    
    原始: theta_j ~ N(mu, tau²)
    非中心: theta_j = mu + tau * z_j, z_j ~ N(0, 1)
    
    优点: 减少参数间的相关性,提高采样效率
    """
    pass

8. 实践指南

8.1 何时使用层次模型

场景推荐
分组数据✓ 层次模型
多层次结构✓ 嵌套层次
样本量不均✓ 层次模型
完全独立✗ 普通模型

8.2 先验选择

层次推荐先验理由
超先验 宽正态无信息
超先验 Half-Cauchy/均匀正性约束
组先验 正态自然共轭

8.3 模型检查

def posterior_predictive_check(y, y_rep_samples, bins=30):
    """
    后验预测检查
    """
    # 计算观测统计量
    obs_mean = np.mean(y)
    obs_std = np.std(y)
    
    # 计算复制数据统计量
    rep_means = [np.mean(y_rep) for y_rep in y_rep_samples]
    rep_stds = [np.std(y_rep) for y_rep in y_rep_samples]
    
    # 计算p值
    p_mean = np.mean(np.array(rep_means) > obs_mean)
    p_std = np.mean(np.array(rep_stds) > obs_std)
    
    print(f"Mean p-value: {p_mean:.3f}")
    print(f"Std p-value: {p_std:.3f}")
    
    return p_mean, p_std

9. 层次模型与混合效应模型

9.1 关系

层次贝叶斯模型与线性混合效应模型(LMM)在数学上等价:

混合效应模型层次贝叶斯
固定效应超参数
随机效应
随机效应方差

9.2 频率学派 vs 贝叶斯

频率学派LMM

  • 使用REML或ML估计方差
  • 近似推断
  • 置信区间基于渐近理论

层次贝叶斯

  • 完全后验分布
  • MCMC推断
  • 可信区间自然解释

10. 总结

核心概念

  1. 层次结构:数据→组→超参数的自然分层
  2. 部分池化:在个体化和聚合之间平衡
  3. shrinkage:向组均值收缩,提高小样本估计的稳定性
  4. 超先验:编码组间变异的信息

优势

  • 自然处理分组数据
  • 共享信息,提高估计效率
  • 完整的不确定性量化
  • 灵活的模型扩展

挑战

  • 计算复杂度(通常需要MCMC)
  • 先验敏感性
  • 收敛诊断
  • 模型选择

参考资料


相关主题

Footnotes

  1. Gelman, A., et al. (2013). “Bayesian Data Analysis.” CRC Press.

  2. Box, G. E. P., & Tiao, G. C. (1973). “Bayesian Inference in Statistical Analysis.” Wiley.