TDA工具与深度学习集成
本文档汇总拓扑数据分析(Topological Data Analysis, TDA)主流工具库的使用指南,并提供与深度学习框架集成的实践代码。
1. 核心TDA工具库
1.1 工具对比概览
| 库名 | 语言 | 特点 | 活跃度 | 学习曲线 |
|---|---|---|---|---|
| GUDHI | C++/Python | 功能最全面,支持复形类型多 | 高 | 陡峭 |
| Ripser | C++/Python | 高效,专注文持久同调 | 高 | 平缓 |
| KeplerMapper | Python | Mapper算法,可视化强 | 中 | 平缓 |
| Giotto-tda | Python | sklearn集成,ML友好 | 中 | 平缓 |
| scikit-tda | Python | 基础工具集合 | 中 | 平缓 |
| TopologyToolKit | C++/Python | 高级分析,高性能 | 中 | 陡峭 |
1.2 安装指南
# 基础TDA工具
pip install ripser persim gudhi scikit-tda
# Mapper和可视化
pip install kepler-mapper
# 与sklearn集成
pip install giotto-tda
# 深度学习集成
pip install torch-geometric
# 可选:详细文档
pip install gudhi[sql] # SQLite支持2. Ripser快速入门
2.1 基础使用
import numpy as np
from ripser import ripser
from persim import plot_diagrams
import matplotlib.pyplot as plt
# 生成测试数据:两个嵌套圆环
np.random.seed(42)
# 圆环1
theta1 = np.random.uniform(0, 2*np.pi, 50)
circle1 = np.column_stack([
np.cos(theta1) + np.random.normal(0, 0.05, 50),
np.sin(theta1) + np.random.normal(0, 0.05, 50),
np.zeros(50)
])
# 圆环2(嵌套)
theta2 = np.random.uniform(0, 2*np.pi, 50)
circle2 = np.column_stack([
2*np.cos(theta2) + np.random.normal(0, 0.05, 50),
2*np.sin(theta2) + np.random.normal(0, 0.05, 50),
np.zeros(50)
])
# 合并数据
data = np.vstack([circle1, circle2])
# 计算持久同调
result = ripser(data, maxdim=2, thresh=3.0)
# 查看持久图
print("0维持久图(连通分量):", result['dgms'][0])
print("1维持久图(环路):", result['dgms'][1])
print("2维持久图(空洞):", result['dgms'][2])
# 可视化
plot_diagrams(result['dgms'], show=True)2.2 持久图距离计算
from persim import persim, bottleneck
# 两个数据集的持久图
result1 = ripser(data1, maxdim=1)
result2 = ripser(data2, maxdim=1)
dgm1 = result1['dgms'][1]
dgm2 = result2['dgms'][1]
# Bottleneck距离
b_dist = bottleneck(dgm1, dgm2)
print(f"Bottleneck距离: {b_dist:.4f}")
# Wasserstein距离 (p=2)
w_dist = persim(dgm1, dgm2, matching=False)
print(f"Wasserstein距离: {w_dist:.4f}")2.3 批量计算
from concurrent.futures import ProcessPoolExecutor
import numpy as np
from ripser import ripser
def compute_ph_for_dataset(data):
"""计算单个数据集的持久同调"""
result = ripser(data, maxdim=2, thresh=2.0)
return result['dgms']
# 大量数据集的批量处理
def batch_persistent_homology(data_list, n_workers=4):
"""并行计算多个数据集的持久同调"""
with ProcessPoolExecutor(max_workers=n_workers) as executor:
results = list(executor.map(compute_ph_for_dataset, data_list))
return results
# 使用示例
datasets = [generate_synthetic_data() for _ in range(100)]
all_diagrams = batch_persistent_homology(datasets)3. GUDHI进阶使用
3.1 Vietoris-Rips复形
from gudhi import RipsComplex, PersistentHomologyDiagram
import numpy as np
# 创建Vietoris-Rips复形
points = np.random.random((100, 3)) # 100个3D点
rips = RipsComplex(points=points, max_edge_length=0.5)
# 构建复形(最大维度为2)
simplex_tree = rips.create_simplex_tree(max_dimension=2)
# 计算持久同调
diag = simplex_tree.persistence()
# 获取Betti数
print("Betti(0):", simplex_tree.betti_numbers()[0])
print("Betti(1):", simplex_tree.betti_numbers()[1])
# 获取持久对
pairs = simplex_tree.persistence_pairs()
print(f"持久对数量: {len(pairs)}")3.2 复杂过滤函数
from gudhi import SimplexTree, FlagComplex
from gudhi.point_cloud.timmer import Controller
# 使用自定义过滤函数
# 示例:基于密度的过滤
def density_filter(points, radius=0.1):
"""计算每个点的局部密度"""
from scipy.spatial.distance import cdist
dist_matrix = cdist(points, points)
densities = (dist_matrix < radius).sum(axis=1)
return densities
points = np.random.random((100, 2))
densities = density_filter(points)
# 创建带过滤的复形
rips = RipsComplex(points=points, max_edge_length=0.5)
simplex_tree = rips.create_simplex_tree(max_dimension=2)
# 持久同调可视化
from gudhi import plot_persistence_diagram, plot_persistence_barcode
plot_persistence_diagram(diag)
plot_persistence_barcode(diag)3.3 持久图像
from gudhi.sklearn import (
PersistenceVectorizer,
PersistenceFisherSVM,
PersistenceWeightedGaussianKernel
)
# 持久向量器:将持久图转换为向量
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
pipeline = Pipeline([
('scaler', StandardScaler()),
('vectorizer', PersistenceVectorizer(
kernel="polynomial", # 或 "gaussian"
degree=2,
bandwidth=1.0,
weight_function=lambda x: np.power(x, 1) # 持久度加权
))
])
# 拟合并转换
X_topo = pipeline.fit_transform(diagrams)
print(f"拓扑特征维度: {X_topo.shape}")4. Giotto-tda集成
4.1 sklearn风格API
from gtda.homology import (
VietorisRipsPersistence,
CubicalPersistence,
WeakAlphaPersistence
)
from gtda.diagrams import (
PersistenceScaler,
Filtering,
BettiNumbers,
Entropy,
HeatKernel
)
from sklearn.pipeline import Pipeline
import numpy as np
# 创建持久同调计算管道
persistence_pipeline = Pipeline([
# 持久同调计算
('vr', VietorisRipsPersistence(
metric='euclidean',
max_edge_length=1.0,
max_dim=2,
n_jobs=-1 # 并行计算
)),
# 缩放持久度
('scaler', PersistenceScaler(
n_jobs=-1
)),
# 过滤噪声(短持久度特征)
('filter', Filtering(
method='trim',
threshold=0.1 # 移除持久度<0.1的特征
))
])
# 处理数据
X = np.random.random((100, 20, 2)) # 100个样本,每个20个2D点
X_persistence = persistence_pipeline.fit_transform(X)
print(f"持久图维度: {X_persistence.shape}") # (100, n_points, 3)4.2 特征提取
from gtda.diagrams import (
BettiNumbers,
Entropy,
HeatKernel,
PersistenceLandscape,
Silhouette
)
# 创建特征提取器
feature_extractors = [
('betti', BettiNumbers(n_bins=10)),
('entropy', Entropy(n_bins=10, normalized=False)),
('landscape', PersistenceLandscape(n_bins=100, n_layers=3)),
('silhouette', Silhouette(n_bins=100, power=2)),
('heat', HeatKernel(sigma=0.1, n_bins=100))
]
# 提取多个特征
features = []
feature_names = []
for name, extractor in feature_extractors:
X_feat = extractor.fit_transform(X_persistence)
features.append(X_feat)
feature_names.append(name)
print(f"{name}: {X_feat.shape}")
# 拼接所有特征
X_combined = np.hstack(features)
print(f"组合特征维度: {X_combined.shape}")4.3 与sklearn分类器结合
from gtda.pipeline import make_pipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
# 完整的分类管道
classification_pipeline = make_pipeline(
VietorisRipsPersistence(metric='euclidean', max_dim=1),
PersistenceScaler(),
Filtering(method='trim', threshold=0.05),
BettiNumbers(n_bins=10),
RandomForestClassifier(n_estimators=100, random_state=42)
)
# 准备数据
X_train, X_test, y_train, y_test = train_test_split(
point_clouds, labels, test_size=0.2
)
# 训练
classification_pipeline.fit(X_train, y_train)
# 预测
y_pred = classification_pipeline.predict(X_test)
# 评估
accuracy = accuracy_score(y_test, y_pred)
print(f"分类准确率: {accuracy:.4f}")5. 深度学习集成
5.1 PyTorch几何集成
import torch
import torch.nn as nn
from torch_geometric.nn import MessagePassing, global_mean_pool
class TopologicalMessagePassing(MessagePassing):
"""
拓扑感知消息传递层
"""
def __init__(self, in_channels, out_channels):
super().__init__(aggr='add')
self.lin = nn.Linear(in_channels, out_channels)
self.edge_lin = nn.Linear(in_channels * 2, out_channels)
def forward(self, x, edge_index, edge_attr=None):
# x: 节点特征
# edge_index: 边索引
# edge_attr: 边特征(可选,包含拓扑信息)
return self.propagate(edge_index, x=x, edge_attr=edge_attr)
def message(self, x_i, x_j, edge_attr):
# 消息:源节点特征 + 边特征
if edge_attr is not None:
return self.edge_lin(torch.cat([x_i, edge_attr], dim=-1))
return self.lin(x_j)
class TopoGNN(nn.Module):
"""
拓扑感知图神经网络
"""
def __init__(self, node_dim, edge_dim, hidden_dim, out_dim):
super().__init__()
self.conv1 = TopologicalMessagePassing(node_dim, hidden_dim)
self.conv2 = TopologicalMessagePassing(hidden_dim, hidden_dim)
self.conv3 = TopologicalMessagePassing(hidden_dim, hidden_dim)
self.classifier = nn.Sequential(
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, out_dim)
)
def forward(self, data):
x, edge_index = data.x, data.edge_index
edge_attr = data.edge_attr if hasattr(data, 'edge_attr') else None
# 拓扑感知消息传递
x = torch.relu(self.conv1(x, edge_index, edge_attr))
x = torch.relu(self.conv2(x, edge_index, edge_attr))
x = torch.relu(self.conv3(x, edge_index, edge_attr))
# 图级别池化
x = global_mean_pool(x, data.batch)
return self.classifier(x)5.2 可微分持久同调
import torch
import torch.nn as nn
class SoftPersistenceDiagram(nn.Module):
"""
软化持久同调层
近似可微分的持久同调计算
"""
def __init__(self, max_samples=100, temperature=0.1):
super().__init__()
self.max_samples = max_samples
self.temperature = temperature
def forward(self, points):
"""
points: (batch, n_points, dim)
返回: (batch, n_features)
"""
batch_size = points.shape[0]
device = points.device
# 计算成对距离矩阵
diff = points.unsqueeze(2) - points.unsqueeze(1) # (B, N, N, D)
dist = torch.norm(diff, dim=-1) # (B, N, N)
# 软化排序(可微分)
# 使用Gumbel-Softmax近似排序
weights = torch.softmax(-dist / self.temperature, dim=-1)
# 估计持久度统计量
features = []
for b in range(batch_size):
d = dist[b] # (N, N)
w = weights[b] # (N, N)
# 加权距离
weighted_dist = (w * d).sum()
# 邻域大小
neighborhood = (d < d.median()).float().sum()
features.append([weighted_dist, neighborhood])
return torch.tensor(features, device=device)
class TopoDeepLayer(nn.Module):
"""
拓扑深度学习层
结合PH特征和深度特征
"""
def __init__(self, deep_dim, topo_dim, hidden_dim):
super().__init__()
self.deep_mlp = nn.Linear(deep_dim, hidden_dim)
self.topo_mlp = nn.Linear(topo_dim, hidden_dim)
self.fusion = nn.Sequential(
nn.Linear(hidden_dim * 2, hidden_dim),
nn.LayerNorm(hidden_dim),
nn.ReLU()
)
def forward(self, deep_features, topo_features):
deep = self.deep_mlp(deep_features)
topo = self.topo_mlp(topo_features)
combined = torch.cat([deep, topo], dim=-1)
return self.fusion(combined)5.3 端到端训练
import torch
from torch.utils.data import Dataset, DataLoader
class TopoPointCloudDataset(Dataset):
"""
拓扑点云数据集
"""
def __init__(self, point_clouds, labels, compute_topo=True):
self.point_clouds = point_clouds
self.labels = labels
self.compute_topo = compute_topo
if compute_topo:
from ripser import ripser
self.topo_features = []
for pc in point_clouds:
result = ripser(pc, maxdim=1, thresh=1.0)
dgm = result['dgms'][1]
# 提取持久度统计
if len(dgm) > 0 and dgm[0, 1] < float('inf'):
pers = dgm[:, 1] - dgm[:, 0]
feat = [len(pers), pers.mean(), pers.max(), pers.std()]
else:
feat = [0, 0, 0, 0]
self.topo_features.append(feat)
def __len__(self):
return len(self.point_clouds)
def __getitem__(self, idx):
pc = torch.tensor(self.point_clouds[idx], dtype=torch.float32)
label = torch.tensor(self.labels[idx], dtype=torch.long)
if self.compute_topo:
topo = torch.tensor(self.topo_features[idx], dtype=torch.float32)
return pc, topo, label
return pc, label
def train_topo_model():
"""
端到端拓扑感知模型训练
"""
# 创建数据集
dataset = TopoPointCloudDataset(point_clouds, labels)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
# 创建模型
model = TopoGNN(
node_dim=3,
edge_dim=1,
hidden_dim=128,
out_dim=num_classes
)
# 优化器
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
criterion = nn.CrossEntropyLoss()
# 训练循环
model.train()
for epoch in range(100):
total_loss = 0
for batch in dataloader:
if len(batch) == 3:
pcs, topo_features, labels = batch
else:
pcs, labels = batch
topo_features = None
optimizer.zero_grad()
# 前向传播
# 这里需要将点云转换为图格式
# 简化示例
out = model(pcs)
loss = criterion(out, labels)
loss.backward()
optimizer.step()
total_loss += loss.item()
if epoch % 10 == 0:
print(f"Epoch {epoch}: Loss={total_loss:.4f}")
return model6. 可视化工具
6.1 Mapper算法
import numpy as np
from kmapper import KeplerMapper
from sklearn import datasets
from sklearn.preprocessing import StandardScaler
# 创建Mapper
mapper = KeplerMapper(verbose=0)
# 准备数据
data = np.random.random((1000, 20))
data = StandardScaler().fit_transform(data)
# 拟合并映射
lens = mapper.fit_transform(data, projection='umap')
graph = mapper.fit_transform(
data,
lens=lens,
cover=km.Cover(n_cubes=15, perc_overlap=0.5),
clusterer=km.LenslessCmeans(n_clusters=5)
)
# 可视化
mapper.visualize(
graph,
path_html="mapper_output.html",
title="Topological Data Exploration"
)6.2 持久图可视化
import matplotlib.pyplot as plt
from persim import plot_diagrams
from ripser import ripser
# 计算持久同调
data = generate_complex_shape()
result = ripser(data, maxdim=2)
# 持久图
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
for i in range(3):
dgm = result['dgms'][i]
axes[i].scatter(dgm[:, 0], dgm[:, 1])
axes[i].plot([0, dgm[:, 1].max()], [0, dgm[:, 1].max()], 'k--')
axes[i].set_xlabel('Birth')
axes[i].set_ylabel('Death')
axes[i].set_title(f'Persistence Diagram (dim={i})')
plt.tight_layout()
plt.savefig('persistence_diagrams.png')7. 性能优化
7.1 子采样策略
import numpy as np
from ripser import ripser
def subsample_persistent_homology(points, n_samples=1000, seed=42):
"""
大规模数据的持久同调计算
使用随机子采样
"""
n_points = len(points)
if n_points <= n_samples:
return ripser(points, maxdim=2)
np.random.seed(seed)
indices = np.random.choice(n_points, n_samples, replace=False)
sampled_points = points[indices]
return ripser(sampled_points, maxdim=2)
def adaptive_subsample(points, base_n=500, max_iterations=5):
"""
自适应子采样
根据结果稳定性决定是否增加样本
"""
prev_persistence = None
for n in [base_n * (2**i) for i in range(max_iterations)]:
result = subsample_persistent_homology(points, n_samples=n)
current_persistence = result['dgms'][1][:, 1] - result['dgms'][1][:, 0]
if prev_persistence is not None:
# 检查收敛性
if np.abs(current_persistence.mean() - prev_persistence.mean()) < 0.01:
return result
prev_persistence = current_persistence
return result7.2 并行计算
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import numpy as np
from ripser import ripser
def parallel_persistence_computation(data_list, n_workers=4):
"""
并行计算多个数据集的持久同调
"""
# 使用进程池(CPU密集型)
with ProcessPoolExecutor(max_workers=n_workers) as executor:
results = list(executor.map(
lambda d: ripser(d, maxdim=2),
data_list
))
return results
def batch_with_progress(data_list, batch_size=100):
"""
分批处理大数据
"""
results = []
for i in range(0, len(data_list), batch_size):
batch = data_list[i:i+batch_size]
batch_results = parallel_persistence_computation(batch)
results.extend(batch_results)
print(f"Processed {min(i+batch_size, len(data_list))}/{len(data_list)}")
return results8. 实践项目建议
8.1 入门项目
- 持久同调可视化:对不同形状数据计算并可视化
- 拓扑特征分类:将PH特征加入传统ML管道
- Mapper探索:对高维数据使用Mapper进行降维可视化
8.2 进阶项目
- 拓扑增强GNN:在图神经网络中加入拓扑特征
- 拓扑约束生成:强制生成数据保持特定拓扑
- 动态拓扑追踪:时变数据的拓扑分析
8.3 研究项目
- 可微分PH层:设计端到端可训练的拓扑层
- 拓扑Transformer:融合注意力与拓扑归纳偏置
- 拓扑强化学习:状态空间的拓扑先验