集成学习统一理论
概述
集成学习(Ensemble Learning)通过组合多个学习器来提升预测性能,是机器学习中最重要的技术之一。1 本文档从统一视角分析Bagging、Boosting、Stacking等主流集成方法,揭示它们的共性与差异。
1. 集成学习基础
1.1 集成学习的核心思想
给定 个基学习器 ,集成学习通过某种组合策略得到最终预测:
1.2 集成方法的分类
集成学习方法
├── 同质集成(Homogeneous)
│ ├── Bagging:并行组合,独立基学习器
│ ├── Boosting:串行组合,依赖性基学习器
│ └── Random Forest:Bagging + 特征采样
│
└── 异质集成(Heterogeneous)
└── Stacking:元学习器组合
1.3 集成学习的前提条件
| 条件 | 说明 | 重要性 |
|---|---|---|
| 准确性 | 基学习器应比随机猜测好 | 高 |
| 多样性 | 基学习器之间应有差异 | 高 |
| 独立性 | 错误应尽可能不相关 | 中 |
2. 偏差-方差分解
2.1 泛化误差分解
对于回归问题,泛化误差可以分解为:
其中:
- 偏差(Bias):模型预测值的期望与真实值的差异
- 方差(Variance):模型预测值围绕其期望的波动
- 噪声(Noise):数据本身的固有不确定性
2.2 分解公式推导
设真实函数为 ,噪声为 ,观测数据 。
学习器 的期望预测:
偏差:
方差:
2.3 偏差-方差权衡
泛化误差
│
高 │ ╲
│ ╲ 方差主导
│ ╲___ (复杂模型)
│ ╲___
│ ╲___
│ ╲___ 偏差主导
│ ╲___ (简单模型)
│ ╲___
│ ╲___
│ ╲___
低 │ ╲___
└──────────────────────────────────→ 模型复杂度
低 高
2.4 集成学习对偏差和方差的影响
| 方法 | 对Bias的影响 | 对Variance的影响 |
|---|---|---|
| Bagging | 无显著影响 | 降低方差 |
| Boosting | 降低偏差 | 可能增加方差 |
| Random Forest | 无显著影响 | 显著降低方差 |
3. Bagging理论
3.1 Bagging核心思想
Bagging(Bootstrap Aggregating)通过有放回抽样创建多个训练集,然后分别训练基学习器并组合预测。
3.2 方差缩减理论
定理:对于 个独立同分布、方差为 的预测器,平均后的方差为 。
证明:
3.3 Bagging的实际方差缩减
实际中基学习器不完全独立,但Bagging仍能有效降低方差:
import numpy as np
def bagging_variance_analysis():
"""
Bagging方差缩减实验分析
"""
# 模拟多个有相关性的学习器
n_models = 100
n_samples = 1000
correlation = 0.5 # 学习器之间的相关性
# 单个模型的方差
single_var = 1.0
# 理论独立情况下的方差
independent_var = single_var / n_models
# 相关情况下的方差
# Var(avg) = (1/M)² * M * σ² + (M-1)/M * ρσ²
# = σ²/M * (1 + (M-1)ρ)
correlated_var = single_var / n_models * (1 + (n_models - 1) * correlation)
print(f"单个模型方差: {single_var:.4f}")
print(f"独立情况方差: {independent_var:.4f}")
print(f"相关情况方差(ρ={correlation}): {correlated_var:.4f}")
print(f"方差缩减比: {single_var / correlated_var:.2f}x")
# 结果:
# 单个模型方差: 1.0000
# 独立情况方差: 0.0100
# 相关情况方差(ρ=0.5): 0.5050
# 方差缩减比: 1.98x3.4 Out-of-Bag估计
Bagging的副产品:每个样本约有36.8%的数据从未被选中,可用于验证:
def oob_evaluation(X, y, model_fn, n_estimators=100):
"""
Out-of-Bag (OOB) 评估
对于每个样本,只使用未使用它的模型进行预测
"""
n = len(y)
oob_predictions = np.zeros(n)
oob_counts = np.zeros(n)
for m in range(n_estimators):
# 生成bootstrap样本
indices = np.random.choice(n, n, replace=True)
# 训练模型
model = model_fn()
model.fit(X[indices], y[indices])
# 找出OOB样本
oob_mask = np.ones(n, dtype=bool)
oob_mask[indices] = False
oob_idx = np.where(oob_mask)[0]
if len(oob_idx) > 0:
oob_predictions[oob_idx] += model.predict(X[oob_idx])
oob_counts[oob_idx] += 1
# 平均OOB预测
valid_mask = oob_counts > 0
oob_predictions[valid_mask] /= oob_counts[valid_mask]
# 计算OOB误差
mse = np.mean((oob_predictions[valid_mask] - y[valid_mask])**2)
return oob_predictions, np.sqrt(mse)4. Boosting理论
4.1 Boosting的核心思想
Boosting通过序列化训练逐步提升模型性能,每轮关注前一轮的”困难样本”。
4.2 加法模型视角
Boosting可以形式化为加法模型优化:
目标是最小化经验风险:
4.3 前向分步算法
def forward_stagewise_algorithm(X, y, loss_fn, M):
"""
前向分步算法
每一步只优化一个基学习器和它的权重
"""
F = np.zeros(len(X)) # 当前集成预测
for m in range(M):
# 计算负梯度(伪残差)
negative_gradient = -gradient(loss_fn, y, F)
# 训练基学习器拟合负梯度
h_m = train_base_learner(X, negative_gradient)
# 求解最优步长
alpha_m = find_optimal_step(F, h_m, X, y, loss_fn)
# 更新集成
F = F + alpha_m * h_m(X)
return F4.4 AdaBoost详解
AdaBoost是Boosting的经典实现,使用指数损失函数:
权重更新:
样本权重更新公式:
其中 , 是第 轮的加权错误率。
4.5 AdaBoost的边界理论
定理(AdaBoost边界):设训练样本可被 个弱分类器分开,则AdaBoost的泛化误差满足:
其中 是指数损失在训练集上的上界。
4.6 Boosting与正则化
| 正则化技术 | 作用 |
|---|---|
| 收缩(Shrinkage) | 降低每棵树的影响 |
| 早停(Early Stopping) | 防止过拟合 |
| 树复杂度限制 | 控制单棵树深度 |
| 样本权重裁剪 | 减少异常值影响 |
5. Random Forest理论
5.1 Random Forest = Bagging + 特征采样
Random Forest在Bagging基础上增加了特征子采样:
class RandomForest:
def __init__(self, n_estimators=100, max_features='sqrt'):
self.n_estimators = n_estimators
self.max_features = max_features
self.trees = []
def fit(self, X, y):
for _ in range(self.n_estimators):
# Bootstrap采样
indices = np.random.choice(len(X), len(X), replace=True)
X_boot = X[indices]
y_boot = y[indices]
# 特征子采样
n_features = X.shape[1]
if self.max_features == 'sqrt':
n_select = int(np.sqrt(n_features))
else:
n_select = self.max_features
feature_indices = np.random.choice(n_features, n_select, replace=False)
# 训练决策树
tree = DecisionTree(max_features=feature_indices)
tree.fit(X_boot[:, feature_indices], y_boot)
self.trees.append(tree)
return self5.2 双重随机化的效果
| 随机化来源 | 作用 |
|---|---|
| Bootstrap采样 | 增加基学习器之间的差异 |
| 特征子采样 | 进一步解耦基学习器 |
| 组合效果 | 显著降低方差 |
5.3 Random Forest的收敛性
定理:随着树的数量增加,Random Forest的泛化误差几乎必然收敛到:
其中:
- 是树之间的平均相关性
- 是单棵树的预测强度
5.4 变量重要性
def permutation_importance(model, X, y, n_repeats=30):
"""
置换重要性计算
通过随机置换特征值来衡量特征重要性
"""
n_features = X.shape[1]
baseline_score = model.score(X, y)
importance = np.zeros(n_features)
for f in range(n_features):
scores = []
for _ in range(n_repeats):
# 置换特征f
X_permuted = X.copy()
X_permuted[:, f] = np.random.permutation(X_permuted[:, f])
# 计算置换后的分数
permuted_score = model.score(X_permuted, y)
scores.append(baseline_score - permuted_score)
importance[f] = np.mean(scores)
return importance6. Stacking统一框架
6.1 Stacking核心思想
Stacking通过元学习器组合多个基学习器的预测:
Level 0 (基学习器):
模型1 → 预测₁
模型2 → 预测₂
模型3 → 预测₃
↓
[预测₁, 预测₂, 预测₃] 堆叠为新特征
↓
Level 1 (元学习器):
元模型 → 最终预测
6.2 Stacking算法
def stacking_cv(X, y, base_models, meta_model, cv=5):
"""
交叉验证Stacking
使用交叉验证避免元学习器过拟合
"""
n_samples = len(X)
n_models = len(base_models)
# 存储OOF预测(Out-of-Fold)
oof_predictions = np.zeros((n_samples, n_models))
# 对每个基模型进行交叉验证
for m, model in enumerate(base_models):
kfold = KFold(n_splits=cv, shuffle=True, random_state=42)
for train_idx, val_idx in kfold.split(X):
# 训练模型
model.fit(X[train_idx], y[train_idx])
# 预测验证集
oof_predictions[val_idx, m] = model.predict(X[val_idx])
# 使用全部OOF预测训练元学习器
meta_X = oof_predictions
meta_model.fit(meta_X, y)
return base_models, meta_model
def full_stacking_pipeline(X_train, y_train, X_test, base_models, meta_model):
"""
完整的Stacking流程
"""
n_samples_train = len(X_train)
n_samples_test = len(X_test)
n_models = len(base_models)
# 第一层:基学习器OOF预测
oof_predictions = np.zeros((n_samples_train, n_models))
test_predictions = np.zeros((n_samples_test, n_models))
for m, model in enumerate(base_models):
# 交叉验证OOF
kfold = KFold(n_splits=5, shuffle=True, random_state=42)
for train_idx, val_idx in kfold.split(X_train):
model.fit(X_train[train_idx], y_train[train_idx])
oof_predictions[val_idx, m] = model.predict(X_train[val_idx])
# 全量训练后在测试集上预测
model.fit(X_train, y_train)
test_predictions[:, m] = model.predict(X_test)
# 第二层:元学习器
meta_X_train = oof_predictions
meta_X_test = test_predictions
meta_model.fit(meta_X_train, y_train)
final_predictions = meta_model.predict(meta_X_test)
return final_predictions6.3 Stacking的理论分析
定理:Stacking的泛化误差满足:
其中余项与基学习器的多样性和元学习器的复杂性相关。
6.4 基学习器选择
| 基学习器类型 | 推荐场景 |
|---|---|
| 同质弱学习器 | 简单稳定,如决策树桩 |
| 异质强学习器 | 不同类型模型,增加多样性 |
| 树模型 + 线性模型 | 捕捉非线性 + 线性模式 |
7. 集成学习的统一视角
7.1 偏差-方差分解的统一框架
def ensemble_bias_variance(F_models, F_true, X, y):
"""
集成学习的偏差-方差分解分析
"""
n_samples = len(y)
n_trials = 100
# 多次采样训练
all_predictions = []
for _ in range(n_trials):
# 采样训练数据
indices = np.random.choice(n_samples, n_samples * 0.8, replace=False)
# 训练多个模型
predictions = []
for F in F_models:
model = clone(F)
model.fit(X[indices], y[indices])
predictions.append(model.predict(X))
# 集成预测(平均)
ensemble_pred = np.mean(predictions, axis=0)
all_predictions.append(ensemble_pred)
all_predictions = np.array(all_predictions)
# 计算统计量
mean_prediction = all_predictions.mean(axis=0)
bias = np.mean((mean_prediction - y) ** 2)
variance = all_predictions.var(axis=0).mean()
return bias, variance7.2 三种集成方法的对比
| 方面 | Bagging | Boosting | Stacking |
|---|---|---|---|
| 基学习器训练 | 并行 | 串行 | 并行 |
| 依赖性 | 独立 | 依赖前一轮 | 独立 |
| 关注点 | 降低方差 | 降低偏差 | 组合最优 |
| 过拟合风险 | 低 | 中-高 | 中 |
| 计算成本 | 中 | 高 | 高 |
| 典型算法 | RF, ExtraTrees | AdaBoost, GBDT | 各种变体 |
7.3 何时使用哪种集成方法
数据特点
│
┌─────────────┴─────────────┐
│ │
高方差(过拟合) 高偏差(欠拟合)
│ │
├─────────────┬─────────────┤
↓ ↓ ↓
Bagging Random Boosting
Forest
│ │ │
决策树 多样性 弱学习器
不稳定 需要 可串行优化
8. 集成学习的正则化
8.1 集成大小选择
def select_ensemble_size(X, y, base_learner_fn, max_n=500):
"""
选择最优集成大小
使用验证集或OOB误差选择
"""
from sklearn.model_selection import cross_val_score
n_estimators_range = [10, 50, 100, 200, 500]
scores = []
for n in n_estimators_range:
model = BaggingClassifier(
estimator=base_learner_fn(),
n_estimators=n,
oob_score=True # 使用OOB评估
)
model.fit(X, y)
scores.append(model.oob_score_)
print(f"n_estimators={n}: OOB Score={scores[-1]:.4f}")
best_n = n_estimators_range[np.argmax(scores)]
return best_n, scores8.2 集成预测的加权策略
class WeightedEnsemble:
"""加权集成"""
def __init__(self, models, weights=None):
self.models = models
self.weights = weights if weights else [1/len(models)] * len(models)
def fit(self, X, y):
for model in self.models:
model.fit(X, y)
return self
def predict(self, X):
predictions = np.array([model.predict(X) for model in self.models])
return np.average(predictions, axis=0, weights=self.weights)
def predict_proba(self, X):
probas = np.array([model.predict_proba(X) for model in self.models])
weighted_proba = np.average(probas, axis=0, weights=self.weights)
return weighted_proba8.3 剪枝集成
def prune_ensemble(models, X_val, y_val, metric='accuracy'):
"""
剪枝集成:移除表现差的模型
"""
n_models = len(models)
model_scores = []
# 评估每个模型
for model in models:
y_pred = model.predict(X_val)
if metric == 'accuracy':
score = accuracy_score(y_val, y_pred)
model_scores.append(score)
# 从最差的开始剪枝
sorted_indices = np.argsort(model_scores)[::-1] # 从好到差
best_score = -np.inf
best_subset = []
for k in range(1, n_models + 1):
subset = sorted_indices[:k]
ensemble_pred = np.mean([models[i].predict(X_val) for i in subset], axis=0)
ensemble_pred = (ensemble_pred > 0.5).astype(int)
score = accuracy_score(y_val, ensemble_pred)
if score > best_score:
best_score = score
best_subset = subset
return [models[i] for i in best_subset], best_score9. 集成学习的最新进展
9.1 深度集成
class DeepEnsemble:
"""深度集成:结合深度学习与集成"""
def __init__(self, n_members=5, hidden_dim=256):
self.n_members = n_members
self.models = []
for _ in range(n_members):
model = nn.Sequential(
nn.Linear(784, hidden_dim),
nn.ReLU(),
nn.Dropout(0.5),
nn.Linear(hidden_dim, 10)
)
self.models.append(model)
def fit(self, X, y, epochs=10):
for i, model in enumerate(self.models):
# 不同随机种子训练
torch.manual_seed(42 + i)
set_seed(42 + i)
optimizer = torch.optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()
for epoch in range(epochs):
model.train()
for batch_x, batch_y in DataLoader(X, y):
optimizer.zero_grad()
loss = criterion(model(batch_x), batch_y)
loss.backward()
optimizer.step()
def predict(self, X):
# 集成预测
predictions = []
for model in self.models:
model.eval()
with torch.no_grad():
pred = torch.softmax(model(X), dim=1)
predictions.append(pred)
return torch.mean(torch.stack(predictions), dim=0)9.2 Snapshot集成
class SnapshotEnsemble:
"""Snapshot集成:利用学习率循环保存模型"""
def __init__(self, model_fn, n_snapshots=5, lr_max=0.1, lr_min=1e-4):
self.n_snapshots = n_snapshots
self.lr_max = lr_max
self.lr_min = lr_min
self.models = []
self.model_fn = model_fn
def train(self, X, y, epochs_per_cycle=50, n_cycles=5):
total_epochs = epochs_per_cycle * n_cycles
for cycle in range(n_cycles):
model = self.model_fn()
optimizer = torch.optim.SGD(model.parameters(), lr=self.lr_max)
# 余弦退火学习率
scheduler = CosineAnnealingWarmRestarts(
optimizer, T_0=epochs_per_cycle, T_mult=1
)
for epoch in range(epochs_per_cycle):
scheduler.step()
# 训练
model.train()
loss = self.compute_loss(model, X, y)
loss.backward()
optimizer.step()
# 保存snapshot
self.models.append(copy.deepcopy(model))
def predict(self, X):
predictions = [model(X) for model in self.models]
return torch.mean(torch.stack(predictions), dim=0)参考
相关主题
Footnotes
-
Zhou, Z. H. (2021). “Ensemble Learning.” Machine Learning. Springer. ↩