## Overview

A Bayesian network is a probabilistic graphical model that uses a directed acyclic graph (DAG) to represent the conditional-independence relationships among a set of random variables.[^1]
An example Bayesian network, the student-grade model:

```
[Intelligence] ──→ [Grade] ←── [Difficulty]
                      │             │
                      ▼             │
                  [Letter] ←────────┘
```
### Core idea

A Bayesian network factorizes the joint distribution into a product of local conditional distributions:

$$P(X_1, \ldots, X_n) = \prod_{i=1}^{n} P\bigl(X_i \mid \mathrm{Pa}(X_i)\bigr)$$

where $\mathrm{Pa}(X_i)$ is the set of parents of node $X_i$.
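As a concrete illustration, the sketch below evaluates one joint assignment of the student model by multiplying its four local CPDs (the numbers reuse the simulation parameters that appear later in section 3.3):

```python
# Joint probability of one assignment in the student model,
# computed as P(I) * P(D) * P(G | I, D) * P(L | D, G).
P_I = {1: 0.3, 0: 0.7}              # P(Intelligence)
P_D = {1: 0.4, 0: 0.6}              # P(Difficulty)
P_G = {(1, 0): [0.02, 0.08, 0.90]}  # P(Grade | I=1, D=0), grades coded 0/1/2
P_L = {(0, 2): [0.1, 0.9]}          # P(Letter | D=0, G=2)

# P(I=1, D=0, G=2, L=1) via the chain-rule factorization
p = P_I[1] * P_D[0] * P_G[(1, 0)][2] * P_L[(0, 2)][1]
print(f"P(I=1, D=0, G=2, L=1) = {p:.4f}")  # 0.3 * 0.6 * 0.9 * 0.9 = 0.1458
```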
## 1. Definition of a Bayesian Network

### 1.1 Structure

A Bayesian network consists of two parts:

- A directed acyclic graph (DAG): encodes the dependency relationships among the variables
- Conditional probability distributions (CPDs): a local conditional distribution attached to each node
### 1.2 Conditional independence

A Bayesian network encodes exactly the conditional independencies defined by d-separation. In particular, the local Markov property holds: given its parents, each node is conditionally independent of its non-descendants:

$$X_i \perp \mathrm{NonDesc}(X_i) \mid \mathrm{Pa}(X_i)$$

In the student model, for example, the letter $L$ is independent of intelligence $I$ once the grade $G$ and the difficulty $D$ are known.
### 1.3 Three basic structures

#### Structure A: Chain

```
X → Y → Z
```

$$P(Z \mid X) = \sum_{Y} P(Z \mid Y)\, P(Y \mid X)$$

Here $X \perp Z \mid Y$: observing $Y$ blocks the path.

#### Structure B: Fork

```
X ← Y → Z
```

$$P(X, Z \mid Y) = P(X \mid Y)\, P(Z \mid Y)$$

A common cause $Y$ makes $X$ and $Z$ dependent marginally but independent given $Y$.

#### Structure C: Collider

```
X → Y ← Z
```

$$P(X, Z) = P(X)\, P(Z) \qquad (X \text{ and } Z \text{ are marginally independent})$$

$$P(X, Z \mid Y) \neq P(X \mid Y)\, P(Z \mid Y) \qquad (\text{conditioning on } Y \text{ destroys the independence})$$

Conditioning on the common effect couples its causes; this is the "explaining away" phenomenon, demonstrated in the simulation below.
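A quick numerical check of explaining away, as a minimal sketch with two independent binary causes and a deterministic-OR effect (all numbers are illustrative):

```python
import numpy as np

rng = np.random.default_rng(0)
n = 200_000
x = rng.integers(0, 2, size=n)  # cause 1: fair coin
z = rng.integers(0, 2, size=n)  # cause 2: independent fair coin
y = x | z                       # common effect (deterministic OR, for clarity)

# Marginally, X and Z are (near-)independent:
print(np.corrcoef(x, z)[0, 1])  # ~ 0.0

# Conditioning on the collider (Y = 1) induces dependence ("explaining away"):
mask = y == 1
print(np.corrcoef(x[mask], z[mask])[0, 1])  # ~ -0.5
```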
### 1.4 I-maps and factorization

If every conditional independence encoded by the graph $G$ also holds in the joint distribution $P$, then $G$ is an I-map (independence map) of $P$. In that case the joint distribution factorizes over $G$:

$$P(X_1, \ldots, X_n) = \prod_{i=1}^{n} P\bigl(X_i \mid \mathrm{Pa}_G(X_i)\bigr)$$
## 2. Conditional Probability Distributions

### 2.1 Discrete CPDs

For discrete variables, the CPD is a conditional probability table (CPT):

| Intelligence (I) | Grade (G) | P(G \| I) |
|---|---|---|
| high | excellent | 0.9 |
| high | good | 0.08 |
| high | pass | 0.02 |
| low | excellent | 0.3 |
| low | good | 0.4 |
| low | pass | 0.3 |
### 2.2 Linear Gaussian CPDs

For a continuous node $Y$ with continuous parents $X_1, \ldots, X_k$:

$$P(Y \mid x_1, \ldots, x_k) = \mathcal{N}\bigl(\beta_0 + \beta_1 x_1 + \cdots + \beta_k x_k,\ \sigma^2\bigr)$$
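A minimal sampling sketch of a linear Gaussian CPD (the coefficients are hypothetical):

```python
import numpy as np

rng = np.random.default_rng(0)

# Hypothetical parameters: Y | x1, x2 ~ N(beta0 + beta1*x1 + beta2*x2, sigma^2)
beta0, beta1, beta2, sigma = 1.0, 0.5, -2.0, 0.3

x1, x2 = rng.normal(size=1000), rng.normal(size=1000)
y = beta0 + beta1 * x1 + beta2 * x2 + rng.normal(scale=sigma, size=1000)

# The conditional mean is linear in the parents; recover it by least squares:
X = np.column_stack([np.ones(1000), x1, x2])
print(np.linalg.lstsq(X, y, rcond=None)[0])  # approximately [1.0, 0.5, -2.0]
```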
### 2.3 Conditional Gaussian CPDs

Hybrid models mix discrete and continuous variables. In the standard conditional Gaussian CPD, a continuous child has discrete parents, and each configuration of the discrete parents selects its own Gaussian (mean and variance). The reverse case, a discrete child with continuous parents, is handled by logistic CPDs below.
### 2.4 Logistic CPDs

For a discrete (e.g. binary) child with continuous parents, a logistic CPD squashes a linear function of the parents through a sigmoid:

$$P(Y = 1 \mid x_1, \ldots, x_k) = \sigma\bigl(w_0 + w_1 x_1 + \cdots + w_k x_k\bigr), \qquad \sigma(t) = \frac{1}{1 + e^{-t}}$$
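A short sketch of sampling from a logistic CPD (the weights are hypothetical):

```python
import numpy as np

rng = np.random.default_rng(0)

def sigmoid(t):
    return 1.0 / (1.0 + np.exp(-t))

# Hypothetical weights for a binary child with two continuous parents
w0, w1, w2 = -0.5, 2.0, -1.0
x1, x2 = rng.normal(size=5), rng.normal(size=5)
p = sigmoid(w0 + w1 * x1 + w2 * x2)  # P(Y = 1 | x1, x2)
y = rng.random(5) < p                # sample the child
print(np.column_stack([p, y]))
```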
## 3. Parameter Learning

### 3.1 Maximum likelihood estimation (MLE)

Given complete data $\mathcal{D} = \{x^{(1)}, \ldots, x^{(N)}\}$, the log-likelihood of the parameters is

$$\ell(\theta; \mathcal{D}) = \sum_{m=1}^{N} \log P\bigl(x^{(m)} \mid \theta\bigr).$$

For a Bayesian network the likelihood decomposes over nodes, so each CPD can be estimated independently:

$$\ell(\theta; \mathcal{D}) = \sum_{m=1}^{N} \sum_{i=1}^{n} \log P\bigl(x_i^{(m)} \mid \mathrm{pa}_i^{(m)}, \theta_i\bigr).$$

For discrete CPTs the MLE reduces to empirical frequencies: $\hat{\theta}_{x \mid u} = N(x, u) / N(u)$, where $N(x, u)$ counts the samples with child value $x$ and parent configuration $u$.
### 3.2 Bayesian estimation

Introduce a prior distribution $P(\theta)$ over the parameters and work with the posterior

$$P(\theta \mid \mathcal{D}) \propto P(\mathcal{D} \mid \theta)\, P(\theta).$$

Conjugate priors keep the posterior in the same family as the prior:

| CPD type | Conjugate prior |
|---|---|
| Bernoulli | Beta |
| Multinomial | Dirichlet |
| Gaussian | Normal-Wishart |
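For a CPT row with a symmetric Dirichlet prior, the posterior mean simply adds pseudo-counts to the observed counts. A quick illustration with made-up counts:

```python
import numpy as np

counts = np.array([9.0, 1.0, 0.0])  # hypothetical counts for one parent configuration
alpha = 1.0                         # symmetric Dirichlet pseudo-count

mle = counts / counts.sum()
posterior_mean = (counts + alpha) / (counts + alpha).sum()

print(mle)             # [0.9   0.1   0.0  ] - assigns zero probability to an unseen value
print(posterior_mean)  # [0.769 0.154 0.077] - smoothing keeps every value possible
```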
### 3.3 Implementation: learning a discrete Bayesian network

```python
import numpy as np


class DiscreteBayesianNetwork:
    """A discrete Bayesian network with CPTs learned from data."""

    def __init__(self, structure, node_names, node_values):
        """
        structure: dict mapping each node to the list of its parents
        node_names: list of variable names
        node_values: dict mapping each variable to its number of values
        """
        self.structure = structure
        self.node_names = node_names
        self.node_values = node_values
        self.node_dims = {n: node_values[n] for n in node_names}
        self.cpds = {}  # CPTs: arrays for root nodes, {parent-index: row} otherwise
        self.node_to_idx = {n: i for i, n in enumerate(node_names)}

    def _parent_index(self, node, sample):
        """Flatten a sample's parent values into a single row-major index."""
        parents = self.structure[node]
        idx = 0
        for j, p in enumerate(parents):
            multiplier = int(np.prod([self.node_dims[parents[k]]
                                      for k in range(j + 1, len(parents))]))
            idx += sample[self.node_to_idx[p]] * multiplier
        return idx

    def fit_mle(self, data):
        """Learn the CPTs by maximum likelihood from an (N, num_vars) data matrix."""
        num_samples = len(data)
        for node in self.node_names:
            parents = self.structure[node]
            node_idx = self.node_to_idx[node]
            if len(parents) == 0:
                # Root node: estimate the marginal distribution
                counts = np.zeros(self.node_dims[node])
                for sample in data:
                    counts[sample[node_idx]] += 1
                self.cpds[node] = counts / num_samples
            else:
                # Non-root node: one conditional row per parent configuration
                parent_dim = int(np.prod([self.node_dims[p] for p in parents]))
                counts = np.zeros((parent_dim, self.node_dims[node]))
                parent_counts = np.zeros(parent_dim)
                for sample in data:
                    pi = self._parent_index(node, sample)
                    parent_counts[pi] += 1
                    counts[pi, sample[node_idx]] += 1
                # Normalize counts into conditional probabilities
                self.cpds[node] = {}
                for i in range(parent_dim):
                    if parent_counts[i] > 0:
                        self.cpds[node][i] = counts[i] / parent_counts[i]
                    else:
                        # Unseen parent configuration: fall back to uniform
                        self.cpds[node][i] = (np.ones(self.node_dims[node])
                                              / self.node_dims[node])

    def fit_bayesian(self, data, alpha=1.0):
        """Bayesian estimation with a symmetric Dirichlet prior (alpha pseudo-counts)."""
        for node in self.node_names:
            parents = self.structure[node]
            node_idx = self.node_to_idx[node]
            if len(parents) == 0:
                counts = np.zeros(self.node_dims[node]) + alpha
                for sample in data:
                    counts[sample[node_idx]] += 1
                self.cpds[node] = counts / counts.sum()
            else:
                parent_dim = int(np.prod([self.node_dims[p] for p in parents]))
                counts = np.zeros((parent_dim, self.node_dims[node])) + alpha
                for sample in data:
                    counts[self._parent_index(node, sample), sample[node_idx]] += 1
                self.cpds[node] = {}
                for i in range(parent_dim):
                    self.cpds[node][i] = counts[i] / counts[i].sum()
```
```python
# Example: learning the student-grade model from simulated data
def learn_student_model():
    # Structure: I -> G, D -> G, D -> L, G -> L
    structure = {
        'I': [],          # Intelligence: no parents
        'D': [],          # Difficulty: no parents
        'G': ['I', 'D'],  # Grade: parents are intelligence and difficulty
        'L': ['D', 'G'],  # Letter: parents are difficulty and grade
    }
    node_names = ['I', 'D', 'G', 'L']
    node_values = {
        'I': 2,  # 0 = low, 1 = high
        'D': 2,  # 0 = easy, 1 = hard
        'G': 3,  # 0 = pass, 1 = good, 2 = excellent
        'L': 2,  # 0 = weak, 1 = strong
    }

    # Generate simulated data from the true parameters
    np.random.seed(42)
    n_samples = 1000
    P_I = [0.7, 0.3]  # P(I)
    P_D = [0.6, 0.4]  # P(D)

    data = []
    for _ in range(n_samples):
        I = np.random.choice(2, p=P_I)
        D = np.random.choice(2, p=P_D)
        # P(G | I, D)
        if I == 1 and D == 0:    # high intelligence + easy course
            G = np.random.choice(3, p=[0.02, 0.08, 0.90])
        elif I == 1 and D == 1:  # high intelligence + hard course
            G = np.random.choice(3, p=[0.10, 0.40, 0.50])
        elif I == 0 and D == 0:  # low intelligence + easy course
            G = np.random.choice(3, p=[0.30, 0.40, 0.30])
        else:                    # low intelligence + hard course
            G = np.random.choice(3, p=[0.70, 0.25, 0.05])
        # P(L | G, D)
        if G == 2 and D == 0:    # excellent grade + easy course
            L = np.random.choice(2, p=[0.1, 0.9])
        elif G == 2 and D == 1:
            L = np.random.choice(2, p=[0.2, 0.8])
        elif G == 1:
            L = np.random.choice(2, p=[0.4, 0.6])
        else:
            L = np.random.choice(2, p=[0.8, 0.2])
        data.append([I, D, G, L])
    data = np.array(data)

    # Learn the network
    bn = DiscreteBayesianNetwork(structure, node_names, node_values)
    bn.fit_mle(data)

    # Print the learned root marginals
    print("Learned conditional probability tables:")
    print(f"P(I=high) = {bn.cpds['I'][1]:.3f}")
    print(f"P(D=hard) = {bn.cpds['D'][1]:.3f}")
    return bn
```

## 4. Inference Methods
### 4.1 Exact inference

#### Variable elimination

Variable elimination computes a marginal by repeatedly multiplying the factors that mention a hidden variable and then summing that variable out:

```python
class VariableElimination:
    """Sum-product variable elimination over the CPTs of a DiscreteBayesianNetwork."""

    def __init__(self, bn):
        self.bn = bn

    def _node_factor(self, node):
        """Build the factor (scope, table) for a node's CPD, axes ordered parents + [node]."""
        parents = self.bn.structure[node]
        if not parents:
            return [node], np.asarray(self.bn.cpds[node], dtype=float)
        dims = [self.bn.node_dims[p] for p in parents]
        table = np.zeros(dims + [self.bn.node_dims[node]])
        for flat_idx, row in self.bn.cpds[node].items():
            table[np.unravel_index(flat_idx, dims)] = row
        return parents + [node], table

    def _reduce(self, factor, evidence):
        """Clamp observed variables by slicing their axes out of the factor."""
        scope, table = factor
        for var, val in evidence.items():
            if var in scope:
                table = np.take(table, val, axis=scope.index(var))
                scope = [v for v in scope if v != var]
        return scope, table

    def _multiply(self, f1, f2):
        """Factor product: align scopes by variable name and multiply with broadcasting."""
        (s1, t1), (s2, t2) = f1, f2
        scope = s1 + [v for v in s2 if v not in s1]
        letters = {v: chr(ord('a') + i) for i, v in enumerate(scope)}
        expr = (''.join(letters[v] for v in s1) + ',' +
                ''.join(letters[v] for v in s2) + '->' +
                ''.join(letters[v] for v in scope))
        return scope, np.einsum(expr, t1, t2)

    def _marginalize(self, factor, var):
        """Sum a variable out of a factor."""
        scope, table = factor
        return [v for v in scope if v != var], table.sum(axis=scope.index(var))

    def query(self, query_var, evidence=None):
        """Compute P(query_var | evidence) exactly."""
        evidence = evidence or {}
        factors = [self._reduce(self._node_factor(n), evidence)
                   for n in self.bn.node_names]
        # Elimination order (simplified: network order; a good order matters for speed)
        hidden = [n for n in self.bn.node_names
                  if n != query_var and n not in evidence]
        for var in hidden:
            # Multiply all factors that mention var, then sum var out
            relevant = [f for f in factors if var in f[0]]
            factors = [f for f in factors if var not in f[0]]
            product = relevant[0]
            for f in relevant[1:]:
                product = self._multiply(product, f)
            factors.append(self._marginalize(product, var))
        # Multiply the remaining factors (all scoped on query_var) and normalize
        result = factors[0]
        for f in factors[1:]:
            result = self._multiply(result, f)
        table = result[1]
        return table / table.sum()
```
```python
# Example query
def query_example():
    bn = learn_student_model()
    ve = VariableElimination(bn)
    # Query P(G | I=high): the grade distribution given high intelligence
    result = ve.query('G', evidence={'I': 1})
    print("\nQuery P(G | I=high):")
    print(f"pass:      {result[0]:.3f}")
    print(f"good:      {result[1]:.3f}")
    print(f"excellent: {result[2]:.3f}")
```

#### Belief propagation
For tree-structured Bayesian networks (every node has at most one parent), message passing gives exact inference. A runnable λ-message version for evidence that lies below the queried node:

```python
class BeliefPropagation:
    """Pearl-style message passing on tree-structured networks
    (every node has at most one parent)."""

    def __init__(self, bn):
        self.bn = bn

    def _get_children(self, node):
        """Return the children of a node under bn.structure."""
        return [n for n, parents in self.bn.structure.items() if node in parents]

    def _lambda_message(self, node, evidence):
        """Lambda message from `node` to its parent:
        lambda(u) = sum_x P(x | u) * prod over children c of lambda_c(x)."""
        dims = self.bn.node_dims
        lam = np.ones(dims[node])
        for child in self._get_children(node):
            lam *= self._lambda_message(child, evidence)
        if node in evidence:  # clamp observed nodes with an indicator vector
            indicator = np.zeros(dims[node])
            indicator[evidence[node]] = 1.0
            lam *= indicator
        parent = self.bn.structure[node][0]  # single parent (tree assumption)
        msg = np.zeros(dims[parent])
        for u in range(dims[parent]):
            # With one parent, the flattened CPT index is just the parent's value
            msg[u] = np.dot(self.bn.cpds[node][u], lam)
        return msg

    def root_posterior(self, root, evidence):
        """P(root | evidence) for evidence located in the root's subtree."""
        belief = np.asarray(self.bn.cpds[root], dtype=float).copy()  # root prior
        for child in self._get_children(root):
            belief *= self._lambda_message(child, evidence)
        return belief / belief.sum()
```
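A small usage sketch on a hand-built chain A → B → C (the CPT numbers are made up for the demo; note that the student model itself is not a tree, so it is not a valid input here):

```python
# Chain A -> B -> C with hand-specified CPTs (illustrative numbers)
chain = DiscreteBayesianNetwork(
    structure={'A': [], 'B': ['A'], 'C': ['B']},
    node_names=['A', 'B', 'C'],
    node_values={'A': 2, 'B': 2, 'C': 2},
)
chain.cpds = {
    'A': np.array([0.6, 0.4]),
    'B': {0: np.array([0.9, 0.1]), 1: np.array([0.2, 0.8])},
    'C': {0: np.array([0.7, 0.3]), 1: np.array([0.1, 0.9])},
}

bp = BeliefPropagation(chain)
print(bp.root_posterior('A', evidence={'C': 1}))  # P(A | C=1) ~ [0.409, 0.591]
```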
### 4.2 Approximate inference

#### Monte Carlo sampling

```python
class GibbsSampling:
    """Gibbs sampling for approximate inference."""

    def __init__(self, bn, num_samples=10000, burn_in=1000):
        self.bn = bn
        self.num_samples = num_samples
        self.burn_in = burn_in  # discard early samples before the chain mixes

    def sample(self, evidence=None):
        """Run the Gibbs chain; evidence maps observed variables to their values."""
        evidence = evidence or {}
        samples = []
        # Initialize: clamp evidence, randomize everything else
        state = {}
        for node in self.bn.node_names:
            if node in evidence:
                state[node] = evidence[node]
            else:
                state[node] = np.random.randint(self.bn.node_dims[node])
        # Sweep over the unobserved variables, resampling each from its full conditional
        for t in range(self.burn_in + self.num_samples):
            for node in self.bn.node_names:
                if node in evidence:
                    continue
                prob = self._compute_conditional(node, state)
                state[node] = np.random.choice(self.bn.node_dims[node], p=prob)
            if t >= self.burn_in:
                samples.append(state.copy())
        return samples

    def _node_prob(self, node, state):
        """P(node = state[node] | its parents' values in state)."""
        parents = self.bn.structure[node]
        if not parents:
            return self.bn.cpds[node][state[node]]
        # Flatten parent values into the row-major CPT index (as in fit_mle)
        idx = 0
        for j, p in enumerate(parents):
            multiplier = int(np.prod([self.bn.node_dims[parents[k]]
                                      for k in range(j + 1, len(parents))]))
            idx += state[p] * multiplier
        return self.bn.cpds[node][idx][state[node]]

    def _compute_conditional(self, node, state):
        """Full conditional P(node | Markov blanket):
        proportional to P(node | parents) * prod over children P(child | its parents)."""
        node_dim = self.bn.node_dims[node]
        children = [n for n, ps in self.bn.structure.items() if node in ps]
        prob = np.zeros(node_dim)
        for val in range(node_dim):
            state[node] = val
            p = self._node_prob(node, state)
            for child in children:
                p *= self._node_prob(child, state)
            prob[val] = p
        total = prob.sum()
        return prob / total if total > 0 else np.ones(node_dim) / node_dim

    def query(self, query_var, evidence=None):
        """Estimate P(query_var | evidence) from the samples."""
        samples = self.sample(evidence)
        counts = np.zeros(self.bn.node_dims[query_var])
        for sample in samples:
            counts[sample[query_var]] += 1
        return counts / len(samples)


# Example: querying with Gibbs sampling
def gibbs_query_example():
    bn = learn_student_model()
    gibbs = GibbsSampling(bn, num_samples=10000)
    # Query P(G | I=high)
    result = gibbs.query('G', evidence={'I': 1})
    print("\nGibbs sampling estimate of P(G | I=high):")
    print(f"pass:      {result[0]:.3f}")
    print(f"good:      {result[1]:.3f}")
    print(f"excellent: {result[2]:.3f}")
```

## 5. Combining Bayesian Networks with Neural Networks
### 5.1 Bayesian neural networks

Bayesian neural networks carry the Bayesian-network idea over to neural networks: the weights become random variables with priors, and training infers an approximate posterior over them.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F


class BayesianNeuralNetwork:
    """A Bayesian neural network trained with mean-field variational inference."""

    def __init__(self, layers):
        """layers: list of (input_dim, output_dim) pairs."""
        self.layers = layers
        # Standard Gaussian priors on all weights (the KL term below assumes this)
        self.priors = {}
        for i, (in_dim, out_dim) in enumerate(layers):
            self.priors[f'W_{i}'] = torch.zeros(out_dim, in_dim)
            self.priors[f'b_{i}'] = torch.zeros(out_dim)

    def fit_vi(self, X, y, num_iterations=1000):
        """Variational inference: maximize the ELBO over Gaussian variational factors."""
        # One Gaussian variational distribution per weight tensor
        self.variational_params = {}
        for i, (in_dim, out_dim) in enumerate(self.layers):
            self.variational_params[f'W_{i}_mean'] = nn.Parameter(torch.randn(out_dim, in_dim) * 0.1)
            self.variational_params[f'W_{i}_logvar'] = nn.Parameter(torch.zeros(out_dim, in_dim) - 1)
            self.variational_params[f'b_{i}_mean'] = nn.Parameter(torch.zeros(out_dim))
            self.variational_params[f'b_{i}_logvar'] = nn.Parameter(torch.zeros(out_dim) - 1)

        optimizer = torch.optim.Adam(self.variational_params.values(), lr=0.01)

        for iteration in range(num_iterations):
            optimizer.zero_grad()

            # KL(q || prior) against the standard Gaussian prior, one term per tensor
            kl_loss = 0
            for name, logvar in self.variational_params.items():
                if not name.endswith('logvar'):
                    continue
                mean = self.variational_params[name.replace('logvar', 'mean')]
                kl_loss += -0.5 * torch.sum(1 + logvar - mean.pow(2) - logvar.exp())

            # Forward pass with reparameterized weight samples
            h = X
            for i, _ in enumerate(self.layers):
                W_mean = self.variational_params[f'W_{i}_mean']
                W_logvar = self.variational_params[f'W_{i}_logvar']
                b_mean = self.variational_params[f'b_{i}_mean']
                b_logvar = self.variational_params[f'b_{i}_logvar']
                W = W_mean + torch.randn_like(W_mean) * torch.exp(0.5 * W_logvar)
                b = b_mean + torch.randn_like(b_mean) * torch.exp(0.5 * b_logvar)
                h = h @ W.T + b
                if i < len(self.layers) - 1:
                    h = torch.relu(h)

            # Negative expected log-likelihood (classification) plus scaled KL
            recon_loss = F.cross_entropy(h, y)
            loss = recon_loss + kl_loss / len(X)

            loss.backward()
            optimizer.step()

            if iteration % 100 == 0:
                print(f"Iter {iteration}: Loss = {loss.item():.4f}")
```
### 5.2 Bayesian graph neural networks

```python
class BayesianGraphNeuralNetwork(nn.Module):
    """A two-layer GNN with Gaussian variational distributions over its weights."""

    def __init__(self, in_channels, hidden_channels, out_channels):
        super().__init__()
        # Variational parameters (mean and log-variance per weight matrix)
        self.W1_mean = nn.Parameter(torch.randn(hidden_channels, in_channels))
        self.W1_logvar = nn.Parameter(torch.zeros(hidden_channels, in_channels) - 1)
        self.W2_mean = nn.Parameter(torch.randn(out_channels, hidden_channels))
        self.W2_logvar = nn.Parameter(torch.zeros(out_channels, hidden_channels) - 1)

    def forward(self, x, edge_index):
        # Sample weights with the reparameterization trick
        W1 = self.W1_mean + torch.randn_like(self.W1_mean) * torch.exp(0.5 * self.W1_logvar)
        W2 = self.W2_mean + torch.randn_like(self.W2_mean) * torch.exp(0.5 * self.W2_logvar)
        # GNN forward pass: transform, aggregate neighbors, transform again
        h = x @ W1.T
        h = F.relu(self._propagate(h, edge_index))
        h = h @ W2.T
        return h

    def _propagate(self, h, edge_index):
        """Sum-aggregate neighbor features along the edges."""
        out = torch.zeros_like(h)
        for i in range(edge_index.shape[1]):
            src, dst = edge_index[0, i], edge_index[1, i]
            out[dst] += h[src]
        return out

    def kl_divergence(self):
        """KL(q || N(0, I)) summed over both weight matrices."""
        kl = 0
        for mean, logvar in [(self.W1_mean, self.W1_logvar),
                             (self.W2_mean, self.W2_logvar)]:
            kl += -0.5 * torch.sum(1 + logvar - mean.pow(2) - logvar.exp())
        return kl
```

## 6. Applications
### 6.1 Diagnostic systems

Mapping symptoms to a diagnosis and on to a treatment plan:

```
[Headache] ──┐
[Fever]    ──┼──→ [Flu?] ──→ [Treatment]
[Cough]    ──┘
```
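Expressed in the same structure-dict format used throughout (the node names here are hypothetical):

```python
flu_structure = {
    'headache': [],
    'fever': [],
    'cough': [],
    'flu': ['headache', 'fever', 'cough'],  # as drawn: symptoms feed the diagnosis node
    'treatment': ['flu'],
}
```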
6.2 风险评估
# 信用风险评估
risk_structure = {
'收入': [],
'负债': [],
'信用历史': [],
'还款能力': ['收入', '负债'],
'信用评分': ['信用历史', '还款能力'],
'违约风险': ['还款能力', '信用评分']
}6.3 因果发现
Bayesian networks can be used to discover causal structure from data:

```python
from collections import defaultdict


class StructureLearning:
    """Score-based structure learning with BIC and hill climbing."""

    def __init__(self, data, max_parents=3):
        self.data = data
        self.max_parents = max_parents
        self.num_vars = data.shape[1]
        # Cardinality of each variable, inferred from the data
        self.cardinality = data.max(axis=0) + 1

    def score(self, structure):
        """BIC score of a structure: sum of per-family scores."""
        return sum(self._bic_score(node, structure.get(node, []))
                   for node in range(self.num_vars))

    def _bic_score(self, node, parents):
        """BIC = log-likelihood - 0.5 * (number of free parameters) * log(N)."""
        n = len(self.data)
        # Conditional log-likelihood from counts per parent configuration
        counts = defaultdict(lambda: np.zeros(self.cardinality[node]))
        for row in self.data:
            counts[tuple(row[p] for p in parents)][row[node]] += 1
        ll = 0.0
        for c in counts.values():
            nz = c > 0
            ll += np.sum(c[nz] * np.log(c[nz] / c.sum()))
        # Free parameters: (cardinality - 1) per parent configuration
        q = int(np.prod([self.cardinality[p] for p in parents])) if parents else 1
        k = (self.cardinality[node] - 1) * q
        return ll - 0.5 * k * np.log(n)

    def _creates_cycle(self, structure, child, parent):
        """Would adding the edge parent -> child close a directed cycle?
        Yes iff child is already an ancestor of parent."""
        stack = list(structure[parent])
        while stack:
            v = stack.pop()
            if v == child:
                return True
            stack.extend(structure[v])
        return False

    def hill_climbing(self):
        """Greedy search over structures: repeatedly add the best-scoring edge."""
        current_structure = {i: [] for i in range(self.num_vars)}
        for _ in range(1000):
            best_score = self.score(current_structure)
            best_move = None
            # Try adding each admissible edge j -> i
            for i in range(self.num_vars):
                for j in range(self.num_vars):
                    if (i != j and j not in current_structure[i]
                            and len(current_structure[i]) < self.max_parents
                            and not self._creates_cycle(current_structure, i, j)):
                        new_structure = dict(current_structure)
                        new_structure[i] = new_structure[i] + [j]
                        new_score = self.score(new_structure)
                        if new_score > best_score:
                            best_score = new_score
                            best_move = ('add', i, j)
            if best_move:
                _, i, j = best_move
                current_structure[i] = current_structure[i] + [j]
            else:
                break  # local optimum: no single edge addition improves the score
        return current_structure
```
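A usage sketch on synthetic data (the chain-generating process below is an assumption for the demo; hill climbing can only recover the structure up to Markov equivalence):

```python
# Synthetic data from a binary chain X0 -> X1 -> X2
rng = np.random.default_rng(0)
x0 = rng.integers(0, 2, size=2000)
x1 = (x0 ^ (rng.random(2000) < 0.1)).astype(int)  # noisy copy of x0
x2 = (x1 ^ (rng.random(2000) < 0.1)).astype(int)  # noisy copy of x1
data = np.column_stack([x0, x1, x2])

sl = StructureLearning(data, max_parents=2)
print(sl.hill_climbing())  # parents per column; expect chain edges up to equivalence
```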
## 7. Related Topics

| Topic | Description |
|---|---|
| Markov networks and CRFs | Undirected graphical models |
| Conditional random fields | CRFs for sequence labeling |
| Variational inference | Approximate inference methods |
| Foundations of causal inference | Causal modeling |
## References

[^1]: Koller & Friedman, *Probabilistic Graphical Models: Principles and Techniques*, MIT Press, 2009.