Markov Networks and Conditional Random Fields

A Markov network is the canonical undirected graphical model, the counterpart of the Bayesian network (a directed graphical model). The conditional random field (CRF) is a discriminative undirected graphical model widely used for sequence labeling and related tasks.[1]

I. Markov Network Fundamentals

Definition and Representation

A Markov network (also called a Markov random field, MRF) uses an undirected graph to represent conditional independencies among random variables:

    X1 ──── X2
    │╲      │╱
    │  ╲    │  ╲
    │    ╲X3────X4
    │    ╱    ╱
    │  ╱    ╱
    X5 ──── X6

Basic elements

  • Nodes: random variables
  • Edges: direct dependencies between variables
  • Clique: a fully connected subgraph (every pair of nodes is adjacent)

Comparison with Bayesian Networks

| Property | Bayesian network (directed) | Markov network (undirected) |
| --- | --- | --- |
| Edges | directed | undirected |
| Factorization | product of conditional probabilities | product of potential functions |
| Natural fit | causal relationships | correlations / mutual interactions |
| Inference | usually easier | usually harder |
| Parameters | conditional probability tables | clique potentials (unnormalized) |
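The factorization row of this comparison can be made concrete. For a directed graph with parent sets $\mathrm{pa}(x_i)$ versus an undirected graph with maximal cliques $C$:

```latex
% Bayesian network: product of locally normalized conditionals
P(x_1, \dots, x_n) = \prod_{i=1}^{n} P\big(x_i \mid \mathrm{pa}(x_i)\big)

% Markov network: globally normalized product of clique potentials
P(x_1, \dots, x_n) = \frac{1}{Z} \prod_{C} \psi_C(x_C),
\qquad Z = \sum_{x} \prod_{C} \psi_C(x_C)
```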

Factorization of the Joint Distribution

Let the graph's set of maximal cliques be $\mathcal{C}$. The joint distribution factorizes as:

$$P(x) = \frac{1}{Z} \prod_{C \in \mathcal{C}} \psi_C(x_C)$$

where:

  • $\psi_C(x_C)$ is the clique potential, which must be non-negative
  • $Z = \sum_x \prod_{C \in \mathcal{C}} \psi_C(x_C)$ is the partition function

Note: potential functions have no probabilistic interpretation on their own; they only express how compatible an assignment of the clique's variables is.
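For a tiny network the partition function can be computed by brute-force enumeration, which makes the definition concrete. Here, a single-edge network over two binary variables with an illustrative potential table:

```python
import numpy as np
from itertools import product

# one clique {X1, X2}, binary variables; the potential favors agreement (illustrative values)
psi = np.array([[4.0, 1.0],
                [1.0, 4.0]])

# partition function: sum the unnormalized score over all joint assignments
Z = sum(psi[x1, x2] for x1, x2 in product([0, 1], repeat=2))

# normalized joint distribution
P = psi / Z

print(Z)        # 10.0
print(P.sum())  # 1.0
```

The same enumeration becomes intractable as the number of variables grows, which is why computing $Z$ is the central difficulty of undirected models.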

II. The Hammersley-Clifford Theorem

Statement

The Hammersley-Clifford theorem establishes the equivalence between Markov networks and Gibbs distributions:

If a strictly positive distribution $P$ satisfies the Markov independence properties encoded by the graph, then $P$ can be written as a product of clique potentials (a Gibbs distribution), and conversely.

Conditional Independence Properties

A Markov network encodes three basic conditional independence properties:

  1. Pairwise Markov property: two non-adjacent nodes are conditionally independent given all other nodes

  2. Local Markov property: a node is conditionally independent of all non-neighboring nodes given its neighbors

  3. Global Markov property: two subsets of nodes are conditionally independent given a separating set

For strictly positive distributions, the three properties are equivalent.

Chain example: all three independence statements coincide

   A ─── B ─── C

A ⊥ C | B  ✓ (pairwise)
A ⊥ {non-neighbors of A} | B  ✓ (local)
{A} ⊥ {C} | {B}  ✓ (global: B separates A and C)
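The chain's independence can be verified numerically: build the joint from two edge potentials and check that, for each value of $B$, the conditional over $(A, C)$ factorizes. The potential values here are arbitrary:

```python
import numpy as np

rng = np.random.default_rng(0)
psi_ab = rng.uniform(0.5, 2.0, size=(2, 2))  # edge potential on A—B
psi_bc = rng.uniform(0.5, 2.0, size=(2, 2))  # edge potential on B—C

# joint P(a, b, c) ∝ ψ(a, b) ψ(b, c)
joint = np.einsum('ab,bc->abc', psi_ab, psi_bc)
joint /= joint.sum()

# A ⊥ C | B: for each b, P(a, c | b) must factor as P(a | b) P(c | b)
for b in range(2):
    p_ac_given_b = joint[:, b, :] / joint[:, b, :].sum()
    p_a = p_ac_given_b.sum(axis=1)
    p_c = p_ac_given_b.sum(axis=0)
    assert np.allclose(p_ac_given_b, np.outer(p_a, p_c))
```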

III. Common Markov Network Models

1. Pairwise Markov Networks

Only unary and pairwise potentials appear:

$$P(x) \propto \prod_i \psi_i(x_i) \prod_{(i,j) \in E} \psi_{ij}(x_i, x_j)$$

Image denoising model

import numpy as np

class ImageDenoisingMRF:
    """
    Markov network model for image denoising.

    Potential design:
    - Unary potential: encourages each pixel to stay close to its observed value
    - Pairwise potential: encourages neighboring pixels to be similar (smoothness prior)
    """

    def __init__(self, image, noise_std, smooth_weight):
        self.image = image.astype(np.float32)  # noisy observation
        self.h, self.w = image.shape
        self.noise_std = noise_std
        self.smooth_weight = smooth_weight

    def unary_potential(self, value, i, j):
        """Unary potential: data-fidelity term for label `value` at pixel (i, j)."""
        obs = self.image[i, j]
        return np.exp(-0.5 * (value - obs) ** 2 / self.noise_std ** 2)

    def binary_potential(self, value1, value2):
        """Pairwise potential: smoothness prior (Potts-style, favors equal labels)."""
        diff = np.abs(value1 - value2)
        return np.exp(-self.smooth_weight * diff)

    def pairwise_mrf_energy(self, labels):
        """
        MRF energy (negative log of the unnormalized distribution), used for optimization:

            E = Σ data terms + λ Σ smoothness terms
        """
        # data term
        energy = np.sum((labels - self.image) ** 2) / (2 * self.noise_std ** 2)

        # smoothness term: horizontal and vertical neighbor pairs
        energy += self.smooth_weight * np.sum((labels[:, :-1] - labels[:, 1:]) ** 2)
        energy += self.smooth_weight * np.sum((labels[:-1, :] - labels[1:, :]) ** 2)

        return energy
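A minimal sketch of how such an energy drives denoising: a few sweeps of ICM (iterated conditional modes), which lowers the energy one pixel at a time. The toy image, noise level, and binary label set below are all illustrative:

```python
import numpy as np

def mrf_energy(labels, obs, noise_std, lam):
    """Same energy as above: data term plus 4-neighborhood smoothness."""
    e = np.sum((labels - obs) ** 2) / (2 * noise_std ** 2)
    e += lam * np.sum((labels[:, :-1] - labels[:, 1:]) ** 2)
    e += lam * np.sum((labels[:-1, :] - labels[1:, :]) ** 2)
    return e

def icm_denoise(obs, init, values, noise_std, lam, sweeps=5):
    """Coordinate descent: set each pixel to the locally optimal label."""
    labels = init.copy()
    h, w = obs.shape
    for _ in range(sweeps):
        for i in range(h):
            for j in range(w):
                def local_energy(v):
                    # all energy terms that involve pixel (i, j)
                    e = (v - obs[i, j]) ** 2 / (2 * noise_std ** 2)
                    for ni, nj in ((i - 1, j), (i + 1, j), (i, j - 1), (i, j + 1)):
                        if 0 <= ni < h and 0 <= nj < w:
                            e += lam * (v - labels[ni, nj]) ** 2
                    return e
                labels[i, j] = min(values, key=local_energy)
    return labels

rng = np.random.default_rng(0)
clean = np.zeros((8, 8)); clean[:, 4:] = 1.0        # binary step image
noisy = clean + rng.normal(0.0, 0.3, clean.shape)   # observed image
init = (noisy > 0.5).astype(float)                  # thresholded starting point
denoised = icm_denoise(noisy, init, values=(0.0, 1.0), noise_std=0.3, lam=1.0)

# each ICM update only lowers the energy, so the result can't be worse than the start
print(mrf_energy(denoised, noisy, 0.3, 1.0) <= mrf_energy(init, noisy, 0.3, 1.0))  # True
```

ICM only finds a local minimum; graph cuts or belief propagation are the usual choices when stronger guarantees are needed.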

2. Conditional Random Fields (CRF)

A CRF is a discriminative model that models the conditional probability directly:

$$P(y \mid x) = \frac{1}{Z(x)} \exp\Big(\sum_k \lambda_k f_k(x, y)\Big)$$

where the $f_k$ are feature functions.

3. Exponential-Family Form

The potentials of a Markov network can be written in exponential form:

$$\psi_C(x_C) = \exp\Big(\sum_k \theta_{C,k} f_{C,k}(x_C)\Big)$$

which makes $P(x)$ an exponential-family distribution, with the good statistical properties that entails.

IV. Conditional Random Fields (CRF)

The Core Idea of CRFs

CRFs were introduced by Lafferty et al. in 2001, primarily for sequence labeling problems.[2]

Core idea: model the conditional probability $P(y \mid x)$ directly, rather than modeling the distribution of the observations as a generative model must.

Linear-Chain CRFs

The most widely used form is the linear-chain CRF:

Y1 ── Y2 ── Y3 ── Y4 ── Y5
│     │     │     │     │
X1    X2    X3    X4    X5

The joint features include:

  • observation features at the current position
  • label-transition features

Definition of the Conditional Probability

$$P(y \mid x) = \frac{1}{Z(x)} \exp\Big(\sum_{t=1}^{T} \sum_{k} \lambda_k f_k(y_{t-1}, y_t, x, t)\Big)$$

with the partition function:

$$Z(x) = \sum_{y'} \exp\Big(\sum_{t=1}^{T} \sum_{k} \lambda_k f_k(y'_{t-1}, y'_t, x, t)\Big)$$
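The sum over all $K^T$ label sequences in $Z(x)$ is tractable via the forward recursion. A quick check on random scores (all values illustrative) that the recursion matches brute-force enumeration:

```python
import numpy as np
from itertools import product

T, K = 4, 3
rng = np.random.default_rng(1)
state = rng.normal(size=(T, K))   # state scores  s_t(y_t)
trans = rng.normal(size=(K, K))   # transition scores  s(y_{t-1}, y_t)

def total_score(y):
    return (sum(state[t, y[t]] for t in range(T))
            + sum(trans[y[t - 1], y[t]] for t in range(1, T)))

# brute force: sum over all K^T label sequences
Z_brute = sum(np.exp(total_score(y)) for y in product(range(K), repeat=T))

# forward recursion: alpha_t(j) = exp(s_t(j)) * Σ_i alpha_{t-1}(i) exp(s(i, j))
alpha = np.exp(state[0])
for t in range(1, T):
    alpha = np.exp(state[t]) * (alpha @ np.exp(trans))
Z_forward = alpha.sum()

print(np.isclose(Z_brute, Z_forward))  # True
```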

Feature Function Design

Transition features

def transition_feature(y_prev, y_curr):
    """Label-transition feature: fires for the observed pair of adjacent tags."""
    return {f'T_{y_prev}_{y_curr}': 1.0}

State features

def state_feature(x, t):
    """Observation (state) features for position t of sentence x."""
    word = x[t]
    features = {}

    # position features
    features['BOS'] = 1.0 if t == 0 else 0.0
    features['EOS'] = 1.0 if t == len(x) - 1 else 0.0

    # word-shape features
    features[f'word={word}'] = 1.0
    features['is_upper'] = 1.0 if word[0].isupper() else 0.0
    features['is_digit'] = 1.0 if word.isdigit() else 0.0
    features[f'suffix={word[-3:]}'] = 1.0
    features[f'prefix={word[:3]}'] = 1.0

    return features

V. Parameter Learning in CRFs

The Log-Likelihood

Given a training set $\{(x^{(i)}, y^{(i)})\}_{i=1}^{N}$, the log-likelihood is:

$$L(\lambda) = \sum_{i=1}^{N} \log P(y^{(i)} \mid x^{(i)}; \lambda)$$

Expanding:

$$L(\lambda) = \sum_{i=1}^{N} \Big( \sum_{t,k} \lambda_k f_k(y^{(i)}_{t-1}, y^{(i)}_t, x^{(i)}, t) - \log Z(x^{(i)}) \Big)$$

Computing the Gradient

Differentiating with respect to $\lambda_k$:

$$\frac{\partial L}{\partial \lambda_k} = \sum_{i=1}^{N} \Big( F_k(y^{(i)}, x^{(i)}) - \mathbb{E}_{y \sim P(y \mid x^{(i)})}\big[F_k(y, x^{(i)})\big] \Big)$$

where $F_k(y, x) = \sum_t f_k(y_{t-1}, y_t, x, t)$ is the feature summed along the sequence. The gradient is empirical feature counts minus model expectations.

Computing the Expectations: the Forward-Backward Algorithm

For clarity, the sketch below parameterizes the model directly by per-position state scores and a transition score matrix (the log potentials):

import numpy as np
from scipy.special import logsumexp

def crf_forward_backward(state_scores, trans_scores):
    """
    Forward-backward recursions in log space (log-sum-exp avoids overflow).

    state_scores: (T, K) log state scores; trans_scores: (K, K) log transition scores.
    Returns log Z, the node marginals P(y_t = j | x), and the edge marginals
    P(y_{t-1} = i, y_t = j | x).
    """
    T, K = state_scores.shape

    # forward pass
    alpha = np.zeros((T, K))
    alpha[0] = state_scores[0]
    for t in range(1, T):
        alpha[t] = state_scores[t] + logsumexp(alpha[t - 1][:, None] + trans_scores, axis=0)

    # backward pass
    beta = np.zeros((T, K))
    for t in range(T - 2, -1, -1):
        beta[t] = logsumexp(trans_scores + state_scores[t + 1] + beta[t + 1], axis=1)

    log_Z = logsumexp(alpha[-1])
    node_marginals = np.exp(alpha + beta - log_Z)
    edge_marginals = np.exp(alpha[:-1, :, None] + trans_scores[None, :, :]
                            + (state_scores[1:] + beta[1:])[:, None, :] - log_Z)
    return log_Z, node_marginals, edge_marginals

def crf_gradient(state_scores, trans_scores, tags):
    """
    Gradient of the log-likelihood for one sequence:
    empirical feature counts minus model expectations.
    """
    T, K = state_scores.shape
    _, node_marginals, edge_marginals = crf_forward_backward(state_scores, trans_scores)

    # model expectations enter with a minus sign
    grad_state = -node_marginals
    grad_trans = -edge_marginals.sum(axis=0)

    # empirical counts from the gold tag sequence
    for t in range(T):
        grad_state[t, tags[t]] += 1.0
    for t in range(1, T):
        grad_trans[tags[t - 1], tags[t]] += 1.0

    return grad_state, grad_trans

Using sklearn-crfsuite

import sklearn_crfsuite
from sklearn_crfsuite import metrics

# prepare the training data
X_train = [sent2features(s) for s in train_sents]
y_train = [sent2labels(s) for s in train_sents]

# train the CRF
crf = sklearn_crfsuite.CRF(
    algorithm='lbfgs',
    c1=0.1,  # L1 regularization
    c2=0.1,  # L2 regularization
    max_iterations=100,
    all_possible_transitions=True
)
crf.fit(X_train, y_train)

# predict
y_pred = crf.predict(X_test)

VI. Inference in CRFs

The Inference Problem

Given an observation sequence $x$, find the most likely label sequence:

$$y^* = \arg\max_{y} P(y \mid x)$$

Viterbi Algorithm

def viterbi_decode(words, weights, tag_to_idx, word_to_idx):
    """
    Viterbi algorithm for the optimal path.
    (compute_initial_score, compute_trans_score, and compute_state_score are
    assumed to be defined elsewhere in terms of `weights`.)

    Time complexity:  O(T * K^2)
    Space complexity: O(T * K)
    """
    T = len(words)
    K = len(tag_to_idx)
    idx_to_tag = {v: k for k, v in tag_to_idx.items()}

    # initialization
    viterbi = np.zeros((T, K))
    backpointer = np.zeros((T, K), dtype=int)

    # time step t = 0
    for j in range(K):
        tag = idx_to_tag[j]
        viterbi[0, j] = compute_initial_score(tag) + compute_state_score(tag, words[0], 0)

    # dynamic programming
    for t in range(1, T):
        for j in range(K):
            curr_tag = idx_to_tag[j]

            # score via each possible predecessor tag
            scores = viterbi[t-1] + np.array([
                compute_trans_score(idx_to_tag[i], curr_tag) +
                compute_state_score(curr_tag, words[t], t)
                for i in range(K)
            ])

            best_prev = np.argmax(scores)
            viterbi[t, j] = scores[best_prev]
            backpointer[t, j] = best_prev

    # backtrack
    best_path = np.zeros(T, dtype=int)
    best_path[T-1] = np.argmax(viterbi[T-1])

    for t in range(T-2, -1, -1):
        best_path[t] = backpointer[t+1, best_path[t+1]]

    return [idx_to_tag[i] for i in best_path]
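The same recursion can be sanity-checked on a self-contained toy instance against brute-force search over all label sequences. Random scores stand in for the helper functions above:

```python
import numpy as np
from itertools import product

T, K = 5, 3
rng = np.random.default_rng(2)
state = rng.normal(size=(T, K))   # stand-in for compute_state_score
trans = rng.normal(size=(K, K))   # stand-in for compute_trans_score

def path_score(y):
    return (sum(state[t, y[t]] for t in range(T))
            + sum(trans[y[t - 1], y[t]] for t in range(1, T)))

# Viterbi: max instead of sum in the forward recursion
delta = state[0].copy()
back = np.zeros((T, K), dtype=int)
for t in range(1, T):
    scores = delta[:, None] + trans            # (prev tag, curr tag)
    back[t] = scores.argmax(axis=0)
    delta = scores.max(axis=0) + state[t]

# backtrack from the best final tag
path = [int(delta.argmax())]
for t in range(T - 1, 0, -1):
    path.append(int(back[t, path[-1]]))
path.reverse()

best = max(product(range(K), repeat=T), key=path_score)
print(tuple(path) == best)  # True
```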

VII. Application Scenarios

1. Named Entity Recognition (NER)

def extract_features(sentence, i):
    """Feature extraction for the NER task."""
    word = sentence[i]

    features = {
        'bias': 1.0,
        'word.lower()': word.lower(),
        'word[-3:]': word[-3:],
        'word[-2:]': word[-2:],
        'word.isupper()': word.isupper(),
        'word.istitle()': word.istitle(),
        'word.isdigit()': word.isdigit(),
        'word.isalpha()': word.isalpha(),
        'BOS': i == 0,
        'EOS': i == len(sentence) - 1,
    }

    # context features
    if i > 0:
        word1 = sentence[i-1]
        features.update({
            '-1:word.lower()': word1.lower(),
            '-1:word.istitle()': word1.istitle(),
        })

    if i < len(sentence) - 1:
        word1 = sentence[i+1]
        features.update({
            '+1:word.lower()': word1.lower(),
            '+1:word.istitle()': word1.istitle(),
        })

    return features

def sent2features(sent):
    return [extract_features(sent, i) for i in range(len(sent))]

def sent2labels(sent):
    # each token is a (token, postag, label) triple, CoNLL-style
    return [label for token, postag, label in sent]

2. Part-of-Speech Tagging (POS Tagging)

# feature templates
POS_FEATURES = {
    # the word itself (unary templates: word -> value)
    'word': lambda w: w.lower(),
    'suffix': lambda w: w[-2:] if len(w) > 2 else w,
    'prefix': lambda w: w[:2] if len(w) > 2 else w,
    'is_digit': lambda w: w.isdigit(),
    'is_alpha': lambda w: w.isalpha(),
    'is_upper': lambda w: w[0].isupper() if w else False,

    # context (templates over the sentence and position)
    'word-1': lambda s, i: s[i-1].lower() if i > 0 else '<BOS>',
    'word+1': lambda s, i: s[i+1].lower() if i < len(s)-1 else '<EOS>',
}
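A template table like this is applied position by position. A hypothetical helper that flattens it into the per-token feature dict a CRF library expects:

```python
def pos_features(sentence, i):
    """Flatten a POS feature template into a dict for position i (illustrative)."""
    w = sentence[i]
    return {
        'word': w.lower(),
        'suffix': w[-2:] if len(w) > 2 else w,
        'prefix': w[:2] if len(w) > 2 else w,
        'is_digit': w.isdigit(),
        'is_alpha': w.isalpha(),
        'is_upper': w[0].isupper() if w else False,
        'word-1': sentence[i-1].lower() if i > 0 else '<BOS>',
        'word+1': sentence[i+1].lower() if i < len(sentence) - 1 else '<EOS>',
    }

sent = ['The', 'cat', 'sat']
print(pos_features(sent, 0)['word-1'])  # <BOS>
print(pos_features(sent, 1)['word+1'])  # sat
```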

3. Image Segmentation

import numpy as np

class ImageSegmentationCRF:
    """
    Dense CRF for image segmentation.

    Uses a fully connected CRF for pixel-level labeling.
    """

    def __init__(self, unary_potentials, image, num_classes,
                 spatial_std=3.0, color_std=10.0):
        self.unary = unary_potentials  # (H, W, K)
        self.image = image  # (H, W, 3)
        self.K = num_classes
        self.h, self.w = image.shape[:2]
        self.spatial_std = spatial_std  # bandwidth of the position kernel
        self.color_std = color_std      # bandwidth of the color kernel

    def potts_model(self, label_i, label_j):
        """Potts compatibility: differing labels pay a penalty."""
        return 1.0 if label_i != label_j else 0.0

    def binary_potential(self, i, j, k, l):
        """Pairwise potential between flat pixel indices i and j: Gaussian-kernel weighted."""
        pos_i = np.array([i // self.w, i % self.w])
        pos_j = np.array([j // self.w, j % self.w])

        color_i = self.image[pos_i[0], pos_i[1]].astype(np.float64)
        color_j = self.image[pos_j[0], pos_j[1]].astype(np.float64)

        # position kernel
        pos_diff = np.sum((pos_i - pos_j) ** 2)
        pos_kernel = np.exp(-pos_diff / (2 * self.spatial_std ** 2))

        # color kernel
        color_diff = np.sum((color_i - color_j) ** 2)
        color_kernel = np.exp(-color_diff / (2 * self.color_std ** 2))

        return pos_kernel * color_kernel * self.potts_model(k, l)

    def mean_field_approximation(self, Q, num_iter=10):
        """
        Mean-field inference for the CRF.

        Q[:, :, k] ≈ P(label = k | observations).
        `pairwise_message` is assumed to aggregate the kernel-weighted marginals
        over all pixel pairs (in practice via efficient high-dimensional filtering).
        """
        for _ in range(num_iter):
            log_q = np.log(self.unary + 1e-10)

            for k in range(self.K):
                for l in range(self.K):
                    if l == k:
                        continue  # Potts model: equal labels incur no penalty
                    # subtract the expected pairwise penalty from class l's marginals
                    log_q[:, :, k] -= self.pairwise_message(Q[:, :, l])

            # normalize with a softmax over the class dimension
            log_q -= log_q.max(axis=2, keepdims=True)
            Q = np.exp(log_q)
            Q /= Q.sum(axis=2, keepdims=True)

        return Q

4. Speech Recognition

CRFs can be used for phoneme sequence recognition, combining acoustic features with linguistic constraints.

VIII. CRFs in the Deep Learning Era

Combining CRFs with Neural Networks

In sequence labeling, a CRF is often used as the final layer of a neural network:

input sequence → BiLSTM encoder → emission scores → CRF layer → output sequence
                       ↓
             learned feature representations

import torch
import torch.nn as nn

class BiLSTM_CRF(nn.Module):
    """BiLSTM-CRF model"""

    def __init__(self, vocab_size, embedding_dim, hidden_dim, tag_to_idx):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim // 2,
                           batch_first=True, bidirectional=True)
        self.hidden2tag = nn.Linear(hidden_dim, len(tag_to_idx))

        # CRF parameters
        self.tag_to_idx = tag_to_idx
        self.n_tags = len(tag_to_idx)

        # transition matrix: transitions[i, j] scores moving from tag i to tag j
        self.transitions = nn.Parameter(torch.randn(self.n_tags, self.n_tags))

    def forward(self, sentences, tags=None, mask=None):
        """
        Forward pass.

        Args:
            sentences: (batch, seq_len)
            tags: (batch, seq_len) - provided during training
            mask: (batch, seq_len) - padding mask (1 for real tokens)
        """
        # encode
        embeddings = self.embedding(sentences)
        lstm_out, _ = self.lstm(embeddings)
        emissions = self.hidden2tag(lstm_out)  # (batch, seq_len, n_tags)

        if tags is None:
            # inference mode: Viterbi decoding
            return self._viterbi_decode(emissions, mask)
        else:
            # training mode: compute the loss
            return self._crf_loss(emissions, tags, mask)

    def _viterbi_decode(self, emissions, mask):
        """Viterbi decoding (padding positions ignored for simplicity)."""
        batch_size, seq_len, n_tags = emissions.shape

        viterbi = emissions[:, 0].clone()
        backpointers = []

        for t in range(1, seq_len):
            # (batch, prev, 1) + (1, prev, curr) + (batch, 1, curr)
            score = viterbi[:, :, None] + self.transitions[None, :, :] + emissions[:, t, None, :]

            best_tags = score.argmax(dim=1)   # best predecessor for each current tag
            viterbi = score.gather(1, best_tags.unsqueeze(1)).squeeze(1)
            backpointers.append(best_tags)

        # backtrack
        best_path = torch.zeros(batch_size, seq_len, dtype=torch.long, device=emissions.device)
        best_path[:, -1] = viterbi.argmax(dim=1)

        for t in range(seq_len - 2, -1, -1):
            best_path[:, t] = backpointers[t].gather(1, best_path[:, t+1].unsqueeze(1)).squeeze(1)

        return best_path

    def _crf_loss(self, emissions, tags, mask):
        """CRF loss: negative log-likelihood."""
        # score of the gold path
        score = self._forward_score(emissions, tags, mask)

        # partition function
        partition = self._partition_function(emissions, mask)

        # negative log-likelihood
        loss = partition - score

        return loss.mean()

    def _forward_score(self, emissions, tags, mask):
        """Score of the gold tag sequence."""
        batch_size, seq_len, n_tags = emissions.shape

        score = emissions[:, 0].gather(1, tags[:, 0].unsqueeze(1)).squeeze(1)

        for t in range(1, seq_len):
            transition_score = self.transitions[tags[:, t-1], tags[:, t]]
            emission_score = emissions[:, t].gather(1, tags[:, t].unsqueeze(1)).squeeze(1)
            score = score + (transition_score + emission_score) * mask[:, t]

        return score

    def _partition_function(self, emissions, mask):
        """Partition function via the forward algorithm, in log space."""
        batch_size, seq_len, n_tags = emissions.shape

        alpha = emissions[:, 0]

        for t in range(1, seq_len):
            # (batch, prev, 1) + (1, prev, curr) + (batch, 1, curr)
            scores = alpha.unsqueeze(2) + self.transitions.unsqueeze(0) + emissions[:, t].unsqueeze(1)
            alpha_t = torch.logsumexp(scores, dim=1)
            # padded positions keep their previous alpha
            alpha = torch.where(mask[:, t].unsqueeze(1).bool(), alpha_t, alpha)

        return torch.logsumexp(alpha, dim=1)

BERT + CRF

Modern NER systems often pair a pretrained language model with a CRF:

import torch.nn as nn
from torchcrf import CRF  # pytorch-crf package

class BERT_CRF_NER(nn.Module):
    """BERT + CRF for Named Entity Recognition"""

    def __init__(self, bert_model, num_labels):
        super().__init__()
        self.bert = bert_model
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(bert_model.config.hidden_size, num_labels)
        self.crf = CRF(num_labels, batch_first=True)

    def forward(self, input_ids, attention_mask, labels=None):
        # BERT encoding
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        sequence_output = self.dropout(outputs.last_hidden_state)

        # emission scores
        emissions = self.classifier(sequence_output)

        if labels is not None:
            # training: return the CRF loss (negative log-likelihood)
            loss = -self.crf(emissions, labels, mask=attention_mask.byte(), reduction='mean')
            return loss
        else:
            # inference: return the decoded label sequences
            return self.crf.decode(emissions, mask=attention_mask.byte())

IX. Model Comparison and Selection

Generative vs. Discriminative Models

| Property | Generative (NB, HMM) | Discriminative (CRF) |
| --- | --- | --- |
| Models | $P(X, Y)$ | $P(Y \mid X)$ |
| Computational cost | lower | higher |
| Missing features | more robust (can marginalize) | poor |
| Feature engineering | constrained | flexible |
| Sequence modeling | suitable | better suited |
| Long-range dependencies | hard | easier |

HMM vs. CRF

| Property | HMM | CRF |
| --- | --- | --- |
| Independence assumptions | observations conditionally independent | arbitrary dependencies allowed |
| Feature types | probabilities | arbitrary feature functions |
| Emission scores | must be normalized | need not be normalized |
| Learning | EM / maximum likelihood | gradient-based optimization |
| Inference | forward-backward / Viterbi | Viterbi |
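The normalization row is the key structural difference: the HMM factors into locally normalized probabilities, while the CRF normalizes once, globally, per input:

```latex
% HMM: generative, locally normalized factors
P(x, y) = \prod_{t=1}^{T} P(y_t \mid y_{t-1}) \, P(x_t \mid y_t)

% CRF: discriminative, one global normalizer per input
P(y \mid x) = \frac{1}{Z(x)} \exp\Big( \sum_{t,k} \lambda_k f_k(y_{t-1}, y_t, x, t) \Big)
```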

When to Use a CRF

Good fits

  • sequence labeling tasks (NER, POS tagging, chunking)
  • label-to-label transitions need to be modeled
  • rich, complex observation features
  • globally consistent output is required

Poor fits

  • simple classification problems
  • features are hard to design
  • compute budget is tight

X. Regularization and Tuning for CRFs

Hyperparameter Search

import sklearn_crfsuite
from sklearn.metrics import make_scorer
from sklearn.model_selection import GridSearchCV
from sklearn_crfsuite import metrics

param_grid = {
    'c1': [0.0, 0.1, 0.5, 1.0],  # L1 regularization
    'c2': [0.0, 0.1, 0.5, 1.0],  # L2 regularization
    'max_iterations': [100, 200, 500],
}

crf = sklearn_crfsuite.CRF(algorithm='lbfgs')
# sequence labels need a flattened scorer; the plain 'f1_micro' string won't work
f1_scorer = make_scorer(metrics.flat_f1_score, average='micro')
grid_search = GridSearchCV(crf, param_grid, cv=5, scoring=f1_scorer)
grid_search.fit(X_train, y_train)

print(f"Best params: {grid_search.best_params_}")

Class Imbalance

sklearn-crfsuite has no built-in class-weight option, so imbalanced label sets are usually handled by resampling the training sentences, tuning c1/c2, and checking per-class metrics so that rare labels are not drowned out:

print(metrics.flat_classification_report(
    y_test, y_pred, labels=sorted(crf.classes_)
))

Footnotes

  1. Koller, D., & Friedman, N. (2009). Probabilistic Graphical Models: Principles and Techniques. MIT Press.

  2. Lafferty, J., McCallum, A., & Pereira, F. (2001). “Conditional Random Fields: Probabilistic Models for Segmenting and Labeling Sequence Data”. ICML.