Gaussian Mixture Models (GMM)
A Gaussian Mixture Model (GMM) is a classic generative clustering algorithm and an important method for probability density estimation. GMM assumes the data are generated by a mixture of several Gaussian distributions, with the model parameters estimated via the EM algorithm.[^1]
1. Model Definition
1.1 Intuition
Imagine a dataset containing two populations: university students (heights with mean 165 cm and small variance) and basketball players (heights with mean 190 cm and larger variance). The overall height distribution is then not a single Gaussian but a weighted mixture of two.
GMM models the overall data distribution as a weighted mixture of Gaussians:

$$p(\mathbf{x}) = \sum_{k=1}^{K} \pi_k \, \mathcal{N}(\mathbf{x} \mid \boldsymbol{\mu}_k, \boldsymbol{\Sigma}_k)$$

where:
- $K$: number of mixture components
- $\pi_k$: mixing coefficient of the $k$-th component, satisfying $\sum_{k=1}^{K} \pi_k = 1$ and $\pi_k \geq 0$
- $\mathcal{N}(\mathbf{x} \mid \boldsymbol{\mu}_k, \boldsymbol{\Sigma}_k)$: probability density of the $k$-th Gaussian
- $\boldsymbol{\mu}_k$: mean vector
- $\boldsymbol{\Sigma}_k$: covariance matrix
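As a minimal sketch of this definition (the two components, weights, and variances below are illustrative values echoing the height example, not fixed by the text), the mixture density can be evaluated directly with scipy:

```python
import numpy as np
from scipy.stats import multivariate_normal

def gmm_density(x, weights, means, covariances):
    """p(x) = sum_k pi_k * N(x | mu_k, Sigma_k)."""
    return sum(w * multivariate_normal.pdf(x, mean=m, cov=c)
               for w, m, c in zip(weights, means, covariances))

# Toy 1-D mixture: students vs. basketball players
p = gmm_density(170.0, weights=[0.5, 0.5],
                means=[165.0, 190.0], covariances=[25.0, 64.0])
```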
1.2 Generative Process
The latent-variable view of GMM reveals how the data are actually generated:
- First, sample a latent variable $z \sim \text{Categorical}(\pi_1, \dots, \pi_K)$ (a multinomial distribution with a single draw)
- Then, sample the observation $\mathbf{x} \sim \mathcal{N}(\boldsymbol{\mu}_z, \boldsymbol{\Sigma}_z)$ from the corresponding Gaussian
The latent variable $z$ is usually called the latent class or cluster label. This two-stage process translates directly into code, as sketched below.
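A minimal ancestral-sampling sketch (the function name and seed are ours; means and covariances are per-component vectors and matrices):

```python
import numpy as np

def sample_gmm(n, weights, means, covariances, seed=0):
    """Ancestral sampling: z ~ Categorical(pi), then x ~ N(mu_z, Sigma_z)."""
    rng = np.random.default_rng(seed)
    z = rng.choice(len(weights), size=n, p=weights)  # latent cluster labels
    x = np.array([rng.multivariate_normal(means[k], covariances[k]) for k in z])
    return x, z
```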
1.3 Parameters
For $D$-dimensional data, the parameters of a GMM (with full covariances) are:

| Parameter | Count | Constraint |
|---|---|---|
| $\pi_k$ | $K$ | $\sum_k \pi_k = 1$, $\pi_k \geq 0$ |
| $\boldsymbol{\mu}_k$ | $K \cdot D$ | none |
| $\boldsymbol{\Sigma}_k$ | $K \cdot \frac{D(D+1)}{2}$ | symmetric positive definite |

Total number of free parameters:

$$(K - 1) + KD + K \cdot \frac{D(D+1)}{2}$$

(the weights contribute only $K - 1$ free parameters because of the sum-to-one constraint).
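A small helper (ours, for illustration) that evaluates this count:

```python
def gmm_param_count(K, D):
    """Free parameters of a full-covariance GMM in D dimensions."""
    weights = K - 1                      # sum-to-one removes one degree of freedom
    means = K * D
    covariances = K * D * (D + 1) // 2   # symmetric D x D matrices
    return weights + means + covariances

print(gmm_param_count(3, 2))  # K=3, D=2 -> 2 + 6 + 9 = 17
```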
2. Parameter Estimation: The EM Algorithm
Parameter estimation for GMM is a classic application of the EM algorithm; see the separate EM algorithm note for details.
2.1 E-Step: Computing Responsibilities
The responsibility is the degree to which the $k$-th component accounts for the $n$-th data point:

$$\gamma(z_{nk}) = \frac{\pi_k \, \mathcal{N}(\mathbf{x}_n \mid \boldsymbol{\mu}_k, \boldsymbol{\Sigma}_k)}{\sum_{j=1}^{K} \pi_j \, \mathcal{N}(\mathbf{x}_n \mid \boldsymbol{\mu}_j, \boldsymbol{\Sigma}_j)}$$

Interpretation: given the current parameters, $\gamma(z_{nk})$ is the posterior probability that data point $\mathbf{x}_n$ belongs to the $k$-th component.
2.2 M-Step: Updating Parameters
Update the mixing coefficients:

$$\pi_k^{\text{new}} = \frac{N_k}{N}, \qquad N_k = \sum_{n=1}^{N} \gamma(z_{nk})$$

where $N_k$ is the "effective number of samples" assigned to the $k$-th component.
Update the means:

$$\boldsymbol{\mu}_k^{\text{new}} = \frac{1}{N_k} \sum_{n=1}^{N} \gamma(z_{nk}) \, \mathbf{x}_n$$

Update the covariances:

$$\boldsymbol{\Sigma}_k^{\text{new}} = \frac{1}{N_k} \sum_{n=1}^{N} \gamma(z_{nk}) (\mathbf{x}_n - \boldsymbol{\mu}_k^{\text{new}})(\mathbf{x}_n - \boldsymbol{\mu}_k^{\text{new}})^\top$$
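As a compact sketch (no regularization or convergence checking; a fuller implementation appears in section 9), one full EM iteration transcribes the updates above directly into numpy:

```python
import numpy as np
from scipy.stats import multivariate_normal

def em_iteration(X, weights, means, covariances):
    """One EM iteration for a full-covariance GMM."""
    N, K = len(X), len(weights)
    # E-step: responsibilities gamma[n, k]
    gamma = np.column_stack([
        weights[k] * multivariate_normal.pdf(X, means[k], covariances[k])
        for k in range(K)
    ])
    gamma /= gamma.sum(axis=1, keepdims=True)
    # M-step
    Nk = gamma.sum(axis=0)                  # effective sample counts
    weights = Nk / N
    means = (gamma.T @ X) / Nk[:, None]
    covariances = []
    for k in range(K):
        diff = X - means[k]
        covariances.append((gamma[:, k] * diff.T) @ diff / Nk[k])
    return weights, means, covariances
```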
2.3 Covariance Types
To control model complexity and prevent overfitting, the form of the covariance matrices can be restricted:

| Type | Covariance form | Parameters / component | Characteristics |
|---|---|---|---|
| Full | arbitrary positive-definite matrix $\boldsymbol{\Sigma}_k$ | $\frac{D(D+1)}{2}$ | most flexible; may overfit |
| Diagonal | diagonal matrix | $D$ | axis-aligned ellipsoids |
| Tied | single $\boldsymbol{\Sigma}$ shared by all components | $\frac{D(D+1)}{2}$ total | same shape, different locations |
| Spherical | isotropic $\sigma_k^2 \mathbf{I}$ | $1$ | spherical clusters |
```python
import numpy as np

class GMMVariants:
    """GMM with different covariance types."""
    def __init__(self, covariance_type='full'):
        self.covariance_type = covariance_type  # 'full', 'tied', 'diag', 'spherical'

    def _init_covariances(self, X, n_components):
        """Initialize covariances according to the chosen type."""
        n_features = X.shape[1]
        var = np.var(X, axis=0).mean()  # mean per-feature variance of the data
        if self.covariance_type == 'full':
            # one arbitrary positive-definite matrix per component
            return [np.eye(n_features) * var for _ in range(n_components)]
        elif self.covariance_type == 'tied':
            # a single matrix shared by all components
            return np.eye(n_features) * var
        elif self.covariance_type == 'diag':
            # one diagonal matrix per component (isotropic at initialization)
            return [np.eye(n_features) * var for _ in range(n_components)]
        elif self.covariance_type == 'spherical':
            # one scalar variance per component (times the identity)
            return [var for _ in range(n_components)]
```

3. GMM for Clustering
3.1 Soft vs. Hard Clustering
GMM performs soft clustering: the degree to which each data point belongs to each cluster is given by the responsibility $\gamma(z_{nk})$.
Hard clustering assigns each point to the component with the largest responsibility:

$$\hat{z}_n = \arg\max_k \gamma(z_{nk})$$
3.2 Cluster Boundaries
In general, GMM decision boundaries are quadratic surfaces, more flexible than the linear boundaries of K-Means. When two components share the same covariance matrix, the boundary between them becomes linear; in the limit of identical, shrinking spherical covariances, GMM degenerates to K-Means (see section 4.1).
```python
import numpy as np
import matplotlib.pyplot as plt
from sklearn.mixture import GaussianMixture

def plot_gmm_clustering(X, true_labels):
    """Visualize GMM clustering results."""
    # Fit the GMM
    gmm = GaussianMixture(n_components=3, covariance_type='full', random_state=42)
    gmm.fit(X)
    # Predicted labels (hard clustering)
    predicted_labels = gmm.predict(X)
    # Predicted probabilities (soft clustering)
    probs = gmm.predict_proba(X)
    fig, axes = plt.subplots(1, 3, figsize=(15, 4))
    # True labels
    axes[0].scatter(X[:, 0], X[:, 1], c=true_labels, cmap='viridis', alpha=0.6)
    axes[0].set_title('True Labels')
    # GMM predictions
    axes[1].scatter(X[:, 0], X[:, 1], c=predicted_labels, cmap='viridis', alpha=0.6)
    axes[1].set_title('GMM Hard Clustering')
    # Heatmap of component-1 membership probabilities
    axes[2].scatter(X[:, 0], X[:, 1], c=probs[:, 0], cmap='coolwarm', alpha=0.6)
    axes[2].set_title('GMM Soft Clustering (Class 1 Probability)')
    plt.tight_layout()
    plt.show()
    return predicted_labels, probs
```

4. The Relationship Between GMM and K-Means
4.1 K-Means from the GMM Perspective
K-Means can be viewed as a limiting case of GMM. When the covariance matrices are constrained to $\boldsymbol{\Sigma}_k = \sigma^2 \mathbf{I}$ and $\sigma^2 \to 0$, the E-step responsibilities collapse to hard assignments:

$$\gamma(z_{nk}) \to \begin{cases} 1 & \text{if } k = \arg\min_j \lVert \mathbf{x}_n - \boldsymbol{\mu}_j \rVert^2 \\ 0 & \text{otherwise} \end{cases}$$
4.2 Comparison
| Aspect | K-Means | GMM |
|---|---|---|
| Cluster shape | spherical | ellipsoidal (arbitrary orientation) |
| Boundary type | Voronoi diagram (linear) | quadratic surfaces |
| Output | hard labels | soft probabilities |
| Parameters | means only | means + covariances + weights |
| Initialization sensitivity | high | high |
| Covariance constraints | none (implicitly identity) | selectable |
```python
# K-Means can be viewed as a special case of GMM
from sklearn.cluster import KMeans
from sklearn.mixture import GaussianMixture

def gmm_as_kmeans_limit(X, n_clusters, epsilon=1e-6):
    """
    As the covariances shrink toward zero, GMM behaves like K-Means.
    """
    # Use a fixed, very small covariance
    gmm = GaussianMixture(
        n_components=n_clusters,
        covariance_type='spherical',
        reg_covar=epsilon,  # guards against numerical issues
        random_state=42
    )
    gmm.fit(X)
    # Compare K-Means and GMM results
    kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    kmeans.fit(X)
    return gmm.predict(X), kmeans.labels_
```

5. Initialization and Convergence
5.1 Initialization Methods
GMM is highly sensitive to initialization. Common initialization methods:
| Method | Description | Advantage |
|---|---|---|
| Random initialization | pick random data points as the means | simple |
| K-Means initialization | run K-Means first, use its centers as the means | stable |
| Random partition | split the data into K random groups, initialize from group statistics | guarantees coverage |
| GMC initialization | density-based initialization | better starting values |
```python
import numpy as np
from sklearn.cluster import KMeans

def initialize_gmm(X, n_components, method='kmeans'):
    """Initialize GMM parameters."""
    if method == 'random':
        # Random initialization: random data points as means
        indices = np.random.choice(len(X), n_components, replace=False)
        means = X[indices]
        covariances = [np.cov(X.T) for _ in range(n_components)]
        weights = np.ones(n_components) / n_components
    elif method == 'kmeans':
        # K-Means initialization
        kmeans = KMeans(n_clusters=n_components, n_init=10)
        kmeans.fit(X)
        means = kmeans.cluster_centers_
        # Initialize covariances and weights from per-cluster statistics
        labels = kmeans.labels_
        covariances = []
        weights = np.zeros(n_components)
        for k in range(n_components):
            cluster_points = X[labels == k]
            weights[k] = len(cluster_points) / len(X)
            if len(cluster_points) > 1:
                covariances.append(np.cov(cluster_points.T) + 1e-6 * np.eye(X.shape[1]))
            else:
                covariances.append(np.eye(X.shape[1]))
    return means, covariances, weights
```

5.2 Convergence Criterion
EM iterations are stopped once the log-likelihood improvement falls below a tolerance:

$$\lvert \ell^{(t+1)} - \ell^{(t)} \rvert < \varepsilon$$

The tracker below assumes the CompleteGMM implementation from section 9 (its private _e_step, _m_step, and _compute_log_likelihood methods).
```python
import matplotlib.pyplot as plt

def fit_with_convergence_tracking(X, gmm):
    """Track the log-likelihood over EM iterations (gmm is a CompleteGMM, section 9)."""
    gmm._initialize(X)
    log_likelihoods = []
    for _ in range(gmm.max_iter):  # cap the iterations instead of looping forever
        # E-step
        responsibilities = gmm._e_step(X)
        # Current log-likelihood
        ll = gmm._compute_log_likelihood(X)
        log_likelihoods.append(ll)
        # Convergence check
        if len(log_likelihoods) > 1:
            improvement = log_likelihoods[-1] - log_likelihoods[-2]
            if abs(improvement) < gmm.tol:
                break
        # M-step
        gmm._m_step(X, responsibilities)
    return log_likelihoods

def plot_convergence(log_likelihoods):
    """Visualize the convergence curve."""
    plt.figure(figsize=(8, 4))
    plt.plot(log_likelihoods)
    plt.xlabel('Iteration')
    plt.ylabel('Log Likelihood')
    plt.title('EM Convergence')
    plt.grid(True, alpha=0.3)
    plt.show()
```

6. Model Selection: BIC and AIC
6.1 The BIC Criterion
Bayesian Information Criterion:

$$\text{BIC} = -2 \ln \hat{L} + p \ln N$$

where:
- $\hat{L}$: the maximized likelihood
- $p$: the number of free parameters
- $N$: the number of samples
6.2 The AIC Criterion
Akaike Information Criterion:

$$\text{AIC} = -2 \ln \hat{L} + 2p$$
6.3 Choosing the Optimal K
```python
import numpy as np
import matplotlib.pyplot as plt
from sklearn.mixture import GaussianMixture

def select_optimal_k(X, max_k=10):
    """Choose the number of components via BIC/AIC."""
    results = {
        'k': [],
        'bic': [],
        'aic': [],
        'log_likelihood': []
    }
    for k in range(1, max_k + 1):
        gmm = GaussianMixture(n_components=k, covariance_type='full', n_init=5)
        gmm.fit(X)
        results['k'].append(k)
        results['bic'].append(gmm.bic(X))
        results['aic'].append(gmm.aic(X))
        results['log_likelihood'].append(gmm.score(X) * len(X))
    # Locate the optimal K under each criterion
    optimal_k_bic = results['k'][np.argmin(results['bic'])]
    optimal_k_aic = results['k'][np.argmin(results['aic'])]
    return results, optimal_k_bic, optimal_k_aic

def plot_model_selection(results):
    """Visualize the model selection curves."""
    fig, axes = plt.subplots(1, 2, figsize=(12, 4))
    axes[0].plot(results['k'], results['bic'], 'b-o')
    axes[0].set_xlabel('Number of Components (K)')
    axes[0].set_ylabel('BIC')
    axes[0].set_title('BIC vs K')
    axes[0].axvline(x=results['k'][np.argmin(results['bic'])], color='r', linestyle='--')
    axes[1].plot(results['k'], results['aic'], 'g-o')
    axes[1].set_xlabel('Number of Components (K)')
    axes[1].set_ylabel('AIC')
    axes[1].set_title('AIC vs K')
    axes[1].axvline(x=results['k'][np.argmin(results['aic'])], color='r', linestyle='--')
    plt.tight_layout()
    plt.show()
```

7. Applications of GMM
7.1 Density Estimation
The most direct application of GMM is probability density estimation:
```python
import numpy as np
from sklearn.mixture import GaussianMixture

def density_estimation(X_train, X_test):
    """Density estimation with a GMM."""
    gmm = GaussianMixture(n_components=5, covariance_type='full')
    gmm.fit(X_train)
    # Per-sample log-density on the test set
    log_probs = gmm.score_samples(X_test)
    # Flag anomalies (points in low-density regions)
    threshold = np.percentile(log_probs, 5)
    anomalies = X_test[log_probs < threshold]
    return log_probs, anomalies
```

7.2 Image Segmentation
```python
from sklearn.mixture import GaussianMixture

def gmm_image_segmentation(image, n_segments=5):
    """Image segmentation with a GMM."""
    # Reshape the image into one sample per pixel
    h, w, c = image.shape
    X = image.reshape(-1, c).astype(float)
    # Fit the GMM
    gmm = GaussianMixture(n_components=n_segments, covariance_type='full')
    labels = gmm.fit_predict(X)
    # Reshape the labels back into a segmentation map
    segmented = labels.reshape(h, w)
    return segmented
```

7.3 Acoustic Signal Modeling
GMMs are used in speaker recognition and speech synthesis:
```python
def gmm_speaker_recognition(features, speaker_models):
    """
    Speaker recognition with per-speaker GMMs.
    features: acoustic features (n_frames, n_features)
    speaker_models: dict[str, GaussianMixture], one pre-trained GMM per speaker
    """
    log_likelihoods = {}
    for speaker, gmm in speaker_models.items():
        log_likelihoods[speaker] = gmm.score(features)
    # Pick the speaker whose model gives the highest average log-likelihood
    recognized_speaker = max(log_likelihoods, key=log_likelihoods.get)
    return recognized_speaker, log_likelihoods
```

8. Limitations of GMM
8.1 Sensitivity to Initialization
Different initializations can lead to completely different results.
Remedy: run several random initializations and keep the best fit by BIC/AIC, as sketched below.
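A minimal sketch with scikit-learn (here X stands for the data matrix; with K fixed, ranking restarts by log-likelihood is equivalent to ranking by BIC/AIC):

```python
from sklearn.mixture import GaussianMixture

# n_init reruns EM from several random starts and keeps the best
# fit by log-likelihood (same ranking as BIC/AIC for fixed K).
gmm = GaussianMixture(n_components=3, n_init=10, random_state=42).fit(X)
```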
8.2 No Automatic Choice of K
The number of components must be specified in advance.
Remedy: use BIC/AIC for model selection, or a Dirichlet Process Mixture Model (DPMM), which infers the effective number of components from the data; see the sketch below.
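One practical route is scikit-learn's BayesianGaussianMixture, a variational approximation to a Dirichlet process mixture; a sketch (X again stands for the data matrix):

```python
from sklearn.mixture import BayesianGaussianMixture

# Set n_components to a generous upper bound; unneeded components
# receive near-zero weights instead of having to be chosen in advance.
bgmm = BayesianGaussianMixture(
    n_components=10,
    weight_concentration_prior_type='dirichlet_process',
    random_state=42,
)
bgmm.fit(X)
effective_k = (bgmm.weights_ > 1e-2).sum()  # components that carry real weight
```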
8.3 Covariance Singularity
When a component's covariance matrix becomes nearly singular (for example, when it collapses onto a few points), the EM algorithm can break down.
Remedy: add a regularization term to the covariance diagonal (such as reg_covar), as in the sketch below.
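In scikit-learn this is the reg_covar argument; a minimal usage sketch (X stands for the data matrix):

```python
from sklearn.mixture import GaussianMixture

# reg_covar adds a small constant to the covariance diagonals at every
# M-step, keeping them invertible even for tiny or degenerate clusters.
gmm = GaussianMixture(n_components=5, reg_covar=1e-5).fit(X)
```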
8.4 Local Optima
EM is only guaranteed to converge to a local optimum.
Remedy: multiple initializations, or variants such as split-and-merge EM.
9. Complete Implementation
```python
import numpy as np
from scipy.stats import multivariate_normal
from sklearn.cluster import KMeans

class CompleteGMM:
    """A complete Gaussian mixture model implementation."""
    def __init__(self, n_components, covariance_type='full',
                 max_iter=100, tol=1e-4, n_init=5, random_state=None):
        self.n_components = n_components
        self.covariance_type = covariance_type
        self.max_iter = max_iter
        self.tol = tol
        self.n_init = n_init
        self.random_state = random_state
        self.weights_ = None
        self.means_ = None
        self.covariances_ = None
        self.converged_ = False
        self.n_iter_ = 0
        self.lower_bound_ = -np.inf
    def _initialize(self, X):
        """Initialize with K-Means."""
        kmeans = KMeans(n_clusters=self.n_components, n_init=10, random_state=self.random_state)
        kmeans.fit(X)
        self.means_ = kmeans.cluster_centers_
        labels = kmeans.labels_
        # Initialize weights from cluster sizes
        self.weights_ = np.bincount(labels, minlength=self.n_components) / len(X)
        # Initialize covariances from per-cluster statistics
        n_features = X.shape[1]
        self.covariances_ = []
        for k in range(self.n_components):
            cluster_points = X[labels == k]
            if len(cluster_points) > 1:
                cov = np.cov(cluster_points.T)
                # Ensure positive definiteness
                cov += 1e-6 * np.eye(n_features)
            else:
                cov = np.eye(n_features)
            self.covariances_.append(cov)
        if self.covariance_type == 'tied':
            # All components share one covariance
            self.covariances_ = np.mean(self.covariances_, axis=0)
    def _e_step(self, X):
        """E-step: compute responsibilities."""
        n_samples = X.shape[0]
        responsibilities = np.zeros((n_samples, self.n_components))
        for k in range(self.n_components):
            if self.covariance_type == 'tied':
                cov = self.covariances_
            else:
                cov = self.covariances_[k]
            responsibilities[:, k] = self.weights_[k] * multivariate_normal.pdf(
                X, mean=self.means_[k], cov=cov
            )
        # Normalize over components
        responsibilities_sum = responsibilities.sum(axis=1, keepdims=True)
        responsibilities /= responsibilities_sum + 1e-300
        return responsibilities
    def _m_step(self, X, responsibilities):
        """M-step: update parameters."""
        n_samples, n_features = X.shape
        N_k = responsibilities.sum(axis=0)
        if self.covariance_type == 'tied':
            tied_cov = np.zeros((n_features, n_features))  # rebuilt from scratch each M-step
        for k in range(self.n_components):
            # Update means
            self.means_[k] = (responsibilities[:, k] @ X) / (N_k[k] + 1e-10)
            # Update covariance: weighted outer products (full matrix, not just the diagonal)
            diff = X - self.means_[k]
            cov_k = (responsibilities[:, k] * diff.T) @ diff / (N_k[k] + 1e-10)
            # Constrain according to covariance type
            if self.covariance_type == 'spherical':
                cov_k = np.eye(n_features) * np.trace(cov_k) / n_features  # isotropic
            elif self.covariance_type == 'diag':
                cov_k = np.diag(np.diag(cov_k))  # keep only the diagonal
            # else: full covariance, keep as is
            # Regularize for positive definiteness
            cov_k += 1e-6 * np.eye(n_features)
            if self.covariance_type == 'tied':
                tied_cov += cov_k * N_k[k] / n_samples
            else:
                self.covariances_[k] = cov_k
            # Update weights
            self.weights_[k] = N_k[k] / n_samples
        if self.covariance_type == 'tied':
            self.covariances_ = tied_cov + 1e-6 * np.eye(n_features)
    def _compute_log_likelihood(self, X):
        """Log-likelihood: sum_n log sum_k pi_k N(x_n | mu_k, Sigma_k)."""
        n_samples = X.shape[0]
        likelihood = np.zeros(n_samples)
        for k in range(self.n_components):
            if self.covariance_type == 'tied':
                cov = self.covariances_
            else:
                cov = self.covariances_[k]
            likelihood += self.weights_[k] * multivariate_normal.pdf(
                X, mean=self.means_[k], cov=cov
            )
        return np.log(likelihood + 1e-300).sum()
    def fit(self, X):
        """Fit the GMM, keeping the best of several initializations."""
        best_lower_bound = -np.inf
        best_params = None
        for init in range(self.n_init):
            self._initialize(X)
            # Reset per-run state so runs do not leak into each other
            self.converged_ = False
            self.lower_bound_ = -np.inf
            for iteration in range(self.max_iter):
                responsibilities = self._e_step(X)
                self._m_step(X, responsibilities)
                current_bound = self._compute_log_likelihood(X)
                if iteration > 0 and abs(current_bound - self.lower_bound_) < self.tol:
                    self.converged_ = True
                    break
                self.lower_bound_ = current_bound
            if self.lower_bound_ > best_lower_bound:
                best_lower_bound = self.lower_bound_
                best_params = {
                    'weights': self.weights_.copy(),
                    'means': self.means_.copy(),
                    'covariances': [c.copy() for c in self.covariances_] if self.covariance_type != 'tied' else self.covariances_.copy(),
                    'n_iter': iteration,
                    'converged': self.converged_
                }
        # Restore the best parameters
        self.weights_ = best_params['weights']
        self.means_ = best_params['means']
        self.covariances_ = best_params['covariances']
        self.n_iter_ = best_params['n_iter']
        self.converged_ = best_params['converged']
        return self
    def predict(self, X):
        """Predict cluster labels (hard clustering)."""
        responsibilities = self._e_step(X)
        return np.argmax(responsibilities, axis=1)

    def predict_proba(self, X):
        """Predict cluster probabilities (soft clustering)."""
        return self._e_step(X)

    def score(self, X):
        """Average log-likelihood per sample."""
        return self._compute_log_likelihood(X) / len(X)
    def bic(self, X):
        """Bayesian Information Criterion."""
        n_samples, n_features = X.shape
        # Count free parameters
        if self.covariance_type == 'full':
            n_params = self.n_components * (n_features + n_features * (n_features + 1) // 2)
        elif self.covariance_type == 'tied':
            # K mean vectors plus one shared covariance
            n_params = self.n_components * n_features + n_features * (n_features + 1) // 2
        elif self.covariance_type == 'diag':
            n_params = self.n_components * (2 * n_features)
        else:  # spherical
            n_params = self.n_components * (n_features + 1)
        n_params += self.n_components - 1  # weight parameters
        return -2 * self._compute_log_likelihood(X) + n_params * np.log(n_samples)
    def aic(self, X):
        """Akaike Information Criterion."""
        n_samples, n_features = X.shape
        # Parameter count (same as in bic)
        if self.covariance_type == 'full':
            n_params = self.n_components * (n_features + n_features * (n_features + 1) // 2)
        elif self.covariance_type == 'tied':
            n_params = self.n_components * n_features + n_features * (n_features + 1) // 2
        elif self.covariance_type == 'diag':
            n_params = self.n_components * (2 * n_features)
        else:  # spherical
            n_params = self.n_components * (n_features + 1)
        n_params += self.n_components - 1
        return -2 * self._compute_log_likelihood(X) + 2 * n_params
```

10. References
Footnotes
[^1]: Bishop, C. M. (2006). *Pattern Recognition and Machine Learning*. Springer.