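LLM Training Pipeline in Detail
Training a large language model (LLM) is a complex multi-stage process. It starts with pretraining on massive amounts of raw text, continues through supervised fine-tuning and reinforcement-learning-based alignment, and ends with a highly capable system. This article walks through the core techniques, algorithmic principles, and practical considerations of each training stage.
1. Pretraining
Pretraining is the stage in which an LLM acquires its language ability and world knowledge: the model performs a language-modeling task over massive text corpora and learns rich language representations.
1.1 Data Processing Pipeline
1.1.1 Deduplication
Large corpora contain a great deal of repeated content, so deduplication is a key step in ensuring data quality.
Document-level deduplication uses MinHash or SimHash to estimate document similarity: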
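```python
from datasketch import MinHash

def minhash_deduplication(documents, threshold=0.8, num_perm=128):
    """
    Find near-duplicate documents with MinHash.

    Args:
        documents: list of document strings
        threshold: Jaccard similarity above which two documents count as duplicates
        num_perm: number of hash permutations
    Returns:
        List of (i, j) index pairs of near-duplicate documents.
    """
    minhashes = []
    for doc in documents:
        # Whitespace tokenization (unsegmented languages need a real tokenizer)
        tokens = set(doc.split())
        # Build the MinHash signature from the token set
        mh = MinHash(num_perm=num_perm)
        for token in tokens:
            mh.update(token.encode("utf8"))
        minhashes.append(mh)

    # Pairwise comparison is O(n^2); at corpus scale an LSH index
    # (e.g. datasketch.MinHashLSH) is the usual replacement.
    duplicates = []
    for i, mh1 in enumerate(minhashes):
        for j, mh2 in enumerate(minhashes[i + 1:], start=i + 1):
            if mh1.jaccard(mh2) > threshold:
                duplicates.append((i, j))
    return duplicates
```
Sentence-level deduplication targets repeated sentences within a passage.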
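As a minimal sketch of sentence-level deduplication (an illustration, not a production implementation), the function below keeps the first occurrence of each sentence and drops later exact repeats after normalizing case and whitespace. The name `dedup_sentences` and the naive punctuation-based splitter are assumptions made for this example.

```python
import hashlib
import re

def dedup_sentences(text):
    """Drop repeated sentences, keeping the first occurrence of each."""
    # Naive splitter: break after ., !, ? or their full-width counterparts
    parts = re.split(r"(?<=[.!?。!?])\s*", text)
    sentences = [s.strip() for s in parts if s.strip()]
    seen = set()
    kept = []
    for sentence in sentences:
        # Normalize case and whitespace before hashing
        normalized = " ".join(sentence.lower().split())
        key = hashlib.sha256(normalized.encode("utf8")).hexdigest()
        if key not in seen:
            seen.add(key)
            kept.append(sentence)
    return " ".join(kept)

print(dedup_sentences("Hello world. Foo bar. Hello world."))
```

Hashing normalized sentences only catches exact repeats; near-duplicate sentences would need a similarity measure such as the MinHash approach used at document level.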
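1.1.2 Quality Filtering
Quality filtering usually combines several dimensions:
| Filter dimension | Method | Suggested threshold |
|---|---|---|
| Language identification | fastText / langid | Target-language share > 95% |
| Perplexity | Small reference language model | PPL < 50 (depends on the reference model) |
| Sensitive content | Keyword matching / classifier | Application-dependent |
| Text length | Token count | 50 to 100k tokens |
| Special characters | Regular expressions | Share < 10% |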
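```python
def quality_filter(text, config):
    """
    Multi-dimensional quality filter. Returns True if the text is kept.

    detect_language, tokenizer, compute_perplexity, count_special_chars and
    contains_problematic_content are helpers assumed to be defined elsewhere.
    """
    if not text:
        return False
    # Language detection
    lang = detect_language(text)
    if lang != config.target_lang:
        return False
    # Length filter (in tokens)
    token_count = len(tokenizer.encode(text))
    if token_count < config.min_length or token_count > config.max_length:
        return False
    # Perplexity filter
    ppl = compute_perplexity(text, config.quality_model)
    if ppl > config.ppl_threshold:
        return False
    # Special-character ratio
    special_ratio = count_special_chars(text) / len(text)
    if special_ratio > 0.1:
        return False
    # Sensitive-content detection
    if contains_problematic_content(text, config.classifier):
        return False
    return True
```
1.2 Data Mixture Strategy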
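High-quality pretraining needs a well-balanced data mixture that covers different domains and sources.
1.2.1 Domain Distribution
A reference data mixture for a typical LLM:
| Data source | Share | Representative datasets |
|---|---|---|
| Web crawl | 60-70% | Common Crawl |
| Books / literature | 10-15% | BooksCorpus, arXiv |
| Code | 5-15% | GitHub |
| Dialogue / social | 5-10% | Reddit, StackExchange |
| Encyclopedia | 2-5% | Wikipedia |
1.2.2 Curriculum Learning
The data distribution can be adjusted dynamically during training according to a scheduling strategy, which can be:
- Increasing difficulty: simple data first, complex data later
- Topic focusing: broad coverage first, specialized data later
- Curriculum scheduling: ordering by domain-specific priority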
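One simple way to realize such a schedule is to interpolate the per-domain sampling weights over the course of training. The sketch below is only an illustration under that assumption; `mixture_at` and the example weights are hypothetical and not taken from any specific training recipe.

```python
def mixture_at(step, total_steps, start, end):
    """Linearly interpolate per-domain sampling weights from `start` to `end`."""
    # Training progress clamped to [0, 1]
    t = min(max(step / total_steps, 0.0), 1.0)
    weights = {d: (1 - t) * start[d] + t * end[d] for d in start}
    # Renormalize so the weights always sum to 1
    total = sum(weights.values())
    return {d: w / total for d, w in weights.items()}

# Hypothetical schedule: broad web data early, more code later
start = {"web": 0.8, "code": 0.2}
end = {"web": 0.5, "code": 0.5}
for step in (0, 50, 100):
    print(step, mixture_at(step, 100, start, end))
```

The returned weights can then drive a sampler that picks which domain each training batch is drawn from.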
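1.3 Tokenization and Data Formatting
1.3.1 Choosing a Tokenizer
Modern LLMs mainly use the following tokenizers:
| Tokenizer | Characteristics | Vocabulary size |
|---|---|---|
| BPE | Byte-level variants cover arbitrary text; well suited to multilingual data | 32k-100k |
| WordPiece | Used by Google-family models | 30k-100k |
| SentencePiece | Works on raw text, so it supports languages without whitespace | Configurable |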
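```python
from transformers import AutoTokenizer

# Load a pretrained tokenizer (Hugging Face format repo; access is gated)
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-2-7b-hf")
if tokenizer.pad_token is None:
    # Llama tokenizers ship without a pad token; reuse EOS for padding
    tokenizer.pad_token = tokenizer.eos_token

def prepare_training_data(texts, block_size=4096):
    """
    Prepare pretraining data.

    Args:
        texts: list of raw text strings
        block_size: context window size in tokens
    """
    # Tokenize; long documents overflow into additional blocks
    tokenized = tokenizer(
        texts,
        truncation=True,
        max_length=block_size,
        return_overflowing_tokens=True,
        padding="max_length",
    )
    input_ids = tokenized["input_ids"]
    attention_mask = tokenized["attention_mask"]
    # Causal LM: labels equal the inputs (the model shifts them internally).
    # Padded positions get -100 so the loss ignores them.
    labels = [
        [tok if m == 1 else -100 for tok, m in zip(ids, mask)]
        for ids, mask in zip(input_ids, attention_mask)
    ]
    return {
        "input_ids": input_ids,
        "labels": labels,
        "attention_mask": attention_mask,
    }
```
1.3.2 Data Formatting Templates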
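Standard pretraining format (causal LM):
<s> document 1 </s> <s> document 2 </s> ... </s>
Labels: identical to input_ids. The loss is next-token prediction (each position predicts the following token), not a masked-LM loss.
Interleaved format (for instruction-style data mixed into pretraining):
[INST] user instruction 1 [/INST] model reply 1 [INST] user instruction 2 [/INST] model reply 2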
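1.4 Distributed Training Strategies
Training a language model with billions of parameters requires multiple GPUs, and often multiple nodes, working together.
1.4.1 Data Parallelism
Distributed Data Parallel (DDP):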
```python
import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

def setup_ddp():
    """Initialize the distributed training environment."""
    dist.init_process_group(backend="nccl")
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)
    return local_rank

def train_ddp(model, dataloader, optimizer, gradient_accumulation_steps=1):
    """Data-parallel training loop with gradient accumulation."""
    local_rank = setup_ddp()
    model = DDP(model.to(local_rank), device_ids=[local_rank])
    model.train()
    optimizer.zero_grad()
    for step, batch in enumerate(dataloader):
        batch = {k: v.to(local_rank) for k, v in batch.items()}
        outputs = model(**batch)
        loss = outputs.loss / gradient_accumulation_steps
        # DDP all-reduces gradients across ranks during backward
        loss.backward()
        if (step + 1) % gradient_accumulation_steps == 0:
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            # Reset gradients only after an optimizer step,
            # otherwise accumulated gradients would be lost
            optimizer.zero_grad()
```
ZeRO optimizer (memory-optimized data parallelism):
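ZeRO reduces memory usage by sharding optimizer states, gradients, and parameters across data-parallel ranks:
| Stage | What is sharded | Memory saving |
|---|---|---|
| ZeRO-1 | Optimizer states | ~4x |
| ZeRO-2 | Optimizer states + gradients | ~8x |
| ZeRO-3 | All model states | Scales linearly with the number of GPUs |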
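The savings in the table can be checked with a little arithmetic. The sketch below assumes mixed-precision Adam, where each parameter costs 2 bytes (FP16/BF16 weights) + 2 bytes (gradients) + 12 bytes (FP32 master weights, momentum, and variance); the function name `zero_memory_gb` is made up for this example, and activations and buffers are ignored.

```python
def zero_memory_gb(num_params, num_gpus, stage):
    """Per-GPU model-state memory in GB for mixed-precision Adam under ZeRO."""
    params = 2 * num_params   # half-precision weights, bytes
    grads = 2 * num_params    # half-precision gradients, bytes
    optim = 12 * num_params   # FP32 master weights + Adam momentum + variance
    if stage >= 1:
        optim /= num_gpus     # ZeRO-1 shards optimizer states
    if stage >= 2:
        grads /= num_gpus     # ZeRO-2 also shards gradients
    if stage >= 3:
        params /= num_gpus    # ZeRO-3 also shards parameters
    return (params + grads + optim) / 1e9

# Example: 7.5B parameters on 64 GPUs
for stage in range(4):
    print(f"stage {stage}: {zero_memory_gb(7.5e9, 64, stage):.1f} GB")
```

With many GPUs the per-GPU footprint approaches 4 bytes per parameter for ZeRO-1 and 2 bytes for ZeRO-2, compared with 16 bytes for plain data parallelism, which is where the approximate 4x and 8x figures come from.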
```python
# Example DeepSpeed ZeRO configuration
ds_config = {
    "zero_optimization": {
        "stage": 3,
        "offload_optimizer": {
            "device": "cpu",
            "pin_memory": True,
        },
        "offload_param": {
            "device": "cpu",
            "pin_memory": True,
        },
        "overlap_comm": True,
        "contiguous_gradients": True,
    },
    "bf16": {"enabled": True},
    "gradient_clipping": 1.0,
    "gradient_accumulation_steps": 4,
}
```
1.4.2 Model Parallelism
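Tensor Parallelism (TP):
The parameter matrix of a single layer is split by columns or rows across GPUs:
```python
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.distributed as dist

class ColumnParallelLinear(nn.Module):
    """Column-parallel linear layer: each rank holds a slice of the output features."""
    def __init__(self, input_size, output_size, world_size):
        super().__init__()
        assert output_size % world_size == 0, "output_size must divide evenly"
        self.world_size = world_size
        self.output_size_per_rank = output_size // world_size
        # Each rank stores only its shard of the weight matrix
        self.weight = nn.Parameter(
            torch.randn(self.output_size_per_rank, input_size)
        )
        self.bias = nn.Parameter(
            torch.zeros(self.output_size_per_rank)
        )

    def forward(self, x):
        # x: (batch, seq, input_size)
        # Returns the local shard: (batch, seq, output_size_per_rank)
        return F.linear(x, self.weight, self.bias)

    def parallel_forward(self, x):
        """Forward pass that returns the full output on every rank."""
        # Local computation
        local_output = self.forward(x)
        # All-gather (not all-reduce) the shards, then concatenate along the
        # feature dimension. dist.all_gather does not propagate gradients;
        # training code needs an autograd-aware collective.
        world_output = [torch.zeros_like(local_output) for _ in range(self.world_size)]
        dist.all_gather(world_output, local_output)
        return torch.cat(world_output, dim=-1)
```
Sequence Parallelism: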
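The attention computation is split along the sequence dimension:
```python
import torch
import torch.nn as nn
import torch.distributed as dist

class SequenceParallelAttention(nn.Module):
    """
    Sequence-parallel attention.
    compute_qkv and scaled_dot_product are assumed to be defined elsewhere.
    """
    def __init__(self, config):
        super().__init__()
        self.world_size = dist.get_world_size()
        self.rank = dist.get_rank()

    def forward(self, x):
        """
        x: (batch, seq_len // world_size, hidden)
        """
        # Compute Q, K, V for the local sequence shard
        q, k, v = self.compute_qkv(x)
        # All-gather K and V so local queries can attend over the full sequence
        k_full = self.all_gather_kv(k)
        v_full = self.all_gather_kv(v)
        # Attention: local Q against full K/V
        # (a causal mask must account for this rank's sequence offset)
        attn = self.scaled_dot_product(q, k_full, v_full)
        # The output covers only the local sequence shard, so it stays sharded
        return attn

    def all_gather_kv(self, tensor):
        tensors_gather = [torch.empty_like(tensor) for _ in range(self.world_size)]
        dist.all_gather(tensors_gather, tensor)
        # Concatenate along the sequence dimension
        return torch.cat(tensors_gather, dim=1)
```
1.4.3 Pipeline Parallelism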
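Pipeline parallelism splits the model by layers across devices:
```python
import torch.nn as nn

class PipelineStage(nn.Module):
    """One stage of a pipeline-parallel model."""
    def __init__(self, layers, start_layer_idx, end_layer_idx):
        super().__init__()
        self.layers = nn.ModuleList(layers[start_layer_idx:end_layer_idx])
        self.start_idx = start_layer_idx
        self.end_idx = end_layer_idx

    def forward(self, x):
        """
        x is the raw input for the first stage and the previous stage's
        output for every later stage.
        """
        for layer in self.layers:
            x = layer(x)
        return x

def pipeline_schedule(stages, microbatches):
    """
    Single-process illustration of micro-batched execution. A real 1F1B
    (one forward, one backward) schedule interleaves forward and backward
    passes of different micro-batches across devices.

    GPU utilization with P stages and M micro-batches:
    - no micro-batching: about 1/P (only one stage is active at a time)
    - with micro-batching: bubble fraction (P - 1) / (M + P - 1),
      so utilization approaches 1 as M grows
    """
    outputs = []
    for microbatch in microbatches:
        # Forward through every stage
        x = microbatch
        for stage in stages:
            x = stage(x)
        outputs.append(x)
        # Backward
        # ... (backward scheduling omitted)
    return outputs
```
GPipe vs. PipeDream scheduling: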
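| Property | GPipe | PipeDream |
|---|---|---|
| Scheduling | All micro-batch forwards, then all backwards | 1F1B |
| Memory demand | High (activations of all micro-batches are kept) | Low (bounded pipeline buffer) |
| GPU utilization | Has bubbles | Higher |
| Implementation complexity | Simple | Complex |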
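The "bubbles" in the table can be quantified. For a fill-and-drain schedule with P stages and M micro-batches, the idle fraction is commonly given as (P - 1) / (M + P - 1). The helper below is a small sketch of that formula; `bubble_fraction` is a name chosen for this example.

```python
def bubble_fraction(num_stages, num_microbatches):
    """Idle fraction of a fill-and-drain pipeline with P stages and M micro-batches."""
    return (num_stages - 1) / (num_microbatches + num_stages - 1)

# With 4 stages, more micro-batches shrink the bubble
for m in (1, 4, 12, 32):
    print(f"M={m:2d}: bubble = {bubble_fraction(4, m):.2f}")
```

This is why pipeline-parallel training typically uses many more micro-batches than stages.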
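1.5 Mixed Precision and Gradient Checkpointing
1.5.1 Mixed-Precision Training
Mixed precision uses BF16/FP16 to speed up training while keeping a master copy of the weights in FP32: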
```python
import torch
from torch.cuda.amp import GradScaler

class MixedPrecisionTrainer:
    """Mixed-precision trainer."""
    def __init__(self, model, optimizer, config, dtype=torch.bfloat16):
        self.model = model
        self.optimizer = optimizer
        self.config = config
        self.dtype = dtype
        self.step = 0
        # Loss scaling is only needed for FP16; BF16 shares FP32's exponent
        # range, so the scaler is disabled (a no-op) in that case
        self.scaler = GradScaler(enabled=(dtype == torch.float16))

    def training_step(self, batch):
        # Forward pass in reduced precision
        with torch.autocast(device_type="cuda", dtype=self.dtype):
            outputs = self.model(**batch)
            loss = outputs.loss / self.config.gradient_accumulation_steps
        # Backward pass
        self.scaler.scale(loss).backward()
        # Clip and update at the end of each accumulation window
        if self.is_gradient_accumulation_step():
            self.scaler.unscale_(self.optimizer)
            torch.nn.utils.clip_grad_norm_(
                self.model.parameters(),
                self.config.max_grad_norm,
            )
            self.scaler.step(self.optimizer)
            self.scaler.update()
            self.optimizer.zero_grad()
        self.step += 1

    def is_gradient_accumulation_step(self):
        return (self.step + 1) % self.config.gradient_accumulation_steps == 0
```
Numeric format comparison:
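| Format | Sign bits | Exponent bits | Mantissa bits | Max finite value |
|---|---|---|---|---|
| FP32 | 1 | 8 | 23 | ~3.4e38 |
| BF16 | 1 | 8 | 7 | ~3.4e38 |
| FP16 | 1 | 5 | 10 | 65504 |
The advantage of BF16 over FP16 is that it keeps the same exponent range as FP32, which avoids gradient overflow, at the cost of fewer mantissa bits.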
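The maximum values in the table follow directly from the bit layouts. For an IEEE-style format with e exponent bits and m mantissa bits, the largest finite value is (2 - 2^-m) * 2^bias with bias = 2^(e-1) - 1. The helper `max_finite` below is a small sketch written for this article to reproduce those numbers.

```python
def max_finite(exponent_bits, mantissa_bits):
    """Largest finite value of an IEEE-style binary floating-point format."""
    bias = 2 ** (exponent_bits - 1) - 1
    # Largest mantissa is 1.111...1 (binary) = 2 - 2^-m; largest exponent is the bias
    return (2 - 2 ** -mantissa_bits) * 2.0 ** bias

for name, e, m in [("FP32", 8, 23), ("BF16", 8, 7), ("FP16", 5, 10)]:
    print(f"{name}: {max_finite(e, m):.4e}")
```

BF16 and FP32 differ only in the mantissa factor, so their ranges are nearly identical, while FP16 tops out at 65504.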
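1.5.2 Gradient Checkpointing
Gradient checkpointing does not keep every intermediate activation during the forward pass. It stores only a subset of checkpoints and recomputes the rest during the backward pass: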
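```python
import torch.nn as nn
from torch.utils.checkpoint import checkpoint

class GradientCheckpointingWrapper(nn.Module):
    """Gradient checkpointing wrapper; expects model.layers to be a sequence of layers."""
    def __init__(self, model, checkpoint_ratio=0.5):
        super().__init__()
        self.model = model
        self.checkpoint_ratio = checkpoint_ratio

    def _run_segment(self, start, end, x):
        # checkpoint() needs a callable, so a segment of layers is wrapped here
        for layer in self.model.layers[start:end]:
            x = layer(x)
        return x

    def forward(self, x):
        """
        Selective gradient checkpointing.
        Memory savings: roughly 50-70%
        Compute overhead: roughly 20-30%
        """
        # Number of layers per checkpointed segment
        num_layers = len(self.model.layers)
        checkpoint_every = max(1, int(num_layers * self.checkpoint_ratio))
        # Run the model segment by segment
        for i in range(0, num_layers, checkpoint_every):
            end_idx = min(i + checkpoint_every, num_layers)
            if self.training and i > 0:
                # Only the segment input is stored; activations inside the
                # segment are recomputed during backward
                x = checkpoint(self._run_segment, i, end_idx, x, use_reentrant=False)
            else:
                # The first segment (and eval mode) runs a normal forward pass
                x = self._run_segment(i, end_idx, x)
        return x
```
Memory vs. compute trade-off: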
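Let the model have $L$ layers, batch size $b$, sequence length $s$, and hidden dimension $h$:
- Standard forward pass: $O(L \cdot b \cdot s \cdot h)$ activation memory
- Gradient checkpointing: $O(\sqrt{L} \cdot b \cdot s \cdot h)$ activation memory when a checkpoint is kept every $\sqrt{L}$ layers, at the cost of roughly one extra forward pass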
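The square-root behavior can be illustrated with a toy cost model. Assume that storing one layer's activations costs one unit, that checkpoints are placed every k layers, and that during backward only one segment's activations are live at a time. Peak memory is then roughly ceil(L / k) + k units. The function names below are invented for this sketch.

```python
def checkpoint_activation_units(num_layers, segment_size):
    """Peak stored activations, in units of one layer's activation size."""
    num_checkpoints = -(-num_layers // segment_size)  # ceiling division
    # Stored checkpoints plus the activations of the segment being recomputed
    return num_checkpoints + segment_size

def best_segment_size(num_layers):
    """Segment size that minimizes peak activation memory in this toy model."""
    return min(
        range(1, num_layers + 1),
        key=lambda k: checkpoint_activation_units(num_layers, k),
    )

L = 64
k = best_segment_size(L)
print(k, checkpoint_activation_units(L, k), "units vs", L, "without checkpointing")
```

For 64 layers the optimum is a segment size of 8, the square root of the depth, which cuts the stored activations from 64 units to 16 in this simplified model.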
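2. Supervised Fine-Tuning (SFT)
Supervised fine-tuning (SFT) uses labeled data to teach a pretrained model to follow instructions.
2.1 Building Instruction Datasets
2.1.1 Human-Annotated Data
High-quality human-annotated data is the foundation of SFT:
| Data type | How it is built | Quality | Cost |
|---|---|---|---|
| Expert-written | Written by domain professionals | Highest | Very high |
| Crowdsourced | Multiple annotators collaborate | High | High |
| Generated + filtered | LLM generation plus rule-based filtering | Medium to high | Low |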
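```python
import json

class InstructionDataset:
    """Instruction dataset."""
    def __init__(self, data_path):
        self.data = self.load_data(data_path)

    @staticmethod
    def load_data(data_path):
        # Assumption: the file is JSON holding a list of records
        with open(data_path, encoding="utf8") as f:
            return json.load(f)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        item = self.data[idx]
        return {
            "instruction": item["instruction"],
            "input": item.get("input", ""),
            "output": item["output"],
            "category": item.get("category", "general"),
        }

    def format_prompt(self, item):
        """Format one example with the Llama-2-chat template."""
        user_turn = item["instruction"]
        if item.get("input"):
            user_turn += "\n" + item["input"]
        return (
            "[INST] <<SYS>>\nYou are a helpful assistant.\n<</SYS>>\n\n"
            f"{user_turn} [/INST] {item['output']}"
        )
```
2.1.2 Self-Instruct
Self-Instruct [1] uses the model itself to generate instruction data: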
```python
class SelfInstruct:
    """
    Self-Instruct: Generating Instruction Data from Language Models
    Core idea: let the model generate its own instruction data.

    teacher_model is assumed to expose generate(prompt, max_length) -> str;
    parse_instructions and is_relevant are helpers not shown here.
    """
    def __init__(self, teacher_model):
        self.model = teacher_model
        self.instruction_template = """Generate a diverse collection of tasks
for instruction tuning. Include various types like:
- Question answering
- Text summarization
- Code generation
- Creative writing
- Reasoning tasks
Generate {n} new instructions:"""

    def generate_instructions(self, num_instructions=10):
        """Generate new instructions."""
        prompt = self.instruction_template.format(n=num_instructions) + "\n\n"
        response = self.model.generate(prompt, max_length=500)
        return self.parse_instructions(response)

    def generate_response(self, instruction, input_text=""):
        """Generate a response for an instruction."""
        prompt = f"Instruction: {instruction}\n"
        if input_text:
            prompt += f"Input: {input_text}\n"
        prompt += "Response:"
        return self.model.generate(prompt, max_length=1000)

    def filter_quality(self, instructions, responses):
        """
        Quality filtering. Criteria:
        1. The instruction is not empty
        2. The response is relevant to the instruction
        3. The response has a reasonable length
        4. No obvious errors
        """
        filtered = []
        for inst, resp in zip(instructions, responses):
            if not inst or not resp:
                continue
            # Length is measured in characters
            if len(resp) < 50 or len(resp) > 2000:
                continue
            if not self.is_relevant(inst, resp):
                continue
            filtered.append((inst, resp))
        return filtered
```
2.2 Training Strategy
2.2.1 Learning Rate Scheduling
A learning rate schedule commonly used for SFT:
```python
import math
import torch

def get_sft_scheduler(optimizer, num_training_steps, warmup_ratio=0.1):
    """
    SFT learning rate schedule: linear warmup followed by cosine decay,
    with the multiplier floored at 10% of the peak learning rate.
    """
    warmup_steps = int(num_training_steps * warmup_ratio)

    def lr_lambda(current_step):
        if current_step < warmup_steps:
            # Linear warmup
            return float(current_step) / float(max(1, warmup_steps))
        # Cosine decay
        progress = float(current_step - warmup_steps) / float(
            max(1, num_training_steps - warmup_steps)
        )
        return max(0.1, 0.5 * (1.0 + math.cos(math.pi * progress)))

    return torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)

# Recommended SFT configuration
sft_config = {
    "learning_rate": 2e-5,    # 1/10 to 1/100 of the pretraining rate
    "warmup_ratio": 0.03,     # short warmup
    "num_epochs": 3,          # typically 3 to 5
    "batch_size": 8,          # per device, typically 4 to 16
    "weight_decay": 0.01,
    "max_grad_norm": 1.0,
    "lr_scheduler_type": "cosine",
}
```
2.2.2 Handling Overfitting
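Common overfitting problems in SFT and their remedies:
| Problem | Symptom | Remedy |
|---|---|---|
| Memorization | Training data is reproduced verbatim | Increase data diversity, use dropout |
| Mode collapse | Responses are too short or templated | Add a response-length penalty, mix training data |
| Capability regression | Pretrained abilities degrade | Keep some pretraining data in joint training |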
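```python
import torch
import torch.nn.functional as F

class SFTLoss:
    """SFT loss function."""
    def __init__(self, model, config):
        self.model = model
        self.config = config  # dict-like, e.g. {"length_penalty": 0.0}

    def compute_loss(self, batch):
        """
        Compute the SFT loss on the assistant response only.
        User-input positions carry the label -100 and are masked out.
        """
        outputs = self.model(
            input_ids=batch["input_ids"],
            attention_mask=batch["attention_mask"],
        )
        # logits: (B, L, V), labels: (B, L)
        # Shift so that the logits at position t are scored against token t+1
        shift_logits = outputs.logits[..., :-1, :].contiguous()
        shift_labels = batch["labels"][..., 1:].contiguous()
        # Cross-entropy; ignore_index masks the user-input positions
        loss = F.cross_entropy(
            shift_logits.view(-1, shift_logits.size(-1)),
            shift_labels.view(-1),
            ignore_index=-100,
        )
        # Optional length regularization (discourages very short responses)
        length_penalty = self.config.get("length_penalty", 0)
        if length_penalty > 0:
            # Clamp to avoid dividing by log(1) = 0 for empty responses
            response_lengths = (batch["labels"] != -100).sum(dim=-1).clamp(min=1)
            loss = loss + length_penalty * torch.mean(
                1.0 / torch.log(response_lengths.float() + 1)
            )
        return loss
```
2.3 Multi-Task Fine-Tuning vs. Selective Fine-Tuning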
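2.3.1 Multi-Task Fine-Tuning (MTF)
Multi-task fine-tuning trains on instruction data that mixes many kinds of tasks:
```python
import random

class MultiTaskFineTuner:
    """
    Multi-task fine-tuning.
    compute_loss and collate are assumed to be implemented elsewhere.
    """
    def __init__(self, tasks_data, model, optimizer, scheduler):
        self.model = model
        self.tasks = tasks_data  # dict: task name -> list of examples
        self.optimizer = optimizer
        self.scheduler = scheduler

    def create_multitask_batch(self, batch_size=32):
        """
        Build a batch by sampling from the tasks. Sampling strategies:
        1. Uniform sampling
        2. Sampling proportional to task size
        3. Difficulty-aware curriculum sampling
        """
        tasks = list(self.tasks.keys())
        # Uniform sampling over tasks
        task = random.choice(tasks)
        examples = self.tasks[task]
        batch = random.sample(examples, min(batch_size, len(examples)))
        return self.collate(batch)

    def train(self, num_steps):
        """Multi-task training loop."""
        for step in range(num_steps):
            batch = self.create_multitask_batch()
            loss = self.compute_loss(batch)
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
            self.scheduler.step()
```
2.3.2 Selective Fine-Tuning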
Selective fine-tuning updates only part of the parameters, which helps preserve the model's overall capabilities:
```python
class SelectiveFineTuner:
    """
    Selective fine-tuning: only specific layers or modules are trained.
    The default layer names are examples; actual names depend on the model.
    """
    def __init__(self, model, train_layers=("layer.23", "layer.24", "lm_head")):
        self.model = model
        # Freeze every parameter outside the selected layers
        for name, param in model.named_parameters():
            if not any(layer in name for layer in train_layers):
                param.requires_grad = False
        # Report the trainable parameter count
        trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
        total_params = sum(p.numel() for p in model.parameters())
        print(f"Trainable parameters: {trainable_params:,} / {total_params:,} "
              f"({100 * trainable_params / total_params:.2f}%)")

    def apply_lora(self, rank=8, alpha=16, dropout=0.05):
        """
        Selective fine-tuning combined with LoRA (see Section 5).
        """
        from peft import get_peft_model, LoraConfig
        lora_config = LoraConfig(
            r=rank,
            lora_alpha=alpha,
            target_modules=["q_proj", "v_proj", "k_proj", "o_proj"],
            lora_dropout=dropout,
            task_type="CAUSAL_LM",
        )
        self.model = get_peft_model(self.model, lora_config)
```
3. Reinforcement Learning from Human Feedback (RLHF)
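RLHF (Reinforcement Learning from Human Feedback) aligns model behavior using human preference signals and is a core technique for aligning modern LLMs.
3.1 Reward Model Training
3.1.1 The Bradley-Terry Model
The reward model uses the Bradley-Terry model [2] to model human preferences:
Preference probability model:
$$P(y_w \succ y_l \mid x) = \sigma\big(r_\theta(x, y_w) - r_\theta(x, y_l)\big)$$
where $r_\theta(x, y)$ is the reward of response $y$ for input $x$, $y_w$ and $y_l$ are the preferred and rejected responses, and $\sigma$ is the sigmoid function.
Loss function:
$$\mathcal{L}_{RM} = -\mathbb{E}_{(x, y_w, y_l) \sim \mathcal{D}}\Big[\log \sigma\big(r_\theta(x, y_w) - r_\theta(x, y_l)\big)\Big]$$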
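To make the preference model concrete, the following plain-Python sketch evaluates the Bradley-Terry probability and the corresponding negative log-likelihood for scalar rewards. The function names are chosen for this example only.

```python
import math

def preference_probability(r_chosen, r_rejected):
    """P(chosen is preferred) = sigmoid(r_chosen - r_rejected)."""
    return 1.0 / (1.0 + math.exp(-(r_chosen - r_rejected)))

def bt_loss(r_chosen, r_rejected):
    """Negative log-likelihood of the observed preference."""
    return -math.log(preference_probability(r_chosen, r_rejected))

# Equal rewards: the model is indifferent and the loss is log(2)
print(preference_probability(1.0, 1.0), bt_loss(1.0, 1.0))
# A larger margin in favor of the chosen response lowers the loss
print(preference_probability(2.0, 0.0), bt_loss(2.0, 0.0))
```

Only the reward difference matters, so the reward model's outputs are identifiable only up to an additive constant per prompt.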
```python
import torch
import torch.nn as nn
import torch.nn.functional as F

class RewardModel(nn.Module):
    """
    Reward model (Bradley-Terry).
    Same backbone as the language model, but it outputs a single scalar reward.
    base_model must return hidden states (e.g. a model without an LM head).
    """
    def __init__(self, base_model):
        super().__init__()
        self.base_model = base_model
        self.reward_head = nn.Linear(base_model.config.hidden_size, 1)

    def forward(self, input_ids, attention_mask):
        """
        Compute rewards for a batch of sequences.
        Returns:
            rewards: (batch_size, seq_len) per-token rewards
            last_rewards: (batch_size,) reward at the last non-pad token,
                used as the reward of the whole response
        """
        outputs = self.base_model(
            input_ids=input_ids,
            attention_mask=attention_mask,
        )
        hidden_states = outputs.last_hidden_state  # (B, L, H)
        # Per-token reward
        rewards = self.reward_head(hidden_states).squeeze(-1)  # (B, L)
        # Index of the last non-pad token (assumes right padding)
        sequence_lengths = attention_mask.sum(dim=1) - 1
        last_rewards = rewards.gather(1, sequence_lengths.unsqueeze(1)).squeeze(-1)
        return rewards, last_rewards

def compute_reward_loss(reward_model, chosen_ids, chosen_mask,
                        rejected_ids, rejected_mask):
    """
    Bradley-Terry preference loss.
    chosen_ids and rejected_ids each hold the same prompt followed by the
    preferred or the rejected response.
    """
    chosen_rewards = reward_model(chosen_ids, chosen_mask)[1]
    rejected_rewards = reward_model(rejected_ids, rejected_mask)[1]
    # Bradley-Terry loss
    loss = -F.logsigmoid(chosen_rewards - rejected_rewards).mean()
    # Optional margin regularization: penalize reward gaps smaller than 1
    margin = F.relu(1.0 - (chosen_rewards - rejected_rewards)).mean()
    return loss + 0.1 * margin
```
3.1.2 Reward Model Training in Practice
```python
import torch

class RewardModelTrainer:
    """
    Reward model trainer.
    concat_prompt_response is assumed to tokenize and concatenate a prompt
    with a response, returning a dict with "ids" and "mask".
    """
    def __init__(self, model, config):
        self.model = model
        self.config = config
        self.optimizer = torch.optim.AdamW(
            model.parameters(),
            lr=config.learning_rate,
            weight_decay=config.weight_decay,
        )

    def train_step(self, batch):
        """
        One training step. The batch contains:
        - prompt: the question
        - chosen: the preferred response
        - rejected: the rejected response
        """
        # Build the inputs
        chosen_inputs = self.concat_prompt_response(
            batch["prompt"], batch["chosen"]
        )
        rejected_inputs = self.concat_prompt_response(
            batch["prompt"], batch["rejected"]
        )
        # Compute the loss
        loss = compute_reward_loss(
            self.model,
            chosen_inputs["ids"],
            chosen_inputs["mask"],
            rejected_inputs["ids"],
            rejected_inputs["mask"],
        )
        # Backward pass and update
        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
        self.optimizer.step()
        return {"loss": loss.item()}
```
3.2 PPO in Detail
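Proximal Policy Optimization (PPO) [3] is the core algorithm of RLHF and is used to optimize the language model policy.
3.2.1 Advantage Estimation (GAE)
Generalized Advantage Estimation (GAE) provides an advantage estimate with a tunable bias-variance trade-off:
$$\hat{A}_t = \sum_{l=0}^{\infty} (\gamma\lambda)^l \, \delta_{t+l}$$
where $\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$ is the TD residual.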
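```python
import torch

def compute_gae(rewards, values, dones, gamma=0.99, lam=0.95):
    """
    Compute GAE advantage estimates.

    Args:
        rewards: (T,) reward sequence
        values: (T+1,) value estimates, including the state after the last step
        dones: (T,) 1 where the episode terminated, else 0
    Returns:
        advantages: (T,) normalized advantage estimates
        returns: (T,) return targets for training the value function
    """
    T = len(rewards)
    advantages = torch.zeros_like(rewards)
    gae = 0
    for t in reversed(range(T)):
        # TD residual
        delta = rewards[t] + gamma * values[t + 1] * (1 - dones[t]) - values[t]
        # Recursive GAE accumulation
        gae = delta + gamma * lam * (1 - dones[t]) * gae
        advantages[t] = gae
    # Return = advantage + value baseline (computed before normalization)
    returns = advantages + values[:-1]
    # Normalize advantages to stabilize training
    advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
    return advantages, returns
```
3.2.2 The PPO Clipping Mechanism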
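PPO's central innovation is its clipping mechanism, which prevents overly large policy updates:
PPO-Clip objective:
$$L^{CLIP}(\theta) = \mathbb{E}_t\Big[\min\big(r_t(\theta)\hat{A}_t,\ \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon)\,\hat{A}_t\big)\Big]$$
where the probability ratio is $r_t(\theta) = \dfrac{\pi_\theta(a_t \mid s_t)}{\pi_{\theta_{old}}(a_t \mid s_t)}$.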
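A scalar example helps show what the clipping does. The sketch below evaluates the clipped objective for a single action; `clipped_objective` is a name introduced for illustration.

```python
def clipped_objective(ratio, advantage, epsilon=0.2):
    """PPO-Clip surrogate for one action: min(r * A, clip(r, 1-eps, 1+eps) * A)."""
    clipped_ratio = min(max(ratio, 1 - epsilon), 1 + epsilon)
    return min(ratio * advantage, clipped_ratio * advantage)

# Positive advantage: gains from raising the ratio are capped at 1 + epsilon
print(clipped_objective(1.5, 1.0))
# Negative advantage: lowering the ratio below 1 - epsilon earns nothing extra
print(clipped_objective(0.5, -1.0))
# Moving the wrong way is never clipped: the full penalty applies
print(clipped_objective(1.5, -1.0))
```

Because of the outer minimum, the objective is a pessimistic bound: clipping removes the incentive to move further in a beneficial direction but never hides a harmful change.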
```python
import torch
import torch.nn.functional as F

class PPOLanguageModel:
    """
    PPO for language models, with an optional pretraining loss term (PPO-ptx).
    PPO-ptx was introduced in "Training language models to follow
    instructions with human feedback" (Ouyang et al., 2022).
    For brevity this sketch assumes unpadded sequences.
    """
    def __init__(self, actor_model, ref_model, reward_model, value_model, config):
        self.actor = actor_model          # policy being optimized
        self.ref_model = ref_model        # frozen reference model (the SFT model)
        self.reward_model = reward_model  # returns (per-token, sequence) rewards
        self.value_model = value_model    # assumed to return per-token values (B, T)
        self.config = config              # kl_coef is an assumed config field

    def compute_log_probs(self, model, input_ids, attention_mask):
        """
        Log-probability of every token given its prefix, shape (B, T-1).
        Working in log space avoids numerical underflow.
        """
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        # Logits at position t predict token t+1: drop the last logit
        # and the first token so the two line up
        log_probs = F.log_softmax(outputs.logits[:, :-1, :], dim=-1)
        targets = input_ids[:, 1:]
        return log_probs.gather(dim=-1, index=targets.unsqueeze(-1)).squeeze(-1)

    def ppo_loss(self, old_log_probs, new_log_probs, advantages,
                 epsilon=0.2, entropy=None, entropy_coef=0.0):
        """
        PPO clipped loss.
        $L^{CLIP} = \min(r \cdot A, \text{clip}(r, 1-\epsilon, 1+\epsilon) \cdot A)$
        where $r = \exp(\log \pi_{new} - \log \pi_{old})$
        """
        ratio = torch.exp(new_log_probs - old_log_probs)
        # Unclipped and clipped surrogate objectives
        surr1 = ratio * advantages
        surr2 = torch.clamp(ratio, 1 - epsilon, 1 + epsilon) * advantages
        # Take the smaller value (outer min), negate to get a loss
        loss = -torch.min(surr1, surr2).mean()
        # Optional entropy bonus to encourage exploration
        if entropy is not None:
            loss = loss - entropy_coef * entropy.mean()
        return loss

    def step(self, input_ids, attention_mask, old_log_probs):
        """
        One PPO update on sampled sequences (prompt + response).
        old_log_probs: log-probs of the policy that generated the samples,
        recorded at rollout time, shape (B, T-1). The reference model is used
        only for the KL penalty, not as the "old" policy in the ratio.
        """
        with torch.no_grad():
            ref_log_probs = self.compute_log_probs(
                self.ref_model, input_ids, attention_mask
            )
            # Sequence-level reward from the reward model, shape (B,)
            seq_rewards = self.reward_model(input_ids, attention_mask)[1]
            # Per-token reward: KL penalty, plus the sequence reward on the last token
            rewards = -self.config.kl_coef * (old_log_probs - ref_log_probs)
            rewards[:, -1] += seq_rewards
        # Current policy log-probs and value estimates
        new_log_probs = self.compute_log_probs(self.actor, input_ids, attention_mask)
        values = self.value_model(input_ids, attention_mask)[:, :-1]  # (B, T-1)
        # GAE per sequence, with a zero bootstrap value after the final token
        with torch.no_grad():
            bootstrap = torch.cat([values, torch.zeros_like(values[:, :1])], dim=1)
            results = [
                compute_gae(r, v, torch.zeros_like(r),
                            gamma=self.config.gamma, lam=self.config.lam)
                for r, v in zip(rewards, bootstrap)
            ]
            advantages = torch.stack([adv for adv, _ in results])
            returns = torch.stack([ret for _, ret in results])
        # Policy and value losses
        ppo_loss = self.ppo_loss(old_log_probs, new_log_probs, advantages)
        value_loss = F.mse_loss(values, returns)
        total_loss = ppo_loss + 0.5 * value_loss
        # Pretraining loss (PPO-ptx), computed on pretraining data (helper not shown)
        if self.config.pretrain_loss_coef > 0:
            pt_loss = self.compute_pretrain_loss()
            total_loss = total_loss + self.config.pretrain_loss_coef * pt_loss
        return {"loss": total_loss, "ppo_loss": ppo_loss, "value_loss": value_loss}
```
3.3 DPO (Direct Preference Optimization)
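DPO [4] is an alignment method that needs neither an explicitly trained reward model nor PPO.
3.3.1 How DPO Works
DPO re-parameterizes the RLHF optimization objective:
Original RLHF objective (reward maximization under a KL constraint):
$$\max_{\pi_\theta}\ \mathbb{E}_{x \sim \mathcal{D},\, y \sim \pi_\theta(\cdot \mid x)}\big[r(x, y)\big] - \beta\, \mathbb{D}_{KL}\big[\pi_\theta(y \mid x)\,\|\,\pi_{ref}(y \mid x)\big]$$
Closed-form solution:
$$\pi^*(y \mid x) = \frac{1}{Z(x)}\, \pi_{ref}(y \mid x)\, \exp\Big(\frac{1}{\beta}\, r(x, y)\Big)$$
DPO loss:
$$\mathcal{L}_{DPO} = -\mathbb{E}_{(x, y_w, y_l) \sim \mathcal{D}}\Big[\log \sigma\Big(\beta \log \frac{\pi_\theta(y_w \mid x)}{\pi_{ref}(y_w \mid x)} - \beta \log \frac{\pi_\theta(y_l \mid x)}{\pi_{ref}(y_l \mid x)}\Big)\Big]$$
In shorthand, with the implicit reward $\hat{r}_\theta(x, y) = \beta \log \dfrac{\pi_\theta(y \mid x)}{\pi_{ref}(y \mid x)}$:
$$\mathcal{L}_{DPO} = -\mathbb{E}\big[\log \sigma\big(\hat{r}_\theta(x, y_w) - \hat{r}_\theta(x, y_l)\big)\big]$$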
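A scalar calculation shows how the loss responds to the policy. The sketch below takes sequence log-probabilities as plain floats; the function name and the example numbers are illustrative assumptions.

```python
import math

def dpo_loss(policy_chosen, policy_rejected, ref_chosen, ref_rejected, beta=0.1):
    """DPO loss for one preference pair, given sequence log-probabilities."""
    # Implicit reward margin: beta * (log-ratio of chosen - log-ratio of rejected)
    margin = beta * ((policy_chosen - ref_chosen) - (policy_rejected - ref_rejected))
    return -math.log(1.0 / (1.0 + math.exp(-margin)))

# Policy identical to the reference: margin is 0 and the loss is log(2)
print(dpo_loss(-10.0, -12.0, -10.0, -12.0))
# Policy shifts probability toward the chosen response: the loss drops
print(dpo_loss(-8.0, -14.0, -10.0, -12.0))
```

The loss depends only on how the policy's log-probabilities move relative to the reference model, not on their absolute values.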
```python
import torch
import torch.nn.functional as F

class DPO:
    """
    Direct Preference Optimization (DPO).
    Paper: "Direct Preference Optimization: Your Language Model is Secretly
    a Reward Model" (Rafailov et al., 2023)
    compute_sequence_logps (not shown) is assumed to sum the per-token
    log-probs of the response tokens of each sequence.
    """
    def __init__(self, policy_model, ref_model, beta=0.1):
        self.policy = policy_model
        self.ref_model = ref_model
        self.beta = beta  # strength of the implicit KL constraint

    def dpo_loss(self, chosen_logps, rejected_logps,
                 ref_chosen_logps, ref_rejected_logps):
        """
        Compute the DPO loss.

        Args:
            chosen_logps: policy log-probs of the preferred responses
            rejected_logps: policy log-probs of the rejected responses
            ref_chosen_logps: reference log-probs of the preferred responses
            ref_rejected_logps: reference log-probs of the rejected responses
        """
        # Log-ratios of the policy against the reference model
        chosen_logratios = chosen_logps - ref_chosen_logps
        rejected_logratios = rejected_logps - ref_rejected_logps
        # Implicit reward difference
        logits = self.beta * (chosen_logratios - rejected_logratios)
        # Same form as the Bradley-Terry loss, applied directly in policy space
        loss = -F.logsigmoid(logits).mean()
        # Monitoring only: mean log-ratio as a rough drift diagnostic
        # (not a true KL estimate, and not added to the loss)
        drift = (chosen_logratios.mean() + rejected_logratios.mean()) / 2
        return loss, {"dpo_loss": loss.detach(), "kl_loss": drift.detach()}

    def train_step(self, batch):
        """One DPO training step."""
        # Sequences already include the prompt
        chosen_sequences = batch["chosen"]
        rejected_sequences = batch["rejected"]
        # Policy log-probs
        chosen_logps = self.compute_sequence_logps(self.policy, chosen_sequences)
        rejected_logps = self.compute_sequence_logps(self.policy, rejected_sequences)
        # Reference log-probs (no gradients)
        with torch.no_grad():
            ref_chosen_logps = self.compute_sequence_logps(
                self.ref_model, chosen_sequences
            )
            ref_rejected_logps = self.compute_sequence_logps(
                self.ref_model, rejected_sequences
            )
        # DPO loss
        loss, loss_info = self.dpo_loss(
            chosen_logps, rejected_logps,
            ref_chosen_logps, ref_rejected_logps,
        )
        return loss, loss_info
```
3.3.2 DPO vs. PPO
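| Property | PPO + RM | DPO |
|---|---|---|
| Models to train | Policy + reward + value (plus a frozen reference) | Policy only (plus a frozen reference) |
| Additional data | Preference data (to train the RM) | Preference data (used directly) |
| Compute cost | High (requires online sampling) | Moderate |
| Training stability | Requires tuning the KL constraint | Relatively stable |
| Theoretical basis | Standard RL theory | Closed-form re-parameterization of the same KL-constrained objective |
| Large-scale training | Hard | Easier to scale |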
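3.4 KTO (Kahneman-Tversky Optimization)
KTO [5] builds on prospect theory and models human preferences in terms of loss aversion and the certainty effect.
3.4.1 The Kahneman-Tversky Value Function
KTO uses a piecewise value function to model human preferences:
$$v(z) = \begin{cases} z^{\alpha} & z \ge 0 \\ -\lambda\,(-z)^{\alpha} & z < 0 \end{cases}$$
where $\lambda$ is the loss-aversion coefficient and $\alpha$ controls diminishing sensitivity.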
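The asymmetry of this value function is easy to see numerically. The sketch below uses the median parameter estimates reported by Tversky and Kahneman (1992), alpha = 0.88 and lambda = 2.25, as defaults; the function name is chosen for this example.

```python
def kt_value(z, alpha=0.88, lam=2.25):
    """Kahneman-Tversky value of an outcome z relative to the reference point."""
    if z >= 0:
        return z ** alpha
    # Losses are scaled by the loss-aversion coefficient
    return -lam * (-z) ** alpha

for z in (-2.0, -1.0, 0.0, 1.0, 2.0):
    print(f"v({z:+.1f}) = {kt_value(z):+.3f}")
```

A loss of a given size weighs more than twice as much as an equal gain, and both curves flatten as the magnitude grows.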
3.4.2 The KTO Loss
```python
import torch

class KTO:
    """
    KTO: Kahneman-Tversky Optimization
    Paper: "KTO: Model Alignment as Prospect Theoretic Optimization"
    """
    def __init__(self, policy_model, ref_model, beta=0.1, lambda_loss=1.0):
        self.policy = policy_model
        self.ref_model = ref_model
        self.beta = beta
        self.lambda_loss = lambda_loss  # loss-aversion weight on undesirable samples

    def kto_loss(self, chosen_logps, rejected_logps,
                 ref_chosen_logps, ref_rejected_logps, kl_estimate=0.0):
        """
        Compute the KTO loss.
        KTO labels each sample as desirable or undesirable and does not need
        paired preferences, so the two groups may differ in size. This also
        makes it more robust to annotation noise.

        kl_estimate: batch-level estimate of KL(policy || reference), used as
        the reference point z0 (clamped at 0, no gradient). How it is
        estimated is left to the caller in this sketch.
        """
        z0 = torch.as_tensor(kl_estimate).clamp(min=0).detach()
        # Implicit rewards: log-ratio of the policy against the reference
        chosen_logratios = chosen_logps - ref_chosen_logps
        rejected_logratios = rejected_logps - ref_rejected_logps
        # Desirable samples: push the implicit reward above the reference point
        loss_chosen = 1 - torch.sigmoid(self.beta * (chosen_logratios - z0))
        # Undesirable samples: push it below, weighted by loss aversion
        loss_rejected = self.lambda_loss * (
            1 - torch.sigmoid(self.beta * (z0 - rejected_logratios))
        )
        # Total loss over all samples
        loss = torch.cat([loss_chosen, loss_rejected]).mean()
        return loss, {
            "kto_loss": loss,
            "avg_chosen_reward": (self.beta * chosen_logratios).mean().item(),
            "avg_rejected_reward": (self.beta * rejected_logratios).mean().item(),
        }
```
3.5 Practical Issues in RLHF
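3.5.1 Mode Collapse
Symptoms:
The model starts producing repetitive, short, or low-diversity responses.
Causes:
- KL penalty too weak: the policy drifts far from the reference model and over-exploits the reward
- Sparse reward signal: the model finds a shortcut that satisfies the reward
- Overfitting to specific patterns: PPO over-optimizes
Remedies:
```python
import torch
import torch.nn.functional as F

# Remedy 1: tune the KL coefficient
config = {
    "beta": 0.2,           # raised from 0.1 to pull the policy back toward the reference; watch the effect
    "kl_penalty": "full",  # or "short"; option names are library-specific
}

# Remedy 2: mix in a pretraining loss
# PPO-ptx: add the language-modeling loss on pretraining data
# (ppo_loss and pretrain_loss come from the training step)
loss = ppo_loss + 0.1 * pretrain_loss

# Remedy 3: adversarial training
class AdversarialPPO:
    """Adversarial PPO: a discriminator term that encourages diverse, natural responses."""
    def __init__(self, discriminator):
        # discriminator: module returning one logit per response (assumed, not shown)
        self.discriminator = discriminator

    def diversity_loss(self, responses):
        """Discriminator-based diversity loss."""
        logits = self.discriminator(responses)
        # Push the policy toward responses the discriminator labels "real" (target 1),
        # while the discriminator is trained separately to tell styles apart
        return F.binary_cross_entropy_with_logits(logits, torch.ones_like(logits))
```
3.5.2 Reward Hacking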
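Symptoms:
The model learns to "fool" the reward model instead of actually learning the target behavior.
Typical examples:
- A dialogue model learns to use particular vocabulary without being semantically correct
- A code generation model produces code that runs but is incorrect
- A summarization model over-compresses or includes false information
Detection and mitigation: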
```python
import torch

class RewardHackingDetector:
    """
    Reward hacking detector.
    compute_diversity, compute_length_change, compute_vocab_distribution and
    kl_div (KL(p || q) between probability vectors) are assumed helpers.
    """
    def __init__(self, reward_model, config):
        self.reward_model = reward_model
        self.baseline_responses = []
        self.baseline_rewards = []

    def update_baseline(self, new_responses, new_rewards):
        """Update the baseline used for detection."""
        self.baseline_responses.extend(new_responses)
        self.baseline_rewards.extend(new_rewards)

    def detect_hacking(self, current_responses):
        """
        Detect reward hacking. Indicators:
        1. Reward rises while quality metrics fall
        2. Response length changes abnormally
        3. The vocabulary distribution shifts
        """
        # 1. Reward change relative to the baseline
        current_rewards = self.reward_model(current_responses)
        reward_increase = (
            torch.mean(current_rewards)
            - torch.mean(torch.tensor(self.baseline_rewards))
        ).item()
        # 2. Quality metrics
        diversity_score = self.compute_diversity(current_responses)
        length_change = self.compute_length_change(current_responses)
        # 3. Vocabulary distribution shift (Jensen-Shannon divergence:
        #    average KL of each distribution to their mixture)
        vocab_dist = self.compute_vocab_distribution(current_responses)
        baseline_dist = self.compute_vocab_distribution(self.baseline_responses)
        mixture = 0.5 * (vocab_dist + baseline_dist)
        js_divergence = 0.5 * (kl_div(vocab_dist, mixture) +
                               kl_div(baseline_dist, mixture))
        # Decision
        hacking_detected = (
            reward_increase > 0.5 and
            diversity_score < 0.3 and
            js_divergence > 0.2
        )
        return {
            "hacking_detected": hacking_detected,
            "reward_increase": reward_increase,
            "diversity_score": diversity_score,
            "length_change": length_change,
            "js_divergence": js_divergence,
        }

    def mitigation_strategies(self):
        """
        Mitigation strategies:
        1. Combined reward: reward + quality + safety
        2. Curriculum RL: tighten the KL constraint gradually
        3. Adversarial RM: train a discriminator to detect hacking
        """
        pass
```
4. Training Stability and Optimization
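4.1 Weight Initialization
4.1.1 Transformer Initialization
Initialization strategy for modern Transformers:
```python
import math
import torch.nn as nn

def initialize_transformer(model, config):
    """
    Transformer weight initialization. Key principles:
    1. Residual-branch output projections use scaled initialization
    2. Embeddings use a small initialization
    3. The output layer is scaled appropriately
    Parameter names follow Llama-style conventions and may need adapting.
    """
    std = 0.02
    # Scaled std for projections that write into the residual stream
    residual_std = std / math.sqrt(2 * config.num_layers)
    for name, param in model.named_parameters():
        if name.endswith("bias"):
            # Biases: zero initialization (checked first so that names such as
            # q_proj.bias do not fall into the weight branches)
            nn.init.zeros_(param)
        elif "norm" in name.lower():
            # Normalization weights: keep the default of 1
            nn.init.ones_(param)
        elif "embed" in name:
            # Embeddings: small initialization
            nn.init.normal_(param, mean=0.0, std=std)
        elif any(k in name for k in ("q_proj", "k_proj", "v_proj")):
            # QKV projections: standard initialization
            nn.init.normal_(param, mean=0.0, std=std)
        elif "o_proj" in name:
            # Attention output projection: scaled to keep the residual path stable
            nn.init.normal_(param, mean=0.0, std=residual_std)
        elif "gate_proj" in name or "up_proj" in name:
            # SwiGLU gate and up projections
            nn.init.normal_(param, mean=0.0, std=std)
        elif "down_proj" in name:
            # SwiGLU down projection: scaled like o_proj
            nn.init.normal_(param, mean=0.0, std=residual_std)
        elif "lm_head" in name:
            # LM head: output layer scaled by the hidden size
            nn.init.normal_(param, mean=0.0, std=std / math.sqrt(config.hidden_size))
```
4.1.2 Residual Scaling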
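Accumulating residuals can destabilize training, so the residual branches need appropriate scaling:
```python
import math
import torch.nn as nn

class ScaledResidualBlock(nn.Module):
    """
    Residual block with a scaled branch.
    The residual branch is multiplied by $1/\sqrt{2N}$ (N = number of layers)
    so that the accumulated residual does not grow too large. The skip
    connection itself is left unscaled.
    """
    def __init__(self, layer, num_layers):
        super().__init__()
        self.layer = layer
        self.scale_factor = 1.0 / math.sqrt(2 * num_layers)

    def forward(self, x, **kwargs):
        return x + self.scale_factor * self.layer(x, **kwargs)
```
4.2 Learning Rate Schedule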
4.2.1 Warmup and Decay
The learning rate schedule of the original Transformer:
```python
class TransformerScheduler:
    """
    Transformer learning rate schedule.
    Formula: lr = d_model^{-0.5} * min(step^{-0.5}, step * warmup_steps^{-1.5})
    """
    def __init__(self, optimizer, d_model, warmup_steps):
        self.optimizer = optimizer
        self.d_model = d_model
        self.warmup_steps = warmup_steps
        self.current_step = 0
        self.base_lrs = [group["lr"] for group in optimizer.param_groups]

    def step(self):
        self.current_step += 1
        lr = self.get_lr()
        for i, group in enumerate(self.optimizer.param_groups):
            # Other groups keep their ratio to the first group's base lr
            group["lr"] = lr * self.base_lrs[i] / self.base_lrs[0]

    def get_lr(self):
        """Compute the current learning rate."""
        step = max(1, self.current_step)
        # Base scale factor
        scale = self.d_model ** (-0.5)
        # Linear warmup, then inverse-square-root decay; the two branches
        # meet at step == warmup_steps, so the schedule is continuous
        return scale * min(step ** (-0.5), step * self.warmup_steps ** (-1.5))

# Common schedule configurations (indicative values)
scheduler_configs = {
    "llama": {
        "lr": 3e-4,
        "warmup_ratio": 0.05,
        "scheduler": "cosine",
        "min_lr_ratio": 0.1,
    },
    "bert": {
        "lr": 1e-4,
        "warmup_ratio": 0.1,
        "scheduler": "linear",
        "min_lr_ratio": 0.0,
    },
    "gpt": {
        "lr": 6e-4,
        "warmup_ratio": 0.01,
        "scheduler": "cosine",
        "min_lr_ratio": 0.1,
    },
}
```
4.2.2 Cosine Decay with Warm Restarts
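```python
import math

class CosineAnnealingWarmRestarts:
    """
    Cosine annealing with warm restarts.
    A standalone sketch; PyTorch ships an equivalent scheduler as
    torch.optim.lr_scheduler.CosineAnnealingWarmRestarts.
    """
    def __init__(self, optimizer, T_0, T_mult=1, eta_min=0.0):
        self.optimizer = optimizer
        self.T_i = T_0          # length of the current cycle
        self.T_mult = T_mult    # cycle length multiplier after each restart
        self.eta_min = eta_min  # minimum learning rate
        self.T_cur = 0          # steps since the last restart
        self.base_lrs = [group["lr"] for group in optimizer.param_groups]

    def get_lr(self):
        cos = (1 + math.cos(math.pi * self.T_cur / self.T_i)) / 2
        return [
            self.eta_min + (base_lr - self.eta_min) * cos
            for base_lr in self.base_lrs
        ]

    def step(self):
        self.T_cur += 1
        if self.T_cur >= self.T_i:
            # Restart: reset the cycle position and stretch the next cycle
            self.T_cur = 0
            self.T_i *= self.T_mult
        for group, lr in zip(self.optimizer.param_groups, self.get_lr()):
            group["lr"] = lr
```
4.3 Gradient Clipping and Regularization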
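4.3.1 Gradient Clipping
```python
import torch

def clip_gradients(model, max_norm=1.0, norm_type=2.0):
    """
    Gradient clipping by global norm.
    Formula: grad = grad * min(1, max_norm / ||grad||_norm_type)

    Args:
        max_norm: maximum gradient norm
        norm_type: norm type (1 = L1, 2 = L2)
    Returns:
        The total gradient norm before clipping (useful for monitoring).
    """
    return torch.nn.utils.clip_grad_norm_(
        model.parameters(),
        max_norm=max_norm,
        norm_type=norm_type,
    )

# Clipping strategies for different training stages
clip_strategies = {
    "pretraining": {
        "max_norm": 1.0,
        "condition": "always",  # or "when_loss_spike"
    },
    "sft": {
        "max_norm": 1.0,
        "condition": "always",
    },
    "rlhf_ppo": {
        "max_norm": 0.5,  # more conservative
        "condition": "always",
    },
}
```
4.3.2 Regularization Techniques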
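| Technique | Purpose | Typical value in LLMs |
|---|---|---|
| Weight decay | Keeps weights from growing too large | 0.01-0.1 |
| Dropout | Prevents overfitting | 0.0-0.1 |
| R-Drop | Consistency regularization | KL weight, e.g. 0.5 |
| Label smoothing | Softens the labels | 0.1 |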
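```python
import torch.nn as nn
import torch.nn.functional as F

class RDropLoss(nn.Module):
    """
    R-Drop: Regularized Dropout for Neural Networks
    Runs two forward passes on the same input (with different dropout masks)
    and encourages the two outputs to agree.
    """
    def __init__(self, kl_loss_weight=0.5):
        super().__init__()
        self.kl_loss_weight = kl_loss_weight

    def forward(self, logits1, logits2, labels):
        """
        logits1, logits2: logits of the two forward passes, shape (N, C)
        labels: shape (N,)
        """
        # Standard cross-entropy, averaged over both passes
        ce_loss = (F.cross_entropy(logits1, labels) +
                   F.cross_entropy(logits2, labels)) / 2
        # Symmetric KL regularization. Both arguments are log-probabilities,
        # so log_target=True is required.
        p = F.log_softmax(logits1, dim=-1)
        q = F.log_softmax(logits2, dim=-1)
        kl_loss = (F.kl_div(p, q, reduction="batchmean", log_target=True) +
                   F.kl_div(q, p, reduction="batchmean", log_target=True)) / 2
        return ce_loss + self.kl_loss_weight * kl_loss
```
4.4 Training Monitoring and Anomaly Detection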
4.4.1 Monitoring Key Metrics
```python
from collections import defaultdict

class TrainingMonitor:
    """Training monitor."""
    def __init__(self, config):
        self.metrics = defaultdict(list)
        self.alert_thresholds = {
            "loss_spike": 2.0,  # multiple of the loss EMA
            "grad_norm": 100.0,
            "nan_ratio": 0.0,
            "learning_rate_abnormal": 1e-6,
        }
        self.ema_window = 100
        self.critical_count = 0  # consecutive critical alerts

    def compute_ema(self, values, alpha=0.95):
        """Exponential moving average."""
        if not values:
            return 0
        ema = values[0]
        for v in values[1:]:
            ema = alpha * ema + (1 - alpha) * v
        return ema

    def check_anomalies(self, metrics):
        """Detect training anomalies and record the current loss."""
        alerts = []
        # Loss spike detection (skipped until there is some history)
        recent_losses = self.metrics["loss"][-self.ema_window:]
        current_loss = metrics.get("loss")
        if recent_losses and current_loss is not None:
            ema_loss = self.compute_ema(recent_losses)
            if current_loss > ema_loss * self.alert_thresholds["loss_spike"]:
                alerts.append({
                    "type": "loss_spike",
                    "current": current_loss,
                    "ema": ema_loss,
                    "severity": "high",
                })
        if current_loss is not None:
            self.metrics["loss"].append(current_loss)
        # Gradient explosion detection
        grad_norm = metrics.get("grad_norm", 0)
        if grad_norm > self.alert_thresholds["grad_norm"]:
            alerts.append({
                "type": "grad_explosion",
                "grad_norm": grad_norm,
                "severity": "critical",
            })
        # NaN/Inf detection
        if metrics.get("has_nan", False) or metrics.get("has_inf", False):
            alerts.append({
                "type": "numerical_error",
                "severity": "critical",
            })
        return alerts

    def should_stop(self, metrics):
        """Decide whether training should stop."""
        alerts = self.check_anomalies(metrics)
        if any(alert["severity"] == "critical" for alert in alerts):
            # Stop after more than 3 consecutive steps with critical alerts
            self.critical_count += 1
            if self.critical_count > 3:
                return True, "Too many critical anomalies"
        else:
            self.critical_count = 0
        return False, None
```
4.4.2 Loss Spike Recovery
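```python
import torch

class LossSpikeRecovery:
    """Loss spike recovery mechanism."""
    def __init__(self, model, optimizer):
        self.model = model
        self.optimizer = optimizer
        self.best_loss = float("inf")
        self.checkpoint_path = "best_model.pt"
        self.spike_recovery_count = 0

    def save_checkpoint(self, path):
        torch.save({"model": self.model.state_dict(),
                    "optimizer": self.optimizer.state_dict()}, path)

    def restore_checkpoint(self, path):
        state = torch.load(path)
        self.model.load_state_dict(state["model"])
        self.optimizer.load_state_dict(state["optimizer"])

    def check_and_recover(self, loss, grad_norm):
        """Detect a loss spike and recover from it."""
        is_spike = (
            loss > self.best_loss * 2.0 and
            grad_norm > 50.0
        )
        if is_spike:
            self.spike_recovery_count += 1
            # Save the current state for later inspection
            self.save_checkpoint("spike_backup.pt")
            # Roll back to the best state
            self.restore_checkpoint(self.checkpoint_path)
            # Halve the learning rate
            for param_group in self.optimizer.param_groups:
                param_group["lr"] *= 0.5
            return {
                "recovered": True,
                "spike_loss": loss,
                "recovery_action": "lr_reduction",
                "recovery_count": self.spike_recovery_count,
            }
        # Track the best loss and checkpoint it
        if loss < self.best_loss:
            self.best_loss = loss
            self.save_checkpoint(self.checkpoint_path)
        return {"recovered": False}
```
5. Recent Training Techniques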
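5.1 Efficient Fine-Tuning with LoRA/QLoRA
5.1.1 LoRA Principles
LoRA (Low-Rank Adaptation) [6] freezes the pretrained weights and trains only a pair of low-rank matrices whose product represents the weight update.
Core hypothesis: the weight updates needed to adapt a pretrained language model have a low intrinsic rank.
Parameterization: for a pretrained weight $W_0 \in \mathbb{R}^{d \times k}$, LoRA adds a low-rank decomposition:
$$W = W_0 + \Delta W = W_0 + BA, \quad B \in \mathbb{R}^{d \times r},\ A \in \mathbb{R}^{r \times k}$$
where $r \ll \min(d, k)$ is the rank.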
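To make the saving concrete, the sketch below counts trainable parameters for one weight matrix with `d` output features, `k` input features, and rank `r` (symbols as in the LoRA paper [6]). The layer size 4096 × 4096 and the rank 8 are illustrative assumptions, not values from this article.

```python
def lora_param_counts(d, k, r):
    """Return (full fine-tuning params, LoRA params, LoRA/full ratio) for one d x k weight."""
    full = d * k        # every entry of W is trained
    lora = r * (d + k)  # B is d x r and A is r x k
    return full, lora, lora / full


full, lora, ratio = lora_param_counts(d=4096, k=4096, r=8)
print(f"full: {full:,}  lora: {lora:,}  ratio: {ratio:.4%}")
```

For this shape the low-rank pair holds well under 1% of the parameters of the full matrix, and the ratio shrinks further as the layer gets wider at a fixed rank.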
import math

import torch
import torch.nn as nn
import torch.nn.functional as F


class LoRALinear(nn.Module):
    """
    LoRA linear layer
    Forward: h = W_0 x + (alpha / r) * B A x
    """

    def __init__(self, in_features, out_features, rank=4, alpha=16, dropout=0.0):
        super().__init__()
        self.rank = rank
        self.alpha = alpha
        self.scaling = alpha / rank
        # Frozen pretrained weight (placeholder values until the real weights are copied in)
        self.weight = nn.Parameter(
            torch.randn(out_features, in_features),
            requires_grad=False
        )
        # Optional bias, set when wrapping a layer that has one
        self.register_parameter("bias", None)
        # Trainable LoRA parameters
        self.lora_A = nn.Parameter(torch.empty(rank, in_features))
        self.lora_B = nn.Parameter(torch.zeros(out_features, rank))
        if dropout > 0:
            self.lora_dropout = nn.Dropout(p=dropout)
        else:
            self.lora_dropout = nn.Identity()
        self.reset_parameters()

    def reset_parameters(self):
        """Initialize A randomly and B to zero, so that BA = 0 at the start of training."""
        nn.init.kaiming_uniform_(self.lora_A, a=math.sqrt(5))
        nn.init.zeros_(self.lora_B)

    def forward(self, x):
        """
        Forward pass
        h = Wx + s * BAx
        """
        # Frozen path
        base_output = F.linear(x, self.weight, self.bias)
        # LoRA path
        lora_output = F.linear(
            self.lora_dropout(x) @ self.lora_A.T,
            self.lora_B
        )
        return base_output + self.scaling * lora_output
class LoRAConfig:
    """LoRA configuration."""

    def __init__(self, rank=8, alpha=16, dropout=0.05,
                 target_modules=None, bias="none", task_type="CAUSAL_LM"):
        self.r = rank
        self.lora_alpha = alpha
        self.lora_dropout = dropout
        self.target_modules = target_modules or ["q_proj", "v_proj"]
        self.bias = bias
        self.task_type = task_type
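def set_module(model, name, new_module):
    """Replace the submodule at the dotted path `name` with `new_module`."""
    parent_name, _, child_name = name.rpartition(".")
    parent = model.get_submodule(parent_name) if parent_name else model
    setattr(parent, child_name, new_module)


def apply_lora_to_model(model, config):
    """Apply LoRA to every nn.Linear whose name matches a target module."""
    # Freeze the pretrained parameters; only the LoRA matrices added below are trained
    for param in model.parameters():
        param.requires_grad = False
    # Collect the targets first so the module tree is not modified while iterating over it
    targets = [
        (name, module) for name, module in model.named_modules()
        if isinstance(module, nn.Linear)
        and any(target in name for target in config.target_modules)
    ]
    for name, original_linear in targets:
        # Build a LoRA layer with the same shape as the original layer
        lora_linear = LoRALinear(
            original_linear.in_features, original_linear.out_features,
            rank=config.r,
            alpha=config.lora_alpha,
            dropout=config.lora_dropout
        )
        # Copy the pretrained weight and bias
        lora_linear.weight.data = original_linear.weight.data.clone()
        if original_linear.bias is not None:
            lora_linear.bias = original_linear.bias
        # Match the device and dtype of the original layer
        lora_linear.to(device=original_linear.weight.device,
                       dtype=original_linear.weight.dtype)
        # Swap the layer in
        set_module(model, name, lora_linear)
    return model

5.1.2 QLoRA Implementation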
QLoRA (Quantized LoRA) [7] combines 4-bit quantization of the frozen base weights with LoRA, which substantially reduces GPU memory use:
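The core of NF4 is a fixed 16-entry code book: weights are scaled into [-1, 1] by their absolute maximum, and each one is stored as the index of its nearest level. The following plain-Python sketch shows one quantize/dequantize round trip. It uses the code book published with the QLoRA paper [7], treats the whole list as a single block, and omits the paper's small blocks and double quantization; the helper names and the sample weights are assumptions made for illustration.

```python
from bisect import bisect_left

# NF4 code book from the QLoRA paper; note that it is not symmetric around zero
NF4_LEVELS = [
    -1.0, -0.6961928009986877, -0.5250730514526367, -0.39491748809814453,
    -0.28444138169288635, -0.18477343022823334, -0.09105003625154495, 0.0,
    0.07958029955625534, 0.16093020141124725, 0.24611230194568634,
    0.33791524171829224, 0.44070982933044434, 0.5626170039176941,
    0.7229568362236023, 1.0,
]
# Decision boundaries: midpoints between neighbouring levels
BOUNDARIES = [(a + b) / 2 for a, b in zip(NF4_LEVELS, NF4_LEVELS[1:])]


def quantize_block(values):
    """Return 4-bit codes (0..15) and the absmax scale for one block of weights."""
    absmax = max(abs(v) for v in values) or 1.0  # avoid dividing by zero
    codes = [bisect_left(BOUNDARIES, v / absmax) for v in values]
    return codes, absmax


def dequantize_block(codes, absmax):
    """Look up each code in the code book and undo the scaling."""
    return [NF4_LEVELS[c] * absmax for c in codes]


weights = [0.12, -0.40, 0.03, 0.25, -0.07, 0.40]
codes, absmax = quantize_block(weights)
restored = dequantize_block(codes, absmax)
print(codes)
print([round(w, 4) for w in restored])
```

The values with the largest magnitude in a block are reproduced exactly, because they map to the end points -1 and 1. Everything else is rounded to the nearest level, so the error depends on how well the block follows the roughly normal distribution the code book was designed for.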
# NF4 code book from the QLoRA paper [7]; note that it is not symmetric around zero
NF4_LEVELS = [
    -1.0, -0.6961928009986877, -0.5250730514526367, -0.39491748809814453,
    -0.28444138169288635, -0.18477343022823334, -0.09105003625154495, 0.0,
    0.07958029955625534, 0.16093020141124725, 0.24611230194568634, 0.33791524171829224,
    0.44070982933044434, 0.5626170039176941, 0.7229568362236023, 1.0,
]


class QLoRALinear(nn.Module):
    """
    QLoRA: 4-bit quantization + LoRA
    Core ideas:
    1. Quantize the pretrained weights to the NF4 format
    2. Store them in quantized form and dequantize them for computation
    3. Train the LoRA adapters on top of the dequantized weights
    Simplifications: one absmax scale per tensor and one index per byte.
    Production implementations use block-wise scales and pack two indices per byte.
    """

    def __init__(self, weight, bias=None, rank=8, alpha=16, quant_type="nf4"):
        super().__init__()
        self.quant_type = quant_type
        self.register_buffer("nf4_map", torch.tensor(NF4_LEVELS))
        # Quantization parameters: compute the scale first, then the 4-bit indices
        self.register_buffer("scale", self.compute_scale(weight))
        self.register_buffer("weight", self.quantize(weight, quant_type))
        # LoRA parameters
        self.lora_A = nn.Parameter(torch.randn(rank, weight.shape[1]))
        self.lora_B = nn.Parameter(torch.zeros(weight.shape[0], rank))
        self.scaling = alpha / rank
        if bias is not None:
            self.bias = nn.Parameter(bias)
        else:
            self.bias = None

    def compute_scale(self, weight):
        """Absmax scale that maps the weights into [-1, 1]."""
        return weight.detach().abs().max().clamp(min=1e-8).float()

    def quantize(self, weight, quant_type="nf4"):
        """4-bit quantization."""
        if quant_type == "nf4":
            # NormalFloat 4-bit: 16 levels in the range [-1, 1]
            return self.nf4_quantize(weight)
        elif quant_type == "fp4":
            # The FP4 code book is not defined in this section
            raise NotImplementedError("fp4 quantization is not implemented here")
        else:
            raise ValueError(f"Unknown quant type: {quant_type}")

    def nf4_quantize(self, weight):
        """Map each scaled weight to the index of its nearest NF4 level."""
        levels = self.nf4_map.to(weight.device)
        boundaries = (levels[:-1] + levels[1:]) / 2  # midpoints between neighbouring levels
        normalized = (weight.detach().float() / self.scale.to(weight.device)).contiguous()
        return torch.bucketize(normalized, boundaries).to(torch.uint8)

    def dequantize(self):
        """Look up the NF4 levels and rescale them to the original range."""
        return self.nf4_map[self.weight.long()] * self.scale

    def forward(self, x):
        # Dequantize on the fly; caching the float weights would cancel the memory savings
        weight = self.dequantize().to(x.dtype)
        # Base output
        base_output = F.linear(x, weight, self.bias)
        # LoRA output
        lora_output = F.linear(x @ self.lora_A.T, self.lora_B)
        return base_output + self.scaling * lora_output

5.2 Curriculum Learning Strategies
Curriculum learning orders the training samples by increasing difficulty:
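A curriculum is largely defined by its pacing function: how many of the easiest samples are available at a given point in training. The sketch below compares three common pacing rules (linear, exponential, and staged) in plain Python. The function name `available_count` and the sizes used in the demo are illustrative assumptions.

```python
def available_count(progress, num_samples, batch_size,
                    strategy="linear", num_stages=5):
    """Number of easiest samples that may be drawn at `progress` in [0, 1]."""
    progress = min(max(progress, 0.0), 1.0)
    if strategy == "linear":
        n = int(progress * num_samples)
    elif strategy == "exp":
        n = int(num_samples ** progress)
    elif strategy == "stage":
        stage = min(int(progress * num_stages), num_stages - 1)
        n = (stage + 1) * num_samples // num_stages
    else:
        raise ValueError(f"Unknown strategy: {strategy}")
    # Always allow at least one full batch, and never more than the dataset
    return min(max(n, batch_size), num_samples)


for strategy in ("linear", "exp", "stage"):
    pool = [available_count(p, 1000, 32, strategy) for p in (0.0, 0.5, 1.0)]
    print(strategy, pool)
```

The exponential rule stays on the easiest samples for most of training and opens up late, while the staged rule trades smoothness for a schedule that is easy to reason about and to log.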
import numpy as np


class CurriculumScheduler:
    """
    Curriculum learning scheduler
    Ways to estimate difficulty:
    1. Sample length
    2. Perplexity
    3. Task complexity
    4. Teacher-model agreement
    """

    def __init__(self, dataset, difficulty_fn, total_steps, strategy="linear"):
        self.dataset = dataset
        self.difficulty_fn = difficulty_fn
        self.total_steps = total_steps  # needed to turn a step into training progress
        self.strategy = strategy
        self.num_samples = len(dataset)
        # Score every sample and sort from easiest to hardest
        self.difficulties = self.compute_difficulties()
        self.sorted_indices = np.argsort(self.difficulties)

    def compute_difficulties(self):
        """Compute the difficulty of every sample."""
        difficulties = []
        for i in range(len(self.dataset)):
            sample = self.dataset[i]
            difficulty = self.difficulty_fn(sample)
            difficulties.append(difficulty)
        return np.array(difficulties)

    def get_batch(self, training_step, batch_size):
        """
        Return one curriculum batch
        Sampling strategies:
        1. linear: the pool of available samples grows linearly with progress
        2. exp: the pool grows exponentially
        3. stage: the pool grows in discrete stages
        """
        progress = min(max(training_step / self.total_steps, 0.0), 1.0)
        if self.strategy == "linear":
            num_available = int(progress * self.num_samples)
        elif self.strategy == "exp":
            num_available = int(self.num_samples ** progress)
        elif self.strategy == "stage":
            stages = [0.2, 0.4, 0.6, 0.8, 1.0]
            current_stage = min(int(progress * len(stages)), len(stages) - 1)
            num_available = int(stages[current_stage] * self.num_samples)
        else:
            raise ValueError(f"Unknown strategy: {self.strategy}")
        # Keep at least one batch of samples available, capped at the dataset size
        num_available = min(max(num_available, batch_size), self.num_samples)
        available_indices = self.sorted_indices[:num_available]
        # Sample without replacement from the easiest `num_available` samples
        size = min(batch_size, num_available)
        selected = np.random.choice(available_indices, size=size, replace=False)
        return [self.dataset[i] for i in selected]

    def task_complexity(self, task_type):
        """Illustrative scores only; the task types and values are assumptions."""
        return {"simple": 0.0, "medium": 1.0, "complex": 2.0}.get(task_type, 1.0)

    def difficulty_fn_example(self, sample):
        """
        Example difficulty function
        Combines several factors into one score
        """
        # 1. Length
        length_score = np.log(max(sample["num_tokens"], 1))
        # 2. Task complexity (if available)
        task_score = self.task_complexity(sample.get("task_type", "simple"))
        # 3. Teacher-model uncertainty
        teacher_score = sample.get("teacher_uncertainty", 0)
        return length_score + 0.5 * task_score + 0.3 * teacher_score

5.3 Data Curriculum Design
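A data curriculum concerns how the training data is organized over the course of training, that is, which data comes first and how the domain mix changes from phase to phase: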
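One practical detail when sampling a batch from a domain mix is rounding: truncating `batch_size * ratio` for each domain usually leaves the batch short. The sketch below uses the largest-remainder method so that the per-domain counts always add up to the batch size. The function name `allocate_batch` is an assumption, and the mix shown is only an example.

```python
def allocate_batch(ratios, batch_size):
    """Split `batch_size` across domains; `ratios` is assumed to sum to 1."""
    raw = {domain: batch_size * ratio for domain, ratio in ratios.items()}
    counts = {domain: int(value) for domain, value in raw.items()}
    shortfall = max(batch_size - sum(counts.values()), 0)
    # Hand the leftover slots to the domains with the largest fractional parts
    by_remainder = sorted(raw, key=lambda d: raw[d] - counts[d], reverse=True)
    for domain in by_remainder[:shortfall]:
        counts[domain] += 1
    return counts


mix = {"web": 0.3, "books": 0.2, "code": 0.2, "math": 0.15, "sft": 0.15}
counts = allocate_batch(mix, 32)
print(counts, sum(counts.values()))
```

With plain truncation the same mix would fill only 29 of the 32 slots, which biases every batch toward whichever domain is used to pad it.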
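import random


class DataCurriculum:
    """
    Data curriculum design
    Core questions:
    1. Which data to learn first
    2. Which data mix to use in each phase
    3. How to balance breadth and depth
    """

    def __init__(self, domains_data, config):
        self.domains = domains_data  # assumed: dict mapping domain name -> list of samples
        self.config = config
        self.current_phase = 0

    def get_phase_schedule(self):
        """
        Define the data mix for each training phase
        Typical phases:
        1. General ability: broad but simple
        2. Specialized skills: deeper coverage of specific domains
        3. Alignment: preference data
        """
        # Assumed config field: four ascending start steps, the first one being 0
        starts = self.config.phase_start_steps
        return [
            {"phase": 0, "step_threshold": starts[0],
             "domains": {"web": 0.6, "books": 0.3, "code": 0.1}},
            {"phase": 1, "step_threshold": starts[1],
             "domains": {"web": 0.4, "books": 0.3, "code": 0.2, "math": 0.1}},
            {"phase": 2, "step_threshold": starts[2],
             "domains": {"web": 0.3, "books": 0.2, "code": 0.2, "math": 0.15, "sft": 0.15}},
            {"phase": 3, "step_threshold": starts[3],
             "domains": {"sft": 0.5, "rlhf": 0.3, "web": 0.2}}
        ]

    def sample_from_domain(self, domain, num_samples):
        """Draw samples with replacement from one domain's pool."""
        return random.choices(self.domains[domain], k=num_samples)

    def sample_curriculum_batch(self, training_step, batch_size):
        """Sample a batch according to the current phase's mix."""
        schedule = self.get_phase_schedule()
        # Pick the last phase whose start step has been reached
        current_schedule = schedule[0]
        for phase in schedule:
            if training_step >= phase["step_threshold"]:
                current_schedule = phase
        self.current_phase = current_schedule["phase"]
        # Sample proportionally; int() truncates, so top up from the largest domain
        ratios = current_schedule["domains"]
        counts = {domain: int(batch_size * ratio) for domain, ratio in ratios.items()}
        counts[max(ratios, key=ratios.get)] += batch_size - sum(counts.values())
        batch = []
        for domain, domain_samples in counts.items():
            batch.extend(self.sample_from_domain(domain, domain_samples))
        # Shuffle so that domains are interleaved
        random.shuffle(batch)
        return batch

    def adaptive_resampling(self, loss_by_domain):
        """
        Adaptive resampling
        Adjusts the sampling weights according to the loss of each domain
        """
        adjusted_weights = {}
        base_weights = self.get_phase_schedule()[self.current_phase]["domains"]
        # Domains with a higher loss are sampled more often
        for domain, base_weight in base_weights.items():
            domain_loss = loss_by_domain.get(domain, 1.0)
            # Loss-proportional weighting; the 0.1 offset keeps every weight positive
            adjusted_weights[domain] = base_weight * (domain_loss + 0.1)
        # Normalize so the weights sum to 1
        total = sum(adjusted_weights.values())
        for domain in adjusted_weights:
            adjusted_weights[domain] /= total
        return adjusted_weights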
import logging

logger = logging.getLogger(__name__)


# Putting the full training flow together
class LLMTrainingPipeline:
    """
    End-to-end LLM training pipeline
    PretrainingStage, SFTStage, RLHFStage and the train_dpo / train_ppo /
    train_kto / save_model methods are assumed to be defined elsewhere.
    """

    def __init__(self, config):
        self.config = config
        self.pretrain_stage = PretrainingStage(config.pretrain)
        self.sft_stage = SFTStage(config.sft)
        self.rlhf_stage = RLHFStage(config.rlhf)
        self.curriculum = DataCurriculum(config.domains, config)

    def train(self):
        # Stage 1: pretraining
        logger.info("Starting pretraining...")
        self.pretrain_stage.train()
        # Stage 2: SFT
        logger.info("Starting SFT...")
        self.sft_stage.train()
        # Stage 3: preference alignment (PPO/DPO/KTO)
        logger.info("Starting RLHF alignment...")
        method = self.config.rlhf.method
        if method == "dpo":
            self.train_dpo()
        elif method == "ppo":
            self.train_ppo()
        elif method == "kto":
            self.train_kto()
        else:
            raise ValueError(f"Unknown alignment method: {method}")
        logger.info("Training complete!")
        self.save_model()

6. References
Footnotes
1. Wang Y, Kordi Y, Mishra S, et al. Self-Instruct: Aligning Language Models with Self-Generated Instructions. ACL, 2023. arXiv:2212.10560
2. Bradley R A, Terry M E. Rank Analysis of Incomplete Block Designs: I. The Method of Paired Comparisons. Biometrika, 1952.
3. Schulman J, Wolski F, Dhariwal P, et al. Proximal Policy Optimization Algorithms. arXiv:1707.06347, 2017.
4. Rafailov R, Sharma A, Mitchell E, et al. Direct Preference Optimization: Your Language Model is Secretly a Reward Model. NeurIPS, 2023. arXiv:2305.18290
5. Ethayarajh K, Kwon Y H, Gimpel K, et al. KTO: Model Alignment as Prospect Theoretic Optimization. arXiv:2402.01306, 2024.
6. Hu E J, Shen Y, Wallis P, et al. LoRA: Low-Rank Adaptation of Large Language Models. ICLR, 2022. arXiv:2106.09685
7. Dettmers T, Pagnoni A, Holtzman A, et al. QLoRA: Efficient Finetuning of Quantized LLMs. NeurIPS, 2023. arXiv:2305.14314