EM算法(期望最大化算法)

期望最大化(Expectation-Maximization, EM)算法是统计学中最重要的参数估计方法之一,专门用于处理含隐变量的概率模型的最大似然估计问题。1

1. 问题引入

1.1 为什么要用EM算法?

考虑一个经典的硬币抛掷实验:有 枚不同的硬币,每枚硬币正面朝上的概率不同。观察数据是抛掷结果的序列,但我们不知道每次使用的是哪枚硬币。

设:

  • 观测数据:
  • 隐变量:(每条数据由哪个硬币产生)
  • 参数:(硬币正面概率)

直接优化似然函数的问题

对数似然函数为:

,涉及对隐变量的求和/积分,导致:

  1. 求和出现在对数内部,难以直接求导
  2. 的取值很多时,计算不可行

1.2 EM算法的核心思想

EM算法通过迭代地求解两个步骤来回避上述问题:

步骤名称操作
E步期望步给定当前参数估计,计算隐变量的后验分布
M步最大化步最大化完整数据的对数似然的期望

关键洞察:如果我们知道隐变量 ,则完全数据的对数似然 可以直接优化。


2. EM算法的推导

2.1 辅助函数(Q函数)

设当前参数为 ,定义辅助函数(也称Q函数):

Q函数的含义:在当前观测数据和参数估计下,完整数据对数似然的期望。

2.2 目标不等式

EM算法的正确性基于以下不等式:

其中 的任意分布, 是观测对数似然。

2.3 EM算法的推导

步骤1:固定 ,选择 最小化

最优解为 ,此时

步骤2:选择 最大化

由于第一步固定了 ,第二步只需最大化:

第二项是常数,所以只需最大化第一项,即

2.4 EM算法流程

def em_algorithm(X, model_init, max_iter=100, tol=1e-6):
    """
    EM算法通用框架
    
    Args:
        X: 观测数据
        model_init: 模型初始参数
        max_iter: 最大迭代次数
        tol: 收敛阈值
    
    Returns:
        theta: 估计的参数
        log_likelihoods: 对数似然轨迹
    """
    theta = model_init
    log_likelihoods = []
    
    for iteration in range(max_iter):
        # E步:计算隐变量的后验分布
        gamma = e_step(X, theta)  # posterior p(Z|X, theta)
        
        # 计算当前对数似然(用于收敛判断)
        ll = compute_log_likelihood(X, theta)
        log_likelihoods.append(ll)
        
        # M步:最大化Q函数
        theta_new = m_step(X, gamma, theta)
        
        # 检查收敛
        if abs(ll - log_likelihoods[-2]) < tol:
            break
        
        theta = theta_new
    
    return theta, log_likelihoods
 
 
def e_step(X, theta):
    """E步:计算隐变量的后验分布"""
    # p(Z|X, theta) ∝ p(X, Z|theta)
    responsibilities = compute_posterior(X, theta)
    return responsibilities
 
 
def m_step(X, gamma, theta):
    """M步:最大化Q函数更新参数"""
    # 求解 ∑_Z gamma(Z) * log p(X, Z|theta) 的最大值
    new_theta = maximize_q_function(X, gamma, theta)
    return new_theta

3. EM算法的收敛性

3.1 单调性

定理:EM算法每次迭代都会增加观测数据的对数似然,即:

证明

定义辅助函数:

由于 ,有

EM算法的两步保证:

  1. E步:选择 使 最大化且等于

  2. M步:选择 最大化 ,所以:

3.2 收敛到局部最优

EM算法收敛到对数似然函数的局部最大值(或鞍点)。不能保证全局最优,通常需要:

  • 多次随机初始化
  • 使用启发式方法(如K-Means初始化)

4. EM算法的变体

4.1 广义EM(GEM)

当M步没有解析解时,使用坐标下降最大化Q函数:

def gem_algorithm(X, theta_init, max_iter=100):
    """广义EM:不要求M步有解析解"""
    theta = theta_init
    
    for _ in range(max_iter):
        # E步(不变)
        gamma = e_step(X, theta)
        
        # M步:使用数值优化(如梯度上升、牛顿法)
        theta = m_step_numerical(X, gamma, theta)
    
    return theta

4.2 ECM算法

Expectation-Conditional-Maximization,将M步分解为多个条件最大化步骤:

def ecm_algorithm(X, theta_init, max_iter=100):
    """ECM:M步分解为多个CM步"""
    theta = theta_init
    
    for _ in range(max_iter):
        gamma = e_step(X, theta)
        
        # 条件最大化步1
        theta_1 = conditional_maximize_1(X, gamma, theta)
        # 条件最大化步2
        theta_2 = conditional_maximize_2(X, gamma, theta_1)
        
        theta = theta_2
    
    return theta

4.3 ECME算法

ECM的增强版本,使用一次EM迭代来提高收敛速度。


5. EM与坐标下降

EM算法可以看作是坐标下降在隐变量空间中的特殊形式。

5.1 坐标下降回顾

坐标下降在参数空间 上迭代优化:

5.2 EM作为坐标下降

EM算法优化的是对数似然的下界 ,等价于在以下两个坐标上交替优化:

坐标对应EM步骤
E步:
M步:

6. 高斯混合模型中的EM

EM算法最经典的应用之一是**高斯混合模型(GMM)**的参数估计。详见 高斯混合模型

6.1 模型定义

GMM的概率密度为:

其中 是混合系数, 是高斯分布。

6.2 E步

计算每个数据点属于各成分的后验概率(责任):

6.3 M步

更新参数:

6.4 完整实现

import numpy as np
from scipy.stats import multivariate_normal
 
class GaussianMixtureModel:
    """高斯混合模型的EM算法实现"""
    
    def __init__(self, n_components, max_iter=100, tol=1e-4):
        self.n_components = n_components
        self.max_iter = max_iter
        self.tol = tol
        self.weights_ = None  # 混合系数 π_k
        self.means_ = None    # 均值 μ_k
        self.covariances_ = None  # 协方差 Σ_k
        self.converged_ = False
    
    def _initialize(self, X):
        """初始化参数(使用K-Means++)"""
        n_samples, n_features = X.shape
        
        # 使用K-Means中心初始化均值
        from sklearn.cluster import KMeans
        kmeans = KMeans(n_clusters=self.n_components, n_init=1)
        kmeans.fit(X)
        self.means_ = kmeans.cluster_centers_
        
        # 初始化权重为均匀分布
        self.weights_ = np.ones(self.n_components) / self.n_components
        
        # 初始化协方差为对角矩阵
        self.covariances_ = np.array([
            np.eye(n_features) * np.var(X, axis=0).mean()
            for _ in range(self.n_components)
        ])
    
    def _e_step(self, X):
        """E步:计算责任"""
        n_samples = X.shape[0]
        responsibilities = np.zeros((n_samples, self.n_components))
        
        for k in range(self.n_components):
            responsibilities[:, k] = self.weights_[k] * multivariate_normal.pdf(
                X, mean=self.means_[k], cov=self.covariances_[k]
            )
        
        # 归一化
        responsibilities_sum = responsibilities.sum(axis=1, keepdims=True)
        responsibilities /= responsibilities_sum + 1e-10
        
        return responsibilities
    
    def _m_step(self, X, responsibilities):
        """M步:更新参数"""
        n_samples, n_features = X.shape
        N_k = responsibilities.sum(axis=0)  # 每个成分的有效样本数
        
        for k in range(self.n_components):
            # 更新均值
            self.means_[k] = (responsibilities[:, k:k+1].T @ X) / (N_k[k] + 1e-10)
            
            # 更新协方差
            diff = X - self.means_[k]
            self.covariances_[k] = (responsibilities[:, k:k+1].T @ (diff ** 2)) / (N_k[k] + 1e-10)
            # 如果要使用完整的协方差矩阵,改为矩阵形式
            
            # 更新权重
            self.weights_[k] = N_k[k] / n_samples
        
        # 确保权重和为1
        self.weights_ /= self.weights_.sum()
    
    def _compute_log_likelihood(self, X):
        """计算对数似然"""
        log_likelihood = 0
        for k in range(self.n_components):
            log_likelihood += self.weights_[k] * multivariate_normal.pdf(
                X, mean=self.means_[k], cov=self.covariances_[k]
            )
        return np.log(log_likelihood + 1e-10).sum()
    
    def fit(self, X):
        """训练GMM"""
        self._initialize(X)
        log_likelihood_prev = -np.inf
        
        for iteration in range(self.max_iter):
            # E步
            responsibilities = self._e_step(X)
            
            # 计算对数似然
            log_likelihood = self._compute_log_likelihood(X)
            
            # 检查收敛
            if abs(log_likelihood - log_likelihood_prev) < self.tol:
                self.converged_ = True
                break
            
            # M步
            self._m_step(X, responsibilities)
            log_likelihood_prev = log_likelihood
        
        return self
    
    def predict_proba(self, X):
        """预测每个样本属于各成分的概率"""
        return self._e_step(X)
    
    def predict(self, X):
        """预测每个样本的标签"""
        responsibilities = self.predict_proba(X)
        return np.argmax(responsibilities, axis=1)

7. EM算法的局限性

7.1 局部最优

EM算法可能收敛到局部最优,取决于初始化。

解决方案

  • 多次随机初始化
  • 使用K-Means等方法获得好的初始值

7.2 慢收敛

当信息矩阵接近奇异时,EM算法收敛很慢。

解决方案

  • ECME算法
  • 加速技术(如Aitken加速)

7.3 维度灾难

当隐变量维度很高时,E步计算困难。

解决方案

  • 变分推断(VI)
  • 马尔可夫链蒙特卡洛(MCMC)

8. EM与变分推断的联系

EM算法和变分推断(VI)都是为了处理隐变量模型中的推断问题。

方面EM变分推断
E步计算精确后验 近似后验
M步最大化完整似然期望最大化ELBO
目标局部最优MLE全局近似后验

详见 变分推断


9. 参考文献


相关阅读

Footnotes

  1. Dempster, A. P., Laird, N. M., & Rubin, D. B. (1977). “Maximum likelihood from incomplete data via the EM algorithm”. Journal of the Royal Statistical Society: Series B, 39(1), 1-38.