概述
本文深入解析GraphRAG的核心技术组件,包括实体与关系抽取、社区检测算法、层次化索引结构以及查询处理机制。理解这些技术细节对于优化GraphRAG系统至关重要。
1. 实体与关系抽取(Entity & Relation Extraction)
1.1 抽取Pipeline
原始文本 → 分块 → LLM抽取 → 后处理 → 知识图谱
↓
实体消歧
关系去重
类型推断
1.2 实体抽取策略
GraphRAG采用LLM进行零样本实体抽取,支持多种实体类型:
# 实体类型定义
ENTITY_TYPES = {
"PERSON": "人物、组织中的个人",
"ORGANIZATION": "公司、政府机构、团队",
"LOCATION": "地点、地理实体",
"EVENT": "事件、会议、活动",
"PRODUCT": "产品、软件、服务",
"CONCEPT": "概念、理论、方法",
"DOCUMENT": "文档、报告、书籍",
"WORK_OF_ART": "艺术作品、创作"
}
# 实体抽取Prompt模板
ENTITY_EXTRACTION_TEMPLATE = """
你是一个信息抽取专家。从以下文本中抽取所有{entity_types}类型的实体。
要求:
1. 每个实体应具有明确的指代
2. 为每个实体提供简短描述
3. 使用最简洁的实体名称
输出格式(JSON数组):
[
{{"name": "实体名称", "type": "实体类型", "description": "一句话描述"}},
...
]
文本:
{chunk_text}
只输出JSON数组,不要包含其他内容。
"""1.3 关系抽取策略
# 关系抽取Prompt
RELATION_EXTRACTION_TEMPLATE = """
基于以下实体,抽取它们之间的关系。
实体列表:
{entities}
关系类型参考:
- WORKS_FOR: 工作于/隶属于
- LOCATED_IN: 位于
- PART_OF: 是...的一部分
- KNOWS: 认识/了解
- INTERACTS_WITH: 与...互动
- CAUSES: 导致/引起
- USES: 使用/利用
- PRODUCE: 生产/产生
- FOUNDED_BY: 由...创立
- RELATED_TO: 与...相关
输出格式(JSON数组):
[
{{"source": "实体A", "target": "实体B", "type": "关系类型", "description": "关系描述"}},
...
]
如果两个实体之间没有明确关系,不要包含在输出中。
"""1.4 实体消歧与合并
def entity_disambiguation(entities, similarity_threshold=0.85):
"""
实体消歧:合并指向同一真实实体的不同表述
策略:
1. 基于名称相似度(编辑距离、语义相似度)
2. 基于描述语义相似度
3. 基于共现关系
"""
from sklearn.metrics.pairwise import cosine_similarity
# 计算名称和描述的嵌入相似度
entity_embeddings = embed_entities(entities)
similarity_matrix = cosine_similarity(entity_embeddings)
# 构建合并图
merge_graph = nx.Graph()
merge_graph.add_nodes_from(range(len(entities)))
for i, j in combinations(range(len(entities)), 2):
if similarity_matrix[i, j] > similarity_threshold:
merge_graph.add_edge(i, j)
# 获取连通分量作为合并组
merged_groups = list(nx.connected_components(merge_graph))
# 生成合并后的实体
merged_entities = []
for group in merged_groups:
representative = choose_representative(entities, group)
merged_entities.append(representative)
return merged_entities
def choose_representative(entities, group):
"""选择最佳代表实体"""
# 优先选择:最具体名称、最长描述、首次出现
group_entities = [entities[i] for i in group]
# 按名称长度降序排序
group_entities.sort(key=lambda e: len(e["name"]), reverse=True)
return group_entities[0]2. 社区检测算法(Community Detection)
2.1 为什么需要社区检测
原始图谱: 社区划分后:
A ─ B ─ C ┌─────────────┐
│ │ │ │ A ─ B ─ C │ 社区1
D E F │ ↘ │
│ │ │ │ 社区2 │
G ─ H ─ I └─────────────┘
识别的社区结构使得:
1. 快速定位查询相关的实体群
2. 社区级别聚合提供高层语义
3. 支持层次化查询策略
2.2 Leiden算法原理
GraphRAG使用Leiden算法进行社区检测,该算法是Louvain算法的改进版:
def leiden_community_detection(graph, resolution=1.0):
"""
Leiden算法社区检测
核心思想:
1. 局部移动优化(类似Louvain)
2. 非重叠分区确保高质量
3. 快速迭代到稳定状态
Modularity公式:
Q = (1/2m) Σ_ij [A_ij - (k_i * k_j) / 2m] δ(c_i, c_j)
其中:
- A_ij: 边权重
- k_i, k_j: 节点度数
- m: 总边数
- δ: 指示函数
"""
import networkx as nx
# 初始化:每个节点独立社区
partition = {node: i for i, node in enumerate(graph.nodes())}
# 迭代优化
improved = True
while improved:
improved = local_move_phase(graph, partition, resolution)
if improved:
partition = non_overlapping_refinement(graph, partition)
return partition
def local_move_phase(graph, partition, resolution):
"""
局部移动阶段:将节点移动到能最大化模块度的社区
"""
import random
improved = False
nodes = list(graph.nodes())
random.shuffle(nodes)
for node in nodes:
current_comm = partition[node]
best_comm = current_comm
best_gain = 0
# 获取邻居社区
neighbor_comms = set(
partition[neighbor]
for neighbor in graph.neighbors(node)
)
neighbor_comms.add(current_comm)
# 计算移动到每个邻居社区的模块度增益
for comm in neighbor_comms:
gain = compute_modularity_gain(graph, partition, node, comm, resolution)
if gain > best_gain:
best_gain = gain
best_comm = comm
# 如果有正增益,执行移动
if best_gain > 0:
partition[node] = best_comm
improved = True
return improved
def non_overlapping_refinement(graph, partition):
"""
非重叠精炼阶段:移除低质量社区
"""
# 统计每个社区的规模
comm_sizes = {}
for node, comm in partition.items():
comm_sizes[comm] = comm_sizes.get(comm, 0) + 1
# 过滤小社区(可配置阈值)
min_size = 3
small_comms = {c for c, size in comm_sizes.items() if size < min_size}
# 将小社区节点合并到最近的邻居社区
for node, comm in partition.items():
if comm in small_comms:
neighbors = list(graph.neighbors(node))
if neighbors:
partition[node] = partition[neighbors[0]]
return partition2.3 层次化社区结构
def build_hierarchical_communities(graph, partition, max_levels=3):
"""
构建层次化社区结构
层级结构示例:
Level 0: 基础实体
Level 1: 小型社区(3-10个实体)
Level 2: 中型社区(10-50个实体)
Level 3: 大型社区(50+个实体)
"""
from collections import defaultdict
# 按社区组织节点
communities = defaultdict(list)
for node, comm in partition.items():
communities[comm].append(node)
# 构建树形结构
hierarchy = {
"level_0": list(graph.nodes()), # 所有实体
"level_1": {}, # 社区ID -> 实体列表
}
# 根据规模分配层级
for comm_id, nodes in communities.items():
size = len(nodes)
if size > 100:
level = 3
elif size > 30:
level = 2
elif size > 10:
level = 1
else:
level = 1
if f"level_{level}" not in hierarchy:
hierarchy[f"level_{level}"] = {}
hierarchy[f"level_{level}"][comm_id] = nodes
return hierarchy3. 社区摘要生成(Community Summarization)
3.1 摘要生成策略
# 分层摘要生成
def generate_hierarchical_summaries(hierarchy, graph, llm):
"""
为层次化社区结构生成摘要
"""
summaries = {}
for level_name, communities in hierarchy.items():
if level_name == "level_0":
continue
level_summaries = {}
for comm_id, nodes in communities.items():
# 获取社区内的实体和关系
subgraph = graph.subgraph(nodes)
entities = extract_entities_from_nodes(subgraph)
relationships = extract_relationships_from_edges(subgraph)
# 生成社区摘要
summary = generate_single_summary(
entities,
relationships,
llm
)
level_summaries[comm_id] = summary
summaries[level_name] = level_summaries
return summaries
# 社区摘要Prompt
COMMUNITY_SUMMARY_PROMPT = """
作为知识图谱分析专家,请为以下社区生成全面的摘要。
社区成员(实体):
{entities}
社区关系:
{relationships}
请按以下结构生成摘要:
1. **核心主题**(1句话):这个社区主要关于什么?
2. **关键实体**(3-5个):社区中最重要的实体是谁/什么?
3. **关系模式**(2-3句):这些实体之间如何关联?有什么重要模式?
4. **重要事实**(3-5点):这个社区传达的关键信息是什么?
请确保摘要简洁、信息丰富,便于后续查询使用。
"""3.2 Map-Reduce摘要聚合
def map_reduce_summarize(summaries, query, llm, max_tokens=8000):
"""
Map-Reduce策略聚合多个社区摘要
Map阶段:每个社区摘要独立处理
Reduce阶段:综合中间结果
"""
# ===== MAP阶段 =====
map_results = []
for comm_id, summary in summaries.items():
prompt = f"""
社区摘要:
{summary}
查询:{query}
任务:判断此社区摘要是否与查询相关。
- 如果相关:提取与查询最相关的2-3个要点
- 如果不相关:回答"不相关"
回答:
"""
result = llm.generate(prompt)
map_results.append({
"community_id": comm_id,
"result": result
})
# 过滤不相关结果
relevant_results = [
r for r in map_results
if "不相关" not in r["result"]
]
# ===== REDUCE阶段 =====
if not relevant_results:
return "没有找到与查询相关的社区信息。"
# 聚合所有相关要点
combined_points = "\n".join([
f"- {r['result']}" for r in relevant_results
])
reduce_prompt = f"""
以下是各社区中与您查询相关的信息:
{combined_points}
查询:{query}
请综合以上信息,给出一个全面、连贯的回答。
"""
final_answer = llm.generate(reduce_prompt, max_tokens=max_tokens)
return final_answer4. 图谱索引与存储
4.1 多级索引结构
class GraphRAGIndex:
"""
GraphRAG多级索引结构
"""
def __init__(self):
# Level 0: 原始实体和关系
self.entity_index = {} # entity_id -> entity
self.relation_index = {} # relation_id -> relation
# Level 1-N: 层次化社区
self.community_index = {} # level -> {community_id -> members}
self.community_summaries = {} # level -> {community_id -> summary}
# 辅助索引
self.entity_to_communities = {} # 实体所属社区
self.community_hierarchy = {} # 社区层级关系
def add_entity(self, entity):
"""添加实体"""
self.entity_index[entity["id"]] = entity
# 更新社区映射
comm_id = self.assign_to_community(entity["id"])
if comm_id not in self.entity_to_communities:
self.entity_to_communities[comm_id] = []
self.entity_to_communities[comm_id].append(entity["id"])
def add_relationship(self, relation):
"""添加关系"""
self.relation_index[relation["id"]] = relation
def build_community_summaries(self, llm):
"""为所有社区生成摘要"""
for level, communities in self.community_index.items():
for comm_id, members in communities.items():
summary = self._generate_community_summary(
members, llm
)
self.community_summaries.setdefault(level, {})[comm_id] = summary
def get_entity_neighborhood(self, entity_id, depth=2):
"""获取实体的邻居图"""
neighbors = {entity_id}
current_level = {entity_id}
for _ in range(depth):
next_level = set()
for eid in current_level:
next_level.update(self._get_neighbors(eid))
neighbors.update(next_level)
current_level = next_level - neighbors
return self._build_subgraph(neighbors)
def get_community_context(self, community_id, level):
"""获取社区的上下文信息"""
members = self.community_index[level][community_id]
summary = self.community_summaries[level][community_id]
# 收集成员实体的详细信息
entities = [self.entity_index[m] for m in members]
# 收集社区内关系
relations = [
r for rid, r in self.relation_index.items()
if r["source"] in members and r["target"] in members
]
return {
"summary": summary,
"entities": entities,
"relations": relations
}4.2 图数据库存储
# 使用Neo4j存储GraphRAG图谱
def store_in_neo4j(graph_index, neo4j_connection):
"""
将GraphRAG索引存储到Neo4j图数据库
"""
from neo4j import GraphDatabase
driver = GraphDatabase.driver(
neo4j_connection["uri"],
auth=(neo4j_connection["user"], neo4j_connection["password"])
)
with driver.session() as session:
# 创建实体节点
for entity_id, entity in graph_index.entity_index.items():
session.run("""
MERGE (e:Entity {id: $id})
SET e.name = $name,
e.type = $type,
e.description = $description
""", **entity)
# 创建关系
for rel_id, relation in graph_index.relation_index.items():
session.run("""
MATCH (s:Entity {id: $source})
MATCH (t:Entity {id: $target})
MERGE (s)-[r:RELATES_TO {id: $id}]->(t)
SET r.description = $description,
r.type = $type
""", **relation)
# 创建社区索引(使用节点属性)
for level, communities in graph_index.community_index.items():
for comm_id, members in communities.items():
for member in members:
session.run("""
MATCH (e:Entity {id: $member})
SET e.community_level_$level = $comm_id
""", member=member, level=level, comm_id=comm_id)
driver.close()5. 查询处理优化
5.1 实体识别与链接
def entity_linking(query, graph_index, llm):
"""
将查询中的实体引用链接到图谱中的实体
"""
# 步骤1:使用LLM识别查询中的实体
entity_prompt = f"""
从以下查询中识别实体:
查询:{query}
已知实体类型:{list(ENTITY_TYPES.keys())}
输出格式(JSON):
{{"entities": [{"name": "实体名", "type": "类型"}, ...]}}
只输出JSON。
"""
response = llm.generate(entity_prompt)
query_entities = parse_json(response)["entities"]
# 步骤2:链接到图谱实体
linked_entities = []
for qe in query_entities:
# 语义相似度匹配
candidates = find_similar_entities(
qe["name"],
graph_index.entity_index,
top_k=5
)
# 选择最佳匹配
if candidates:
best_match = candidates[0]
linked_entities.append({
"query_entity": qe,
"graph_entity": best_match,
"confidence": best_match["similarity"]
})
return linked_entities
def find_similar_entities(query, entities, top_k=5):
"""基于嵌入相似度找到相似实体"""
query_embedding = embed_text(query)
entity_embeddings = {
eid: embed_text(e["name"] + " " + e.get("description", ""))
for eid, e in entities.items()
}
similarities = {
eid: cosine_similarity(query_embedding, emb)
for eid, emb in entity_embeddings.items()
}
sorted_entities = sorted(
similarities.items(),
key=lambda x: x[1],
reverse=True
)
return [
{"entity": entities[eid], "similarity": sim}
for eid, sim in sorted_entities[:top_k]
]5.2 自适应查询路由
def adaptive_query_routing(query, graph_index, llm):
"""
根据查询类型自适应选择搜索策略
"""
# 查询分类Prompt
classify_prompt = f"""
分析以下查询,确定查询类型:
查询:{query}
类型定义:
1. "local": 需要特定实体及其邻域信息的查询
- 例如:"XX公司在哪里?"
- 例如:"YY项目和ZZ有什么关系?"
2. "global": 需要综合全文/多文档信息的查询
- 例如:"公司今年的战略方向是什么?"
- 例如:"这个领域的主要趋势有哪些?"
3. "hybrid": 同时需要局部和全局信息的查询
- 例如:"XX项目的进展如何?它与公司战略的关系?"
只回答:local, global, 或 hybrid
"""
query_type = llm.generate(classify_prompt).strip().lower()
if query_type == "local":
return local_search_with_context(query, graph_index, llm)
elif query_type == "global":
return global_search_with_context(query, graph_index, llm)
else: # hybrid
# 先执行本地搜索,再执行全局搜索
local_result = local_search_with_context(query, graph_index, llm)
global_result = global_search_with_context(query, graph_index, llm)
return synthesize_hybrid_result(
query, local_result, global_result, llm
)6. 性能优化
6.1 增量索引
class IncrementalGraphRAG:
"""
支持增量更新的GraphRAG
"""
def __init__(self, base_index):
self.base_index = base_index
self.pending_updates = []
self.last_update_time = datetime.now()
def add_document(self, document):
"""增量添加文档"""
# 快速抽取新实体
new_entities, new_relations = extract_entities_fast(document)
# 检测与现有图谱的连接
connected_entities = []
isolated_entities = []
for entity in new_entities:
if self._has_connection(entity, self.base_index):
connected_entities.append(entity)
else:
isolated_entities.append(entity)
# 优先处理有连接的实体
self.pending_updates.extend(connected_entities)
# 孤立实体批处理
if len(isolated_entities) > 100:
self._process_batch(isolated_entities)
else:
self.pending_updates.extend(isolated_entities)
def flush(self, llm):
"""批量处理待更新实体"""
if not self.pending_updates:
return
# 批量更新到图谱
self._update_graph(self.pending_updates, llm)
# 重新运行社区检测
self._recompute_communities()
# 更新社区摘要
self._update_summaries(llm)
# 清空待处理队列
self.pending_updates = []
self.last_update_time = datetime.now()6.2 缓存策略
class GraphRAGCache:
"""GraphRAG查询缓存"""
def __init__(self, max_size=1000):
self.cache = {}
self.query_embeddings = {}
self.max_size = max_size
def get(self, query):
"""语义缓存查找"""
query_embedding = embed_text(query)
# 查找相似查询
for cached_query, cached_result in self.cache.items():
similarity = cosine_similarity(
query_embedding,
self.query_embeddings[cached_query]
)
if similarity > 0.95: # 高相似度阈值
return cached_result
return None
def put(self, query, result):
"""缓存查询结果"""
if len(self.cache) >= self.max_size:
# LRU淘汰
oldest_key = next(iter(self.cache))
del self.cache[oldest_key]
del self.query_embeddings[oldest_key]
self.cache[query] = result
self.query_embeddings[query] = embed_text(query)7. 评估指标
7.1 索引质量指标
def evaluate_index_quality(graph_index):
"""评估图谱索引质量"""
metrics = {}
# 实体数量
metrics["num_entities"] = len(graph_index.entity_index)
# 关系数量
metrics["num_relations"] = len(graph_index.relation_index)
# 社区数量
metrics["num_communities"] = sum(
len(communities)
for communities in graph_index.community_index.values()
)
# 平均社区规模
all_communities = [
members
for communities in graph_index.community_index.values()
for members in communities.values()
]
metrics["avg_community_size"] = np.mean([
len(c) for c in all_communities
])
# 社区摘要覆盖率
summary_count = sum(
len(summaries)
for summaries in graph_index.community_summaries.values()
)
total_communities = sum(
len(communities)
for communities in graph_index.community_index.values()
)
metrics["summary_coverage"] = summary_count / max(total_communities, 1)
# 图谱密度
max_possible_edges = len(graph_index.entity_index) * (len(graph_index.entity_index) - 1) / 2
actual_edges = len(graph_index.relation_index)
metrics["graph_density"] = actual_edges / max_possible_edges if max_possible_edges > 0 else 0
return metrics7.2 查询质量指标
def evaluate_query_quality(queries, ground_truth, graph_index, llm):
"""评估查询处理质量"""
results = []
for query, gt in zip(queries, ground_truth):
# 执行查询
answer = graph_index.query(query, llm)
# 计算指标
relevance = compute_relevance(answer, gt)
faithfulness = compute_faithfulness(answer, graph_index)
completeness = compute_completeness(answer, gt)
results.append({
"query": query,
"answer": answer,
"relevance": relevance,
"faithfulness": faithfulness,
"completeness": completeness
})
return {
"avg_relevance": np.mean([r["relevance"] for r in results]),
"avg_faithfulness": np.mean([r["faithfulness"] for r in results]),
"avg_completeness": np.mean([r["completeness"] for r in results])
}8. 实践案例
8.1 企业文档分析系统
class EnterpriseDocumentAnalyzer:
"""
企业文档GraphRAG分析系统
"""
def __init__(self):
self.graph_index = GraphRAGIndex()
self.config = {
"chunk_size": 400,
"entity_types": ["PERSON", "ORGANIZATION", "PROJECT", "METRIC", "DOCUMENT"],
"community_resolution": 1.0,
"max_levels": 3
}
def build_index(self, documents, llm):
"""构建企业文档图谱索引"""
# 阶段1:实体和关系抽取
all_entities = []
all_relations = []
for doc in documents:
chunks = self._chunk_document(doc)
for chunk in chunks:
entities, relations = self._extract_entities_relations(
chunk, llm
)
all_entities.extend(entities)
all_relations.extend(relations)
# 阶段2:实体消歧
merged_entities = self._disambiguate_entities(all_entities)
# 阶段3:构建图谱
self.graph_index = self._build_graph(
merged_entities,
all_relations
)
# 阶段4:社区检测
communities = self._detect_communities(self.graph_index)
# 阶段5:生成摘要
self.graph_index = self._generate_summaries(communities, llm)
return self.graph_index
def query(self, question, llm):
"""处理自然语言查询"""
return adaptive_query_routing(
question,
self.graph_index,
llm
)9. 总结
GraphRAG通过以下技术创新实现了从局部到全局的语义理解:
| 技术组件 | 核心创新 | 解决的问题 |
|---|---|---|
| 实体抽取 | LLM零样本抽取 | 自动构建知识图谱 |
| 社区检测 | Leiden层次化社区 | 结构化聚合语义 |
| 社区摘要 | 分层摘要生成 | 高效全局理解 |
| 查询路由 | 自适应策略选择 | 匹配查询类型 |
| 增量索引 | 批处理+缓存 | 生产环境可用性 |
这些组件共同构成了一个完整的知识图谱增强RAG系统,能够有效处理复杂的关系推理和全局性查询任务。