马尔可夫网络与条件随机场
马尔可夫网络(Markov Network)是无向图模型的代表,与贝叶斯网络(有向图模型)相对应。条件随机场(Conditional Random Field, CRF)是在序列标注等任务中广泛应用的判别式无向图模型。1
一、马尔可夫网络基础
定义与表示
马尔可夫网络(又称马尔可夫随机场,Markov Random Field, MRF)使用无向图表示随机变量之间的条件独立性:
X1 ──── X2
│╲ │╱
│ ╲ │ ╲
│ ╲X3────X4
│ ╱ ╱
│ ╱ ╱
X5 ──── X6
基本要素:
- 节点:随机变量
- 边:变量间的依赖关系
- 团:完全子图(所有节点两两相连)
与贝叶斯网络的对比
| 特性 | 贝叶斯网络(有向) | 马尔可夫网络(无向) |
|---|---|---|
| 边方向 | 有向边 | 无向边 |
| 分解形式 | 条件概率乘积 | 势函数乘积 |
| 适合建模 | 因果关系 | 相关关系/相互作用 |
| 推理难度 | 通常较易 | 通常较难 |
| 参数 | 条件概率表 | 团势函数(未归一化) |
联合概率分解
设图包含一组最大团 ,则联合分布可以分解为:
其中:
- 是团势函数(clique potential),非负
- 是配分函数(partition function)
注意:势函数没有概率解释,只表示变量间的兼容性。
二、Hammersley-Clifford 定理
定理内容
Hammersley-Clifford 定理建立了马尔可夫网络与吉布斯分布的等价性:
若正分布 满足马尔可夫独立性,则 可以表示为势函数的乘积形式(吉布斯分布)。
条件独立性
马尔可夫网络的三个基本条件独立性:
-
成对马尔可夫性:给定其他所有节点,两个非相邻节点条件独立
-
局部马尔可夫性:给定邻居,节点独立于所有非后代节点
-
全局马尔可夫性:给定分离集,两个节点子集条件独立
三种马尔可夫性在树结构上等价
树结构:任意两个条件独立性等价
A ─── B ─── C
A ⊥ C | B ✓(成对)
A ⊥ {非B} | B ✓(局部)
{A} ⊥ {C} | {B} ✓(全局)
三、常见马尔可夫网络模型
1. 成对马尔可夫网络
只包含一元势函数和二元势函数:
图像去噪模型:
import numpy as np
class ImageDenoisingMRF:
"""
图像去噪的马尔可夫网络模型
势函数设计:
- 一元势函数:鼓励像素接近观测值
- 二元势函数:鼓励相邻像素相似(平滑先验)
"""
def __init__(self, image, noise_std, smooth_weight):
self.image = image.astype(np.float32)
self.h, self.w = image.shape
self.noise_std = noise_std
self.smooth_weight = smooth_weight
def unary_potential(self, x, y):
"""一元势函数:数据保真项"""
obs = self.image[x, y]
return np.exp(-0.5 * (x - obs)**2 / self.noise_std**2)
def binary_potential(self, x1, y1, x2, y2):
"""二元势函数:平滑先验"""
# Potts模型:鼓励相同标签
diff = np.abs(self.labels[x1, y1] - self.labels[x2, y2])
return np.exp(-self.smooth_weight * diff)
def pairwise_mrf_energy(self, labels):
"""
计算MRF能量(用于优化)
E = Σ 数据项 + λ Σ 平滑项
"""
h, w = labels.shape
energy = 0.0
# 数据项
for i in range(h):
for j in range(w):
energy += (labels[i, j] - self.image[i, j])**2 / (2 * self.noise_std**2)
# 平滑项
for i in range(h):
for j in range(w - 1):
energy += self.smooth_weight * (labels[i, j] - labels[i, j+1])**2
if i < h - 1:
energy += self.smooth_weight * (labels[i, j] - labels[i+1, j])**2
return energy2. 条件随机场(CRF)
CRF是判别式模型,直接建模条件概率 :
其中 是特征函数。
3. 指数族分布
马尔可夫网络的势函数可以写成指数形式:
这使得 成为指数族分布,有良好的统计性质。
四、条件随机场(CRF)
CRF的基本思想
CRF由Lafferty等人于2001年提出,主要用于序列标注问题。2
核心思想:直接建模条件概率,避免了生成模型中归一化的问题。
线性链CRF
最常用的CRF形式是线性链CRF:
Y1 ──Y2── Y3 ──Y4── Y5
│ │ │
X1 X2 X3
联合特征包括:
- 当前位置的观测特征
- 标签转移特征
条件概率定义
其中配分函数:
特征函数设计
转移特征
def transition_feature(y_prev, y_curr, tag_to_idx):
"""标签转移特征"""
features = {}
for prev_tag in tag_to_idx:
for curr_tag in tag_to_idx:
if y_prev == prev_tag and y_curr == curr_tag:
features[f'T_{prev_tag}_{curr_tag}'] = 1.0
return features状态特征
def state_feature(y, x, t, word, tag_to_idx, word_dict):
"""观测特征(状态特征)"""
features = {}
# 当前词的特征
features['BOS'] = 1.0 if t == 0 else 0.0
features['EOS'] = 1.0 if t == len(x) - 1 else 0.0
# 词形特征
features[f'word={word}'] = 1.0
features[f'is_upper={word[0].isupper()}'] = 1.0 if word[0].isupper() else 0.0
features[f'is_digit={word.isdigit()}'] = 1.0 if word.isdigit() else 0.0
features[f'suffix={word[-3:]}'] = 1.0
features[f'prefix={word[:3]}'] = 1.0
return features五、CRF的参数学习
对数似然函数
给定训练集 :
展开:
梯度计算
对参数 求导:
其中 是特征在路径上的求和。
期望计算:前向-后向算法
def crf_gradient(words, tags, weights, tag_to_idx, word_to_idx):
"""
计算CRF的梯度
使用前向-后向算法高效计算期望
"""
T = len(words)
n_tags = len(tag_to_idx)
# 构建特征矩阵
features = []
for t in range(T):
feat = compute_features(words, t, tags, tag_to_idx, word_to_idx)
features.append(feat)
# 前向传递
alpha = np.zeros((T, n_tags))
alpha[0] = features[0]['initial'] + features[0]['state']
for t in range(1, T):
for j in range(n_tags):
# log-sum-exp 技巧避免数值溢出
scores = alpha[t-1] + features[t]['trans'][:, j] + features[t]['state'][j]
alpha[t, j] = np.logsumexp(scores)
# 后向传递
beta = np.zeros((T, n_tags))
beta[T-1] = 0.0 # 终止状态
for t in range(T-2, -1, -1):
for i in range(n_tags):
scores = features[t+1]['trans'][i, :] + features[t+1]['state'] + beta[t+1]
beta[t, i] = np.logsumexp(scores)
# 计算配分函数
Z = np.logsumexp(alpha[T-1])
# 计算梯度
gradient = {}
# 经验期望:特征在真实标签上的计数
for t in range(T):
feat_t = features[t]['total']
for fname, fval in feat_t.items():
gradient[fname] = gradient.get(fname, 0) + fval
# 模型期望:使用前向后向计算
for fname in gradient:
# 初始化期望计数
exp_count = np.zeros(T)
for t in range(T):
# P(y_t = tag | x)
p_y_t = np.exp(alpha[t] + beta[t] - Z)
# 累加期望
feat_val = features[t]['total'].get(fname, 0.0) if isinstance(
features[t]['total'], dict) else features[t]['state'][tag_to_idx.get(fname, 0)]
if isinstance(feat_val, dict):
exp_count[t] = sum(p_y_t[j] * feat_val.get(j, 0) for j in range(n_tags))
else:
exp_count[t] = p_y_t[tag_to_idx.get(fname, 0)] * feat_val
gradient[fname] -= np.sum(exp_count)
return gradient使用scikit-learn-crfsuite
import sklearn_crfsuite
from sklearn_crfsuite import metrics
# 准备训练数据
X_train = [sent2features(s) for s in train_sents]
y_train = [sent2labels(s) for s in train_sents]
# 训练CRF
crf = sklearn_crfsuite.CRF(
algorithm='lbfgs',
c1=0.1, # L1正则化
c2=0.1, # L2正则化
max_iterations=100,
all_possible_transitions=True
)
crf.fit(X_train, y_train)
# 预测
y_pred = crf.predict(X_test)六、CRF的推断
推断问题
给定观测 ,求最可能的标签序列:
Viterbi算法
def viterbi_decode(words, weights, tag_to_idx, word_to_idx):
"""
Viterbi算法求解最优路径
时间复杂度: O(T * K^2)
空间复杂度: O(T * K)
"""
T = len(words)
K = len(tag_to_idx)
idx_to_tag = {v: k for k, v in tag_to_idx.items()}
# 初始化
viterbi = np.zeros((T, K))
backpointer = np.zeros((T, K), dtype=int)
# t=0时刻
for j in range(K):
tag = idx_to_tag[j]
viterbi[0, j] = compute_initial_score(tag) + compute_state_score(tag, words[0], 0)
# 动态规划
for t in range(1, T):
for j in range(K):
curr_tag = idx_to_tag[j]
# 计算到每个前驱状态的得分
scores = viterbi[t-1] + np.array([
compute_trans_score(idx_to_tag[i], curr_tag) +
compute_state_score(curr_tag, words[t], t)
for i in range(K)
])
best_prev = np.argmax(scores)
viterbi[t, j] = scores[best_prev]
backpointer[t, j] = best_prev
# 回溯
best_path = np.zeros(T, dtype=int)
best_path[T-1] = np.argmax(viterbi[T-1])
for t in range(T-2, -1, -1):
best_path[t] = backpointer[t+1, best_path[t+1]]
return [idx_to_tag[i] for i in best_path]七、应用场景
1. 命名实体识别(NER)
def extract_features(sentence, i):
"""NER任务的特征提取"""
word = sentence[i]
features = {
'bias': 1.0,
'word.lower()': word.lower(),
'word[-3:]': word[-3:],
'word[-2:]': word[-2:],
'word.isupper()': word.isupper(),
'word.istitle()': word.istitle(),
'word.isdigit()': word.isdigit(),
'word.isalpha()': word.isalpha(),
'BOS': i == 0,
'EOS': i == len(sentence) - 1,
}
# 上下文特征
if i > 0:
word1 = sentence[i-1]
features.update({
'-1:word.lower()': word1.lower(),
'-1:word.istitle()': word1.istitle(),
})
else:
features['BOS'] = True
if i < len(sentence) - 1:
word1 = sentence[i+1]
features.update({
'+1:word.lower()': word1.lower(),
'+1:word.istitle()': word1.istitle(),
})
else:
features['EOS'] = True
return features
def sent2features(sent):
return [extract_features(sent, i) for i in range(len(sent))]
def sent2labels(sent):
return [label for token, postag, label in sent]2. 词性标注(POS Tagging)
# 特征模板
POS_FEATURES = {
# 词本身
'word': lambda w: w.lower(),
'suffix': lambda w: w[-2:] if len(w) > 2 else w,
'prefix': lambda w: w[:2] if len(w) > 2 else w,
'is_digit': lambda w: w.isdigit(),
'is_alpha': lambda w: w.isalpha(),
'is_upper': lambda w: w[0].isupper() if w else False,
# 上下文
'word-1': lambda s, i: s[i-1].lower() if i > 0 else '<BOS>',
'word+1': lambda s, i: s[i+1].lower() if i < len(s)-1 else '<EOS>',
}3. 图像分割
class ImageSegmentationCRF:
"""
图像分割的Dense CRF
使用全连接CRF进行像素级标注
"""
def __init__(self, unary_potentials, image, num_classes):
self.unary = unary_potentials # (H, W, K)
self.image = image # (H, W, 3)
self.K = num_classes
self.h, self.w = image.shape[:2]
def potts_model(self, label_i, label_j):
"""Potts势函数"""
return 1.0 if label_i != label_j else 0.0
def binary_potential(self, i, j, k, l):
"""二元势函数:高斯核加权"""
pos_i = np.array([i // self.w, i % self.w])
pos_j = np.array([j // self.w, j % self.w])
color_i = self.image[pos_i[0], pos_i[1]]
color_j = self.image[pos_j[0], pos_j[1]]
# 位置核
pos_diff = np.sum((pos_i - pos_j)**2)
pos_kernel = np.exp(-pos_diff / (2 * self.spatial_std**2))
# 颜色核
color_diff = np.sum((color_i - color_j)**2)
color_kernel = np.exp(-color_diff / (2, self.color_std**2))
return pos_kernel * color_kernel * self.potts_model(k, l)
def mean_field_approximation(self, Q, num_iter=10):
"""
均值场近似进行CRF推断
Q[k, i] ≈ P(label_i = k | 观测)
"""
for iteration in range(num_iter):
Q_new = np.zeros_like(Q)
for k in range(self.K):
# 一元势
log_unary = np.log(self.unary[:, :, k] + 1e-10)
# 二元势的期望
for l in range(self.K):
binary_weight = self.binary_potential_matrix(k, l)
binary_term = np.sum(binary_weight * Q[:, :, l], axis=2)
log_unary -= binary_weight
Q_new[:, :, k] = np.exp(log_unary - np.logaddexp.reduce(log_unary, axis=2))
Q = Q_new / Q_new.sum(axis=2, keepdims=True)
return Q4. 语音识别
CRF可用于音素序列识别,结合声学特征和语言学约束。
八、深度学习时代的CRF
CRF与神经网络的结合
在序列标注任务中,CRF常作为神经网络的最后一层:
输入序列 → BiLSTM编码器 → 发射分数 → CRF层 → 输出序列
↓
学习特征表示
class BiLSTM_CRF(nn.Module):
"""BiLSTM-CRF模型"""
def __init__(self, vocab_size, embedding_dim, hidden_dim, tag_to_idx):
super().__init__()
self.embedding = nn.Embedding(vocab_size, embedding_dim)
self.lstm = nn.LSTM(embedding_dim, hidden_dim // 2,
batch_first=True, bidirectional=True)
self.hidden2tag = nn.Linear(hidden_dim, len(tag_to_idx))
# CRF参数
self.tag_to_idx = tag_to_idx
self.n_tags = len(tag_to_idx)
# 转移矩阵
self.transitions = nn.Parameter(torch.randn(self.n_tags, self.n_tags))
def forward(self, sentences, tags=None, mask=None):
"""
前向传播
Args:
sentences: (batch, seq_len)
tags: (batch, seq_len) - 训练时提供
mask: (batch, seq_len) - padding mask
"""
# 编码
embeddings = self.embedding(sentences)
lstm_out, _ = self.lstm(embeddings)
emissions = self.hidden2tag(lstm_out) # (batch, seq_len, n_tags)
if tags is None:
# 推理模式:使用Viterbi
return self._viterbi_decode(emissions, mask)
else:
# 训练模式:计算损失
return self._crf_loss(emissions, tags, mask)
def _viterbi_decode(self, emissions, mask):
"""Viterbi解码"""
batch_size, seq_len, n_tags = emissions.shape
viterbi = emissions[:, 0].clone()
backpointers = []
for t in range(1, seq_len):
# (batch, n_tags, 1) + (1, n_tags, n_tags) + (batch, 1, n_tags)
score = viterbi[:, :, None] + self.transitions[None, :, :] + emissions[:, t, None, :]
best_tags = score.argmax(dim=1)
viterbi = score.gather(1, best_tags.unsqueeze(1)).squeeze(1)
backpointers.append(best_tags)
# 回溯
best_path = torch.zeros(batch_size, seq_len, dtype=torch.long, device=emissions.device)
best_path[:, -1] = viterbi.argmax(dim=1)
for t in range(seq_len - 2, -1, -1):
best_path[:, t] = backpointers[t].gather(1, best_path[:, t+1].unsqueeze(1)).squeeze(1)
return best_path
def _crf_loss(self, emissions, tags, mask):
"""CRF损失:负对数似然"""
# 正向得分
score = self._forward_score(emissions, tags, mask)
# 配分函数
partition = self._partition_function(emissions, mask)
# 负对数似然
loss = partition - score
return loss.mean()
def _forward_score(self, emissions, tags, mask):
"""计算真实路径的得分"""
batch_size, seq_len, n_tags = emissions.shape
score = emissions[:, 0].gather(1, tags[:, 0].unsqueeze(1)).squeeze(1)
for t in range(1, seq_len):
transition_score = self.transitions[tags[:, t-1], tags[:, t]]
emission_score = emissions[:, t].gather(1, tags[:, t].unsqueeze(1)).squeeze(1)
score = score + (transition_score + emission_score) * mask[:, t]
return score
def _partition_function(self, emissions, mask):
"""计算配分函数(使用前向算法)"""
batch_size, seq_len, n_tags = emissions.shape
alpha = emissions[:, 0]
for t in range(1, seq_len):
alpha_t = []
for tag in range(n_tags):
# emission[:, t, tag] + transitions[:, tag] + alpha[:, tag]
scores = emissions[:, t, tag:tag+1] + self.transitions[tag:tag+1] + alpha
alpha_t.append(torch.logsumexp(scores, dim=1))
alpha = torch.stack(alpha_t, dim=1)
alpha = alpha * mask[:, t:t+1]
return torch.logsumexp(alpha, dim=1)BERT + CRF
现代NER系统常使用预训练语言模型 + CRF:
class BERT_CRF_NER(nn.Module):
"""BERT + CRF for Named Entity Recognition"""
def __init__(self, bert_model, num_labels):
super().__init__()
self.bert = bert_model
self.dropout = nn.Dropout(0.1)
self.classifier = nn.Linear(bert_model.config.hidden_size, num_labels)
self.crf = CRF(num_labels, batch_first=True)
def forward(self, input_ids, attention_mask, labels=None):
# BERT编码
outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
sequence_output = self.dropout(outputs.last_hidden_state)
# 发射分数
emissions = self.classifier(sequence_output)
if labels is not None:
# 训练:返回CRF损失
loss = -self.crf(emissions, labels, mask=attention_mask.byte(), reduction='mean')
return loss
else:
# 推理:返回预测标签
return self.crf.decode(emissions, mask=attention_mask.byte())九、模型比较与选择
生成模型 vs 判别模型
| 特性 | 生成模型(NB, HMM) | 判别模型(CRF) |
|---|---|---|
| 建模对象 | $P(Y | |
| 计算复杂度 | 较低 | 较高 |
| 处理缺失特征 | 好 | 较差 |
| 特征工程 | 复杂 | 灵活 |
| 序列建模 | 适合 | 更适合 |
| 长距离依赖 | 困难 | 较容易 |
HMM vs CRF
| 特性 | HMM | CRF |
|---|---|---|
| 独立性假设 | 观测条件独立 | 可建模任意依赖 |
| 特征类型 | 概率 | 任意特征函数 |
| 发射概率 | 必须归一化 | 不需要 |
| 学习 | EM/极大似然 | 梯度下降 |
| 推理 | 前向后向/Viterbi | Viterbi |
何时使用CRF
✅ 适合使用CRF:
- 序列标注任务(NER、POS、Chunking)
- 需要建模标签之间的转移关系
- 观测特征丰富且复杂
- 需要全局一致的输出
❌ 不适合使用CRF:
- 简单分类问题
- 特征难以设计
- 计算资源受限
十、CRF的正则化与调参
超参数选择
from sklearn.model_selection import GridSearchCV
param_grid = {
'c1': [0.0, 0.1, 0.5, 1.0], # L1正则化
'c2': [0.0, 0.1, 0.5, 1.0], # L2正则化
'max_iterations': [100, 200, 500],
}
crf = sklearn_crfsuite.CRF()
grid_search = GridSearchCV(crf, param_grid, cv=5, scoring='f1_micro')
grid_search.fit(X_train, y_train)
print(f"Best params: {grid_search.best_params_}")类别权重
处理类别不平衡:
crf = sklearn_crfsuite.CRF(
algorithm='lbfgs',
c1=0.1,
c2=0.1,
class_weight='balanced' # 自动平衡类别权重
)