一、RLinf使用Sidecar文件连接三个处理阶段与RECAP数据处理相关的代码主要位于examples/recap/ ├── process/ │ ├── compute_returns.py │ ├── compute_advantages.py │ ├── run_compute_returns.sh │ ├── run_compute_advantages.sh │ └── config/ └── value/ ├── train_value.py ├── run_value_sft.sh └── config/libero_sft_value.yaml完整数据流如下LeRobot轨迹 - compute_returns.py - returns_{tag}.parquet - Value Model SFT - Value Model checkpoint - compute_advantages.py - advantages_{tag}.parquetRLinf不会把Return和Advantage直接写回原始轨迹文件而是保存成独立的Sidecar Parquet。例如,dataset/ ├── data/ │ └── chunk-000/ ├── meta/ │ ├── info.json │ ├── stats.json │ ├── returns_fail300.parquet │ └── advantages_fail300_N10_q30.parquet └── mixture_config.yamlReturn和Advantage依靠episode_index、frame_index字段与原始轨迹对齐。注意三个阶段必须使用相同的数据版本。重新过滤轨迹、删除帧或者重新编号后旧的Sidecar文件通常不能继续使用。SFT和Rollout数据采用不同的处理方式数据配置示例如下data: train_data_paths: - dataset_path: /path/to/sft_dataset type: sft weight: 1.0 - dataset_path: /path/to/rollout_dataset type: rollout weight: 1.0其中sft一般表示人工示范rollout表示策略自主运行产生的数据。二者差别如下SFT所有轨迹默认成功最终所有Advantage标签强制设为True。Rollout根据is_success区分成功和失败根据连续Advantage和阈值生成正负标签。二、compute_returns.py将轨迹结果转换为逐帧回报Return的计算入口是examples/recap/process/compute_returns.py启动命令为bash examples/recap/process/run_compute_returns.sh compute_returns核心配置如下data: train_data_paths: - dataset_path: /path/to/sft_dataset type: sft - dataset_path: /path/to/rollout_dataset type: rollout gamma: 1.0 failure_reward: -300.0 tag: fail300 num_workers: 128compute_returns_for_episode负责计算单条轨迹单条轨迹的Reward和Return由下面的函数计算def compute_returns_for_episode( episode_length, is_success, gamma, failure_reward, ): rewards np.full( episode_length, -1.0, dtypenp.float32, ) rewards[-1] ( 0.0 if is_success else failure_reward ) returns np.zeros( episode_length, dtypenp.float32, ) returns[-1] rewards[-1] for t in range( episode_length - 2, -1, -1, ): returns[t] ( rewards[t] gamma * returns[t 1] ) return returns, rewards奖励规则为任务仍在执行Reward -1 成功终止帧Reward 0 失败终止帧Reward failure_reward其中-1表示每执行一步产生一次时间成本较大的失败惩罚则用于保证失败轨迹的回报明显低于成功轨迹。Return从轨迹末尾向前累计当gamma1时当前帧Return 当前帧Reward 下一帧Return。假设一条成功轨迹包含三个普通步骤和一个成功终止帧帧 Reward Return 当前帧 -1 -3 下一帧 -1 -2 再下一帧 -1 -1 成功终止帧 0 0计算过程为成功终止帧Return 0 再下一帧Return -1 0 -1 下一帧Return -1 -1 -2 当前帧Return -1 -2 -3因此成功轨迹中的Return会逐渐接近0-3 → -2 → -1 → 0_process_single_parquet负责处理数据文件RLinf不会加载图像数据只读取计算Return所需的元数据列_READ_COLUMNS [ episode_index, frame_index, is_success, task_index, task, ]实际读取时还会检查当前Parquet中是否存在这些字段pf pq.ParquetFile(pq_file) available set(pf.schema_arrow.names) cols_to_read [ c for c in _READ_COLUMNS if c in available ] table pq.read_table( pq_file, columnscols_to_read, )由于没有读取图像Return计算主要是Parquet I/O和数组运算内存占用相对较低。代码根据episode_index的变化找到轨迹边界change_mask ( np.diff(episode_indices) ! 0 ) change_positions ( np.where(change_mask)[0] 1 )每条轨迹分别调用compute_returns_for_episode(...)处理SFT数据时代码直接将轨迹设为成功if dataset_type sft: is_success True处理Rollout数据时则读取轨迹最后一帧的is_successis_success bool( is_success_col[ep_end - 1] )多个Parquet文件通过线程并行处理process_dataset会递归查找数据目录中的所有Parquetparquet_files sorted( str(p) for p in data_dir.rglob(*.parquet) )随后使用线程池并行处理with ThreadPoolExecutor( max_workerseffective_workers ) as pool: fut pool.submit( _process_single_parquet, pq_file, dataset_type, gamma, failure_reward, tasks, )PyArrow文件读取时能够释放GIL因此这里使用线程也可以获得较好的并行效果。所有结果最终合并为一个Arrow Tablecombined pa.concat_tables( result_tables )当配置为tag: fail300输出文件为meta/returns_fail300.parquet主要字段包括episode_index frame_index return reward prompt脚本还会更新meta/stats.json meta/info.jsonstats.json中会记录{ return: { mean: ..., std: ..., min: ..., max: ... }, reward: { mean: ..., std: ..., min: ..., max: ... } }后面的Value Model和Advantage计算都需要使用这里的Return范围。三、Value Model将逐帧Return转换为状态价值价值模型训练入口是examples/recap/value/train_value.py运行命令为bash examples/recap/value/run_value_sft.sh libero_sft_valuetrain_value.py本身没有实现完整训练循环而是创建RLinf的SFT Worker和Runneractor_group ( FSDPValueSftWorker .create_group(cfg) .launch( cluster, namecfg.actor.group_name, placement_strategyactor_placement, ) ) runner SFTRunner( cfgcfg, actoractor_group, ) runner.init_workers() runner.run()ValueDataset根据帧索引读取Return标签Value Model的数据集实现位于rlinf/data/datasets/recap/value_model.py每次读取样本时代码先从原始LeRobot数据中获得episode_index、frame_index、图像、任务文本。然后使用episode_index和frame_index从Return Sidecar中读取标签raw float( self._sidecar[ep][return][fr] )如果找不到对应Episode代码会提示Sidecar或Tag不匹配if ep not in self._sidecar: raise KeyError( The sidecar/tag may not match the dataset )最终返回给模型的数据结构为result { images: images, prompt: prompt, target_values: target_value, actions: None, }Value Model不需要动作标签因此actions NoneReturnNormalizer将原始Return归一化到-1到0配置默认开启data: normalize_to_minus_one_zero: true对应实现为def normalize_value(self, value): denom ( abs(self.return_min) if self.return_min ! 0 else 1.0 ) return value / denom假设全局最小Return为-300return_max 0原始 Return -300 归一化 Value -1.0 原始 Return -150 归一化 Value -0.5 原始 Return 0 归一化 Value 0如果不同任务的最大轨迹长度差异很大全局最小Return可能主要由最长任务或失败惩罚决定。这样会把短任务的价值压缩到靠近0的较小区间内因此多任务训练时需要分别观察各任务的价值分布。Value Head使用201个离散区间预测价值默认配置为num_bins: 201 v_min: -1.0 v_max: 0.0模型输出201个Logitslogit_0, logit_1, ..., logit_200每一个Logit对应-1到0之间的一个价值区间。推理时先计算概率probs F.softmax( logits, dim-1, )再计算期望价值values ( probs * self.value_head.atoms ).sum(dim-1)可以理解为预测价值 每个价值区间 × 该区间的预测概率连续标签被分配到相邻的两个区间训练时连续目标值通常不会刚好落在某一个离散区间上。代码先计算目标值在离散区间中的位置b ( target_values - self.v_min ) / self.delta_z然后找到左右两个区间l b.floor().long() u b.ceil().long()监督概率按照距离分配target_probs[batch_idx, l] d_to_u target_probs[batch_idx, u] d_to_l假设目标值位于两个区间之间并且更靠近左区间左区间监督概率0.8 右区间监督概率0.2这种方式比直接选择最近区间更平滑也保留了连续Return中的相对距离信息。最终损失为loss -( target_probs * F.log_softmax(logits, dim-1) ).sum(dim-1)训练日志中除了Loss还会记录cat_acc_best cat_acc_neighbor mae value_spearman其中value_spearman用于衡量预测价值和真实Return的排序一致性。四、compute_advantages.py将价值变化转换为正负标签Advantage的计算入口为examples/recap/process/compute_advantages.py运行命令为bash examples/recap/process/run_compute_advantages.sh compute_advantages主要配置如下advantage: value_checkpoint: /path/to/value_checkpoint batch_size: 1024 flush_interval: 256 num_dataloader_workers_per_gpu: 12 prefetch_factor: 2 discount_next_value: true positive_quantile: 0.3 returns_tag: fail300 tag: fail300_N10_q30 data: advantage_lookahead_step: 10 gamma: 1.0ValueInferenceDataset统一构造模型输入不同机器人数据集使用的字段名称可能不同。例如LIBERO可能使用observation.image observation.wrist_image observation.stateFranka数据可能使用observation.images.front_cam observation.images.wrist_cam observation.state.tcp_poseRLinf通过KEY_MAPPINGS将它们转换成Value Model使用的统一格式KEY_MAPPINGS { libero: { observation.image: observation/image, observation.wrist_image: observation/wrist_image, observation.state: observation/state, task: prompt, } }ValueInferenceDataset每次返回{ obs: obs, global_idx: idx, episode_index: ep_idx, frame_index: frame_idx, true_return: true_return, reward: reward, }代码分两个阶段完成价值推理和优势计算第一阶段批量推理所有状态的价值batch_results value_model.infer_batch( obs_list, batch_sizebatch_size, )预测结果保存到数组v_values[local_idx] float( result[value] )第二阶段不再调用模型而是通过数组下标获取当前状态价值V(o_t)和N步之后的状态价值 V(o_tN)每一帧只需要执行一次Value Model推理。如果直接针对每一个样本分别推理当前状态和未来状态大部分中间帧会被重复计算两次。RLinf先统一推理再按下标复用可以显著减少计算量。分片推理会额外读取N个未来样本多GPU模式下每个进程只负责数据集的一部分shard_start, shard_end ( get_shard_indices( total_samples, rank, world_size, ) )但计算当前分片末尾样本时仍然需要访问未来N步的Value。因此代码会将推理范围向后扩展extended_end min( shard_end action_horizon, len(dataset), )例如当前GPU负责样本 0 到样本 999并设置action_horizon 10那么该GPU最多会推理到样本 1009多出来的10个样本只用于查询分片末尾位置的未来价值不会重复写入最终结果。N步Advantage同时考虑状态变化和动作成本核心代码为reward_sum normalize( reward_sum_raw ) gamma_k ( gamma**num_valid if discount_next_value else 1.0 ) advantage ( reward_sum gamma_k * v_next - v_curr )直接写成容易理解的形式Advantage 归一化后的未来N步累计奖励 折扣后的未来状态价值 - 当前状态价值其中v_curr为当前状态价值v_next为N步之后的状态价值reward_sum_raw为中间N步的原始 Reward总和num_valid为当前轨迹中实际可用的未来步数假设当前价值v_curr -0.40 10步后价值v_next -0.30 10步Reward总和 -10 全局Return范围 [-300, 0]归一化后的累计Reward为reward_sum -10 / 300 ≈ -0.033因此Advantage -0.033 (-0.30) - (-0.40) 0.067结果为正说明状态价值的提升超过了中间10步产生的时间成本。如果10步后状态反而变差v_curr -0.30 v_next -0.35则Advantage -0.033 (-0.35) - (-0.30) -0.083结果为负说明动作既消耗了时间又没有推动任务进展。gamma等于1时直接使用Return差值计算累计奖励当gamma1.0代码不需要重新读取并累计每一步Reward。如果N步后的状态仍在当前轨迹中reward_sum_raw ( true_return - next_return )原因是当前Return 未来N步Reward总和 N步后的Return移项后是未来N步Reward总和 当前Return - N步后的Return如果N步后已经超出轨迹末尾reward_sum_raw true_return v_next 0.0此时直接使用当前帧到轨迹结束的完整Return。当gamma不等于1时代码才会显式读取Reward序列reward_sum_raw np.sum( gamma_powers[:num_valid] * reward_slice )轨迹末尾不会跨Episode读取未来状态未来位置通过下面的方式计算next_gidx ( gidx action_horizon ) is_next_pad ( next_gidx ep_end )实际使用的Reward数量为num_valid min( action_horizon, ep_end - gidx, )连续Advantage通过全局阈值转换为标签每个样本先得到连续值advantage_continuous所有数据集计算完成后代码会合并 Advantagecombined_advantages np.concatenate( all_advantages )默认positive_quantile: 0.3表示将Advantage最高的30%作为正样本。因此实际阈值位于70%分位点unified_threshold np.percentile( combined_advantages, 70, )保存Rollout数据时save_df[advantage] ( save_df[advantage_continuous] threshold )保存SFT数据时if dataset_type sft: save_df[advantage] True结果通过临时Parquet分块写入优势计算可能包含数百万帧如果一直把所有结果保存在内存中容易产生OOM。代码根据flush_interval和batch_size定期将结果写入临时Parquettemp_df.to_parquet( temp_file, indexFalse, )写入后清空内存for k in results: results[k] [] gc.collect()所有分块处理完成后再统一合并merged_df pd.concat( [ pd.read_parquet(f) for f in temp_files ], ignore_indexTrue, )最终输出文件为meta/advantages_fail300_N10_q30.parquet主要字段包括episode_index frame_index return value_current value_next reward_sum reward_sum_raw num_valid_rewards advantage_continuous advantage dataset_name代码还会更新mixture_config.yaml记录全局Return范围、统一阈值、正样本比例配置和数据集信息主要用于实验记录与结果追踪。下一阶段的CFG训练直接读取advantages_{tag}.parquet中的布尔优势标签。记录信息如下global_return_min global_return_max unified_threshold positive_quantile 数据集名称与权重这些信息会在下一阶段的CFG策略训练中继续使用。
RLinf复现RECAP(一):从轨迹回报到优势标签
发布时间:2026/6/14 2:08:04
一、RLinf使用Sidecar文件连接三个处理阶段与RECAP数据处理相关的代码主要位于examples/recap/ ├── process/ │ ├── compute_returns.py │ ├── compute_advantages.py │ ├── run_compute_returns.sh │ ├── run_compute_advantages.sh │ └── config/ └── value/ ├── train_value.py ├── run_value_sft.sh └── config/libero_sft_value.yaml完整数据流如下LeRobot轨迹 - compute_returns.py - returns_{tag}.parquet - Value Model SFT - Value Model checkpoint - compute_advantages.py - advantages_{tag}.parquetRLinf不会把Return和Advantage直接写回原始轨迹文件而是保存成独立的Sidecar Parquet。例如,dataset/ ├── data/ │ └── chunk-000/ ├── meta/ │ ├── info.json │ ├── stats.json │ ├── returns_fail300.parquet │ └── advantages_fail300_N10_q30.parquet └── mixture_config.yamlReturn和Advantage依靠episode_index、frame_index字段与原始轨迹对齐。注意三个阶段必须使用相同的数据版本。重新过滤轨迹、删除帧或者重新编号后旧的Sidecar文件通常不能继续使用。SFT和Rollout数据采用不同的处理方式数据配置示例如下data: train_data_paths: - dataset_path: /path/to/sft_dataset type: sft weight: 1.0 - dataset_path: /path/to/rollout_dataset type: rollout weight: 1.0其中sft一般表示人工示范rollout表示策略自主运行产生的数据。二者差别如下SFT所有轨迹默认成功最终所有Advantage标签强制设为True。Rollout根据is_success区分成功和失败根据连续Advantage和阈值生成正负标签。二、compute_returns.py将轨迹结果转换为逐帧回报Return的计算入口是examples/recap/process/compute_returns.py启动命令为bash examples/recap/process/run_compute_returns.sh compute_returns核心配置如下data: train_data_paths: - dataset_path: /path/to/sft_dataset type: sft - dataset_path: /path/to/rollout_dataset type: rollout gamma: 1.0 failure_reward: -300.0 tag: fail300 num_workers: 128compute_returns_for_episode负责计算单条轨迹单条轨迹的Reward和Return由下面的函数计算def compute_returns_for_episode( episode_length, is_success, gamma, failure_reward, ): rewards np.full( episode_length, -1.0, dtypenp.float32, ) rewards[-1] ( 0.0 if is_success else failure_reward ) returns np.zeros( episode_length, dtypenp.float32, ) returns[-1] rewards[-1] for t in range( episode_length - 2, -1, -1, ): returns[t] ( rewards[t] gamma * returns[t 1] ) return returns, rewards奖励规则为任务仍在执行Reward -1 成功终止帧Reward 0 失败终止帧Reward failure_reward其中-1表示每执行一步产生一次时间成本较大的失败惩罚则用于保证失败轨迹的回报明显低于成功轨迹。Return从轨迹末尾向前累计当gamma1时当前帧Return 当前帧Reward 下一帧Return。假设一条成功轨迹包含三个普通步骤和一个成功终止帧帧 Reward Return 当前帧 -1 -3 下一帧 -1 -2 再下一帧 -1 -1 成功终止帧 0 0计算过程为成功终止帧Return 0 再下一帧Return -1 0 -1 下一帧Return -1 -1 -2 当前帧Return -1 -2 -3因此成功轨迹中的Return会逐渐接近0-3 → -2 → -1 → 0_process_single_parquet负责处理数据文件RLinf不会加载图像数据只读取计算Return所需的元数据列_READ_COLUMNS [ episode_index, frame_index, is_success, task_index, task, ]实际读取时还会检查当前Parquet中是否存在这些字段pf pq.ParquetFile(pq_file) available set(pf.schema_arrow.names) cols_to_read [ c for c in _READ_COLUMNS if c in available ] table pq.read_table( pq_file, columnscols_to_read, )由于没有读取图像Return计算主要是Parquet I/O和数组运算内存占用相对较低。代码根据episode_index的变化找到轨迹边界change_mask ( np.diff(episode_indices) ! 0 ) change_positions ( np.where(change_mask)[0] 1 )每条轨迹分别调用compute_returns_for_episode(...)处理SFT数据时代码直接将轨迹设为成功if dataset_type sft: is_success True处理Rollout数据时则读取轨迹最后一帧的is_successis_success bool( is_success_col[ep_end - 1] )多个Parquet文件通过线程并行处理process_dataset会递归查找数据目录中的所有Parquetparquet_files sorted( str(p) for p in data_dir.rglob(*.parquet) )随后使用线程池并行处理with ThreadPoolExecutor( max_workerseffective_workers ) as pool: fut pool.submit( _process_single_parquet, pq_file, dataset_type, gamma, failure_reward, tasks, )PyArrow文件读取时能够释放GIL因此这里使用线程也可以获得较好的并行效果。所有结果最终合并为一个Arrow Tablecombined pa.concat_tables( result_tables )当配置为tag: fail300输出文件为meta/returns_fail300.parquet主要字段包括episode_index frame_index return reward prompt脚本还会更新meta/stats.json meta/info.jsonstats.json中会记录{ return: { mean: ..., std: ..., min: ..., max: ... }, reward: { mean: ..., std: ..., min: ..., max: ... } }后面的Value Model和Advantage计算都需要使用这里的Return范围。三、Value Model将逐帧Return转换为状态价值价值模型训练入口是examples/recap/value/train_value.py运行命令为bash examples/recap/value/run_value_sft.sh libero_sft_valuetrain_value.py本身没有实现完整训练循环而是创建RLinf的SFT Worker和Runneractor_group ( FSDPValueSftWorker .create_group(cfg) .launch( cluster, namecfg.actor.group_name, placement_strategyactor_placement, ) ) runner SFTRunner( cfgcfg, actoractor_group, ) runner.init_workers() runner.run()ValueDataset根据帧索引读取Return标签Value Model的数据集实现位于rlinf/data/datasets/recap/value_model.py每次读取样本时代码先从原始LeRobot数据中获得episode_index、frame_index、图像、任务文本。然后使用episode_index和frame_index从Return Sidecar中读取标签raw float( self._sidecar[ep][return][fr] )如果找不到对应Episode代码会提示Sidecar或Tag不匹配if ep not in self._sidecar: raise KeyError( The sidecar/tag may not match the dataset )最终返回给模型的数据结构为result { images: images, prompt: prompt, target_values: target_value, actions: None, }Value Model不需要动作标签因此actions NoneReturnNormalizer将原始Return归一化到-1到0配置默认开启data: normalize_to_minus_one_zero: true对应实现为def normalize_value(self, value): denom ( abs(self.return_min) if self.return_min ! 0 else 1.0 ) return value / denom假设全局最小Return为-300return_max 0原始 Return -300 归一化 Value -1.0 原始 Return -150 归一化 Value -0.5 原始 Return 0 归一化 Value 0如果不同任务的最大轨迹长度差异很大全局最小Return可能主要由最长任务或失败惩罚决定。这样会把短任务的价值压缩到靠近0的较小区间内因此多任务训练时需要分别观察各任务的价值分布。Value Head使用201个离散区间预测价值默认配置为num_bins: 201 v_min: -1.0 v_max: 0.0模型输出201个Logitslogit_0, logit_1, ..., logit_200每一个Logit对应-1到0之间的一个价值区间。推理时先计算概率probs F.softmax( logits, dim-1, )再计算期望价值values ( probs * self.value_head.atoms ).sum(dim-1)可以理解为预测价值 每个价值区间 × 该区间的预测概率连续标签被分配到相邻的两个区间训练时连续目标值通常不会刚好落在某一个离散区间上。代码先计算目标值在离散区间中的位置b ( target_values - self.v_min ) / self.delta_z然后找到左右两个区间l b.floor().long() u b.ceil().long()监督概率按照距离分配target_probs[batch_idx, l] d_to_u target_probs[batch_idx, u] d_to_l假设目标值位于两个区间之间并且更靠近左区间左区间监督概率0.8 右区间监督概率0.2这种方式比直接选择最近区间更平滑也保留了连续Return中的相对距离信息。最终损失为loss -( target_probs * F.log_softmax(logits, dim-1) ).sum(dim-1)训练日志中除了Loss还会记录cat_acc_best cat_acc_neighbor mae value_spearman其中value_spearman用于衡量预测价值和真实Return的排序一致性。四、compute_advantages.py将价值变化转换为正负标签Advantage的计算入口为examples/recap/process/compute_advantages.py运行命令为bash examples/recap/process/run_compute_advantages.sh compute_advantages主要配置如下advantage: value_checkpoint: /path/to/value_checkpoint batch_size: 1024 flush_interval: 256 num_dataloader_workers_per_gpu: 12 prefetch_factor: 2 discount_next_value: true positive_quantile: 0.3 returns_tag: fail300 tag: fail300_N10_q30 data: advantage_lookahead_step: 10 gamma: 1.0ValueInferenceDataset统一构造模型输入不同机器人数据集使用的字段名称可能不同。例如LIBERO可能使用observation.image observation.wrist_image observation.stateFranka数据可能使用observation.images.front_cam observation.images.wrist_cam observation.state.tcp_poseRLinf通过KEY_MAPPINGS将它们转换成Value Model使用的统一格式KEY_MAPPINGS { libero: { observation.image: observation/image, observation.wrist_image: observation/wrist_image, observation.state: observation/state, task: prompt, } }ValueInferenceDataset每次返回{ obs: obs, global_idx: idx, episode_index: ep_idx, frame_index: frame_idx, true_return: true_return, reward: reward, }代码分两个阶段完成价值推理和优势计算第一阶段批量推理所有状态的价值batch_results value_model.infer_batch( obs_list, batch_sizebatch_size, )预测结果保存到数组v_values[local_idx] float( result[value] )第二阶段不再调用模型而是通过数组下标获取当前状态价值V(o_t)和N步之后的状态价值 V(o_tN)每一帧只需要执行一次Value Model推理。如果直接针对每一个样本分别推理当前状态和未来状态大部分中间帧会被重复计算两次。RLinf先统一推理再按下标复用可以显著减少计算量。分片推理会额外读取N个未来样本多GPU模式下每个进程只负责数据集的一部分shard_start, shard_end ( get_shard_indices( total_samples, rank, world_size, ) )但计算当前分片末尾样本时仍然需要访问未来N步的Value。因此代码会将推理范围向后扩展extended_end min( shard_end action_horizon, len(dataset), )例如当前GPU负责样本 0 到样本 999并设置action_horizon 10那么该GPU最多会推理到样本 1009多出来的10个样本只用于查询分片末尾位置的未来价值不会重复写入最终结果。N步Advantage同时考虑状态变化和动作成本核心代码为reward_sum normalize( reward_sum_raw ) gamma_k ( gamma**num_valid if discount_next_value else 1.0 ) advantage ( reward_sum gamma_k * v_next - v_curr )直接写成容易理解的形式Advantage 归一化后的未来N步累计奖励 折扣后的未来状态价值 - 当前状态价值其中v_curr为当前状态价值v_next为N步之后的状态价值reward_sum_raw为中间N步的原始 Reward总和num_valid为当前轨迹中实际可用的未来步数假设当前价值v_curr -0.40 10步后价值v_next -0.30 10步Reward总和 -10 全局Return范围 [-300, 0]归一化后的累计Reward为reward_sum -10 / 300 ≈ -0.033因此Advantage -0.033 (-0.30) - (-0.40) 0.067结果为正说明状态价值的提升超过了中间10步产生的时间成本。如果10步后状态反而变差v_curr -0.30 v_next -0.35则Advantage -0.033 (-0.35) - (-0.30) -0.083结果为负说明动作既消耗了时间又没有推动任务进展。gamma等于1时直接使用Return差值计算累计奖励当gamma1.0代码不需要重新读取并累计每一步Reward。如果N步后的状态仍在当前轨迹中reward_sum_raw ( true_return - next_return )原因是当前Return 未来N步Reward总和 N步后的Return移项后是未来N步Reward总和 当前Return - N步后的Return如果N步后已经超出轨迹末尾reward_sum_raw true_return v_next 0.0此时直接使用当前帧到轨迹结束的完整Return。当gamma不等于1时代码才会显式读取Reward序列reward_sum_raw np.sum( gamma_powers[:num_valid] * reward_slice )轨迹末尾不会跨Episode读取未来状态未来位置通过下面的方式计算next_gidx ( gidx action_horizon ) is_next_pad ( next_gidx ep_end )实际使用的Reward数量为num_valid min( action_horizon, ep_end - gidx, )连续Advantage通过全局阈值转换为标签每个样本先得到连续值advantage_continuous所有数据集计算完成后代码会合并 Advantagecombined_advantages np.concatenate( all_advantages )默认positive_quantile: 0.3表示将Advantage最高的30%作为正样本。因此实际阈值位于70%分位点unified_threshold np.percentile( combined_advantages, 70, )保存Rollout数据时save_df[advantage] ( save_df[advantage_continuous] threshold )保存SFT数据时if dataset_type sft: save_df[advantage] True结果通过临时Parquet分块写入优势计算可能包含数百万帧如果一直把所有结果保存在内存中容易产生OOM。代码根据flush_interval和batch_size定期将结果写入临时Parquettemp_df.to_parquet( temp_file, indexFalse, )写入后清空内存for k in results: results[k] [] gc.collect()所有分块处理完成后再统一合并merged_df pd.concat( [ pd.read_parquet(f) for f in temp_files ], ignore_indexTrue, )最终输出文件为meta/advantages_fail300_N10_q30.parquet主要字段包括episode_index frame_index return value_current value_next reward_sum reward_sum_raw num_valid_rewards advantage_continuous advantage dataset_name代码还会更新mixture_config.yaml记录全局Return范围、统一阈值、正样本比例配置和数据集信息主要用于实验记录与结果追踪。下一阶段的CFG训练直接读取advantages_{tag}.parquet中的布尔优势标签。记录信息如下global_return_min global_return_max unified_threshold positive_quantile 数据集名称与权重这些信息会在下一阶段的CFG策略训练中继续使用。