TDA工具与深度学习集成

本文档汇总拓扑数据分析(Topological Data Analysis, TDA)主流工具库的使用指南,并提供与深度学习框架集成的实践代码。


1. 核心TDA工具库

1.1 工具对比概览

库名语言特点活跃度学习曲线
GUDHIC++/Python功能最全面,支持复形类型多陡峭
RipserC++/Python高效,专注文持久同调平缓
KeplerMapperPythonMapper算法,可视化强平缓
Giotto-tdaPythonsklearn集成,ML友好平缓
scikit-tdaPython基础工具集合平缓
TopologyToolKitC++/Python高级分析,高性能陡峭

1.2 安装指南

# 基础TDA工具
pip install ripser persim gudhi scikit-tda
 
# Mapper和可视化
pip install kepler-mapper
 
# 与sklearn集成
pip install giotto-tda
 
# 深度学习集成
pip install torch-geometric
 
# 可选:详细文档
pip install gudhi[sql]  # SQLite支持

2. Ripser快速入门

2.1 基础使用

import numpy as np
from ripser import ripser
from persim import plot_diagrams
import matplotlib.pyplot as plt
 
# 生成测试数据:两个嵌套圆环
np.random.seed(42)
 
# 圆环1
theta1 = np.random.uniform(0, 2*np.pi, 50)
circle1 = np.column_stack([
    np.cos(theta1) + np.random.normal(0, 0.05, 50),
    np.sin(theta1) + np.random.normal(0, 0.05, 50),
    np.zeros(50)
])
 
# 圆环2(嵌套)
theta2 = np.random.uniform(0, 2*np.pi, 50)
circle2 = np.column_stack([
    2*np.cos(theta2) + np.random.normal(0, 0.05, 50),
    2*np.sin(theta2) + np.random.normal(0, 0.05, 50),
    np.zeros(50)
])
 
# 合并数据
data = np.vstack([circle1, circle2])
 
# 计算持久同调
result = ripser(data, maxdim=2, thresh=3.0)
 
# 查看持久图
print("0维持久图(连通分量):", result['dgms'][0])
print("1维持久图(环路):", result['dgms'][1])
print("2维持久图(空洞):", result['dgms'][2])
 
# 可视化
plot_diagrams(result['dgms'], show=True)

2.2 持久图距离计算

from persim import persim, bottleneck
 
# 两个数据集的持久图
result1 = ripser(data1, maxdim=1)
result2 = ripser(data2, maxdim=1)
 
dgm1 = result1['dgms'][1]
dgm2 = result2['dgms'][1]
 
# Bottleneck距离
b_dist = bottleneck(dgm1, dgm2)
print(f"Bottleneck距离: {b_dist:.4f}")
 
# Wasserstein距离 (p=2)
w_dist = persim(dgm1, dgm2, matching=False)
print(f"Wasserstein距离: {w_dist:.4f}")

2.3 批量计算

from concurrent.futures import ProcessPoolExecutor
import numpy as np
from ripser import ripser
 
def compute_ph_for_dataset(data):
    """计算单个数据集的持久同调"""
    result = ripser(data, maxdim=2, thresh=2.0)
    return result['dgms']
 
# 大量数据集的批量处理
def batch_persistent_homology(data_list, n_workers=4):
    """并行计算多个数据集的持久同调"""
    with ProcessPoolExecutor(max_workers=n_workers) as executor:
        results = list(executor.map(compute_ph_for_dataset, data_list))
    return results
 
# 使用示例
datasets = [generate_synthetic_data() for _ in range(100)]
all_diagrams = batch_persistent_homology(datasets)

3. GUDHI进阶使用

3.1 Vietoris-Rips复形

from gudhi import RipsComplex, PersistentHomologyDiagram
import numpy as np
 
# 创建Vietoris-Rips复形
points = np.random.random((100, 3))  # 100个3D点
rips = RipsComplex(points=points, max_edge_length=0.5)
 
# 构建复形(最大维度为2)
simplex_tree = rips.create_simplex_tree(max_dimension=2)
 
# 计算持久同调
diag = simplex_tree.persistence()
 
# 获取Betti数
print("Betti(0):", simplex_tree.betti_numbers()[0])
print("Betti(1):", simplex_tree.betti_numbers()[1])
 
# 获取持久对
pairs = simplex_tree.persistence_pairs()
print(f"持久对数量: {len(pairs)}")

3.2 复杂过滤函数

from gudhi import SimplexTree, FlagComplex
from gudhi.point_cloud.timmer import Controller
 
# 使用自定义过滤函数
# 示例:基于密度的过滤
def density_filter(points, radius=0.1):
    """计算每个点的局部密度"""
    from scipy.spatial.distance import cdist
    dist_matrix = cdist(points, points)
    densities = (dist_matrix < radius).sum(axis=1)
    return densities
 
points = np.random.random((100, 2))
densities = density_filter(points)
 
# 创建带过滤的复形
rips = RipsComplex(points=points, max_edge_length=0.5)
simplex_tree = rips.create_simplex_tree(max_dimension=2)
 
# 持久同调可视化
from gudhi import plot_persistence_diagram, plot_persistence_barcode
 
plot_persistence_diagram(diag)
plot_persistence_barcode(diag)

3.3 持久图像

from gudhi.sklearn import (
    PersistenceVectorizer,
    PersistenceFisherSVM,
    PersistenceWeightedGaussianKernel
)
 
# 持久向量器:将持久图转换为向量
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
 
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('vectorizer', PersistenceVectorizer(
        kernel="polynomial",  # 或 "gaussian"
        degree=2,
        bandwidth=1.0,
        weight_function=lambda x: np.power(x, 1)  # 持久度加权
    ))
])
 
# 拟合并转换
X_topo = pipeline.fit_transform(diagrams)
print(f"拓扑特征维度: {X_topo.shape}")

4. Giotto-tda集成

4.1 sklearn风格API

from gtda.homology import (
    VietorisRipsPersistence,
    CubicalPersistence,
    WeakAlphaPersistence
)
from gtda.diagrams import (
    PersistenceScaler,
    Filtering,
    BettiNumbers,
    Entropy,
    HeatKernel
)
from sklearn.pipeline import Pipeline
import numpy as np
 
# 创建持久同调计算管道
persistence_pipeline = Pipeline([
    # 持久同调计算
    ('vr', VietorisRipsPersistence(
        metric='euclidean',
        max_edge_length=1.0,
        max_dim=2,
        n_jobs=-1  # 并行计算
    )),
    
    # 缩放持久度
    ('scaler', PersistenceScaler(
        n_jobs=-1
    )),
    
    # 过滤噪声(短持久度特征)
    ('filter', Filtering(
        method='trim',
        threshold=0.1  # 移除持久度<0.1的特征
    ))
])
 
# 处理数据
X = np.random.random((100, 20, 2))  # 100个样本,每个20个2D点
X_persistence = persistence_pipeline.fit_transform(X)
 
print(f"持久图维度: {X_persistence.shape}")  # (100, n_points, 3)

4.2 特征提取

from gtda.diagrams import (
    BettiNumbers,
    Entropy,
    HeatKernel,
    PersistenceLandscape,
    Silhouette
)
 
# 创建特征提取器
feature_extractors = [
    ('betti', BettiNumbers(n_bins=10)),
    ('entropy', Entropy(n_bins=10, normalized=False)),
    ('landscape', PersistenceLandscape(n_bins=100, n_layers=3)),
    ('silhouette', Silhouette(n_bins=100, power=2)),
    ('heat', HeatKernel(sigma=0.1, n_bins=100))
]
 
# 提取多个特征
features = []
feature_names = []
 
for name, extractor in feature_extractors:
    X_feat = extractor.fit_transform(X_persistence)
    features.append(X_feat)
    feature_names.append(name)
    print(f"{name}: {X_feat.shape}")
 
# 拼接所有特征
X_combined = np.hstack(features)
print(f"组合特征维度: {X_combined.shape}")

4.3 与sklearn分类器结合

from gtda.pipeline import make_pipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
 
# 完整的分类管道
classification_pipeline = make_pipeline(
    VietorisRipsPersistence(metric='euclidean', max_dim=1),
    PersistenceScaler(),
    Filtering(method='trim', threshold=0.05),
    BettiNumbers(n_bins=10),
    RandomForestClassifier(n_estimators=100, random_state=42)
)
 
# 准备数据
X_train, X_test, y_train, y_test = train_test_split(
    point_clouds, labels, test_size=0.2
)
 
# 训练
classification_pipeline.fit(X_train, y_train)
 
# 预测
y_pred = classification_pipeline.predict(X_test)
 
# 评估
accuracy = accuracy_score(y_test, y_pred)
print(f"分类准确率: {accuracy:.4f}")

5. 深度学习集成

5.1 PyTorch几何集成

import torch
import torch.nn as nn
from torch_geometric.nn import MessagePassing, global_mean_pool
 
class TopologicalMessagePassing(MessagePassing):
    """
    拓扑感知消息传递层
    """
    
    def __init__(self, in_channels, out_channels):
        super().__init__(aggr='add')
        self.lin = nn.Linear(in_channels, out_channels)
        self.edge_lin = nn.Linear(in_channels * 2, out_channels)
    
    def forward(self, x, edge_index, edge_attr=None):
        # x: 节点特征
        # edge_index: 边索引
        # edge_attr: 边特征(可选,包含拓扑信息)
        
        return self.propagate(edge_index, x=x, edge_attr=edge_attr)
    
    def message(self, x_i, x_j, edge_attr):
        # 消息:源节点特征 + 边特征
        if edge_attr is not None:
            return self.edge_lin(torch.cat([x_i, edge_attr], dim=-1))
        return self.lin(x_j)
 
class TopoGNN(nn.Module):
    """
    拓扑感知图神经网络
    """
    
    def __init__(self, node_dim, edge_dim, hidden_dim, out_dim):
        super().__init__()
        
        self.conv1 = TopologicalMessagePassing(node_dim, hidden_dim)
        self.conv2 = TopologicalMessagePassing(hidden_dim, hidden_dim)
        self.conv3 = TopologicalMessagePassing(hidden_dim, hidden_dim)
        
        self.classifier = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, out_dim)
        )
    
    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        edge_attr = data.edge_attr if hasattr(data, 'edge_attr') else None
        
        # 拓扑感知消息传递
        x = torch.relu(self.conv1(x, edge_index, edge_attr))
        x = torch.relu(self.conv2(x, edge_index, edge_attr))
        x = torch.relu(self.conv3(x, edge_index, edge_attr))
        
        # 图级别池化
        x = global_mean_pool(x, data.batch)
        
        return self.classifier(x)

5.2 可微分持久同调

import torch
import torch.nn as nn
 
class SoftPersistenceDiagram(nn.Module):
    """
    软化持久同调层
    近似可微分的持久同调计算
    """
    
    def __init__(self, max_samples=100, temperature=0.1):
        super().__init__()
        self.max_samples = max_samples
        self.temperature = temperature
    
    def forward(self, points):
        """
        points: (batch, n_points, dim)
        返回: (batch, n_features)
        """
        batch_size = points.shape[0]
        device = points.device
        
        # 计算成对距离矩阵
        diff = points.unsqueeze(2) - points.unsqueeze(1)  # (B, N, N, D)
        dist = torch.norm(diff, dim=-1)  # (B, N, N)
        
        # 软化排序(可微分)
        # 使用Gumbel-Softmax近似排序
        weights = torch.softmax(-dist / self.temperature, dim=-1)
        
        # 估计持久度统计量
        features = []
        for b in range(batch_size):
            d = dist[b]  # (N, N)
            w = weights[b]  # (N, N)
            
            # 加权距离
            weighted_dist = (w * d).sum()
            
            # 邻域大小
            neighborhood = (d < d.median()).float().sum()
            
            features.append([weighted_dist, neighborhood])
        
        return torch.tensor(features, device=device)
 
class TopoDeepLayer(nn.Module):
    """
    拓扑深度学习层
    结合PH特征和深度特征
    """
    
    def __init__(self, deep_dim, topo_dim, hidden_dim):
        super().__init__()
        
        self.deep_mlp = nn.Linear(deep_dim, hidden_dim)
        self.topo_mlp = nn.Linear(topo_dim, hidden_dim)
        
        self.fusion = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.LayerNorm(hidden_dim),
            nn.ReLU()
        )
    
    def forward(self, deep_features, topo_features):
        deep = self.deep_mlp(deep_features)
        topo = self.topo_mlp(topo_features)
        
        combined = torch.cat([deep, topo], dim=-1)
        return self.fusion(combined)

5.3 端到端训练

import torch
from torch.utils.data import Dataset, DataLoader
 
class TopoPointCloudDataset(Dataset):
    """
    拓扑点云数据集
    """
    
    def __init__(self, point_clouds, labels, compute_topo=True):
        self.point_clouds = point_clouds
        self.labels = labels
        self.compute_topo = compute_topo
        
        if compute_topo:
            from ripser import ripser
            self.topo_features = []
            for pc in point_clouds:
                result = ripser(pc, maxdim=1, thresh=1.0)
                dgm = result['dgms'][1]
                # 提取持久度统计
                if len(dgm) > 0 and dgm[0, 1] < float('inf'):
                    pers = dgm[:, 1] - dgm[:, 0]
                    feat = [len(pers), pers.mean(), pers.max(), pers.std()]
                else:
                    feat = [0, 0, 0, 0]
                self.topo_features.append(feat)
    
    def __len__(self):
        return len(self.point_clouds)
    
    def __getitem__(self, idx):
        pc = torch.tensor(self.point_clouds[idx], dtype=torch.float32)
        label = torch.tensor(self.labels[idx], dtype=torch.long)
        
        if self.compute_topo:
            topo = torch.tensor(self.topo_features[idx], dtype=torch.float32)
            return pc, topo, label
        return pc, label
 
def train_topo_model():
    """
    端到端拓扑感知模型训练
    """
    # 创建数据集
    dataset = TopoPointCloudDataset(point_clouds, labels)
    dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
    
    # 创建模型
    model = TopoGNN(
        node_dim=3,
        edge_dim=1,
        hidden_dim=128,
        out_dim=num_classes
    )
    
    # 优化器
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
    criterion = nn.CrossEntropyLoss()
    
    # 训练循环
    model.train()
    for epoch in range(100):
        total_loss = 0
        for batch in dataloader:
            if len(batch) == 3:
                pcs, topo_features, labels = batch
            else:
                pcs, labels = batch
                topo_features = None
            
            optimizer.zero_grad()
            
            # 前向传播
            # 这里需要将点云转换为图格式
            # 简化示例
            out = model(pcs)
            loss = criterion(out, labels)
            
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
        
        if epoch % 10 == 0:
            print(f"Epoch {epoch}: Loss={total_loss:.4f}")
    
    return model

6. 可视化工具

6.1 Mapper算法

import numpy as np
from kmapper import KeplerMapper
from sklearn import datasets
from sklearn.preprocessing import StandardScaler
 
# 创建Mapper
mapper = KeplerMapper(verbose=0)
 
# 准备数据
data = np.random.random((1000, 20))
data = StandardScaler().fit_transform(data)
 
# 拟合并映射
lens = mapper.fit_transform(data, projection='umap')
graph = mapper.fit_transform(
    data,
    lens=lens,
    cover=km.Cover(n_cubes=15, perc_overlap=0.5),
    clusterer=km.LenslessCmeans(n_clusters=5)
)
 
# 可视化
mapper.visualize(
    graph,
    path_html="mapper_output.html",
    title="Topological Data Exploration"
)

6.2 持久图可视化

import matplotlib.pyplot as plt
from persim import plot_diagrams
from ripser import ripser
 
# 计算持久同调
data = generate_complex_shape()
result = ripser(data, maxdim=2)
 
# 持久图
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
 
for i in range(3):
    dgm = result['dgms'][i]
    axes[i].scatter(dgm[:, 0], dgm[:, 1])
    axes[i].plot([0, dgm[:, 1].max()], [0, dgm[:, 1].max()], 'k--')
    axes[i].set_xlabel('Birth')
    axes[i].set_ylabel('Death')
    axes[i].set_title(f'Persistence Diagram (dim={i})')
 
plt.tight_layout()
plt.savefig('persistence_diagrams.png')

7. 性能优化

7.1 子采样策略

import numpy as np
from ripser import ripser
 
def subsample_persistent_homology(points, n_samples=1000, seed=42):
    """
    大规模数据的持久同调计算
    使用随机子采样
    """
    n_points = len(points)
    
    if n_points <= n_samples:
        return ripser(points, maxdim=2)
    
    np.random.seed(seed)
    indices = np.random.choice(n_points, n_samples, replace=False)
    sampled_points = points[indices]
    
    return ripser(sampled_points, maxdim=2)
 
def adaptive_subsample(points, base_n=500, max_iterations=5):
    """
    自适应子采样
    根据结果稳定性决定是否增加样本
    """
    prev_persistence = None
    
    for n in [base_n * (2**i) for i in range(max_iterations)]:
        result = subsample_persistent_homology(points, n_samples=n)
        current_persistence = result['dgms'][1][:, 1] - result['dgms'][1][:, 0]
        
        if prev_persistence is not None:
            # 检查收敛性
            if np.abs(current_persistence.mean() - prev_persistence.mean()) < 0.01:
                return result
        
        prev_persistence = current_persistence
    
    return result

7.2 并行计算

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import numpy as np
from ripser import ripser
 
def parallel_persistence_computation(data_list, n_workers=4):
    """
    并行计算多个数据集的持久同调
    """
    # 使用进程池(CPU密集型)
    with ProcessPoolExecutor(max_workers=n_workers) as executor:
        results = list(executor.map(
            lambda d: ripser(d, maxdim=2),
            data_list
        ))
    return results
 
def batch_with_progress(data_list, batch_size=100):
    """
    分批处理大数据
    """
    results = []
    for i in range(0, len(data_list), batch_size):
        batch = data_list[i:i+batch_size]
        batch_results = parallel_persistence_computation(batch)
        results.extend(batch_results)
        print(f"Processed {min(i+batch_size, len(data_list))}/{len(data_list)}")
    return results

8. 实践项目建议

8.1 入门项目

  1. 持久同调可视化:对不同形状数据计算并可视化
  2. 拓扑特征分类:将PH特征加入传统ML管道
  3. Mapper探索:对高维数据使用Mapper进行降维可视化

8.2 进阶项目

  1. 拓扑增强GNN:在图神经网络中加入拓扑特征
  2. 拓扑约束生成:强制生成数据保持特定拓扑
  3. 动态拓扑追踪:时变数据的拓扑分析

8.3 研究项目

  1. 可微分PH层:设计端到端可训练的拓扑层
  2. 拓扑Transformer:融合注意力与拓扑归纳偏置
  3. 拓扑强化学习:状态空间的拓扑先验

参考文献


相关文档