Gaussian Mixture Model (GMM)

The Gaussian Mixture Model (GMM) is a classic generative clustering algorithm and an important method for probability density estimation. A GMM assumes that the data are generated by a mixture of several Gaussian distributions, and its parameters are estimated with the EM algorithm.[1]

1. Model Definition

1.1 Intuition

Imagine a dataset containing two populations: college students (heights with mean 165cm and small variance) and basketball players (heights with mean 190cm and larger variance).

The GMM models the overall data as a weighted mixture of several Gaussian distributions:

$$p(x) = \sum_{k=1}^{K} \pi_k\, \mathcal{N}(x \mid \mu_k, \Sigma_k)$$

where:

  • $K$: the number of mixture components
  • $\pi_k$: the mixing coefficient of the $k$-th component, satisfying $\sum_{k=1}^{K} \pi_k = 1$ and $\pi_k \ge 0$
  • $\mathcal{N}(x \mid \mu_k, \Sigma_k)$: the probability density of the $k$-th Gaussian
  • $\mu_k$: the mean vector
  • $\Sigma_k$: the covariance matrix
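
A minimal sketch (with illustrative parameter values, not taken from the text) that evaluates this mixture density for the height example above:

import numpy as np
from scipy.stats import norm

# Assumed mixing weights and (1-D) component parameters for the height example
weights = np.array([0.7, 0.3])     # pi_k
means = np.array([165.0, 190.0])   # mu_k in cm
stds = np.array([5.0, 8.0])        # assumed standard deviations

def gmm_density(x):
    """p(x) = sum_k pi_k * N(x | mu_k, sigma_k^2)"""
    return sum(w * norm.pdf(x, loc=m, scale=s)
               for w, m, s in zip(weights, means, stds))

print(gmm_density(170.0))  # density of the mixture at 170cm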

1.2 Generative Process

The latent-variable view of the GMM reveals the underlying data-generating process:

  1. First, sample a latent variable $z \in \{1, \dots, K\}$ from the categorical (multinomial) distribution $z \sim \mathrm{Cat}(\pi_1, \dots, \pi_K)$
  2. Then, sample the observation $x \sim \mathcal{N}(\mu_z, \Sigma_z)$ from the corresponding Gaussian

The latent variable $z$ is usually called the latent class or cluster label.
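
A short sketch of this two-stage sampling procedure; all parameter values below are illustrative assumptions:

import numpy as np

rng = np.random.default_rng(0)

def sample_gmm(n_samples, weights, means, covariances):
    """Draw (x, z) pairs: z ~ Cat(pi), then x ~ N(mu_z, Sigma_z)."""
    z = rng.choice(len(weights), size=n_samples, p=weights)  # latent labels
    X = np.array([rng.multivariate_normal(means[k], covariances[k]) for k in z])
    return X, z

X, z = sample_gmm(
    500,
    weights=[0.5, 0.5],
    means=[np.zeros(2), np.array([3.0, 3.0])],
    covariances=[np.eye(2), 0.5 * np.eye(2)],
)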

1.3 Parameters

For $D$-dimensional data, the parameters of a GMM are:

| Parameter | Count | Constraint |
|---|---|---|
| $\pi_k$ | $K - 1$ | $\sum_k \pi_k = 1$, $\pi_k \ge 0$ |
| $\mu_k$ | $KD$ | (none) |
| $\Sigma_k$ | $KD(D+1)/2$ | positive definite, symmetric |

Total number of parameters: $(K - 1) + KD + \frac{KD(D+1)}{2}$
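
A small helper (hypothetical, but matching the table above) that counts the free parameters of a full-covariance GMM:

def gmm_param_count(K, D):
    """(K-1) weights + K*D means + K*D*(D+1)/2 covariance entries"""
    return (K - 1) + K * D + K * D * (D + 1) // 2

print(gmm_param_count(K=3, D=2))  # 3 components in 2D -> 17 parameters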


2. Parameter Estimation: The EM Algorithm

Parameter estimation for a GMM is a classic application of the EM algorithm; see the EM算法 article for details.

2.1 E-Step: Computing Responsibilities

Responsibility: the degree to which component $k$ is responsible for data point $n$:

$$\gamma_{nk} = \frac{\pi_k\, \mathcal{N}(x_n \mid \mu_k, \Sigma_k)}{\sum_{j=1}^{K} \pi_j\, \mathcal{N}(x_n \mid \mu_j, \Sigma_j)}$$

Interpretation: given the current parameters, $\gamma_{nk}$ is the posterior probability that data point $x_n$ belongs to component $k$.
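
A minimal numpy sketch of this formula (the helper name e_step is ours, not an established API):

import numpy as np
from scipy.stats import multivariate_normal

def e_step(X, weights, means, covariances):
    """Return the (n_samples, K) responsibility matrix gamma."""
    dens = np.column_stack([
        w * multivariate_normal.pdf(X, mean=m, cov=c)
        for w, m, c in zip(weights, means, covariances)
    ])
    return dens / dens.sum(axis=1, keepdims=True)  # normalize rows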

2.2 M-Step: Updating Parameters

Update the mixing coefficients:

$$\pi_k^{\text{new}} = \frac{N_k}{N}, \qquad N_k = \sum_{n=1}^{N} \gamma_{nk}$$

where $N_k$ is the "effective number of samples" of component $k$.

Update the means:

$$\mu_k^{\text{new}} = \frac{1}{N_k} \sum_{n=1}^{N} \gamma_{nk}\, x_n$$

Update the covariances:

$$\Sigma_k^{\text{new}} = \frac{1}{N_k} \sum_{n=1}^{N} \gamma_{nk}\, (x_n - \mu_k^{\text{new}})(x_n - \mu_k^{\text{new}})^\top$$
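
A matching numpy sketch of these updates, given the responsibility matrix gamma from the E-step:

import numpy as np

def m_step(X, gamma):
    """Return updated (weights, means, covariances) from responsibilities."""
    N_k = gamma.sum(axis=0)                       # effective sample counts
    weights = N_k / len(X)                        # pi_k
    means = (gamma.T @ X) / N_k[:, None]          # mu_k
    covariances = []
    for k in range(gamma.shape[1]):
        diff = X - means[k]
        covariances.append((gamma[:, k] * diff.T) @ diff / N_k[k])  # Sigma_k
    return weights, means, covariances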

2.3 Covariance Types

To control model complexity and prevent overfitting, the form of the covariance matrices can be restricted:

| Type | Covariance form | Parameters / component | Notes |
|---|---|---|---|
| Full | arbitrary positive definite matrix | $D(D+1)/2$ | most flexible, may overfit |
| Diagonal | diagonal matrix | $D$ | axis-aligned distributions |
| Tied | shared by all components | $D(D+1)/2$ in total | shared shape, only locations differ |
| Spherical | isotropic, $\sigma_k^2 I$ | $1$ | spherical clusters |

import numpy as np

class GMMVariants:
    """GMM with different covariance types"""
    
    def __init__(self, covariance_type='full'):
        self.covariance_type = covariance_type  # 'full', 'tied', 'diag', 'spherical'
    
    def _init_covariances(self, X, n_components):
        """Initialize covariances according to the chosen type"""
        n_features = X.shape[1]
        var = np.var(X, axis=0).mean()  # average per-feature variance of the data
        
        if self.covariance_type == 'full':
            # Arbitrary positive definite matrix (initialized as isotropic)
            return [np.eye(n_features) * var for _ in range(n_components)]
        
        elif self.covariance_type == 'tied':
            # One covariance shared by all components
            return np.eye(n_features) * var
        
        elif self.covariance_type == 'diag':
            # Diagonal matrix (same starting point as 'full', kept diagonal during fitting)
            return [np.eye(n_features) * var for _ in range(n_components)]
        
        elif self.covariance_type == 'spherical':
            # Scalar variance per component (sigma^2 * I)
            return [var for _ in range(n_components)]

3. GMM for Clustering

3.1 Soft vs Hard Clustering

GMM performs soft clustering: the degree to which each data point belongs to each cluster is given by the responsibility $\gamma_{nk}$.

Hard clustering assigns each point to the component with the largest responsibility:

$$\hat{z}_n = \arg\max_k \gamma_{nk}$$
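
A one-line sketch, assuming gmm is a fitted sklearn GaussianMixture:

hard_labels = gmm.predict_proba(X).argmax(axis=1)  # equivalent to gmm.predict(X)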

3.2 Cluster Boundaries

When all components share the same covariance matrix, the boundaries between clusters are linear; in the shared spherical limit ($\Sigma_k = \sigma^2 I$, $\sigma \to 0$) the GMM reduces to K-Means (see Section 4.1). In general, GMM cluster boundaries are quadratic surfaces, more flexible than the linear boundaries of K-Means.

import numpy as np
import matplotlib.pyplot as plt
from sklearn.mixture import GaussianMixture
 
def plot_gmm_clustering(X, true_labels):
    """Visualize GMM clustering results"""
    
    # Fit the GMM
    gmm = GaussianMixture(n_components=3, covariance_type='full', random_state=42)
    gmm.fit(X)
    
    # Predicted labels (hard clustering)
    predicted_labels = gmm.predict(X)
    
    # Predicted probabilities (soft clustering)
    probs = gmm.predict_proba(X)
    
    fig, axes = plt.subplots(1, 3, figsize=(15, 4))
    
    # True labels
    axes[0].scatter(X[:, 0], X[:, 1], c=true_labels, cmap='viridis', alpha=0.6)
    axes[0].set_title('True Labels')
    
    # GMM predictions
    axes[1].scatter(X[:, 0], X[:, 1], c=predicted_labels, cmap='viridis', alpha=0.6)
    axes[1].set_title('GMM Hard Clustering')
    
    # Probability heatmap for component 0
    axes[2].scatter(X[:, 0], X[:, 1], c=probs[:, 0], cmap='coolwarm', alpha=0.6)
    axes[2].set_title('GMM Soft Clustering (Component 0 Probability)')
    
    plt.tight_layout()
    plt.show()
    
    return predicted_labels, probs

4. Relationship Between GMM and K-Means

4.1 K-Means from the GMM Perspective

K-Means can be viewed as a limiting case of GMM.

When the covariance matrices of the GMM satisfy $\Sigma_k = \sigma^2 I$ (with $\sigma \to 0$), the E-step of EM tends to a hard assignment:

$$\gamma_{nk} \to \begin{cases} 1, & k = \arg\min_j \lVert x_n - \mu_j \rVert^2 \\ 0, & \text{otherwise} \end{cases}$$
4.2 Comparison

| Aspect | K-Means | GMM |
|---|---|---|
| Cluster shape | spherical | elliptical (arbitrary orientation) |
| Boundary type | Voronoi diagram (linear) | quadratic surfaces |
| Output | hard labels | soft probabilities |
| Parameters | means only | means + covariances + weights |
| Initialization sensitivity | high | high |
| Covariance constraint | none (implicitly the identity) | optional |

# K-Means can be viewed as a special case of GMM
from sklearn.cluster import KMeans
from sklearn.mixture import GaussianMixture

def gmm_as_kmeans_limit(X, n_clusters, epsilon=1e-6):
    """
    As the covariances shrink toward zero, GMM behaves like K-Means.
    Fits a spherical GMM and K-Means on the same data for comparison.
    """
    gmm = GaussianMixture(
        n_components=n_clusters,
        covariance_type='spherical',
        reg_covar=epsilon,  # guards against degenerate (singular) covariances
        random_state=42
    )
    gmm.fit(X)
    
    # Fit K-Means on the same data
    kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    kmeans.fit(X)
    
    return gmm.predict(X), kmeans.labels_

5. Initialization and Convergence

5.1 Initialization Methods

GMM is highly sensitive to initialization. Common initialization methods:

| Method | Description | Advantage |
|---|---|---|
| Random initialization | randomly pick data points as means | simple |
| K-Means initialization | run K-Means first and use the cluster centers as means | stable |
| Random partition | randomly split the data into K groups and initialize from group statistics | guaranteed coverage |
| GMC initialization | density-based initialization | better starting values |

import numpy as np
from sklearn.cluster import KMeans

def initialize_gmm(X, n_components, method='kmeans'):
    """Initialize GMM parameters"""
    
    if method == 'random':
        # Random initialization: pick data points as means
        indices = np.random.choice(len(X), n_components, replace=False)
        means = X[indices]
        covariances = [np.cov(X.T) for _ in range(n_components)]
        weights = np.ones(n_components) / n_components
    
    elif method == 'kmeans':
        # K-Means initialization
        kmeans = KMeans(n_clusters=n_components, n_init=10)
        kmeans.fit(X)
        means = kmeans.cluster_centers_
        
        # Initialize covariances and weights from per-cluster statistics
        labels = kmeans.labels_
        covariances = []
        weights = np.zeros(n_components)
        
        for k in range(n_components):
            cluster_points = X[labels == k]
            weights[k] = len(cluster_points) / len(X)
            if len(cluster_points) > 1:
                covariances.append(np.cov(cluster_points.T) + 1e-6 * np.eye(X.shape[1]))
            else:
                covariances.append(np.eye(X.shape[1]))
    
    return means, covariances, weights

5.2 Convergence Criteria

The EM iteration is stopped when the improvement of the log-likelihood falls below a tolerance $\varepsilon$:

$$\left| \log p(X \mid \theta^{(t+1)}) - \log p(X \mid \theta^{(t)}) \right| < \varepsilon$$

def fit_with_convergence_tracking(X, gmm, max_iter=200):
    """Track the EM convergence process.
    
    Assumes a GMM object exposing _e_step, _m_step, _compute_log_likelihood
    and tol, e.g. the CompleteGMM class in Section 9."""
    log_likelihoods = []
    
    for _ in range(max_iter):  # bounded loop instead of `while True`
        # E-step
        responsibilities = gmm._e_step(X)
        
        # Current log-likelihood
        ll = gmm._compute_log_likelihood(X)
        log_likelihoods.append(ll)
        
        # Check convergence
        if len(log_likelihoods) > 1:
            improvement = log_likelihoods[-1] - log_likelihoods[-2]
            if abs(improvement) < gmm.tol:
                break
        
        # M-step
        gmm._m_step(X, responsibilities)
    
    return log_likelihoods
 
def plot_convergence(log_likelihoods):
    """Plot the EM convergence curve"""
    plt.figure(figsize=(8, 4))
    plt.plot(log_likelihoods)
    plt.xlabel('Iteration')
    plt.ylabel('Log Likelihood')
    plt.title('EM Convergence')
    plt.grid(True, alpha=0.3)
    plt.show()

6. Model Selection: BIC and AIC

6.1 BIC

Bayesian Information Criterion:

$$\mathrm{BIC} = -2 \ln \hat{L} + p \ln N$$

where:

  • $\ln \hat{L}$: the maximized log-likelihood
  • $p$: the number of parameters
  • $N$: the number of samples

6.2 AIC

Akaike Information Criterion:

$$\mathrm{AIC} = -2 \ln \hat{L} + 2p$$
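
A minimal sketch computing both criteria by hand for a fitted full-covariance sklearn GaussianMixture, using the formulas above (the helper name bic_aic_by_hand is ours):

import numpy as np

def bic_aic_by_hand(gmm, X):
    n, d = X.shape
    K = gmm.n_components
    log_l = gmm.score(X) * n                      # total log-likelihood
    p = (K - 1) + K * d + K * d * (d + 1) // 2    # full-covariance parameter count
    return -2 * log_l + p * np.log(n), -2 * log_l + 2 * p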

6.3 选择最优K

def select_optimal_k(X, max_k=10):
    """Select the optimal number of components via BIC/AIC"""
    results = {
        'k': [],
        'bic': [],
        'aic': [],
        'log_likelihood': []
    }
    
    for k in range(1, max_k + 1):
        gmm = GaussianMixture(n_components=k, covariance_type='full', n_init=5)
        gmm.fit(X)
        
        results['k'].append(k)
        results['bic'].append(gmm.bic(X))
        results['aic'].append(gmm.aic(X))
        results['log_likelihood'].append(gmm.score(X) * len(X))  # total log-likelihood
    
    # Pick the K that minimizes each criterion
    optimal_k_bic = results['k'][np.argmin(results['bic'])]
    optimal_k_aic = results['k'][np.argmin(results['aic'])]
    
    return results, optimal_k_bic, optimal_k_aic
 
def plot_model_selection(results):
    """Plot the model selection results"""
    fig, axes = plt.subplots(1, 2, figsize=(12, 4))
    
    axes[0].plot(results['k'], results['bic'], 'b-o')
    axes[0].set_xlabel('Number of Components (K)')
    axes[0].set_ylabel('BIC')
    axes[0].set_title('BIC vs K')
    axes[0].axvline(x=results['k'][np.argmin(results['bic'])], color='r', linestyle='--')
    
    axes[1].plot(results['k'], results['aic'], 'g-o')
    axes[1].set_xlabel('Number of Components (K)')
    axes[1].set_ylabel('AIC')
    axes[1].set_title('AIC vs K')
    axes[1].axvline(x=results['k'][np.argmin(results['aic'])], color='r', linestyle='--')
    
    plt.tight_layout()
    plt.show()

7. Applications of GMM

7.1 Density Estimation

The most direct application of GMM is probability density estimation.

def density_estimation(X_train, X_test):
    """Density estimation with a GMM"""
    gmm = GaussianMixture(n_components=5, covariance_type='full')
    gmm.fit(X_train)
    
    # Log-density of the test set
    log_probs = gmm.score_samples(X_test)
    
    # Flag anomalies (points in low-density regions)
    threshold = np.percentile(log_probs, 5)
    anomalies = X_test[log_probs < threshold]
    
    return log_probs, anomalies

7.2 Image Segmentation

def gmm_image_segmentation(image, n_segments=5):
    """Image segmentation with a GMM"""
    # Reshape the image into per-pixel samples
    h, w, c = image.shape
    X = image.reshape(-1, c)
    
    # Fit the GMM
    gmm = GaussianMixture(n_components=n_segments, covariance_type='full')
    labels = gmm.fit_predict(X)
    
    # Reshape the labels back into a segmentation map
    segmented = labels.reshape(h, w)
    
    return segmented

7.3 Audio Signal Modeling

GMMs are used for speaker recognition and speech synthesis.

def gmm_speaker_recognition(features, speaker_models):
    """
    Speaker recognition with GMMs.
    features: acoustic features (n_frames, n_features)
    speaker_models: dict[str, GaussianMixture], one pre-trained GMM per speaker
    """
    log_likelihoods = {}
    
    # Score the utterance under each speaker's GMM
    for speaker, gmm in speaker_models.items():
        log_likelihoods[speaker] = gmm.score(features)
    
    # Pick the speaker whose model explains the features best
    recognized_speaker = max(log_likelihoods, key=log_likelihoods.get)
    
    return recognized_speaker, log_likelihoods

8. Limitations of GMM

8.1 Sensitivity to Initialization

Different initializations can lead to completely different results.

Remedy: run multiple random initializations and keep the run with the best BIC/AIC.

8.2 No Automatic Choice of K

The number of components must be specified in advance.

Remedy: use BIC/AIC for model selection, or use a Dirichlet Process Mixture (DPM).

8.3 Singular Covariance Matrices

When a component's covariance matrix becomes nearly singular, the EM algorithm can break down.

Remedy: add a regularization term (such as reg_covar, sketched below).
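
As a pointer to the sklearn API (not part of the original text): reg_covar adds a small constant to the diagonal of every covariance matrix in each M-step, keeping the matrices well-conditioned:

from sklearn.mixture import GaussianMixture

# reg_covar (default 1e-6) is added to the covariance diagonal to avoid singularity
gmm = GaussianMixture(n_components=3, reg_covar=1e-4)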

8.4 Local Optima

The EM algorithm is only guaranteed to converge to a local optimum.

Remedy: use multiple initializations, or variants such as Split-and-Merge EM.


9. Complete Code Implementation

import numpy as np
from scipy.stats import multivariate_normal
from sklearn.cluster import KMeans
 
class CompleteGMM:
    """Complete Gaussian Mixture Model implementation"""
    
    def __init__(self, n_components, covariance_type='full', 
                 max_iter=100, tol=1e-4, n_init=5, random_state=None):
        self.n_components = n_components
        self.covariance_type = covariance_type
        self.max_iter = max_iter
        self.tol = tol
        self.n_init = n_init
        self.random_state = random_state
        
        self.weights_ = None
        self.means_ = None
        self.covariances_ = None
        self.converged_ = False
        self.n_iter_ = 0
        self.lower_bound_ = -np.inf
    
    def _initialize(self, X):
        """Initialize with K-Means"""
        kmeans = KMeans(n_clusters=self.n_components, n_init=10, random_state=self.random_state)
        kmeans.fit(X)
        
        self.means_ = kmeans.cluster_centers_
        labels = kmeans.labels_
        
        # Initialize weights from cluster sizes
        self.weights_ = np.bincount(labels, minlength=self.n_components) / len(X)
        
        # Initialize covariances from per-cluster statistics
        n_features = X.shape[1]
        self.covariances_ = []
        
        for k in range(self.n_components):
            cluster_points = X[labels == k]
            if len(cluster_points) > 1:
                cov = np.cov(cluster_points.T)
                # Ensure positive definiteness
                cov += 1e-6 * np.eye(n_features)
            else:
                cov = np.eye(n_features)
            self.covariances_.append(cov)
        
        if self.covariance_type == 'tied':
            # All components share one covariance
            self.covariances_ = np.mean(self.covariances_, axis=0)
    
    def _e_step(self, X):
        """E-step: compute responsibilities"""
        n_samples = X.shape[0]
        responsibilities = np.zeros((n_samples, self.n_components))
        
        for k in range(self.n_components):
            if self.covariance_type == 'tied':
                cov = self.covariances_
            else:
                cov = self.covariances_[k]
            
            responsibilities[:, k] = self.weights_[k] * multivariate_normal.pdf(
                X, mean=self.means_[k], cov=cov
            )
        
        # Normalize so that each row sums to 1
        responsibilities_sum = responsibilities.sum(axis=1, keepdims=True)
        responsibilities /= responsibilities_sum + 1e-300
        
        return responsibilities
    
    def _m_step(self, X, responsibilities):
        """M-step: update parameters"""
        n_samples, n_features = X.shape
        N_k = responsibilities.sum(axis=0) + 1e-10
        
        # Update weights and means
        self.weights_ = N_k / n_samples
        self.means_ = (responsibilities.T @ X) / N_k[:, np.newaxis]
        
        if self.covariance_type == 'tied':
            # Shared covariance: responsibility-weighted total scatter
            cov = np.zeros((n_features, n_features))
            for k in range(self.n_components):
                diff = X - self.means_[k]
                cov += (responsibilities[:, k] * diff.T) @ diff
            self.covariances_ = cov / n_samples + 1e-6 * np.eye(n_features)
            return
        
        covariances = []
        for k in range(self.n_components):
            diff = X - self.means_[k]
            # Full covariance: sum_n gamma_nk (x_n - mu_k)(x_n - mu_k)^T / N_k
            cov_k = (responsibilities[:, k] * diff.T) @ diff / N_k[k]
            
            # Constrain according to the covariance type
            if self.covariance_type == 'diag':
                cov_k = np.diag(np.diag(cov_k))  # keep only the diagonal
            elif self.covariance_type == 'spherical':
                cov_k = np.trace(cov_k) / n_features * np.eye(n_features)  # isotropic
            
            # Ensure positive definiteness
            covariances.append(cov_k + 1e-6 * np.eye(n_features))
        
        self.covariances_ = covariances
    
    def _compute_log_likelihood(self, X):
        """Compute the total log-likelihood"""
        n_samples = X.shape[0]
        likelihood = np.zeros(n_samples)
        
        for k in range(self.n_components):
            if self.covariance_type == 'tied':
                cov = self.covariances_
            else:
                cov = self.covariances_[k]
            
            # Accumulate the weighted component densities
            likelihood += self.weights_[k] * multivariate_normal.pdf(
                X, mean=self.means_[k], cov=cov
            )
        
        return np.log(likelihood + 1e-300).sum()
    
    def fit(self, X):
        """Fit the GMM, keeping the best of several initializations"""
        best_lower_bound = -np.inf
        best_params = None
        
        for init in range(self.n_init):
            self._initialize(X)
            self.lower_bound_ = -np.inf  # reset per restart
            self.converged_ = False
            
            for iteration in range(self.max_iter):
                responsibilities = self._e_step(X)
                self._m_step(X, responsibilities)
                
                current_bound = self._compute_log_likelihood(X)
                
                if iteration > 0 and abs(current_bound - self.lower_bound_) < self.tol:
                    self.converged_ = True
                    self.lower_bound_ = current_bound
                    break
                
                self.lower_bound_ = current_bound
            
            if self.lower_bound_ > best_lower_bound:
                best_lower_bound = self.lower_bound_
                best_params = {
                    'weights': self.weights_.copy(),
                    'means': self.means_.copy(),
                    'covariances': [c.copy() for c in self.covariances_] if self.covariance_type != 'tied' else self.covariances_.copy(),
                    'n_iter': iteration,
                    'converged': self.converged_
                }
        
        # Restore the best parameters
        self.weights_ = best_params['weights']
        self.means_ = best_params['means']
        self.covariances_ = best_params['covariances']
        self.n_iter_ = best_params['n_iter']
        self.converged_ = best_params['converged']
        
        return self
    
    def predict(self, X):
        """Predict cluster labels (hard clustering)"""
        responsibilities = self._e_step(X)
        return np.argmax(responsibilities, axis=1)
    
    def predict_proba(self, X):
        """Predict membership probabilities (soft clustering)"""
        return self._e_step(X)
    
    def score(self, X):
        """Average log-likelihood per sample"""
        return self._compute_log_likelihood(X) / len(X)
    
    def bic(self, X):
        """Bayesian Information Criterion"""
        n_samples, n_features = X.shape
        
        # Count free parameters
        if self.covariance_type == 'full':
            n_params = self.n_components * (n_features + n_features * (n_features + 1) // 2)
        elif self.covariance_type == 'tied':
            # K*D means plus one shared covariance
            n_params = self.n_components * n_features + n_features * (n_features + 1) // 2
        elif self.covariance_type == 'diag':
            n_params = self.n_components * (2 * n_features)
        else:  # spherical
            n_params = self.n_components * (n_features + 1)
        
        n_params += self.n_components - 1  # mixing weights
        
        return -2 * self._compute_log_likelihood(X) + n_params * np.log(n_samples)
    
    def aic(self, X):
        """Akaike Information Criterion"""
        n_samples, n_features = X.shape
        
        # Parameter count (same as in bic)
        if self.covariance_type == 'full':
            n_params = self.n_components * (n_features + n_features * (n_features + 1) // 2)
        elif self.covariance_type == 'tied':
            n_params = self.n_components * n_features + n_features * (n_features + 1) // 2
        elif self.covariance_type == 'diag':
            n_params = self.n_components * (2 * n_features)
        else:  # spherical
            n_params = self.n_components * (n_features + 1)
        
        n_params += self.n_components - 1
        
        return -2 * self._compute_log_likelihood(X) + 2 * n_params

10. References

  1. Bishop, C. M. (2006). Pattern Recognition and Machine Learning. Springer.