图模型结构学习

概述

图模型结构学习(Structure Learning)旨在从数据中自动发现变量之间的条件依赖关系,构建最优的图结构12。这在生物信息学(基因调控网络)、因果发现、金融网络分析等领域有重要应用。

核心挑战:结构学习的搜索空间是超指数级的(),需要在表达能力和计算效率之间权衡。


1. 结构学习问题定义

1.1 基本设定

给定数据集 ,其中每个样本 维随机向量。

目标:学习最优的有向无环图(DAG)结构 ,使得:

1.2 贝叶斯方法 vs 频率方法

方法目标优点缺点
贝叶斯自然先验计算复杂
评分搜索最大化评分函数灵活可能陷入局部最优
约束满足满足条件独立性测试理论基础强对噪声敏感

1.3 结构学习的挑战

  1. 计算复杂度:DAG空间的搜索是NP难问题
  2. 等价类问题:不同DAG可能编码相同的条件独立性
  3. 样本复杂度:需要足够数据才能可靠检测依赖关系
  4. 计算-效率权衡:精确算法不可扩展,近似方法需要权衡

2. 基于约束的方法

2.1 PC算法原理

PC算法通过条件独立性测试逐步构建图结构3

核心思想:从完全图开始,移去可以通过条件独立性测试解释的边。

2.2 条件独立性测试

对于连续数据,使用偏相关(Partial Correlation):

零假设

使用Fisher’s z变换进行统计检验:

2.3 PC算法步骤

import numpy as np
from scipy import stats
from itertools import combinations
 
class PCAlgorithm:
    """PC算法实现"""
    
    def __init__(self, alpha=0.05):
        self.alpha = alpha
        self.n = None
        self.data = None
        self.adj_matrix = None
        self.sepsets = {}  # 分离集
    
    def fit(self, data):
        """
        运行PC算法
        
        参数:
            data: 数据矩阵 [N, n]
        """
        self.data = data
        self.n = data.shape[1]
        
        # 初始化:完全无向图
        self.adj_matrix = np.ones((self.n, self.n)) - np.eye(self.n)
        
        # 逐步移除边
        depth = 0
        while True:
            removed = False
            
            for i in range(self.n):
                for j in range(i+1, self.n):
                    if self.adj_matrix[i, j] == 0:
                        continue
                    
                    # 获取i和j的邻居(排除彼此)
                    neighbors_i = self._get_neighbors(i, j)
                    neighbors_j = self._get_neighbors(j, i)
                    
                    if len(neighbors_i) < depth and len(neighbors_j) < depth:
                        continue
                    
                    # 测试所有大小为depth的条件集
                    for k in range(depth + 1):
                        for subset_i in combinations(neighbors_i, min(k, len(neighbors_i))):
                            for subset_j in combinations(neighbors_j, min(k, len(neighbors_j))):
                                cond_set = list(set(subset_i) | set(subset_j))
                                
                                if len(cond_set) != depth:
                                    continue
                                
                                if self._is_conditionally_independent(i, j, cond_set):
                                    self.adj_matrix[i, j] = self.adj_matrix[j, i] = 0
                                    self.sepsets[(i, j)] = self.sepsets[(j, i)] = set(cond_set)
                                    removed = True
                                    break
                                    
            if not removed and depth > np.max(np.sum(self.adj_matrix)):
                break
            depth += 1
        
        return self.adj_matrix
    
    def _get_neighbors(self, i, exclude):
        """获取节点的邻居(排除指定节点)"""
        neighbors = np.where(self.adj_matrix[i] == 1)[0]
        return [n for n in neighbors if n != exclude]
    
    def _is_conditionally_independent(self, i, j, cond_set):
        """条件独立性测试"""
        if len(cond_set) == 0:
            corr, _ = stats.pearsonr(self.data[:, i], self.data[:, j])
            p_value = 2 * (1 - stats.norm.cdf(abs(corr)))
            return p_value > self.alpha
        
        # 偏相关检验
        rho = self._partial_correlation(i, j, list(cond_set))
        n = len(self.data)
        z = 0.5 * np.log((1 + rho) / (1 - rho))
        se = 1 / np.sqrt(n - len(cond_set) - 3)
        z_stat = z / se
        p_value = 2 * (1 - stats.norm.cdf(abs(z_stat)))
        
        return p_value > self.alpha
    
    def _partial_correlation(self, i, j, cond_set):
        """计算偏相关系数"""
        if len(cond_set) == 0:
            return np.corrcoef(self.data[:, i], self.data[:, j])[0, 1]
        
        # 使用回归残差计算偏相关
        from sklearn.linear_model import LinearRegression
        
        # 对x_i关于条件集回归
        lr_i = LinearRegression().fit(self.data[:, cond_set], self.data[:, i])
        res_i = self.data[:, i] - lr_i.predict(self.data[:, cond_set])
        
        # 对x_j关于条件集回归
        lr_j = LinearRegression().fit(self.data[:, cond_set], self.data[:, j])
        res_j = self.data[:, j] - lr_j.predict(self.data[:, cond_set])
        
        return np.corrcoef(res_i, res_j)[0, 1]

2.4 PC算法的局限性

问题影响
单一条件集大小限制可能遗漏高阶条件独立
顺序依赖不同边处理顺序可能影响结果
对噪声敏感小样本可能导致错误判断

3. 基于评分的方法

3.1 评分函数设计

评分函数分解为结构评分 + 局部评分

其中 是节点 在图 中的父节点集。

3.2 常用评分函数

BIC/MDL评分

其中 是模型的自由参数数量。

对于贝叶斯网络:

BDeu评分

BDeu(Bayesian Dirichlet equivalent uniform)使用等权先验:

其中

3.3 贪婪搜索实现

from sklearn.metrics import mutual_info_score
 
class GreedyStructureSearch:
    """贪婪结构搜索"""
    
    def __init__(self, n_nodes, maxParents=3, score='bic'):
        self.n_nodes = n_nodes
        self.maxParents = maxParents
        self.score = score
        self.adj_matrix = np.zeros((n_nodes, n_nodes))
    
    def _local_score(self, node, parents, data):
        """计算节点的条件评分"""
        if len(parents) == 0:
            parents = []
        
        # 简化的贝叶斯评分
        n = len(data)
        k = len(np.unique(data[:, node]))  # 节点取值数
        q = np.prod([len(np.unique(data[:, p])) for p in parents]) if parents else 1
        
        # 计算似然
        if len(parents) == 0:
            counts = np.bincount(data[:, node], minlength=k)
            log_lik = np.sum(counts * np.log((counts + 1) / (n + k)))
        else:
            # 简化实现
            log_lik = -len(parents) * 0.5 * np.log(n)
        
        # BIC惩罚
        dim = (k - 1) * q if q > 0 else (k - 1)
        penalty = 0.5 * dim * np.log(n)
        
        return log_lik - penalty
    
    def _get_parents(self, node):
        """获取节点的父节点"""
        return [i for i in range(self.n_nodes) if self.adj_matrix[i, node] == 1]
    
    def _get_non_parents(self, node):
        """获取可能的非父节点"""
        parents = set(self._get_parents(node))
        candidates = []
        
        for i in range(self.n_nodes):
            if i != node and i not in parents:
                # 检查是否形成环
                if self._would_create_cycle(i, node):
                    continue
                # 检查父节点数量限制
                if len(parents) >= self.maxParents:
                    continue
                candidates.append(i)
        
        return candidates
    
    def _would_create_cycle(self, i, j):
        """检查添加边 i->j 是否形成环"""
        # DFS检查从j能否到达i
        visited = set()
        stack = [j]
        
        while stack:
            current = stack.pop()
            if current == i:
                return True
            if current in visited:
                continue
            visited.add(current)
            stack.extend([k for k in range(self.n_nodes) 
                         if self.adj_matrix[k, current] == 1])
        
        return False
    
    def fit(self, data, max_iter=100):
        """贪婪搜索"""
        current_score = self._compute_total_score(data)
        
        for iteration in range(max_iter):
            improved = False
            
            # 尝试添加、删除或反转每条边
            for i in range(self.n_nodes):
                for j in range(self.n_nodes):
                    if i == j:
                        continue
                    
                    if self.adj_matrix[i, j] == 1:
                        # 尝试删除边 i->j
                        self.adj_matrix[i, j] = 0
                        new_score = self._compute_total_score(data)
                        
                        if new_score > current_score:
                            current_score = new_score
                            improved = True
                        else:
                            self.adj_matrix[i, j] = 1
                    else:
                        # 尝试添加边 i->j
                        if self._would_create_cycle(i, j):
                            continue
                        
                        self.adj_matrix[i, j] = 1
                        new_score = self._compute_total_score(data)
                        
                        if new_score > current_score:
                            current_score = new_score
                            improved = True
                        else:
                            self.adj_matrix[i, j] = 0
            
            if not improved:
                break
        
        return self.adj_matrix
    
    def _compute_total_score(self, data):
        """计算总评分"""
        total = 0
        for i in range(self.n_nodes):
            parents = self._get_parents(i)
            total += self._local_score(i, parents, data)
        return total

4. 高斯图模型与Graphical Lasso

4.1 问题定义

对于连续变量,假设数据来自多元高斯分布:

图的边由精度矩阵(协方差矩阵的逆) 决定:

  • :节点 条件独立
  • :存在边

4.2 Graphical Lasso

Graphical Lasso通过最大化 正则化的对数似然来估计稀疏精度矩阵4

其中 是样本协方差矩阵, 是正则化参数。

4.3 Python实现

from sklearn.covariance import GraphicalLasso, GraphicalLassoCV
import numpy as np
 
def learn_gaussian_graph_structure(data, alpha=0.01, method='glasso'):
    """
    学习高斯图模型结构
    
    参数:
        data: 数据矩阵 [N, n]
        alpha: 正则化参数
        method: 'glasso' 或 'mb' (邻域选择)
    """
    n_samples, n_features = data.shape
    
    if method == 'glasso':
        # Graphical Lasso
        model = GraphicalLasso(alpha=alpha, max_iter=1000)
        model.fit(data)
        
        # 提取图结构
        precision = model.precision_
        adj_matrix = (np.abs(precision) > 1e-6).astype(int)
        np.fill_diagonal(adj_matrix, 0)
        
        return adj_matrix, precision, model
    
    elif method == 'mb':
        # 邻域选择方法
        adj_matrix = np.zeros((n_features, n_features))
        precision = np.zeros((n_features, n_features))
        
        from sklearn.linear_model import Lasso
        
        # 对每个节点单独回归
        for i in range(n_features):
            y = data[:, i]
            X = np.delete(data, i, axis=1)
            
            # L1正则化回归
            lasso = Lasso(alpha=alpha, max_iter=5000)
            lasso.fit(X, y)
            
            # 非零系数表示有边
            coefs = lasso.coef_
            non_zero = np.abs(coefs) > 1e-6
            
            # 填充邻接矩阵
            for j, nz in enumerate(non_zero):
                if nz:
                    actual_j = j if j < i else j + 1
                    adj_matrix[actual_j, i] = adj_matrix[i, actual_j] = 1
        
        return adj_matrix, precision, None
 
# 使用示例
np.random.seed(42)
n_samples, n_features = 200, 10
 
# 生成稀疏精度矩阵
true_precision = np.random.randn(n_features, n_features) * 0.5
true_precision = (true_precision + true_precision.T) / 2
np.fill_diagonal(true_precision, 1)
# 使其稀疏
true_precision = true_precision * (np.random.rand(n_features, n_features) > 0.7)
 
# 生成数据
cov = np.linalg.inv(true_precision)
data = np.random.multivariate_normal(np.zeros(n_features), cov, n_samples)
 
# 学习结构
adj_matrix, precision, model = learn_gaussian_graph_structure(data, alpha=0.1)
 
print("估计的邻接矩阵:")
print(adj_matrix.astype(int))

5. 搜索与优化的高级方法

5.1 整数规划方法

将结构学习表述为整数规划:

其中 是边指示变量, 是边权重。

5.2 动态规划

对于树结构或低树宽的图,动态规划可以找到全局最优:

def dp_structure_learning(data, tree_width):
    """
    动态规划结构学习
    
    适用于树宽较小的图
    """
    # 实现基于变量消元的动态规划
    pass

5.3 分数边界的利用

贪婪边添加的下界

利用此性质可以剪枝搜索空间。


6. 因果发现与结构学习

6.1 因果马尔可夫假设

在满足因果马尔可夫假设的图中:

  • 每个节点条件独立于其非后裔给定其父节点

6.2 Faithfulness假设

这确保条件独立测试能够正确反映图结构。

6.3 从观测数据到因果图

方法要求输出
PC算法FaithfulnessMarkov等价类
GES评分函数最优DAG
FCIFaithfulness + Causal sufficiency可能因果图

7. 实践指南

7.1 方法选择

场景推荐方法
小样本、离散数据PC算法
大样本、连续数据Graphical Lasso
需要完整DAGGES
高维稀疏GLasso + 交叉验证
因果发现PC + FCI

7.2 参数选择

参数选择建议
显著性水平 (PC)0.01-0.05
正则化 (GLasso)交叉验证选择
最大父节点数数据维度函数

7.3 评估方法

def evaluate_structure(true_adj, est_adj):
    """评估结构学习结果"""
    from sklearn.metrics import precision_score, recall_score, f1_score
    
    # 将邻接矩阵转换为边集合
    true_edges = set(zip(*np.where(true_adj)))
    est_edges = set(zip(*np.where(est_adj)))
    
    tp = len(true_edges & est_edges)
    fp = len(est_edges - true_edges)
    fn = len(true_edges - est_edges)
    
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
    
    # 结构汉明距离
    shd = fp + fn + len(true_edges.symmetric_difference(est_edges)) // 2
    
    return {
        'precision': precision,
        'recall': recall,
        'f1': f1,
        'shd': shd  # Structural Hamming Distance
    }

8. 总结

方法对比

方法类型优点缺点
PC约束理论基础强对噪声敏感
GES评分可扩展可能局部最优
Graphical Lasso连续高效仅无向图
IP精确全局最优计算昂贵
爬山近似简单可能局部最优

未来方向

  • 深度结构学习:神经网络学习图生成
  • 因果发现集成:结合多种方法
  • 在线结构学习:流数据上的自适应

参考资料


相关主题

Footnotes

  1. Chickering, D. M. (2002). “Optimal Structure Identification with Greedy Search.” JMLR.

  2. Koller, D., & Friedman, N. (2009). “Probabilistic Graphical Models.” MIT Press.

  3. Spirtes, P., Glymour, C., & Scheines, R. (2000). “Causation, Prediction, and Search.” MIT Press.

  4. Friedman, J., Hastie, T., & Tibshirani, R. (2008). “Sparse Inverse Covariance Estimation with the Graphical Lasso.” Biostatistics.