概述
Do-Calculus是Judea Pearl于1995年提出的形式化因果推理工具,通过三条推理规则系统地判定因果效应是否可从观测数据中识别。本章介绍Do算子的含义、Do-Calculus规则,以及因果效应识别的核心准则。1
Do算子
定义
Do算子 表示对变量X进行强制干预(Intervention),即将X的值固定为x,移除所有指向X的因果箭头。
| 符号 | 含义 |
|---|---|
| $P(Y | X)$ |
| $P(Y | do(X))$ |
干预与观测的区别
考虑因果图 和混杂结构 :
import numpy as np
# 混杂结构的数据生成
np.random.seed(42)
n = 10000
Z = np.random.randn(n) # 混杂因素
# X受Z影响
X = 0.5 * Z + np.random.randn(n)
# Y受X和Z影响
Y = 2 * X + 0.3 * Z + np.random.randn(n)
# 观测关联(包含混杂效应)
corr_obs = np.corrcoef(X, Y)[0, 1]
print(f"观测相关性: {corr_obs:.3f}")
# 真正的因果效应应该是 2
# 但观测相关性被Z的混杂效应放大了关键洞察:
- 回答的是”如果强制设置X=x,Y会怎样”
- 包含X与Y之间所有路径的效应,包括混杂
Do操作的图示
观测(无干预): 干预 do(X=x₀):
Z → X → Y Z → X → Y
↓ ↘ ↓ ↘
└──────→ Y └ X=x₀
X←Z←W X←V X→Y X←→Y X→Y
↑ ↑ ↑ ↑ ↑
混杂 选择 直接 对撞 反馈
偏差 偏差 效应 偏差 回路
Do-Calculus 三条规则
Pearl证明,任意do-expression可以通过以下三条规则逐步转化,直到不再包含do算子。1
规则1:插入/删除观测
含义: 移除与Y在给定X条件下独立的变量。
图示: 移除后门路径上的非对撞节点。
def rule1_example():
"""
规则1示例:Z是X的后代,不影响Y的独立部分
图: X → Z → Y
X ⊥ Y | Z (给定Z后,X与Y条件独立)
因此: P(Y|do(X), Z) = P(Y|do(X))
"""
pass规则2:行动/观测交换
含义: 如果Z不是Y的祖先,则可以用观测Z替代干预do(Z)。
图示: 移除不影响Y的干预。
规则2前提检查:
- 在干预do(X)后的图中,Z不是Y的后代
- 则 do(Z) 可以被 Z 替代
规则3:插入/删除干预
含义: 移除与Y在给定X条件下无因果关系的干预变量。
因果效应识别公式
后门调整公式
当后门路径被阻断时,因果效应可直接从观测数据估计:
后门路径定义: 从X到Y的路径中,以←结尾的路径。
后门准则: 一组变量Z满足后门准则当且仅当:
- Z阻断了X与Y之间的所有后门路径
- Z不包含X的任何后代
def backdoor_adjustment(df, X, Y, Z_covariates):
"""
后门调整公式实现
P(Y|do(X)) = Σ_z P(Y|X, Z=z) * P(Z=z)
参数:
df: 包含所有变量的数据框
X: 处理变量名
Y: 结果变量名
Z_covariates: 后门调整变量列表
"""
effect = {}
for x_val in df[X].unique():
numerator_sum = 0
for z_combination in df[Z_covariates].drop_duplicates().values:
# P(Y=y, Z=z | X=x) 形式
mask = (df[X] == x_val)
for col, val in zip(Z_covariates, z_combination):
mask = mask & (df[col] == val)
p_y_given_xz = df.loc[mask, Y].mean()
p_z = (df[X] == x_val).mean() # 近似 P(Z=z)
numerator_sum += p_y_given_xz * p_z
effect[x_val] = numerator_sum / len(df[Z_covariates].drop_duplicates())
return effect前门调整公式
当后门路径无法完全阻断时,可使用前门准则。
前门准则条件:
- Z完全中介X到Y的因果效应
- Z与X之间没有未阻断的后门路径
- X到Y的所有后门路径都被阻断
前门调整公式:
def frontdoor_adjustment(df, X, Y, Z):
"""
前门调整公式实现
P(Y|do(X)) = Σ_z P(Z=z|X) * Σ_x' P(Y|X=x', Z=z) * P(X=x')
适用场景:X → Z → Y,X到Y的直接路径需要被阻断
但Z完全中介了X的效应
"""
effect = {}
for x_val in df[X].unique():
total = 0
for z_val in df[Z].unique():
# P(Z=z|X=x)
p_z_given_x = ((df[X] == x_val) & (df[Z] == z_val)).mean() / (df[X] == x_val).mean()
inner_sum = 0
for x_prime in df[X].unique():
# P(Y|X=x', Z=z)
mask = (df[X] == x_prime) & (df[Z] == z_val)
p_y_given_xz = df.loc[mask, Y].mean() if mask.sum() > 0 else 0
# P(X=x')
p_x = (df[X] == x_prime).mean()
inner_sum += p_y_given_xz * p_x
total += p_z_given_x * inner_sum
effect[x_val] = total
return effect后门准则 vs 前门准则
| 方面 | 后门准则 | 前门准则 |
|---|---|---|
| 机制 | 阻断后门路径 | 利用中介变量 |
| 识别条件 | 需控制所有混杂 | 需存在完全中介 |
| 变量要求 | 混杂变量可观测 | 需可观测的完全中介 |
| 灵活性 | 较高 | 较低 |
| 应用场景 | 混杂可测 | 混杂不可测,但中介可测 |
决策树
因果效应识别策略:
Z(混杂)可观测?
├── 是 → 使用后门准则,控制Z
│ 检查:Z是否阻断所有后门路径?
│ 检查:Z是否包含X的后代?
│
└── 否 → Z(混杂)不可观测?
├── 是 → 存在完全中介Z?
│ ├── 是 → 使用前门准则
│ └── 否 → 因果效应不可识别
│ (除非使用工具变量)
│
└── 否 → 考虑工具变量方法
干预下的图变换
原子干预 do(X=x₀)
将所有指向X的箭头删除,并将X固定为x₀。
干预前: 干预 do(X=x₀) 后:
A → X → Y A X=x₀ → Y
↓ ↑ ↓
B → C B → C
所有指向X的边都被删除
子集干预 do(X⊂S)
对变量子集进行干预,其他保持观测。
class CausalGraph:
def __init__(self, edges):
"""
edges: 列表 [(parent, child), ...]
"""
self.adj = {}
for parent, child in edges:
if child not in self.adj:
self.adj[child] = []
self.adj[child].append(parent)
def apply_intervention(self, do_variables):
"""
应用干预,返回干预后的图结构
"""
new_edges = []
for parent, child in edges:
# 删除所有指向被干预变量的边
if child in do_variables:
continue
# 删除所有被干预变量作为父节点的边
if parent not in do_variables:
new_edges.append((parent, child))
return CausalGraph(new_edges)可识别性的判定
可识别性定义
一个因果量是可识别的,如果它可以唯一地表示为观测分布的函数。
判定算法(ID算法):
- 从查询(如)开始
- 反复应用Do-Calculus规则
- 如果最终不包含do算子,则可识别
- 否则,可能需要额外假设或数据
不可识别的案例
混淆结构(未观测混杂):
U → X
↓ ↓
Z → Y
其中U未观测。
此结构下,P(Y|do(X)) 不可识别。
与其他识别方法的比较
| 方法 | 数据需求 | 假设强度 | 适用场景 |
|---|---|---|---|
| 后门调整 | 观测数据 | 混杂可测 | 混杂已知且可观测 |
| 前门调整 | 观测数据 | 存在完全中介 | 混杂不可测 |
| 工具变量 | 观测数据 | 存在有效工具 | 混杂不可测 |
| 倾向评分 | 观测数据 | 混杂可测、可重叠 | 选择偏差 |
| 回归 discontinuity | 观测数据 | 断点处随机分配 | 自然实验 |
参见贝叶斯网络中的条件独立性与d-分离理论。
代码实现:完整的Do-Calculus
import numpy as np
from itertools import combinations
class DoCalculus:
"""
Do-Calculus规则实现
"""
def __init__(self, graph):
"""
graph: 邻接表表示的因果图
"""
self.graph = graph
self.nodes = set(graph.keys())
def is_d_separated(self, X, Y, Z):
"""
判定X和Y在给定Z下是否d-分离
"""
# 简化的d-分离判定
# 实际实现需要更复杂的图遍历算法
pass
def rule1(self, Y, do_X, Z, W):
"""
规则1:移除给定X,W后与Y独立的Z
"""
if self.is_d_separated(Y, Z, do_X.union(W)):
return self.remove_node(Y, do_X, Z, W)
return None
def rule2(self, Y, do_X, do_Z, W):
"""
规则2:如果Z不是Y的祖先,用观测替代do(Z)
"""
if not self.is_ancestor(Z, Y, do_X):
return self.replace_do_with_obs(Y, do_X, do_Z, W)
return None
def is_ancestor(self, node, target, do_set):
"""判定node是否是target的祖先(干预后图中)"""
# BFS/DFS图遍历
pass