LLM推理优化技术综述

1. 概述

LLM推理优化是一个多层次、多维度的问题,涉及算法、系统、硬件等多个方面。

优化全景图

┌─────────────────────────────────────────────────────────────┐
│                    LLM推理优化体系                            │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌─────────────────────────────────────────────────────┐    │
│  │                   算法级优化                          │    │
│  │  ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐   │    │
│  │  │注意力   │ │解码策略  │ │量化     │ │剪枝     │   │    │
│  │  │优化     │ │优化     │ │         │ │         │   │    │
│  │  └─────────┘ └─────────┘ └─────────┘ └─────────┘   │    │
│  └─────────────────────────────────────────────────────┘    │
│                              │                               │
│                              ▼                               │
│  ┌─────────────────────────────────────────────────────┐    │
│  │                   系统级优化                          │    │
│  │  ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐   │    │
│  │  │批处理   │ │内存管理  │ │并行策略  │ │算子融合  │   │    │
│  │  │         │ │         │ │         │ │         │   │    │
│  │  └─────────┘ └─────────┘ └─────────┘ └─────────┘   │    │
│  └─────────────────────────────────────────────────────┘    │
│                              │                               │
│                              ▼                               │
│  ┌─────────────────────────────────────────────────────┐    │
│  │                   硬件协同设计                        │    │
│  │  ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐   │    │
│  │  │专用芯片  │ │内存层次  │ │计算精度  │ │互连带宽  │   │    │
│  │  │         │ │优化     │ │定制     │ │利用     │   │    │
│  │  └─────────┘ └─────────┘ └─────────┘ └─────────┘   │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                              │
└─────────────────────────────────────────────────────────────┘

2. 算法级优化

2.1 量化技术

量化精度级别

精度内存节省速度提升精度损失适用场景
FP161x1x0基准
BF161x~1x<0.5%训练
INT82x1.5-2x<2%通用推理
INT44x2-4x2-5%边缘部署
NF44x2-3x<3%量化友好
INT28x3-5x>10%实验性

GPTQ量化

class GPTQQuantizer:
    """
    GPTQ: Generative Pre-trained Transformer Quantization
    
    基于Hessian信息的重要性加权量化
    """
    
    def __init__(
        self,
        bits: int = 4,
        group_size: int = 128
    ):
        self.bits = bits
        self.group_size = group_size
        
    def quantize_layer(
        self,
        layer: nn.Linear,
        calibration_data: torch.Tensor
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        量化单个线性层
        
        核心思想:
        1. 按组计算Hessian的逆
        2. 对每组权重进行量化
        3. 累积量化误差并修正
        """
        W = layer.weight.data.float()
        out_features, in_features = W.shape
        
        # 准备量化参数
        num_groups = in_features // self.group_size
        scales = torch.zeros(out_features, num_groups)
        zeros = torch.zeros(out_features, num_groups)
        qweight = torch.zeros_like(W, dtype=torch.int32)
        
        # 计算Hessian近似
        H = self._compute_hessian(layer, calibration_data)
        
        # 逐组量化
        for i in range(num_groups):
            start = i * self.group_size
            end = start + self.group_size
            
            W_g = W[:, start:end]
            H_g = H[start:end, start:end]
            
            # 计算缩放和零点
            scales[:, i] = W_g.abs().max(dim=1).values / (2 ** (self.bits - 1) - 1)
            
            # 量化
            W_q = (W_g / scales[:, i].unsqueeze(1)).round()
            W_q = W_q.clamp(-2**(self.bits-1), 2**(self.bits-1)-1)
            
            # 存储
            qweight[:, start:end] = W_q.to(torch.int32)
        
        return qweight, scales
    
    def _compute_hessian(
        self,
        layer: nn.Linear,
        calibration_data: torch.Tensor
    ) -> torch.Tensor:
        """计算Hessian近似"""
        # 简化的Fisher信息计算
        num_samples = min(128, len(calibration_data))
        
        H = torch.zeros(
            layer.in_features,
            layer.in_features
        )
        
        for x in calibration_data[:num_samples]:
            x = x.unsqueeze(0)
            
            # 前向传播
            h = x @ layer.weight.data
            # 简化:使用均匀分布近似
            grad = torch.ones_like(h) / h.numel()
            
            # 累积
            H += grad.t() @ grad
        
        return H / num_samples

AWQ量化

class AWQQuantizer:
    """
    AWQ: Activation-Aware Weight Quantization
    
    基于激活分布的重要性加权
    """
    
    def __init__(self, bits: int = 4):
        self.bits = bits
        
    def quantize_layer(
        self,
        layer: nn.Linear,
        calibration_data: torch.Tensor
    ):
        """
        AWQ量化
        
        核心思想:保护重要的权重通道
        """
        W = layer.weight.data
        
        # 1. 计算激活感知的缩放
        scales = self._compute_activation_scales(
            layer, calibration_data
        )
        
        # 2. 应用缩放
        W_scaled = W * scales.unsqueeze(1)
        
        # 3. 量化缩放后的权重
        qweight, w_scales = self._quantize_tensor(W_scaled, self.bits)
        
        # 4. 合并缩放因子
        final_scales = scales.unsqueeze(1) * w_scales
        
        return qweight, final_scales
    
    def _compute_activation_scales(
        self,
        layer: nn.Linear,
        calibration_data: torch.Tensor
    ) -> torch.Tensor:
        """
        计算基于激活的缩放因子
        """
        # 收集激活分布
        activations = []
        
        def hook(module, input, output):
            act = input[0].detach()
            activations.append(act.abs().mean(dim=0))
        
        handle = layer.register_forward_hook(hook)
        
        with torch.no_grad():
            for x in calibration_data[:64]:
                layer(x.unsqueeze(0))
        
        handle.remove()
        
        # 计算缩放因子
        act_avg = torch.stack(activations).mean(dim=0)
        scales = act_avg / act_avg.mean()
        
        return scales

2.2 投机解码

基础框架

class SpeculativeDecoder:
    """
    投机解码器
    
    使用小模型(draft)生成候选
    大模型并行验证
    """
    
    def __init__(
        self,
        main_model,
        draft_model,
        max_draft_tokens: int = 6,
        temperature: float = 1.0
    ):
        self.main_model = main_model
        self.draft_model = draft_model
        self.max_draft_tokens = max_draft_tokens
        self.temperature = temperature
    
    def speculate(
        self,
        current_ids: List[int],
        num_tokens: int = None
    ) -> Tuple[List[int], List[float]]:
        """
        Draft模型生成候选
        
        Returns:
            draft_tokens: 候选token序列
            draft_probs: 对应概率
        """
        if num_tokens is None:
            num_tokens = self.max_draft_tokens
        
        draft_tokens = current_ids.copy()
        draft_probs = []
        
        with torch.no_grad():
            for _ in range(num_tokens):
                # Draft模型前向
                outputs = self.draft_model(
                    torch.tensor([draft_tokens]).cuda()
                )
                
                logits = outputs.logits[:, -1, :] / self.temperature
                
                if self.temperature > 0:
                    probs = F.softmax(logits, dim=-1)
                else:
                    probs = torch.zeros_like(logits)
                    probs[0, logits.argmax()] = 1.0
                
                # 采样
                next_token = torch.multinomial(probs, 1).item()
                draft_tokens.append(next_token)
                draft_probs.append(probs[0, next_token].item())
                
                if next_token == self.main_model.eos_token_id:
                    break
        
        return draft_tokens, draft_probs
    
    def verify(
        self,
        draft_tokens: List[int],
        main_probs: List[torch.Tensor]
    ) -> Tuple[List[int], int]:
        """
        验证候选token
        
        Returns:
            accepted_tokens: 被接受的token序列
            num_accepted: 接受的token数量
        """
        accepted = [draft_tokens[0]]  # 原始序列的token总是接受
        
        for i in range(1, len(draft_tokens)):
            # 获取大模型对这个位置的概率分布
            main_prob = main_probs[i-1]
            
            # 获取draft模型在之前位置i-1时的概率
            draft_prob_at_i = self._get_draft_prob(draft_tokens, i)
            
            # 接受准则
            threshold = draft_prob_at_i / (main_prob[draft_tokens[i]].item() + 1e-10)
            
            if threshold >= 1.0 or torch.rand(1).item() < threshold:
                accepted.append(draft_tokens[i])
            else:
                # 拒绝:使用大模型的采样
                break
        
        return accepted, len(accepted) - 1
    
    def _get_draft_prob(
        self,
        tokens: List[int],
        position: int
    ) -> float:
        """获取draft模型在position位置预测token的概率"""
        # 需要重新运行draft模型
        with torch.no_grad():
            outputs = self.draft_model(
                torch.tensor([tokens[:position]]).cuda()
            )
            probs = F.softmax(outputs.logits[0, -1], dim=-1)
            return probs[tokens[position]].item()
    
    def generate(
        self,
        prompt: str,
        max_new_tokens: int = 100
    ) -> str:
        """完整的投机解码生成"""
        input_ids = self.main_model.tokenizer.encode(
            prompt, return_tensors='pt'
        ).cuda()
        
        generated = input_ids[0].tolist()
        
        while len(generated) - len(input_ids[0]) < max_new_tokens:
            # 1. 投机
            draft_tokens, draft_probs = self.speculate(generated)
            
            # 2. 大模型并行验证
            full_tokens = torch.tensor([draft_tokens]).cuda()
            main_outputs = self.main_model.model(full_tokens)
            main_probs = F.softmax(main_outputs.logits, dim=-1)[0]
            
            # 3. 验证
            accepted, num_accepted = self.verify(draft_tokens, main_probs)
            
            # 4. 添加接受的token
            for token in accepted[-(num_accepted+1):]:
                if token != generated[-1]:  # 避免重复
                    generated.append(token)
            
            if accepted[-1] == self.main_model.tokenizer.eos_token_id:
                break
        
        return self.main_model.tokenizer.decode(generated)

2.3 前缀缓存

class PrefixCache:
    """
    前缀缓存
    
    复用多个请求共享的prompt前缀
    """
    
    def __init__(self):
        self.cache = {}
        self.access_count = defaultdict(int)
        
    def get_prefix_key(self, prompt_ids: List[int]) -> Optional[str]:
        """
        查找缓存的前缀
        
        策略:
        1. 完全匹配
        2. 最长前缀匹配
        """
        # 完全匹配
        key = tuple(prompt_ids)
        if key in self.cache:
            self.access_count[key] += 1
            return key
        
        # 最长前缀匹配
        best_match = None
        best_length = 0
        
        for cached_key in self.cache.keys():
            # 找到共同前缀长度
            common_len = 0
            for a, b in zip(cached_key, prompt_ids):
                if a == b:
                    common_len += 1
                else:
                    break
            
            if common_len > best_length:
                best_length = common_len
                best_match = cached_key
        
        if best_match:
            self.access_count[best_match] += 1
            return best_match
        
        return None
    
    def compute_prefix_kv(
        self,
        model,
        prefix_ids: List[int]
    ) -> torch.Tensor:
        """计算前缀的KV Cache"""
        key = tuple(prefix_ids)
        
        if key in self.cache:
            return self.cache[key]
        
        with torch.no_grad():
            outputs = model(
                torch.tensor([prefix_ids]).cuda(),
                use_cache=True
            )
            kv = outputs.past_key_values
        
        self.cache[key] = kv
        return kv
    
    def warmup(
        self,
        model,
        common_prefixes: List[str]
    ):
        """预热缓存"""
        for prefix in common_prefixes:
            prompt_ids = model.tokenizer.encode(prefix)
            self.compute_prefix_kv(model, prompt_ids)

3. 系统级优化

3.1 算子融合

class FusedAttention(nn.Module):
    """
    融合注意力算子
    
    将多个独立操作融合为单个kernel
    - QKV投影 + 缩放 + 注意力 + 输出投影
    """
    
    def __init__(self, hidden_size, num_heads, head_dim):
        super().__init__()
        self.qkv_proj = nn.Linear(hidden_size, 3 * hidden_size)
        self.o_proj = nn.Linear(hidden_size, hidden_size)
        
    def forward(self, x, past_key_value=None):
        """
        融合的前向传播
        
        相比分离实现:
        - 减少HBM访问
        - 提高并行度
        - 降低延迟
        """
        # 单次融合投影
        qkv = self.qkv_proj(x)
        q, k, v = qkv.chunk(3, dim=-1)
        
        # 融合的注意力计算(使用FlashAttention)
        from flash_attn import flash_attn_func
        
        # 调整维度顺序
        q = q.view(*q.shape[:-1], self.num_heads, self.head_dim).transpose(1, 2)
        k = k.view(*k.shape[:-1], self.num_heads, self.head_dim).transpose(1, 2)
        v = v.view(*v.shape[:-1], self.num_heads, self.head_dim).transpose(1, 2)
        
        # FlashAttention融合计算
        out = flash_attn_func(
            q, k, v,
            past_key_value=past_key_value,
            softmax_scale=self.head_dim ** -0.5,
            causal=True
        )
        
        return self.o_proj(out)

3.2 内存优化

class MemoryOptimizer:
    """
    内存优化工具
    """
    
    @staticmethod
    def optimize_kv_cache(
        kv_cache: torch.Tensor,
        method: str = 'dynamic'
    ) -> torch.Tensor:
        """
        KV Cache内存优化
        """
        if method == 'offload':
            # CPU卸载
            return kv_cache.cpu()
        
        elif method == 'compress':
            # 压缩
            return MemoryOptimizer._compress_kv(kv_cache)
        
        elif method == 'dynamic':
            # 动态选择
            if kv_cache.numel() > 1e8:
                return MemoryOptimizer._compress_kv(kv_cache)
            return kv_cache
    
    @staticmethod
    def _compress_kv(kv: torch.Tensor) -> torch.Tensor:
        """压缩KV Cache"""
        # 使用SVD压缩
        # 简化的实现
        return kv.float().to(torch.bfloat16)

4. 协同设计

4.1 Co-design框架

class InferenceCoDesign:
    """
    推理系统协同设计
    
    同时优化模型、系统和硬件
    """
    
    def __init__(
        self,
        workload_profile: dict,
        hardware_profile: dict
    ):
        self.workload = workload_profile
        self.hardware = hardware_profile
        
    def optimize_design(
        self,
        target_latency: float,
        target_throughput: float
    ) -> dict:
        """
        协同优化设计
        
        决策变量:
        - 批处理大小
        - 量化精度
        - 并行策略
        - KV Cache策略
        """
        best_config = None
        best_score = float('inf')
        
        for batch_size in [1, 4, 8, 16, 32]:
            for precision in ['fp16', 'bf16', 'int8', 'int4']:
                for pp_degree in [1, 2, 4, 8]:
                    for kv_cache_method in ['full', 'pyramidkv', 'h2o']:
                        config = {
                            'batch_size': batch_size,
                            'precision': precision,
                            'pp_degree': pp_degree,
                            'kv_cache_method': kv_cache_method
                        }
                        
                        # 评估配置
                        latency, throughput = self._evaluate_config(config)
                        
                        # 检查约束
                        if (latency <= target_latency and 
                            throughput >= target_throughput):
                            
                            # 优化目标:最小化资源使用
                            score = self._compute_score(config)
                            if score < best_score:
                                best_score = score
                                best_config = config
        
        return best_config
    
    def _evaluate_config(self, config: dict) -> Tuple[float, float]:
        """评估配置的性能"""
        # 简化的评估模型
        batch = config['batch_size']
        prec = config['precision']
        
        # 延迟估算
        base_latency = 100  # ms per token
        precision_factor = {'fp16': 1.0, 'bf16': 1.0, 'int8': 0.8, 'int4': 0.5}
        latency = base_latency / precision_factor[prec]
        
        # 吞吐量估算
        throughput = batch * 1000 / latency
        
        return latency, throughput
    
    def _compute_score(self, config: dict) -> float:
        """计算综合评分"""
        return (
            0.4 * config['batch_size'] +
            0.3 * (1.0 / config['pp_degree']) +
            0.3 * self._precision_score(config['precision'])
        )
    
    def _precision_score(self, precision: str) -> float:
        scores = {'fp16': 1.0, 'bf16': 1.0, 'int8': 0.8, 'int4': 0.5}
        return scores.get(precision, 0.5)

5. 工具与框架

5.1 主流框架对比

框架开发方核心优化优势劣势
vLLMUC BerkeleyPagedAttention易用、性能好功能有限
TensorRT-LLMNVIDIATensorRT优化最快难以定制
llama.cppGeorgi Gerganov量化、CPU轻量精度有限
DeepSpeedMicrosoftZeRO大模型支持复杂
SGLangLMSYSRadixAttention高吞吐新兴
TGIHuggingFaceRust实现稳定中等性能

5.2 vLLM示例

from vllm import LLM, SamplingParams
 
# 初始化vLLM
llm = LLM(
    model="meta-llama/Llama-2-70b-hf",
    tensor_parallel_size=4,  # 4 GPU并行
    max_model_len=8192,
    gpu_memory_utilization=0.9,
    quantization="awq"  # 量化
)
 
# 采样参数
sampling_params = SamplingParams(
    temperature=0.8,
    top_p=0.95,
    max_tokens=256
)
 
# 批量推理
outputs = llm.generate(
    prompts=[
        "Explain quantum computing in simple terms:",
        "Write a Python function to sort a list:",
        "What are the benefits of exercise?"
    ],
    sampling_params=sampling_params
)
 
# 输出结果
for output in outputs:
    print(f"Prompt: {output.prompt}")
    print(f"Generated: {output.outputs[0].text}")
    print("---")

6. 最佳实践

6.1 延迟优化

# 延迟优化最佳实践
 
optimizations = {
    # 1. 启用FlashAttention
    "flash_attention": True,
    
    # 2. 使用合适的精度
    "precision": "bf16",  # 训练用bf16,推理可用int8
    
    # 3. 启用KV Cache量化
    "kv_cache_quant": "int8",
    
    # 4. 合理的batch size
    "batch_size": 1,  # 延迟敏感场景用小batch
    
    # 5. 前缀缓存
    "prefix_caching": True,
    
    # 6.投机解码
    "speculative_decoding": True,
    "draft_model_size": "1b"
}

6.2 吞吐量优化

# 吞吐量优化最佳实践
 
optimizations = {
    # 1. 增大batch size
    "batch_size": 32,
    
    # 2. 连续批处理
    "continuous_batching": True,
    
    # 3. Chunked Prefill
    "prefill_chunk_size": 512,
    
    # 4. 模型并行
    "tensor_parallel_size": 8,
    
    # 5. 流水线并行
    "pipeline_parallel_size": 4,
    
    # 6. 权重量化
    "weight_quant": "int4_awq"
}

7. 总结

LLM推理优化的关键要点:

  1. 量化是基础:INT8/INT4量化显著减少内存和延迟
  2. 投机解码有效:3-4倍吞吐提升
  3. 系统优化很重要:连续批处理、PagedAttention等
  4. 协同设计最优:模型-系统-硬件联合优化
  5. 工具成熟:vLLM、TGI等框架已相当完善

参考文献