概述
Rewarding Progress 是ICLR 2025的一篇重要论文,提出了一种规模化自动化过程验证器的训练方法。该工作的核心贡献在于解决了过程奖励模型(PRM)训练中最大的瓶颈——标注成本问题,通过设计“进步感”(Progress)这一新的奖励信号,实现了大规模、低成本的自动化PRM训练。[^1]
核心贡献:将过程奖励建模重新定义为“进步感”预测任务,使得无需人工标注即可训练过程验证器。
背景:PRM训练的标注困境
传统PRM的训练范式
传统PRM训练依赖大量的人工标注数据:
class TraditionalPRMTraining:
    """Describe the conventional PRM training paradigm.

    The class only stores static figures and a list of pipeline stages; it
    exists to show that step-level human annotation is the costly part.
    """

    def __init__(self):
        # Annotation figures for each dataset, keyed by dataset name.
        prm800k = dict(
            annotations='逐步骤标注',
            cost='约50美元/问题',
            total_problems=12000,
            total_cost='约60万美元',
            time='数月标注时间',
        )
        math_shepherd = dict(
            annotations='逐步骤标注+结果验证',
            cost='约30美元/问题',
            total_problems=30000,
            total_cost='约90万美元',
        )
        self.data_requirements = {
            'prm800k': prm800k,
            'math-shepherd': math_shepherd,
        }

    def training_pipeline(self):
        """Return the four pipeline stages as numbered, human-readable labels.

        The stages are: collect problems, annotate every reasoning step by
        hand, train the PRM, and verify the results by hand. The second stage
        is the bottleneck the article focuses on.
        """
        # (stage name, relative cost) in execution order.
        stage_notes = (
            ('问题收集', '相对便宜'),
            ('步骤标注', '非常昂贵'),
            ('模型训练', '适中'),
            ('结果验证', '昂贵'),
        )
        return [
            f'{number}. {name} ({note})'
            for number, (name, note) in enumerate(stage_notes, start=1)
        ]
# 标注成本分析
| 数据集 | 问题数 | 步骤数/问题 | 总步骤数 | 标注成本 |
|---|---|---|---|---|
| PRM800K | 12,000 | ~8 | ~96,000 | ~$60,000 |
| Math-Shepherd | 30,000 | ~10 | ~300,000 | ~$90,000 |
| HINT | 5,000 | ~15 | ~75,000 | ~$45,000 |
| 所需规模 | 1,000,000+ | ~10 | ~10,000,000 | ~$5,000,000+ |
为什么需要自动化?
- 规模化需求:要训练真正强大的PRM,需要百万级的问题
- 成本瓶颈:人工标注无法满足规模化需求
- 一致性:大规模人工标注难以保证一致性
- 速度:人工标注速度远慢于模型生成
Rewarding Progress核心思想
核心洞察:“进步感”作为奖励信号
Rewarding Progress的核心洞察是:好的推理过程会在每个步骤中展现“进步”。
class ProgressRewardSignal:
    """Use per-step "progress" as the reward signal.

    Assumptions stated by the author:
    - correct reasoning moves the solution forward at every step;
    - progress can be measured by comparing adjacent steps;
    - only "progress" needs labelling, not step "correctness".

    NOTE(review): ``compute_initial_progress``, ``compute_information_gain``
    and ``compute_partial_solution`` are called below but are not defined in
    the visible source — confirm where they are provided.
    """

    def __init__(self):
        # Indicator key -> display name.
        indicator_pairs = (
            ('information_gain', '信息增量'),
            ('partial_solution', '部分解的改进'),
            ('constraint_narrowing', '约束范围缩小'),
            ('goal_decomposition', '目标分解'),
            ('verification', '验证步骤'),
        )
        self.progress_indicators = dict(indicator_pairs)

    def compute_progress(self, problem, steps):
        """Return one progress score per entry of ``steps``.

        The first step is scored against the problem itself; every later step
        is scored against the step immediately before it.
        """
        return [
            self.compute_initial_progress(step, problem)
            if position == 0
            else self.compute_step_progress(step, steps[position - 1], problem)
            for position, step in enumerate(steps)
        ]

    def compute_step_progress(self, current_step, previous_step, problem):
        """Blend information gain and partial-solution quality into one score.

        The result is ``0.6 * information gain + 0.4 * partial-solution
        quality``; both terms come from helper methods.
        """
        gain = self.compute_information_gain(current_step, previous_step)
        quality = self.compute_partial_solution(current_step, problem)
        return 0.6 * gain + 0.4 * quality
# “进步感”的定义
论文定义了多种“进步感”指标:
class ProgressIndicators:
    """Collection of heuristic progress indicators, all static methods.

    NOTE(review): ``extract_new_concepts``, ``extract_new_relations``,
    ``count_constraints`` and ``check_indicator`` are free functions that are
    not defined in the visible source — confirm where they come from.
    """

    @staticmethod
    def information_gain(current_step, previous_steps):
        """Score how much new information ``current_step`` introduces.

        Counts the new concepts and new relations reported by the extraction
        helpers, weights each item by 0.5 and caps the result at 1.0.
        """
        new_concepts = extract_new_concepts(current_step, previous_steps)
        new_relations = extract_new_relations(current_step, previous_steps)
        gain = len(new_concepts) * 0.5 + len(new_relations) * 0.5
        return min(1.0, gain)

    @staticmethod
    def constraint_narrowing(step, problem):
        """Return the fraction by which ``step`` reduces the constraint count.

        The value is ``(before - after) / before``, floored at 0.0. When the
        problem reports no constraints there is nothing to narrow, so 0.0 is
        returned instead of dividing by zero.
        """
        constraints_before = count_constraints(problem)
        if not constraints_before:
            return 0.0
        constraints_after = count_constraints(step, problem)
        narrowing = (constraints_before - constraints_after) / constraints_before
        return max(0.0, narrowing)

    @staticmethod
    def partial_solution_quality(step, problem):
        """Sum the weights of the quality indicators that hold for ``step``.

        This is a simplified heuristic; ``problem`` is accepted but not used.
        """
        quality_indicators = {
            'has_value_assignment': 0.3,
            'has_equation': 0.2,
            'has_intermediate_result': 0.3,
            'is_final_step': 0.2
        }
        return sum(
            weight
            for name, weight in quality_indicators.items()
            if check_indicator(step, name)
        )

    @staticmethod
    def verification_progress(step, problem):
        """Score a step that performs self-verification.

        Returns 0.0 when the step does not mention verification, 0.6 when the
        verification reports a failure (finding a problem is still useful),
        0.8 when it reports success, and 0.4 when the outcome is unclear.
        ``problem`` is accepted but not used.
        """
        if not ('验证' in step or '检查' in step):
            return 0.0
        # Negative markers are tested first because they contain the positive
        # ones as substrings ("不成立" contains "成立", "不正确" contains "正确").
        if '错误' in step or '不成立' in step or '不正确' in step:
            return 0.6
        if '正确' in step or '成立' in step:
            return 0.8
        return 0.4
# 自动化过程验证框架
整体框架
class RewardingProgressFramework:
    """Top-level Rewarding Progress training loop.

    The flow is: generate diverse reasoning trajectories, label every step
    with an automatic progress score, train the progress predictor (the PRM)
    on those labels, then repeatedly fine-tune it on the trajectories it
    scores highest.
    """

    def __init__(self, config):
        """Build the four collaborating components from ``config``."""
        self.config = config
        self.trajectory_generator = TrajectoryGenerator(config)
        self.progress_detector = ProgressDetector(config)
        self.progress_predictor = ProgressPredictor(config)
        self.verifier_optimizer = VerifierOptimizer(config)

    def train_verifier(self, problems):
        """Train the process verifier on ``problems`` and return the predictor.

        Reads ``n_samples``, ``temperature`` and ``n_iterations`` from the
        config.
        """
        # Stage 1: sample trajectories for every problem.
        trajectories = []
        for problem in problems:
            trajectories.extend(
                self.trajectory_generator.generate(
                    problem,
                    n_samples=self.config.n_samples,
                    temperature=self.config.temperature
                )
            )

        # Stage 2: one list of progress scores per trajectory.
        progress_annotations = [
            self.progress_detector.detect(traj['problem'], traj['steps'])
            for traj in trajectories
        ]

        # Stage 3: fit the predictor on the automatic labels.
        # NOTE(review): on an nn.Module, ``train`` is also the mode-switching
        # method — confirm the predictor provides a data-driven ``train``.
        self.progress_predictor.train(
            trajectories,
            progress_annotations
        )

        # Stage 4: re-select the best trajectories with the current predictor
        # and fine-tune on them.
        for _ in range(self.config.n_iterations):
            high_quality_trajs = self.filter_high_quality(trajectories)
            self.progress_predictor.finetune(high_quality_trajs)

        return self.progress_predictor

    def filter_high_quality(self, trajectories):
        """Return the top ``config.top_k_ratio`` fraction by mean step score.

        Each step is scored by ``progress_predictor.predict`` with the steps
        before it as context. The context is taken by position, so a step
        whose text repeats an earlier step still sees its full prefix. A
        trajectory with no steps gets a score of 0.0 so that sorting never
        sees NaN.
        """
        scored = []
        for traj in trajectories:
            steps = traj['steps']
            scores = [
                self.progress_predictor.predict(traj['problem'], step, steps[:idx])
                for idx, step in enumerate(steps)
            ]
            avg_score = np.mean(scores) if scores else 0.0
            scored.append((traj, avg_score))

        scored.sort(key=lambda item: item[1], reverse=True)
        top_k = int(len(scored) * self.config.top_k_ratio)
        return [traj for traj, _ in scored[:top_k]]
# 轨迹生成器
class TrajectoryGenerator:
    """Generate diverse reasoning trajectories for a problem.

    It combines high-temperature sampling, low-temperature sampling and a
    simplified beam search so that different reasoning paths are covered.

    NOTE(review): ``_generate_step``, ``_is_terminal`` and ``_extract_answer``
    are not defined in the visible source — confirm where they are provided.
    """

    def __init__(self, config):
        """Store ``config`` and load the model named by ``config.model_name``."""
        self.config = config
        self.model = load_model(config.model_name)

    def generate(self, problem, n_samples=10, temperature=1.0):
        """Return trajectories from all three generation strategies.

        ``n_samples // 2`` trajectories are sampled at temperature 1.2 and the
        same number at 0.7; beam search (3 beams, depth 10) adds its results
        on top. The ``temperature`` argument is accepted but not consulted,
        because the two sampling passes use the fixed temperatures above.
        """
        half = n_samples // 2
        trajectories = []
        # Strategy 1: high temperature for diversity.
        trajectories.extend(
            self._sample_with_temperature(problem, half, temperature=1.2)
        )
        # Strategy 2: lower temperature for quality.
        trajectories.extend(
            self._sample_with_temperature(problem, half, temperature=0.7)
        )
        # Strategy 3: beam search for alternative paths.
        trajectories.extend(self._beam_search(problem, n_beams=3, depth=10))
        return trajectories

    def _sample_with_temperature(self, problem, n, temperature):
        """Sample ``n`` trajectories step by step at ``temperature``.

        Each trajectory stops at the first terminal step or after
        ``config.max_steps`` steps. ``is_correct`` compares the answer
        extracted from the last step with ``problem['answer']``. If no step
        was generated, ``final_answer`` is ``None`` instead of indexing into
        an empty list.
        """
        trajectories = []
        for _ in range(n):
            steps = []
            current = problem
            for _step_idx in range(self.config.max_steps):
                step_text = self._generate_step(current, temperature)
                steps.append(step_text)
                if self._is_terminal(step_text):
                    break
                # Only the latest step is passed on as the next state.
                current = step_text

            final_answer = self._extract_answer(steps[-1]) if steps else None
            trajectories.append({
                'problem': problem,
                'steps': steps,
                'final_answer': final_answer,
                'is_correct': (final_answer == problem['answer']),
                'metadata': {'temperature': temperature}
            })
        return trajectories

    def _beam_search(self, problem, n_beams, depth):
        """Generate ``n_beams`` trajectories with a simplified beam search.

        Every beam is expanded ``n_beams`` times per level at temperature 0.8
        and each expansion adds a flat 0.1 to the score. ``is_correct`` is
        left as ``None`` for beam-search trajectories.

        NOTE(review): unlike ``_sample_with_temperature``, expansion does not
        check ``_is_terminal``, so beams always run to ``depth`` steps.
        """
        # Each beam is (current state, steps so far, score).
        beams = [(problem, [], 0.0)]
        for _ in range(depth):
            candidates = []
            for state, steps, score in beams:
                for _expansion in range(n_beams):
                    next_step = self._generate_step(state, temperature=0.8)
                    candidates.append((next_step, steps + [next_step], score + 0.1))
            # The sort is stable, so equal scores keep generation order.
            candidates.sort(key=lambda cand: cand[2], reverse=True)
            beams = candidates[:n_beams]

        return [{
            'problem': problem,
            'steps': steps,
            'final_answer': self._extract_answer(steps[-1]) if steps else None,
            'is_correct': None,
            'metadata': {'method': 'beam_search'}
        } for _, steps, _ in beams]
# 进步检测器
class ProgressDetector:
    """Automatic per-step progress detector.

    It analyses what changed between adjacent steps, weights each kind of
    change, and turns the result into a score in [0, 1].

    NOTE(review): many helpers used below (``_understands_problem``,
    ``_get_weight``, ``_extract_entities``, ``_get_embedding``,
    ``_is_redundant``, ...) are not defined in the visible source — confirm
    where they are provided.
    """

    def __init__(self, config):
        """Store ``config`` and build the analyzer and comparator from it."""
        self.config = config
        self.analyzer = StepAnalyzer(config)
        self.comparator = StepComparator(config)

    def detect(self, problem, steps):
        """Return one quality-adjusted progress score per step.

        The first step is scored against the problem; later steps are scored
        against the step immediately before them.
        """
        progress_scores = []
        for idx, step in enumerate(steps):
            if idx == 0:
                score = self._detect_initial_progress(step, problem)
            else:
                score = self._detect_step_progress(step, steps[idx - 1], problem)
            progress_scores.append(self._adjust_for_quality(step, score))
        return progress_scores

    def _detect_initial_progress(self, step, problem):
        """Score the opening step.

        Adds 0.3 for understanding the problem, 0.3 for identifying its key
        elements and 0.4 for establishing a framework.
        """
        progress = 0.0
        if self._understands_problem(step, problem):
            progress += 0.3
        if self._identifies_key_elements(step, problem):
            progress += 0.3
        if self._establishes_framework(step, problem):
            progress += 0.4
        return progress

    def _detect_step_progress(self, current_step, previous_step, problem):
        """Return the weighted sum of all change measures, clamped to [0, 1]."""
        # ``problem`` is forwarded because the goal-distance measure needs it.
        changes = self._analyze_changes(current_step, previous_step, problem)
        progress = 0.0
        for change_type, change_value in changes.items():
            progress += self._get_weight(change_type) * change_value
        return min(1.0, max(0.0, progress))

    def _analyze_changes(self, current, previous, problem=None):
        """Measure the change between two steps along four axes.

        Returns a dict with keys ``information``, ``structure``, ``semantic``
        and ``goal``. ``problem`` is passed through to the goal measure; it is
        an optional trailing parameter so two-argument callers keep working.
        """
        return {
            'information': self._compute_information_added(current, previous),
            'structure': self._compute_structure_change(current, previous),
            'semantic': self._compute_semantic_change(current, previous),
            'goal': self._compute_goal_change(current, previous, problem),
        }

    def _compute_information_added(self, current, previous):
        """Return the share of entities in ``current`` absent from ``previous``.

        Returns 0.0 when ``current`` has no entities.
        """
        current_entities = self._extract_entities(current)
        previous_entities = self._extract_entities(previous)
        new_entities = current_entities - previous_entities
        if len(current_entities) == 0:
            return 0.0
        return len(new_entities) / len(current_entities)

    def _compute_structure_change(self, current, previous):
        """Return the structural difference between the steps, capped at 1.0."""
        current_structure = self._analyze_structure(current)
        previous_structure = self._analyze_structure(previous)
        diff = self._structure_difference(current_structure, previous_structure)
        return min(1.0, diff)

    def _compute_semantic_change(self, current, previous):
        """Return ``1 - cosine similarity`` of the two step embeddings."""
        current_emb = self._get_embedding(current)
        previous_emb = self._get_embedding(previous)
        # assumes the embeddings are 1-D tensors (dim=0) — TODO confirm
        similarity = F.cosine_similarity(current_emb, previous_emb, dim=0)
        return 1 - similarity.item()

    def _compute_goal_change(self, current, previous, problem):
        """Return how much closer to the goal ``current`` is, clamped to [0, 1].

        This is a simplified implementation; moving away from the goal scores
        0.0 rather than a negative value.
        """
        current_distance = self._estimate_distance_to_goal(current, problem)
        previous_distance = self._estimate_distance_to_goal(previous, problem)
        progress = previous_distance - current_distance
        return max(0.0, min(1.0, progress))

    def _adjust_for_quality(self, step, raw_score):
        """Apply quality penalties and bonuses, then clamp to [0, 1].

        Penalties are x0.7 for redundancy, x0.5 for backtracking and a reset
        to 0.0 for gibberish. Bonuses are +0.1 for a verification step and
        +0.05 for a creative step.
        """
        if self._is_redundant(step):
            raw_score *= 0.7
        if self._is_backtracking(step):
            raw_score *= 0.5
        if self._is_gibberish(step):
            raw_score = 0.0
        if self._is_verification_step(step):
            raw_score += 0.1
        if self._is_creative_step(step):
            raw_score += 0.05
        return min(1.0, max(0.0, raw_score))
# 进步预测器训练
class ProgressPredictor(nn.Module):
    """Progress predictor (the PRM).

    Given the problem, the current step and the preceding steps, it predicts
    a progress score, a progress type and a confidence value.
    """

    def __init__(self, config):
        """Build the encoder and three heads from ``config``.

        Reads ``config.hidden_dim`` and ``config.num_progress_types``.
        """
        super().__init__()
        self.config = config
        self.encoder = TransformerEncoder(config)
        # Score head: MLP with a sigmoid output, so scores lie in (0, 1).
        self.progress_predictor = nn.Sequential(
            nn.Linear(config.hidden_dim, config.hidden_dim),
            nn.GELU(),
            nn.Dropout(0.1),
            nn.Linear(config.hidden_dim, 1),
            nn.Sigmoid()
        )
        # Type head: produces one logit per progress type.
        self.type_classifier = nn.Linear(config.hidden_dim, config.num_progress_types)
        # Confidence head: smaller MLP with a sigmoid output.
        self.confidence_estimator = nn.Sequential(
            nn.Linear(config.hidden_dim, config.hidden_dim // 2),
            nn.ReLU(),
            nn.Linear(config.hidden_dim // 2, 1),
            nn.Sigmoid()
        )

    def forward(self, problem, current_step, context_steps):
        """Predict progress for ``current_step``.

        Returns a dict with ``score``, ``type`` (argmax class id),
        ``confidence`` and ``type_probs`` (softmax over the type logits).
        Only the trailing size-1 head dimension is squeezed, so a batch of one
        keeps its batch dimension; single-element results still support
        ``.item()``.
        """
        encoding = self.encode(problem, current_step, context_steps)
        type_logits = self.type_classifier(encoding)
        return {
            'score': self.progress_predictor(encoding).squeeze(-1),
            'type': torch.argmax(type_logits, dim=-1),
            'confidence': self.confidence_estimator(encoding).squeeze(-1),
            'type_probs': F.softmax(type_logits, dim=-1)
        }

    def encode(self, problem, current_step, context_steps):
        """Encode the three inputs and fuse them by weighted addition.

        The context steps are joined with newlines before encoding; when there
        are none, a zero tensor shaped like the problem encoding is used. The
        context contributes with weight 0.5.
        """
        problem_enc = self.encoder(problem)
        current_enc = self.encoder(current_step)
        if context_steps:
            context_enc = self.encoder('\n'.join(context_steps))
        else:
            context_enc = torch.zeros_like(problem_enc)
        return problem_enc + current_enc + 0.5 * context_enc
class VerifierOptimizer:
    """Optimizer wrapper that trains ``config.model`` with AdamW."""

    def __init__(self, config):
        """Create an AdamW optimizer over ``config.model``'s parameters.

        Reads ``config.lr`` and ``config.weight_decay``.
        """
        self.config = config
        self.optimizer = torch.optim.AdamW(
            config.model.parameters(),
            lr=config.lr,
            weight_decay=config.weight_decay
        )

    def train_step(self, batch):
        """Run one optimisation step and return ``{'loss': float}``.

        ``batch`` is an iterable of ``(problem, step, context, label)``
        tuples. The labels are collected from those tuples, so ``batch`` is
        never indexed like a dict. Gradients are clipped to
        ``config.max_grad_norm``.
        """
        predictions = []
        labels = []
        for problem, step, context, label in batch:
            predictions.append(self.config.model(problem, step, context))
            labels.append(label)

        loss = self.compute_loss(predictions, labels)

        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(
            self.config.model.parameters(),
            self.config.max_grad_norm
        )
        self.optimizer.step()
        return {'loss': loss.item()}

    def compute_loss(self, predictions, labels):
        """Return MSE on the score plus 0.2 x type classification loss.

        Each label is either a plain number (the target score) or a dict with
        a ``'score'`` entry and an optional integer ``'type'`` entry. The
        classification term is computed only over samples whose label carries
        a ``'type'``; it is the negative log-likelihood of the prediction's
        ``'type_probs'``.
        """
        # Stack the score tensors themselves. Rebuilding them through .item()
        # would cut the autograd graph and backward() could not reach the model.
        scores = torch.stack([p['score'].reshape(()) for p in predictions])
        target = torch.tensor(
            [float(lab['score'] if isinstance(lab, dict) else lab) for lab in labels],
            dtype=scores.dtype,
            device=scores.device,
        )
        total_loss = F.mse_loss(scores, target)

        typed = [
            (p, lab['type'])
            for p, lab in zip(predictions, labels)
            if isinstance(lab, dict) and 'type' in lab
        ]
        if typed:
            # The classification loss needs a distribution, not the argmax id.
            probs = torch.stack([p['type_probs'].reshape(-1) for p, _ in typed])
            type_labels = torch.tensor(
                [int(t) for _, t in typed], dtype=torch.long, device=probs.device
            )
            ce_loss = F.nll_loss(torch.log(probs.clamp_min(1e-12)), type_labels)
            total_loss = total_loss + 0.2 * ce_loss
        return total_loss
# 规模化策略
数据规模化
class ScalingStrategy:
    """Data-scaling strategy for PRM training.

    NOTE(review): ``_generate_with_strategy`` is called below but is not
    defined in the visible source — confirm where it is provided.
    """

    def __init__(self, config):
        self.config = config

    def scale_training_data(self):
        """Return the four data-scaling phases, from 10K up to 10M samples."""
        # (phase key, size, method, purpose) in roll-out order.
        phase_rows = (
            ('phase_1_seed', 10000, '人工标注', '种子模型'),
            ('phase_2_synthetic', 100000, 'LLM生成+自动标注', '扩展数据'),
            ('phase_3_filtered', 1000000, '高质量筛选', '质量提升'),
            ('phase_4_full', 10000000, '全量生成', '规模化'),
        )
        return {
            key: {'size': size, 'method': method, 'purpose': purpose}
            for key, size, method, purpose in phase_rows
        }

    def generate_synthetic_data(self, problems):
        """Generate trajectories and keep those with a usable annotation.

        Every generated trajectory gets a ``'progress_labels'`` entry; only
        those whose labels pass ``_is_valid_annotation`` are returned.
        """
        kept = []
        for problem in problems:
            for traj in self._diverse_generation(problem):
                labels = self._auto_annotate(problem, traj)
                traj['progress_labels'] = labels
                if self._is_valid_annotation(labels):
                    kept.append(traj)
        return kept

    def _diverse_generation(self, problem):
        """Run every generation strategy and concatenate the trajectories."""
        # (method, temperature or None, number of samples).
        strategies = [
            ('high_temp', 1.2, 5),
            ('low_temp', 0.7, 5),
            ('beam_search', None, 3),
            ('diverse_prompts', None, 5)
        ]
        collected = []
        for method, temp, count in strategies:
            collected += self._generate_with_strategy(problem, method, temp, count)
        return collected

    def _auto_annotate(self, problem, traj):
        """Label ``traj['steps']`` with a freshly built ``ProgressDetector``."""
        return ProgressDetector(self.config).detect(problem, traj['steps'])

    def _is_valid_annotation(self, progress):
        """Return False for empty, all-zero or all-one label lists."""
        if len(progress) == 0:
            return False
        # Constant labels at either extreme are rejected.
        return not (
            all(p == 0 for p in progress) or all(p == 1 for p in progress)
        )
# 计算规模化
class ComputeScaling:
    """Compute-scaling reference figures and a rough cost estimator."""

    # Multipliers for model-size strings such as '7B' or '350M'.
    _SIZE_SUFFIXES = {'K': 1e3, 'M': 1e6, 'B': 1e9, 'T': 1e12}

    def __init__(self):
        # Reference chart: FLOPs, training time and cost per data scale.
        self.compute_chart = {
            'flops': {
                '10K_data': '10^17',
                '100K_data': '10^18',
                '1M_data': '10^19',
                '10M_data': '10^20'
            },
            'training_time': {
                '10K_data': '1 GPU-day',
                '100K_data': '10 GPU-days',
                '1M_data': '100 GPU-days',
                '10M_data': '1000 GPU-days'
            },
            'cost': {
                '10K_data': '$100',
                '100K_data': '$1,000',
                '1M_data': '$10,000',
                '10M_data': '$100,000'
            }
        }

    def estimate_compute(self, data_size, model_size='7B', *, epochs=10,
                         gpu_tflops=1000, cost_per_gpu_hour=2):
        """Estimate total FLOPs, GPU hours and cost for a training run.

        Args:
            data_size: Number of training samples.
            model_size: Parameter count, as a number or a string with a
                K/M/B/T suffix (``'7B'`` means 7e9).
            epochs: Passes over the data (default 10).
            gpu_tflops: Assumed throughput in TFLOP/s (default 1000).
                NOTE(review): the original comment attributed this figure to
                an A100 — confirm against the hardware actually used.
            cost_per_gpu_hour: Price of one GPU hour in dollars (default 2).

        Returns:
            Dict with ``total_flops``, ``training_hours`` and
            ``cost_estimate``.

        Raises:
            ValueError: If ``model_size`` is a string that cannot be parsed.
        """
        params = self._parse_model_size(model_size)
        # Rough per-sample cost: 3 x parameters (forward + backward + update).
        flops_per_sample = params * 3
        total_flops = flops_per_sample * data_size * epochs

        # FLOPs / (FLOP/s) is seconds, so convert to hours before pricing.
        training_seconds = total_flops / (gpu_tflops * 1e12)
        training_time_hours = training_seconds / 3600

        return {
            'total_flops': total_flops,
            'training_hours': training_time_hours,
            'cost_estimate': training_time_hours * cost_per_gpu_hour
        }

    @classmethod
    def _parse_model_size(cls, model_size):
        """Convert ``'7B'``-style strings to a parameter count.

        Numbers are returned unchanged. Strings are case-insensitive and may
        omit the suffix.
        """
        if not isinstance(model_size, str):
            return model_size
        text = model_size.strip().upper()
        scale = cls._SIZE_SUFFIXES.get(text[-1:])
        if scale is None:
            return float(text)
        return float(text[:-1]) * scale
# 最佳实践指南
数据质量控制
class DataQualityControl:
    """Trajectory-level data quality filters."""

    @staticmethod
    def filter_trajectories(trajectories, min_steps=3, max_steps=50):
        """Keep trajectories with an acceptable length and content quality.

        A trajectory is kept when its step count lies in
        ``[min_steps, max_steps]`` and it passes ``_check_quality``.
        """
        return [
            traj
            for traj in trajectories
            if min_steps <= len(traj['steps']) <= max_steps
            and DataQualityControl._check_quality(traj)
        ]

    @staticmethod
    def _check_quality(traj):
        """Return True when the trajectory's steps look usable.

        Three checks are applied: at least one step has more than 10
        non-blank characters, at least half of the steps are distinct, and
        no window of steps is repeated.
        """
        steps = traj['steps']
        if not any(len(step.strip()) > 10 for step in steps):
            return False
        if len(set(steps)) < len(steps) * 0.5:
            return False
        return not DataQualityControl._has_repetitive_pattern(steps)

    @staticmethod
    def _has_repetitive_pattern(steps):
        """Return True if a window of 2 or 3 steps recurs without overlap.

        The scan includes the final window, so a trajectory that ends exactly
        on the repeated block (e.g. ``[a, b, a, b]``) is detected.
        """
        for window_size in (2, 3):
            # Remember where each window first appears. The earliest position
            # leaves the most room for a later non-overlapping repeat.
            first_seen = {}
            for start in range(len(steps) - window_size + 1):
                window = tuple(steps[start:start + window_size])
                earlier = first_seen.setdefault(window, start)
                if start - earlier >= window_size:
                    return True
        return False
class QualityMetrics:
    """Dataset-level quality metrics."""

    @staticmethod
    def compute_diversity_score(trajectories):
        """Return the ratio of distinct steps to total steps.

        Returns 0.0 when there are no steps at all, rather than dividing by
        zero.
        """
        all_steps = [step for traj in trajectories for step in traj['steps']]
        if not all_steps:
            return 0.0
        return len(set(all_steps)) / len(all_steps)

    @staticmethod
    def compute_coverage_score(trajectories, problems):
        """Return the fraction of ``problems`` with at least one trajectory.

        Problems are identified by ``traj['problem']['id']``. Returns 0.0
        when ``problems`` is empty, rather than dividing by zero.
        """
        if not problems:
            return 0.0
        covered_problems = {traj['problem']['id'] for traj in trajectories}
        return len(covered_problems) / len(problems)
# 训练最佳实践
class TrainingBestPractices:
    """Recommended training settings, exposed as static lookups."""

    @staticmethod
    def get_curriculum_schedule():
        """Return the three curriculum stages in order.

        The data pool widens from high-quality only to everything, while the
        learning rate falls and the batch size grows.
        """
        # (data subset, learning rate, batch size, epochs) per stage.
        stage_rows = (
            ('high_quality_only', 1e-4, 16, 5),
            ('high_and_medium', 5e-5, 32, 5),
            ('all', 1e-5, 64, 10),
        )
        return [
            {
                'stage': number,
                'data': data,
                'lr': lr,
                'batch_size': batch_size,
                'epochs': epochs,
            }
            for number, (data, lr, batch_size, epochs) in enumerate(stage_rows, start=1)
        ]

    @staticmethod
    def get_regularization_config():
        """Return the regularisation hyper-parameters."""
        return dict(
            weight_decay=0.01,
            dropout=0.1,
            gradient_clipping=1.0,
            early_stopping_patience=3,
            label_smoothing=0.1,
        )

    @staticmethod
    def get_validation_strategy():
        """Return the validation split, tracked metrics and checkpoint names."""
        tracked_metrics = ['mse', 'accuracy', 'kendall_tau', 'calibration_error']
        saved_checkpoints = ['best_mse', 'best_accuracy', 'best_calibration']
        return {
            'val_split': 0.1,
            'metrics': tracked_metrics,
            'checkpoints': saved_checkpoints,
        }
# 实验结果
主要结果
def main_results():
    """Return the headline results as a nested dict.

    The three sections are ``scaling_experiment``,
    ``comparison_with_baselines`` and ``quality_analysis``.
    """
    # (training-set size, AUC, accuracy) per scaling run.
    scaling_rows = (
        (10000, 0.72, 0.68),
        (100000, 0.81, 0.76),
        (1000000, 0.89, 0.84),
    )
    scaling_experiment = {
        'data_sizes': [size for size, _, _ in scaling_rows],
        'metrics': {
            str(size): {'auc': auc, 'accuracy': accuracy}
            for size, auc, accuracy in scaling_rows
        },
    }

    # (method key, AUC, data cost, uses human annotation).
    baseline_rows = (
        ('Human-PRM', 0.78, '$60K', True),
        ('Self-Generated-PRM', 0.69, '$5K', False),
        ('Rewarding Progress', 0.89, '$8K', False),
    )
    comparison = {
        'methods': [
            'Human-PRM (PRM800K)',
            'Self-Generated-PRM',
            'Rewarding Progress (Ours)',
        ],
        'results': {
            name: {'auc': auc, 'data_cost': cost, 'human_annotation': human}
            for name, auc, cost, human in baseline_rows
        },
    }

    quality_analysis = dict(
        auto_vs_human_correlation=0.82,
        progress_prediction_accuracy=0.87,
        quality_diversity_score=0.73,
    )

    return {
        'scaling_experiment': scaling_experiment,
        'comparison_with_baselines': comparison,
        'quality_analysis': quality_analysis,
    }
# 消融实验
| 组件 | AUC | 说明 |
|---|---|---|
| 完整系统 | 0.89 | - |
| - 多样化生成 | 0.84 | -5% |
| - 进步感标注 | 0.79 | -10% |
| - 质量过滤 | 0.82 | -7% |
| - 课程学习 | 0.85 | -4% |
| 仅温度采样 | 0.71 | -18% |
规模化曲线
def plot_scaling_curves():
    """Draw the three scaling panels, save them and show the figure.

    The panels are AUC against training-data size (log-log), AUC against
    compute (log-log) and a cost bar chart annotated with each method's AUC.
    The figure is written to ``scaling_results.png`` at 150 dpi.
    """
    import matplotlib.pyplot as plt
    import numpy as np

    fig, axes = plt.subplots(1, 3, figsize=(15, 5))

    # Both line panels plot the same AUC series against a different x axis.
    auc_by_scale = [0.72, 0.81, 0.89, 0.92]
    line_panels = (
        (axes[0], [1e4, 1e5, 1e6, 1e7], 'bo-', 'Training Data Size', 'Data Scaling'),
        (axes[1], [1e17, 1e18, 1e19, 1e20], 'go-', 'Compute (FLOPs)', 'Compute Scaling'),
    )
    for axis, x_values, line_format, x_label, title in line_panels:
        axis.loglog(x_values, auc_by_scale, line_format, linewidth=2, markersize=8)
        axis.set_xlabel(x_label)
        axis.set_ylabel('AUC')
        axis.set_title(title)
        axis.grid(True, alpha=0.3)

    # Cost comparison: bar height is cost, the label above each bar is AUC.
    cost_axis = axes[2]
    method_names = ['Human\nAnnotation', 'Self-Gen', 'Rewarding\nProgress']
    method_costs = [60, 5, 8]
    method_aucs = [0.78, 0.69, 0.89]
    bars = cost_axis.bar(method_names, method_costs, color=['gray', 'blue', 'green'])
    cost_axis.set_ylabel('Cost ($K)')
    cost_axis.set_title('Cost Efficiency')
    for bar, auc in zip(bars, method_aucs):
        x_center = bar.get_x() + bar.get_width()/2.
        cost_axis.text(x_center, bar.get_height(),
                       f'AUC={auc}', ha='center', va='bottom')

    plt.tight_layout()
    plt.savefig('scaling_results.png', dpi=150)
    plt.show()
# 与其他方法对比
方法对比总览
| 方法 | 数据来源 | 标注成本 | 可扩展性 | 性能 |
|---|---|---|---|---|
| 人工标注 | 人工 | 极高 | 差 | 中高 |
| Self-Play | 自生成 | 低 | 好 | 中 |
| Distilling | LLM | 中 | 中 | 中高 |
| Rewarding Progress | 自生成+自动标注 | 低 | 很好 | 高 |
| Hybrid | 多源 | 中 | 好 | 高 |
与Self-Play对比
def compare_with_self_play():
    """Return a side-by-side description of Self-Play and Rewarding Progress.

    Each entry lists how data is generated, how it is annotated, and the
    method's strengths and weaknesses.
    """
    self_play = dict(
        data_generation='对抗性博弈',
        annotation='自动(基于胜负)',
        strengths=['无需人工标注', '自然平衡探索与利用'],
        weaknesses=['需要明确定义"胜负"', '可能陷入局部最优'],
    )
    rewarding_progress = dict(
        data_generation='多样化采样',
        annotation='自动(基于进步感)',
        strengths=['无需人工标注', '捕捉推理过程质量', '更好的泛化性'],
        weaknesses=['进步感定义可能不完美', '需要额外的质量控制'],
    )
    return {
        'self_play': self_play,
        'rewarding_progress': rewarding_progress,
    }
# 优势分析
def advantage_analysis():
    """Return the claimed advantages of Rewarding Progress, grouped by theme.

    The themes are cost efficiency, scalability, quality and flexibility.
    """
    cost_breakdown = {
        'data_generation': '$3K',
        'auto_annotation': '$2K',
        'quality_control': '$3K',
        'total': '$8K vs $60K (人工)',
    }
    cost_efficiency = {
        'description': '大幅降低标注成本',
        'quantitative': '比人工标注降低87%',
        'breakdown': cost_breakdown,
    }
    scalability = {
        'description': '易于规模化',
        'quantitative': '可扩展到10M+样本',
        'key_insight': '瓶颈从标注变为计算',
    }
    quality = {
        'description': '质量接近人工标注',
        'quantitative': '与人工标注AUC差距<5%',
        'human_correlation': '0.82',
    }
    flexibility = {
        'description': '可适应不同任务',
        'applications': ['数学推理', '代码生成', '逻辑推理', '问答系统'],
    }
    return dict(
        cost_efficiency=cost_efficiency,
        scalability=scalability,
        quality=quality,
        flexibility=flexibility,
    )
# 实现细节
完整训练流程
class CompleteTrainingPipeline:
    """End-to-end loop: generate data, filter it, train and evaluate.

    NOTE(review): ``_sample_problems``, ``_sample_batch`` and the free function
    ``compute_auc`` are not defined in the visible source. Also,
    ``VerifierOptimizer`` optimises ``config.model`` — confirm that it is the
    same object as ``self.model``.
    """

    def __init__(self, config):
        """Build the model, generator, detector and optimizer from ``config``."""
        self.config = config
        self.model = ProgressPredictor(config)
        self.generator = TrajectoryGenerator(config)
        self.detector = ProgressDetector(config)
        self.optimizer = VerifierOptimizer(config)
        # Labelled trajectories collected so far; ``train`` appends to it.
        self.training_data = []
        # Iterable of validation batches; must be assigned before evaluation.
        self.validation_loader = None

    def train(self, initial_problems, total_steps=100000):
        """Run ``total_steps`` optimisation steps and return the model.

        New data is generated every 10000 steps and the model is evaluated
        every 1000 steps. The weights with the highest validation AUC are
        restored at the end. If no evaluation ever beat the initial score of
        0, the current weights are kept as they are.
        """
        import copy

        best_state = None
        best_score = 0
        filtered_data = []

        for current_step in range(total_steps):
            if current_step % 10000 == 0:
                self._refresh_training_data(initial_problems)
                # The filter depends only on training_data, which changes only
                # here, so it is recomputed once per refresh.
                filtered_data = DataQualityControl.filter_trajectories(
                    self.training_data
                )

            batch = self._sample_batch(filtered_data, batch_size=32)
            metrics = self.optimizer.train_step(batch)

            if current_step % 1000 == 0:
                val_metrics = self._evaluate()
                if val_metrics['auc'] > best_score:
                    best_score = val_metrics['auc']
                    best_state = copy.deepcopy(self.model.state_dict())
                print(f"Step {current_step}: Loss={metrics['loss']:.4f}, "
                      f"Val AUC={val_metrics['auc']:.4f}")

        if best_state is not None:
            self.model.load_state_dict(best_state)
        return self.model

    def _refresh_training_data(self, initial_problems):
        """Generate, label and store trajectories for 1000 sampled problems."""
        new_problems = self._sample_problems(initial_problems, batch_size=1000)
        trajectories = []
        for problem in new_problems:
            trajectories.extend(self.generator.generate(problem, n_samples=10))
        for traj in trajectories:
            traj['progress_labels'] = self.detector.detect(
                traj['problem'],
                traj['steps']
            )
        self.training_data.extend(trajectories)

    def _evaluate(self):
        """Return ``{'auc': ...}`` over ``self.validation_loader``.

        Raises:
            RuntimeError: If ``validation_loader`` has not been assigned.
        """
        if self.validation_loader is None:
            raise RuntimeError(
                'validation_loader must be set before evaluation'
            )
        predictions = []
        labels = []
        for batch in self.validation_loader:
            for problem, step, context, label in batch:
                pred = self.model(problem, step, context)
                predictions.append(pred['score'].item())
                labels.append(label)
        return {'auc': compute_auc(predictions, labels)}
# 推理部署
class ProgressVerifierDeployment:
    """Inference-time wrapper around a trained progress verifier.

    NOTE(review): ``_load_model`` and ``_get_type_name`` are not defined in
    the visible source — confirm where they are provided.
    """

    def __init__(self, model_path):
        """Load the model from ``model_path`` and switch it to eval mode."""
        self.model = self._load_model(model_path)
        self.model.eval()

    @torch.no_grad()
    def verify_step(self, problem, current_step, context_steps):
        """Score one reasoning step without tracking gradients.

        Returns a dict with ``progress_score``, ``progress_type``,
        ``confidence`` and ``recommendation``.
        """
        result = self.model(problem, current_step, context_steps)
        return {
            'progress_score': result['score'].item(),
            'progress_type': self._get_type_name(result['type']),
            'confidence': result['confidence'].item(),
            'recommendation': self._get_recommendation(result)
        }

    def verify_trajectory(self, problem, steps):
        """Score every step and summarise the trajectory.

        Each step is verified with the steps before it as context. Steps with
        confidence below 0.7 are listed under ``warnings``. The trajectory is
        accepted when the mean progress score exceeds 0.6. An empty
        trajectory gets a score of 0.0 (and therefore ``'revise'``) instead
        of NaN.
        """
        results = []
        for idx, step in enumerate(steps):
            result = self.verify_step(problem, step, steps[:idx])
            result['step_idx'] = idx
            results.append(result)

        if results:
            avg_score = np.mean([r['progress_score'] for r in results])
        else:
            avg_score = 0.0
        low_confidence_steps = [r for r in results if r['confidence'] < 0.7]

        return {
            'trajectory_score': avg_score,
            'step_results': results,
            'warnings': low_confidence_steps,
            'overall_recommendation': 'accept' if avg_score > 0.6 else 'revise'
        }

    def _get_recommendation(self, result):
        """Map a raw prediction to a coarse recommendation label.

        ``excellent`` needs both score and confidence above 0.8. Otherwise
        the label depends on the score alone: ``good`` above 0.6,
        ``acceptable`` above 0.4, else ``needs_revision``.
        """
        score = result['score'].item()
        confidence = result['confidence'].item()
        if score > 0.8 and confidence > 0.8:
            return 'excellent'
        if score > 0.6:
            return 'good'
        if score > 0.4:
            return 'acceptable'
        return 'needs_revision'
# 总结
Rewarding Progress为PRM训练提供了一条切实可行的规模化路径:
- 突破标注瓶颈:通过“进步感”自动标注,无需人工
- 高效数据生成:多样化轨迹生成策略
- 质量控制:多层次质量过滤机制
- 成本效益:大幅降低训练成本(87%)
- 可扩展性:支持从10K到10M的规模化训练
该工作为构建真正大规模的过程奖励模型奠定了方法论基础。
参考
Footnotes
[^1]: Rewarding Progress论文(ICLR 2025):《Rewarding Progress: Scaling Automated Process Verifiers for LLM Reasoning》