概率电路基础理论
概率电路(Probabilistic Circuits,PC)是一类支持多项式时间精确推断的概率模型,通过将概率分布表示为计算图,实现边缘化、条件概率、期望计算等操作的精确高效求解。1 本文档深入探讨概率电路的理论基础、计算复杂性、以及与神经网络的联系。
1. 概率电路的定义与表示
1.1 形式化定义
概率电路是一个有向无环图(DAG),由以下四类节点构成:
| 节点类型 | 符号表示 | 功能描述 |
|---|---|---|
| 输入单元(Leaves) | 基础概率分布,如 、 | |
| 求和单元(Sum) | 加权求和: | |
| 乘积单元(Product) | 独立乘积: | |
| 归一化单元(Norm) | 归一化输出确保 valid 概率分布 |
电路的计算语义:
给定输入 ,电路通过自底向上的评估计算得到 :
1.2 作用域与分解性
作用域(Scope) 定义每个节点计算的变量集合:
关键性质:
- 叶节点:,单一变量
- 乘积节点:
- 求和节点:
1.3 电路表示示例
考虑二元概率分布 的电路表示:
[S] (根: w₁·A + w₂·B)
/ \
/ \
[P₁] [P₂] (乘积节点)
/ \ / \
[A] [B] [A] [B] (叶节点)
数学表达式:
这实际上是一个高斯混合模型的电路表示形式。
2. 电路计算的复杂性理论
2.1 可追踪推断的定义
可追踪性(Tractability):如果概率查询 可以在电路规模的多项式时间内计算,即 for some constant ,则称查询 在电路 上是可追踪的。
支持的精确查询类型:
| 查询类型 | 符号表示 | 电路操作 |
|---|---|---|
| 概率评估 | 电路评估 | |
| 边缘化 | 消除不相关变量 | |
| 条件概率 | 归一化边缘比 | |
| MAP推断 | 传播最大值 | |
| 期望计算 | 加权求和 |
2.2 复杂度层次结构
概率电路的推断复杂度与电路结构密切相关:
可追踪
↑
│
┌───────────┼───────────┐
│ │ │
线性链 树结构 平面网格
│ │ │
↓ ↓ ↓
O(n) O(n²) O(n³)
时间复杂度分析:
对于具有 个节点的电路 :
- 评估: — 拓扑排序后逐层计算
- 边缘化: — 对每个变量消除一次
- MAP: — 类似前向传播的选择操作
2.3 精确推断的数学保证
定理(电路推断正确性):对于平滑且一致的电路 ,电路评估给出精确的联合概率分布:
证明思路:
- 基础情况:叶节点返回精确的边缘分布
- 归纳假设:假设所有子节点的输出精确
- 归纳步骤:
- 求和节点:(凸组合)
- 乘积节点:(独立性分解)
2.4 与传统概率图模型的对比
核心优势:概率电路将推断复杂性编码到电路结构设计中,使得特定查询可以精确高效求解。
3. Sum-Product Networks (SPN) 详解
3.1 SPN的定义
和-积网络(SPN)是由 Poon 和 Domingos 在 2011 年提出的概率电路特殊形式。2
结构约束:
- 层次结构:节点分为 层, 为叶节点层, 为根节点层
- 交替连接:层间连接,同层不相连
- 作用域约束:每个节点维护作用域信息
SPN 层次结构示意:
Layer L (根) [S]─────────────────────[S]
/ \ / \
Layer L-1 [P] [P] ... [P] [P]
/ \ / \ / \ / \
Layer L-2 [S][S][S][S] [S][S][S][S]
...
Layer 0 [X₁][X₂][X₃][X₄] ... [Xₙ]
(叶节点: 变量分布)
3.2 SPN的数学形式
完整形式:
其中 是叶节点分布, 是求和节点的权重。
等效电路形式:
设 为规范化后的 SPN,则:
3.3 SPN的三大结构
3.3.1 链式结构(Chain)
X₁ → [S₁] → X₂ → [S₂] → X₃ → ... → X_d → [S_d] → Root
适用于序列化数据(如时间序列、自然语言):
3.3.2 完全二叉树结构(Full Binary Tree)
[Root]
/ \
[S₁] [S₂]
/ \ / \
[P₁] [P₂] [P₃] [P₄]
/ \ / \ / \ / \
[x₁][x₂][x₃][x₄] [x₁][x₂][x₃][x₄]
适用于聚类数据:不同子区域使用不同参数集。
3.3.3 混合结构(Mixed)
结合链式和树状结构,灵活建模复杂依赖关系。
3.4 SPN的条件独立性
定义:SPN 的每个节点 定义了变量子集 上的分布,该分布可以分解为:
条件独立性检验:
若两个变量 不在同一作用域路径上,则:
3.5 SPN的参数学习
目标函数:最大化对数似然
EM算法框架:
class SPN_EM:
"""SPN 的 EM 参数学习"""
def __init__(self, spn):
self.spn = spn
def e_step(self, data):
"""
E步:计算每个样本对每个分量的后验概率
γ_{k,i} = P(z_i = i | x^{(k)})
"""
responsibilities = []
for x in data:
# 前向传播计算各节点值
values = self.spn.forward(x)
# 从叶到根计算边缘概率
log_p = self.spn.root.value
# 计算各分量的加权贡献
responsibilities.append({})
for i, weight in enumerate(self.spn.root.weights):
log_gamma = np.log(weight) + values[i] - log_p
responsibilities[-1][i] = np.exp(log_gamma)
return responsibilities
def m_step(self, data, responsibilities):
"""
M步:更新权重使似然最大化
w_i = (1/N) * Σ_k γ_{k,i}
"""
N = len(data)
for node in self.spn.sum_nodes:
# 收集后验权重和
total_gamma = 0
for k, gamma in enumerate(responsibilities):
total_gamma += gamma[node.component_id]
# 更新权重
node.weight = total_gamma / N
# 归一化权重
self.spn.normalize()
def fit(self, data, n_epochs=100):
"""训练循环"""
for epoch in range(n_epochs):
# E步
responsibilities = self.e_step(data)
# M步
self.m_step(data, responsibilities)
# 计算对数似然
log_likelihood = self.compute_log_likelihood(data)
print(f"Epoch {epoch}: Log-likelihood = {log_likelihood:.4f}")4. 电路与神经网络的联系
4.1 计算图视角
概率电路与神经网络在计算图层面具有深刻联系:
| 方面 | 神经网络 | 概率电路 |
|---|---|---|
| 图结构 | DAG | DAG |
| 节点操作 | 线性变换 + 非线性激活 | 加权求和 + 乘积 |
| 参数位置 | 边权重 | 节点权重 |
| 前向传播 | 确定性计算 | 概率传播 |
| 梯度计算 | 反向传播 | 期望传播 |
4.2 电路作为神经网络层
线性电路层:
非线性电路层(激活函数对应):
| 神经网络激活 | 电路对应操作 |
|---|---|
| ReLU | 半区域求和分解 |
| Sigmoid | Sigmoid 树结构 |
| Softmax | 归一化求和 |
4.3 深度电路的表达能力
深度电路逼近定理:
任何 维上的离散概率分布可以用深度为 的概率电路精确表示。3
直觉理解:
- 浅层电路:表达能力受限于局部结构
- 深层电路:通过层次化分解实现指数级表达能力增长
4.4 神经网络到概率电路的编译
class NeuralToCircuitCompiler:
"""将神经网络编译为概率电路"""
def compile(self, nn_model):
"""
编译流程:
1. 展开计算图
2. 替换非线性操作为概率电路子结构
3. 编码参数为电路权重
"""
# 展开计算图
dag = self.extract_dag(nn_model)
# 编译每个操作
circuit_layers = []
for node in dag.topological_sort():
if isinstance(node, nn.Linear):
circuit_layers.append(self.compile_linear(node))
elif isinstance(node, nn.ReLU):
circuit_layers.append(self.compile_relu(node))
elif isinstance(node, nn.Sigmoid):
circuit_layers.append(self.compile_sigmoid(node))
# 组合为完整电路
return self.compose_circuit(circuit_layers)
def compile_relu(self, relu):
"""
ReLU 分解为概率电路
ReLU(x) = max(0, x)
分解为:x = x⁺ - x⁻,其中 x⁺, x⁻ ≥ 0
对应电路:
[Sum]
/ \
[Id] [Zero]
|
[Product]
"""
return ReLUCircuit()
def compile_sigmoid(self, sigmoid):
"""
Sigmoid 分解为概率电路
σ(x) = 1 / (1 + e^(-x))
用和-积网络逼近
"""
depth = 10 # 泰勒展开阶数
return SigmoidCircuit(depth)4.5 概率电路的神经网络训练
电路可以使用梯度下降进行端到端训练:
class DifferentiableCircuit(nn.Module):
"""可微分概率电路"""
def __init__(self, num_variables, layer_sizes):
super().__init__()
# 初始化可学习权重
self.sum_weights = nn.ParameterList([
nn.Parameter(torch.randn(s))
for s in layer_sizes
])
# 高斯混合组件
self.means = nn.Parameter(torch.randn(num_variables, layer_sizes[0]))
self.log_vars = nn.Parameter(torch.zeros(num_variables, layer_sizes[0]))
def forward(self, x):
"""
电路前向传播
返回对数概率
"""
batch_size = x.size(0)
# Layer 0: 叶节点(高斯分布)
h = self.compute_leaf_log_prob(x) # (batch, num_components)
# 逐层传播
for layer_idx, (num_nodes, num_children) in enumerate(
self.get_layer_config()
):
if layer_idx % 2 == 0: # Sum layer
h = self.sum_layer(h)
else: # Product layer
h = self.product_layer(h)
return h # 返回对数似然
def compute_leaf_log_prob(self, x):
"""计算叶节点的对数概率"""
# (x - μ)² / (2σ²) + log(σ)
diff = x.unsqueeze(-1) - self.means # (batch, var, comp)
log_prob = -0.5 * (
diff ** 2 / self.log_vars.exp() +
self.log_vars +
np.log(2 * np.pi)
)
return log_prob.sum(dim=1) # (batch, comp)
def sum_layer(self, h):
"""
求和层:加权和
h_out[j] = Σ_i w_{ji} * h_in[i]
"""
weights = F.softmax(self.sum_weights[len(self.activations)], dim=0)
return h @ weights
def product_layer(self, h):
"""
乘积层:独立变量的乘积
h_out[j] = ∏_i h_in[i]
"""
# 使用 log-sum-exp 保持数值稳定
return h.sum(dim=-1)5. 层次化变分推断与电路
5.1 电路作为变分后验
在贝叶斯深度学习中,电路可以用于表示变分后验 :
其中 是一个以 为条件的概率电路。
5.2 层次化潜变量模型
考虑 层潜变量的层次模型:
电路表示:
[Root] ────────────────────────── p(x)
/ \
/ \
[S₁] [S₂] ← 第L层混合
| |
[P] [P] ← 条件独立
| |
[S] [S] ← 第L-1层
.
.
.
[x] ← 观测变量
5.3 电路变分推断算法
class CircuitVariationalInference:
"""基于概率电路的变分推断"""
def __init__(self, circuit, q_circuit):
self.p_circuit = circuit # 先验/生成分布
self.q_circuit = q_circuit # 变分后验
def elbo(self, x):
"""
计算证据下界
ELBO = E_q[log p(x, z)] - E_q[log q(z|x)]
= E_q[log p(x|z)] + E_q[log p(z)] - E_q[log q(z|x)]
"""
# E_q[log p(x, z)] = E_q[log p(x|z)] + E_q[log p(z)]
log_px_given_z = self.compute_conditional(x)
log_pz = self.q_circuit.compute_prior_kl()
# E_q[log q(z|x)]
log_qz_given_x = self.q_circuit.evaluate(x)
elbo = log_px_given_z + log_pz - log_qz_given_x
return elbo
def fit(self, data, n_epochs=100, lr=0.001):
"""优化 ELBO"""
optimizer = torch.optim.Adam(
self.q_circuit.parameters(),
lr=lr
)
for epoch in range(n_epochs):
optimizer.zero_grad()
# 计算 ELBO
elbo = self.elbo(data)
# 最大化 ELBO = 最小化 -ELBO
loss = -elbo.mean()
loss.backward()
optimizer.step()
if epoch % 10 == 0:
print(f"Epoch {epoch}: ELBO = {elbo.mean():.4f}")5.4 归一化流与电路
归一化流可以通过电路实现:
class FlowCircuit(nn.Module):
"""基于电路的归一化流"""
def __init__(self, base_dist_circuit, flow_circuits):
super().__init__()
self.base_dist = base_dist_circuit
self.flows = nn.ModuleList(flow_circuits)
def forward(self, x):
"""
前向传播:计算对数密度
log p(x) = log p(z_K) + Σ_k log |det J_k|
"""
z = x
log_det_sum = 0
for flow in self.flows:
z, log_det = flow(z)
log_det_sum += log_det
# 基分布的对数密度
log_p_zK = self.base_dist(z)
return log_p_zK + log_det_sum6. 学习与推断算法
6.1 结构学习
6.1.1 基于分组的结构学习
class GroupingStructureLearner:
"""
基于变量分组的电路结构学习
核心思想:将高度相关的变量分组到同一 Product 节点下
"""
def __init__(self, max_group_size=4):
self.max_group_size = max_group_size
def learn_structure(self, data):
"""
学习电路结构
步骤:
1. 计算变量间的互信息
2. 基于互信息构建变量分组
3. 构建 Sum-Product 层
"""
n_vars = data.shape[1]
# 1. 计算互信息矩阵
mi_matrix = self.compute_mutual_information(data)
# 2. 聚类变量
groups = self.cluster_variables(mi_matrix)
# 3. 构建电路
circuit = self.build_circuit_from_groups(groups, data)
return circuit
def compute_mutual_information(self, data):
"""计算变量间的互信息"""
n_vars = data.shape[1]
mi_matrix = np.zeros((n_vars, n_vars))
for i in range(n_vars):
for j in range(i+1, n_vars):
mi = self.estimate_mi(data[:, i], data[:, j])
mi_matrix[i, j] = mi
mi_matrix[j, i] = mi
return mi_matrix
def cluster_variables(self, mi_matrix):
"""基于互信息的谱聚类"""
# 构建相似图
# 使用谱聚类获得变量分组
pass
def build_circuit_from_groups(self, groups, data):
"""从分组构建电路"""
circuit = ProbabilisticCircuit()
# 添加叶节点层
for group in groups:
leaf_probs = self.estimate_leaf_distribution(data[:, group])
circuit.add_leaf_group(group, leaf_probs)
# 逐层构建 Sum-Product 结构
current_level = circuit.leaf_layer
while len(current_level.nodes) > 1:
# 决定是 Sum 层还是 Product 层
if self.should_use_sum_layer(current_level):
current_level = circuit.add_sum_layer()
else:
current_level = circuit.add_product_layer()
return circuit6.1.2 端到端可微结构学习
class DifferentiableStructureLearner(nn.Module):
"""
可微分结构学习
使用 Gumbel-Softmax 进行离散结构选择
"""
def __init__(self, n_variables, hidden_dim):
super().__init__()
# 结构参数(软选择)
self.structure_logits = nn.Parameter(
torch.randn(n_variables, hidden_dim)
)
def forward(self, x, temperature=1.0, hard=False):
"""
前向传播,返回学习的电路结构
Args:
x: 输入数据
temperature: Gumbel-Softmax 温度
hard: 是否使用硬选择
"""
# Gumbel-Softmax 重参数化
gumbels = -torch.empty_like(self.structure_logits).exponential_().log()
logits = (self.structure_logits.log_softmax(dim=-1) + gumbels) / temperature
soft_structure = F.softmax(logits, dim=-1)
if hard:
# 硬选择:one-hot
structure = F.one_hot(
logits.argmax(dim=-1),
num_classes=self.structure_logits.size(-1)
).float()
else:
structure = soft_structure
return structure, soft_structure6.2 参数学习算法
6.2.1 生成对抗电路学习
class AdversarialCircuitLearning:
"""
对抗学习用于电路参数估计
使用判别器区分真实数据和电路生成的数据
"""
def __init__(self, circuit, discriminator):
self.circuit = circuit
self.discriminator = discriminator
def train_step(self, real_data):
"""
对抗训练步骤
交替更新:
1. 判别器:最大化真实/生成样本的区分度
2. 生成器(电路):最小化判别器的损失
"""
batch_size = real_data.size(0)
device = real_data.device
# 1. 更新判别器
self.discriminator_optimizer.zero_grad()
# 真实样本
real_labels = torch.ones(batch_size, device=device)
d_real = self.discriminator(real_data)
loss_d_real = F.binary_cross_entropy_with_logits(d_real, real_labels)
# 生成样本(从电路采样)
fake_data = self.circuit.sample(batch_size)
fake_labels = torch.zeros(batch_size, device=device)
d_fake = self.discriminator(fake_data.detach())
loss_d_fake = F.binary_cross_entropy_with_logits(d_fake, fake_labels)
loss_discriminator = (loss_d_real + loss_d_fake) / 2
loss_discriminator.backward()
self.discriminator_optimizer.step()
# 2. 更新电路
self.circuit_optimizer.zero_grad()
fake_data = self.circuit.sample(batch_size)
d_fake = self.discriminator(fake_data)
# 电路希望骗过判别器
loss_generator = F.binary_cross_entropy_with_logits(
d_fake,
torch.ones(batch_size, device=device)
)
# 添加 ELBO 正则项
elbo = self.circuit.elbo(real_data)
loss_total = loss_generator - 0.1 * elbo
loss_total.backward()
self.circuit_optimizer.step()
return {
'd_loss': loss_discriminator.item(),
'g_loss': loss_generator.item(),
'elbo': elbo.mean().item()
}6.3 推断算法
6.3.1 并行电路推断
class ParallelCircuitInference:
"""
并行化电路推断算法
利用 GPU 进行批量电路评估
"""
def __init__(self, circuit, device='cuda'):
self.circuit = circuit
self.device = device
def batch_evaluate(self, X):
"""
批量评估电路概率
Args:
X: (batch_size, n_variables) 输入数据
Returns:
log_probs: (batch_size,) 对数概率
"""
batch_size = X.size(0)
# 逐层前向传播
current_values = X # (batch, ...)
for layer in self.circuit.layers:
current_values = layer.forward(current_values)
# 最后一层归一化
log_probs = F.log_softmax(current_values, dim=-1)
return log_probs
def batch_marginal(self, X, marginal_vars):
"""
批量边缘化推断
消除非边际变量
"""
# 克隆电路并标记边缘化变量
circuit = self.circuit.clone()
# 设置边缘化节点
for var_idx in range(circuit.n_variables):
if var_idx not in marginal_vars:
circuit.set_eliminated(var_idx)
# 批量评估
return self.batch_evaluate(X)
def batch_map(self, X):
"""
批量 MAP 推断
找到最可能的配置
"""
batch_size = X.size(0)
# 前向传播计算各节点的值
node_values = self.circuit.forward(X)
# 后向传播找到最优配置
best_config = []
# 从根开始
current_nodes = [self.circuit.root]
for layer_idx in range(len(self.circuit.layers) - 1, -1, -1):
next_nodes = []
for node in current_nodes:
if isinstance(node, SumNode):
# 选择权重最大的子节点
child_values = [node_values[child.id] for child in node.children]
best_child = node.children[child_values.argmax()]
next_nodes.append(best_child)
elif isinstance(node, ProductNode):
# 所有子节点都需要考虑
next_nodes.extend(node.children)
current_nodes = next_nodes
# 收集叶节点配置
for leaf in current_nodes:
best_config.append(leaf.value)
return torch.stack(best_config, dim=1)7. PyTorch 代码实现
7.1 基础概率电路类
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import numpy as np
from typing import List, Tuple, Optional
class Node:
"""电路节点基类"""
def __init__(self, node_id: int):
self.id = node_id
self.scope = set()
self.children = []
def forward(self, x: torch.Tensor) -> torch.Tensor:
raise NotImplementedError
def backward(self, grad: torch.Tensor):
raise NotImplementedError
class LeafNode(Node):
"""叶节点:表示变量分布"""
def __init__(self, node_id: int, var_idx: int,
distribution_type: str = 'gaussian'):
super().__init__(node_id)
self.var_idx = var_idx
self.distribution_type = distribution_type
# 高斯分布参数
self.mean = nn.Parameter(torch.randn(1))
self.log_var = nn.Parameter(torch.zeros(1))
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
计算叶节点的对数概率密度
Args:
x: (batch_size, n_variables)
Returns:
log_prob: (batch_size,)
"""
x_i = x[:, self.var_idx] # 提取对应变量
mean = self.mean.squeeze()
std = torch.exp(0.5 * self.log_var).squeeze()
# 高斯对数密度
log_prob = -0.5 * (
math.log(2 * math.pi) +
self.log_var +
(x_i - mean) ** 2 / torch.exp(self.log_var)
)
return log_prob
class SumNode(Node):
"""求和节点:加权求和"""
def __init__(self, node_id: int, num_children: int):
super().__init__(node_id)
self.num_children = num_children
# 权重(自动归一化)
self.log_weights = nn.Parameter(torch.randn(num_children))
@property
def weights(self):
"""归一化权重"""
return F.softmax(self.log_weights, dim=0)
def forward(self, child_values: List[torch.Tensor]) -> torch.Tensor:
"""
求和节点前向传播
Args:
child_values: 子节点的值列表
Returns:
value: (batch_size,)
"""
# 加权和
weighted_sum = sum(
w * v for w, v in zip(self.weights, child_values)
)
return weighted_sum
def scope_union(self):
"""计算作用域并集"""
scope = set()
for child in self.children:
scope.update(child.scope)
self.scope = scope
class ProductNode(Node):
"""乘积节点:独立分解"""
def __init__(self, node_id: int, num_children: int):
super().__init__(node_id)
self.num_children = num_children
def forward(self, child_values: List[torch.Tensor]) -> torch.Tensor:
"""
乘积节点前向传播
使用 log-sum-exp 保持数值稳定
Args:
child_values: 子节点的值列表
Returns:
value: (batch_size,)
"""
# log(∏ v_i) = Σ log(v_i)
# 使用 log-space 加法
log_sum = sum(child_values)
return log_sum
def scope_union(self):
"""计算作用域并集"""
scope = set()
for child in self.children:
scope.update(child.scope)
self.scope = scope
class ProbabilisticCircuit(nn.Module):
"""
完整概率电路实现
支持前向传播、ELBO 计算和采样
"""
def __init__(self, n_variables: int):
super().__init__()
self.n_variables = n_variables
self.leaves = nn.ModuleList()
self.sum_nodes = nn.ModuleList()
self.product_nodes = nn.ModuleList()
self.node_list = [] # 拓扑序
self.root = None
def add_leaf(self, var_idx: int, dist_type: str = 'gaussian') -> LeafNode:
"""添加叶节点"""
leaf = LeafNode(len(self.node_list), var_idx, dist_type)
leaf.scope = {var_idx}
self.leaves.append(leaf)
self.node_list.append(leaf)
return leaf
def add_sum_node(self, num_children: int) -> SumNode:
"""添加求和节点"""
node = SumNode(len(self.node_list), num_children)
self.sum_nodes.append(node)
self.node_list.append(node)
return node
def add_product_node(self, num_children: int) -> ProductNode:
"""添加乘积节点"""
node = ProductNode(len(self.node_list), num_children)
self.product_nodes.append(node)
self.node_list.append(node)
return node
def set_root(self, root: Node):
"""设置根节点"""
self.root = root
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
电路前向传播
Args:
x: (batch_size, n_variables) 输入数据
Returns:
log_prob: (batch_size,) 对数概率
"""
# 按拓扑序评估
node_values = {}
for node in self.node_list:
if isinstance(node, LeafNode):
node_values[node.id] = node(x)
elif isinstance(node, SumNode):
child_vals = [node_values[c.id] for c in node.children]
node_values[node.id] = node(child_vals)
elif isinstance(node, ProductNode):
child_vals = [node_values[c.id] for c in node.children]
node_values[node.id] = node(child_vals)
return node_values[self.root.id]
def elbo(self, x: torch.Tensor, prior_std: float = 1.0) -> torch.Tensor:
"""
计算 ELBO(用于学习)
ELBO = log p(x) - D_KL(q || p)
这里简化为 log p(x)
"""
return self.forward(x)
def sample(self, n_samples: int) -> torch.Tensor:
"""
从电路分布采样
使用祖先采样
"""
samples = torch.zeros(n_samples, self.n_variables)
# 拓扑序:从叶到根确定分布参数,从根到叶采样
# 这里简化处理:假设电路表示混合高斯
with torch.no_grad():
for node in self.node_list:
if isinstance(node, SumNode):
# 选择一个组件
weights = node.weights.cpu().numpy()
selected_idx = np.random.choice(
len(weights), size=n_samples, p=weights
)
elif isinstance(node, LeafNode):
# 从对应高斯采样
mean = node.mean.item()
std = math.exp(0.5 * node.log_var.item())
samples[:, node.var_idx] = torch.randn(n_samples) * std + mean
return samples
def build_simple_spn(n_variables: int, n_components: int = 4) -> ProbabilisticCircuit:
"""
构建简单的 SPN 结构
结构:混合高斯模型
[Sum] (根,混合权重)
/ | \ ...
/ | \
[P] [P] [P] (乘积节点,每组变量的独立分布)
/\ /\ /\
[x₁][x₂] [x₁][x₂] ... (叶节点)
"""
circuit = ProbabilisticCircuit(n_variables)
# 添加叶节点(每个变量一个)
leaves = []
for i in range(n_variables):
leaf = circuit.add_leaf(i)
leaves.append(leaf)
# 创建乘积节点组
# 每 n_variables // n_components 个变量一组
vars_per_group = max(1, n_variables // n_components)
product_nodes = []
for g in range(n_components):
var_start = g * vars_per_group
var_end = min((g + 1) * vars_per_group, n_variables)
group_leaves = leaves[var_start:var_end]
if len(group_leaves) > 1:
# 创建乘积节点
prod = circuit.add_product_node(len(group_leaves))
prod.children = group_leaves
prod.scope_union()
product_nodes.append(prod)
else:
product_nodes.append(group_leaves[0])
# 创建根节点
root = circuit.add_sum_node(len(product_nodes))
root.children = product_nodes
root.scope_union()
circuit.set_root(root)
return circuit7.2 完整训练示例
def train_probabilistic_circuit():
"""
完整训练示例:使用变分推断训练概率电路
"""
# 设置随机种子
torch.manual_seed(42)
np.random.seed(42)
# 生成模拟数据(双峰分布)
n_samples = 1000
n_variables = 2
# 混合高斯数据
data1 = torch.randn(n_samples // 2, n_variables) + torch.tensor([2.0, 2.0])
data2 = torch.randn(n_samples // 2, n_variables) + torch.tensor([-2.0, -2.0])
data = torch.cat([data1, data2], dim=0)
# 打乱数据
perm = torch.randperm(len(data))
data = data[perm]
# 构建电路
circuit = build_simple_spn(n_variables, n_components=4)
# 优化器
optimizer = torch.optim.Adam(circuit.parameters(), lr=0.05)
# 训练循环
n_epochs = 200
batch_size = 64
losses = []
for epoch in range(n_epochs):
epoch_loss = 0
n_batches = 0
for i in range(0, len(data), batch_size):
batch = data[i:i+batch_size]
optimizer.zero_grad()
# 计算对数似然
log_prob = circuit(batch)
# 最大化对数似然 = 最小化负对数似然
loss = -log_prob.mean()
loss.backward()
optimizer.step()
epoch_loss += loss.item()
n_batches += 1
avg_loss = epoch_loss / n_batches
losses.append(avg_loss)
if (epoch + 1) % 20 == 0:
print(f"Epoch {epoch+1}/{n_epochs}, Loss: {avg_loss:.4f}")
# 测试:评估和采样
print("\n评估电路...")
test_log_prob = circuit(data).mean()
print(f"测试集平均对数概率: {test_log_prob:.4f}")
# 采样
print("\n从电路采样...")
samples = circuit.sample(n_samples=10)
print("采样结果:")
print(samples[:5])
return circuit, losses
# 运行训练
if __name__ == "__main__":
circuit, losses = train_probabilistic_circuit()7.3 电路可视化工具
class CircuitVisualizer:
"""概率电路可视化"""
def __init__(self, circuit: ProbabilisticCircuit):
self.circuit = circuit
def to_networkx(self):
"""转换为 NetworkX 图"""
import networkx as nx
G = nx.DiGraph()
# 添加节点
for node in self.circuit.node_list:
if isinstance(node, LeafNode):
G.add_node(node.id,
type='leaf',
var_idx=node.var_idx)
elif isinstance(node, SumNode):
G.add_node(node.id,
type='sum',
weights=node.weights.detach().numpy())
elif isinstance(node, ProductNode):
G.add_node(node.id, type='product')
if node == self.circuit.root:
G.add_node(node.id, is_root=True)
# 添加边
for node in self.circuit.node_list:
if hasattr(node, 'children'):
for child in node.children:
G.add_edge(child.id, node.id)
return G
def plot(self, save_path: str = 'circuit.png'):
"""绘制电路图"""
import networkx as nx
import matplotlib.pyplot as plt
G = self.to_networkx()
plt.figure(figsize=(12, 8))
# 节点颜色
color_map = []
for node in G.nodes():
node_data = G.nodes[node]
if node_data.get('is_root', False):
color_map.append('red')
elif node_data['type'] == 'leaf':
color_map.append('green')
elif node_data['type'] == 'sum':
color_map.append('blue')
else:
color_map.append('orange')
# 布局
pos = nx.spring_layout(G)
# 绘制
nx.draw(G, pos,
node_color=color_map,
with_labels=True,
node_size=500,
font_size=8)
plt.savefig(save_path)
plt.close()8. 总结与展望
8.1 核心要点总结
| 主题 | 核心概念 |
|---|---|
| 电路定义 | DAG + Sum/Product 节点 + 叶节点 |
| 可追踪性 | 精确边缘化、条件概率、MAP |
| SPN | 层次化 Sum-Product 结构 |
| 神经网络联系 | 计算图统一、梯度训练 |
| 变分推断 | 电路作为变分后验 |
| 学习算法 | EM、梯度下降、对抗学习 |
8.2 与现有文档的关联
概率电路与其他概率模型密切相关:
8.3 前沿研究方向
- 深度概率电路:更深的网络结构,更强的表达能力
- 可微电路搜索:自动化电路结构设计
- 概率电路 transformers:将电路思想引入注意力机制
- 因果概率电路:结合因果推断的电路模型
参考
脚注
Footnotes
-
Choi, Y., et al. (2020). “Building Tractable Probabilistic Models with Generalized Additive Noise Models”. NeurIPS 2020. ↩
-
Poon, H., & Domingos, P. (2011). “Sum-Product Networks: A New Probabilistic Model”. UAI 2011. ↩
-
Liang, Y., et al. (2024). “Towards Expressive and Tractable Probabilistic Models”. arXiv:2402.00759. ↩