概述
因果发现(Causal Discovery)旨在从观测数据中自动推断变量之间的因果关系,是因果推断与机器学习交叉的前沿领域。本章介绍主要的因果发现方法,包括基于约束的方法(PC算法、FCI算法)、基于分数的方法(GES算法)以及函数因果模型。1
问题定义
从观测数据恢复因果结构
给定一组变量 的观测数据,目标是恢复它们之间的因果关系图。
核心挑战:
- 因果方向通常无法从观测分布唯一确定(Markov等价类)
- 需要在效率与准确性之间权衡
- 隐性混杂因素可能导致错误结论
因果充分性假设
因果充分性(Causal Sufficiency): 不存在未观测的混杂因素影响任何一对变量。
当此假设不成立时,需要使用FCI等处理潜在混杂的算法。
因果马尔可夫条件与忠诚性
因果马尔可夫条件(Causal Markov Condition)
在因果图中,每个节点在其父节点条件下独立于所有非后代节点:
含义: 知道变量的直接原因后,不需要知道其他信息来预测它。
忠诚性(Faithfulness)
如果条件独立性关系在数据中成立,则它们必然由因果图结构导致:
意义: 避免虚假条件独立性被图的结构掩盖。
基于约束的方法
PC算法
PC算法(Spirtes-Glymour-Scheines)是最经典的因果发现算法,通过条件独立性检验逐步构建骨架图,然后定向边。
算法步骤
PC Algorithm:
1. 从完全无向图开始
2. 对每个相邻节点对(X, Y):
- 移除边如果存在分离集 S ⊂ Adj(X) \ {Y}
使得 X ⊥ Y | S
3. 对每条边,根据d-分离定向其他边
4. 返回马尔可夫等价类
import numpy as np
from scipy import stats
from itertools import combinations
class PCAlgorithm:
"""
PC算法实现(简化版)
"""
def __init__(self, data, alpha=0.05):
"""
data: (n_samples, n_variables) 观测数据
alpha: 独立性检验显著性水平
"""
self.data = data
self.alpha = alpha
self.n_samples, self.n_vars = data.shape
self.var_names = [f"X{i}" for i in range(self.n_vars)]
def partial_correlation(self, i, j, S):
"""
计算X_i和X_j在给定集合S下的偏相关系数
"""
if len(S) == 0:
return stats.pearsonr(self.data[:, i], self.data[:, j])[0]
# 回归残差法计算偏相关
from sklearn.linear_model import LinearRegression
# X_i 对 S 回归
lr_i = LinearRegression().fit(self.data[:, list(S)], self.data[:, i])
r_i = self.data[:, i] - lr_i.predict(self.data[:, list(S)])
# X_j 对 S 回归
lr_j = LinearRegression().fit(self.data[:, list(S)], self.data[:, j])
r_j = self.data[:, j] - lr_j.predict(self.data[:, list(S)])
return stats.pearsonr(r_i, r_j)[0]
def conditional_independence_test(self, i, j, S):
"""
条件独立性检验 H0: X_i ⊥ X_j | S
"""
rho = self.partial_correlation(i, j, S)
n = self.n_samples
# Fisher z-transform
if abs(rho) >= 1 - 1e-10:
return False, 0
z = 0.5 * np.log((1 + rho) / (1 - rho))
se = 1 / np.sqrt(n - len(S) - 3)
z_stat = abs(z / se)
# 双侧检验
p_value = 2 * (1 - stats.norm.cdf(z_stat))
return p_value > self.alpha, p_value
def pc_step(self, sep_set, depth):
"""
PC算法的单个深度步骤
"""
edges_to_check = self.get_undirected_edges()
remove_edges = set()
for (i, j) in edges_to_check:
adj_i = self.get_adjacent(j, sep_set) - {j}
# 尝试不同大小的条件集
for size in range(depth + 1):
for S in combinations(adj_i, size):
S = set(S)
is_indep, p = self.conditional_independence_test(i, j, S)
if is_indep:
# 记录分离集
sep_set[(i, j)] = S
sep_set[(j, i)] = S
remove_edges.add((i, j))
remove_edges.add((j, i))
break
else:
continue
break
return remove_edges
def orient_edges(self, sep_set):
"""
边定向(基于v-结构检测)
"""
# 检测 v-结构: X → Z ← Y, X ⊥ Y | ∅
v_structures = []
for z in range(self.n_vars):
neighbors = self.get_adjacent(z, sep_set)
for (x, y) in combinations(neighbors, 2):
if (x, y) not in sep_set and (y, x) not in sep_set:
# 检测X-Y是否在Z的d-分离集中
if x not in sep_set.get((z, y), set()) and \
y not in sep_set.get((z, x), set()):
v_structures.append((x, z, y))
return v_structures
def run(self, max_depth=3):
"""
运行PC算法
"""
# 步骤1-2: 骨架学习
sep_set = {}
for depth in range(max_depth + 1):
removed = self.pc_step(sep_set, depth)
if not removed:
break
# 步骤3: 边定向
v_structures = self.orient_edges(sep_set)
return {
'skeleton': self.get_skeleton(),
'v_structures': v_structures,
'sep_sets': sep_set
}PC算法的局限性
| 问题 | 说明 |
|---|---|
| Markov等价类 | PC算法只能恢复马尔可夫等价类,方向不可区分 |
| 高维稀疏性 | 条件独立性检验在变量多时功效下降 |
| 连续变量 | 假设线性、高斯关系 |
| 非线性 | 无法处理非线性因果机制 |
FCI算法:处理潜在混杂
扩展的条件独立性
当存在未观测混杂时,PC算法的假设不成立。FCI算法引入可能必然连接(Possible-D-SEP)概念。
FCI的核心改进
- 骨架构建: 使用更保守的独立性检验
- 方向规则: 区分确定方向与可能方向
- 额外边标记: 标注可能存在的未观测混杂
class FCIAlgorithm:
"""
FCI算法实现框架
"""
def run(self, data, alpha=0.05):
"""
FCI主流程
"""
# 1. PC骨架(保守版)
pc = PCAlgorithm(data, alpha)
skeleton = pc.run(max_depth=3)
# 2. 可能必然后继(POSS)计算
poss = self.compute_possible_ancestors(skeleton)
# 3. 定向(考虑潜在混杂)
oriented = self.orient_with_latent(skeleton, poss)
return oriented基于分数的方法:GES算法
贪婪等价搜索(Greedy Equivalence Search)
GES算法通过优化评分函数搜索最优的马尔可夫等价类。
BDeu评分(Baysian Dirichlet equivalent uniform):
其中 是父节点配置的样本数, 是且的样本数。
from sklearn.metrics import bic_score
import networkx as nx
class GESAlgorithm:
"""
GES算法实现
"""
def __init__(self, data):
self.data = data
self.n_samples, self.n_vars = data.shape
def local_score(self, X, Y, Z):
"""
计算局部评分变化
使用BIC近似
"""
# 简化的BIC评分
if Z is None:
n = len(X)
r2 = np.corrcoef(X, Y)[0, 1]**2
if r2 == 1:
return 0
bic = -n/2 * np.log(1 - r2)
return bic
else:
# 偏相关BIC
pass
def greedy_search(self):
"""
贪婪前向搜索 + 贪婪后向搜索
"""
graph = nx.Graph()
graph.add_nodes_from(range(self.n_vars))
# 前向搜索
improved = True
while improved:
improved = False
best_score = self.compute_total_score(graph)
# 尝试添加每条边
for i in range(self.n_vars):
for j in range(i+1, self.n_vars):
if not graph.has_edge(i, j):
# 尝试添加边
graph.add_edge(i, j)
score = self.compute_total_score(graph)
if score > best_score:
best_score = score
improved = True
else:
graph.remove_edge(i, j)
# 后向搜索(类似)
return graph函数因果模型
线性非高斯无环模型(LiNGAM)
当噪声项为非高斯分布时,线性因果模型的方向可以唯一确定。
模型:
其中 是严格下三角的系数矩阵, 是非高斯噪声向量。
from sklearn.decomposition import FastICA
import numpy as np
class LiNGAM:
"""
DirectLiNGAM算法实现
"""
def fit(self, X):
"""
X: (n_samples, n_variables)
"""
n = X.shape[1]
# 1. 中心化和白化
X_centered = X - X.mean(axis=0)
cov = np.cov(X_centered.T)
_, V = np.linalg.eigh(cov)
W = V.T
X_white = X_centered @ W.T
# 2. ICA分解
ica = FastICA(n_components=n, random_state=42)
S = ica.fit_transform(X_white)
W_ica = ica.mixing_
# 3. 估计因果顺序
# 回归残差法
causal_order = []
remaining = set(range(n))
for _ in range(n):
scores = []
for i in remaining:
# 用其他变量回归i
mask = list(remaining - {i})
from sklearn.linear_model import LinearRegression
lr = LinearRegression().fit(X_white[:, mask], X_white[:, i])
residuals = X_white[:, i] - lr.predict(X_white[:, mask])
# 非高斯性评分
score = np.abs(stats.kurtosis(residuals))
scores.append((score, i))
# 选择最小非高斯性的变量作为原因
scores.sort()
_, selected = scores[0]
causal_order.append(selected)
remaining.remove(selected)
return causal_order非线性因果发现
后非线性模型(Post-Nonlinear Model):
其中 是可逆的非线性变换, 是线性混合, 是噪声。
因果发现与机器学习的结合
因果发现的应用场景
| 领域 | 应用 | 方法选择 |
|---|---|---|
| 生物医学 | 基因调控网络 | LiNGAM, PC |
| 经济学 | 政策效果评估 | IV, RDD |
| 计算机视觉 | 视觉因果 | 结构化因果模型 |
| 强化学习 | 世界模型 | 因果表示学习 |
最新进展:因果表示学习
目标: 从高维数据中同时学习因果表示和因果机制。
传统方法: 因果表示学习:
低维变量 → 因果发现 高维数据(图像/文本)
↓
因果表示 φ(X)
↓
因果图 + 机制
代码实现:完整的因果发现流程
import pandas as pd
import numpy as np
from sklearn.datasets import make_linear_ngcmn
def full_causal_discovery_pipeline(n_samples=1000, n_vars=5):
"""
完整的因果发现流程
"""
# 生成线性非高斯数据
# X1 = e1
# X2 = X1 + e2
# X3 = X1 + e3
# X4 = X2 + X3 + e4
# X5 = X3 + e5
np.random.seed(42)
# 生成外生噪声(非高斯)
from scipy.stats import laplace
e = laplace.rvs(size=(n_samples, n_vars))
X = np.zeros((n_samples, n_vars))
X[:, 0] = e[:, 0] # X1 = e1
X[:, 1] = X[:, 0] + e[:, 1] # X2 = X1 + e2
X[:, 2] = X[:, 0] + e[:, 2] # X3 = X1 + e3
X[:, 3] = X[:, 1] + X[:, 2] + e[:, 3] # X4 = X2 + X3 + e4
X[:, 4] = X[:, 2] + e[:, 4] # X5 = X3 + e5
df = pd.DataFrame(X, columns=[f'X{i}' for i in range(1, n_vars+1)])
# 使用不同方法发现因果结构
print("=" * 50)
print("1. PC算法结果:")
pc = PCAlgorithm(df.values, alpha=0.05)
result = pc.run()
print(f"V-structures: {result['v_structures']}")
print("=" * 50)
print("2. LiNGAM结果:")
lingam = LiNGAM()
causal_order = lingam.fit(df.values)
print(f"估计的因果顺序: {[f'X{i+1}' for i in causal_order]}")
print(f"真实因果顺序: X1 → X2 → X3 → X4 → X5")
return df
if __name__ == "__main__":
df = full_causal_discovery_pipeline()评估与验证
评估指标
| 指标 | 含义 | 适用场景 |
|---|---|---|
| SHD | 结构汉明距离 | 与真实图比较 |
| MEC大小 | 马尔可夫等价类大小 | 等价类评估 |
| F1-边 | 边的精确率和召回率 | 边预测精度 |
| F1-方向 | 方向预测精度 | 方向准确性 |
真实数据验证
由于真实因果结构通常未知,验证因果发现算法的常用方法:
- 扰动实验: 干预某些变量,验证预测的因果效应
- 预测任务: 因果图结构应有助于干预预测
- 与领域知识比较: 专家验证
参考资料
相关词条:因果推断基础,Do-Calculus与因果效应识别,贝叶斯网络
Footnotes
-
Peters, J., Janzing, D., & Schölkopf, B. “Elements of Causal Inference: Foundations and Learning Algorithms.” MIT Press, 2017. ↩