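A Detailed Guide to the LLM Training Pipeline

Training a large language model (LLM) is a complex, multi-stage process. It starts with pre-training on massive amounts of raw text, continues with supervised fine-tuning and reinforcement-learning-based alignment, and ends with a highly capable system. This article walks through the core techniques, algorithmic principles, and practical considerations of each training stage.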


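1. Pre-training Stage

Pre-training is the stage in which an LLM acquires its language ability and world knowledge: by performing a language-modeling task over massive text corpora, the model learns rich language representations.

1.1 Data Processing Pipeline

1.1.1 Deduplication Strategy

Large corpora contain a great deal of duplicated content, so deduplication is a key step in ensuring data quality.

Document-level deduplication uses MinHash or SimHash to estimate document similarity: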

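import numpy as np
from datasketch import MinHash

def minhash_deduplication(documents, threshold=0.8, num_perm=128):
    """
    MinHash deduplication.
    
    Args:
        documents: list of documents (strings)
        threshold: similarity threshold; pairs above it are treated as duplicates
        num_perm: number of hash permutations
    
    Returns:
        List of (i, j) index pairs of near-duplicate documents.
    """
    minhashes = []
    for doc in documents:
        # Whitespace tokenization (use character n-grams for languages without spaces)
        tokens = set(doc.split())
        # Build the MinHash signature
        mh = MinHash(num_perm=num_perm)
        for token in tokens:
            mh.update(token.encode('utf8'))
        minhashes.append(mh)
    
    # Pairwise comparison, O(n^2); for large corpora use datasketch.MinHashLSH instead
    duplicates = []
    for i, mh1 in enumerate(minhashes):
        for j, mh2 in enumerate(minhashes[i+1:], start=i+1):
            if mh1.jaccard(mh2) > threshold:
                duplicates.append((i, j))
    
    return duplicates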

Sentence-level deduplication targets repeated sentences within a paragraph.
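As a rough illustration of sentence-level deduplication, the sketch below drops any sentence whose normalized form already appeared earlier in the same text. The function name `dedup_sentences`, the splitting rule, and the normalization (lowercasing and whitespace collapsing) are assumptions made for this example, not a method taken from a particular pipeline.

```python
import hashlib
import re

def dedup_sentences(text: str) -> str:
    """Drop sentences already seen earlier in the same text (exact match after normalization)."""
    # Split after ASCII sentence punctuation followed by whitespace,
    # or directly after CJK sentence punctuation. Illustrative only.
    parts = re.split(r"(?<=[.!?])\s+|(?<=[。!?])", text)
    seen = set()
    kept = []
    for sentence in (p.strip() for p in parts):
        if not sentence:
            continue
        # Normalize case and whitespace so trivial variants hash identically.
        normalized = " ".join(sentence.lower().split())
        digest = hashlib.sha1(normalized.encode("utf-8")).hexdigest()
        if digest not in seen:
            seen.add(digest)
            kept.append(sentence)
    return " ".join(kept)

cleaned = dedup_sentences("Hello world. Hello world. Goodbye.")
```

Hashing the normalized sentence keeps memory bounded when the same idea is applied across a whole corpus instead of a single text.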

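1.1.2 Quality Filtering

Quality filtering usually combines several dimensions:

| Filter dimension | Method | Suggested threshold |
| Language identification | fastText / LangID | target-language share > 95% |
| Perplexity | small language model | PPL < 50 |
| Sensitive content | keyword matching / classifier | depends on the application |
| Text length | token count | 50 to 100k tokens |
| Special characters | regular expressions | share < 10% |
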
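def quality_filter(text, config):
    """
    Multi-dimensional quality filter. Returns True if the text should be kept.
    
    detect_language, tokenizer, compute_perplexity, count_special_chars and
    contains_problematic_content are assumed to be defined elsewhere.
    """
    # Reject empty text up front (also avoids division by zero below)
    if not text:
        return False
    
    # Language detection
    lang = detect_language(text)
    if lang != config.target_lang:
        return False
    
    # Length filter
    token_count = len(tokenizer.encode(text))
    if token_count < config.min_length or token_count > config.max_length:
        return False
    
    # Perplexity filter
    ppl = compute_perplexity(text, config.quality_model)
    if ppl > config.ppl_threshold:
        return False
    
    # Share of special characters
    special_ratio = count_special_chars(text) / len(text)
    if special_ratio > 0.1:
        return False
    
    # Sensitive-content detection
    if contains_problematic_content(text, config.classifier):
        return False
    
    return True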

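1.2 Data Mixture Strategy

High-quality pre-training needs a well-balanced data mixture that covers different domains and sources.

1.2.1 Designing the Domain Distribution

A reference data mixture for a typical LLM:

| Data source | Share | Representative datasets |
| Web crawl | 60-70% | Common Crawl |
| Books / papers | 10-15% | BooksCorpus, arXiv |
| Code | 5-15% | GitHub |
| Dialogue / social | 5-10% | Reddit, StackExchange |
| Encyclopedia | 2-5% | Wikipedia |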

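1.2.2 Curriculum Learning Strategy

The data distribution can be adjusted dynamically during training according to a schedule. The schedule can be:

  • Increasing difficulty: simple data first, complex data later
  • Topic focusing: broad coverage first, specialized domains later
  • Curriculum scheduling: ordering by the priority of specific domains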
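One simple way to realize such a schedule is to interpolate the sampling weights of the data sources between a starting mixture and a final mixture. The sketch below is only an illustration: the function name `mixture_at`, the linear interpolation, and the example weights are assumptions, not values from this article.

```python
def mixture_at(step, total_steps, start_mix, end_mix):
    """Linearly interpolate per-source sampling weights between two mixtures."""
    # Fraction of training completed, clamped to [0, 1].
    t = min(max(step / total_steps, 0.0), 1.0)
    sources = set(start_mix) | set(end_mix)
    mix = {
        s: (1 - t) * start_mix.get(s, 0.0) + t * end_mix.get(s, 0.0)
        for s in sources
    }
    # Renormalize so the weights always sum to 1.
    total = sum(mix.values())
    return {s: w / total for s, w in mix.items()}

# Hypothetical mixtures: broad web data first, more code and books later.
start = {"web": 0.8, "code": 0.1, "books": 0.1}
end = {"web": 0.5, "code": 0.3, "books": 0.2}
halfway = mixture_at(500, 1000, start, end)
```

The resulting dictionary can be passed as `weights` to `random.choices` to pick the source of each training sample.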

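1.3 Tokenization and Data Formatting

1.3.1 Choosing a Tokenizer

Modern LLMs mainly use the following tokenizers:

| Tokenizer | Characteristics | Vocabulary size |
| BPE | byte-level encoding, well suited to multilingual text | 32k-100k |
| WordPiece | used by Google's model family (for example BERT) | 30k-100k |
| SentencePiece | works on raw text, so it supports languages without spaces | variable |
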
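from transformers import AutoTokenizer

# Load a pretrained tokenizer (gated repository; the "-hf" variant is in transformers format)
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-2-7b-hf")
# Llama tokenizers define no pad token; reuse EOS so that padding works
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

def prepare_training_data(texts, block_size=4096):
    """
    Prepare pre-training data.
    
    Args:
        texts: list of raw texts
        block_size: context window size (tokens)
    """
    # Tokenize; texts longer than block_size overflow into additional rows
    tokenized = tokenizer(
        texts,
        truncation=True,
        max_length=block_size,
        return_overflowing_tokens=True,
        padding="max_length"
    )
    
    # Build input/label pairs (causal language modeling).
    # Labels equal the inputs (the model shifts them internally);
    # padding positions are set to -100 so the loss ignores them.
    input_ids = tokenized["input_ids"]
    attention_mask = tokenized["attention_mask"]
    labels = [
        [tok if m == 1 else -100 for tok, m in zip(ids, mask)]
        for ids, mask in zip(input_ids, attention_mask)
    ]
    
    return {
        "input_ids": input_ids,
        "labels": labels,
        "attention_mask": attention_mask
    }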

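1.3.2 Data Formatting Templates

Standard pre-training format (causal LM):

<bos> document 1 <eos> <bos> document 2 <eos> ...

Labels: identical to input_ids. The loss is next-token prediction (labels shifted by one position), not masked LM.

Interleaved format (for instruction-style pre-training data):

[INST] user instruction 1 [/INST] model reply 1 [INST] user instruction 2 [/INST] model reply 2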
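The standard causal-LM format is commonly produced by "packing": tokenized documents are concatenated with separator tokens and the stream is cut into fixed-length blocks. The following is a minimal sketch of that idea; `pack_documents` and the token ids used in the example are hypothetical.

```python
def pack_documents(tokenized_docs, block_size, bos_id, eos_id):
    """Concatenate documents with BOS/EOS separators and cut into equal blocks."""
    stream = []
    for doc in tokenized_docs:
        stream.extend([bos_id, *doc, eos_id])
    # Drop the incomplete tail so every block has exactly block_size tokens.
    usable = len(stream) - len(stream) % block_size
    return [stream[i:i + block_size] for i in range(0, usable, block_size)]

# Three toy "documents" given as token-id lists.
blocks = pack_documents(
    [[5, 6, 7], [8, 9], [10, 11, 12, 13]],
    block_size=4, bos_id=1, eos_id=2,
)
```

Packing avoids spending compute on padding tokens, at the price of blocks that may start or end in the middle of a document.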

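1.4 Distributed Training Strategies

Training language models with billions of parameters requires multiple GPUs, and often multiple nodes, working together.

1.4.1 Data Parallelism

Basic data parallelism with DistributedDataParallel (DDP):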

import os

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.distributed as dist

def setup_ddp():
    """Initialize the distributed environment (expects a launcher such as torchrun)."""
    dist.init_process_group(backend="nccl")
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)
    return local_rank

def train_ddp(model, dataloader, optimizer, gradient_accumulation_steps=1):
    """Data-parallel training loop."""
    local_rank = setup_ddp()
    model = model.to(local_rank)
    model = DDP(model, device_ids=[local_rank])
    
    model.train()
    optimizer.zero_grad()
    for step, batch in enumerate(dataloader):
        batch = {k: v.to(local_rank) for k, v in batch.items()}
        
        outputs = model(**batch)
        loss = outputs.loss / gradient_accumulation_steps
        # DDP all-reduces the gradients automatically during backward()
        loss.backward()
        
        # Update once every gradient_accumulation_steps micro-batches
        if (step + 1) % gradient_accumulation_steps == 0:
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            optimizer.zero_grad()

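ZeRO optimizer (memory-optimized data parallelism):

ZeRO reduces memory usage by sharding optimizer states, gradients, and parameters across the data-parallel ranks:

| Stage | What is sharded | Memory savings |
| ZeRO-1 | optimizer states | ~4x |
| ZeRO-2 | optimizer states + gradients | ~8x |
| ZeRO-3 | optimizer states + gradients + parameters | scales linearly with the number of GPUs |
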
# Example DeepSpeed ZeRO configuration
ds_config = {
    "zero_optimization": {
        "stage": 3,
        "offload_optimizer": {
            "device": "cpu",
            "pin_memory": True
        },
        "offload_param": {
            "device": "cpu",
            "pin_memory": True
        },
        "overlap_comm": True,
        "contiguous_gradients": True
    },
    "bf16": {"enabled": True},
    "gradient_clipping": 1.0,
    "gradient_accumulation_steps": 4
}
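To make the memory savings of the ZeRO stages concrete, the sketch below estimates per-GPU memory for the model states under mixed-precision Adam: 2 bytes per parameter for half-precision weights, 2 for gradients, and 12 for the FP32 master weights plus the two Adam moments. The function name is hypothetical, and activations, buffers, and fragmentation are deliberately ignored.

```python
def zero_model_state_gb(num_params, num_gpus, stage):
    """Per-GPU model-state memory (GB) for mixed-precision Adam under ZeRO."""
    params = 2 * num_params   # fp16/bf16 weights, 2 bytes each
    grads = 2 * num_params    # fp16/bf16 gradients, 2 bytes each
    optim = 12 * num_params   # fp32 master weights + Adam momentum + variance
    if stage >= 1:
        optim /= num_gpus     # ZeRO-1 shards the optimizer states
    if stage >= 2:
        grads /= num_gpus     # ZeRO-2 also shards the gradients
    if stage >= 3:
        params /= num_gpus    # ZeRO-3 also shards the parameters
    return (params + grads + optim) / 1e9

# A 7.5B-parameter model on 64 GPUs.
per_stage = {s: zero_model_state_gb(7.5e9, 64, s) for s in range(4)}
```

For this configuration the estimate drops from 120 GB without sharding to under 2 GB per GPU at stage 3, which is where the "scales linearly" entry in the table comes from.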

1.4.2 Model Parallelism

Tensor Parallelism (TP)

Tensor parallelism splits the weight matrix of a single layer by columns or by rows across GPUs:

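class ColumnParallelLinear(nn.Module):
    """Column-parallel linear layer: each rank holds a slice of the output features."""
    def __init__(self, input_size, output_size, world_size):
        super().__init__()
        self.world_size = world_size
        self.output_size_per_rank = output_size // world_size
        
        # This rank's shard of the weight matrix
        self.weight = nn.Parameter(
            torch.randn(self.output_size_per_rank, input_size)
        )
        self.bias = nn.Parameter(
            torch.zeros(self.output_size_per_rank)
        )
    
    def forward(self, x):
        # x: (batch, seq, input_size)
        # Local computation only; the output holds just this rank's slice:
        # (batch, seq, output_size // world_size)
        output = F.linear(x, self.weight, self.bias)
        return output
    
    def parallel_forward(self, x):
        """Tensor-parallel forward that returns the full output on every rank."""
        # Local computation
        local_output = F.linear(x, self.weight, self.bias)
        # All-gather the slices from all ranks and concatenate along the feature dimension.
        # Note: dist.all_gather does not propagate gradients; training code needs
        # an autograd-aware gather.
        world_output = [torch.empty_like(local_output) for _ in range(self.world_size)]
        dist.all_gather(world_output, local_output)
        return torch.cat(world_output, dim=-1)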
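The correctness of column parallelism rests on a simple identity: multiplying by column shards of the weight matrix and concatenating the partial outputs gives the same result as the full matrix product. The single-process sketch below checks this with plain Python lists; the "ranks" are simulated in a loop and all names are illustrative.

```python
def matmul(x, w):
    """x: (n, k), w: (k, m) -> (n, m), using plain Python lists."""
    return [[sum(a * b for a, b in zip(row, col)) for col in zip(*w)] for row in x]

def column_parallel_matmul(x, w, world_size):
    """Split w by columns across simulated ranks, then concatenate the outputs."""
    shard = len(w[0]) // world_size
    outputs = []
    for rank in range(world_size):
        # Each simulated rank only sees its own slice of the columns.
        w_shard = [row[rank * shard:(rank + 1) * shard] for row in w]
        outputs.append(matmul(x, w_shard))
    # "All-gather": concatenate every row's slices along the feature dimension.
    return [sum((out[i] for out in outputs), []) for i in range(len(x))]

x = [[1, 2], [3, 4]]
w = [[1, 0, 2, 1], [0, 1, 1, 3]]
full = matmul(x, w)
sharded = column_parallel_matmul(x, w, world_size=2)
```

Because each output column depends on exactly one weight column, no communication is needed until the slices are gathered.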

Sequence Parallelism

Sequence parallelism splits the attention computation along the sequence dimension:

class SequenceParallelAttention(nn.Module):
    """Sequence-parallel attention: each rank holds one slice of the sequence."""
    def __init__(self, config):
        super().__init__()
        self.world_size = dist.get_world_size()
        self.rank = dist.get_rank()
    
    def forward(self, x):
        """
        x: (batch, seq_len // world_size, hidden)
        
        compute_qkv and scaled_dot_product are assumed to be defined elsewhere.
        """
        # Compute Q, K, V for the local sequence slice
        q, k, v = self.compute_qkv(x)
        
        # All-gather K and V so that the local queries can attend to the full sequence
        k_full = self.all_gather_kv(k)
        v_full = self.all_gather_kv(v)
        
        # Attention of the local Q against the full K/V
        attn = self.scaled_dot_product(q, k_full, v_full)
        
        # The result is already complete for the local slice,
        # so no further reduction across ranks is needed
        return attn
    
    def all_gather_kv(self, tensor):
        tensor_gather = [torch.empty_like(tensor) for _ in range(self.world_size)]
        dist.all_gather(tensor_gather, tensor)
        # Concatenate along the sequence dimension
        return torch.cat(tensor_gather, dim=1)

1.4.3 Pipeline Parallelism

Pipeline parallelism partitions the model by layers across devices:

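class PipelineStage(nn.Module):
    """One stage of a pipeline-parallel model."""
    def __init__(self, layers, start_layer_idx, end_layer_idx):
        super().__init__()
        self.layers = nn.ModuleList(layers[start_layer_idx:end_layer_idx])
        self.start_idx = start_layer_idx
        self.end_idx = end_layer_idx
    
    def forward(self, x):
        """
        Forward pass.
        x is the raw input for the first stage and the activations
        received from the previous stage otherwise.
        """
        for layer in self.layers:
            x = layer(x)
        return x

def pipeline_schedule(stages, microbatches, num_stages):
    """
    Simplified pipeline loop over micro-batches.
    
    For clarity every micro-batch runs through all stages sequentially. A real
    1F1B (one forward, one backward) schedule interleaves the forward and
    backward passes of different micro-batches across devices.
    
    Ideal GPU utilization with P stages and M micro-batches:
    - without micro-batching: ~1/P
    - with micro-batching: M / (M + P - 1), approaching 1 as M grows
    """
    num_microbatches = len(microbatches)
    
    for i in range(num_microbatches):
        # Forward
        x = microbatches[i]
        for stage in stages:
            x = stage(x)
        
        # Backward
        # ... (backward-pass scheduling)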

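Comparison of GPipe and PipeDream scheduling

| Property | GPipe | PipeDream |
| Scheduling | all micro-batch forwards, then all backwards | 1F1B |
| Memory demand | high (activations of all micro-batches must be kept) | low (only in-flight micro-batches are buffered) |
| GPU utilization | has bubbles | higher |
| Implementation complexity | simple | complex |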
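The size of the pipeline "bubble" can be estimated with a one-line formula. Assuming every stage takes the same time per micro-batch, a schedule with P stages and M micro-batches idles for a fraction (P - 1) / (M + P - 1) of the time. The helper below is a small sketch of that estimate; its name is hypothetical.

```python
def bubble_fraction(num_stages, num_microbatches):
    """Idle fraction of a pipeline schedule: (P - 1) / (M + P - 1)."""
    return (num_stages - 1) / (num_microbatches + num_stages - 1)

# With 4 stages, more micro-batches shrink the bubble.
fractions = {m: bubble_fraction(4, m) for m in (1, 4, 32)}
```

With a single micro-batch three of the four stages are idle at any time (0.75), while 32 micro-batches bring the idle fraction below 10%.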

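1.5 Mixed Precision and Gradient Checkpointing

1.5.1 Mixed-Precision Training

Mixed-precision training uses BF16/FP16 to speed up training while keeping a master copy of the weights in FP32: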

import torch
from torch.cuda.amp import GradScaler

class MixedPrecisionTrainer:
    """Mixed-precision trainer."""
    def __init__(self, model, optimizer, config, dtype=torch.bfloat16):
        self.model = model
        self.optimizer = optimizer
        self.config = config
        self.dtype = dtype
        # Loss scaling is only needed for FP16; BF16 has the same exponent range as FP32
        self.scaler = GradScaler(enabled=(dtype == torch.float16))
        self.step_count = 0
    
    def training_step(self, batch):
        # Forward pass in reduced precision
        with torch.autocast(device_type="cuda", dtype=self.dtype):
            outputs = self.model(**batch)
            loss = outputs.loss / self.config.gradient_accumulation_steps
        
        # Backward pass (scale() is a no-op when the scaler is disabled)
        self.scaler.scale(loss).backward()
        
        # Clip gradients and update at the end of each accumulation cycle
        if self.is_gradient_accumulation_step():
            self.scaler.unscale_(self.optimizer)
            torch.nn.utils.clip_grad_norm_(
                self.model.parameters(),
                self.config.max_grad_norm
            )
            self.scaler.step(self.optimizer)
            self.scaler.update()
            self.optimizer.zero_grad()
        self.step_count += 1
    
    def is_gradient_accumulation_step(self):
        return (self.step_count + 1) % self.config.gradient_accumulation_steps == 0

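Comparison of numeric formats

| Format | Sign bits | Exponent bits | Mantissa bits | Largest finite value |
| FP32 | 1 | 8 | 23 | ~3.4e38 |
| BF16 | 1 | 8 | 7 | ~3.4e38 |
| FP16 | 1 | 5 | 10 | 65504 |

The advantage of BF16 over FP16 is that it keeps the same exponent range as FP32, which avoids gradient overflow.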
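The dynamic range of each format follows directly from its bit layout. As a quick check, the sketch below computes the largest finite value of an IEEE-754-style binary format from its exponent and mantissa widths; the helper name is an assumption for this example.

```python
def max_finite(exponent_bits, mantissa_bits):
    """Largest finite value of an IEEE-754-style binary floating-point format."""
    bias = 2 ** (exponent_bits - 1) - 1
    # The largest normal exponent equals the bias; the mantissa is all ones (2 - 2^-m).
    return (2 - 2.0 ** -mantissa_bits) * 2.0 ** bias

fp32_max = max_finite(8, 23)
bf16_max = max_finite(8, 7)
fp16_max = max_finite(5, 10)
```

BF16 and FP32 share 8 exponent bits, so their maxima differ only slightly, while the 5 exponent bits of FP16 cap it at 65504.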

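1.5.2 Gradient Checkpointing

Gradient checkpointing does not keep all intermediate activations during the forward pass. It stores only a subset of checkpoints and recomputes the rest during the backward pass: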

import torch.utils.checkpoint

class GradientCheckpointingWrapper(nn.Module):
    """Gradient checkpointing wrapper."""
    def __init__(self, model, checkpoint_ratio=0.5):
        super().__init__()
        self.model = model
        self.checkpoint_ratio = checkpoint_ratio
    
    def _run_block(self, x, start, end):
        """Run layers [start, end) sequentially."""
        for layer in self.model.layers[start:end]:
            x = layer(x)
        return x
    
    def forward(self, x):
        """
        Selective gradient checkpointing.
        Memory savings: roughly 50-70%
        Compute overhead: roughly 20-30%
        """
        # Number of layers per checkpointed block
        num_layers = len(self.model.layers)
        block_size = max(1, int(num_layers * self.checkpoint_ratio))
        
        # Run block by block
        for start in range(0, num_layers, block_size):
            end = min(start + block_size, num_layers)
            
            if self.training and start > 0:
                # Keep only the block input; the block's activations are
                # recomputed during the backward pass
                x = torch.utils.checkpoint.checkpoint(
                    self._run_block, x, start, end,
                    use_reentrant=False
                )
            else:
                # Plain forward pass
                x = self._run_block(x, start, end)
        
        return x

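Memory and compute trade-off

Let the model have $L$ layers, batch size $B$, sequence length $S$, and hidden dimension $H$.

  • Standard forward pass: $O(L \cdot B \cdot S \cdot H)$ activation memory
  • Gradient checkpointing: $O(\sqrt{L} \cdot B \cdot S \cdot H)$ activation memory when a checkpoint is placed every $\sqrt{L}$ layers, at the cost of roughly one extra forward pass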

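2. Supervised Fine-Tuning (SFT)

Supervised fine-tuning (SFT) uses labeled data to teach the pre-trained model to follow instructions.

2.1 Building Instruction Datasets

2.1.1 Human-Annotated Data

High-quality human-annotated data is the foundation of SFT:

| Data type | How it is built | Quality | Cost |
| Expert-written | written by domain professionals | highest | very high |
| Crowdsourced annotation | multiple annotators collaborating | | |
| Auto-generated + filtered | LLM generation + rule-based filtering | medium to high | |
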
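import json

class InstructionDataset:
    """Instruction dataset."""
    def __init__(self, data_path):
        self.data = self.load_data(data_path)
    
    def load_data(self, data_path):
        # Assumes a JSON file that contains a list of records
        with open(data_path, encoding="utf-8") as f:
            return json.load(f)
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        item = self.data[idx]
        return {
            "instruction": item["instruction"],
            "input": item.get("input", ""),
            "output": item["output"],
            "category": item.get("category", "general")
        }
    
    def format_prompt(self, item):
        """
        Format an example with a chat template.
        
        Llama-2-chat format:
        """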
        return f"""[INST] <<SYS>>
You are a helpful assistant.
<</SYS>>
 
{item['instruction']} {item.get('input', '')} [/INST]
 
{item['output']}"""

2.1.2 The Self-Instruct Method

Self-Instruct [1] uses the model itself to generate instruction data:

class SelfInstruct:
    """
    Self-Instruct: Generating Instruction Data from Language Models
    
    Core idea: let the model generate its own instruction data.
    """
    def __init__(self, teacher_model):
        self.model = teacher_model
        self.instruction_template = """Generate a diverse collection of tasks 
        for instruction tuning. Include various types like:
        - Question answering
        - Text summarization
        - Code generation
        - Creative writing
        - Reasoning tasks
        
        Generate 10 new instructions:"""
    
    def generate_instructions(self, num_instructions=10):
        """Generate new instructions (parse_instructions is assumed to be defined elsewhere)."""
        prompt = f"{self.instruction_template}\n\n"
        response = self.model.generate(prompt, max_length=500)
        instructions = self.parse_instructions(response)
        return instructions
    
    def generate_response(self, instruction, input_text=""):
        """Generate a reply for an instruction."""
        prompt = f"Instruction: {instruction}\n"
        if input_text:
            prompt += f"Input: {input_text}\n"
        prompt += "Response:"
        return self.model.generate(prompt, max_length=1000)
    
    def filter_quality(self, instructions, responses):
        """
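        Quality filtering.
        
        Criteria:
        1. The instruction is not empty
        2. The reply is relevant to the instruction
        3. The reply has a reasonable length
        4. No obvious errors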
        """
        filtered = []
        for inst, resp in zip(instructions, responses):
            if not inst or not resp:
                continue
            if len(resp) < 50 or len(resp) > 2000:
                continue
            if not self.is_relevant(inst, resp):
                continue
            filtered.append((inst, resp))
        return filtered

2.2 Training Strategy

2.2.1 Learning-Rate Scheduling

A learning-rate schedule commonly used for SFT:

def get_sft_scheduler(optimizer, num_training_steps, warmup_ratio=0.1):
    """
    Learning-rate schedule for SFT.
    
    Linear warmup followed by cosine decay.
    """
    warmup_steps = int(num_training_steps * warmup_ratio)
    
    def lr_lambda(current_step):
        if current_step < warmup_steps:
            # Linear warmup
            return float(current_step) / float(max(1, warmup_steps))
        else:
            # Cosine decay, floored at 10% of the peak learning rate
            progress = float(current_step - warmup_steps) / float(
                max(1, num_training_steps - warmup_steps)
            )
            return max(0.1, 0.5 * (1.0 + np.cos(np.pi * progress)))
    
    return torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)

# Recommended SFT configuration
sft_config = {
    "learning_rate": 2e-5,          # roughly 1/10 to 1/100 of the pre-training learning rate
    "warmup_ratio": 0.03,           # short warmup
    "num_epochs": 3,                # typically 3 to 5
    "batch_size": 8,                # per device, typically 4 to 16
    "weight_decay": 0.01,
    "max_grad_norm": 1.0,
    "lr_scheduler_type": "cosine"
}

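2.2.2 Handling Overfitting

Common overfitting problems in SFT and how to address them:

| Problem | Symptom | Mitigation |
| Memorization | reproduces training data verbatim | increase data diversity, use dropout |
| Mode collapse | replies become too short or templated | add a reply-length penalty, mix in other training data |
| Capability regression | pre-trained abilities degrade | train jointly with retained pre-training data |
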
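class SFTLoss:
    """SFT loss function."""
    def __init__(self, model, config):
        self.model = model
        self.config = config
    
    def compute_loss(self, batch):
        """
        Compute the SFT loss.
        
        The loss is computed only on the assistant reply;
        user-input positions carry the label -100 and are masked out.
        """
        outputs = self.model(**batch)
        
        # Shift so that the logits at position t are scored against the token at t+1
        # logits: (B, L, V), labels: (B, L)
        shift_logits = outputs.logits[..., :-1, :].contiguous()
        shift_labels = batch["labels"][..., 1:].contiguous()
        
        # Cross-entropy loss
        loss = F.cross_entropy(
            shift_logits.view(-1, shift_logits.size(-1)),
            shift_labels.view(-1),
            ignore_index=-100  # masks out the user-input positions
        )
        
        # Optional length regularization (intended to discourage overly short replies).
        # Note: this term depends only on the labels, so it changes the reported
        # loss value but contributes no gradient to the model.
        length_penalty_coef = getattr(self.config, "length_penalty", 0)
        if length_penalty_coef > 0:
            response_lengths = (batch["labels"] != -100).sum(dim=-1)
            length_penalty = length_penalty_coef * torch.mean(
                1.0 / torch.log(response_lengths.float() + 1)
            )
            loss = loss + length_penalty
        
        return loss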

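2.3 Multi-Task Fine-Tuning vs. Selective Fine-Tuning

2.3.1 Multi-Task Fine-Tuning (MTF)

Multi-task fine-tuning trains on instruction data that mixes many kinds of tasks: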

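import random

class MultiTaskFineTuner:
    """Multi-task fine-tuning."""
    def __init__(self, tasks_data, model, optimizer, scheduler):
        self.model = model
        self.tasks = tasks_data  # dict: task name -> list of examples
        self.optimizer = optimizer
        self.scheduler = scheduler
    
    def create_multitask_batch(self, batch_size=32):
        """
        Build a batch by sampling from the tasks.
        
        Sampling strategies:
        1. Uniform over tasks (implemented below)
        2. Proportional to task size
        3. Difficulty-aware curriculum sampling
        """
        tasks = list(self.tasks.keys())
        # Pick one task uniformly at random; the whole batch comes from it
        task = random.choice(tasks)
        examples = self.tasks[task]
        batch = random.sample(examples, min(batch_size, len(examples)))
        return self.collate(batch)
    
    def train(self, num_steps):
        """Multi-task training loop (collate and compute_loss are assumed to be defined elsewhere)."""
        for step in range(num_steps):
            batch = self.create_multitask_batch()
            loss = self.compute_loss(batch)
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
            self.scheduler.step()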

2.3.2 Selective Fine-Tuning

Selective fine-tuning updates only a subset of the parameters, which helps preserve the model's overall capabilities:

class SelectiveFineTuner:
    """
    Selective fine-tuning.
    
    Only specific layers or modules of the model are updated.
    """
    def __init__(self, model, train_layers=("layer.23", "layer.24", "lm_head")):
        self.model = model
        
        # Freeze every parameter whose name matches none of train_layers
        for name, param in model.named_parameters():
            if not any(layer in name for layer in train_layers):
                param.requires_grad = False
        
        # Count trainable parameters
        trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
        total_params = sum(p.numel() for p in model.parameters())
        print(f"Trainable parameters: {trainable_params:,} / {total_params:,} ({100*trainable_params/total_params:.2f}%)")
    
    def apply_lora(self, rank=8, alpha=16, dropout=0.05):
        """
        Selective fine-tuning combined with LoRA.
        
        See Section 5 for details.
        """
        from peft import get_peft_model, LoraConfig
        
        lora_config = LoraConfig(
            r=rank,
            lora_alpha=alpha,
            target_modules=["q_proj", "v_proj", "k_proj", "o_proj"],
            lora_dropout=dropout,
            task_type="CAUSAL_LM"
        )
        self.model = get_peft_model(self.model, lora_config)

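3. Reinforcement Learning from Human Feedback (RLHF)

RLHF (Reinforcement Learning from Human Feedback) aligns model behavior using human preference signals and is a core technique for aligning modern LLMs.

3.1 Reward Model Training

3.1.1 The Bradley-Terry Model

The reward model uses the Bradley-Terry model [2] to model human preferences:

Preference probability model

$$P(y_w \succ y_l \mid x) = \sigma\big(r_\theta(x, y_w) - r_\theta(x, y_l)\big)$$

where $r_\theta(x, y)$ is the reward of reply $y$ given input $x$, $y_w$ and $y_l$ are the preferred and rejected replies, and $\sigma$ is the sigmoid function.

Loss function

$$\mathcal{L}_{RM}(\theta) = -\mathbb{E}_{(x, y_w, y_l) \sim \mathcal{D}}\Big[\log \sigma\big(r_\theta(x, y_w) - r_\theta(x, y_l)\big)\Big]$$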
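As a small numeric illustration of the Bradley-Terry preference probability and its negative log-likelihood loss, the sketch below works on scalar rewards with the standard library only. The function names are chosen for this example.

```python
import math

def preference_probability(r_chosen, r_rejected):
    """P(chosen is preferred) = sigmoid(r_chosen - r_rejected)."""
    return 1.0 / (1.0 + math.exp(-(r_chosen - r_rejected)))

def bradley_terry_loss(r_chosen, r_rejected):
    """Negative log-likelihood of the observed preference."""
    return -math.log(preference_probability(r_chosen, r_rejected))

p_correct = preference_probability(2.0, 0.0)   # reward model agrees with the label
loss_correct = bradley_terry_loss(2.0, 0.0)
loss_wrong = bradley_terry_loss(0.0, 2.0)      # reward model disagrees with the label
```

Only the reward difference matters: adding the same constant to both rewards leaves the probability and the loss unchanged, which is why reward-model outputs are meaningful only up to an offset.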

class RewardModel(nn.Module):
    """
    Reward model (Bradley-Terry).
    
    Same architecture as the language model, but it outputs a single scalar reward.
    """
    def __init__(self, base_model):
        super().__init__()
        self.base_model = base_model
        self.reward_head = nn.Linear(base_model.config.hidden_size, 1)
    
    def forward(self, input_ids, attention_mask):
        """
        Compute the reward of a reply.
        
        Returns:
            rewards: (batch_size, seq_len) per-token rewards
            last_rewards: (batch_size,) reward at the last token (stands for the whole reply)
        """
        outputs = self.base_model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        hidden_states = outputs.last_hidden_state  # (B, L, H)
        
        # Reward for every token
        rewards = self.reward_head(hidden_states).squeeze(-1)  # (B, L)
        
        # Use the reward at the last non-pad token as the reward of the whole reply
        # (this indexing assumes right padding)
        sequence_lengths = attention_mask.sum(dim=1) - 1
        last_rewards = rewards.gather(1, sequence_lengths.unsqueeze(1)).squeeze(-1)
        
        return rewards, last_rewards
 
def compute_reward_loss(reward_model, chosen_ids, chosen_mask, 
                       rejected_ids, rejected_mask):
    """
    Compute the Bradley-Terry preference loss.
    """
    # The chosen and rejected replies are each appended to the same prompt
    # and scored separately
    chosen_rewards = reward_model(chosen_ids, chosen_mask)[1]
    rejected_rewards = reward_model(rejected_ids, rejected_mask)[1]
    
    # Bradley-Terry loss
    loss = -F.logsigmoid(chosen_rewards - rejected_rewards).mean()
    
    # Reward-margin regularization (optional)
    margin = F.relu(1.0 - (chosen_rewards - rejected_rewards)).mean()
    
    return loss + 0.1 * margin

3.1.2 Reward Model Training in Practice

class RewardModelTrainer:
    """Reward model trainer."""
    def __init__(self, model, config):
        self.model = model
        self.config = config
        self.optimizer = torch.optim.AdamW(
            model.parameters(),
            lr=config.learning_rate,
            weight_decay=config.weight_decay
        )
    
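    def train_step(self, batch):
        """
        One training step.
        
        The batch contains:
        - prompt: the question
        - chosen: the preferred reply
        - rejected: the dispreferred reply
        
        concat_prompt_response is assumed to be defined elsewhere.
        """
        # Build the inputs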
        chosen_inputs = self.concat_prompt_response(
            batch["prompt"], batch["chosen"]
        )
        rejected_inputs = self.concat_prompt_response(
            batch["prompt"], batch["rejected"]
        )
        
        # Compute the loss
        loss = compute_reward_loss(
            self.model,
            chosen_inputs["ids"],
            chosen_inputs["mask"],
            rejected_inputs["ids"],
            rejected_inputs["mask"]
        )
        
        # Backward pass and parameter update
        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
        self.optimizer.step()
        
        return {"loss": loss.item()}

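3.2 The PPO Algorithm in Detail

Proximal Policy Optimization (PPO) [3] is the core algorithm of RLHF and is used to optimize the language-model policy.

3.2.1 Advantage Estimation (GAE)

Generalized Advantage Estimation (GAE) provides an advantage estimate with a tunable bias-variance trade-off:

$$\hat{A}_t = \sum_{l=0}^{\infty} (\gamma\lambda)^l \, \delta_{t+l}$$

where $\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$ is the TD residual.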

def compute_gae(rewards, values, dones, gamma=0.99, lam=0.95):
    """
    Compute GAE advantage estimates.
    
    Args:
        rewards: (T,) reward sequence
        values: (T+1,) value estimates (including the state after the last step)
        dones: (T,) 1 where the episode terminates, else 0
    
    Returns:
        advantages: (T,) normalized advantage estimates
        returns: (T,) returns (targets for training the value function)
    """
    T = len(rewards)
    advantages = torch.zeros_like(rewards)
    
    gae = 0
    for t in reversed(range(T)):
        # TD residual
        delta = rewards[t] + gamma * values[t+1] * (1 - dones[t]) - values[t]
        # GAE recursion
        gae = delta + gamma * lam * (1 - dones[t]) * gae
        advantages[t] = gae
    
    # Return = advantage + value baseline (computed before normalization)
    returns = advantages + values[:-1]
    
    # Normalize the advantages (stabilizes training)
    advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
    
    return advantages, returns
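The role of λ becomes clearer on a tiny hand-checkable episode. The sketch below re-implements the same backward recursion with plain Python lists (no normalization, no termination flags) and compares the two extremes: λ = 0 gives the one-step TD residuals, λ = 1 gives the Monte Carlo return minus the value baseline. The numbers are made up for illustration.

```python
def gae(rewards, values, gamma=0.99, lam=0.95):
    """Backward recursion A_t = delta_t + gamma * lam * A_{t+1} for one episode."""
    advantages = [0.0] * len(rewards)
    running = 0.0
    for t in reversed(range(len(rewards))):
        delta = rewards[t] + gamma * values[t + 1] - values[t]
        running = delta + gamma * lam * running
        advantages[t] = running
    return advantages

rewards = [0.0, 0.0, 1.0]
values = [0.5, 0.6, 0.7, 0.0]  # includes the value after the last step

one_step = gae(rewards, values, gamma=1.0, lam=0.0)      # TD residuals only
monte_carlo = gae(rewards, values, gamma=1.0, lam=1.0)   # return minus baseline
```

Intermediate values of λ blend the two: smaller λ leans on the (possibly biased) value estimates, larger λ leans on the (higher-variance) observed rewards.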

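3.2.2 The PPO Clipping Mechanism

The core innovation of PPO is its clipping mechanism, which prevents overly large policy updates:

PPO-Clip objective

$$L^{CLIP}(\theta) = \mathbb{E}_t\Big[\min\big(r_t(\theta)\hat{A}_t,\ \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon)\,\hat{A}_t\big)\Big]$$

where the probability ratio is

$$r_t(\theta) = \frac{\pi_\theta(a_t \mid s_t)}{\pi_{\theta_{old}}(a_t \mid s_t)}$$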
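A scalar version of the clipped objective makes its asymmetry easy to see. The helper below evaluates the per-sample PPO-Clip term for a given probability ratio and advantage; it is a minimal sketch with a name chosen for this example.

```python
def clipped_surrogate(ratio, advantage, epsilon=0.2):
    """Per-sample PPO-Clip objective (to be maximized)."""
    clipped_ratio = min(max(ratio, 1 - epsilon), 1 + epsilon)
    return min(ratio * advantage, clipped_ratio * advantage)

cases = {
    "up, good action": clipped_surrogate(1.5, 1.0),     # gain capped at 1 + epsilon
    "down, good action": clipped_surrogate(0.5, 1.0),   # not clipped
    "up, bad action": clipped_surrogate(1.5, -1.0),     # not clipped, full penalty
    "down, bad action": clipped_surrogate(0.5, -1.0),   # gain capped at 1 - epsilon
}
```

Taking the minimum means clipping only ever removes the incentive to move further in a direction that already improved the objective; moves that make things worse are still penalized in full.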

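class PPOLanguageModel:
    """
    PPO for language models, with an optional pre-training loss term (PPO-ptx).
    
    Papers: Learning to summarize from human feedback (Stiennon et al., 2020);
    PPO-ptx comes from Training language models to follow instructions with
    human feedback (Ouyang et al., 2022).
    """
    def __init__(self, actor_model, ref_model, reward_model, value_model, config):
        self.actor = actor_model          # policy being optimized
        self.ref_model = ref_model        # frozen reference model (the SFT model)
        self.reward_model = reward_model  # reward model
        self.value_model = value_model    # value model (critic)
        self.config = config
    
    def compute_log_probs(self, model, input_ids, attention_mask):
        """
        Log-probability of each token given its prefix.
        
        Working in log space avoids numerical underflow.
        Returns (log_probs, logits) for positions 1..L-1.
        """
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        # The logits at position t predict the token at position t+1
        logits = outputs.logits[:, :-1, :]  # (B, L-1, V)
        action_ids = input_ids[:, 1:]       # (B, L-1)
        
        log_probs = F.log_softmax(logits, dim=-1)
        
        # Gather the log-probability of each actual token
        action_log_probs = log_probs.gather(
            dim=-1,
            index=action_ids.unsqueeze(-1)
        ).squeeze(-1)
        
        return action_log_probs, logits
    
    def ppo_loss(self, old_log_probs, new_log_probs, advantages, logits=None, epsilon=0.2):
        r"""
        PPO clipped loss.
        
        $L^{CLIP} = \min(r \cdot A, \text{clip}(r, 1-\epsilon, 1+\epsilon) \cdot A)$
        
        where $r = \exp(\log \pi_{new} - \log \pi_{old})$
        """
        # Probability ratio, computed from the difference of log-probabilities
        ratio = torch.exp(new_log_probs - old_log_probs)
        
        # Unclipped and clipped surrogate objectives
        surr1 = ratio * advantages
        surr2 = torch.clamp(ratio, 1 - epsilon, 1 + epsilon) * advantages
        
        # Take the smaller of the two and negate (we minimize)
        ppo_loss = -torch.min(surr1, surr2).mean()
        
        # Entropy bonus (encourages exploration); entropy_coef is an assumed config field
        if logits is not None:
            probs = F.softmax(logits, dim=-1)
            entropy = -(probs * F.log_softmax(logits, dim=-1)).sum(dim=-1).mean()
            ppo_loss = ppo_loss - self.config.entropy_coef * entropy
        
        return ppo_loss
    
    def step(self, input_ids, attention_mask, old_log_probs):
        """
        One PPO update on sampled sequences (prompt + response).
        
        old_log_probs: (B, L-1) log-probs recorded from the actor at sampling time.
        For brevity every position is treated as an action; a real implementation
        masks out prompt and padding tokens.
        
        1. Compute log-probs under the reference and current models
        2. Score the responses with the reward model
        3. Estimate advantages with GAE
        4. Compute the policy and value losses
        """
        with torch.no_grad():
            ref_log_probs, _ = self.compute_log_probs(
                self.ref_model, input_ids, attention_mask
            )
            scores = self.reward_model(input_ids, attention_mask)[1]  # (B,)
            old_values = self.value_model(input_ids, attention_mask)  # assumed shape (B, L)
        
        # Log-probs under the current policy
        new_log_probs, logits = self.compute_log_probs(
            self.actor, input_ids, attention_mask
        )
        
        # Per-token reward: KL penalty against the reference model, plus the
        # reward-model score on the final token (kl_coef is an assumed config field)
        rewards = -self.config.kl_coef * (old_log_probs - ref_log_probs)
        rewards[:, -1] += scores
        
        # GAE, one sequence at a time; the episode ends at the last token
        dones = torch.zeros_like(rewards[0])
        dones[-1] = 1
        gae = [
            compute_gae(r, v, dones, self.config.gamma, self.config.lam)
            for r, v in zip(rewards, old_values)
        ]
        advantages = torch.stack([a for a, _ in gae])
        returns = torch.stack([ret for _, ret in gae])
        
        # PPO loss
        ppo_loss = self.ppo_loss(old_log_probs, new_log_probs, advantages, logits)
        
        # Value-function loss on the same positions as the actions
        values = self.value_model(input_ids, attention_mask)[:, :-1]
        value_loss = F.mse_loss(values, returns)
        
        total_loss = ppo_loss + 0.5 * value_loss
        
        # Pre-training loss (PPO-ptx); compute_pretrain_loss is assumed to be defined elsewhere
        if self.config.pretrain_loss_coef > 0:
            pt_loss = self.compute_pretrain_loss(input_ids, attention_mask)
            total_loss = total_loss + self.config.pretrain_loss_coef * pt_loss
        
        return {"loss": total_loss, "ppo_loss": ppo_loss, "value_loss": value_loss}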

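3.3 DPO (Direct Preference Optimization)

DPO [4] is an alignment method that requires neither explicitly training a reward model nor running PPO.

3.3.1 How DPO Works

DPO reparameterizes the RLHF optimization objective:

Original RLHF objective (reward maximization under a KL constraint):

$$\max_{\pi_\theta}\ \mathbb{E}_{x \sim \mathcal{D},\, y \sim \pi_\theta(\cdot \mid x)}\big[r(x, y)\big] - \beta\, D_{KL}\big[\pi_\theta(y \mid x)\,\|\,\pi_{ref}(y \mid x)\big]$$

Closed-form solution

$$\pi^*(y \mid x) = \frac{1}{Z(x)}\, \pi_{ref}(y \mid x) \exp\Big(\frac{1}{\beta}\, r(x, y)\Big)$$

DPO loss

$$\mathcal{L}_{DPO}(\theta) = -\mathbb{E}_{(x, y_w, y_l) \sim \mathcal{D}}\Big[\log \sigma\Big(\beta \log \frac{\pi_\theta(y_w \mid x)}{\pi_{ref}(y_w \mid x)} - \beta \log \frac{\pi_\theta(y_l \mid x)}{\pi_{ref}(y_l \mid x)}\Big)\Big]$$

Abbreviated, with the implicit reward $\hat{r}_\theta(x, y) = \beta \log \frac{\pi_\theta(y \mid x)}{\pi_{ref}(y \mid x)}$:

$$\mathcal{L}_{DPO}(\theta) = -\mathbb{E}\big[\log \sigma\big(\hat{r}_\theta(x, y_w) - \hat{r}_\theta(x, y_l)\big)\big]$$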
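For a single preference pair the DPO loss reduces to a few arithmetic steps on sequence log-probabilities. The sketch below uses only the standard library; the function name and the example log-probabilities are made up for illustration.

```python
import math

def dpo_loss(policy_chosen, policy_rejected, ref_chosen, ref_rejected, beta=0.1):
    """DPO loss for one preference pair, given sequence log-probabilities."""
    # Implicit rewards: beta * log(pi_theta / pi_ref)
    reward_chosen = beta * (policy_chosen - ref_chosen)
    reward_rejected = beta * (policy_rejected - ref_rejected)
    margin = reward_chosen - reward_rejected
    # -log(sigmoid(margin)) written as log(1 + exp(-margin))
    return math.log1p(math.exp(-margin))

# Policy identical to the reference: the margin is zero.
loss_at_init = dpo_loss(-12.0, -15.0, -12.0, -15.0)
# Policy raised the chosen reply and lowered the rejected one relative to the reference.
loss_improved = dpo_loss(-10.0, -20.0, -12.0, -15.0)
```

At initialization, when the policy equals the reference model, every pair has loss log 2; training lowers it by widening the implicit reward margin.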

class DPO:
    """
    Direct Preference Optimization (DPO)
    
    Paper: Direct Preference Optimization: Your Language Model is Secretly
    a Reward Model (Rafailov et al., 2023)
    """
    def __init__(self, policy_model, ref_model, beta=0.1):
        self.policy = policy_model
        self.ref_model = ref_model
        self.beta = beta  # strength of the implicit KL constraint
    
    def dpo_loss(self, prompt_chosen, prompt_rejected, 
                 chosen_logps, rejected_logps,
                 ref_chosen_logps, ref_rejected_logps):
        """
        Compute the DPO loss.
        
        Args:
            prompt_chosen: input of the preferred reply (unused here)
            prompt_rejected: input of the dispreferred reply (unused here)
            chosen_logps: policy log-probability of the preferred reply
            rejected_logps: policy log-probability of the dispreferred reply
            ref_chosen_logps: reference-model log-probability of the preferred reply
            ref_rejected_logps: reference-model log-probability of the dispreferred reply
        """
        policy_chosen_logps = chosen_logps
        policy_rejected_logps = rejected_logps
        
        # Difference of log-ratios against the reference model
        # (the implicit reward margin)
        logits = self.beta * (
            (policy_chosen_logps - ref_chosen_logps) -
            (policy_rejected_logps - ref_rejected_logps)
        )
        
        # DPO loss: Bradley-Terry form, applied directly in policy space
        loss = -F.logsigmoid(logits).mean()
        
        # For monitoring only (not added to the loss):
        # mean log-ratio of the policy to the reference model
        kl_chosen = (policy_chosen_logps - ref_chosen_logps).mean()
        kl_rejected = (policy_rejected_logps - ref_rejected_logps).mean()
        kl_loss = (kl_chosen + kl_rejected) / 2
        
        return loss, {"dpo_loss": loss, "kl_loss": kl_loss}
    
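    def train_step(self, batch):
        """One DPO training step (compute_sequence_logps is assumed to be defined elsewhere)."""
        # Full sequences (the prompt is already included)
        chosen_sequences = batch["chosen"]
        rejected_sequences = batch["rejected"]
        
        # Log-probabilities under the policy model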
        chosen_logps = self.compute_sequence_logps(
            self.policy, chosen_sequences
        )
        rejected_logps = self.compute_sequence_logps(
            self.policy, rejected_sequences
        )
        
        # Log-probabilities under the reference model (no gradients)
        with torch.no_grad():
            ref_chosen_logps = self.compute_sequence_logps(
                self.ref_model, chosen_sequences
            )
            ref_rejected_logps = self.compute_sequence_logps(
                self.ref_model, rejected_sequences
            )
        
        # Compute the DPO loss
        loss, loss_info = self.dpo_loss(
            batch["prompt"], batch["prompt"],
            chosen_logps, rejected_logps,
            ref_chosen_logps, ref_rejected_logps
        )
        
        return loss, loss_info

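3.3.2 DPO vs. PPO

| Property | PPO + RM | DPO |
| Models to train | policy + reward + value | policy only |
| Additional data required | preference data (to train the RM) | preference data (used directly) |
| Compute cost | high (requires sampling during PPO) | moderate |
| Training stability | requires tuning the KL constraint | relatively stable |
| Theoretical basis | supported by RL theory | closed-form reparameterization of the same KL-constrained objective |
| Large-scale training | | easier to scale |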

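3.4 KTO (Kahneman-Tversky Optimization)

KTO [5] builds on prospect theory and models human preferences in terms of loss aversion and the certainty effect.

3.4.1 The Kahneman-Tversky Value Function

KTO models human preferences with a piecewise value function:

$$v(z) = \begin{cases} z^{\alpha} & z \ge 0 \\ -\lambda\,(-z)^{\alpha} & z < 0 \end{cases}$$

where $\lambda$ is the loss-aversion coefficient and $\alpha$ captures diminishing sensitivity.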
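The shape of the Kahneman-Tversky value function is easy to explore numerically. The sketch below assumes the classic form, a power function for gains and a steeper mirrored power function for losses, with the parameter values α = 0.88 and λ = 2.25 estimated by Tversky and Kahneman (1992). The function name is chosen for this example.

```python
def kt_value(z, alpha=0.88, lam=2.25):
    """Prospect-theory value of an outcome z relative to the reference point 0."""
    if z >= 0:
        return z ** alpha
    # Losses are mirrored and scaled by the loss-aversion coefficient.
    return -lam * (-z) ** alpha

gain = kt_value(1.0)
loss = kt_value(-1.0)
double_gain = kt_value(2.0)
```

A loss of one unit weighs more than twice as much as a gain of one unit (loss aversion), and doubling a gain less than doubles its value (diminishing sensitivity).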

3.4.2 The KTO Loss Function

class KTO:
    """
    KTO: Kahneman-Tversky Optimization
    
    Paper: KTO: Model Alignment as Prospect Theoretic Optimization
    """
    def __init__(self, policy_model, ref_model, beta=0.1, lambda_loss=1.0):
        self.policy = policy_model
        self.ref_model = ref_model
        self.beta = beta
        self.lambda_loss = lambda_loss  # loss-aversion coefficient
    
    def kto_loss(self, chosen_logps, rejected_logps,
                ref_chosen_logps, ref_rejected_logps):
        """
        Compute a KTO-style loss.
        
        KTO scores every example on its own as desirable or undesirable,
        so it does not need paired preferences, which makes it more robust
        to annotation noise. Here the chosen replies are treated as desirable
        examples and the rejected replies as undesirable ones.
        """
        # Log-ratios against the reference model (implicit rewards before scaling)
        chosen_logratios = chosen_logps - ref_chosen_logps
        rejected_logratios = rejected_logps - ref_rejected_logps
        
        # Reference point: a crude batch-level KL estimate, clamped at zero and
        # detached (an assumption of this sketch; the paper estimates it from
        # mismatched prompt-reply pairs)
        z_ref = torch.cat(
            [chosen_logratios, rejected_logratios]
        ).mean().clamp(min=0).detach()
        
        # Desirable replies: push the implicit reward above the reference point
        loss_chosen = 1 - torch.sigmoid(self.beta * (chosen_logratios - z_ref))
        
        # Undesirable replies: push it below the reference point,
        # weighted by the loss-aversion coefficient
        loss_rejected = self.lambda_loss * (
            1 - torch.sigmoid(self.beta * (z_ref - rejected_logratios))
        )
        
        # Total loss
        loss = torch.cat([loss_chosen, loss_rejected]).mean()
        
        # Logging: implicit reward margin and implied preference probability
        advantages = self.beta * (chosen_logratios - rejected_logratios)
        chosen_prob = torch.sigmoid(advantages)
        
        return loss, {
            "kto_loss": loss,
            "avg_advantage": advantages.mean().item(),
            "avg_prob_preferred": chosen_prob.mean().item()
        }

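3.5 Practical Issues in RLHF

3.5.1 Mode Collapse

Symptoms

The model starts producing repetitive, short, or low-diversity replies.

Root causes

  1. Poorly tuned KL penalty: when the penalty is too weak, the policy drifts far from the reference model and over-exploits the reward
  2. Sparse reward signal: the model finds ways to "cheat" that satisfy the reward
  3. Overfitting to specific patterns: PPO over-optimizes

Mitigations

# Mitigation 1: tune the KL coefficient
config = {
    "beta": 0.01,  # example value; raise it if the policy drifts too far from the reference model
    "kl_penalty": "full",  # or "short"
}

# Mitigation 2: mix in a pre-training loss
# PPO-ptx: add the pre-training language-modeling loss
loss = ppo_loss + 0.1 * pretrain_loss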
 
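# Mitigation 3: adversarial training
class AdversarialPPO:
    """
    Adversarial PPO: adds a discriminator to encourage diversity.
    Discriminator is assumed to be defined elsewhere.
    """
    def __init__(self):
        self.discriminator = Discriminator()
    
    def diversity_loss(self, responses):
        """
        Discriminator-based diversity loss.
        """
        logits = self.discriminator(responses)  # (B, num_classes)
        # Assumption: class 0 means "human-written". Minimizing the cross-entropy
        # against class 0 pushes the policy toward replies the discriminator
        # accepts as real, while the discriminator itself is trained separately
        # to tell the styles apart.
        targets = torch.zeros(logits.size(0), dtype=torch.long, device=logits.device)
        return F.cross_entropy(logits, targets)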

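3.5.2 Reward Hacking

Symptoms

The model learns to "fool" the reward model instead of actually learning the target behavior.

Typical examples

  • A dialogue model learns to use particular vocabulary even though the semantics are wrong
  • A code-generation model produces code that runs but is incorrect
  • A summarization model over-compresses or includes false information

Detection and mitigation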

class RewardHackingDetector:
    """奖励黑客检测"""
    def __init__(self, reward_model, config):
        self.reward_model = reward_model
        self.baseline_responses = []
        self.baseline_rewards = []
    
    def update_baseline(self, new_responses, new_rewards):
        """更新baseline用于检测"""
        self.baseline_responses.extend(new_responses)
        self.baseline_rewards.extend(new_rewards)
    
    def detect_hacking(self, current_responses):
        """
        检测奖励黑客
        
        指标:
        1. 奖励增加但质量指标下降
        2. 响应长度异常变化
        3. 词汇分布变化
        """
        # 1. 计算与baseline的分布距离
        current_rewards = self.reward_model(current_responses)
        reward_increase = torch.mean(current_rewards) - torch.mean(
            torch.tensor(self.baseline_rewards)
        )
        
        # 2. 质量指标
        diversity_score = self.compute_diversity(current_responses)
        length_change = self.compute_length_change(current_responses)
        
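        # 3. Vocabulary distribution shift (Jensen-Shannon divergence)
        vocab_dist = self.compute_vocab_distribution(current_responses)
        baseline_dist = self.compute_vocab_distribution(self.baseline_responses)
        # JS(P, Q) = 0.5 * KL(P || M) + 0.5 * KL(Q || M), with M = (P + Q) / 2.
        # kl_div(p, q) is assumed to compute KL(p || q) on probability vectors
        mixture = 0.5 * (vocab_dist + baseline_dist)
        js_divergence = 0.5 * (kl_div(vocab_dist, mixture) +
                               kl_div(baseline_dist, mixture))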
        
        # 检测结果
        hacking_detected = (
            reward_increase > 0.5 and
            diversity_score < 0.3 and
            js_divergence > 0.2
        )
        
        return {
            "hacking_detected": hacking_detected,
            "reward_increase": reward_increase,
            "diversity_score": diversity_score,
            "js_divergence": js_divergence
        }
    
    def mitigation_strategies(self):
        """
        缓解策略
        
        1. 组合奖励:Reward + Quality + Safety
        2. 课程RL:逐步增加KL约束
        3. 对抗性RM:训练判别器检测黑客
        """
        pass
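
The vocabulary-shift indicator relies on the Jensen-Shannon divergence. A minimal standard-library sketch, assuming both distributions are given as probability lists over the same vocabulary (the helper names are illustrative):

```python
import math

def kl(p, q):
    """KL(p || q) in nats; terms with p_i == 0 contribute nothing."""
    return sum(pi * math.log(pi / qi) for pi, qi in zip(p, q) if pi > 0)

def js_divergence(p, q):
    """Jensen-Shannon divergence: average KL to the mixture M = (P + Q) / 2."""
    m = [(pi + qi) / 2 for pi, qi in zip(p, q)]
    return 0.5 * kl(p, m) + 0.5 * kl(q, m)

baseline = [0.5, 0.3, 0.2]
same = js_divergence(baseline, baseline)
disjoint = js_divergence([1.0, 0.0], [0.0, 1.0])
print(same, disjoint)
```

Unlike a symmetrized KL, this quantity is bounded by ln 2 (about 0.693 nats) and stays finite when one distribution assigns zero mass to a token, so a fixed alert threshold is meaningful.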

4. Training Stability and Optimization

4.1 Weight Initialization

4.1.1 Transformer Initialization

Initialization scheme commonly used for modern Transformers:

import numpy as np
import torch.nn as nn

def initialize_transformer(model, config):
    """
    Transformer weight initialization
    
    Key principles:
    1. Scaled initialization for projections that write into the residual stream
    2. Small initialization for embeddings
    3. Suitable scaling for the output layer
    """
    # Std for o_proj / down_proj: shrinks with depth to keep the residual path stable
    residual_std = 0.02 / np.sqrt(2 * config.num_layers)
    
    for name, param in model.named_parameters():
        if name.endswith("bias"):
            # Biases (including LayerNorm biases): zero initialization.
            # Checked first so that e.g. q_proj.bias is not drawn from a normal
            nn.init.zeros_(param)
        
        elif "LayerNorm" in name:
            # LayerNorm weight: keep the default of 1
            nn.init.ones_(param)
        
        elif "embedding" in name:
            # Embedding: small initialization
            nn.init.normal_(param, mean=0, std=0.02)
        
        elif "q_proj" in name or "k_proj" in name or "v_proj" in name:
            # QKV projections: standard initialization
            nn.init.normal_(param, mean=0, std=0.02)
        
        elif "o_proj" in name or "down_proj" in name:
            # Attention output projection and SwiGLU down projection
            nn.init.normal_(param, mean=0, std=residual_std)
        
        elif "gate_proj" in name or "up_proj" in name:
            # SwiGLU gate and up projections: standard initialization
            nn.init.normal_(param, mean=0, std=0.02)
        
        elif "lm_head" in name:
            # LM head: scaled by the hidden size
            nn.init.normal_(param, mean=0, std=0.02 / np.sqrt(config.hidden_size))

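4.1.2 Residual Scaling

Contributions that accumulate along the residual stream can destabilize training, so the residual branch needs to be scaled:

class ScaledResidualBlock(nn.Module):
    """
    Residual block with a scaled branch
    
    The branch output is multiplied by 1/sqrt(2 * num_layers) so that the
    residual stream does not grow too large with depth; the skip path is left unscaled
    """
    def __init__(self, layer, num_layers):
        super().__init__()
        self.layer = layer
        self.scale_factor = 1.0 / np.sqrt(2 * num_layers)
    
    def forward(self, x, **kwargs):
        # Scale only the branch: scaling the sum would shrink x at every layer
        return x + self.scale_factor * self.layer(x, **kwargs)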
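
A back-of-the-envelope check of why the branch is scaled, under the simplifying assumption that every branch output is independent of the stream and has unit variance (real networks only approximate this). The function name is illustrative.

```python
import math

def stream_variance(num_layers, scale, branch_var=1.0, input_var=1.0):
    """Residual-stream variance after num_layers blocks of x + scale * f(x),
    assuming independent branch outputs with variance branch_var."""
    return input_var + num_layers * (scale ** 2) * branch_var

for depth in (12, 48, 96):
    unscaled = stream_variance(depth, 1.0)
    scaled = stream_variance(depth, 1.0 / math.sqrt(2 * depth))
    print(depth, unscaled, round(scaled, 3))
```

Without scaling, the variance grows linearly with depth; with a factor of 1/sqrt(2 * num_layers) it stays at 1.5 times the input variance regardless of depth.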

4.2 Learning Rate Schedule

4.2.1 Warmup and Decay

Learning-rate schedule from the original Transformer training recipe:

class TransformerScheduler:
    """
    Transformer learning-rate schedule (linear warmup, inverse-square-root decay)
    
    Formula: lr = d_model^{-0.5} * min(step^{-0.5}, step * warmup_steps^{-1.5})
    """
    def __init__(self, optimizer, d_model, warmup_steps):
        self.optimizer = optimizer
        self.d_model = d_model
        self.warmup_steps = warmup_steps
        self.current_step = 0
        self.base_lrs = [group["lr"] for group in optimizer.param_groups]
    
    def step(self):
        self.current_step += 1
        lr = self.get_lr()
        for group, base_lr in zip(self.optimizer.param_groups, self.base_lrs):
            # Preserve each group's ratio to the first group's base lr
            group["lr"] = lr * base_lr / self.base_lrs[0]
    
    def get_lr(self):
        """Compute the current learning rate"""
        step = max(1, self.current_step)
        # Base scaling factor
        scale = self.d_model ** (-0.5)
        # Linear warmup up to warmup_steps, inverse-square-root decay after;
        # the two terms are equal at step == warmup_steps, so the curve is continuous
        return scale * min(step ** (-0.5), step * self.warmup_steps ** (-1.5))
 
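# Common schedule configurations (approximate values, for orientation)
scheduler_configs = {
    "llama": {
        "lr": 3e-4,  # peak lr reported for LLaMA 7B/13B; the larger models used 1.5e-4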
        "warmup_ratio": 0.05,
        "scheduler": "cosine",
        "min_lr_ratio": 0.1
    },
    "bert": {
        "lr": 1e-4,
        "warmup_ratio": 0.1,
        "scheduler": "linear",
        "min_lr_ratio": 0.0
    },
    "gpt": {
        "lr": 6e-4,
        "warmup_ratio": 0.01,
        "scheduler": "cosine",
        "min_lr_ratio": 0.1
    }
}
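
The configurations above describe a warmup-then-cosine shape that the class earlier in this section does not implement. The following is a minimal sketch of such a schedule, assuming `warmup_ratio` is a fraction of total steps and `min_lr_ratio` is the floor relative to the peak; the function name is illustrative.

```python
import math

def warmup_cosine_lr(step, total_steps, peak_lr, warmup_ratio, min_lr_ratio):
    """Linear warmup to peak_lr, then cosine decay to peak_lr * min_lr_ratio."""
    warmup_steps = max(1, int(total_steps * warmup_ratio))
    if step < warmup_steps:
        return peak_lr * (step + 1) / warmup_steps
    progress = (step - warmup_steps) / max(1, total_steps - warmup_steps)
    cosine = 0.5 * (1.0 + math.cos(math.pi * min(progress, 1.0)))
    min_lr = peak_lr * min_lr_ratio
    return min_lr + (peak_lr - min_lr) * cosine

# Using the "gpt" entry: lr=6e-4, warmup_ratio=0.01, min_lr_ratio=0.1
for s in (0, 9, 500, 1000):
    print(s, warmup_cosine_lr(s, 1000, 6e-4, 0.01, 0.1))
```

The rate reaches the peak at the end of warmup and ends at 10% of the peak, matching `min_lr_ratio`.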

4.2.2 Cosine Decay with Warm Restarts

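class CosineAnnealingWarmRestarts:
    """
    Cosine annealing with warm restarts
    
    Standalone version that shows the mechanics; PyTorch ships an equivalent
    as torch.optim.lr_scheduler.CosineAnnealingWarmRestarts
    """
    def __init__(self, optimizer, T_0, T_mult=1, eta_min=0):
        self.optimizer = optimizer
        self.T_0 = T_0          # length of the first cycle, in steps
        self.T_i = T_0          # length of the current cycle
        self.T_mult = T_mult    # cycle length multiplier applied at each restart
        self.eta_min = eta_min  # lower bound of the learning rate
        self.T_cur = 0          # steps since the last restart
        self.base_lrs = [group["lr"] for group in optimizer.param_groups]
    
    def get_lr(self):
        return [
            self.eta_min + (base_lr - self.eta_min) *
            (1 + np.cos(np.pi * self.T_cur / self.T_i)) / 2
            for base_lr in self.base_lrs
        ]
    
    def step(self):
        self.T_cur += 1
        if self.T_cur >= self.T_i:
            # Restart: jump back to the base lr and lengthen the next cycle
            self.T_cur = 0
            self.T_i *= self.T_mult
        for group, lr in zip(self.optimizer.param_groups, self.get_lr()):
            group["lr"] = lr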
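
To see how `T_0` and `T_mult` determine where restarts fall, the small sketch below lists the restart steps for a few cycles. The helper name is illustrative.

```python
def restart_steps(T_0, T_mult, num_cycles):
    """Steps at which the schedule restarts, given a first cycle of length T_0."""
    steps, length, current = [], T_0, 0
    for _ in range(num_cycles):
        current += length
        steps.append(current)
        length *= T_mult  # each cycle is T_mult times longer than the previous one
    return steps

print(restart_steps(T_0=10, T_mult=2, num_cycles=4))
print(restart_steps(T_0=10, T_mult=1, num_cycles=3))
```

With `T_mult=2` the cycles last 10, 20, 40, and 80 steps, so restarts become progressively rarer as training proceeds.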

4.3 Gradient Clipping and Regularization

4.3.1 Gradient Clipping

def clip_gradients(model, max_norm=1.0, norm_type=2.0):
    """
    Gradient clipping by global norm
    
    Formula: grad = grad * min(1, max_norm / ||grad||_norm_type)
    
    Args:
        max_norm: maximum allowed gradient norm
        norm_type: which p-norm to use (1 = L1, 2 = L2)
    
    Returns the total gradient norm measured before clipping, which is
    worth logging
    """
    return torch.nn.utils.clip_grad_norm_(
        model.parameters(),
        max_norm=max_norm,
        norm_type=norm_type
    )
 
# Clipping strategies for different training stages
clip_strategies = {
    "pretraining": {
        "max_norm": 1.0,
        "condition": "always"  # or "when_loss_spike"
    },
    "sft": {
        "max_norm": 1.0,
        "condition": "always"
    },
    "rlhf_ppo": {
        "max_norm": 0.5,  # more conservative
        "condition": "always"
    }
}
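
The clipping formula can be checked by hand on a tiny example. This is a minimal sketch in plain Python, with gradients represented as nested lists; the small epsilon in the denominator is an assumption added for numerical safety.

```python
import math

def clip_by_global_norm(grads, max_norm, eps=1e-6):
    """Scale all gradients by min(1, max_norm / total L2 norm)."""
    total_norm = math.sqrt(sum(g * g for vec in grads for g in vec))
    factor = min(1.0, max_norm / (total_norm + eps))
    return [[g * factor for g in vec] for vec in grads], total_norm

# Two "parameters" whose gradients have a combined L2 norm of 5
clipped, norm = clip_by_global_norm([[3.0, 0.0], [0.0, 4.0]], max_norm=1.0)
print(norm, clipped)
```

All gradients are scaled by the same factor, so the update direction is preserved and only its length is capped at `max_norm`.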

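4.3.2 Regularization Techniques

Technique | Effect | Typical value in LLMs
Weight Decay | Keeps weights from growing too large | 0.01-0.1
Dropout | Prevents overfitting | 0.0-0.1
R-Drop | Consistency regularization |
Label Smoothing | Softens the labels | 0.1

class RDropLoss(nn.Module):
    """
    R-Drop: Regularized Dropout for Neural Networks
    
    Runs two forward passes on the same input (with different dropout masks)
    and encourages the two output distributions to agree
    """
    def __init__(self, kl_loss_weight=0.5):
        super().__init__()
        self.kl_loss_weight = kl_loss_weight
    
    def forward(self, logits1, logits2, labels):
        """
        logits1, logits2: logits from the two forward passes
        """
        # Standard cross-entropy loss, averaged over the two passes
        ce_loss = (F.cross_entropy(logits1, labels) + 
                  F.cross_entropy(logits2, labels)) / 2
        
        # Symmetric KL regularizer. Both arguments are log-probabilities,
        # so log_target=True is required (by default F.kl_div expects the
        # target as probabilities)
        p = F.log_softmax(logits1, dim=-1)
        q = F.log_softmax(logits2, dim=-1)
        kl_loss = (F.kl_div(p, q, reduction='batchmean', log_target=True) +
                  F.kl_div(q, p, reduction='batchmean', log_target=True)) / 2
        
        return ce_loss + self.kl_loss_weight * kl_loss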
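
The table above lists label smoothing with a typical value of 0.1. As a small worked example, the sketch below builds the smoothed target distribution for one token, using the common definition that mixes the one-hot target with a uniform distribution; the function name is illustrative.

```python
def smooth_labels(target_index, num_classes, epsilon=0.1):
    """Smoothed target: (1 - epsilon) * one_hot + epsilon / num_classes."""
    uniform = epsilon / num_classes
    return [
        (1.0 - epsilon) * (1.0 if i == target_index else 0.0) + uniform
        for i in range(num_classes)
    ]

smoothed = smooth_labels(target_index=2, num_classes=4)
print(smoothed)
```

The correct class keeps most of the probability mass while every other class receives a small share, which discourages over-confident logits.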

4.4 Training Monitoring and Anomaly Detection

4.4.1 Monitoring Key Metrics

from collections import defaultdict

class TrainingMonitor:
    """Training monitor"""
    def __init__(self, config):
        self.metrics = defaultdict(list)
        self.alert_thresholds = {
            "loss_spike": 2.0,  # multiple of the EMA
            "grad_norm": 100.0,
            "nan_ratio": 0.0,
            "learning_rate_abnormal": 1e-6
        }
        self.ema_window = 100
        self.critical_count = 0  # consecutive steps with a critical alert
    
    def compute_ema(self, values, alpha=0.95):
        """Compute the exponential moving average"""
        if not values:
            return 0
        ema = values[0]
        for v in values[1:]:
            ema = alpha * ema + (1 - alpha) * v
        return ema
    
    def check_anomalies(self, metrics):
        """Detect training anomalies"""
        alerts = []
        
        # Loss spike detection (skipped until there is history to compare against)
        recent_losses = self.metrics["loss"][-self.ema_window:]
        current_loss = metrics.get("loss")
        if recent_losses and current_loss is not None:
            ema_loss = self.compute_ema(recent_losses)
            if current_loss > ema_loss * self.alert_thresholds["loss_spike"]:
                alerts.append({
                    "type": "loss_spike",
                    "current": current_loss,
                    "ema": ema_loss,
                    "severity": "high"
                })
        # Record the loss so later steps have a history
        if current_loss is not None:
            self.metrics["loss"].append(current_loss)
        
        # Gradient explosion detection
        grad_norm = metrics.get("grad_norm", 0)
        if grad_norm > self.alert_thresholds["grad_norm"]:
            alerts.append({
                "type": "grad_explosion",
                "grad_norm": grad_norm,
                "severity": "critical"
            })
        
        # NaN/Inf detection
        if metrics.get("has_nan", False) or metrics.get("has_inf", False):
            alerts.append({
                "type": "numerical_error",
                "severity": "critical"
            })
        
        return alerts
    
    def should_stop(self, metrics):
        """Decide whether training should stop"""
        alerts = self.check_anomalies(metrics)
        
        # Count consecutive steps with a critical alert; reset on a clean step
        if any(alert["severity"] == "critical" for alert in alerts):
            self.critical_count += 1
        else:
            self.critical_count = 0
        
        if self.critical_count > 3:
            return True, "Too many critical anomalies"
        
        return False, None
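
The spike rule used by the monitor can be exercised in isolation. A minimal sketch with illustrative names, assuming a plain list of past losses and the same 2x-EMA threshold:

```python
def ema(values, alpha=0.95):
    """Exponential moving average of a non-empty list."""
    smoothed = values[0]
    for v in values[1:]:
        smoothed = alpha * smoothed + (1 - alpha) * v
    return smoothed

def is_loss_spike(history, current, ratio=2.0):
    """True when the current loss exceeds ratio times the EMA of the history."""
    return bool(history) and current > ratio * ema(history)

history = [2.0] * 50
print(is_loss_spike(history, 2.1), is_loss_spike(history, 4.5))
```

An empty history never triggers an alert, which avoids a false alarm on the very first step.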

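4.4.2 Loss Spike Recovery

import torch

class LossSpikeRecovery:
    """Loss spike recovery mechanism"""
    def __init__(self, model, optimizer):
        self.model = model
        self.optimizer = optimizer
        self.best_loss = float('inf')
        self.checkpoint_path = "best_model.pt"
        self.spike_recovery_count = 0
    
    def save_checkpoint(self, path):
        """Save model and optimizer state (minimal version)"""
        torch.save({
            "model": self.model.state_dict(),
            "optimizer": self.optimizer.state_dict()
        }, path)
    
    def restore_checkpoint(self, path):
        """Load model and optimizer state saved by save_checkpoint"""
        state = torch.load(path, map_location="cpu")
        self.model.load_state_dict(state["model"])
        self.optimizer.load_state_dict(state["optimizer"])
    
    def check_and_recover(self, loss, grad_norm):
        """Detect a spike and recover from it"""
        is_spike = (
            loss > self.best_loss * 2.0 and
            grad_norm > 50.0
        )
        
        if is_spike:
            self.spike_recovery_count += 1
            
            # Save the current state for later inspection
            self.save_checkpoint("spike_backup.pt")
            
            # Roll back to the best state
            self.restore_checkpoint(self.checkpoint_path)
            
            # Lower the learning rate
            for param_group in self.optimizer.param_groups:
                param_group['lr'] *= 0.5
            
            return {
                "recovered": True,
                "spike_loss": loss,
                "recovery_action": "lr_reduction",
                "recovery_count": self.spike_recovery_count
            }
        
        # Update the best loss (in practice, checkpoint at intervals rather
        # than on every improvement, since saving is expensive)
        if loss < self.best_loss:
            self.best_loss = loss
            self.save_checkpoint(self.checkpoint_path)
        
        return {"recovered": False}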

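5. Recent Training Techniques

5.1 Efficient Fine-Tuning with LoRA/QLoRA

5.1.1 How LoRA Works

LoRA (Low-Rank Adaptation) [6] freezes the pretrained weights and trains only a pair of low-rank matrices that parameterize the weight update.

Core assumption: the weight updates of a pretrained language model have low intrinsic rank.

Parameterization: for a pretrained weight $W_0 \in \mathbb{R}^{d \times k}$, LoRA adds a low-rank decomposition:

$$W = W_0 + \Delta W = W_0 + BA, \quad B \in \mathbb{R}^{d \times r},\ A \in \mathbb{R}^{r \times k}$$

where $r \ll \min(d, k)$ is the rank. In practice the update $BA$ is additionally scaled by $\alpha / r$.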
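
A quick count shows why the low-rank parameterization is cheap. The sketch below compares the number of trainable parameters for one weight matrix; the dimensions are illustrative, not taken from a specific model.

```python
def lora_param_counts(d, k, r):
    """Trainable parameters for full fine-tuning vs. LoRA on one d x k weight."""
    full = d * k        # every entry of W is trainable
    lora = r * (d + k)  # B is d x r, A is r x k
    return full, lora, lora / full

full, lora, ratio = lora_param_counts(d=4096, k=4096, r=8)
print(full, lora, f"{ratio:.4%}")
```

For a 4096 x 4096 projection with rank 8, LoRA trains 65,536 parameters instead of 16,777,216, which is under 0.4% of the matrix.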

class LoRALinear(nn.Module):
    """
    LoRA linear layer
    
    Forward: h = W_0 x + (alpha / rank) * B A x
    """
    def __init__(self, in_features, out_features, rank=4, alpha=16, dropout=0.0):
        super().__init__()
        self.rank = rank
        self.alpha = alpha
        self.scaling = alpha / rank
        
        # Frozen pretrained weight (placeholder values here; overwritten
        # when wrapping a real layer)
        self.weight = nn.Parameter(
            torch.randn(out_features, in_features),
            requires_grad=False
        )
        # Optional frozen bias, set when the wrapped layer has one
        self.register_parameter("bias", None)
        
        # Trainable LoRA parameters
        self.lora_A = nn.Parameter(torch.empty(rank, in_features))
        self.lora_B = nn.Parameter(torch.zeros(out_features, rank))
        
        if dropout > 0:
            self.lora_dropout = nn.Dropout(p=dropout)
        else:
            self.lora_dropout = nn.Identity()
        
        self.reset_parameters()
    
    def reset_parameters(self):
        """Initialize the LoRA parameters"""
        # A is random and B is zero, so BA = 0 and training starts
        # exactly from the pretrained model
        nn.init.kaiming_uniform_(self.lora_A, a=np.sqrt(5))
        nn.init.zeros_(self.lora_B)
    
    def forward(self, x):
        """
        Forward pass
        
        h = Wx + s * BAx
        """
        # Frozen part
        base_output = F.linear(x, self.weight, self.bias)
        
        # LoRA part: project down with A, then up with B
        lora_output = F.linear(
            self.lora_dropout(x) @ self.lora_A.T,
            self.lora_B
        )
        
        return base_output + self.scaling * lora_output
 
class LoRAConfig:
    """LoRA配置"""
    def __init__(self, rank=8, alpha=16, dropout=0.05,
                 target_modules=None, bias="none", task_type="CAUSAL_LM"):
        self.r = rank
        self.lora_alpha = alpha
        self.lora_dropout = dropout
        self.target_modules = target_modules or ["q_proj", "v_proj"]
        self.bias = bias
        self.task_type = task_type
 
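def apply_lora_to_model(model, config):
    """Apply LoRA to a model by wrapping every targeted nn.Linear"""
    # Collect targets first so modules are not replaced while iterating
    targets = [
        (name, module) for name, module in model.named_modules()
        if isinstance(module, nn.Linear)
        and any(target in name for target in config.target_modules)
    ]
    
    for name, module in targets:
        # Build the LoRA layer with the original layer's shape
        lora_linear = LoRALinear(
            module.in_features, module.out_features,
            rank=config.r,
            alpha=config.lora_alpha,
            dropout=config.lora_dropout
        ).to(module.weight.device)
        
        # Copy the pretrained weight (and bias, if any) and keep them frozen
        lora_linear.weight.data = module.weight.data.clone()
        if module.bias is not None:
            lora_linear.bias = nn.Parameter(
                module.bias.data.clone(), requires_grad=False
            )
        
        # Swap the layer on its parent module
        parent_name, _, child_name = name.rpartition(".")
        parent = model.get_submodule(parent_name) if parent_name else model
        setattr(parent, child_name, lora_linear)
    
    # Freeze everything except the LoRA matrices
    for param_name, param in model.named_parameters():
        param.requires_grad = "lora_" in param_name
    
    return model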

5.1.2 QLoRA Implementation

QLoRA (Quantized LoRA) [7] combines 4-bit quantization with LoRA, substantially reducing GPU memory use:

class QLoRALinear(nn.Module):
    """
    QLoRA: 4-bit quantization + LoRA (simplified illustration)
    
    Core ideas:
    1. Quantize the pretrained weight to the NF4 format
    2. Store it in quantized form and dequantize it for computation
    3. Train the LoRA matrices on top of the dequantized weight
    
    Simplifications: one absmax scale per tensor (QLoRA uses blockwise
    scales) and one uint8 index per weight (real kernels pack two 4-bit
    values per byte)
    """
    # The 16 NF4 levels in [-1, 1]; note they are not symmetric around 0
    NF4_LEVELS = [
        -1.0, -0.6961928009986877, -0.5250730514526367, -0.39491748809814453,
        -0.28444138169288635, -0.18477343022823334, -0.09105003625154495, 0.0,
        0.07958029955625534, 0.16093020141124725, 0.24611230194568634,
        0.33791524171829224, 0.44070982933044434, 0.5626170039176941,
        0.7229568362236023, 1.0
    ]
    
    def __init__(self, weight, bias=None, rank=8, alpha=16, quant_type="nf4"):
        super().__init__()
        self.quant_type = quant_type
        self.register_buffer("nf4_map", torch.tensor(self.NF4_LEVELS))
        
        # Quantization parameter: absmax scale so that weight / scale is in [-1, 1]
        self.register_buffer("scale", self.compute_scale(weight))
        # Quantized weight, stored as indices into nf4_map
        self.register_buffer("weight", self.quantize(weight / self.scale, quant_type))
        
        # LoRA parameters
        self.lora_A = nn.Parameter(torch.randn(rank, weight.shape[1]))
        self.lora_B = nn.Parameter(torch.zeros(weight.shape[0], rank))
        self.scaling = alpha / rank
        
        if bias is not None:
            self.bias = nn.Parameter(bias.clone(), requires_grad=False)
        else:
            self.bias = None
    
    def compute_scale(self, weight):
        """Per-tensor absmax scale"""
        return weight.detach().abs().max().clamp(min=1e-8)
    
    def quantize(self, normalized, quant_type="nf4"):
        """4-bit quantization of a tensor already scaled to [-1, 1]"""
        if quant_type == "nf4":
            # NormalFloat 4-bit: snap to the nearest of 16 levels
            return self.nf4_quantize(normalized)
        elif quant_type == "fp4":
            raise NotImplementedError("fp4 is not covered by this sketch")
        else:
            raise ValueError(f"Unknown quant type: {quant_type}")
    
    def nf4_quantize(self, normalized):
        """NF4 quantization: index of the nearest level for every weight"""
        flat = normalized.detach().flatten()
        levels = self.nf4_map.to(flat.device)
        # [N, 16] distance table; fine for a demo, too large for real models
        indices = (flat.unsqueeze(1) - levels.unsqueeze(0)).abs().argmin(dim=1)
        return indices.to(torch.uint8).view(normalized.shape)
    
    def dequantize(self):
        """Dequantize: look up each level and undo the scaling"""
        # Index with long: a uint8 index tensor would be treated as a mask
        return self.nf4_map[self.weight.long()] * self.scale
    
    def forward(self, x):
        # Dequantize on every call; caching the float weight would give
        # back the memory that quantization saved
        weight = self.dequantize().to(x.dtype)
        
        # Base output
        base_output = F.linear(x, weight, self.bias)
        
        # LoRA output
        lora_output = F.linear(x @ self.lora_A.T, self.lora_B)
        
        return base_output + self.scaling * lora_output
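
A rough estimate of the weight memory saved by 4-bit storage, as a minimal sketch. It counts only the base weights and ignores quantization constants, LoRA parameters, activations, and optimizer state; the 7B parameter count is a hypothetical example.

```python
def weight_memory_gb(num_params, bits_per_param):
    """Memory for the weights alone, in gigabytes (1 GB = 1e9 bytes)."""
    return num_params * bits_per_param / 8 / 1e9

params = 7e9  # hypothetical 7B-parameter model
fp16_gb = weight_memory_gb(params, 16)
nf4_gb = weight_memory_gb(params, 4)
print(fp16_gb, nf4_gb)
```

Going from 16-bit to 4-bit storage cuts the weight footprint by a factor of four, which is what makes fine-tuning feasible on a single smaller GPU.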

5.2 Curriculum Learning

Curriculum learning orders the training samples from easy to hard:

class CurriculumScheduler:
    """
    Curriculum learning scheduler
    
    Ways to estimate difficulty:
    1. Sample length
    2. Perplexity
    3. Task complexity
    4. Agreement among teacher models
    """
    def __init__(self, dataset, difficulty_fn, total_steps, strategy="linear"):
        self.dataset = dataset
        self.difficulty_fn = difficulty_fn
        self.total_steps = total_steps  # needed to turn a step into progress in [0, 1]
        self.strategy = strategy
        self.num_samples = len(dataset)
        
        # Score every sample and sort from easiest to hardest
        self.difficulties = self.compute_difficulties()
        self.sorted_indices = np.argsort(self.difficulties)
    
    def compute_difficulties(self):
        """Compute the difficulty of every sample"""
        difficulties = []
        for i in range(len(self.dataset)):
            sample = self.dataset[i]
            difficulty = self.difficulty_fn(sample)
            difficulties.append(difficulty)
        return np.array(difficulties)
    
    def get_batch(self, training_step, batch_size):
        """
        Get a curriculum batch
        
        Pacing strategies:
        1. Linear: the pool of available samples grows linearly
        2. Exp: the pool grows exponentially
        3. Stage: the pool grows in discrete stages
        """
        progress = min(training_step / self.total_steps, 1.0)
        
        if self.strategy == "linear":
            num_available = int(progress * self.num_samples)
        
        elif self.strategy == "exp":
            num_available = int(self.num_samples ** progress)
        
        elif self.strategy == "stage":
            stages = [0.2, 0.4, 0.6, 0.8, 1.0]
            current_stage = min(int(progress * len(stages)), len(stages) - 1)
            num_available = int(stages[current_stage] * self.num_samples)
        
        else:
            raise ValueError(f"Unknown strategy: {self.strategy}")
        
        # Always expose at least one batch worth of the easiest samples
        num_available = min(max(num_available, batch_size), self.num_samples)
        available_indices = self.sorted_indices[:num_available]
        
        # Sample without replacement whenever the pool is large enough
        replace = len(available_indices) < batch_size
        selected = np.random.choice(available_indices, size=batch_size, replace=replace)
        return [self.dataset[i] for i in selected]
    
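    def difficulty_fn_example(self, sample):
        """
        Example difficulty function
        
        Combines several factors
        """
        # 1. Length-based difficulty
        length_score = np.log(sample["num_tokens"])
        
        # 2. Task complexity, if available. task_complexity is assumed to map
        #    a task type to a numeric score; it is not defined in this class
        task_score = self.task_complexity(sample.get("task_type", "simple"))
        
        # 3. Teacher model uncertainty
        teacher_score = sample.get("teacher_uncertainty", 0)
        
        return length_score + 0.5 * task_score + 0.3 * teacher_score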
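
The linear and exponential pacing rules open up the sample pool at very different rates. The following sketch isolates just that computation, with an illustrative function name and dataset size:

```python
def available_samples(progress, num_samples, strategy):
    """Number of easiest samples exposed at a given training progress in [0, 1]."""
    if strategy == "linear":
        return int(progress * num_samples)
    if strategy == "exp":
        return int(num_samples ** progress)
    raise ValueError(f"Unknown strategy: {strategy}")

for p in (0.25, 0.5, 0.75, 1.0):
    print(p, available_samples(p, 10_000, "linear"), available_samples(p, 10_000, "exp"))
```

Exponential pacing keeps the model on a small set of easy samples for most of training and only opens the full dataset near the end, whereas linear pacing exposes half the data at the halfway point.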

5.3 Data Curriculum Design

A data curriculum is concerned with how the training data is organized over time:

class DataCurriculum:
    """
    数据课程设计
    
    核心思想:
    1. 什么数据先学
    2. 不同阶段用什么数据配比
    3. 如何平衡广度和深度
    """
    def __init__(self, domains_data, config):
        self.domains = domains_data
        self.config = config
        self.current_phase = 0
    
    def get_phase_schedule(self):
        """
        定义训练阶段的数据配比
        
        典型阶段:
        1. 通用能力:广泛但简单
        2. 专业技能:深入特定领域
        3. 对齐优化:偏好数据
        """
        return [
            {"phase": 0, "domains": {"web": 0.6, "books": 0.3, "code": 0.1}},
            {"phase": 1, "domains": {"web": 0.4, "books": 0.3, "code": 0.2, "math": 0.1}},
            {"phase": 2, "domains": {"web": 0.3, "books": 0.2, "code": 0.2, "math": 0.15, "sft": 0.15}},
            {"phase": 3, "domains": {"sft": 0.5, "rlhf": 0.3, "web": 0.2}}
        ]
    
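    def sample_curriculum_batch(self, training_step, batch_size):
        """Sample a batch from the current curriculum phase"""
        schedule = self.get_phase_schedule()
        
        # Determine the current phase. The schedule carries no step boundaries,
        # so this assumes config.phase_start_steps (a hypothetical field) lists
        # the first training step of each phase in ascending order, starting at 0
        current_schedule = schedule[0]
        for phase, start_step in zip(schedule, self.config.phase_start_steps):
            if training_step >= start_step:
                current_schedule = phase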
        
        # 按比例采样
        batch = []
        for domain, ratio in current_schedule["domains"].items():
            domain_samples = int(batch_size * ratio)
            domain_batch = self.sample_from_domain(domain, domain_samples)
            batch.extend(domain_batch)
        
        # 打乱顺序
        random.shuffle(batch)
        return batch[:batch_size]
    
    def adaptive_resampling(self, loss_by_domain):
        """
        自适应重采样
        
        根据各领域损失动态调整采样权重
        """
        adjusted_weights = {}
        base_weights = self.get_phase_schedule()[self.current_phase]["domains"]
        
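        # Sample more from domains with higher loss
        for domain, base_weight in base_weights.items():
            domain_loss = loss_by_domain.get(domain, 1.0)
            # Loss-proportional weighting (dividing by the loss would do
            # the opposite and favor domains that are already learned)
            adjusted_weights[domain] = base_weight * domain_loss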
        
        # 归一化
        total = sum(adjusted_weights.values())
        for domain in adjusted_weights:
            adjusted_weights[domain] /= total
        
        return adjusted_weights
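
A small worked example of loss-based reweighting, written as a standalone sketch. It uses loss-proportional weights, in line with the stated intent that high-loss domains get sampled more; the function name and the loss values are illustrative.

```python
def reweight_by_loss(base_weights, loss_by_domain, default_loss=1.0):
    """Scale each domain's weight by its loss, then renormalize to sum to 1."""
    raw = {d: w * loss_by_domain.get(d, default_loss) for d, w in base_weights.items()}
    total = sum(raw.values())
    return {d: v / total for d, v in raw.items()}

base = {"web": 0.6, "books": 0.3, "code": 0.1}
losses = {"web": 2.0, "books": 2.0, "code": 4.0}  # code is lagging behind
weights = reweight_by_loss(base, losses)
print(weights)
```

The lagging domain (code) rises from 10% to roughly 18% of the batch, while the other domains shrink proportionally.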
 
# Putting the full training flow together
class LLMTrainingPipeline:
    """
    Full LLM training pipeline
    
    The stage classes, the train_dpo / train_ppo / train_kto methods, and
    save_model are assumed to be defined elsewhere
    """
    def __init__(self, config):
        self.config = config
        self.pretrain_stage = PretrainingStage(config.pretrain)
        self.sft_stage = SFTStage(config.sft)
        self.rlhf_stage = RLHFStage(config.rlhf)
        # DataCurriculum takes both the domain data and the config
        self.curriculum = DataCurriculum(config.domains, config)
    
    def train(self):
        # Stage 1: pretraining
        logger.info("Starting pretraining...")
        self.pretrain_stage.train()
        
        # Stage 2: SFT
        logger.info("Starting SFT...")
        self.sft_stage.train()
        
        # Stage 3: RLHF (PPO/DPO/KTO)
        logger.info("Starting RLHF alignment...")
        if self.config.rlhf.method == "dpo":
            self.train_dpo()
        elif self.config.rlhf.method == "ppo":
            self.train_ppo()
        elif self.config.rlhf.method == "kto":
            self.train_kto()
        else:
            raise ValueError(f"Unknown RLHF method: {self.config.rlhf.method}")
        
        logger.info("Training complete!")
        self.save_model()

6. References

  1. Wang Y, Kordi Y, Mishra S, et al. Self-Instruct: Aligning Language Models with Self-Generated Instructions. ACL, 2023. arXiv:2212.10560

  2. Bradley R A, Terry M E. Rank Analysis of Incomplete Block Designs: I. The Method of Paired Comparisons. Biometrika, 1952.

  3. Schulman J, Wolski F, Dhariwal P, et al. Proximal Policy Optimization Algorithms. arXiv:1707.06347, 2017.

  4. Rafailov R, Sharma A, Mitchell E, et al. Direct Preference Optimization: Your Language Model is Secretly a Reward Model. NeurIPS, 2023. arXiv:2305.18290

  5. Ethayarajh K, Kwon Y H, Gimpel K, et al. KTO: Model Alignment as Prospect Theoretic Optimization. arXiv:2402.01306, 2024.

  6. Hu E J, Shen Y, Wallis P, et al. LoRA: Low-Rank Adaptation of Large Language Models. ICLR, 2022. arXiv:2106.09685

  7. Dettmers T, Pagnoni A, Holtzman A, et al. QLoRA: Efficient Finetuning of Quantized LLMs. NeurIPS, 2023. arXiv:2305.14314