概述

期望最大化(Expectation-Maximization, EM)算法是处理含隐变量的概率模型参数估计的经典方法。1本专题深入探讨EM算法的理论保证、高级变体以及与变分推断的联系。


EM算法的收敛性分析

收敛性定理

EM算法的核心性质是单调收敛性:每次迭代都会增加(或至少保持)观测数据的对数似然。

定理:设 是第 次迭代的参数估计,则

其中 是观测数据的边际对数似然。

收敛性证明

定义 Q 函数

引理1:对于任意

引理2:KL散度非负,即

由引理1和引理2可得:

定义 ,则

EM算法的 M步 选择 ,因此:

收敛速度分析

EM算法的收敛速度取决于信息矩阵的比例:

其中:

  • :Fisher信息矩阵(观测信息)
  • :完整数据的Fisher信息矩阵

缺失信息比例(Missing Information Ratio):

EM算法的收敛速度与 成反比。当 接近1时,EM收敛快;当 接近0时,EM收敛慢。

import numpy as np
import matplotlib.pyplot as plt
 
def simulate_em_convergence(true_params, n_samples, noise_scale):
    """
    模拟EM算法的收敛过程
    """
    np.random.seed(42)
    
    # 生成数据
    X = generate_data(true_params, n_samples, noise_scale)
    
    # 初始化
    theta = initialize_params()
    log_likelihoods = [compute_log_likelihood(X, theta)]
    
    for _ in range(max_iterations):
        # E步:计算隐变量的后验
        Q = e_step(X, theta)
        
        # M步:更新参数
        theta = m_step(X, Q)
        
        # 记录对数似然
        log_likelihoods.append(compute_log_likelihood(X, theta))
        
        # 检查收敛
        if abs(log_likelihoods[-1] - log_likelihoods[-2]) < tol:
            break
    
    return log_likelihoods
 
def plot_convergence(log_likelihoods):
    """
    可视化EM算法的收敛过程
    """
    plt.figure(figsize=(10, 6))
    plt.plot(log_likelihoods, 'b-', linewidth=2)
    plt.xlabel('迭代次数')
    plt.ylabel('观测对数似然')
    plt.title('EM算法收敛曲线')
    plt.grid(True, alpha=0.3)
    plt.axhline(y=max(log_likelihoods), color='r', linestyle='--', 
                 label=f'最大似然: {max(log_likelihoods):.2f}')
    plt.legend()
    plt.show()

EM算法的变体

ECM算法(Expectation-Conditional Maximization)

ECM算法将M步分解为多个条件M步,每次只优化一部分参数。

class ECM:
    """
    ECM算法:期望条件最大化
    
    将M步分解为多个更简单的条件最大化步骤
    """
    def __init__(self, model):
        self.model = model
    
    def fit(self, X, n_iterations=100, tol=1e-6):
        """
        ECM算法主循环
        """
        theta = self.model.init_params(X)
        log_liks = []
        
        for iteration in range(n_iterations):
            # E步
            Q = self.e_step(X, theta)
            
            # CM步 1:最大化部分参数
            theta_1 = self.cm_step_1(X, Q, theta)
            
            # CM步 2:最大化另一部分参数
            theta = self.cm_step_2(X, Q, theta_1)
            
            # 记录似然
            log_lik = self.model.log_likelihood(X, theta)
            log_liks.append(log_lik)
            
            # 收敛检查
            if iteration > 0 and abs(log_liks[-1] - log_liks[-2]) < tol:
                break
        
        return theta, log_liks
    
    def e_step(self, X, theta):
        """
        E步:计算隐变量的后验分布
        """
        return self.model.compute_posterior(X, theta)
    
    def cm_step_1(self, X, Q, theta):
        """
        CM步1:条件最大化第一组参数
        """
        # 只更新第一组参数,固定其他参数
        theta_new = theta.copy()
        theta_new['group1'] = self.optimize_group1(X, Q, theta)
        return theta_new
    
    def cm_step_2(self, X, Q, theta):
        """
        CM步2:条件最大化第二组参数
        """
        theta_new = theta.copy()
        theta_new['group2'] = self.optimize_group2(X, Q, theta)
        return theta_new

ECME算法(ECM Extensions)

ECME是ECM的扩展,允许在CM步直接最大化原始似然函数而不是Q函数。

优势

  • 收敛更快
  • 可以处理更复杂的约束
class ECME:
    """
    ECME算法:ECM扩展
    """
    def fit(self, X, n_iterations=100):
        theta = self.init_params(X)
        
        for _ in range(n_iterations):
            Q = self.e_step(X, theta)
            
            # 直接最大化观测似然
            theta = self.maximize_likelihood(X, Q, theta)
        
        return theta
    
    def maximize_likelihood(self, X, Q, theta):
        """
        直接最大化原始似然
        """
        # 使用数值优化方法
        def objective(params):
            return -self.log_likelihood(X, self.update_theta(theta, params))
        
        result = minimize(objective, theta_to_array(theta))
        return array_to_theta(result.x, theta)

SEM算法(Stochastic EM)

SEM算法在E步添加随机采样,增强探索能力。

class SEM:
    """
    SEM算法:随机EM
    
    在E步使用随机采样而非条件期望
    """
    def e_step_stochastic(self, X, theta):
        """
        随机E步:采样隐变量而不是计算期望
        """
        # 计算后验分布参数
        posterior_params = self.compute_posterior_params(X, theta)
        
        # 随机采样
        z_samples = self.sample_from_posterior(posterior_params)
        
        return z_samples
    
    def fit(self, X, n_iterations=100, annealing_schedule=None):
        """
        带退火的SEM算法
        """
        theta = self.init_params(X)
        
        for t in range(n_iterations):
            # 随机E步
            z_samples = self.e_step_stochastic(X, theta)
            
            # M步
            theta = self.m_step(X, z_samples)
            
            # 退火(可选)
            if annealing_schedule is not None:
                temperature = annealing_schedule(t)
                theta = self.apply_temperature(theta, temperature)
        
        return theta

EM算法在高斯混合模型中的应用

高斯混合模型的EM推导

模型

  • 混合权重:
  • 每个分量:

E步:计算每个数据点属于每个分量的后验概率

M步:更新参数

class GaussianMixtureEM:
    """
    高斯混合模型的EM算法实现
    """
    def __init__(self, n_components, max_iter=100, tol=1e-4, reg_covar=1e-6):
        self.n_components = n_components
        self.max_iter = max_iter
        self.tol = tol
        self.reg_covar = reg_covar  # 正则化防止奇异协方差矩阵
    
    def fit(self, X):
        """
        训练高斯混合模型
        """
        n_samples, n_features = X.shape
        
        # 初始化参数
        self._initialize_parameters(X)
        
        log_likelihood_history = []
        
        for iteration in range(self.max_iter):
            # E步:计算 responsibilities
            responsibilities = self._e_step(X)
            
            # 记录对数似然
            log_lik = self._compute_log_likelihood(X, responsibilities)
            log_likelihood_history.append(log_lik)
            
            # M步:更新参数
            self._m_step(X, responsibilities)
            
            # 收敛检查
            if iteration > 0:
                change = abs(log_lik - log_likelihood_history[-2])
                if change < self.tol:
                    break
        
        return self
    
    def _initialize_parameters(self, X):
        """
        使用K-means初始化参数
        """
        from sklearn.cluster import KMeans
        
        n_samples, n_features = X.shape
        
        # K-means初始化
        kmeans = KMeans(n_clusters=self.n_components, random_state=42)
        kmeans.fit(X)
        
        # 初始化均值
        self.means_ = kmeans.cluster_centers_
        
        # 初始化协方差矩阵
        self.covariances_ = np.zeros((self.n_components, n_features, n_features))
        for k in range(self.n_components):
            self.covariances_[k] = np.cov(X.T) / self.n_components
        
        # 初始化混合权重
        self.weights_ = np.full(self.n_components, 1.0 / self.n_components)
    
    def _e_step(self, X):
        """
        E步:计算 responsibilities
        """
        responsibilities = self._compute_responsibilities(X)
        return responsibilities
    
    def _m_step(self, X, responsibilities):
        """
        M步:更新参数
        """
        n_samples = X.shape[0]
        
        # 更新混合权重
        Nk = responsibilities.sum(axis=0)
        self.weights_ = Nk / n_samples
        
        # 更新均值
        for k in range(self.n_components):
            self.means_[k] = (responsibilities[:, k:k+1] * X).sum(axis=0) / Nk[k]
        
        # 更新协方差矩阵
        for k in range(self.n_components):
            diff = X - self.means_[k]
            cov = (responsibilities[:, k:k+1] * diff).T @ diff / Nk[k]
            # 添加正则化
            self.covariances_[k] = cov + self.reg_covar * np.eye(X.shape[1])
    
    def _compute_responsibilities(self, X):
        """
        计算每个样本属于每个分量的后验概率
        """
        from scipy.stats import multivariate_normal
        
        n_samples = X.shape[0]
        responsibilities = np.zeros((n_samples, self.n_components))
        
        for k in range(self.n_components):
            responsibilities[:, k] = self.weights_[k] * \
                multivariate_normal.pdf(X, self.means_[k], self.covariances_[k])
        
        # 归一化
        responsibilities /= responsibilities.sum(axis=1, keepdims=True)
        
        return responsibilities
    
    def predict(self, X):
        """
        预测每个样本的簇标签
        """
        responsibilities = self._compute_responsibilities(X)
        return responsibilities.argmax(axis=1)
    
    def predict_proba(self, X):
        """
        返回每个样本属于每个簇的概率
        """
        return self._compute_responsibilities(X)

EM算法与变分推断的联系

EM作为变分推断的特殊情况

EM算法可以看作是变分推断的一个特例,其中变分分布被限制为点估计

这意味着变分分布与真实后验分布之间的KL散度为0或无穷大。

变分EM

变分EM将变分推断的思想引入EM算法,允许在E步使用近似后验分布:

class VariationalEM:
    """
    变分EM算法
    
    在E步使用变分分布近似后验
    """
    def __init__(self, model, variational_family='mean_field'):
        self.model = model
        self.variational_family = variational_family
    
    def fit(self, X, n_iterations=100):
        theta = self.model.init_params(X)
        
        for _ in range(n_iterations):
            # 变分E步:优化变分分布
            q_z = self.variational_e_step(X, theta)
            
            # M步:优化参数
            theta = self.m_step(X, q_z)
        
        return theta, q_z
    
    def variational_e_step(self, X, theta):
        """
        变分E步:优化变分分布
        
        使用坐标上升变分推断
        """
        q = self.init_variational_dist()
        
        for vi_iter in range(max_vi_iterations):
            q_old = q.copy()
            
            # 迭代更新每个变分因子
            for i in range(self.model.n_latent):
                q[i] = self.update_variational_factor(X, q, i, theta)
            
            # 检查收敛
            if self.variational_dist_change(q, q_old) < vi_tol:
                break
        
        return q

比较分析

方面EM算法变分EM
E步计算后验期望优化变分分布
M步点估计参数可能也是分布
隐变量积分近似直接优化
适用场景共轭模型非共轭模型
计算复杂度中等

EM算法的实践技巧

初始化策略

def smart_initialization(X, n_components, method='kmeans'):
    """
    智能初始化策略
    """
    if method == 'kmeans':
        # K-means初始化
        from sklearn.cluster import KMeans
        kmeans = KMeans(n_clusters=n_components, n_init=10)
        labels = kmeans.fit_predict(X)
        
        means = kmeans.cluster_centers_
        covariances = []
        weights = []
        
        for k in range(n_components):
            cluster_data = X[labels == k]
            if len(cluster_data) > 1:
                covariances.append(np.cov(cluster_data.T))
                weights.append(len(cluster_data) / len(X))
            else:
                covariances.append(np.cov(X.T))
                weights.append(1.0 / n_components)
        
        return {'means': np.array(means),
                'covariances': np.array(covariances),
                'weights': np.array(weights)}
    
    elif method == 'random':
        # 随机初始化
        indices = np.random.choice(len(X), n_components, replace=False)
        means = X[indices]
        covariances = np.array([np.cov(X.T)] * n_components)
        weights = np.ones(n_components) / n_components
        
        return {'means': means,
                'covariances': covariances,
                'weights': weights}
    
    elif method == 'spectral':
        # 谱初始化
        from sklearn.cluster import SpectralClustering
        labels = SpectralClustering(n_clusters=n_components).fit_predict(X)
        # ... 类似 kmeans 的处理

处理奇异性

当某个分量的协方差矩阵趋近于零时,似然函数会趋向无穷大,导致EM算法不稳定。

解决方案

  1. 正则化:添加小的正定矩阵
  2. 约束:限制协方差矩阵的最小特征值
  3. 重新初始化:检测到奇异性时重新初始化该分量
def safe_covariance_update(X, responsibilities, k, min_eigenvalue=1e-6):
    """
    安全协方差更新:防止奇异性
    """
    Nk = responsibilities.sum()
    mean = (responsibilities[:, k:k+1] * X).sum(axis=0) / Nk
    
    diff = X - mean
    cov = (responsibilities[:, k:k+1] * diff).T @ diff / Nk
    
    # 特征值分解
    eigenvalues, eigenvectors = np.linalg.eigh(cov)
    
    # 限制最小特征值
    eigenvalues = np.maximum(eigenvalues, min_eigenvalue)
    
    # 重建协方差矩阵
    cov = eigenvectors @ np.diag(eigenvalues) @ eigenvectors.T
    
    return cov

收敛诊断

def diagnose_convergence(log_likelihoods, threshold=1e-4):
    """
    收敛诊断
    """
    changes = np.diff(log_likelihoods)
    
    # 检查单调性
    is_monotonic = np.all(changes >= -1e-10)
    
    # 检查收敛
    final_change = abs(changes[-1])
    is_converged = final_change < threshold
    
    # 计算收敛速率
    if len(changes) > 1:
        relative_changes = np.abs(changes[1:]) / (np.abs(log_likelihoods[:-1]) + 1e-10)
        avg_convergence_rate = np.mean(relative_changes[-10:])
    else:
        avg_convergence_rate = np.inf
    
    return {
        'is_monotonic': is_monotonic,
        'is_converged': is_converged,
        'final_change': final_change,
        'convergence_rate': avg_convergence_rate
    }

应用示例:混合专家模型

EM算法可以用于训练混合专家(Mixture of Experts)模型:

class MixtureOfExperts:
    """
    混合专家模型的EM训练
    """
    def __init__(self, n_experts, expert_model):
        self.n_experts = n_experts
        self.expert_model = expert_model
    
    def fit(self, X, y, n_iterations=100):
        """
        使用EM算法训练MoE模型
        """
        n_samples = len(X)
        
        # 初始化
        self.gating_weights = np.random.randn(self.n_experts, X.shape[1])
        self.gating_bias = np.zeros(self.n_experts)
        
        for iteration in range(n_iterations):
            # E步:计算每个专家的激活权重
            logits = X @ self.gating_weights.T + self.gating_bias
            probs = softmax(logits, axis=1)  # gating probabilities
            
            # M步:更新每个专家
            for k in range(self.n_experts):
                # 加权数据
                weights = probs[:, k]
                total_weight = weights.sum()
                
                # 训练专家 k
                self.experts[k] = train_weighted_regressor(
                    X, y, sample_weight=weights
                )
                
                # 更新gating网络
                self.gating_weights[k] = compute_gating_weight(
                    X, y, self.experts[k], probs[:, k]
                )
        
        return self
    
    def predict(self, X):
        """
        预测:加权集成
        """
        # Gating
        logits = X @ self.gating_weights.T + self.gating_bias
        probs = softmax(logits, axis=1)
        
        # 加权预测
        predictions = np.zeros(len(X))
        for k in range(self.n_experts):
            predictions += probs[:, k] * self.experts[k].predict(X)
        
        return predictions

参考


相关链接

Footnotes

  1. Dempster, A. P., Laird, N. M., & Rubin, D. B. (1977). Maximum Likelihood from Incomplete Data via the EM Algorithm. Journal of the Royal Statistical Society.