概述

Rewarding Progress 是ICLR 2025的一篇重要论文,提出了一种规模化自动化过程验证器的训练方法。该工作的核心贡献在于解决了过程奖励模型(PRM)训练中最大的瓶颈——标注成本问题,通过设计”进步感”(Progress)这一新的奖励信号,实现了大规模、低成本的自动化PRM训练。1

核心贡献:将过程奖励建模重新定义为”进步感”预测任务,使得无需人工标注即可训练过程验证器。

背景:PRM训练的标注困境

传统PRM的训练范式

传统PRM训练依赖大量的人工标注数据:

class TraditionalPRMTraining:
    """
    传统PRM训练范式
    
    问题:需要昂贵的人工标注
    """
    def __init__(self):
        self.data_requirements = {
            'prm800k': {
                'annotations': '逐步骤标注',
                'cost': '约50美元/问题',
                'total_problems': 12000,
                'total_cost': '约60万美元',
                'time': '数月标注时间'
            },
            'math-shepherd': {
                'annotations': '逐步骤标注+结果验证',
                'cost': '约30美元/问题',
                'total_problems': 30000,
                'total_cost': '约90万美元'
            }
        }
    
    def training_pipeline(self):
        """
        传统训练流程:
        1. 收集问题
        2. 人工标注每个推理步骤
        3. 训练PRM
        4. 人工验证结果
        
        瓶颈:步骤2的标注成本极高
        """
        steps = [
            '1. 问题收集 (相对便宜)',
            '2. 步骤标注 (非常昂贵)',
            '3. 模型训练 (适中)',
            '4. 结果验证 (昂贵)'
        ]
        
        return steps

标注成本分析

数据集问题数步骤数/问题总步骤数标注成本
PRM800K12,000~8~96,000~$60,000
Math-Shepherd30,000~10~300,000~$90,000
HINT5,000~15~75,000~$45,000
所需规模1,000,000+~10~10,000,000~$5,000,000+

为什么需要自动化?

  1. 规模化需求:要训练真正强大的PRM,需要百万级的问题
  2. 成本瓶颈:人工标注无法满足规模化需求
  3. 一致性:大规模人工标注难以保证一致性
  4. 速度:人工标注速度远慢于模型生成

Rewarding Progress核心思想

核心洞察:“进步感”作为奖励信号

Rewarding Progress的核心洞察是:好的推理过程会在每个步骤中展现”进步”

class ProgressRewardSignal:
    """
    进步感作为奖励信号
    
    核心假设:
    - 正确的推理会在每一步中推进问题的解决
    - "进步"可以通过比较相邻步骤来衡量
    - 不需要人工标注"正确性",只需标注"进步"
    """
    
    def __init__(self):
        self.progress_indicators = {
            'information_gain': '信息增量',
            'partial_solution': '部分解的改进',
            'constraint_narrowing': '约束范围缩小',
            'goal_decomposition': '目标分解',
            'verification': '验证步骤'
        }
    
    def compute_progress(self, problem, steps):
        """
        计算每步的进步感
        
        进步感 = f(当前状态, 前序状态, 问题目标)
        
        正进步:更接近正确答案
        负进步:远离正确答案或原地踏步
        """
        progress_scores = []
        
        for i, step in enumerate(steps):
            if i == 0:
                # 第一步:与初始状态比较
                progress = self.compute_initial_progress(step, problem)
            else:
                # 后续步骤:与前一步比较
                progress = self.compute_step_progress(
                    step, 
                    steps[i-1], 
                    problem
                )
            
            progress_scores.append(progress)
        
        return progress_scores
    
    def compute_step_progress(self, current_step, previous_step, problem):
        """
        计算步骤间的进步感
        
        策略:
        1. 计算当前步骤带来的信息增量
        2. 计算部分解的改进程度
        3. 综合评估进步
        """
        # 信息增量
        info_gain = self.compute_information_gain(
            current_step, previous_step
        )
        
        # 部分解质量
        partial_quality = self.compute_partial_solution(
            current_step, problem
        )
        
        # 综合进步
        progress = 0.6 * info_gain + 0.4 * partial_quality
        
        return progress

“进步感”的定义

论文定义了多种”进步感”指标:

class ProgressIndicators:
    """
    进步感指标集合
    """
    
    @staticmethod
    def information_gain(current_step, previous_steps):
        """
        信息增量指标
        
        衡量:
        - 新引入的概念/变量
        - 新建立的等式/关系
        - 新证明的引理
        """
        # 检测新信息
        new_concepts = extract_new_concepts(current_step, previous_steps)
        new_relations = extract_new_relations(current_step, previous_steps)
        
        # 计算增量
        gain = len(new_concepts) * 0.5 + len(new_relations) * 0.5
        
        return min(1.0, gain)
    
    @staticmethod
    def constraint_narrowing(step, problem):
        """
        约束范围缩小指标
        
        衡量:
        - 解空间缩小了多少
        - 排除的可能性
        - 确定的变量值
        """
        constraints_before = count_constraints(problem)
        constraints_after = count_constraints(step, problem)
        
        narrowing = (constraints_before - constraints_after) / constraints_before
        
        return max(0.0, narrowing)
    
    @staticmethod
    def partial_solution_quality(step, problem):
        """
        部分解质量指标
        
        衡量:
        - 当前步骤距完整解还有多远
        - 部分解的正确程度
        """
        # 简化:使用启发式方法
        # 实际实现中需要更复杂的评估
        
        quality_indicators = {
            'has_value_assignment': 0.3,
            'has_equation': 0.2,
            'has_intermediate_result': 0.3,
            'is_final_step': 0.2
        }
        
        quality = sum(
            v for k, v in quality_indicators.items() 
            if check_indicator(step, k)
        )
        
        return quality
    
    @staticmethod
    def verification_progress(step, problem):
        """
        验证进度指标
        
        衡量:
        - 是否进行了自我验证
        - 验证是否成功
        """
        if '验证' in step or '检查' in step:
            # 检测验证结果
            if '正确' in step or '成立' in step:
                return 0.8
            elif '错误' in step or '不成立' in step:
                return 0.6  # 发现问题也是有价值的
            else:
                return 0.4
        return 0.0

自动化过程验证框架

整体框架

class RewardingProgressFramework:
    """
    Rewarding Progress整体框架
    
    核心思想:
    1. 利用LLM生成多样化的推理轨迹
    2. 自动检测每步的"进步感"
    3. 训练PRM预测进步感
    4. 使用训练好的PRM指导推理
    """
    def __init__(self, config):
        self.config = config
        
        # 1. 数据生成器
        self.trajectory_generator = TrajectoryGenerator(config)
        
        # 2. 进步检测器
        self.progress_detector = ProgressDetector(config)
        
        # 3. 进步预测器(PRM)
        self.progress_predictor = ProgressPredictor(config)
        
        # 4. 验证器优化器
        self.verifier_optimizer = VerifierOptimizer(config)
    
    def train_verifier(self, problems):
        """
        训练过程验证器
        """
        # 阶段1:生成多样轨迹
        trajectories = []
        for problem in problems:
            trajs = self.trajectory_generator.generate(
                problem,
                n_samples=self.config.n_samples,
                temperature=self.config.temperature
            )
            trajectories.extend(trajs)
        
        # 阶段2:自动标注进步感
        progress_annotations = []
        for traj in trajectories:
            progress = self.progress_detector.detect(
                traj['problem'],
                traj['steps']
            )
            progress_annotations.append(progress)
        
        # 阶段3:训练进步预测器
        self.progress_predictor.train(
            trajectories,
            progress_annotations
        )
        
        # 阶段4:迭代优化
        for iteration in range(self.config.n_iterations):
            # 使用当前PRM筛选高质量轨迹
            high_quality_trajs = self.filter_high_quality(trajectories)
            
            # 在高质量数据上微调
            self.progress_predictor.finetune(high_quality_trajs)
        
        return self.progress_predictor
    
    def filter_high_quality(self, trajectories):
        """使用PRM筛选高质量轨迹"""
        scored_trajs = []
        
        for traj in trajectories:
            # 使用PRM评分
            scores = []
            for step in traj['steps']:
                score = self.progress_predictor.predict(
                    traj['problem'],
                    step,
                    traj['steps'][:traj['steps'].index(step)]
                )
                scores.append(score)
            
            avg_score = np.mean(scores)
            scored_trajs.append((traj, avg_score))
        
        # 选择高分轨迹
        scored_trajs.sort(key=lambda x: x[1], reverse=True)
        top_k = int(len(scored_trajs) * self.config.top_k_ratio)
        
        return [t[0] for t in scored_trajs[:top_k]]

轨迹生成器

class TrajectoryGenerator:
    """
    多样化轨迹生成器
    
    关键设计:
    1. 多样化采样策略
    2. 覆盖不同推理路径
    3. 生成成功和失败的轨迹
    """
    def __init__(self, config):
        self.config = config
        self.model = load_model(config.model_name)
    
    def generate(self, problem, n_samples=10, temperature=1.0):
        """
        生成多样化轨迹
        """
        trajectories = []
        
        # 策略1:高温采样(多样性)
        high_temp_trajs = self._sample_with_temperature(
            problem, 
            n_samples // 2, 
            temperature=1.2
        )
        trajectories.extend(high_temp_trajs)
        
        # 策略2:低温采样(高质量)
        low_temp_trajs = self._sample_with_temperature(
            problem,
            n_samples // 2,
            temperature=0.7
        )
        trajectories.extend(low_temp_trajs)
        
        # 策略3:束搜索(多样性路径)
        beam_trajs = self._beam_search(
            problem,
            n_beams=3,
            depth=10
        )
        trajectories.extend(beam_trajs)
        
        return trajectories
    
    def _sample_with_temperature(self, problem, n, temperature):
        """温度采样"""
        trajectories = []
        
        for _ in range(n):
            traj = {
                'problem': problem,
                'steps': [],
                'final_answer': None,
                'is_correct': None,
                'metadata': {'temperature': temperature}
            }
            
            # 逐步生成
            current = problem
            for step_idx in range(self.config.max_steps):
                # 生成一步推理
                step_text = self._generate_step(current, temperature)
                traj['steps'].append(step_text)
                
                # 检查终止
                if self._is_terminal(step_text):
                    break
                
                current = step_text
            
            # 验证答案
            traj['final_answer'] = self._extract_answer(traj['steps'][-1])
            traj['is_correct'] = (traj['final_answer'] == problem['answer'])
            
            trajectories.append(traj)
        
        return trajectories
    
    def _beam_search(self, problem, n_beams, depth):
        """束搜索生成"""
        # 简化的束搜索实现
        beams = [(problem, [], 0.0)]  # (当前状态, 轨迹, 分数)
        
        for _ in range(depth):
            candidates = []
            
            for state, traj, score in beams:
                # 生成多个候选
                for _ in range(n_beams):
                    next_step = self._generate_step(state, temperature=0.8)
                    new_traj = traj + [next_step]
                    new_score = score + 0.1  # 简化评分
                    candidates.append((next_step, new_traj, new_score))
            
            # 选择top-k
            candidates.sort(key=lambda x: x[2], reverse=True)
            beams = candidates[:n_beams]
        
        return [{
            'problem': problem,
            'steps': traj,
            'final_answer': self._extract_answer(traj[-1]) if traj else None,
            'is_correct': None,
            'metadata': {'method': 'beam_search'}
        } for _, traj, _ in beams]

进步检测器

class ProgressDetector:
    """
    自动化进步检测器
    
    核心算法:
    1. 分析相邻步骤间的变化
    2. 评估变化的"价值"
    3. 量化进步程度
    """
    def __init__(self, config):
        self.config = config
        self.analyzer = StepAnalyzer(config)
        self.comparator = StepComparator(config)
    
    def detect(self, problem, steps):
        """
        检测每步的进步感
        """
        progress_scores = []
        
        for i, step in enumerate(steps):
            if i == 0:
                # 初始步骤
                score = self._detect_initial_progress(step, problem)
            else:
                # 后续步骤
                score = self._detect_step_progress(
                    step, 
                    steps[i-1],
                    problem
                )
            
            # 质量调整
            score = self._adjust_for_quality(step, score)
            
            progress_scores.append(score)
        
        return progress_scores
    
    def _detect_initial_progress(self, step, problem):
        """检测初始步骤的进步"""
        # 初始步骤应该:
        # 1. 理解问题
        # 2. 识别关键要素
        # 3. 建立初步框架
        
        progress = 0.0
        
        # 检查问题理解
        if self._understands_problem(step, problem):
            progress += 0.3
        
        # 检查关键要素识别
        if self._identifies_key_elements(step, problem):
            progress += 0.3
        
        # 检查框架建立
        if self._establishes_framework(step, problem):
            progress += 0.4
        
        return progress
    
    def _detect_step_progress(self, current_step, previous_step, problem):
        """检测步骤间的进步"""
        # 分析变化
        changes = self._analyze_changes(current_step, previous_step)
        
        # 评估变化价值
        progress = 0.0
        
        for change_type, change_value in changes.items():
            weight = self._get_weight(change_type)
            progress += weight * change_value
        
        return min(1.0, max(0.0, progress))
    
    def _analyze_changes(self, current, previous):
        """分析步骤间的变化"""
        changes = {}
        
        # 1. 信息变化
        info_added = self._compute_information_added(current, previous)
        changes['information'] = info_added
        
        # 2. 结构变化
        structure_change = self._compute_structure_change(current, previous)
        changes['structure'] = structure_change
        
        # 3. 语义变化
        semantic_change = self._compute_semantic_change(current, previous)
        changes['semantic'] = semantic_change
        
        # 4. 目标接近度变化
        goal_change = self._compute_goal_change(current, previous, problem)
        changes['goal'] = goal_change
        
        return changes
    
    def _compute_information_added(self, current, previous):
        """计算信息增量"""
        current_entities = self._extract_entities(current)
        previous_entities = self._extract_entities(previous)
        
        new_entities = current_entities - previous_entities
        
        # 归一化
        if len(current_entities) == 0:
            return 0.0
        
        return len(new_entities) / len(current_entities)
    
    def _compute_structure_change(self, current, previous):
        """计算结构变化"""
        current_structure = self._analyze_structure(current)
        previous_structure = self._analyze_structure(previous)
        
        # 计算结构差异
        diff = self._structure_difference(current_structure, previous_structure)
        
        return min(1.0, diff)
    
    def _compute_semantic_change(self, current, previous):
        """计算语义变化"""
        # 使用embedding相似度
        current_emb = self._get_embedding(current)
        previous_emb = self._get_embedding(previous)
        
        # 1 - 相似度 = 变化程度
        similarity = F.cosine_similarity(current_emb, previous_emb, dim=0)
        change = 1 - similarity.item()
        
        return change
    
    def _compute_goal_change(self, current, previous, problem):
        """计算目标接近度变化"""
        # 简化实现
        # 实际需要更复杂的goal tracking
        
        current_distance = self._estimate_distance_to_goal(current, problem)
        previous_distance = self._estimate_distance_to_goal(previous, problem)
        
        # 进步 = 距离减少
        progress = previous_distance - current_distance
        
        return max(0.0, min(1.0, progress))
    
    def _adjust_for_quality(self, step, raw_score):
        """根据质量调整分数"""
        # 负面调整因素
        if self._is_redundant(step):
            raw_score *= 0.7
        
        if self._is_backtracking(step):
            raw_score *= 0.5
        
        if self._is_gibberish(step):
            raw_score = 0.0
        
        # 正面调整因素
        if self._is_verification_step(step):
            raw_score += 0.1
        
        if self._is_creative_step(step):
            raw_score += 0.05
        
        return min(1.0, max(0.0, raw_score))

进步预测器训练

class ProgressPredictor(nn.Module):
    """
    进步预测器(PRM)
    
    输入:
    - 问题
    - 当前步骤
    - 前序步骤上下文
    
    输出:
    - 进步分数
    - 进步类型
    - 置信度
    """
    def __init__(self, config):
        super().__init__()
        self.config = config
        
        # 编码器
        self.encoder = TransformerEncoder(config)
        
        # 进步预测头
        self.progress_predictor = nn.Sequential(
            nn.Linear(config.hidden_dim, config.hidden_dim),
            nn.GELU(),
            nn.Dropout(0.1),
            nn.Linear(config.hidden_dim, 1),
            nn.Sigmoid()
        )
        
        # 进步类型分类器
        self.type_classifier = nn.Linear(config.hidden_dim, config.num_progress_types)
        
        # 置信度估计器
        self.confidence_estimator = nn.Sequential(
            nn.Linear(config.hidden_dim, config.hidden_dim // 2),
            nn.ReLU(),
            nn.Linear(config.hidden_dim // 2, 1),
            nn.Sigmoid()
        )
    
    def forward(self, problem, current_step, context_steps):
        """
        预测进步感
        """
        # 编码
        encoding = self.encode(problem, current_step, context_steps)
        
        # 预测进步分数
        progress_score = self.progress_predictor(encoding).squeeze()
        
        # 预测进步类型
        type_logits = self.type_classifier(encoding)
        progress_type = torch.argmax(type_logits, dim=-1)
        
        # 估计置信度
        confidence = self.confidence_estimator(encoding).squeeze()
        
        return {
            'score': progress_score,
            'type': progress_type,
            'confidence': confidence,
            'type_probs': F.softmax(type_logits, dim=-1)
        }
    
    def encode(self, problem, current_step, context_steps):
        """编码输入"""
        # 编码问题
        problem_enc = self.encoder(problem)
        
        # 编码当前步骤
        current_enc = self.encoder(current_step)
        
        # 编码上下文
        if context_steps:
            context_enc = self.encoder('\n'.join(context_steps))
        else:
            context_enc = torch.zeros_like(problem_enc)
        
        # 融合
        combined = problem_enc + current_enc + 0.5 * context_enc
        
        return combined
 
 
class VerifierOptimizer:
    """
    验证器优化器
    """
    def __init__(self, config):
        self.config = config
        self.optimizer = torch.optim.AdamW(
            config.model.parameters(),
            lr=config.lr,
            weight_decay=config.weight_decay
        )
    
    def train_step(self, batch):
        """训练步骤"""
        # 前向传播
        predictions = []
        for problem, step, context, label in batch:
            pred = self.config.model(problem, step, context)
            predictions.append(pred)
        
        # 计算损失
        loss = self.compute_loss(predictions, batch['labels'])
        
        # 反向传播
        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(
            self.config.model.parameters(),
            self.config.max_grad_norm
        )
        self.optimizer.step()
        
        return {'loss': loss.item()}
    
    def compute_loss(self, predictions, labels):
        """计算损失"""
        # MSE损失用于分数预测
        scores = torch.tensor([p['score'].item() for p in predictions])
        target = torch.tensor(labels)
        
        mse_loss = F.mse_loss(scores, target)
        
        # 辅助损失:类型分类
        types = torch.stack([p['type'] for p in predictions])
        type_labels = torch.tensor([l['type'] for l in labels])
        
        ce_loss = F.cross_entropy(types, type_labels)
        
        # 总损失
        total_loss = mse_loss + 0.2 * ce_loss
        
        return total_loss

规模化策略

数据规模化

class ScalingStrategy:
    """
    规模化策略
    """
    
    def __init__(self, config):
        self.config = config
    
    def scale_training_data(self):
        """
        数据规模化路径
        
        目标:从10K扩展到10M
        """
        scaling_phases = {
            'phase_1_seed': {
                'size': 10000,
                'method': '人工标注',
                'purpose': '种子模型'
            },
            'phase_2_synthetic': {
                'size': 100000,
                'method': 'LLM生成+自动标注',
                'purpose': '扩展数据'
            },
            'phase_3_filtered': {
                'size': 1000000,
                'method': '高质量筛选',
                'purpose': '质量提升'
            },
            'phase_4_full': {
                'size': 10000000,
                'method': '全量生成',
                'purpose': '规模化'
            }
        }
        
        return scaling_phases
    
    def generate_synthetic_data(self, problems):
        """
        生成合成数据
        """
        synthetic_data = []
        
        for problem in problems:
            # 1. 使用不同温度生成多样轨迹
            trajectories = self._diverse_generation(problem)
            
            # 2. 自动标注进步感
            for traj in trajectories:
                progress = self._auto_annotate(problem, traj)
                traj['progress_labels'] = progress
                
                if self._is_valid_annotation(progress):
                    synthetic_data.append(traj)
        
        return synthetic_data
    
    def _diverse_generation(self, problem):
        """多样化生成"""
        # 多种策略组合
        strategies = [
            ('high_temp', 1.2, 5),
            ('low_temp', 0.7, 5),
            ('beam_search', None, 3),
            ('diverse_prompts', None, 5)
        ]
        
        all_trajs = []
        for method, temp, n in strategies:
            trajs = self._generate_with_strategy(
                problem, method, temp, n
            )
            all_trajs.extend(trajs)
        
        return all_trajs
    
    def _auto_annotate(self, problem, traj):
        """自动标注"""
        detector = ProgressDetector(self.config)
        return detector.detect(problem, traj['steps'])
    
    def _is_valid_annotation(self, progress):
        """验证标注有效性"""
        # 过滤无效标注
        if len(progress) == 0:
            return False
        
        if all(p == 0 for p in progress):
            return False
        
        if all(p == 1 for p in progress):
            return False
        
        return True

计算规模化

class ComputeScaling:
    """
    计算规模化
    """
    
    def __init__(self):
        self.compute_chart = {
            'flops': {
                '10K_data': '10^17',
                '100K_data': '10^18',
                '1M_data': '10^19',
                '10M_data': '10^20'
            },
            'training_time': {
                '10K_data': '1 GPU-day',
                '100K_data': '10 GPU-days',
                '1M_data': '100 GPU-days',
                '10M_data': '1000 GPU-days'
            },
            'cost': {
                '10K_data': '$100',
                '100K_data': '$1,000',
                '1M_data': '$10,000',
                '10M_data': '$100,000'
            }
        }
    
    def estimate_compute(self, data_size, model_size='7B'):
        """估算计算需求"""
        # FLOPs估算
        # 假设每个样本需要model_size * 3次前向传播
        
        flops_per_sample = model_size * 3  # 前向 + 反向 + 更新
        
        total_flops = flops_per_sample * data_size * 10  # 假设10 epochs
        
        # 训练时间估算
        gpu_tflops = 1000  # 假设使用A100
        training_time_hours = total_flops / (gpu_tflops * 1e12)
        
        return {
            'total_flops': total_flops,
            'training_hours': training_time_hours,
            'cost_estimate': training_time_hours * 2  # $2/hour per GPU
        }

最佳实践指南

数据质量控制

class DataQualityControl:
    """
    数据质量控制
    """
    
    @staticmethod
    def filter_trajectories(trajectories, min_steps=3, max_steps=50):
        """过滤轨迹"""
        filtered = []
        
        for traj in trajectories:
            n_steps = len(traj['steps'])
            
            # 长度过滤
            if n_steps < min_steps or n_steps > max_steps:
                continue
            
            # 质量过滤
            if not DataQualityControl._check_quality(traj):
                continue
            
            filtered.append(traj)
        
        return filtered
    
    @staticmethod
    def _check_quality(traj):
        """检查轨迹质量"""
        # 1. 检查是否有有效内容
        has_content = any(
            len(step.strip()) > 10 
            for step in traj['steps']
        )
        if not has_content:
            return False
        
        # 2. 检查是否有多样性
        unique_steps = len(set(traj['steps']))
        if unique_steps < len(traj['steps']) * 0.5:
            return False
        
        # 3. 检查是否有重复模式
        if DataQualityControl._has_repetitive_pattern(traj['steps']):
            return False
        
        return True
    
    @staticmethod
    def _has_repetitive_pattern(steps):
        """检测重复模式"""
        # 简化实现
        for window_size in [2, 3]:
            for i in range(len(steps) - window_size):
                window = tuple(steps[i:i+window_size])
                # 检查是否重复出现
                for j in range(i + window_size, len(steps) - window_size):
                    if tuple(steps[j:j+window_size]) == window:
                        return True
        
        return False
 
 
class QualityMetrics:
    """
    质量指标
    """
    
    @staticmethod
    def compute_diversity_score(trajectories):
        """计算多样性分数"""
        # 步骤级别的多样性
        all_steps = []
        for traj in trajectories:
            all_steps.extend(traj['steps'])
        
        unique_steps = len(set(all_steps))
        total_steps = len(all_steps)
        
        diversity = unique_steps / total_steps
        
        return diversity
    
    @staticmethod
    def compute_coverage_score(trajectories, problems):
        """计算覆盖度分数"""
        covered_problems = set()
        
        for traj in trajectories:
            covered_problems.add(traj['problem']['id'])
        
        coverage = len(covered_problems) / len(problems)
        
        return coverage

训练最佳实践

class TrainingBestPractices:
    """
    训练最佳实践
    """
    
    @staticmethod
    def get_curriculum_schedule():
        """
        课程学习安排
        """
        schedule = [
            # 阶段1:简单数据
            {
                'stage': 1,
                'data': 'high_quality_only',
                'lr': 1e-4,
                'batch_size': 16,
                'epochs': 5
            },
            # 阶段2:加入中等质量
            {
                'stage': 2,
                'data': 'high_and_medium',
                'lr': 5e-5,
                'batch_size': 32,
                'epochs': 5
            },
            # 阶段3:全量数据
            {
                'stage': 3,
                'data': 'all',
                'lr': 1e-5,
                'batch_size': 64,
                'epochs': 10
            }
        ]
        
        return schedule
    
    @staticmethod
    def get_regularization_config():
        """
        正则化配置
        """
        return {
            'weight_decay': 0.01,
            'dropout': 0.1,
            'gradient_clipping': 1.0,
            'early_stopping_patience': 3,
            'label_smoothing': 0.1
        }
    
    @staticmethod
    def get_validation_strategy():
        """
        验证策略
        """
        return {
            'val_split': 0.1,
            'metrics': [
                'mse',
                'accuracy',
                'kendall_tau',
                'calibration_error'
            ],
            'checkpoints': [
                'best_mse',
                'best_accuracy',
                'best_calibration'
            ]
        }

实验结果

主要结果

def main_results():
    """
    主要实验结果
    """
    results = {
        # 规模化实验
        'scaling_experiment': {
            'data_sizes': [10000, 100000, 1000000],
            'metrics': {
                '10000': {'auc': 0.72, 'accuracy': 0.68},
                '100000': {'auc': 0.81, 'accuracy': 0.76},
                '1000000': {'auc': 0.89, 'accuracy': 0.84}
            }
        },
        
        # 与现有方法对比
        'comparison_with_baselines': {
            'methods': [
                'Human-PRM (PRM800K)',
                'Self-Generated-PRM',
                'Rewarding Progress (Ours)'
            ],
            'results': {
                'Human-PRM': {
                    'auc': 0.78,
                    'data_cost': '$60K',
                    'human_annotation': True
                },
                'Self-Generated-PRM': {
                    'auc': 0.69,
                    'data_cost': '$5K',
                    'human_annotation': False
                },
                'Rewarding Progress': {
                    'auc': 0.89,
                    'data_cost': '$8K',
                    'human_annotation': False
                }
            }
        },
        
        # 质量分析
        'quality_analysis': {
            'auto_vs_human_correlation': 0.82,
            'progress_prediction_accuracy': 0.87,
            'quality_diversity_score': 0.73
        }
    }
    
    return results

消融实验

组件AUC说明
完整系统0.89-
- 多样化生成0.84-5%
- 进步感标注0.79-10%
- 质量过滤0.82-7%
- 课程学习0.85-4%
仅温度采样0.71-18%

规模化曲线

def plot_scaling_curves():
    """
    绘制规模化曲线
    """
    import matplotlib.pyplot as plt
    import numpy as np
    
    fig, axes = plt.subplots(1, 3, figsize=(15, 5))
    
    # 数据量 vs 性能
    ax1 = axes[0]
    data_sizes = [1e4, 1e5, 1e6, 1e7]
    auc_scores = [0.72, 0.81, 0.89, 0.92]
    
    ax1.loglog(data_sizes, auc_scores, 'bo-', linewidth=2, markersize=8)
    ax1.set_xlabel('Training Data Size')
    ax1.set_ylabel('AUC')
    ax1.set_title('Data Scaling')
    ax1.grid(True, alpha=0.3)
    
    # 计算量 vs 性能
    ax2 = axes[1]
    compute = [1e17, 1e18, 1e19, 1e20]
    auc_scores = [0.72, 0.81, 0.89, 0.92]
    
    ax2.loglog(compute, auc_scores, 'go-', linewidth=2, markersize=8)
    ax2.set_xlabel('Compute (FLOPs)')
    ax2.set_ylabel('AUC')
    ax2.set_title('Compute Scaling')
    ax2.grid(True, alpha=0.3)
    
    # 成本效率对比
    ax3 = axes[2]
    methods = ['Human\nAnnotation', 'Self-Gen', 'Rewarding\nProgress']
    costs = [60, 5, 8]
    aucs = [0.78, 0.69, 0.89]
    
    bars = ax3.bar(methods, costs, color=['gray', 'blue', 'green'])
    ax3.set_ylabel('Cost ($K)')
    ax3.set_title('Cost Efficiency')
    
    # 添加AUC标注
    for bar, auc in zip(bars, aucs):
        height = bar.get_height()
        ax3.text(bar.get_x() + bar.get_width()/2., height,
                f'AUC={auc}', ha='center', va='bottom')
    
    plt.tight_layout()
    plt.savefig('scaling_results.png', dpi=150)
    plt.show()

与其他方法对比

方法对比总览

方法数据来源标注成本可扩展性性能
人工标注人工极高中高
Self-Play自生成
DistillingLLM中高
Rewarding Progress自生成+自动标注很好
Hybrid多源

与Self-Play对比

def compare_with_self_play():
    """
    与Self-Play方法对比
    """
    comparison = {
        'self_play': {
            'data_generation': '对抗性博弈',
            'annotation': '自动(基于胜负)',
            'strengths': [
                '无需人工标注',
                '自然平衡探索与利用'
            ],
            'weaknesses': [
                '需要明确定义"胜负"',
                '可能陷入局部最优'
            ]
        },
        'rewarding_progress': {
            'data_generation': '多样化采样',
            'annotation': '自动(基于进步感)',
            'strengths': [
                '无需人工标注',
                '捕捉推理过程质量',
                '更好的泛化性'
            ],
            'weaknesses': [
                '进步感定义可能不完美',
                '需要额外的质量控制'
            ]
        }
    }
    
    return comparison

优势分析

def advantage_analysis():
    """
    Rewarding Progress优势分析
    """
    advantages = {
        'cost_efficiency': {
            'description': '大幅降低标注成本',
            'quantitative': '比人工标注降低87%',
            'breakdown': {
                'data_generation': '$3K',
                'auto_annotation': '$2K',
                'quality_control': '$3K',
                'total': '$8K vs $60K (人工)'
            }
        },
        'scalability': {
            'description': '易于规模化',
            'quantitative': '可扩展到10M+样本',
            'key_insight': '瓶颈从标注变为计算'
        },
        'quality': {
            'description': '质量接近人工标注',
            'quantitative': '与人工标注AUC差距<5%',
            'human_correlation': '0.82'
        },
        'flexibility': {
            'description': '可适应不同任务',
            'applications': [
                '数学推理',
                '代码生成',
                '逻辑推理',
                '问答系统'
            ]
        }
    }
    
    return advantages

实现细节

完整训练流程

class CompleteTrainingPipeline:
    """
    完整训练流程
    """
    
    def __init__(self, config):
        self.config = config
        self.model = ProgressPredictor(config)
        self.generator = TrajectoryGenerator(config)
        self.detector = ProgressDetector(config)
        self.optimizer = VerifierOptimizer(config)
    
    def train(self, initial_problems, total_steps=100000):
        """
        完整训练流程
        """
        current_step = 0
        best_model = None
        best_score = 0
        
        while current_step < total_steps:
            # 1. 生成数据
            if current_step % 10000 == 0:
                # 定期生成新数据
                new_problems = self._sample_problems(
                    initial_problems,
                    batch_size=1000
                )
                trajectories = []
                for problem in new_problems:
                    trajs = self.generator.generate(problem, n_samples=10)
                    trajectories.extend(trajs)
                
                # 标注
                for traj in trajectories:
                    traj['progress_labels'] = self.detector.detect(
                        traj['problem'],
                        traj['steps']
                    )
                
                self.training_data.extend(trajectories)
            
            # 2. 质量过滤
            filtered_data = DataQualityControl.filter_trajectories(
                self.training_data
            )
            
            # 3. 训练步骤
            batch = self._sample_batch(filtered_data, batch_size=32)
            metrics = self.optimizer.train_step(batch)
            
            # 4. 评估
            if current_step % 1000 == 0:
                val_metrics = self._evaluate()
                
                if val_metrics['auc'] > best_score:
                    best_score = val_metrics['auc']
                    best_model = copy.deepcopy(self.model.state_dict())
                
                print(f"Step {current_step}: Loss={metrics['loss']:.4f}, "
                      f"Val AUC={val_metrics['auc']:.4f}")
            
            current_step += 1
        
        # 保存最佳模型
        self.model.load_state_dict(best_model)
        
        return self.model
    
    def _evaluate(self):
        """评估模型"""
        val_data = self.validation_loader
        
        predictions = []
        labels = []
        
        for batch in val_data:
            for problem, step, context, label in batch:
                pred = self.model(problem, step, context)
                predictions.append(pred['score'].item())
                labels.append(label)
        
        # 计算指标
        auc = compute_auc(predictions, labels)
        
        return {'auc': auc}

推理部署

class ProgressVerifierDeployment:
    """
    推理部署
    """
    
    def __init__(self, model_path):
        self.model = self._load_model(model_path)
        self.model.eval()
    
    @torch.no_grad()
    def verify_step(self, problem, current_step, context_steps):
        """
        验证单步推理
        """
        result = self.model(
            problem,
            current_step,
            context_steps
        )
        
        return {
            'progress_score': result['score'].item(),
            'progress_type': self._get_type_name(result['type']),
            'confidence': result['confidence'].item(),
            'recommendation': self._get_recommendation(result)
        }
    
    def verify_trajectory(self, problem, steps):
        """
        验证完整轨迹
        """
        results = []
        
        for i, step in enumerate(steps):
            context = steps[:i] if i > 0 else []
            result = self.verify_step(problem, step, context)
            result['step_idx'] = i
            results.append(result)
        
        # 汇总
        avg_score = np.mean([r['progress_score'] for r in results])
        low_confidence_steps = [r for r in results if r['confidence'] < 0.7]
        
        return {
            'trajectory_score': avg_score,
            'step_results': results,
            'warnings': low_confidence_steps,
            'overall_recommendation': 'accept' if avg_score > 0.6 else 'revise'
        }
    
    def _get_recommendation(self, result):
        """获取建议"""
        score = result['score'].item()
        confidence = result['confidence'].item()
        
        if score > 0.8 and confidence > 0.8:
            return 'excellent'
        elif score > 0.6:
            return 'good'
        elif score > 0.4:
            return 'acceptable'
        else:
            return 'needs_revision'

总结

Rewarding Progress为PRM训练提供了一条切实可行的规模化路径:

  1. 突破标注瓶颈:通过”进步感”自动标注,无需人工
  2. 高效数据生成:多样化轨迹生成策略
  3. 质量控制:多层次质量过滤机制
  4. 成本效益:大幅降低训练成本(87%)
  5. 可扩展性:支持从10K到10M的规模化训练

该工作为构建真正大规模的过程奖励模型奠定了方法论基础。


参考

Footnotes

  1. Rewarding Progress论文(ICLR 2025)