概述

因果发现(Causal Discovery)旨在从观测数据中自动推断变量之间的因果关系,是因果推断与机器学习交叉的前沿领域。本章介绍主要的因果发现方法,包括基于约束的方法(PC算法、FCI算法)、基于分数的方法(GES算法)以及函数因果模型。1


问题定义

从观测数据恢复因果结构

给定一组变量 的观测数据,目标是恢复它们之间的因果关系图。

核心挑战:

  1. 因果方向通常无法从观测分布唯一确定(Markov等价类)
  2. 需要在效率与准确性之间权衡
  3. 隐性混杂因素可能导致错误结论

因果充分性假设

因果充分性(Causal Sufficiency): 不存在未观测的混杂因素影响任何一对变量。

当此假设不成立时,需要使用FCI等处理潜在混杂的算法。


因果马尔可夫条件与忠诚性

因果马尔可夫条件(Causal Markov Condition)

在因果图中,每个节点在其父节点条件下独立于所有非后代节点:

含义: 知道变量的直接原因后,不需要知道其他信息来预测它。

忠诚性(Faithfulness)

如果条件独立性关系在数据中成立,则它们必然由因果图结构导致:

意义: 避免虚假条件独立性被图的结构掩盖。


基于约束的方法

PC算法

PC算法(Spirtes-Glymour-Scheines)是最经典的因果发现算法,通过条件独立性检验逐步构建骨架图,然后定向边。

算法步骤

PC Algorithm:
1. 从完全无向图开始
2. 对每个相邻节点对(X, Y):
   - 移除边如果存在分离集 S ⊂ Adj(X) \ {Y}
     使得 X ⊥ Y | S
3. 对每条边,根据d-分离定向其他边
4. 返回马尔可夫等价类
import numpy as np
from scipy import stats
from itertools import combinations
 
class PCAlgorithm:
    """
    PC算法实现(简化版)
    """
    
    def __init__(self, data, alpha=0.05):
        """
        data: (n_samples, n_variables) 观测数据
        alpha: 独立性检验显著性水平
        """
        self.data = data
        self.alpha = alpha
        self.n_samples, self.n_vars = data.shape
        self.var_names = [f"X{i}" for i in range(self.n_vars)]
    
    def partial_correlation(self, i, j, S):
        """
        计算X_i和X_j在给定集合S下的偏相关系数
        """
        if len(S) == 0:
            return stats.pearsonr(self.data[:, i], self.data[:, j])[0]
        
        # 回归残差法计算偏相关
        from sklearn.linear_model import LinearRegression
        
        # X_i 对 S 回归
        lr_i = LinearRegression().fit(self.data[:, list(S)], self.data[:, i])
        r_i = self.data[:, i] - lr_i.predict(self.data[:, list(S)])
        
        # X_j 对 S 回归
        lr_j = LinearRegression().fit(self.data[:, list(S)], self.data[:, j])
        r_j = self.data[:, j] - lr_j.predict(self.data[:, list(S)])
        
        return stats.pearsonr(r_i, r_j)[0]
    
    def conditional_independence_test(self, i, j, S):
        """
        条件独立性检验 H0: X_i ⊥ X_j | S
        """
        rho = self.partial_correlation(i, j, S)
        n = self.n_samples
        
        # Fisher z-transform
        if abs(rho) >= 1 - 1e-10:
            return False, 0
        
        z = 0.5 * np.log((1 + rho) / (1 - rho))
        se = 1 / np.sqrt(n - len(S) - 3)
        z_stat = abs(z / se)
        
        # 双侧检验
        p_value = 2 * (1 - stats.norm.cdf(z_stat))
        return p_value > self.alpha, p_value
    
    def pc_step(self, sep_set, depth):
        """
        PC算法的单个深度步骤
        """
        edges_to_check = self.get_undirected_edges()
        remove_edges = set()
        
        for (i, j) in edges_to_check:
            adj_i = self.get_adjacent(j, sep_set) - {j}
            
            # 尝试不同大小的条件集
            for size in range(depth + 1):
                for S in combinations(adj_i, size):
                    S = set(S)
                    is_indep, p = self.conditional_independence_test(i, j, S)
                    
                    if is_indep:
                        # 记录分离集
                        sep_set[(i, j)] = S
                        sep_set[(j, i)] = S
                        remove_edges.add((i, j))
                        remove_edges.add((j, i))
                        break
                else:
                    continue
                break
        
        return remove_edges
    
    def orient_edges(self, sep_set):
        """
        边定向(基于v-结构检测)
        """
        # 检测 v-结构: X → Z ← Y, X ⊥ Y | ∅
        v_structures = []
        
        for z in range(self.n_vars):
            neighbors = self.get_adjacent(z, sep_set)
            for (x, y) in combinations(neighbors, 2):
                if (x, y) not in sep_set and (y, x) not in sep_set:
                    # 检测X-Y是否在Z的d-分离集中
                    if x not in sep_set.get((z, y), set()) and \
                       y not in sep_set.get((z, x), set()):
                        v_structures.append((x, z, y))
        
        return v_structures
    
    def run(self, max_depth=3):
        """
        运行PC算法
        """
        # 步骤1-2: 骨架学习
        sep_set = {}
        
        for depth in range(max_depth + 1):
            removed = self.pc_step(sep_set, depth)
            if not removed:
                break
        
        # 步骤3: 边定向
        v_structures = self.orient_edges(sep_set)
        
        return {
            'skeleton': self.get_skeleton(),
            'v_structures': v_structures,
            'sep_sets': sep_set
        }

PC算法的局限性

问题说明
Markov等价类PC算法只能恢复马尔可夫等价类,方向不可区分
高维稀疏性条件独立性检验在变量多时功效下降
连续变量假设线性、高斯关系
非线性无法处理非线性因果机制

FCI算法:处理潜在混杂

扩展的条件独立性

当存在未观测混杂时,PC算法的假设不成立。FCI算法引入可能必然连接(Possible-D-SEP)概念。

FCI的核心改进

  1. 骨架构建: 使用更保守的独立性检验
  2. 方向规则: 区分确定方向与可能方向
  3. 额外边标记: 标注可能存在的未观测混杂
class FCIAlgorithm:
    """
    FCI算法实现框架
    """
    
    def run(self, data, alpha=0.05):
        """
        FCI主流程
        """
        # 1. PC骨架(保守版)
        pc = PCAlgorithm(data, alpha)
        skeleton = pc.run(max_depth=3)
        
        # 2. 可能必然后继(POSS)计算
        poss = self.compute_possible_ancestors(skeleton)
        
        # 3. 定向(考虑潜在混杂)
        oriented = self.orient_with_latent(skeleton, poss)
        
        return oriented

基于分数的方法:GES算法

GES算法通过优化评分函数搜索最优的马尔可夫等价类。

BDeu评分(Baysian Dirichlet equivalent uniform):

其中 是父节点配置的样本数,的样本数。

from sklearn.metrics import bic_score
import networkx as nx
 
class GESAlgorithm:
    """
    GES算法实现
    """
    
    def __init__(self, data):
        self.data = data
        self.n_samples, self.n_vars = data.shape
    
    def local_score(self, X, Y, Z):
        """
        计算局部评分变化
        使用BIC近似
        """
        # 简化的BIC评分
        if Z is None:
            n = len(X)
            r2 = np.corrcoef(X, Y)[0, 1]**2
            if r2 == 1:
                return 0
            bic = -n/2 * np.log(1 - r2)
            return bic
        else:
            # 偏相关BIC
            pass
    
    def greedy_search(self):
        """
        贪婪前向搜索 + 贪婪后向搜索
        """
        graph = nx.Graph()
        graph.add_nodes_from(range(self.n_vars))
        
        # 前向搜索
        improved = True
        while improved:
            improved = False
            best_score = self.compute_total_score(graph)
            
            # 尝试添加每条边
            for i in range(self.n_vars):
                for j in range(i+1, self.n_vars):
                    if not graph.has_edge(i, j):
                        # 尝试添加边
                        graph.add_edge(i, j)
                        score = self.compute_total_score(graph)
                        
                        if score > best_score:
                            best_score = score
                            improved = True
                        else:
                            graph.remove_edge(i, j)
        
        # 后向搜索(类似)
        return graph

函数因果模型

线性非高斯无环模型(LiNGAM)

当噪声项为非高斯分布时,线性因果模型的方向可以唯一确定。

模型:

其中 是严格下三角的系数矩阵, 是非高斯噪声向量。

from sklearn.decomposition import FastICA
import numpy as np
 
class LiNGAM:
    """
    DirectLiNGAM算法实现
    """
    
    def fit(self, X):
        """
        X: (n_samples, n_variables)
        """
        n = X.shape[1]
        
        # 1. 中心化和白化
        X_centered = X - X.mean(axis=0)
        cov = np.cov(X_centered.T)
        _, V = np.linalg.eigh(cov)
        W = V.T
        X_white = X_centered @ W.T
        
        # 2. ICA分解
        ica = FastICA(n_components=n, random_state=42)
        S = ica.fit_transform(X_white)
        W_ica = ica.mixing_
        
        # 3. 估计因果顺序
        # 回归残差法
        causal_order = []
        remaining = set(range(n))
        
        for _ in range(n):
            scores = []
            for i in remaining:
                # 用其他变量回归i
                mask = list(remaining - {i})
                from sklearn.linear_model import LinearRegression
                lr = LinearRegression().fit(X_white[:, mask], X_white[:, i])
                residuals = X_white[:, i] - lr.predict(X_white[:, mask])
                
                # 非高斯性评分
                score = np.abs(stats.kurtosis(residuals))
                scores.append((score, i))
            
            # 选择最小非高斯性的变量作为原因
            scores.sort()
            _, selected = scores[0]
            causal_order.append(selected)
            remaining.remove(selected)
        
        return causal_order

非线性因果发现

后非线性模型(Post-Nonlinear Model):

其中 是可逆的非线性变换, 是线性混合, 是噪声。


因果发现与机器学习的结合

因果发现的应用场景

领域应用方法选择
生物医学基因调控网络LiNGAM, PC
经济学政策效果评估IV, RDD
计算机视觉视觉因果结构化因果模型
强化学习世界模型因果表示学习

最新进展:因果表示学习

目标: 从高维数据中同时学习因果表示和因果机制。

传统方法:                    因果表示学习:
                                
低维变量 → 因果发现          高维数据(图像/文本)
                             ↓
                             因果表示 φ(X)
                             ↓
                             因果图 + 机制

代码实现:完整的因果发现流程

import pandas as pd
import numpy as np
from sklearn.datasets import make_linear_ngcmn
 
def full_causal_discovery_pipeline(n_samples=1000, n_vars=5):
    """
    完整的因果发现流程
    """
    # 生成线性非高斯数据
    # X1 = e1
    # X2 = X1 + e2
    # X3 = X1 + e3
    # X4 = X2 + X3 + e4
    # X5 = X3 + e5
    
    np.random.seed(42)
    
    # 生成外生噪声(非高斯)
    from scipy.stats import laplace
    e = laplace.rvs(size=(n_samples, n_vars))
    
    X = np.zeros((n_samples, n_vars))
    X[:, 0] = e[:, 0]  # X1 = e1
    X[:, 1] = X[:, 0] + e[:, 1]  # X2 = X1 + e2
    X[:, 2] = X[:, 0] + e[:, 2]  # X3 = X1 + e3
    X[:, 3] = X[:, 1] + X[:, 2] + e[:, 3]  # X4 = X2 + X3 + e4
    X[:, 4] = X[:, 2] + e[:, 4]  # X5 = X3 + e5
    
    df = pd.DataFrame(X, columns=[f'X{i}' for i in range(1, n_vars+1)])
    
    # 使用不同方法发现因果结构
    print("=" * 50)
    print("1. PC算法结果:")
    pc = PCAlgorithm(df.values, alpha=0.05)
    result = pc.run()
    print(f"V-structures: {result['v_structures']}")
    
    print("=" * 50)
    print("2. LiNGAM结果:")
    lingam = LiNGAM()
    causal_order = lingam.fit(df.values)
    print(f"估计的因果顺序: {[f'X{i+1}' for i in causal_order]}")
    print(f"真实因果顺序: X1 → X2 → X3 → X4 → X5")
    
    return df
 
if __name__ == "__main__":
    df = full_causal_discovery_pipeline()

评估与验证

评估指标

指标含义适用场景
SHD结构汉明距离与真实图比较
MEC大小马尔可夫等价类大小等价类评估
F1-边边的精确率和召回率边预测精度
F1-方向方向预测精度方向准确性

真实数据验证

由于真实因果结构通常未知,验证因果发现算法的常用方法:

  1. 扰动实验: 干预某些变量,验证预测的因果效应
  2. 预测任务: 因果图结构应有助于干预预测
  3. 与领域知识比较: 专家验证

参考资料


相关词条:因果推断基础Do-Calculus与因果效应识别贝叶斯网络

Footnotes

  1. Peters, J., Janzing, D., & Schölkopf, B. “Elements of Causal Inference: Foundations and Learning Algorithms.” MIT Press, 2017.