Overview
A knowledge graph (KG) represents structured knowledge as triples $(h, r, t)$, where $h$ denotes the head entity, $r$ the relation, and $t$ the tail entity.[^1]
Examples:

```text
(Paris, located_in, France)
(Eiffel Tower, built_by, Gustave Eiffel)
(Deep Learning, subfield_of, Machine Learning)
```
Core Knowledge Graph Tasks
| Task | Description | Metrics |
|---|---|---|
| Link prediction | Predict the missing entity in $(h, r, ?)$ or $(?, r, t)$ | MRR, Hits@K |
| Entity classification | Predict the type of an entity | Accuracy |
| Triple classification | Decide whether a given $(h, r, t)$ holds | Accuracy |
Limitations of Traditional Methods
Early knowledge graph embedding methods (e.g., TransE, DistMult, ComplEx) map entities and relations into a vector space, but suffer from the following limitations:
- Only first-order neighborhoods: they cannot capture the graph structure around an entity
- Relation heterogeneity is ignored: different relations may exhibit different neighborhood patterns
- Poor inductive generalization: new entities and relations are hard to handle
GNNs address these issues by aggregating multi-hop neighborhood information through message passing.
1. R-GCN: Relational Graph Convolutional Networks
1.1 Core Idea
R-GCN (Relational Graph Convolutional Network) extends GCN to multi-relational graphs by maintaining a separate weight matrix $W_r$ for each relation type $r$.[^2]
1.2 Forward Propagation
The representation of entity $i$ is updated by aggregating neighbors under every relation:

$$h_i^{(l+1)} = \sigma\left( W_0^{(l)} h_i^{(l)} + \sum_{r \in \mathcal{R}} \sum_{j \in \mathcal{N}_i^r} \frac{1}{c_{i,r}} W_r^{(l)} h_j^{(l)} \right)$$

where:
- $\mathcal{N}_i^r$: the set of neighbors of entity $i$ under relation $r$
- $c_{i,r}$: a normalization constant (e.g., $c_{i,r} = |\mathcal{N}_i^r|$)
- $W_r^{(l)}$: the weight matrix for relation $r$
- $W_0^{(l)}$: the weight matrix for the self-loop
1.3 Problem: Parameter Explosion
Assume:
- Number of entities: $|\mathcal{E}|$
- Number of relation types: $|\mathcal{R}|$ (e.g., Freebase has 1500+ relations)
- Hidden dimension: $d$

Each layer then requires $O(|\mathcal{R}| \cdot d^2)$ weight parameters, which is unacceptable for large-scale knowledge graphs.
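To make the scale concrete, a quick back-of-the-envelope calculation (the 1500 relations and $d = 500$ are illustrative assumptions, not numbers from the text):

```python
# Hypothetical sizes: Freebase-scale relation vocabulary, d = 500
num_relations = 1500
hidden_dim = 500

# One dense weight matrix per relation, per layer
params_per_layer = num_relations * hidden_dim * hidden_dim
print(f"{params_per_layer:,}")  # 375,000,000 weights in a single layer
```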
1.4 Solution: Basis / Block Decomposition
Basis Decomposition
Decompose the $|\mathcal{R}|$ relation matrices into linear combinations of $B$ shared basis matrices ($B \ll |\mathcal{R}|$):

$$W_r = \sum_{b=1}^{B} a_{rb} V_b$$
```python
import torch
import torch.nn as nn

class BasisDecomposition(nn.Module):
    """Basis decomposition: W_r = sum_b a_rb * V_b"""
    def __init__(self, num_relations, hidden_dim, num_bases):
        super().__init__()
        self.num_bases = num_bases
        # B basis transforms shared across all relations
        self.V = nn.ParameterList([
            nn.Parameter(torch.randn(hidden_dim, hidden_dim))
            for _ in range(num_bases)
        ])
        # Relation-to-basis coefficients
        self.a = nn.Parameter(torch.randn(num_relations, num_bases))

    def forward(self, relation_id):
        # Combine the bases into this relation's weight matrix
        W = torch.zeros_like(self.V[0])
        for b in range(self.num_bases):
            W += self.a[relation_id, b] * self.V[b]
        return W
```

Block Decomposition
Decompose each relation's weight matrix into a set of low-dimensional diagonal blocks:

$$W_r = \operatorname{diag}(Q_{r1}, Q_{r2}, \ldots, Q_{rB})$$
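A minimal sketch of this idea, assuming `hidden_dim` is divisible by the number of blocks (the class and parameter names here are my own, not from the R-GCN paper):

```python
import torch
import torch.nn as nn

class BlockDecomposition(nn.Module):
    """Block decomposition sketch: W_r = diag(Q_r1, ..., Q_rB)."""
    def __init__(self, num_relations, hidden_dim, num_blocks):
        super().__init__()
        assert hidden_dim % num_blocks == 0
        block_dim = hidden_dim // num_blocks
        # One stack of small square blocks per relation
        self.blocks = nn.Parameter(
            torch.randn(num_relations, num_blocks, block_dim, block_dim)
        )

    def forward(self, relation_id):
        # Assemble the block-diagonal weight matrix for one relation
        return torch.block_diag(*self.blocks[relation_id])
```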
1.5 PyTorch Implementation
```python
import torch
import torch.nn as nn
import torch.nn.functional as F

class RGCNLayer(nn.Module):
    """A single R-GCN layer with basis decomposition (self-loop term omitted for brevity)."""
    def __init__(self, in_channels, out_channels, num_relations, num_bases=8, activation=F.relu):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.num_relations = num_relations
        self.num_bases = num_bases
        self.activation = activation
        # Basis decomposition: shared bases, per-relation coefficients
        self.bases = nn.Parameter(torch.randn(num_bases, in_channels, out_channels))
        self.combs = nn.Parameter(torch.randn(num_relations, num_bases))
        # Initialization
        nn.init.xavier_uniform_(self.bases)
        nn.init.xavier_uniform_(self.combs)

    def forward(self, x, edge_index, edge_type):
        """
        x: node features (N, in_channels)
        edge_index: edge indices (2, E)
        edge_type: edge types (E,)
        """
        N = x.shape[0]
        out = torch.zeros(N, self.out_channels, device=x.device)
        # Precompute all relation weight matrices: W_r = sum_b combs[r, b] * bases[b]
        W_r = torch.einsum('rb,bij->rij', self.combs, self.bases)  # (R, in, out)
        # Aggregate messages per relation type
        for r in range(self.num_relations):
            # Select edges of relation type r
            mask = edge_type == r
            edges_r = edge_index[:, mask]
            if edges_r.shape[1] == 0:
                continue
            src, dst = edges_r[0], edges_r[1]
            # Aggregate: h_dst += h_src @ W_r
            out.index_add_(0, dst, x[src] @ W_r[r])
        # Normalize by total in-degree (the paper normalizes per relation, c_{i,r})
        deg = torch.bincount(edge_index[1], minlength=N).float()
        deg[deg == 0] = 1
        out = out / deg.unsqueeze(-1)
        if self.activation is not None:
            out = self.activation(out)
        return out

class RGCN(nn.Module):
    """Full R-GCN model (num_layers >= 2)."""
    def __init__(self, num_nodes, num_relations, in_channels, hidden_channels, out_channels, num_layers=2):
        super().__init__()
        self.num_layers = num_layers
        # Entity embeddings serve as input features
        self.embed = nn.Embedding(num_nodes, in_channels)
        # Stack of R-GCN layers
        self.layers = nn.ModuleList()
        self.layers.append(RGCNLayer(in_channels, hidden_channels, num_relations))
        for _ in range(num_layers - 2):
            self.layers.append(RGCNLayer(hidden_channels, hidden_channels, num_relations))
        self.layers.append(RGCNLayer(hidden_channels, out_channels, num_relations, activation=None))

    def forward(self, edge_index, edge_type):
        h = self.embed.weight
        for layer in self.layers:
            h = layer(h, edge_index, edge_type)
        return h
```

2. CompGCN: Relational GNNs with Composition Operations
2.1 Core Idea
CompGCN learns relation embeddings jointly with entity embeddings, and uses a composition operation to fuse the information of neighboring entities and their relations.[^3]
2.2 Triple Representation
For a triple $(u, r, v)$, the updated representation of $v$ aggregates composed neighbor-relation pairs:

$$h_v = f\left( \sum_{(u, r) \in \mathcal{N}(v)} W_{\lambda(r)} \, \phi(e_u, e_r) \right)$$

where $\phi(\cdot, \cdot)$ is the composition operation, $f$ is an activation function, and $W_{\lambda(r)}$ is a direction-specific weight matrix.
2.3 Composition Operations
| Operation | Formula | Notes |
|---|---|---|
| Sub | $\phi(e_u, e_r) = e_u - e_r$ | Translational, as in TransE |
| Mult | $\phi(e_u, e_r) = e_u \odot e_r$ | Multiplicative, as in DistMult |
| Corr | $\phi(e_u, e_r) = e_u \star e_r$ | Circular correlation as a relatedness measure |
| Circle | (combines the above) | Concatenation of multiple operations |
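A small sketch of the first three operations (the function name is mine; the circular correlation follows the standard FFT identity):

```python
import torch

def compose(e_u: torch.Tensor, e_r: torch.Tensor, op: str = "mult") -> torch.Tensor:
    """CompGCN-style composition of an entity and a relation embedding."""
    if op == "sub":
        return e_u - e_r                        # TransE-style
    if op == "mult":
        return e_u * e_r                        # DistMult-style
    if op == "corr":
        # Circular correlation: ifft(conj(fft(a)) * fft(b))
        return torch.fft.irfft(
            torch.conj(torch.fft.rfft(e_u)) * torch.fft.rfft(e_r),
            n=e_u.shape[-1],
        )
    raise ValueError(f"unknown op: {op}")
```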
2.4 Relation Debiasing

CompGCN also includes a relation debiasing mechanism that transforms the relation embeddings at every layer, preventing them from degenerating:

$$h_r^{(l+1)} = W_{\text{rel}}^{(l)} h_r^{(l)}$$
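Putting the pieces together, here is a minimal CompGCN-style layer. It is a sketch under simplifying assumptions: a single weight matrix instead of direction-specific ones, the `mult` composition, and no basis decomposition of relations.

```python
import torch
import torch.nn as nn

class CompGCNLayer(nn.Module):
    """Minimal CompGCN-style layer: updates entity AND relation embeddings."""
    def __init__(self, in_dim, out_dim):
        super().__init__()
        self.W_ent = nn.Linear(in_dim, out_dim)
        self.W_rel = nn.Linear(in_dim, out_dim)

    def forward(self, x, rel, edge_index, edge_type):
        src, dst = edge_index
        # Compose neighbor entity with edge relation ('mult'), then transform
        msg = self.W_ent(x[src] * rel[edge_type])
        out = torch.zeros(x.size(0), msg.size(1), device=x.device)
        out.index_add_(0, dst, msg)
        # Mean aggregation over incoming edges
        deg = torch.bincount(dst, minlength=x.size(0)).clamp(min=1).float()
        out = out / deg.unsqueeze(-1)
        # Relations are transformed per layer as well
        return torch.relu(out), self.W_rel(rel)
```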
3. GNN Methods for Knowledge Graph Completion
3.1 The Link Prediction Task
Given a query $(h, r, ?)$ or $(?, r, t)$, predict the missing entity.
Scoring Function Design
Distance-based scoring (TransE-style):

$$s(h, r, t) = -\| \mathbf{e}_h + \mathbf{e}_r - \mathbf{e}_t \|$$

Semantic-matching scoring (DistMult-style):

$$s(h, r, t) = \langle \mathbf{e}_h, \mathbf{e}_r, \mathbf{e}_t \rangle = \sum_i e_{h,i} \, e_{r,i} \, e_{t,i}$$
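For concreteness, both scores in a few lines (toy dimension, random embeddings, purely illustrative):

```python
import torch

d = 8
e_h, e_r, e_t = torch.randn(d), torch.randn(d), torch.randn(d)

transe_score = -torch.norm(e_h + e_r - e_t)   # higher = more plausible
distmult_score = torch.sum(e_h * e_r * e_t)   # trilinear product
```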
Negative Sampling
During training, negative samples are generated from each positive triple $(h, r, t)$:
- Head corruption: $(h', r, t)$, with $h'$ a randomly sampled entity
- Tail corruption: $(h, r, t')$, with $t'$ a randomly sampled entity
- Relation corruption: $(h, r', t)$, with $r'$ a randomly sampled relation
3.2 Full Training Code
```python
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import RGCNConv

class KGCM(nn.Module):
    """Knowledge graph completion model."""
    def __init__(self, num_entities, num_relations, embedding_dim):
        super().__init__()
        self.num_entities = num_entities
        self.num_relations = num_relations
        self.embedding_dim = embedding_dim
        # Entity embeddings
        self.entity_embed = nn.Embedding(num_entities, embedding_dim)
        # Relation embeddings
        self.relation_embed = nn.Embedding(num_relations, embedding_dim)
        # R-GCN encoder
        self.rgcn1 = RGCNConv(embedding_dim, embedding_dim, num_relations)
        self.rgcn2 = RGCNConv(embedding_dim, embedding_dim, num_relations)
        # Scoring function
        self.score_func = 'distmult'  # or 'transE' / 'complex'
        # Initialization
        nn.init.xavier_uniform_(self.entity_embed.weight)
        nn.init.xavier_uniform_(self.relation_embed.weight)

    def encode(self, edge_index, edge_type):
        """Encode the graph structure."""
        h = self.entity_embed.weight
        # R-GCN forward pass
        h = self.rgcn1(h, edge_index, edge_type)
        h = F.relu(h)
        h = F.dropout(h, p=0.3, training=self.training)
        h = self.rgcn2(h, edge_index, edge_type)
        return h

    def score(self, h, r, t):
        """Score triples; h, r, t are embedding tensors of shape (*, dim)."""
        if self.score_func == 'distmult':
            # DistMult: s = <h, r, t>
            return torch.sum(h * r * t, dim=-1)
        elif self.score_func == 'transE':
            # TransE: s = -||h + r - t||
            return -torch.norm(h + r - t, dim=-1)
        elif self.score_func == 'complex':
            # ComplEx: s = Re(<h, r, conjugate(t)>)
            h_real, h_imag = h.chunk(2, dim=-1)
            r_real, r_imag = r.chunk(2, dim=-1)
            t_real, t_imag = t.chunk(2, dim=-1)
            score = (h_real * r_real * t_real
                     + h_imag * r_real * t_imag
                     + h_real * r_imag * t_imag
                     - h_imag * r_imag * t_real)
            return torch.sum(score, dim=-1)

    def forward(self, pos_edge_index, pos_edge_type, neg_edge_index, neg_edge_type):
        """
        pos_edge_index: (2, num_pos) positive edges
        neg_edge_index: (2, num_neg) negative edges
        """
        # Encode entities
        h = self.encode(pos_edge_index, pos_edge_type)
        # Positive scores
        pos_h = h[pos_edge_index[0]]
        pos_t = h[pos_edge_index[1]]
        pos_scores = self.score(pos_h, self.relation_embed(pos_edge_type), pos_t)
        # Negative scores
        neg_h = h[neg_edge_index[0]]
        neg_t = h[neg_edge_index[1]]
        neg_scores = self.score(neg_h, self.relation_embed(neg_edge_type), neg_t)
        return pos_scores, neg_scores

    def link_predict(self, h_idx, r_idx, k=10):
        """
        Given (h, r, ?), predict the tail entity.
        Returns the top-k candidates.
        """
        h = self.entity_embed(h_idx)
        r = self.relation_embed(r_idx)
        # Score all candidate tails
        all_t = self.entity_embed.weight
        scores = self.score(h.expand_as(all_t), r.expand_as(all_t), all_t)
        # Return top-k
        _, top_k_idx = torch.topk(scores, k)
        return top_k_idx, scores[top_k_idx]

def train_kgc_model(model, edge_index, edge_type, optimizer, epochs=100, batch_size=1024):
    """Train the knowledge graph completion model."""
    model.train()
    device = edge_index.device
    num_triples = edge_index.shape[1]
    for epoch in range(epochs):
        # Sample a batch of positive triples
        perm = torch.randperm(num_triples, device=device)[:batch_size]
        pos_edge_index = edge_index[:, perm]
        pos_edge_type = edge_type[perm]
        # Generate negatives: randomly corrupt the head or the tail
        neg_edge_index = pos_edge_index.clone()
        neg_edge_type = pos_edge_type.clone()
        # Corrupt heads for 50% of triples, tails for the rest
        mask = torch.rand(perm.shape[0], device=device) < 0.5
        num_entities = model.num_entities
        head_corrupt = torch.randint(0, num_entities, (mask.sum().item(),), device=device)
        tail_corrupt = torch.randint(0, num_entities, ((~mask).sum().item(),), device=device)
        neg_edge_index[0][mask] = head_corrupt
        neg_edge_index[1][~mask] = tail_corrupt
        # Forward pass
        optimizer.zero_grad()
        pos_scores, neg_scores = model(pos_edge_index, pos_edge_type, neg_edge_index, neg_edge_type)
        # Loss: margin ranking loss (positives should outscore negatives)
        margin = 1.0
        loss = F.margin_ranking_loss(pos_scores, neg_scores, torch.ones_like(pos_scores), margin=margin)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        if epoch % 10 == 0:
            print(f"Epoch {epoch}: Loss = {loss.item():.4f}")
    return model
```

4. Attention Mechanisms in Knowledge Graphs
4.1 Attention GNNs for KGC
Introducing attention into knowledge graph GNNs assigns different importance weights to different triples.[^4]
```python
import torch
import torch.nn as nn
import torch.nn.functional as F

class AttentionKGNNLayer(nn.Module):
    """KGC layer with relation-aware attention."""
    def __init__(self, in_channels, out_channels, num_relations):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.num_relations = num_relations
        # Relation embeddings
        self.relation_embed = nn.Embedding(num_relations, in_channels)
        # Attention projections
        self.W_q = nn.Linear(in_channels, out_channels)
        self.W_k = nn.Linear(in_channels, out_channels)
        self.W_v = nn.Linear(in_channels, out_channels)
        nn.init.xavier_uniform_(self.relation_embed.weight)

    def forward(self, h, edge_index, edge_type):
        N = h.shape[0]
        # Relation embeddings
        r_embed = self.relation_embed.weight  # (R, in_channels)
        # Query, Key, Value
        q = self.W_q(h)  # (N, out)
        k = self.W_k(h)  # (N, out)
        v = self.W_v(h)  # (N, out)
        out = torch.zeros(N, self.out_channels, device=h.device)
        for r in range(self.num_relations):
            # Select edges of relation type r
            mask = edge_type == r
            edges_r = edge_index[:, mask]
            if edges_r.shape[1] == 0:
                continue
            src, dst = edges_r[0], edges_r[1]
            # Project the relation into the same space as the keys
            r_vec = self.W_k(r_embed[r])  # (out,)
            # Simplified attention score: <q_dst, k_src + r_vec>
            att_scores = torch.sum(q[dst] * (k[src] + r_vec), dim=-1)  # (E_r,)
            # Normalize over this relation's edges (a full implementation
            # would softmax per target node, e.g. with scatter_softmax)
            att_weights = F.softmax(att_scores, dim=0)
            # Weighted aggregation of values
            out.index_add_(0, dst, v[src] * att_weights.unsqueeze(-1))
        return F.relu(out)
```

4.2 Relation-Aware Attention
Beyond fixed relation embeddings, learnable attention heads can capture more complex relational patterns.
5. Multi-Hop Reasoning
5.1 Problem Definition
Reasoning over knowledge graphs often requires multi-hop paths:
```text
Query:  (Rome, ?, Italy)
Answer: capital_of
Path:   Rome → located_in → Lazio → part_of → Italy
        ↓
        capital_of ← (Rome, capital_of, Italy)
```
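As a concrete starting point, paths like the one above can be enumerated with a simple breadth-first search (the function and the adjacency format are illustrative assumptions):

```python
from collections import deque

def find_paths(adj, start, goal, max_hops=3):
    """Enumerate relation paths from start to goal, up to max_hops.
    adj: dict mapping entity -> list of (relation, neighbor) pairs."""
    queue = deque([(start, [])])
    paths = []
    while queue:
        node, path = queue.popleft()
        if node == goal and path:
            paths.append(path)
            continue
        if len(path) < max_hops:
            for rel, nxt in adj.get(node, []):
                queue.append((nxt, path + [(rel, nxt)]))
    return paths

adj = {"Rome": [("located_in", "Lazio")], "Lazio": [("part_of", "Italy")]}
print(find_paths(adj, "Rome", "Italy"))
# [[('located_in', 'Lazio'), ('part_of', 'Italy')]]
```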
5.2 Path Modeling
RNN-Based Path Reasoning
```python
import torch
import torch.nn as nn

class PathRNN(nn.Module):
    """Encode a reasoning path with an RNN."""
    def __init__(self, num_entities, num_relations, embedding_dim, hidden_dim):
        super().__init__()
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.num_relations = num_relations
        # Entity and relation embeddings
        self.entity_embed = nn.Embedding(num_entities, embedding_dim)
        self.relation_embed = nn.Embedding(num_relations, embedding_dim)
        # Project the start entity into the GRU's hidden space
        self.init_proj = nn.Linear(embedding_dim, hidden_dim)
        # RNN encoder
        self.rnn = nn.GRU(embedding_dim, hidden_dim, batch_first=True)
        # Classifier over candidate target entities
        self.classifier = nn.Linear(hidden_dim, num_entities)

    def forward(self, start_entity, paths):
        """
        start_entity: start entity ID (0-dim LongTensor)
        paths: path sequence [(relation, entity), ...] of ID tensors
        """
        # Initialize the RNN state from the start entity; shape (1, 1, hidden)
        h_0 = self.init_proj(self.entity_embed(start_entity)).view(1, 1, -1)
        # Build the input sequence from (relation, entity) steps
        seq_input = []
        for r, e in paths:
            r_emb = self.relation_embed(r)
            e_emb = self.entity_embed(e)
            seq_input.append(r_emb + e_emb)  # combine step
        seq_input = torch.stack(seq_input).unsqueeze(0)  # (1, seq_len, dim)
        # Encode with the RNN
        output, h_T = self.rnn(seq_input, h_0)
        # Predict from the final state
        logits = self.classifier(h_T.squeeze(0))
        return logits
```

5.3 Multi-Hop Reasoning with Attention
```python
import torch
import torch.nn as nn

class MultiHopAttention(nn.Module):
    """Multi-hop reasoning with attention."""
    def __init__(self, embedding_dim, num_heads=4):
        super().__init__()
        self.embedding_dim = embedding_dim
        self.num_heads = num_heads
        self.multihead_attn = nn.MultiheadAttention(embedding_dim, num_heads)
        self.edge_proj = nn.Linear(embedding_dim * 2, embedding_dim)

    def forward(self, query, keys, edge_features, num_hops=3):
        """
        query: query vector (embedding_dim,)
        keys: neighbor entity embeddings (num_neighbors, embedding_dim)
        edge_features: edge features / relation embeddings (num_neighbors, embedding_dim)
        """
        # nn.MultiheadAttention expects (seq_len, batch, dim)
        h = query.view(1, 1, -1)
        for hop in range(num_hops):
            # Fuse neighbor and edge features into keys/values
            key_value_input = torch.cat([keys, edge_features], dim=-1)
            k = v = self.edge_proj(key_value_input).unsqueeze(1)  # (num_neighbors, 1, dim)
            # Attend from the current query to the neighborhood
            h, attn_weights = self.multihead_attn(h, k, v)
        # Final query representation
        return h.view(-1), attn_weights
```

6. Case Study: Link Prediction on FB15k-237
6.1 Dataset
FB15k-237 is a subset of Freebase:
- 14,541 entities
- 237 relation types
- 272,115 training triples
6.2 Evaluation Protocol
For each test triple $(h, r, t)$:
- Replace $t$ with every other entity and compute the scores
- Rank the candidates to find the position of the true tail entity
- Report MRR (Mean Reciprocal Rank) and Hits@K
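The standard protocol uses filtered ranking: when ranking candidates for $(h, r, ?)$, all other tails known to be true are masked out so they do not push the target down. A minimal sketch (the function name and tensor layout are my own):

```python
import torch

def filtered_rank(scores, true_idx, known_true):
    """Rank of the true tail among all candidates, ignoring other known-true tails.
    scores: (num_entities,), higher = better; known_true excludes true_idx itself."""
    s = scores.clone()
    s[known_true] = float('-inf')          # mask other correct answers
    return int((s > s[true_idx]).sum()) + 1
```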
6.3 Full Training Script
```python
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.datasets import FB15k_237  # PyG ships FB15k-237 as FB15k_237

def load_data():
    """Load the FB15k-237 train and test splits."""
    train_data = FB15k_237(root='./data/FB15k237', split='train')[0]
    test_data = FB15k_237(root='./data/FB15k237', split='test')[0]
    return train_data, test_data

def train_and_evaluate():
    train_data, test_data = load_data()
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    num_entities = train_data.num_nodes
    num_relations = int(train_data.edge_type.max()) + 1
    # Build the model
    model = KGCM(
        num_entities=num_entities,
        num_relations=num_relations,
        embedding_dim=256
    ).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    # Training
    edge_index = train_data.edge_index.to(device)
    edge_type = train_data.edge_type.to(device)
    for epoch in range(200):
        model.train()
        # Positive samples: the full training graph
        pos_edge_index = edge_index
        pos_edge_type = edge_type
        # Negative samples
        num_neg = edge_index.shape[1]
        neg_edge_index = edge_index.clone()
        neg_edge_type = edge_type.clone()
        # Corrupt heads for the first half, tails for the rest
        num_head = num_neg // 2
        head_corrupt = torch.randint(0, num_entities, (num_head,), device=device)
        tail_corrupt = torch.randint(0, num_entities, (num_neg - num_head,), device=device)
        neg_edge_index[0, :num_head] = head_corrupt
        neg_edge_index[1, num_head:] = tail_corrupt
        # Optimization step
        optimizer.zero_grad()
        pos_scores, neg_scores = model(
            pos_edge_index, pos_edge_type,
            neg_edge_index, neg_edge_type
        )
        loss = F.margin_ranking_loss(
            pos_scores, neg_scores,
            torch.ones_like(pos_scores),
            margin=0.5
        )
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        if epoch % 20 == 0:
            print(f"Epoch {epoch}: Loss = {loss.item():.4f}")
    # Evaluation
    model.eval()
    with torch.no_grad():
        h = model.encode(edge_index, edge_type)
        # Simplified evaluation: raw (unfiltered) ranks on a test subset;
        # the standard protocol uses filtered ranking over the full test set
        mrr, hits_at_10 = evaluate(model, h,
                                   test_data.edge_index.to(device),
                                   test_data.edge_type.to(device))
        print(f"MRR: {mrr:.4f}, Hits@10: {hits_at_10:.4f}")

def evaluate(model, entity_embed, test_edge_index, test_edge_type, max_triples=100):
    """Evaluate link prediction (DistMult scoring, raw ranks)."""
    ranks = []
    hits_at_10 = 0
    num_eval = 0
    for i in range(min(max_triples, test_edge_index.shape[1])):
        h_idx = test_edge_index[0, i]
        r_idx = test_edge_type[i]
        t_idx = test_edge_index[1, i]
        # Score every candidate tail
        h = entity_embed[h_idx]
        r = model.relation_embed.weight[r_idx]
        all_t = entity_embed
        scores = torch.sum(h.unsqueeze(0) * (r.unsqueeze(0) * all_t), dim=-1)
        # Raw rank of the true tail (no filtering of other known-true triples)
        true_score = scores[t_idx].item()
        rank = (scores > true_score).sum().item() + 1
        ranks.append(1.0 / rank)
        if rank <= 10:
            hits_at_10 += 1
        num_eval += 1
    mrr = sum(ranks) / len(ranks)
    return mrr, hits_at_10 / num_eval

if __name__ == '__main__':
    train_and_evaluate()
```

7. Related Topics
References

[^1]: Bordes et al., "Translating Embeddings for Modeling Multi-relational Data", NeurIPS 2013
[^2]: Schlichtkrull et al., "Modeling Relational Data with Graph Convolutional Networks", ESWC 2018
[^3]: Vashishth et al., "Composition-based Multi-Relational Graph Neural Networks", ICLR 2020
[^4]: Nathani et al., "Learning Attention-based Embeddings for Relation Prediction in Knowledge Graphs", ACL 2019