EM算法(期望最大化算法)

1. 算法引入

1.1 隐变量的挑战

在许多概率模型中,我们面临这样的问题:观测数据 是可见的,但存在隐变量(latent variables) 使得完整数据 的分布比边缘分布 更容易建模。

典型场景

  • 高斯混合模型:每个数据点属于哪个组件是隐变量
  • 隐马尔可夫模型:每个时间步的隐藏状态是隐变量
  • 因子分析:潜在因子是隐变量

核心困难:直接最大化对数似然 通常涉及对隐变量的求和/积分,这在连续情况下往往是难解的(intractable)。

1.2 为什么需要EM?

考虑高斯混合模型的似然函数:

这个对数和内部求和的组合使得直接求导并设为零非常困难。EM算法通过迭代地固定一 part,优化另一 part 的策略巧妙地解决了这个问题。


2. EM算法推导

2.1 ELBO的导出

EM算法的核心是构造一个下界函数(Evidence Lower Bound, ELBO),然后迭代地最大化这个下界。

定义完整数据的对数似然

对于任意分布 ,我们有:

Jensen不等式 是凹函数):

其中 是任意分布,令 得到 E步

2.2 E步:固定参数,估计隐变量分布

E步计算在当前参数 下,隐变量的后验分布:

E步的目标:计算隐变量 的后验分布 ,为M步做好准备。

2.3 M步:固定分布,最大化期望似然

M步最大化关于 的期望完全数据对数似然:

关键洞察:我们不需要计算 ,只需最大化完全数据的期望对数似然。

2.4 EM算法的完整形式

def em_algorithm(X, K, max_iter=100, tol=1e-6):
    """
    EM算法基本实现
    
    参数:
        X: 观测数据 (N, D)
        K: 隐变量数量(高斯组件数)
        max_iter: 最大迭代次数
        tol: 收敛阈值
    
    返回:
        theta: 估计的参数
        history: 迭代历史
    """
    N, D = X.shape
    
    # 初始化
    pi = np.ones(K) / K  # 混合系数
    mu = X[np.random.choice(N, K, replace=False)]  # 随机选择数据点作为均值
    sigma = [np.eye(D) for _ in range(K)]  # 协方差矩阵
    
    history = {'log_likelihood': []}
    
    for iteration in range(max_iter):
        # ===== E步 =====
        # 计算每个数据点属于每个组件的后验概率
        gamma = np.zeros((N, K))
        for k in range(K):
            gamma[:, k] = pi[k] * multivariate_normal.pdf(X, mu[k], sigma[k])
        # 归一化
        gamma = gamma / gamma.sum(axis=1, keepdims=True)
        
        # ===== M步 =====
        N_k = gamma.sum(axis=0)  # 每个组件的有效数据点数
        
        # 更新混合系数
        pi = N_k / N
        
        # 更新均值
        for k in range(K):
            mu[k] = (gamma[:, k:k+1] * X).sum(axis=0) / N_k[k]
        
        # 更新协方差矩阵
        for k in range(K):
            X_centered = X - mu[k]
            sigma[k] = (X_centered.T * gamma[:, k]) @ X_centered / N_k[k]
        
        # 计算对数似然(用于收敛判断)
        log_likelihood = 0
        for k in range(K):
            log_likelihood += pi[k] * multivariate_normal.pdf(X, mu[k], sigma[k])
        log_likelihood = np.log(log_likelihood.sum(axis=1)).sum()
        history['log_likelihood'].append(log_likelihood)
        
        # 检查收敛
        if iteration > 0:
            if abs(log_likelihood - history['log_likelihood'][-2]) < tol:
                break
    
    return {'pi': pi, 'mu': mu, 'sigma': sigma}, history

3. EM收敛性分析

3.1 似然单调递增

定理:EM算法每一步迭代都会使对数似然 非递减。

证明

,定义:

则:

E步选择 使 ,所以:

M步选择 ,所以:

E步性质:,所以:

3.2 收敛性边界

EM算法保证收敛到局部最优解,而非全局最优。存在以下几种情况:

情况描述
局部最优算法收敛到对数似然面的局部最大值
鞍点可能收敛到鞍点(概率较低)
平原在参数空间中存在似然相近的区域

避免局部最优的策略

  • 多次随机初始化
  • 使用k-means结果初始化GMM
  • 确定性退火EM(Deterministic Annealing EM)

3.3 收敛速率

定理(Wu, 1983):在正则条件下,EM算法在真参数 的邻域内具有线性收敛速率,且收敛常数与信息矩阵之比有关。

\|\theta^{(t+1)} - \theta^*\| \approx \left(1 - \frac{\text{tr}}(F(\theta^*) J(\theta^*)^{-1}}\right) \|\theta^{(t)} - \theta^*\|

其中 是Fisher信息矩阵, 是完全数据的Fisher信息矩阵。


4. EM变体

4.1 广义EM(GEM)

当M步没有解析解时,我们可以执行梯度上升或任何能增加期望似然的操作:

def gem_m_step(X, gamma, theta_old, step_size=0.1):
    """
    广义EM的M步 - 使用梯度上升
    """
    # 计算期望完全数据对数似然的梯度
    grad = compute_expected_complete_log_likelihood_gradient(X, gamma, theta_old)
    
    # 梯度上升更新
    theta_new = theta_old + step_size * grad
    
    # 确保参数合法性(如正定性)
    theta_new = project_to_valid_space(theta_new)
    
    return theta_new

4.2 ECM算法(Expectation-Conditional Maximization)

ECM将M步分解为多个条件最大化步骤,每个步骤只优化部分参数:

特点

  • 计算更高效
  • 利用问题的特殊结构
  • 保证收敛性

4.3 ECME算法

ECM的扩展(ECM(E)),在某些CM步骤直接最大化原似然而非ELBO,提高收敛速度。


5. 与变分推断的关系

5.1 本质区别

方面EM变分推断
隐变量后验精确计算近似(如平均场)
参数估计精确优化近似优化
适用性后验可精确计算后验 intractable
计算复杂度M步可能 intractable可扩展(SVI)
解的性质局部最优局部最优(ELBO上界)

5.2 联系:变分推断是EM的随机/近似版本

EM的E步本质上是计算 ,当这个计算 intractable 时,我们可以:

  1. 随机近似:使用随机变分推断(SVI),用Monte Carlo估计
  2. 确定性近似:使用变分推断用近似分布 替代真实后验
def em_vs_vi_comparison():
    """
    EM vs VI 对比图示
    
    EM:   E步 = 精确后验 (当可计算时)
    VI:   E步 ≈ 近似后验 (平均场或其他族)
    
    """
    
    """
    EM的E步(M步):
    E步: q(Z) = p(Z|X, θ_old)  ← 精确后验
    M步: θ = argmax E_q[log p(X,Z|θ)]
    
    VI的E步(M步):
    E步: q(Z) ≈ p(Z|X, θ)      ← 近似后验(通常用平均场)
    M步: θ = argmax E_q[log p(X,Z|θ)]  ← 同EM,但q更简单
    """
    pass

6. 应用案例

6.1 高斯混合模型(GMM)

GMM是最经典的EM应用。完整推导参见gaussian-mixture-model

import numpy as np
from scipy.stats import multivariate_normal
 
class GMM:
    """高斯混合模型的EM实现"""
    
    def __init__(self, n_components, max_iter=100, tol=1e-6):
        self.n_components = n_components
        self.max_iter = max_iter
        self.tol = tol
        
    def fit(self, X):
        N, D = X.shape
        
        # 初始化
        self.weights = np.ones(self.n_components) / self.n_components
        self.means = X[np.random.choice(N, self.n_components, replace=False)]
        self.covs = [np.eye(D) for _ in range(self.n_components)]
        
        for _ in range(self.max_iter):
            # E步
            responsibilities = self._e_step(X)
            
            # M步
            self._m_step(X, responsibilities)
            
    def _e_step(self, X):
        """计算后验概率(隐变量的后验)"""
        N = X.shape[0]
        log_resp = np.zeros((N, self.n_components))
        
        for k in range(self.n_components):
            log_resp[:, k] = np.log(self.weights[k] + 1e-10) + \
                            multivariate_normal.logpdf(X, self.means[k], self.covs[k])
        
        # 归一化为概率
        log_resp -= log_resp.max(axis=1, keepdims=True)
        resp = np.exp(log_resp)
        resp /= resp.sum(axis=1, keepdims=True)
        
        return resp
    
    def _m_step(self, X, resp):
        """更新参数"""
        N, D = X.shape
        Nk = resp.sum(axis=0)
        
        # 更新权重
        self.weights = Nk / N
        
        # 更新均值和协方差
        for k in range(self.n_components):
            self.means[k] = (resp[:, k:k+1] * X).sum(axis=0) / Nk[k]
            X_centered = X - self.means[k]
            self.covs[k] = (X_centered.T * resp[:, k]) @ X_centered / Nk[k]

6.2 隐马尔可夫模型(HMM)

HMM的参数学习(Baum-Welch算法)本质上是EM算法的特例。参见hidden-markov-model

6.3 因子分析

因子分析的EM算法推导简洁,参见factor-analysis


7. EM算法的局限性

7.1 计算挑战

挑战描述解决方案
M步 intractable期望对数似然无法解析优化GEM、ECM、坐标下降
局部最优可能收敛到局部最优多次初始化、annealing
收敛慢信息矩阵之比接近1时ECME、加速EM
隐变量维度高后验分布复杂变分近似

7.2 当EM不适用时

  • 后验 intractable:使用变分EM(变分推断作为近似E步)
  • 参数维度高:使用随机变分推断(SVI)
  • 需要全局最优:使用混合整数规划基于采样的方法

8. 代码实现:完整EM算法框架

import numpy as np
from abc import ABC, abstractmethod
from typing import Dict, Any, Tuple
 
class EMBase(ABC):
    """EM算法的抽象基类"""
    
    @abstractmethod
    def e_step(self, X: np.ndarray) -> np.ndarray:
        """E步:计算隐变量后验分布
        
        Args:
            X: 观测数据
            
        Returns:
            gamma: 隐变量的后验概率 (N, K)
        """
        pass
    
    @abstractmethod
    def m_step(self, X: np.ndarray, gamma: np.ndarray) -> None:
        """M步:最大化期望完全数据对数似然
        
        Args:
            X: 观测数据
            gamma: 隐变量的后验概率
        """
        pass
    
    def compute_log_likelihood(self, X: np.ndarray) -> float:
        """计算观测数据的对数似然"""
        pass
    
    def fit(self, X: np.ndarray, max_iter: int = 100, 
            tol: float = 1e-6, verbose: bool = True) -> Dict[str, Any]:
        """EM算法主循环
        
        Args:
            X: 观测数据
            max_iter: 最大迭代次数
            tol: 收敛阈值
            verbose: 是否打印迭代信息
            
        Returns:
            history: 迭代历史
        """
        history = {'log_likelihood': [], 'params': []}
        
        for t in range(max_iter):
            # E步
            gamma = self.e_step(X)
            
            # M步
            self.m_step(X, gamma)
            
            # 计算对数似然
            ll = self.compute_log_likelihood(X)
            history['log_likelihood'].append(ll)
            
            if verbose and t % 10 == 0:
                print(f"Iter {t}: Log-likelihood = {ll:.4f}")
            
            # 收敛检查
            if t > 0:
                if abs(history['log_likelihood'][-1] - 
                       history['log_likelihood'][-2]) < tol:
                    if verbose:
                        print(f"Converged at iteration {t}")
                    break
        
        return history
 
 
class GMM_EM(EMBase):
    """高斯混合模型的EM实现"""
    
    def __init__(self, n_components: int):
        self.n_components = n_components
        
    def _initialize(self, X: np.ndarray) -> None:
        """初始化参数"""
        N, D = X.shape
        self.weights = np.ones(self.n_components) / self.n_components
        self.means = X[np.random.choice(N, self.n_components, replace=False)]
        self.covs = [np.eye(D) for _ in range(self.n_components)]
    
    def e_step(self, X: np.ndarray) -> np.ndarray:
        N = X.shape[0]
        gamma = np.zeros((N, self.n_components))
        
        for k in range(self.n_components):
            gamma[:, k] = self.weights[k] * multivariate_normal.pdf(
                X, self.means[k], self.covs[k]
            )
        
        gamma /= gamma.sum(axis=1, keepdims=True)
        return gamma
    
    def m_step(self, X: np.ndarray, gamma: np.ndarray) -> None:
        N, D = X.shape
        Nk = gamma.sum(axis=0)
        
        self.weights = Nk / N
        
        for k in range(self.n_components):
            self.means[k] = (gamma[:, k:k+1] * X).sum(axis=0) / Nk[k]
            X_centered = X - self.means[k]
            self.covs[k] = (X_centered.T * gamma[:, k]) @ X_centered / Nk[k]
    
    def compute_log_likelihood(self, X: np.ndarray) -> float:
        ll = np.zeros(X.shape[0])
        for k in range(self.n_components):
            ll += self.weights[k] * multivariate_normal.pdf(
                X, self.means[k], self.covs[k]
            )
        return np.log(ll + 1e-10).sum()

参考文献