LLM推理优化技术综述
1. 概述
LLM推理优化是一个多层次、多维度的问题,涉及算法、系统、硬件等多个方面。
优化全景图
┌─────────────────────────────────────────────────────────────┐
│ LLM推理优化体系 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 算法级优化 │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │注意力 │ │解码策略 │ │量化 │ │剪枝 │ │ │
│ │ │优化 │ │优化 │ │ │ │ │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 系统级优化 │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │批处理 │ │内存管理 │ │并行策略 │ │算子融合 │ │ │
│ │ │ │ │ │ │ │ │ │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 硬件协同设计 │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │专用芯片 │ │内存层次 │ │计算精度 │ │互连带宽 │ │ │
│ │ │ │ │优化 │ │定制 │ │利用 │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
2. 算法级优化
2.1 量化技术
量化精度级别
| 精度 | 内存节省 | 速度提升 | 精度损失 | 适用场景 |
|---|---|---|---|---|
| FP16 | 1x | 1x | 0 | 基准 |
| BF16 | 1x | ~1x | <0.5% | 训练 |
| INT8 | 2x | 1.5-2x | <2% | 通用推理 |
| INT4 | 4x | 2-4x | 2-5% | 边缘部署 |
| NF4 | 4x | 2-3x | <3% | 权重近似正态分布的模型(如QLoRA微调) |
| INT2 | 8x | 3-5x | >10% | 实验性 |
GPTQ量化
class GPTQQuantizer:
    """Group-wise symmetric weight quantizer in the style of GPTQ.

    GPTQ (Generative Pre-trained Transformer Quantization) weights the
    quantization error with second-order (Hessian) information obtained
    from calibration inputs.  This simplified implementation performs
    per-group round-to-nearest quantization; the Hessian proxy is available
    through ``_compute_hessian`` but Hessian-driven error compensation is
    not implemented here.
    """

    def __init__(
        self,
        bits: int = 4,
        group_size: int = 128
    ):
        """Store the quantization settings.

        Args:
            bits: Width of the signed integer grid (must be >= 2).
            group_size: Number of input columns sharing one scale (>= 1).

        Raises:
            ValueError: If ``bits`` or ``group_size`` is out of range.
        """
        if bits < 2:
            raise ValueError("bits must be >= 2 for signed quantization")
        if group_size < 1:
            raise ValueError("group_size must be >= 1")
        self.bits = bits
        self.group_size = group_size

    def quantize_layer(
        self,
        layer: nn.Linear,
        calibration_data: torch.Tensor
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Quantize the weight of a single linear layer group by group.

        Args:
            layer: Linear layer whose ``weight`` (out_features, in_features)
                is quantized.  The layer itself is not modified.
            calibration_data: Accepted for interface compatibility; the
                round-to-nearest scheme below does not consume it.

        Returns:
            ``(qweight, scales)`` where ``qweight`` is an int32 tensor with
            the shape of the weight and ``scales`` has shape
            ``(out_features, num_groups)``.  A group is dequantized as
            ``qweight[:, group] * scales[:, g:g + 1]``.
        """
        W = layer.weight.data.float()
        out_features, in_features = W.shape

        # Ceil division so a trailing partial group is quantized as well.
        num_groups = (in_features + self.group_size - 1) // self.group_size
        q_max = 2 ** (self.bits - 1) - 1
        q_min = -(2 ** (self.bits - 1))

        # Allocate next to the weight so GPU layers do not mix devices.
        scales = torch.zeros(out_features, num_groups, device=W.device)
        qweight = torch.zeros_like(W, dtype=torch.int32)

        for i in range(num_groups):
            start = i * self.group_size
            end = min(start + self.group_size, in_features)
            W_g = W[:, start:end]

            # Symmetric scale per (row, group).  An all-zero group would
            # give scale 0 and NaNs after division, so fall back to 1.
            group_scale = W_g.abs().max(dim=1).values / q_max
            group_scale = torch.where(
                group_scale > 0, group_scale, torch.ones_like(group_scale)
            )
            scales[:, i] = group_scale

            W_q = (W_g / group_scale.unsqueeze(1)).round()
            W_q = W_q.clamp(q_min, q_max)
            qweight[:, start:end] = W_q.to(torch.int32)

        return qweight, scales

    def _compute_hessian(
        self,
        layer: nn.Linear,
        calibration_data: torch.Tensor
    ) -> torch.Tensor:
        """Return the layer-input second moment used as a Hessian proxy.

        For the squared reconstruction error of a linear layer the Hessian
        with respect to one weight row is proportional to ``X^T X`` of the
        layer inputs, which is what is accumulated here.

        Args:
            layer: Linear layer providing ``in_features``.
            calibration_data: Indexable collection of input samples; each
                sample's trailing dimension must equal ``in_features``.
                At most the first 128 samples are used.

        Returns:
            Tensor of shape ``(in_features, in_features)`` averaged over
            the samples used.

        Raises:
            ValueError: If ``calibration_data`` is empty.
        """
        num_samples = min(128, len(calibration_data))
        if num_samples == 0:
            raise ValueError("calibration_data must contain at least one sample")

        H = torch.zeros(
            layer.in_features,
            layer.in_features,
            device=layer.weight.device
        )
        for x in calibration_data[:num_samples]:
            # Flatten any leading (e.g. sequence) dimensions into rows.
            x = x.reshape(-1, layer.in_features).to(device=H.device, dtype=H.dtype)
            H += x.t() @ x
        return H / num_samples


# AWQ量化
class AWQQuantizer:
    """Simplified AWQ (Activation-aware Weight Quantization).

    Input channels that see larger activations are scaled up before
    quantization so they lose less precision; the scaling is folded back
    into the returned dequantization factors.
    """

    def __init__(self, bits: int = 4):
        self.bits = bits

    def quantize_layer(
        self,
        layer: nn.Linear,
        calibration_data: torch.Tensor
    ):
        """Quantize ``layer.weight`` with activation-aware channel scaling.

        Args:
            layer: Linear layer with weight of shape
                ``(out_features, in_features)``.  It is not modified.
            calibration_data: Indexable collection of input samples fed
                through the layer to measure activation magnitudes.

        Returns:
            ``(qweight, final_scales)`` such that
            ``qweight * final_scales`` approximates the original weight.
            ``qweight`` is int32 with the weight's shape and
            ``final_scales`` has shape ``(out_features, in_features)``.
        """
        W = layer.weight.data.float()

        # 1. Per-input-channel scales derived from activation statistics.
        scales = self._compute_activation_scales(layer, calibration_data)
        scales = scales.to(device=W.device, dtype=W.dtype)

        # 2. Scale along the input axis (columns of W), hence unsqueeze(0).
        W_scaled = W * scales.unsqueeze(0)

        # 3. Quantize the scaled weight.
        qweight, w_scales = self._quantize_tensor(W_scaled, self.bits)

        # 4. Undo the channel scaling inside the dequantization factor:
        #    W ~= (qweight * w_scales) / scales.
        final_scales = w_scales / scales.unsqueeze(0)
        return qweight, final_scales

    def _quantize_tensor(
        self,
        tensor: torch.Tensor,
        bits: int
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Symmetric round-to-nearest quantization with one step per row.

        Returns:
            ``(q, step)`` with ``q`` int32 of the input shape and ``step``
            of shape ``(rows, 1)`` so that ``q * step`` approximates
            ``tensor``.
        """
        q_max = 2 ** (bits - 1) - 1
        row_max = tensor.abs().amax(dim=1, keepdim=True)
        # All-zero rows would produce a zero step; use 1 to avoid NaNs.
        step = torch.where(row_max > 0, row_max / q_max, torch.ones_like(row_max))
        q = (tensor / step).round().clamp(-q_max - 1, q_max).to(torch.int32)
        return q, step

    def _compute_activation_scales(
        self,
        layer: nn.Linear,
        calibration_data: torch.Tensor
    ) -> torch.Tensor:
        """Compute per-input-channel scales from mean absolute activations.

        At most the first 64 calibration samples are run through the
        layer.  The result is normalised by its mean and floored at 1e-4
        so the later division stays finite.

        Raises:
            ValueError: If no calibration sample was available.
        """
        activations = []

        def hook(module, inputs, output):
            act = inputs[0].detach()
            # Collapse every leading dimension so one vector per call remains.
            activations.append(act.abs().reshape(-1, act.shape[-1]).mean(dim=0))

        handle = layer.register_forward_hook(hook)
        try:
            with torch.no_grad():
                for x in calibration_data[:64]:
                    layer(x.unsqueeze(0))
        finally:
            # Always detach the hook, even if a forward pass raised.
            handle.remove()

        if not activations:
            raise ValueError("calibration_data must contain at least one sample")

        act_avg = torch.stack(activations).float().mean(dim=0)
        scales = act_avg / act_avg.mean().clamp(min=1e-4)
        return scales.clamp(min=1e-4)


# 2.2 投机解码
基础框架
class SpeculativeDecoder:
    """Speculative decoding with a small draft model and a large main model.

    The draft model proposes a few tokens autoregressively; the main model
    scores the whole proposal in one forward pass and each proposed token
    is accepted with probability ``min(1, p_main / p_draft)``.

    ``draft_model`` is called directly with a token tensor, while the main
    model is accessed through ``main_model.model`` and
    ``main_model.tokenizer``.
    """

    def __init__(
        self,
        main_model,
        draft_model,
        max_draft_tokens: int = 6,
        temperature: float = 1.0
    ):
        self.main_model = main_model
        self.draft_model = draft_model
        self.max_draft_tokens = max_draft_tokens
        self.temperature = temperature

    @staticmethod
    def _model_device(model) -> torch.device:
        """Return the device of ``model``'s parameters, with a fallback."""
        try:
            return next(model.parameters()).device
        except (AttributeError, StopIteration, TypeError):
            # Not an nn.Module (or no parameters): CUDA if present, else CPU.
            return torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def _eos_token_id(self):
        """Look up the EOS id on the tokenizer first, then on the wrapper."""
        tokenizer = getattr(self.main_model, "tokenizer", None)
        eos = getattr(tokenizer, "eos_token_id", None)
        if eos is None:
            eos = getattr(self.main_model, "eos_token_id", None)
        return eos

    def _to_probs(self, logits: torch.Tensor) -> torch.Tensor:
        """Turn logits (vocabulary on the last axis) into probabilities.

        With ``temperature > 0`` this is a tempered softmax; otherwise a
        one-hot distribution on the argmax (greedy decoding).  The same
        rule is applied to both models so their probabilities are
        comparable in the acceptance test.
        """
        if self.temperature > 0:
            return F.softmax(logits / self.temperature, dim=-1)
        probs = torch.zeros_like(logits)
        probs.scatter_(-1, logits.argmax(dim=-1, keepdim=True), 1.0)
        return probs

    def speculate(
        self,
        current_ids: List[int],
        num_tokens: int = None
    ) -> Tuple[List[int], List[float]]:
        """Let the draft model propose up to ``num_tokens`` tokens.

        Args:
            current_ids: Token ids generated so far (not modified).
            num_tokens: Number of tokens to draft; defaults to
                ``max_draft_tokens``.

        Returns:
            ``(draft_tokens, draft_probs)``: ``draft_tokens`` is
            ``current_ids`` followed by the drafted tokens and
            ``draft_probs`` holds the draft probability of each drafted
            token.  Drafting stops early after an EOS token.
        """
        if num_tokens is None:
            num_tokens = self.max_draft_tokens
        draft_tokens = list(current_ids)
        draft_probs = []
        device = self._model_device(self.draft_model)
        eos = self._eos_token_id()
        with torch.no_grad():
            for _ in range(num_tokens):
                outputs = self.draft_model(
                    torch.tensor([draft_tokens], device=device)
                )
                probs = self._to_probs(outputs.logits[0, -1, :])
                next_token = torch.multinomial(probs, 1).item()
                draft_tokens.append(next_token)
                draft_probs.append(probs[next_token].item())
                if eos is not None and next_token == eos:
                    break
        return draft_tokens, draft_probs

    def verify(
        self,
        draft_tokens: List[int],
        main_probs: List[torch.Tensor],
        draft_probs: List[float] = None
    ) -> Tuple[List[int], int]:
        """Accept or reject drafted tokens against the main model.

        Args:
            draft_tokens: ``[anchor, d1, d2, ...]`` where ``anchor`` is the
                last already-committed token and ``d*`` are drafted tokens.
            main_probs: ``main_probs[i - 1]`` is the main model's
                distribution for the position of ``draft_tokens[i]``.
            draft_probs: Optional draft probabilities aligned with
                ``draft_tokens[1:]``.  When omitted, the draft model is
                re-run per position via ``_get_draft_prob``.

        Returns:
            ``(accepted, num_accepted)``: ``accepted`` starts with the
            anchor followed by the accepted drafted tokens.
        """
        accepted = [draft_tokens[0]]  # the anchor is already committed
        for i in range(1, len(draft_tokens)):
            token = draft_tokens[i]
            p_main = float(main_probs[i - 1][token])
            if draft_probs is not None:
                p_draft = draft_probs[i - 1]
            else:
                p_draft = self._get_draft_prob(draft_tokens, i)
            # Speculative sampling: accept with probability min(1, p/q).
            ratio = p_main / max(p_draft, 1e-10)
            if ratio >= 1.0 or torch.rand(1).item() < ratio:
                accepted.append(token)
            else:
                break
        return accepted, len(accepted) - 1

    def _get_draft_prob(
        self,
        tokens: List[int],
        position: int
    ) -> float:
        """Re-run the draft model to get P(tokens[position] | tokens[:position])."""
        device = self._model_device(self.draft_model)
        with torch.no_grad():
            outputs = self.draft_model(
                torch.tensor([tokens[:position]], device=device)
            )
        probs = self._to_probs(outputs.logits[0, -1])
        return probs[tokens[position]].item()

    def generate(
        self,
        prompt: str,
        max_new_tokens: int = 100
    ) -> str:
        """Generate text with speculative decoding.

        Each round drafts tokens, verifies them with one main-model pass,
        commits the accepted ones and then samples one more token from the
        main model (the replacement after a rejection, or a bonus token
        when everything was accepted), so every round makes progress.

        NOTE: after a rejection the replacement is drawn from the main
        distribution itself rather than from the normalised residual
        ``max(0, p_main - p_draft)``; this is exact for greedy decoding
        and an approximation otherwise.

        Raises:
            ValueError: If the prompt encodes to no tokens.
        """
        tokenizer = self.main_model.tokenizer
        input_ids = tokenizer.encode(prompt, return_tensors='pt')
        generated = input_ids[0].tolist()
        if not generated:
            raise ValueError("prompt must encode to at least one token")
        prompt_len = len(generated)
        device = self._model_device(self.main_model.model)
        eos = self._eos_token_id()

        with torch.no_grad():
            while len(generated) - prompt_len < max_new_tokens:
                remaining = max_new_tokens - (len(generated) - prompt_len)
                ctx_len = len(generated)

                # 1. Draft (never more than the remaining budget).
                draft_tokens, draft_probs = self.speculate(
                    generated, min(self.max_draft_tokens, remaining)
                )

                # 2. Score the full proposal with the main model in one pass.
                main_outputs = self.main_model.model(
                    torch.tensor([draft_tokens], device=device)
                )
                main_probs = self._to_probs(main_outputs.logits[0])

                # 3. Verify only the drafted suffix; the last context token
                #    serves as the always-accepted anchor.
                accepted, num_accepted = self.verify(
                    draft_tokens[ctx_len - 1:],
                    main_probs[ctx_len - 1:],
                    draft_probs
                )

                # 4. Commit accepted tokens.
                new_tokens = accepted[1:]
                generated.extend(new_tokens)
                if eos is not None and eos in new_tokens:
                    break
                if len(generated) - prompt_len >= max_new_tokens:
                    break

                # 5. One token from the main model at the first position
                #    that was not covered by an accepted draft.
                next_dist = main_probs[ctx_len - 1 + num_accepted]
                next_token = torch.multinomial(next_dist, 1).item()
                generated.append(next_token)
                if eos is not None and next_token == eos:
                    break

        return tokenizer.decode(generated)


# 2.3 前缀缓存
class PrefixCache:
    """Cache of KV states for prompt prefixes shared across requests.

    Keys are tuples of token ids; values are whatever the model returns as
    ``past_key_values`` for exactly those tokens.
    """

    def __init__(self):
        self.cache = {}
        self.access_count = defaultdict(int)

    @staticmethod
    def _model_device(model) -> torch.device:
        """Return the device of ``model``'s parameters, with a fallback."""
        try:
            return next(model.parameters()).device
        except (AttributeError, StopIteration, TypeError):
            # Not an nn.Module (or no parameters): CUDA if present, else CPU.
            return torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def get_prefix_key(self, prompt_ids: List[int]) -> Optional[Tuple[int, ...]]:
        """Find the cache key that can be reused for ``prompt_ids``.

        Lookup order:
            1. An exact match of the whole prompt.
            2. The longest cached key that is a complete prefix of the
               prompt.  Keys that merely share some leading tokens are
               skipped because their cached KV also covers tokens the
               prompt does not contain.

        Returns:
            The matching key (a tuple of token ids) or ``None``.  The
            access counter of the returned key is incremented.
        """
        key = tuple(prompt_ids)
        if key in self.cache:
            self.access_count[key] += 1
            return key

        best_match = None
        for cached_key in self.cache:
            length = len(cached_key)
            if length == 0 or length > len(key):
                continue
            if best_match is not None and length <= len(best_match):
                continue
            if key[:length] == cached_key:
                best_match = cached_key

        if best_match is not None:
            self.access_count[best_match] += 1
        return best_match

    def compute_prefix_kv(
        self,
        model,
        prefix_ids: List[int]
    ) -> torch.Tensor:
        """Return the KV cache for ``prefix_ids``, computing it on a miss.

        On a miss the model is called once with ``use_cache=True`` and its
        ``past_key_values`` are stored under ``tuple(prefix_ids)``.
        """
        key = tuple(prefix_ids)
        if key in self.cache:
            return self.cache[key]
        device = self._model_device(model)
        with torch.no_grad():
            outputs = model(
                torch.tensor([prefix_ids], device=device),
                use_cache=True
            )
        kv = outputs.past_key_values
        self.cache[key] = kv
        return kv

    def warmup(
        self,
        model,
        common_prefixes: List[str]
    ):
        """Pre-populate the cache for a list of frequently used prefixes."""
        for prefix in common_prefixes:
            prompt_ids = model.tokenizer.encode(prefix)
            self.compute_prefix_kv(model, prompt_ids)


# 3. 系统级优化
3.1 算子融合
class FusedAttention(nn.Module):
    """Causal self-attention built from fused operations.

    A single linear layer produces Q, K and V together, and the
    scale / mask / softmax / weighted-sum steps run inside
    ``F.scaled_dot_product_attention`` (which can dispatch to fused
    FlashAttention-style kernels when the backend supports the inputs),
    followed by the output projection.
    """

    def __init__(self, hidden_size, num_heads, head_dim):
        """Create the projections.

        Raises:
            ValueError: If ``num_heads * head_dim != hidden_size``; the QKV
                projection is split into heads under that assumption.
        """
        super().__init__()
        if num_heads * head_dim != hidden_size:
            raise ValueError("hidden_size must equal num_heads * head_dim")
        # Stored because forward() needs them to split the heads.
        self.num_heads = num_heads
        self.head_dim = head_dim
        self.qkv_proj = nn.Linear(hidden_size, 3 * hidden_size)
        self.o_proj = nn.Linear(hidden_size, hidden_size)

    def forward(self, x, past_key_value=None):
        """Apply causal self-attention to ``x``.

        Args:
            x: Input of shape ``(batch, seq_len, hidden_size)``.
            past_key_value: Optional ``(past_k, past_v)`` pair, each of
                shape ``(batch, num_heads, past_len, head_dim)``, that is
                prepended to the keys/values computed from ``x``.

        Returns:
            Tensor of shape ``(batch, seq_len, hidden_size)``.
        """
        batch, q_len, _ = x.shape

        # One fused projection, then split into heads:
        # (batch, seq, hidden) -> (batch, heads, seq, head_dim).
        qkv = self.qkv_proj(x)
        q, k, v = (
            t.view(batch, q_len, self.num_heads, self.head_dim).transpose(1, 2)
            for t in qkv.chunk(3, dim=-1)
        )

        if past_key_value is not None:
            past_k, past_v = past_key_value
            k = torch.cat([past_k, k], dim=2)
            v = torch.cat([past_v, v], dim=2)

        kv_len = k.shape[2]
        if kv_len == q_len:
            out = F.scaled_dot_product_attention(q, k, v, is_causal=True)
        else:
            # With cached keys the causal mask must be aligned to the
            # bottom-right corner: query i may see keys up to
            # (kv_len - q_len + i).
            mask = torch.ones(
                q_len, kv_len, dtype=torch.bool, device=x.device
            ).tril(diagonal=kv_len - q_len)
            out = F.scaled_dot_product_attention(q, k, v, attn_mask=mask)

        # Merge heads back: (batch, heads, seq, dim) -> (batch, seq, hidden).
        out = out.transpose(1, 2).reshape(
            batch, q_len, self.num_heads * self.head_dim
        )
        return self.o_proj(out)


# 3.2 内存优化
class MemoryOptimizer:
    """Helpers for shrinking the memory footprint of a KV cache."""

    @staticmethod
    def optimize_kv_cache(
        kv_cache: torch.Tensor,
        method: str = 'dynamic',
        dynamic_threshold: int = 100_000_000
    ) -> torch.Tensor:
        """Apply a memory optimisation to ``kv_cache``.

        Args:
            kv_cache: Tensor holding the cache.
            method: ``'offload'`` moves the tensor to CPU, ``'compress'``
                down-casts it, ``'dynamic'`` down-casts only when the
                tensor has more than ``dynamic_threshold`` elements.  Any
                other value returns the tensor unchanged.
            dynamic_threshold: Element-count threshold used by
                ``'dynamic'``.

        Returns:
            The optimised tensor, or ``kv_cache`` itself when nothing was
            done.
        """
        if method == 'offload':
            return kv_cache.cpu()
        if method == 'compress':
            return MemoryOptimizer._compress_kv(kv_cache)
        if method == 'dynamic' and kv_cache.numel() > dynamic_threshold:
            return MemoryOptimizer._compress_kv(kv_cache)
        return kv_cache

    @staticmethod
    def _compress_kv(kv: torch.Tensor) -> torch.Tensor:
        """Down-cast the cache to bfloat16.

        This is a precision reduction, not a low-rank (SVD) compression.
        The cast is done directly to avoid materialising a float32 copy
        of the whole cache first.
        """
        return kv.to(torch.bfloat16)


# 4. 协同设计
4.1 Co-design框架
class InferenceCoDesign:
    """Joint search over model, system and hardware configuration knobs.

    The performance model is deliberately simple: latency depends only on
    the numeric precision and throughput on batch size and latency.
    """

    # Search space explored by optimize_design().
    _BATCH_SIZES = (1, 4, 8, 16, 32)
    _PRECISIONS = ('fp16', 'bf16', 'int8', 'int4')
    _PP_DEGREES = (1, 2, 4, 8)
    _KV_CACHE_METHODS = ('full', 'pyramidkv', 'h2o')

    # Relative per-token latency (and resource weight) of each precision.
    _PRECISION_FACTOR = {'fp16': 1.0, 'bf16': 1.0, 'int8': 0.8, 'int4': 0.5}
    _BASE_LATENCY_MS = 100  # ms per token at fp16

    def __init__(
        self,
        workload_profile: dict,
        hardware_profile: dict
    ):
        self.workload = workload_profile
        self.hardware = hardware_profile

    def optimize_design(
        self,
        target_latency: float,
        target_throughput: float
    ) -> dict:
        """Pick the cheapest configuration that meets both targets.

        Every combination of batch size, precision, pipeline-parallel
        degree and KV-cache method is evaluated.  Among those with
        ``latency <= target_latency`` and
        ``throughput >= target_throughput`` the one with the lowest
        resource score wins; ties keep the first one found.

        Returns:
            The winning configuration dict, or ``None`` when no
            configuration satisfies the constraints.
        """
        from itertools import product

        best_config = None
        best_score = float('inf')
        search_space = product(
            self._BATCH_SIZES,
            self._PRECISIONS,
            self._PP_DEGREES,
            self._KV_CACHE_METHODS,
        )
        for batch_size, precision, pp_degree, kv_cache_method in search_space:
            config = {
                'batch_size': batch_size,
                'precision': precision,
                'pp_degree': pp_degree,
                'kv_cache_method': kv_cache_method
            }
            latency, throughput = self._evaluate_config(config)
            if latency > target_latency or throughput < target_throughput:
                continue
            score = self._compute_score(config)
            if score < best_score:
                best_score = score
                best_config = config
        return best_config

    def _evaluate_config(self, config: dict) -> Tuple[float, float]:
        """Estimate ``(latency_ms_per_token, throughput)`` for ``config``.

        Lower precision shortens the latency, so the base latency is
        multiplied by the precision factor.

        Raises:
            KeyError: If ``config['precision']`` is not a known precision.
        """
        batch = config['batch_size']
        prec = config['precision']
        latency = self._BASE_LATENCY_MS * self._PRECISION_FACTOR[prec]
        throughput = batch * 1000 / latency
        return latency, throughput

    def _compute_score(self, config: dict) -> float:
        """Return the resource cost of ``config`` (lower is cheaper).

        Larger batches, more pipeline stages (i.e. more devices) and
        higher precision all increase the cost.
        """
        return (
            0.4 * config['batch_size'] +
            0.3 * config['pp_degree'] +
            0.3 * self._precision_score(config['precision'])
        )

    def _precision_score(self, precision: str) -> float:
        """Relative resource weight of ``precision`` (0.5 if unknown)."""
        return self._PRECISION_FACTOR.get(precision, 0.5)


# 5. 工具与框架
5.1 主流框架对比
| 框架 | 开发方 | 核心优化 | 优势 | 劣势 |
|---|---|---|---|---|
| vLLM | UC Berkeley | PagedAttention | 易用、性能好 | 功能有限 |
| TensorRT-LLM | NVIDIA | TensorRT优化 | 最快 | 难以定制 |
| llama.cpp | Georgi Gerganov | 量化、CPU | 轻量 | 精度有限 |
| DeepSpeed | Microsoft | ZeRO | 大模型支持 | 复杂 |
| SGLang | LMSYS | RadixAttention | 高吞吐 | 新兴 |
| TGI | HuggingFace | Rust实现 | 稳定 | 中等性能 |
5.2 vLLM示例
from vllm import LLM, SamplingParams

# Initialise the vLLM engine.
llm = LLM(
    # quantization="awq" requires a checkpoint that was actually quantized
    # with AWQ; the plain fp16 "meta-llama/Llama-2-70b-hf" weights are not.
    model="TheBloke/Llama-2-70B-AWQ",
    tensor_parallel_size=4,  # shard the model across 4 GPUs
    # Llama-2 has a 4096-token context window; a larger value is rejected.
    max_model_len=4096,
    gpu_memory_utilization=0.9,
    quantization="awq",
)

# Sampling parameters.
sampling_params = SamplingParams(
    temperature=0.8,
    top_p=0.95,
    max_tokens=256
)

# Batched inference.
outputs = llm.generate(
    prompts=[
        "Explain quantum computing in simple terms:",
        "Write a Python function to sort a list:",
        "What are the benefits of exercise?"
    ],
    sampling_params=sampling_params
)

# Print the results.
for output in outputs:
    print(f"Prompt: {output.prompt}")
    print(f"Generated: {output.outputs[0].text}")
    print("---")


# 6. 最佳实践
6.1 延迟优化
# Latency-optimisation best practices
optimizations = {
    # 1. Enable FlashAttention
    "flash_attention": True,
    # 2. Pick a suitable precision
    "precision": "bf16",  # bf16 is the training default; int8 is an option for inference
    # 3. Quantize the KV cache
    "kv_cache_quant": "int8",
    # 4. Choose a sensible batch size
    "batch_size": 1,  # keep the batch small when latency matters
    # 5. Prefix caching
    "prefix_caching": True,
    # 6. Speculative decoding
    "speculative_decoding": True,
    "draft_model_size": "1b"
}


# 6.2 吞吐量优化
# Throughput-optimisation best practices
optimizations = {
    # 1. Increase the batch size
    "batch_size": 32,
    # 2. Continuous batching
    "continuous_batching": True,
    # 3. Chunked prefill
    "prefill_chunk_size": 512,
    # 4. Tensor (model) parallelism
    "tensor_parallel_size": 8,
    # 5. Pipeline parallelism
    "pipeline_parallel_size": 4,
    # 6. Weight quantization
    "weight_quant": "int4_awq"
}


# 7. 总结
LLM推理优化的关键要点:
- 量化是基础:INT8/INT4量化显著减少内存和延迟
- 投机解码有效:主要降低解码延迟,低并发场景下最高可达3-4倍加速
- 系统优化很重要:连续批处理、PagedAttention等
- 协同设计最优:模型-系统-硬件联合优化
- 工具成熟:vLLM、TGI等框架已相当完善