EM算法(期望最大化算法)
1. 算法引入
1.1 隐变量的挑战
在许多概率模型中,我们面临这样的问题:观测数据 是可见的,但存在隐变量(latent variables) 使得完整数据 的分布比边缘分布 更容易建模。
典型场景:
- 高斯混合模型:每个数据点属于哪个组件是隐变量
- 隐马尔可夫模型:每个时间步的隐藏状态是隐变量
- 因子分析:潜在因子是隐变量
核心困难:直接最大化对数似然 通常涉及对隐变量的求和/积分,这在连续情况下往往是难解的(intractable)。
1.2 为什么需要EM?
考虑高斯混合模型的似然函数:
这个对数和内部求和的组合使得直接求导并设为零非常困难。EM算法通过迭代地固定一 part,优化另一 part 的策略巧妙地解决了这个问题。
2. EM算法推导
2.1 ELBO的导出
EM算法的核心是构造一个下界函数(Evidence Lower Bound, ELBO),然后迭代地最大化这个下界。
定义完整数据的对数似然:
对于任意分布 ,我们有:
由Jensen不等式( 是凹函数):
其中 是任意分布,令 得到 E步。
2.2 E步:固定参数,估计隐变量分布
E步计算在当前参数 下,隐变量的后验分布:
E步的目标:计算隐变量 的后验分布 ,为M步做好准备。
2.3 M步:固定分布,最大化期望似然
M步最大化关于 的期望完全数据对数似然:
关键洞察:我们不需要计算 ,只需最大化完全数据的期望对数似然。
2.4 EM算法的完整形式
def em_algorithm(X, K, max_iter=100, tol=1e-6):
"""
EM算法基本实现
参数:
X: 观测数据 (N, D)
K: 隐变量数量(高斯组件数)
max_iter: 最大迭代次数
tol: 收敛阈值
返回:
theta: 估计的参数
history: 迭代历史
"""
N, D = X.shape
# 初始化
pi = np.ones(K) / K # 混合系数
mu = X[np.random.choice(N, K, replace=False)] # 随机选择数据点作为均值
sigma = [np.eye(D) for _ in range(K)] # 协方差矩阵
history = {'log_likelihood': []}
for iteration in range(max_iter):
# ===== E步 =====
# 计算每个数据点属于每个组件的后验概率
gamma = np.zeros((N, K))
for k in range(K):
gamma[:, k] = pi[k] * multivariate_normal.pdf(X, mu[k], sigma[k])
# 归一化
gamma = gamma / gamma.sum(axis=1, keepdims=True)
# ===== M步 =====
N_k = gamma.sum(axis=0) # 每个组件的有效数据点数
# 更新混合系数
pi = N_k / N
# 更新均值
for k in range(K):
mu[k] = (gamma[:, k:k+1] * X).sum(axis=0) / N_k[k]
# 更新协方差矩阵
for k in range(K):
X_centered = X - mu[k]
sigma[k] = (X_centered.T * gamma[:, k]) @ X_centered / N_k[k]
# 计算对数似然(用于收敛判断)
log_likelihood = 0
for k in range(K):
log_likelihood += pi[k] * multivariate_normal.pdf(X, mu[k], sigma[k])
log_likelihood = np.log(log_likelihood.sum(axis=1)).sum()
history['log_likelihood'].append(log_likelihood)
# 检查收敛
if iteration > 0:
if abs(log_likelihood - history['log_likelihood'][-2]) < tol:
break
return {'pi': pi, 'mu': mu, 'sigma': sigma}, history3. EM收敛性分析
3.1 似然单调递增
定理:EM算法每一步迭代都会使对数似然 非递减。
证明:
设 ,定义:
则:
E步选择 使 ,所以:
M步选择 ,所以:
E步性质:,所以:
3.2 收敛性边界
EM算法保证收敛到局部最优解,而非全局最优。存在以下几种情况:
| 情况 | 描述 |
|---|---|
| 局部最优 | 算法收敛到对数似然面的局部最大值 |
| 鞍点 | 可能收敛到鞍点(概率较低) |
| 平原 | 在参数空间中存在似然相近的区域 |
避免局部最优的策略:
- 多次随机初始化
- 使用k-means结果初始化GMM
- 确定性退火EM(Deterministic Annealing EM)
3.3 收敛速率
定理(Wu, 1983):在正则条件下,EM算法在真参数 的邻域内具有线性收敛速率,且收敛常数与信息矩阵之比有关。
\|\theta^{(t+1)} - \theta^*\| \approx \left(1 - \frac{\text{tr}}(F(\theta^*) J(\theta^*)^{-1}}\right) \|\theta^{(t)} - \theta^*\|其中 是Fisher信息矩阵, 是完全数据的Fisher信息矩阵。
4. EM变体
4.1 广义EM(GEM)
当M步没有解析解时,我们可以执行梯度上升或任何能增加期望似然的操作:
def gem_m_step(X, gamma, theta_old, step_size=0.1):
"""
广义EM的M步 - 使用梯度上升
"""
# 计算期望完全数据对数似然的梯度
grad = compute_expected_complete_log_likelihood_gradient(X, gamma, theta_old)
# 梯度上升更新
theta_new = theta_old + step_size * grad
# 确保参数合法性(如正定性)
theta_new = project_to_valid_space(theta_new)
return theta_new4.2 ECM算法(Expectation-Conditional Maximization)
ECM将M步分解为多个条件最大化步骤,每个步骤只优化部分参数:
特点:
- 计算更高效
- 利用问题的特殊结构
- 保证收敛性
4.3 ECME算法
ECM的扩展(ECM(E)),在某些CM步骤直接最大化原似然而非ELBO,提高收敛速度。
5. 与变分推断的关系
5.1 本质区别
| 方面 | EM | 变分推断 |
|---|---|---|
| 隐变量后验 | 精确计算 | 近似(如平均场) |
| 参数估计 | 精确优化 | 近似优化 |
| 适用性 | 后验可精确计算 | 后验 intractable |
| 计算复杂度 | M步可能 intractable | 可扩展(SVI) |
| 解的性质 | 局部最优 | 局部最优(ELBO上界) |
5.2 联系:变分推断是EM的随机/近似版本
EM的E步本质上是计算 ,当这个计算 intractable 时,我们可以:
- 随机近似:使用随机变分推断(SVI),用Monte Carlo估计
- 确定性近似:使用变分推断用近似分布 替代真实后验
def em_vs_vi_comparison():
"""
EM vs VI 对比图示
EM: E步 = 精确后验 (当可计算时)
VI: E步 ≈ 近似后验 (平均场或其他族)
"""
"""
EM的E步(M步):
E步: q(Z) = p(Z|X, θ_old) ← 精确后验
M步: θ = argmax E_q[log p(X,Z|θ)]
VI的E步(M步):
E步: q(Z) ≈ p(Z|X, θ) ← 近似后验(通常用平均场)
M步: θ = argmax E_q[log p(X,Z|θ)] ← 同EM,但q更简单
"""
pass6. 应用案例
6.1 高斯混合模型(GMM)
GMM是最经典的EM应用。完整推导参见gaussian-mixture-model。
import numpy as np
from scipy.stats import multivariate_normal
class GMM:
"""高斯混合模型的EM实现"""
def __init__(self, n_components, max_iter=100, tol=1e-6):
self.n_components = n_components
self.max_iter = max_iter
self.tol = tol
def fit(self, X):
N, D = X.shape
# 初始化
self.weights = np.ones(self.n_components) / self.n_components
self.means = X[np.random.choice(N, self.n_components, replace=False)]
self.covs = [np.eye(D) for _ in range(self.n_components)]
for _ in range(self.max_iter):
# E步
responsibilities = self._e_step(X)
# M步
self._m_step(X, responsibilities)
def _e_step(self, X):
"""计算后验概率(隐变量的后验)"""
N = X.shape[0]
log_resp = np.zeros((N, self.n_components))
for k in range(self.n_components):
log_resp[:, k] = np.log(self.weights[k] + 1e-10) + \
multivariate_normal.logpdf(X, self.means[k], self.covs[k])
# 归一化为概率
log_resp -= log_resp.max(axis=1, keepdims=True)
resp = np.exp(log_resp)
resp /= resp.sum(axis=1, keepdims=True)
return resp
def _m_step(self, X, resp):
"""更新参数"""
N, D = X.shape
Nk = resp.sum(axis=0)
# 更新权重
self.weights = Nk / N
# 更新均值和协方差
for k in range(self.n_components):
self.means[k] = (resp[:, k:k+1] * X).sum(axis=0) / Nk[k]
X_centered = X - self.means[k]
self.covs[k] = (X_centered.T * resp[:, k]) @ X_centered / Nk[k]6.2 隐马尔可夫模型(HMM)
HMM的参数学习(Baum-Welch算法)本质上是EM算法的特例。参见hidden-markov-model。
6.3 因子分析
因子分析的EM算法推导简洁,参见factor-analysis。
7. EM算法的局限性
7.1 计算挑战
| 挑战 | 描述 | 解决方案 |
|---|---|---|
| M步 intractable | 期望对数似然无法解析优化 | GEM、ECM、坐标下降 |
| 局部最优 | 可能收敛到局部最优 | 多次初始化、annealing |
| 收敛慢 | 信息矩阵之比接近1时 | ECME、加速EM |
| 隐变量维度高 | 后验分布复杂 | 变分近似 |
7.2 当EM不适用时
- 后验 intractable:使用变分EM(变分推断作为近似E步)
- 参数维度高:使用随机变分推断(SVI)
- 需要全局最优:使用混合整数规划或基于采样的方法
8. 代码实现:完整EM算法框架
import numpy as np
from abc import ABC, abstractmethod
from typing import Dict, Any, Tuple
class EMBase(ABC):
"""EM算法的抽象基类"""
@abstractmethod
def e_step(self, X: np.ndarray) -> np.ndarray:
"""E步:计算隐变量后验分布
Args:
X: 观测数据
Returns:
gamma: 隐变量的后验概率 (N, K)
"""
pass
@abstractmethod
def m_step(self, X: np.ndarray, gamma: np.ndarray) -> None:
"""M步:最大化期望完全数据对数似然
Args:
X: 观测数据
gamma: 隐变量的后验概率
"""
pass
def compute_log_likelihood(self, X: np.ndarray) -> float:
"""计算观测数据的对数似然"""
pass
def fit(self, X: np.ndarray, max_iter: int = 100,
tol: float = 1e-6, verbose: bool = True) -> Dict[str, Any]:
"""EM算法主循环
Args:
X: 观测数据
max_iter: 最大迭代次数
tol: 收敛阈值
verbose: 是否打印迭代信息
Returns:
history: 迭代历史
"""
history = {'log_likelihood': [], 'params': []}
for t in range(max_iter):
# E步
gamma = self.e_step(X)
# M步
self.m_step(X, gamma)
# 计算对数似然
ll = self.compute_log_likelihood(X)
history['log_likelihood'].append(ll)
if verbose and t % 10 == 0:
print(f"Iter {t}: Log-likelihood = {ll:.4f}")
# 收敛检查
if t > 0:
if abs(history['log_likelihood'][-1] -
history['log_likelihood'][-2]) < tol:
if verbose:
print(f"Converged at iteration {t}")
break
return history
class GMM_EM(EMBase):
"""高斯混合模型的EM实现"""
def __init__(self, n_components: int):
self.n_components = n_components
def _initialize(self, X: np.ndarray) -> None:
"""初始化参数"""
N, D = X.shape
self.weights = np.ones(self.n_components) / self.n_components
self.means = X[np.random.choice(N, self.n_components, replace=False)]
self.covs = [np.eye(D) for _ in range(self.n_components)]
def e_step(self, X: np.ndarray) -> np.ndarray:
N = X.shape[0]
gamma = np.zeros((N, self.n_components))
for k in range(self.n_components):
gamma[:, k] = self.weights[k] * multivariate_normal.pdf(
X, self.means[k], self.covs[k]
)
gamma /= gamma.sum(axis=1, keepdims=True)
return gamma
def m_step(self, X: np.ndarray, gamma: np.ndarray) -> None:
N, D = X.shape
Nk = gamma.sum(axis=0)
self.weights = Nk / N
for k in range(self.n_components):
self.means[k] = (gamma[:, k:k+1] * X).sum(axis=0) / Nk[k]
X_centered = X - self.means[k]
self.covs[k] = (X_centered.T * gamma[:, k]) @ X_centered / Nk[k]
def compute_log_likelihood(self, X: np.ndarray) -> float:
ll = np.zeros(X.shape[0])
for k in range(self.n_components):
ll += self.weights[k] * multivariate_normal.pdf(
X, self.means[k], self.covs[k]
)
return np.log(ll + 1e-10).sum()