概述
期望最大化(Expectation-Maximization, EM)算法是处理含隐变量的概率模型参数估计的经典方法。1本专题深入探讨EM算法的理论保证、高级变体以及与变分推断的联系。
EM算法的收敛性分析
收敛性定理
EM算法的核心性质是单调收敛性:每次迭代都会增加(或至少保持)观测数据的对数似然。
定理:设 是第 次迭代的参数估计,则
其中 是观测数据的边际对数似然。
收敛性证明
定义 Q 函数:
引理1:对于任意 ,
引理2:KL散度非负,即 。
由引理1和引理2可得:
定义 ,则 。
EM算法的 M步 选择 ,因此:
收敛速度分析
EM算法的收敛速度取决于信息矩阵的比例:
其中:
- :Fisher信息矩阵(观测信息)
- :完整数据的Fisher信息矩阵
缺失信息比例(Missing Information Ratio):
EM算法的收敛速度与 成反比。当 接近1时,EM收敛快;当 接近0时,EM收敛慢。
import numpy as np
import matplotlib.pyplot as plt
def simulate_em_convergence(true_params, n_samples, noise_scale):
"""
模拟EM算法的收敛过程
"""
np.random.seed(42)
# 生成数据
X = generate_data(true_params, n_samples, noise_scale)
# 初始化
theta = initialize_params()
log_likelihoods = [compute_log_likelihood(X, theta)]
for _ in range(max_iterations):
# E步:计算隐变量的后验
Q = e_step(X, theta)
# M步:更新参数
theta = m_step(X, Q)
# 记录对数似然
log_likelihoods.append(compute_log_likelihood(X, theta))
# 检查收敛
if abs(log_likelihoods[-1] - log_likelihoods[-2]) < tol:
break
return log_likelihoods
def plot_convergence(log_likelihoods):
"""
可视化EM算法的收敛过程
"""
plt.figure(figsize=(10, 6))
plt.plot(log_likelihoods, 'b-', linewidth=2)
plt.xlabel('迭代次数')
plt.ylabel('观测对数似然')
plt.title('EM算法收敛曲线')
plt.grid(True, alpha=0.3)
plt.axhline(y=max(log_likelihoods), color='r', linestyle='--',
label=f'最大似然: {max(log_likelihoods):.2f}')
plt.legend()
plt.show()EM算法的变体
ECM算法(Expectation-Conditional Maximization)
ECM算法将M步分解为多个条件M步,每次只优化一部分参数。
class ECM:
"""
ECM算法:期望条件最大化
将M步分解为多个更简单的条件最大化步骤
"""
def __init__(self, model):
self.model = model
def fit(self, X, n_iterations=100, tol=1e-6):
"""
ECM算法主循环
"""
theta = self.model.init_params(X)
log_liks = []
for iteration in range(n_iterations):
# E步
Q = self.e_step(X, theta)
# CM步 1:最大化部分参数
theta_1 = self.cm_step_1(X, Q, theta)
# CM步 2:最大化另一部分参数
theta = self.cm_step_2(X, Q, theta_1)
# 记录似然
log_lik = self.model.log_likelihood(X, theta)
log_liks.append(log_lik)
# 收敛检查
if iteration > 0 and abs(log_liks[-1] - log_liks[-2]) < tol:
break
return theta, log_liks
def e_step(self, X, theta):
"""
E步:计算隐变量的后验分布
"""
return self.model.compute_posterior(X, theta)
def cm_step_1(self, X, Q, theta):
"""
CM步1:条件最大化第一组参数
"""
# 只更新第一组参数,固定其他参数
theta_new = theta.copy()
theta_new['group1'] = self.optimize_group1(X, Q, theta)
return theta_new
def cm_step_2(self, X, Q, theta):
"""
CM步2:条件最大化第二组参数
"""
theta_new = theta.copy()
theta_new['group2'] = self.optimize_group2(X, Q, theta)
return theta_newECME算法(ECM Extensions)
ECME是ECM的扩展,允许在CM步直接最大化原始似然函数而不是Q函数。
优势:
- 收敛更快
- 可以处理更复杂的约束
class ECME:
"""
ECME算法:ECM扩展
"""
def fit(self, X, n_iterations=100):
theta = self.init_params(X)
for _ in range(n_iterations):
Q = self.e_step(X, theta)
# 直接最大化观测似然
theta = self.maximize_likelihood(X, Q, theta)
return theta
def maximize_likelihood(self, X, Q, theta):
"""
直接最大化原始似然
"""
# 使用数值优化方法
def objective(params):
return -self.log_likelihood(X, self.update_theta(theta, params))
result = minimize(objective, theta_to_array(theta))
return array_to_theta(result.x, theta)SEM算法(Stochastic EM)
SEM算法在E步添加随机采样,增强探索能力。
class SEM:
"""
SEM算法:随机EM
在E步使用随机采样而非条件期望
"""
def e_step_stochastic(self, X, theta):
"""
随机E步:采样隐变量而不是计算期望
"""
# 计算后验分布参数
posterior_params = self.compute_posterior_params(X, theta)
# 随机采样
z_samples = self.sample_from_posterior(posterior_params)
return z_samples
def fit(self, X, n_iterations=100, annealing_schedule=None):
"""
带退火的SEM算法
"""
theta = self.init_params(X)
for t in range(n_iterations):
# 随机E步
z_samples = self.e_step_stochastic(X, theta)
# M步
theta = self.m_step(X, z_samples)
# 退火(可选)
if annealing_schedule is not None:
temperature = annealing_schedule(t)
theta = self.apply_temperature(theta, temperature)
return thetaEM算法在高斯混合模型中的应用
高斯混合模型的EM推导
模型:
- 混合权重:
- 每个分量:
E步:计算每个数据点属于每个分量的后验概率
M步:更新参数
class GaussianMixtureEM:
"""
高斯混合模型的EM算法实现
"""
def __init__(self, n_components, max_iter=100, tol=1e-4, reg_covar=1e-6):
self.n_components = n_components
self.max_iter = max_iter
self.tol = tol
self.reg_covar = reg_covar # 正则化防止奇异协方差矩阵
def fit(self, X):
"""
训练高斯混合模型
"""
n_samples, n_features = X.shape
# 初始化参数
self._initialize_parameters(X)
log_likelihood_history = []
for iteration in range(self.max_iter):
# E步:计算 responsibilities
responsibilities = self._e_step(X)
# 记录对数似然
log_lik = self._compute_log_likelihood(X, responsibilities)
log_likelihood_history.append(log_lik)
# M步:更新参数
self._m_step(X, responsibilities)
# 收敛检查
if iteration > 0:
change = abs(log_lik - log_likelihood_history[-2])
if change < self.tol:
break
return self
def _initialize_parameters(self, X):
"""
使用K-means初始化参数
"""
from sklearn.cluster import KMeans
n_samples, n_features = X.shape
# K-means初始化
kmeans = KMeans(n_clusters=self.n_components, random_state=42)
kmeans.fit(X)
# 初始化均值
self.means_ = kmeans.cluster_centers_
# 初始化协方差矩阵
self.covariances_ = np.zeros((self.n_components, n_features, n_features))
for k in range(self.n_components):
self.covariances_[k] = np.cov(X.T) / self.n_components
# 初始化混合权重
self.weights_ = np.full(self.n_components, 1.0 / self.n_components)
def _e_step(self, X):
"""
E步:计算 responsibilities
"""
responsibilities = self._compute_responsibilities(X)
return responsibilities
def _m_step(self, X, responsibilities):
"""
M步:更新参数
"""
n_samples = X.shape[0]
# 更新混合权重
Nk = responsibilities.sum(axis=0)
self.weights_ = Nk / n_samples
# 更新均值
for k in range(self.n_components):
self.means_[k] = (responsibilities[:, k:k+1] * X).sum(axis=0) / Nk[k]
# 更新协方差矩阵
for k in range(self.n_components):
diff = X - self.means_[k]
cov = (responsibilities[:, k:k+1] * diff).T @ diff / Nk[k]
# 添加正则化
self.covariances_[k] = cov + self.reg_covar * np.eye(X.shape[1])
def _compute_responsibilities(self, X):
"""
计算每个样本属于每个分量的后验概率
"""
from scipy.stats import multivariate_normal
n_samples = X.shape[0]
responsibilities = np.zeros((n_samples, self.n_components))
for k in range(self.n_components):
responsibilities[:, k] = self.weights_[k] * \
multivariate_normal.pdf(X, self.means_[k], self.covariances_[k])
# 归一化
responsibilities /= responsibilities.sum(axis=1, keepdims=True)
return responsibilities
def predict(self, X):
"""
预测每个样本的簇标签
"""
responsibilities = self._compute_responsibilities(X)
return responsibilities.argmax(axis=1)
def predict_proba(self, X):
"""
返回每个样本属于每个簇的概率
"""
return self._compute_responsibilities(X)EM算法与变分推断的联系
EM作为变分推断的特殊情况
EM算法可以看作是变分推断的一个特例,其中变分分布被限制为点估计:
这意味着变分分布与真实后验分布之间的KL散度为0或无穷大。
变分EM
变分EM将变分推断的思想引入EM算法,允许在E步使用近似后验分布:
class VariationalEM:
"""
变分EM算法
在E步使用变分分布近似后验
"""
def __init__(self, model, variational_family='mean_field'):
self.model = model
self.variational_family = variational_family
def fit(self, X, n_iterations=100):
theta = self.model.init_params(X)
for _ in range(n_iterations):
# 变分E步:优化变分分布
q_z = self.variational_e_step(X, theta)
# M步:优化参数
theta = self.m_step(X, q_z)
return theta, q_z
def variational_e_step(self, X, theta):
"""
变分E步:优化变分分布
使用坐标上升变分推断
"""
q = self.init_variational_dist()
for vi_iter in range(max_vi_iterations):
q_old = q.copy()
# 迭代更新每个变分因子
for i in range(self.model.n_latent):
q[i] = self.update_variational_factor(X, q, i, theta)
# 检查收敛
if self.variational_dist_change(q, q_old) < vi_tol:
break
return q比较分析
| 方面 | EM算法 | 变分EM |
|---|---|---|
| E步 | 计算后验期望 | 优化变分分布 |
| M步 | 点估计参数 | 可能也是分布 |
| 隐变量 | 积分近似 | 直接优化 |
| 适用场景 | 共轭模型 | 非共轭模型 |
| 计算复杂度 | 低 | 中等 |
EM算法的实践技巧
初始化策略
def smart_initialization(X, n_components, method='kmeans'):
"""
智能初始化策略
"""
if method == 'kmeans':
# K-means初始化
from sklearn.cluster import KMeans
kmeans = KMeans(n_clusters=n_components, n_init=10)
labels = kmeans.fit_predict(X)
means = kmeans.cluster_centers_
covariances = []
weights = []
for k in range(n_components):
cluster_data = X[labels == k]
if len(cluster_data) > 1:
covariances.append(np.cov(cluster_data.T))
weights.append(len(cluster_data) / len(X))
else:
covariances.append(np.cov(X.T))
weights.append(1.0 / n_components)
return {'means': np.array(means),
'covariances': np.array(covariances),
'weights': np.array(weights)}
elif method == 'random':
# 随机初始化
indices = np.random.choice(len(X), n_components, replace=False)
means = X[indices]
covariances = np.array([np.cov(X.T)] * n_components)
weights = np.ones(n_components) / n_components
return {'means': means,
'covariances': covariances,
'weights': weights}
elif method == 'spectral':
# 谱初始化
from sklearn.cluster import SpectralClustering
labels = SpectralClustering(n_clusters=n_components).fit_predict(X)
# ... 类似 kmeans 的处理处理奇异性
当某个分量的协方差矩阵趋近于零时,似然函数会趋向无穷大,导致EM算法不稳定。
解决方案:
- 正则化:添加小的正定矩阵
- 约束:限制协方差矩阵的最小特征值
- 重新初始化:检测到奇异性时重新初始化该分量
def safe_covariance_update(X, responsibilities, k, min_eigenvalue=1e-6):
"""
安全协方差更新:防止奇异性
"""
Nk = responsibilities.sum()
mean = (responsibilities[:, k:k+1] * X).sum(axis=0) / Nk
diff = X - mean
cov = (responsibilities[:, k:k+1] * diff).T @ diff / Nk
# 特征值分解
eigenvalues, eigenvectors = np.linalg.eigh(cov)
# 限制最小特征值
eigenvalues = np.maximum(eigenvalues, min_eigenvalue)
# 重建协方差矩阵
cov = eigenvectors @ np.diag(eigenvalues) @ eigenvectors.T
return cov收敛诊断
def diagnose_convergence(log_likelihoods, threshold=1e-4):
"""
收敛诊断
"""
changes = np.diff(log_likelihoods)
# 检查单调性
is_monotonic = np.all(changes >= -1e-10)
# 检查收敛
final_change = abs(changes[-1])
is_converged = final_change < threshold
# 计算收敛速率
if len(changes) > 1:
relative_changes = np.abs(changes[1:]) / (np.abs(log_likelihoods[:-1]) + 1e-10)
avg_convergence_rate = np.mean(relative_changes[-10:])
else:
avg_convergence_rate = np.inf
return {
'is_monotonic': is_monotonic,
'is_converged': is_converged,
'final_change': final_change,
'convergence_rate': avg_convergence_rate
}应用示例:混合专家模型
EM算法可以用于训练混合专家(Mixture of Experts)模型:
class MixtureOfExperts:
"""
混合专家模型的EM训练
"""
def __init__(self, n_experts, expert_model):
self.n_experts = n_experts
self.expert_model = expert_model
def fit(self, X, y, n_iterations=100):
"""
使用EM算法训练MoE模型
"""
n_samples = len(X)
# 初始化
self.gating_weights = np.random.randn(self.n_experts, X.shape[1])
self.gating_bias = np.zeros(self.n_experts)
for iteration in range(n_iterations):
# E步:计算每个专家的激活权重
logits = X @ self.gating_weights.T + self.gating_bias
probs = softmax(logits, axis=1) # gating probabilities
# M步:更新每个专家
for k in range(self.n_experts):
# 加权数据
weights = probs[:, k]
total_weight = weights.sum()
# 训练专家 k
self.experts[k] = train_weighted_regressor(
X, y, sample_weight=weights
)
# 更新gating网络
self.gating_weights[k] = compute_gating_weight(
X, y, self.experts[k], probs[:, k]
)
return self
def predict(self, X):
"""
预测:加权集成
"""
# Gating
logits = X @ self.gating_weights.T + self.gating_bias
probs = softmax(logits, axis=1)
# 加权预测
predictions = np.zeros(len(X))
for k in range(self.n_experts):
predictions += probs[:, k] * self.experts[k].predict(X)
return predictions参考
相关链接
Footnotes
-
Dempster, A. P., Laird, N. M., & Rubin, D. B. (1977). Maximum Likelihood from Incomplete Data via the EM Algorithm. Journal of the Royal Statistical Society. ↩