Overview

A Bayesian network is a probabilistic graphical model that uses a directed acyclic graph (DAG) to represent the conditional-independence relationships among a set of random variables.[1]

Example Bayesian network: the student-grade model

    [Intelligence] ──→ [Grade] ←── [Difficulty]
                          │             │
                          ▼             │
                       [Letter] ←───────┘

Core idea

A Bayesian network factorizes the joint distribution into a product of local conditional distributions:

P(X_1, ..., X_n) = ∏_i P(X_i | Pa(X_i))

where Pa(X_i) is the set of parents of node X_i.
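As a concrete check of the factorization, the joint probability of one complete assignment in the student model is just the product of its local CPD entries. A minimal sketch (the CPT values below are illustrative, not learned):

```python
# Factorization for the student model:
#   P(I, D, G, L) = P(I) P(D) P(G | I, D) P(L | D, G)
# (hypothetical CPT entries, chosen only for illustration)
P_I = [0.7, 0.3]                               # P(I): 0 = low, 1 = high
P_D = [0.6, 0.4]                               # P(D): 0 = easy, 1 = hard
P_G_given_ID = {(1, 0): [0.02, 0.08, 0.90]}    # P(G | I=high, D=easy)
P_L_given_GD = {(2, 0): [0.10, 0.90]}          # P(L | G=excellent, D=easy)

# P(I=high, D=easy, G=excellent, L=strong)
p = P_I[1] * P_D[0] * P_G_given_ID[(1, 0)][2] * P_L_given_GD[(2, 0)][1]
print(round(p, 4))   # 0.3 * 0.6 * 0.90 * 0.90 = 0.1458
```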


1. Definition of a Bayesian Network

1.1 Structure

A Bayesian network consists of two parts:

  1. A directed acyclic graph (DAG): encodes the dependency relationships among the variables
  2. Conditional probability distributions (CPDs): one local conditional distribution per node

1.2 Conditional independence

A Bayesian network encodes the conditional independencies defined by d-separation.

In particular, each node X_i is conditionally independent of its non-descendants given its parents:

X_i ⊥ NonDescendants(X_i) | Pa(X_i)

1.3 Three basic structures

Structure A: chain
    X → Y → Z
    P(Z|X) = ∑_Y P(Z|Y) P(Y|X)
    (X ⊥ Z | Y)

Structure B: fork
    X ← Y → Z
    P(X,Z|Y) = P(X|Y) P(Z|Y)
    (X ⊥ Z | Y)

Structure C: collider
    X → Y ← Z
    P(X,Z) = P(X) P(Z)            (X and Z are marginally independent)
    P(X,Z|Y) ≠ P(X|Y) P(Z|Y)      (conditioning on Y makes them dependent)
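The collider behavior can be verified by brute-force enumeration. This sketch assumes a minimal construction: X and Z are independent fair coins and Y = X XOR Z:

```python
import itertools

# Collider X → Y ← Z with X, Z independent fair coins and Y = X XOR Z.
joint = {}
for x, z in itertools.product([0, 1], repeat=2):
    joint[(x, x ^ z, z)] = 0.25   # P(x) P(z) = 0.25; Y is deterministic

def p(**fixed):
    """Marginal probability of a partial assignment over (x, y, z)."""
    return sum(pr for (x, y, z), pr in joint.items()
               if all(dict(x=x, y=y, z=z)[k] == v for k, v in fixed.items()))

# Marginally, X ⊥ Z: P(X=1, Z=1) equals P(X=1) P(Z=1)
print(p(x=1, z=1), p(x=1) * p(z=1))          # 0.25 0.25

# Conditioning on the collider couples them:
p_joint = p(x=1, z=0, y=1) / p(y=1)                          # P(X=1, Z=0 | Y=1) = 0.5
p_prod = (p(x=1, y=1) / p(y=1)) * (p(z=0, y=1) / p(y=1))     # 0.5 * 0.5 = 0.25
print(p_joint, p_prod)                       # 0.5 0.25
```

Knowing Y = 1 and X = 1 fully determines Z = 0, so the conditional joint (0.5) is far from the product of the conditionals (0.25).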

1.4 I-maps and factorization

If every conditional independence implied by the graph G (via d-separation) holds in the joint distribution P, then G is an I-map (independence map) of P.

In that case, the joint distribution factorizes over G:

P(X_1, ..., X_n) = ∏_i P(X_i | Pa_G(X_i))


2. Conditional Probability Distributions

2.1 Discrete CPDs

For discrete variables, the CPD is a conditional probability table (CPT). An illustrative table for P(G|I) in the student model:

    Intelligence (I)   Grade (G)    P(G|I)
    high               excellent    0.90
    high               good         0.08
    high               pass         0.02
    low                excellent    0.30
    low                good         0.40
    low                pass         0.30

2.2 Linear-Gaussian CPDs

For a continuous variable Y with continuous parents x, the mean is a linear function of the parents and the variance is fixed:

P(Y | x) = N(w_0 + wᵀx, σ²)

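Sampling from a linear-Gaussian CPD is straightforward; this sketch uses made-up parameters w0, w, and sigma:

```python
import numpy as np

rng = np.random.default_rng(0)

# Hypothetical parameters of a linear-Gaussian CPD: Y | x ~ N(w0 + w·x, sigma²)
w0, w, sigma = 1.0, np.array([2.0, -0.5]), 0.3

def sample_y(x, n=1):
    """Draw n samples of Y given the parent values x."""
    return w0 + x @ w + sigma * rng.standard_normal(n)

x = np.array([1.0, 2.0])
ys = sample_y(x, n=100_000)
print(ys.mean())   # ≈ w0 + w·x = 1 + 2 - 1 = 2.0
```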
2.3 Conditional Gaussian CPDs

Hybrid models that mix continuous and discrete variables, e.g. continuous parents with a discrete child, or vice versa.

2.4 Logistic CPDs

For a binary child, the CPD can be modeled with a logistic function of the parents:

P(Y = 1 | x) = σ(w_0 + wᵀx),  where σ(t) = 1 / (1 + e^(−t))


3. Parameter Learning

3.1 Maximum likelihood estimation (MLE)

Given complete data D = {x^(1), ..., x^(N)}, the log-likelihood of the parameters θ is:

ℓ(θ; D) = ∑_m log P(x^(m) | θ)

For a Bayesian network it decomposes over nodes:

ℓ(θ; D) = ∑_i ∑_m log P(x_i^(m) | pa_i^(m), θ_i)

so each CPD can be estimated independently; for a discrete CPT the MLE is the normalized count θ̂(x | pa) = N(x, pa) / N(pa).

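The counting rule θ̂(x | pa) = N(x, pa) / N(pa) can be illustrated on a toy dataset (the data values below are made up):

```python
import numpy as np

# MLE for a discrete CPT is just normalized counting:
#   theta_hat(x | pa) = N(x, pa) / N(pa)
# Toy dataset (made up): columns are (parent, child), both binary.
data = np.array([[0, 0], [0, 1], [0, 1], [0, 0],
                 [1, 1], [1, 1], [1, 0], [1, 1]])

cpt = np.zeros((2, 2))
for pa, x in data:
    cpt[pa, x] += 1                          # accumulate N(x, pa)
cpt /= cpt.sum(axis=1, keepdims=True)        # divide each row by N(pa)

print(cpt)   # rows: P(child | parent=0), P(child | parent=1)
```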
3.2 Bayesian estimation

Introduce a prior distribution P(θ) over the parameters and work with the posterior:

P(θ | D) ∝ P(D | θ) P(θ)

Conjugate priors keep the posterior in the same family as the prior:

    CPD type      Conjugate prior
    Bernoulli     Beta
    Multinomial   Dirichlet
    Gaussian      Normal-Wishart
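For example, the Beta-Bernoulli pair gives a closed-form posterior update; the prior and data below are illustrative:

```python
# Beta-Bernoulli conjugacy: prior Beta(a, b) plus k successes in n trials
# gives posterior Beta(a + k, b + n - k). (Illustrative numbers.)
a, b = 1.0, 1.0        # uniform prior, i.e. one pseudo-count per outcome
k, n = 7, 10           # observed: 7 successes out of 10 trials

mle = k / n                        # 0.7
post_mean = (a + k) / (a + b + n)  # (1+7)/(2+10) = 2/3, pulled toward the prior
print(mle, post_mean)
```

The posterior mean is the MLE smoothed by the pseudo-counts, which is exactly what `fit_bayesian` below does with a Dirichlet prior.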

3.3 Implementation: learning a discrete Bayesian network

import numpy as np
from collections import defaultdict
 
class DiscreteBayesianNetwork:
    """A discrete Bayesian network with tabular CPDs."""
    def __init__(self, structure, node_names, node_values):
        """
        structure: dict mapping each node to its list of parent nodes
        node_names: list of variable names (str)
        node_values: dict mapping each variable to its number of values
        """
        self.structure = structure
        self.node_names = node_names
        self.node_values = node_values
        self.node_dims = {n: node_values[n] for n in node_names}
        
        self.cpds = {}  # learned CPTs
        self.node_to_idx = {n: i for i, n in enumerate(node_names)}
    
    def fit_mle(self, data):
        """
        data: (N, num_vars) matrix of integer-coded samples
        Learn the CPTs by maximum likelihood estimation.
        """
        num_samples = len(data)
        
        for node in self.node_names:
            parents = self.structure[node]
            node_idx = self.node_to_idx[node]
            
            if len(parents) == 0:
                # no parents: estimate the marginal distribution
                counts = np.zeros(self.node_dims[node])
                for sample in data:
                    counts[sample[node_idx]] += 1
                self.cpds[node] = counts / num_samples
            
            else:
                # parents: estimate one conditional distribution per parent configuration
                parent_idx = [self.node_to_idx[p] for p in parents]
                
                # number of parent value configurations
                parent_dim = np.prod([self.node_dims[p] for p in parents])
                
                # count arrays: joint (parent config, child value) and parent-config totals
                counts = np.zeros((parent_dim, self.node_dims[node]))
                parent_counts = np.zeros(parent_dim)
                
                for sample in data:
                    # flat row-major index of the parent configuration
                    # (the first parent is the most significant digit)
                    parent_idx_val = 0
                    for j, p in enumerate(parents):
                        multiplier = np.prod([self.node_dims[parents[k]] 
                                           for k in range(j+1, len(parents))], dtype=int)
                        parent_idx_val += sample[parent_idx[j]] * multiplier
                    
                    parent_counts[parent_idx_val] += 1
                    counts[parent_idx_val, sample[node_idx]] += 1
                
                # normalize counts into conditional probabilities, one row per parent configuration
                self.cpds[node] = {}
                for i in range(parent_dim):
                    if parent_counts[i] > 0:
                        self.cpds[node][i] = counts[i] / parent_counts[i]
                    else:
                        # unseen parent configuration: fall back to uniform
                        self.cpds[node][i] = np.ones(self.node_dims[node]) / self.node_dims[node]
    
    def fit_bayesian(self, data, alpha=1.0):
        """
        Bayesian estimation with a symmetric Dirichlet prior.
        alpha: prior pseudo-count added to every cell
        """
        
        for node in self.node_names:
            parents = self.structure[node]
            node_idx = self.node_to_idx[node]
            
            if len(parents) == 0:
                counts = np.zeros(self.node_dims[node]) + alpha
                for sample in data:
                    counts[sample[node_idx]] += 1
                self.cpds[node] = counts / counts.sum()
            
            else:
                parent_idx = [self.node_to_idx[p] for p in parents]
                parent_dim = np.prod([self.node_dims[p] for p in parents])
                
                counts = np.zeros((parent_dim, self.node_dims[node])) + alpha
                
                for sample in data:
                    parent_idx_val = 0
                    for j, p in enumerate(parents):
                        multiplier = np.prod([self.node_dims[parents[k]] 
                                           for k in range(j+1, len(parents))], dtype=int)
                        parent_idx_val += sample[parent_idx[j]] * multiplier
                    
                    counts[parent_idx_val, sample[node_idx]] += 1
                
                self.cpds[node] = {}
                for i in range(parent_dim):
                    self.cpds[node][i] = counts[i] / counts[i].sum()
 
 
# Example: learning the student-grade model
def learn_student_model():
    # structure: intelligence→grade, difficulty→grade, difficulty→letter, grade→letter
    structure = {
        'I': [],           # intelligence: no parents
        'D': [],           # difficulty: no parents
        'G': ['I', 'D'],  # grade: parents are intelligence and difficulty
        'L': ['D', 'G']   # letter: parents are difficulty and grade
    }
    
    node_names = ['I', 'D', 'G', 'L']
    node_values = {
        'I': 2,  # 0 = low, 1 = high
        'D': 2,  # 0 = easy, 1 = hard
        'G': 3,  # 0 = pass, 1 = good, 2 = excellent
        'L': 2   # 0 = weak, 1 = strong
    }
    
    # generate simulated data
    np.random.seed(42)
    n_samples = 1000
    
    # ground-truth parameters
    P_I = [0.7, 0.3]  # P(I)
    P_D = [0.6, 0.4]  # P(D)
    
    # ancestral sampling: draw the roots first, then each child given its parents
    data = []
    for _ in range(n_samples):
        I = np.random.choice(2, p=P_I)
        D = np.random.choice(2, p=P_D)
        
        # P(G|I,D)
        if I == 1 and D == 0:  # high intelligence + easy course
            G = np.random.choice(3, p=[0.02, 0.08, 0.90])
        elif I == 1 and D == 1:  # high intelligence + hard course
            G = np.random.choice(3, p=[0.10, 0.40, 0.50])
        elif I == 0 and D == 0:  # low intelligence + easy course
            G = np.random.choice(3, p=[0.30, 0.40, 0.30])
        else:  # low intelligence + hard course
            G = np.random.choice(3, p=[0.70, 0.25, 0.05])
        
        # P(L|G,D)
        if G == 2 and D == 0:  # excellent grade + easy course
            L = np.random.choice(2, p=[0.1, 0.9])
        elif G == 2 and D == 1:
            L = np.random.choice(2, p=[0.2, 0.8])
        elif G == 1:
            L = np.random.choice(2, p=[0.4, 0.6])
        else:
            L = np.random.choice(2, p=[0.8, 0.2])
        
        data.append([I, D, G, L])
    
    data = np.array(data)
    
    # fit the network by MLE
    bn = DiscreteBayesianNetwork(structure, node_names, node_values)
    bn.fit_mle(data)
    
    # print a few of the learned CPTs
    print("Learned conditional probability tables:")
    print(f"P(I=high) = {bn.cpds['I'][1]:.3f}")
    print(f"P(D=hard) = {bn.cpds['D'][1]:.3f}")
    
    return bn

4. Inference

4.1 Exact inference

Variable elimination

Variable elimination computes marginals by summing out the non-query variables one at a time; at each step only the factors that mention the variable being eliminated are multiplied together:

class VariableElimination:
    """Exact inference by variable elimination.
    A factor is a pair (variables, table), where the table's axes
    follow the order of the variable list."""
    def __init__(self, bn):
        self.bn = bn
    
    def _node_factor(self, node):
        """Build the factor over (parents + node) from the learned CPT."""
        parents = self.bn.structure[node]
        dims = [self.bn.node_dims[p] for p in parents] + [self.bn.node_dims[node]]
        table = np.zeros(dims)
        if not parents:
            table[:] = self.bn.cpds[node]
        else:
            # fit_mle keys CPT rows by a row-major parent-configuration index
            flat = table.reshape(-1, self.bn.node_dims[node])
            for i in range(flat.shape[0]):
                flat[i] = self.bn.cpds[node][i]
        return (list(parents) + [node], table)
    
    def _reduce(self, factor, evidence):
        """Clamp the observed variables of a factor to their evidence values."""
        variables, table = factor
        for var, val in evidence.items():
            if var in variables:
                table = np.asarray(np.take(table, val, axis=variables.index(var)))
                variables = [v for v in variables if v != var]
        return (variables, table)
    
    def _multiply(self, f1, f2):
        """Factor product: align axes, then broadcast-multiply."""
        v1, t1 = f1
        v2, t2 = f2
        all_vars = list(v1) + [v for v in v2 if v not in v1]
        t1 = t1.reshape(list(t1.shape) + [1] * (len(all_vars) - len(v1)))
        t2 = np.transpose(t2, [v2.index(v) for v in all_vars if v in v2])
        t2 = t2.reshape([self.bn.node_dims[v] if v in v2 else 1 for v in all_vars])
        return (all_vars, t1 * t2)
    
    def _marginalize(self, factor, var):
        """Sum a variable out of a factor."""
        variables, table = factor
        return ([v for v in variables if v != var],
                table.sum(axis=variables.index(var)))
    
    def query(self, query_var, evidence=None):
        """Compute P(query_var | evidence)."""
        evidence = evidence or {}
        factors = [self._reduce(self._node_factor(n), evidence)
                   for n in self.bn.node_names]
        
        # eliminate every hidden variable (simple ordering: network order)
        for var in [n for n in self.bn.node_names
                    if n != query_var and n not in evidence]:
            relevant = [f for f in factors if var in f[0]]
            factors = [f for f in factors if var not in f[0]]
            product = relevant[0]
            for f in relevant[1:]:
                product = self._multiply(product, f)
            factors.append(self._marginalize(product, var))
        
        # multiply the remaining factors (all over query_var or constants) and normalize
        result = factors[0]
        for f in factors[1:]:
            result = self._multiply(result, f)
        table = result[1].reshape(-1)
        return table / table.sum()
 
 
# Example query
def query_example():
    bn = learn_student_model()
    
    ve = VariableElimination(bn)
    
    # Query P(G | I=high): the grade distribution given high intelligence
    result = ve.query('G', evidence={'I': 1})
    
    print("\nQuery P(G | I=high):")
    print(f"pass: {result[0]:.3f}")
    print(f"good: {result[1]:.3f}")
    print(f"excellent: {result[2]:.3f}")

Belief propagation

For tree-structured Bayesian networks, exact inference can be done by message passing:

class BeliefPropagation:
    """Sum-product message passing for tree-structured networks,
    i.e. every node has at most one parent."""
    def __init__(self, bn):
        self.bn = bn
        self.messages = {}  # (sender, receiver) -> message vector
    
    def _compute_message(self, node, parent):
        """Message m_{node→parent}(parent) = Σ_node P(node|parent) × ∏ incoming messages."""
        node_dim = self.bn.node_dims[node]
        parent_dim = self.bn.node_dims[parent]
        
        # product of the messages arriving at `node` from its children
        incoming = np.ones(node_dim)
        for child in self._get_children(node):
            msg = self.messages.get((child, node))
            if msg is not None:
                incoming *= msg
        
        # sum out `node`; in a tree, cpds[node][pv] is P(node | parent=pv)
        message = np.zeros(parent_dim)
        for pv in range(parent_dim):
            message[pv] = np.dot(self.bn.cpds[node][pv], incoming)
        return message
    
    def _get_children(self, node):
        """Return the children of a node."""
        return [n for n, parents in self.bn.structure.items() if node in parents]

4.2 Approximate inference

Monte Carlo sampling

class GibbsSampling:
    """Gibbs sampling for approximate inference."""
    def __init__(self, bn, num_samples=10000, burn_in=1000):
        self.bn = bn
        self.num_samples = num_samples
        self.burn_in = burn_in
    
    def sample(self, evidence={}):
        """
        Run the Gibbs chain and return the retained samples.
        evidence: dict of observed variables and their values
        """
        samples = []
        
        # initialize the state: clamp evidence, randomize everything else
        current_state = {}
        for node in self.bn.node_names:
            if node in evidence:
                current_state[node] = evidence[node]
            else:
                current_state[node] = np.random.randint(self.bn.node_dims[node])
        
        # sweep over the non-evidence variables, resampling each from its
        # conditional given the current values of all the others
        for it in range(self.num_samples + self.burn_in):
            for node in self.bn.node_names:
                if node in evidence:
                    continue
                
                prob = self._compute_conditional(node, current_state, evidence)
                current_state[node] = np.random.choice(
                    self.bn.node_dims[node], p=prob
                )
            
            if it >= self.burn_in:  # discard the burn-in sweeps
                samples.append(current_state.copy())
        
        return samples
    
    def _compute_conditional(self, node, state, evidence):
        """P(node | everything else) ∝ P(node | parents) × ∏_children P(child | its parents),
        i.e. the product of the factors in node's Markov blanket."""
        node_dim = self.bn.node_dims[node]
        children = [n for n, ps in self.bn.structure.items() if node in ps]
        
        prob = np.zeros(node_dim)
        trial = dict(state)
        for val in range(node_dim):
            trial[node] = val
            p = self._cpd_prob(node, trial)
            for child in children:
                p *= self._cpd_prob(child, trial)
            prob[val] = p
        
        # normalize (fall back to uniform if every factor is zero)
        if prob.sum() > 0:
            return prob / prob.sum()
        return np.ones(node_dim) / node_dim
    
    def _cpd_prob(self, node, state):
        """Look up P(node = state[node] | its parents in state),
        using the same row-major parent index as fit_mle."""
        parents = self.bn.structure[node]
        if len(parents) == 0:
            return self.bn.cpds[node][state[node]]
        idx = 0
        for j in range(len(parents)):
            mult = int(np.prod([self.bn.node_dims[parents[k]]
                                for k in range(j + 1, len(parents))]))
            idx += state[parents[j]] * mult
        return self.bn.cpds[node][idx][state[node]]
    
    def query(self, query_var, evidence={}):
        """Estimate P(query_var | evidence) from the retained samples."""
        samples = self.sample(evidence)
        
        # empirical distribution of the query variable
        counts = np.zeros(self.bn.node_dims[query_var])
        for sample in samples:
            counts[sample[query_var]] += 1
        
        return counts / len(samples)
 
 
# Example: Gibbs-sampling query
def gibbs_query_example():
    bn = learn_student_model()
    
    gibbs = GibbsSampling(bn, num_samples=10000)
    
    # Query P(G | I=high)
    result = gibbs.query('G', evidence={'I': 1})
    
    print("\nGibbs-sampling query P(G | I=high):")
    print(f"pass: {result[0]:.3f}")
    print(f"good: {result[1]:.3f}")
    print(f"excellent: {result[2]:.3f}")

5. Combining with Neural Networks

5.1 Bayesian neural networks

Bayesian neural networks bring the Bayesian idea of maintaining distributions over unknown quantities into deep learning by placing a distribution over the network weights:

import torch
import torch.nn as nn
import torch.nn.functional as F

class BayesianNeuralNetwork:
    """A Bayesian neural network trained with mean-field variational inference."""
    def __init__(self, layers):
        """
        layers: list of (input_dim, output_dim) pairs
        """
        self.layers = layers
        self.priors = {}
        
        # standard-Gaussian prior (zero mean) for every weight and bias
        for i, (in_dim, out_dim) in enumerate(layers):
            self.priors[f'W_{i}'] = torch.zeros(out_dim, in_dim)
            self.priors[f'b_{i}'] = torch.zeros(out_dim)
    
    def fit_vi(self, X, y, num_iterations=1000):
        """
        Train by variational inference (maximize the ELBO).
        """
        # a factorized Gaussian variational distribution per weight tensor
        self.variational_params = {}
        for i, (in_dim, out_dim) in enumerate(self.layers):
            self.variational_params[f'W_{i}_mean'] = nn.Parameter(torch.randn(out_dim, in_dim) * 0.1)
            self.variational_params[f'W_{i}_logvar'] = nn.Parameter(torch.zeros(out_dim, in_dim) - 1)
            self.variational_params[f'b_{i}_mean'] = nn.Parameter(torch.zeros(out_dim))
            self.variational_params[f'b_{i}_logvar'] = nn.Parameter(torch.zeros(out_dim) - 1)
        
        optimizer = torch.optim.Adam(list(self.variational_params.values()), lr=0.01)
        
        for iteration in range(num_iterations):
            optimizer.zero_grad()
            
            # KL(q || prior) against N(0, I), once per (mean, logvar) pair
            kl_loss = 0
            for name in self.variational_params:
                if not name.endswith('logvar'):
                    continue
                mean = self.variational_params[name.replace('logvar', 'mean')]
                logvar = self.variational_params[name]
                kl_loss += -0.5 * torch.sum(1 + logvar - mean.pow(2) - logvar.exp())
            
            # forward pass with reparameterized weight samples
            h = X
            for i, (in_dim, out_dim) in enumerate(self.layers):
                W_mean = self.variational_params[f'W_{i}_mean']
                W_logvar = self.variational_params[f'W_{i}_logvar']
                b_mean = self.variational_params[f'b_{i}_mean']
                b_logvar = self.variational_params[f'b_{i}_logvar']
                
                # reparameterization trick: w = mean + eps * std
                W = W_mean + torch.randn_like(W_mean) * torch.exp(0.5 * W_logvar)
                b = b_mean + torch.randn_like(b_mean) * torch.exp(0.5 * b_logvar)
                
                h = h @ W.T + b
                if i < len(self.layers) - 1:
                    h = torch.relu(h)
            
            # negative log-likelihood term of the ELBO
            nll_loss = F.cross_entropy(h, y)
            
            # negative ELBO, with the KL term scaled per sample
            loss = nll_loss + kl_loss / len(X)
            
            loss.backward()
            optimizer.step()
            
            if iteration % 100 == 0:
                print(f"Iter {iteration}: Loss = {loss.item():.4f}")

5.2 Bayesian graph neural networks

class BayesianGraphNeuralNetwork(nn.Module):
    """A Bayesian GNN: Gaussian variational distributions over the weight matrices."""
    def __init__(self, in_channels, hidden_channels, out_channels):
        super().__init__()
        
        # variational parameters (mean and log-variance per weight matrix)
        self.W1_mean = nn.Parameter(torch.randn(hidden_channels, in_channels))
        self.W1_logvar = nn.Parameter(torch.zeros(hidden_channels, in_channels) - 1)
        
        self.W2_mean = nn.Parameter(torch.randn(out_channels, hidden_channels))
        self.W2_logvar = nn.Parameter(torch.zeros(out_channels, hidden_channels) - 1)
    
    def forward(self, x, edge_index):
        # sample the weights by reparameterization
        W1 = self.W1_mean + torch.randn_like(self.W1_mean) * torch.exp(0.5 * self.W1_logvar)
        W2 = self.W2_mean + torch.randn_like(self.W2_mean) * torch.exp(0.5 * self.W2_logvar)
        
        # GNN forward pass: transform, aggregate neighbors, transform again
        h = x @ W1.T
        h = F.relu(self._propagate(h, edge_index))
        h = h @ W2.T
        
        return h
    
    def _propagate(self, h, edge_index):
        """Sum-aggregate neighbor features along the edges."""
        out = torch.zeros_like(h)
        for i in range(edge_index.shape[1]):
            src, dst = edge_index[0, i], edge_index[1, i]
            out[dst] += h[src]
        return out
    
    def kl_divergence(self):
        """KL divergence of the variational distribution from the N(0, I) prior."""
        kl = 0
        for mean, logvar in [(self.W1_mean, self.W1_logvar), 
                            (self.W2_mean, self.W2_logvar)]:
            kl += -0.5 * torch.sum(1 + logvar - mean.pow(2) - logvar.exp())
        return kl

6. Applications

6.1 Diagnostic systems

Symptoms → disease diagnosis:

[headache] ──┐
[fever]    ──┼──→ [flu?] ──→ [treatment plan]
[cough]    ──┘
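A diagnostic query on such a network reduces to Bayes' rule. The sketch below assumes a naive-Bayes reading of the diagram (symptoms conditionally independent given flu), with every CPT number made up for illustration:

```python
# Hypothetical CPTs for the diagnosis network, assuming the symptoms are
# conditionally independent given flu (naive-Bayes structure).
p_flu = 0.1
p_sym = {                              # P(symptom = 1 | flu)
    'headache': {1: 0.8, 0: 0.3},
    'fever':    {1: 0.9, 0: 0.05},
    'cough':    {1: 0.7, 0: 0.2},
}

def posterior_flu(symptoms):
    """P(flu = 1 | observed symptoms), by enumerating flu's two values."""
    score = {}
    for flu in (0, 1):
        p = p_flu if flu == 1 else 1 - p_flu
        for s, v in symptoms.items():
            p1 = p_sym[s][flu]
            p *= p1 if v == 1 else 1 - p1
        score[flu] = p
    return score[1] / (score[0] + score[1])

print(round(posterior_flu({'headache': 1, 'fever': 1, 'cough': 1}), 3))  # ≈ 0.949
```

All three symptoms together push the posterior from a 10% prior to roughly 95%, while observing none of them drops it well below the prior.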

6.2 Risk assessment

# Credit-risk assessment network
risk_structure = {
    'income': [],
    'debt': [],
    'credit_history': [],
    'repayment_ability': ['income', 'debt'],
    'credit_score': ['credit_history', 'repayment_ability'],
    'default_risk': ['repayment_ability', 'credit_score']
}

6.3 Causal discovery

Bayesian networks can be used to discover causal structure from data, typically by searching over DAGs with a decomposable score such as BIC:

class StructureLearning:
    """Score-based structure learning over discrete data."""
    def __init__(self, data, max_parents=3):
        self.data = data
        self.max_parents = max_parents
        self.num_vars = data.shape[1]
        # cardinality of each variable, inferred from the data
        self.cards = [int(data[:, i].max()) + 1 for i in range(self.num_vars)]
    
    def score(self, structure):
        """BIC score of a structure (sum of per-node family scores)."""
        return sum(self._bic_score(node, structure.get(node, []))
                   for node in range(self.num_vars))
    
    def _bic_score(self, node, parents):
        """BIC = LL - 0.5 * k * log(n), with k the number of free CPT parameters."""
        n = len(self.data)
        card = self.cards[node]
        
        # group samples by parent configuration and count child values
        counts = defaultdict(lambda: np.zeros(card))
        for row in self.data:
            key = tuple(row[p] for p in parents)
            counts[key][int(row[node])] += 1
        
        # maximized log-likelihood: Σ over configurations of N(x,pa) log(N(x,pa)/N(pa))
        ll = 0.0
        for c in counts.values():
            nz = c[c > 0]
            ll += np.sum(nz * np.log(nz / c.sum()))
        
        num_configs = int(np.prod([self.cards[p] for p in parents])) if parents else 1
        k = num_configs * (card - 1)  # free parameters of this CPT
        return ll - 0.5 * k * np.log(n)
    
    def _creates_cycle(self, structure, child, new_parent):
        """Would adding the edge new_parent → child create a directed cycle?
        It would iff child is already an ancestor of new_parent."""
        stack, seen = [new_parent], set()
        while stack:
            cur = stack.pop()
            if cur == child:
                return True
            for p in structure.get(cur, []):
                if p not in seen:
                    seen.add(p)
                    stack.append(p)
        return False
    
    def hill_climbing(self):
        """Greedy hill climbing over edge additions."""
        current_structure = {i: [] for i in range(self.num_vars)}
        
        for _ in range(1000):
            best_score = self.score(current_structure)
            best_move = None
            
            # try adding each legal (acyclic, parent-limited) edge j → i
            for i in range(self.num_vars):
                for j in range(self.num_vars):
                    if (i != j and j not in current_structure[i]
                            and len(current_structure[i]) < self.max_parents
                            and not self._creates_cycle(current_structure, i, j)):
                        new_structure = dict(current_structure)
                        new_structure[i] = current_structure[i] + [j]
                        new_score = self.score(new_structure)
                        
                        if new_score > best_score:
                            best_score = new_score
                            best_move = ('add', i, j)
            
            if best_move:
                current_structure[best_move[1]].append(best_move[2])
            else:
                break
        
        return current_structure

7. Related Topics

    Topic                        Description
    Markov networks and CRFs     undirected graphical models
    Conditional random fields    CRFs for sequence labeling
    Variational inference        approximate inference methods
    Causal inference             foundations of causal modeling

References

  1. Koller & Friedman, Probabilistic Graphical Models: Principles and Techniques, MIT Press, 2009.