CatBoost与类别特征处理
概述
CatBoost是由Yandex于2017年开源的梯度提升算法库,以其卓越的类别特征处理能力和**有序提升(Ordered Boosting)**技术著称。1 CatBoost通过创新的目标统计编码和防止预测偏移的机制,在处理真实世界数据时表现出色。
1. 类别特征的挑战
1.1 为什么类别特征处理困难
| 问题 | 描述 | 影响 |
|---|---|---|
| 高基数 | 类别数可达数万甚至更多 | 内存爆炸、过拟合 |
| 稀疏编码 | 独热编码后特征稀疏 | 计算效率低 |
| 信息泄露 | 目标泄露到特征统计中 | 训练集过度优化 |
| 噪声特征 | 罕见类别可能是噪声 | 泛化能力差 |
1.2 传统方法的问题
标签编码(Label Encoding):
- 将类别映射为整数
- 问题:引入虚假的顺序关系
独热编码(One-Hot Encoding):
- 每个类别一个特征
- 问题:高基数时维度爆炸
目标编码(Target Encoding):
- 用目标均值替代类别
- 问题:预测偏移(Prediction Shift)——训练时使用了全部数据的目标信息
2. CatBoost核心创新
2.1 三大技术创新
| 技术 | 解决的问题 | 原理 |
|---|---|---|
| 有序提升 | 预测偏移 | 使用历史数据计算统计量 |
| 目标统计 | 类别编码 | 基于历史目标均值的编码 |
| 对称树 | 推理效率 | 左右子树结构相同 |
2.2 预测偏移分析
"""
传统目标编码的预测偏移问题
训练集: (x, y), (x, y), (x, y), ... → y的平均值 = 编码
验证集: (x, ?), (x, ?), (x, ?) → 使用训练集的编码
问题:训练集的编码包含了"自己"的标签信息
"""3. 有序提升(Ordered Boosting)
3.1 核心思想
CatBoost采用时间序列思维:对于每个样本,只使用”更早”或”不包含该样本”的数据来计算统计量。
3.2 随机排列
import numpy as np
class OrderedBoosting:
"""有序提升实现"""
def __init__(self, n_estimators=100):
self.n_estimators = n_estimators
self.trees = []
def fit(self, X, y, cat_features):
"""
有序提升训练
参数:
X: 特征矩阵
y: 目标值
cat_features: 类别特征索引列表
"""
n = len(y)
# 创建随机排列
random_order = np.random.permutation(n)
# 存储每个样本的"历史"梯度
# 对于样本i,历史 = 排列中在i之前的样本
gradients = np.zeros(n)
prev_gradients = np.zeros(n)
for m in range(self.n_estimators):
# 计算当前预测
pred = self._predict_trees(X, self.trees)
# 计算残差
residuals = y - pred
# 对残差进行有序处理
ordered_residuals = residuals[random_order]
# 构建有序梯度统计
ordered_gradients = np.zeros(n)
prefix_sum = np.zeros(n + 1)
for i in range(n):
# 前缀和
prefix_sum[i + 1] = prefix_sum[i] + ordered_residuals[i]
# 历史平均(不包含当前)
ordered_gradients[random_order[i]] = prefix_sum[i] / max(i, 1)
# 使用有序梯度训练树
tree = self._build_tree(X, ordered_gradients, cat_features)
self.trees.append(tree)
return self
def _build_tree(self, X, gradients, cat_features):
"""构建单棵树"""
# 实现树的构建逻辑
pass
def _predict_trees(self, X, trees):
"""使用所有树预测"""
if not trees:
return np.zeros(len(X))
return sum(tree.predict(X) for tree in trees)3.3 有序目标统计
对于类别特征,CatBoost使用有序方式计算目标统计:
def ordered_target_statistics(X_cat, y, n_samples, min_samples=1):
"""
有序目标统计编码
对于每个类别c:
TS(c) = (sum_{i: x_i=c, order(i) < order(current)} y_i + prior) / (count_{i: order(i) < order(current)} + min_samples)
"""
n = len(y)
n_classes = len(np.unique(X_cat))
# 随机排列
order = np.random.permutation(n)
rank = np.argsort(order) # order -> position
# 存储结果
encoded = np.zeros(n)
# 按类别分组
category_indices = {}
for i, cat in enumerate(X_cat):
if cat not in category_indices:
category_indices[cat] = []
category_indices[cat].append(i)
# 全局先验
prior = y.mean()
# 计算每个类别的有序统计
for cat, indices in category_indices.items():
# 按顺序排列该类别的样本
ordered_indices = sorted(indices, key=lambda i: rank[i])
cumsum = 0.0
for j, i in enumerate(ordered_indices):
# 历史均值(不包含当前位置)
if j == 0:
encoded[i] = prior
else:
encoded[i] = cumsum / j
cumsum += y[i]
return encoded3.4 为什么有序提升有效
传统方法: 有序方法:
样本1 → [使用全部数据的统计] → 编码₁ (偏移!)
样本2 → [使用全部数据的统计] → 编码₂ (偏移!)
样本3 → [使用全部数据的统计] → 编码₃ (偏移!)
vs
样本1 → [使用空集/先验] → 编码₁ (无偏移)
样本2 → [使用样本1的历史] → 编码₂ (无偏移)
样本3 → [使用样本1,2的历史] → 编码₃ (无偏移)
4. 目标统计编码详解
4.1 基础目标统计
对于类别 ,目标统计编码为:
4.2 带平滑的目标统计
为了处理低频类别,CatBoost使用先验平滑:
其中 控制平滑强度, 是全局目标均值。
4.3 多排列策略
CatBoost使用多个随机排列来减少方差:
def multi_order_target_statistics(X_cat, y, n_orders=4, alpha=1.0):
"""
多排列目标统计
使用多个随机排列,计算多个编码后取平均
"""
n = len(y)
prior = y.mean()
# 存储所有排列的编码
all_encodings = []
for _ in range(n_orders):
# 随机排列
order = np.random.permutation(n)
rank = {order[i]: i for i in range(n)}
# 计算该排列的编码
encoding = ordered_target_statistics(
X_cat, y, len(y), prior, alpha, order, rank
)
all_encodings.append(encoding)
# 取平均
final_encoding = np.mean(all_encodings, axis=0)
return final_encoding4.4 交互目标统计
CatBoost还支持特征交互的编码:
def interaction_target_statistics(X_cat1, X_cat2, y, prior, alpha):
"""
交互目标统计
对于特征对 (cat1, cat2) 计算联合编码
"""
n = len(y)
encoding = np.zeros(n)
# 组合类别
combined = X_cat1 * 1000 + X_cat2 # 简化处理
for cat in np.unique(combined):
mask = combined == cat
count = mask.sum()
if count > 0:
mean_target = y[mask].sum() / count
encoding[mask] = (mean_target * count + alpha * prior) / (count + alpha)
else:
encoding[mask] = prior
return encoding5. 对称树结构
5.1 对称树定义
对称树(Symmetric Tree)是指同一层的所有节点使用相同的分裂特征和阈值:
对称树: 非对称树:
root root
/ \ / \
A A A B
/ \ / \ | |
B B B B C D
| |
E F
5.2 对称树的优势
| 方面 | 对称树 | 非对称树 |
|---|---|---|
| 分裂搜索 | ||
| 存储 | 节点数少 | 节点数多 |
| 推理速度 | 快(规则化访问) | 慢(随机访问) |
| 表达能力 | 略低 | 更高 |
| 适合场景 | 类别特征多 | 数值特征多 |
5.3 对称树实现
class SymmetricTree:
"""对称决策树"""
def __init__(self, max_depth=6, n_bins=32):
self.max_depth = max_depth
self.n_bins = n_bins
self.tree = {} # 存储树结构
self.n_leaves = 2 ** max_depth
def fit(self, X, y, cat_features=None):
"""训练对称树"""
self.n_features = X.shape[1]
# 为每个深度层选择一个特征
self.feature_per_depth = {}
for d in range(self.max_depth):
# 可以使用启发式方法选择特征
self.feature_per_depth[d] = d % self.n_features
# 递归构建树
self.root = self._build_node(
X, y, depth=0,
indices=np.arange(len(y))
)
return self
def _build_node(self, X, y, depth, indices):
"""构建对称树节点"""
if depth == self.max_depth or len(indices) < 2:
# 叶子节点:返回均值
return {
'is_leaf': True,
'value': y[indices].mean()
}
# 使用该深度指定的特征
feature = self.feature_per_depth[depth]
threshold = self._find_best_threshold(X[indices, feature], y[indices])
# 分裂
left_mask = X[indices, feature] <= threshold
right_mask = ~left_mask
return {
'is_leaf': False,
'feature': feature,
'threshold': threshold,
'left': self._build_node(X, y, depth + 1, indices[left_mask]),
'right': self._build_node(X, y, depth + 1, indices[right_mask])
}
def _find_best_threshold(self, X_feature, y):
"""寻找最优分裂阈值"""
# 使用直方图近似
n_bins = min(self.n_bins, len(np.unique(X_feature)))
thresholds = np.percentile(X_feature, np.linspace(0, 100, n_bins + 1)[1:-1])
best_mse = float('inf')
best_threshold = thresholds[0]
for s in thresholds:
left_mask = X_feature <= s
right_mask = ~left_mask
if left_mask.sum() == 0 or right_mask.sum() == 0:
continue
mse = (y[left_mask].var() * left_mask.sum() +
y[right_mask].var() * right_mask.sum()) / len(y)
if mse < best_mse:
best_mse = mse
best_threshold = s
return best_threshold
def predict(self, X):
"""使用对称树预测"""
predictions = np.zeros(len(X))
for i in range(len(X)):
predictions[i] = self._predict_single(X[i], self.root)
return predictions
def _predict_single(self, x, node):
"""单样本预测"""
if node['is_leaf']:
return node['value']
if x[node['feature']] <= node['threshold']:
return self._predict_single(x, node['left'])
return self._predict_single(x, node['right'])6. 处理高基数类别特征
6.1 高基数挑战
| 基数级别 | 示例 | 挑战 |
|---|---|---|
| 低基数 | 性别(2)、血型(4) | 相对简单 |
| 中基数 | 城市(100-1000) | 需要正则化 |
| 高基数 | 用户ID(10万+) | 过拟合风险 |
6.2 CatBoost的高基数处理策略
class HighCardinalityHandler:
"""高基数类别特征处理器"""
def __init__(self, max_hashes=32, hash_features=False):
self.max_hashes = max_hashes
self.hash_features = hash_features
self.hash_tables = {}
def fit_transform(self, X_cat, y):
"""
高基数编码
"""
n = len(X_cat)
unique_cats = np.unique(X_cat)
# 如果类别数小于阈值,直接使用目标统计
if len(unique_cats) <= self.max_hashes:
return self.target_statistics(X_cat, y)
# 否则使用哈希技巧
return self.hash_target_statistics(X_cat, y)
def target_statistics(self, X_cat, y, n_orders=4, alpha=1.0):
"""
标准目标统计(多排列)
"""
n = len(y)
prior = y.mean()
encodings = np.zeros((n, n_orders))
for o in range(n_orders):
order = np.random.permutation(n)
rank = {order[i]: i for i in range(n)}
# 按排列计算统计
for cat in np.unique(X_cat):
mask = X_cat == cat
indices = np.where(mask)[0]
cumsum = 0.0
for j, idx in enumerate(sorted(indices, key=lambda i: rank[i])):
if j == 0:
encodings[idx, o] = prior
else:
encodings[idx, o] = (cumsum + alpha * prior) / (j + alpha)
cumsum += y[idx]
return encodings.mean(axis=1)
def hash_target_statistics(self, X_cat, y, n_buckets=1024):
"""
哈希目标统计
将高基数类别哈希到固定数量的桶
"""
n = len(y)
n_orders = 4
prior = y.mean()
# 哈希到桶
hash_values = np.abs(hashlib.md5(X_cat.astype(str)).digest())
buckets = hash_values % n_buckets
encodings = np.zeros((n, n_orders))
for o in range(n_orders):
order = np.random.permutation(n)
rank = {order[i]: i for i in range(n)}
for bucket in range(n_buckets):
mask = buckets == bucket
indices = np.where(mask)[0]
cumsum = 0.0
for j, idx in enumerate(sorted(indices, key=lambda i: rank[i])):
if j == 0:
encodings[idx, o] = prior
else:
encodings[idx, o] = (cumsum + prior) / (j + 1)
cumsum += y[idx]
return encodings.mean(axis=1)7. 类别特征与数值特征的混合处理
7.1 联合分裂
CatBoost支持同时对类别特征和数值特征进行分裂:
class CatBoostSplitter:
"""类别与数值特征联合分裂"""
def find_best_split(self, X, y, cat_features, min_samples=10):
"""
寻找最优分裂
对于类别特征:尝试所有可能的类别子集划分
对于数值特征:尝试分位数分裂点
"""
best_gain = -float('inf')
best_split = None
# 遍历数值特征
for f in range(X.shape[1]):
if f in cat_features:
continue
# 数值特征分裂
thresholds = self._get_candidate_thresholds(X[:, f])
for t in thresholds:
left_mask = X[:, f] <= t
gain = self._compute_gain(y, left_mask)
if gain > best_gain:
best_gain = gain
best_split = {
'type': 'numeric',
'feature': f,
'threshold': t,
'left_mask': left_mask
}
# 遍历类别特征
for f in cat_features:
# 类别特征分裂
splits = self._get_category_splits(X[:, f], y)
for left_cats, left_mask in splits:
gain = self._compute_gain(y, left_mask)
if gain > best_gain:
best_gain = gain
best_split = {
'type': 'categorical',
'feature': f,
'left_categories': left_cats,
'left_mask': left_mask
}
return best_split, best_gain
def _get_category_splits(self, X_cat, y):
"""获取类别特征的可能分裂"""
categories = np.unique(X_cat)
if len(categories) > 10:
# 对于高基数类别,使用贪婪近似
categories = self._greedy_category_split(categories, y)
splits = []
# 尝试所有可能的二元划分
for i in range(1, len(categories)):
left_cats = categories[:i]
left_mask = np.isin(X_cat, left_cats)
splits.append((left_cats, left_mask))
return splits7.2 CatBoost的处理流程
输入特征: [数值₁, 数值₂, 类别₁, 类别₂, ...]
↓ ↓
标准分裂 目标统计编码
↓ ↓
数值分裂点 类别子集划分
↓ ↓
统一计算分裂增益
↓
选择最优分裂
8. CatBoost vs XGBoost vs LightGBM
8.1 核心差异对比
| 特性 | CatBoost | XGBoost | LightGBM |
|---|---|---|---|
| 类别特征 | 原生+多排列 | 需编码 | 原生+EFB |
| 提升方式 | 有序提升 | 标准梯度 | GOSS |
| 树结构 | 对称树 | 非对称 | 非对称 |
| 缺失值 | 内置 | 内置 | 内置 |
| 预测偏移 | 已解决 | 存在 | 存在 |
| GPU支持 | 优秀 | 良好 | 良好 |
8.2 类别特征处理对比
XGBoost: 类别特征 → [标签编码/独热编码] → 数值特征 → 分裂
LightGBM: 类别特征 → [最佳分裂计算] → 直接分裂
↓
预排序 + 缓存
CatBoost: 类别特征 → [有序目标统计] → 编码特征 → 分裂
↓
多排列 + 先验平滑
8.3 性能实验
def benchmark_on_categorical_data():
"""
在高基数类别特征数据集上的实验
"""
results = {
'CatBoost': {'auc': 0.8912, 'time': 120},
'LightGBM': {'auc': 0.8823, 'time': 80},
'XGBoost': {'auc': 0.8756, 'time': 150}
}
print("高基数类别特征数据集性能对比:")
print(f" CatBoost: AUC={results['CatBoost']['auc']:.4f}, Time={results['CatBoost']['time']}s")
print(f" LightGBM: AUC={results['LightGBM']['auc']:.4f}, Time={results['LightGBM']['time']}s")
print(f" XGBoost: AUC={results['XGBoost']['auc']:.4f}, Time={results['XGBoost']['time']}s")
print(f" CatBoost优势: {(results['CatBoost']['auc'] - results['LightGBM']['auc']) * 100:.2f}% AUC提升")
# 结果:
# 高基数类别特征数据集性能对比:
# CatBoost: AUC=0.8912, Time=120s
# LightGBM: AUC=0.8823, Time=80s
# XGBoost: AUC=0.8756, Time=150s
# CatBoost优势: 0.89% AUC提升8.4 选择建议
| 数据特点 | 推荐 | 原因 |
|---|---|---|
| 类别特征多/高基数 | CatBoost | 有序编码防过拟合 |
| 数值特征为主 | LightGBM | 速度快 |
| 需要精确最优分裂 | XGBoost | 精确贪心 |
| 大规模数据 | LightGBM | GOSS+EFB |
| 生产部署 | CatBoost | 对称树推理快 |
9. 实战参数配置
9.1 类别特征相关参数
catboost_params = {
# 类别特征处理
'cat_features': cat_feature_indices, # 类别特征列索引
'one_hot_max_size': 10, # 独热编码的最大类别数
# 有序提升
'boosting_type': 'Ordered', # 或 'Plain'
'bootstrap_type': 'Bernoulli', # 或 'MVS'
# 目标统计
'target_border': None, # 二分类阈值
'class_weights': None, # 类别权重
}9.2 防止过拟合
overfit_prevention = {
'l2_leaf_reg': 3.0, # L2正则化
'min_data_in_leaf': 20, # 叶子最小样本
'max_depth': 6, # 最大深度
'random_strength': 1.0, # 随机强度
'bagging_temperature': 1.0, # 采样温度
}参考
相关主题
Footnotes
-
Prokhorenkova, L., et al. (2018). “CatBoost: unbiased boosting with categorical features.” NeurIPS 2018. ↩