集成学习统一理论

概述

集成学习(Ensemble Learning)通过组合多个学习器来提升预测性能,是机器学习中最重要的技术之一。1 本文档从统一视角分析Bagging、Boosting、Stacking等主流集成方法,揭示它们的共性与差异。


1. 集成学习基础

1.1 集成学习的核心思想

给定 个基学习器 ,集成学习通过某种组合策略得到最终预测:

1.2 集成方法的分类

集成学习方法
├── 同质集成(Homogeneous)
│   ├── Bagging:并行组合,独立基学习器
│   ├── Boosting:串行组合,依赖性基学习器
│   └── Random Forest:Bagging + 特征采样
│
└── 异质集成(Heterogeneous)
    └── Stacking:元学习器组合

1.3 集成学习的前提条件

条件说明重要性
准确性基学习器应比随机猜测好
多样性基学习器之间应有差异
独立性错误应尽可能不相关

2. 偏差-方差分解

2.1 泛化误差分解

对于回归问题,泛化误差可以分解为:

其中:

  • 偏差(Bias):模型预测值的期望与真实值的差异
  • 方差(Variance):模型预测值围绕其期望的波动
  • 噪声(Noise):数据本身的固有不确定性

2.2 分解公式推导

设真实函数为 ,噪声为 ,观测数据

学习器 的期望预测:

偏差:

方差:

2.3 偏差-方差权衡

          泛化误差
              │
         高   │  ╲
              │    ╲         方差主导
              │     ╲___        (复杂模型)
              │        ╲___
              │           ╲___
              │              ╲___    偏差主导
              │                 ╲___  (简单模型)
              │                    ╲___
              │                       ╲___
              │                          ╲___
         低   │                             ╲___
              └──────────────────────────────────→ 模型复杂度
                      低              高

2.4 集成学习对偏差和方差的影响

方法对Bias的影响对Variance的影响
Bagging无显著影响降低方差
Boosting降低偏差可能增加方差
Random Forest无显著影响显著降低方差

3. Bagging理论

3.1 Bagging核心思想

Bagging(Bootstrap Aggregating)通过有放回抽样创建多个训练集,然后分别训练基学习器并组合预测。

3.2 方差缩减理论

定理:对于 个独立同分布、方差为 的预测器,平均后的方差为

证明

3.3 Bagging的实际方差缩减

实际中基学习器不完全独立,但Bagging仍能有效降低方差:

import numpy as np
 
def bagging_variance_analysis():
    """
    Bagging方差缩减实验分析
    """
    # 模拟多个有相关性的学习器
    n_models = 100
    n_samples = 1000
    correlation = 0.5  # 学习器之间的相关性
    
    # 单个模型的方差
    single_var = 1.0
    
    # 理论独立情况下的方差
    independent_var = single_var / n_models
    
    # 相关情况下的方差
    # Var(avg) = (1/M)² * M * σ² + (M-1)/M * ρσ²
    #          = σ²/M * (1 + (M-1)ρ)
    correlated_var = single_var / n_models * (1 + (n_models - 1) * correlation)
    
    print(f"单个模型方差: {single_var:.4f}")
    print(f"独立情况方差: {independent_var:.4f}")
    print(f"相关情况方差(ρ={correlation}): {correlated_var:.4f}")
    print(f"方差缩减比: {single_var / correlated_var:.2f}x")
 
# 结果:
# 单个模型方差: 1.0000
# 独立情况方差: 0.0100
# 相关情况方差(ρ=0.5): 0.5050
# 方差缩减比: 1.98x

3.4 Out-of-Bag估计

Bagging的副产品:每个样本约有36.8%的数据从未被选中,可用于验证:

def oob_evaluation(X, y, model_fn, n_estimators=100):
    """
    Out-of-Bag (OOB) 评估
    
    对于每个样本,只使用未使用它的模型进行预测
    """
    n = len(y)
    oob_predictions = np.zeros(n)
    oob_counts = np.zeros(n)
    
    for m in range(n_estimators):
        # 生成bootstrap样本
        indices = np.random.choice(n, n, replace=True)
        
        # 训练模型
        model = model_fn()
        model.fit(X[indices], y[indices])
        
        # 找出OOB样本
        oob_mask = np.ones(n, dtype=bool)
        oob_mask[indices] = False
        oob_idx = np.where(oob_mask)[0]
        
        if len(oob_idx) > 0:
            oob_predictions[oob_idx] += model.predict(X[oob_idx])
            oob_counts[oob_idx] += 1
    
    # 平均OOB预测
    valid_mask = oob_counts > 0
    oob_predictions[valid_mask] /= oob_counts[valid_mask]
    
    # 计算OOB误差
    mse = np.mean((oob_predictions[valid_mask] - y[valid_mask])**2)
    
    return oob_predictions, np.sqrt(mse)

4. Boosting理论

4.1 Boosting的核心思想

Boosting通过序列化训练逐步提升模型性能,每轮关注前一轮的”困难样本”。

4.2 加法模型视角

Boosting可以形式化为加法模型优化:

目标是最小化经验风险:

4.3 前向分步算法

def forward_stagewise_algorithm(X, y, loss_fn, M):
    """
    前向分步算法
    
    每一步只优化一个基学习器和它的权重
    """
    F = np.zeros(len(X))  # 当前集成预测
    
    for m in range(M):
        # 计算负梯度(伪残差)
        negative_gradient = -gradient(loss_fn, y, F)
        
        # 训练基学习器拟合负梯度
        h_m = train_base_learner(X, negative_gradient)
        
        # 求解最优步长
        alpha_m = find_optimal_step(F, h_m, X, y, loss_fn)
        
        # 更新集成
        F = F + alpha_m * h_m(X)
    
    return F

4.4 AdaBoost详解

AdaBoost是Boosting的经典实现,使用指数损失函数:

权重更新

样本权重更新公式:

其中 是第 轮的加权错误率。

4.5 AdaBoost的边界理论

定理(AdaBoost边界):设训练样本可被 个弱分类器分开,则AdaBoost的泛化误差满足:

其中 是指数损失在训练集上的上界。

4.6 Boosting与正则化

正则化技术作用
收缩(Shrinkage)降低每棵树的影响
早停(Early Stopping)防止过拟合
树复杂度限制控制单棵树深度
样本权重裁剪减少异常值影响

5. Random Forest理论

5.1 Random Forest = Bagging + 特征采样

Random Forest在Bagging基础上增加了特征子采样

class RandomForest:
    def __init__(self, n_estimators=100, max_features='sqrt'):
        self.n_estimators = n_estimators
        self.max_features = max_features
        self.trees = []
    
    def fit(self, X, y):
        for _ in range(self.n_estimators):
            # Bootstrap采样
            indices = np.random.choice(len(X), len(X), replace=True)
            X_boot = X[indices]
            y_boot = y[indices]
            
            # 特征子采样
            n_features = X.shape[1]
            if self.max_features == 'sqrt':
                n_select = int(np.sqrt(n_features))
            else:
                n_select = self.max_features
            
            feature_indices = np.random.choice(n_features, n_select, replace=False)
            
            # 训练决策树
            tree = DecisionTree(max_features=feature_indices)
            tree.fit(X_boot[:, feature_indices], y_boot)
            self.trees.append(tree)
        
        return self

5.2 双重随机化的效果

随机化来源作用
Bootstrap采样增加基学习器之间的差异
特征子采样进一步解耦基学习器
组合效果显著降低方差

5.3 Random Forest的收敛性

定理:随着树的数量增加,Random Forest的泛化误差几乎必然收敛到:

其中:

  • 是树之间的平均相关性
  • 是单棵树的预测强度

5.4 变量重要性

def permutation_importance(model, X, y, n_repeats=30):
    """
    置换重要性计算
    
    通过随机置换特征值来衡量特征重要性
    """
    n_features = X.shape[1]
    baseline_score = model.score(X, y)
    importance = np.zeros(n_features)
    
    for f in range(n_features):
        scores = []
        for _ in range(n_repeats):
            # 置换特征f
            X_permuted = X.copy()
            X_permuted[:, f] = np.random.permutation(X_permuted[:, f])
            
            # 计算置换后的分数
            permuted_score = model.score(X_permuted, y)
            scores.append(baseline_score - permuted_score)
        
        importance[f] = np.mean(scores)
    
    return importance

6. Stacking统一框架

6.1 Stacking核心思想

Stacking通过元学习器组合多个基学习器的预测:

Level 0 (基学习器):
  模型1 → 预测₁
  模型2 → 预测₂
  模型3 → 预测₃
       ↓
  [预测₁, 预测₂, 预测₃] 堆叠为新特征
       ↓
Level 1 (元学习器):
  元模型 → 最终预测

6.2 Stacking算法

def stacking_cv(X, y, base_models, meta_model, cv=5):
    """
    交叉验证Stacking
    
    使用交叉验证避免元学习器过拟合
    """
    n_samples = len(X)
    n_models = len(base_models)
    
    # 存储OOF预测(Out-of-Fold)
    oof_predictions = np.zeros((n_samples, n_models))
    
    # 对每个基模型进行交叉验证
    for m, model in enumerate(base_models):
        kfold = KFold(n_splits=cv, shuffle=True, random_state=42)
        
        for train_idx, val_idx in kfold.split(X):
            # 训练模型
            model.fit(X[train_idx], y[train_idx])
            
            # 预测验证集
            oof_predictions[val_idx, m] = model.predict(X[val_idx])
    
    # 使用全部OOF预测训练元学习器
    meta_X = oof_predictions
    meta_model.fit(meta_X, y)
    
    return base_models, meta_model
 
 
def full_stacking_pipeline(X_train, y_train, X_test, base_models, meta_model):
    """
    完整的Stacking流程
    """
    n_samples_train = len(X_train)
    n_samples_test = len(X_test)
    n_models = len(base_models)
    
    # 第一层:基学习器OOF预测
    oof_predictions = np.zeros((n_samples_train, n_models))
    test_predictions = np.zeros((n_samples_test, n_models))
    
    for m, model in enumerate(base_models):
        # 交叉验证OOF
        kfold = KFold(n_splits=5, shuffle=True, random_state=42)
        
        for train_idx, val_idx in kfold.split(X_train):
            model.fit(X_train[train_idx], y_train[train_idx])
            oof_predictions[val_idx, m] = model.predict(X_train[val_idx])
        
        # 全量训练后在测试集上预测
        model.fit(X_train, y_train)
        test_predictions[:, m] = model.predict(X_test)
    
    # 第二层:元学习器
    meta_X_train = oof_predictions
    meta_X_test = test_predictions
    
    meta_model.fit(meta_X_train, y_train)
    final_predictions = meta_model.predict(meta_X_test)
    
    return final_predictions

6.3 Stacking的理论分析

定理:Stacking的泛化误差满足:

其中余项与基学习器的多样性和元学习器的复杂性相关。

6.4 基学习器选择

基学习器类型推荐场景
同质弱学习器简单稳定,如决策树桩
异质强学习器不同类型模型,增加多样性
树模型 + 线性模型捕捉非线性 + 线性模式

7. 集成学习的统一视角

7.1 偏差-方差分解的统一框架

def ensemble_bias_variance(F_models, F_true, X, y):
    """
    集成学习的偏差-方差分解分析
    """
    n_samples = len(y)
    n_trials = 100
    
    # 多次采样训练
    all_predictions = []
    
    for _ in range(n_trials):
        # 采样训练数据
        indices = np.random.choice(n_samples, n_samples * 0.8, replace=False)
        
        # 训练多个模型
        predictions = []
        for F in F_models:
            model = clone(F)
            model.fit(X[indices], y[indices])
            predictions.append(model.predict(X))
        
        # 集成预测(平均)
        ensemble_pred = np.mean(predictions, axis=0)
        all_predictions.append(ensemble_pred)
    
    all_predictions = np.array(all_predictions)
    
    # 计算统计量
    mean_prediction = all_predictions.mean(axis=0)
    bias = np.mean((mean_prediction - y) ** 2)
    variance = all_predictions.var(axis=0).mean()
    
    return bias, variance

7.2 三种集成方法的对比

方面BaggingBoostingStacking
基学习器训练并行串行并行
依赖性独立依赖前一轮独立
关注点降低方差降低偏差组合最优
过拟合风险中-高
计算成本
典型算法RF, ExtraTreesAdaBoost, GBDT各种变体

7.3 何时使用哪种集成方法

                    数据特点
                        │
          ┌─────────────┴─────────────┐
          │                           │
    高方差(过拟合)            高偏差(欠拟合)
          │                           │
          ├─────────────┬─────────────┤
          ↓             ↓             ↓
      Bagging       Random       Boosting
                    Forest
          │             │             │
    决策树            多样性      弱学习器
    不稳定            需要        可串行优化

8. 集成学习的正则化

8.1 集成大小选择

def select_ensemble_size(X, y, base_learner_fn, max_n=500):
    """
    选择最优集成大小
    
    使用验证集或OOB误差选择
    """
    from sklearn.model_selection import cross_val_score
    
    n_estimators_range = [10, 50, 100, 200, 500]
    scores = []
    
    for n in n_estimators_range:
        model = BaggingClassifier(
            estimator=base_learner_fn(),
            n_estimators=n,
            oob_score=True  # 使用OOB评估
        )
        model.fit(X, y)
        scores.append(model.oob_score_)
        print(f"n_estimators={n}: OOB Score={scores[-1]:.4f}")
    
    best_n = n_estimators_range[np.argmax(scores)]
    return best_n, scores

8.2 集成预测的加权策略

class WeightedEnsemble:
    """加权集成"""
    
    def __init__(self, models, weights=None):
        self.models = models
        self.weights = weights if weights else [1/len(models)] * len(models)
    
    def fit(self, X, y):
        for model in self.models:
            model.fit(X, y)
        return self
    
    def predict(self, X):
        predictions = np.array([model.predict(X) for model in self.models])
        return np.average(predictions, axis=0, weights=self.weights)
    
    def predict_proba(self, X):
        probas = np.array([model.predict_proba(X) for model in self.models])
        weighted_proba = np.average(probas, axis=0, weights=self.weights)
        return weighted_proba

8.3 剪枝集成

def prune_ensemble(models, X_val, y_val, metric='accuracy'):
    """
    剪枝集成:移除表现差的模型
    """
    n_models = len(models)
    model_scores = []
    
    # 评估每个模型
    for model in models:
        y_pred = model.predict(X_val)
        if metric == 'accuracy':
            score = accuracy_score(y_val, y_pred)
        model_scores.append(score)
    
    # 从最差的开始剪枝
    sorted_indices = np.argsort(model_scores)[::-1]  # 从好到差
    
    best_score = -np.inf
    best_subset = []
    
    for k in range(1, n_models + 1):
        subset = sorted_indices[:k]
        ensemble_pred = np.mean([models[i].predict(X_val) for i in subset], axis=0)
        ensemble_pred = (ensemble_pred > 0.5).astype(int)
        
        score = accuracy_score(y_val, ensemble_pred)
        
        if score > best_score:
            best_score = score
            best_subset = subset
    
    return [models[i] for i in best_subset], best_score

9. 集成学习的最新进展

9.1 深度集成

class DeepEnsemble:
    """深度集成:结合深度学习与集成"""
    
    def __init__(self, n_members=5, hidden_dim=256):
        self.n_members = n_members
        self.models = []
        
        for _ in range(n_members):
            model = nn.Sequential(
                nn.Linear(784, hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.5),
                nn.Linear(hidden_dim, 10)
            )
            self.models.append(model)
    
    def fit(self, X, y, epochs=10):
        for i, model in enumerate(self.models):
            # 不同随机种子训练
            torch.manual_seed(42 + i)
            set_seed(42 + i)
            
            optimizer = torch.optim.Adam(model.parameters())
            criterion = nn.CrossEntropyLoss()
            
            for epoch in range(epochs):
                model.train()
                for batch_x, batch_y in DataLoader(X, y):
                    optimizer.zero_grad()
                    loss = criterion(model(batch_x), batch_y)
                    loss.backward()
                    optimizer.step()
    
    def predict(self, X):
        # 集成预测
        predictions = []
        for model in self.models:
            model.eval()
            with torch.no_grad():
                pred = torch.softmax(model(X), dim=1)
                predictions.append(pred)
        
        return torch.mean(torch.stack(predictions), dim=0)

9.2 Snapshot集成

class SnapshotEnsemble:
    """Snapshot集成:利用学习率循环保存模型"""
    
    def __init__(self, model_fn, n_snapshots=5, lr_max=0.1, lr_min=1e-4):
        self.n_snapshots = n_snapshots
        self.lr_max = lr_max
        self.lr_min = lr_min
        self.models = []
        self.model_fn = model_fn
    
    def train(self, X, y, epochs_per_cycle=50, n_cycles=5):
        total_epochs = epochs_per_cycle * n_cycles
        
        for cycle in range(n_cycles):
            model = self.model_fn()
            optimizer = torch.optim.SGD(model.parameters(), lr=self.lr_max)
            
            # 余弦退火学习率
            scheduler = CosineAnnealingWarmRestarts(
                optimizer, T_0=epochs_per_cycle, T_mult=1
            )
            
            for epoch in range(epochs_per_cycle):
                scheduler.step()
                
                # 训练
                model.train()
                loss = self.compute_loss(model, X, y)
                loss.backward()
                optimizer.step()
            
            # 保存snapshot
            self.models.append(copy.deepcopy(model))
    
    def predict(self, X):
        predictions = [model(X) for model in self.models]
        return torch.mean(torch.stack(predictions), dim=0)

参考


相关主题

Footnotes

  1. Zhou, Z. H. (2021). “Ensemble Learning.” Machine Learning. Springer.