指标跟踪#
AReaL 提供统一的指标跟踪系统,处理分布式训练和 Rollout 工作器的统计信息收集。该系统支持针对其各自使用场景优化的两种不同范式:流式指标用于异步 Rollout 工作流,批量指标用于同步训练更新。
核心组件#
指标系统围绕 areal.utils.stats_tracker 构建,提供:
命名跟踪器:不同组件的隔离指标命名空间
层级作用域:将指标组织成逻辑组
分布式聚合:跨工作器自动归约
多种归约类型:支持平均值、求和、最小/最大值和标量
from areal.utils import stats_tracker
# 默认跟踪器(训练指标)
stats_tracker.scalar(learning_rate=0.001)
# 命名跟踪器(Rollout 指标)
stats_tracker.get("rollout").scalar(reward=0.5)
两种日志范式#
流式指标(Rollout 工作器)#
Rollout 工作器异步执行工作流,每个工作流独立记录指标。这种流式方法自然地处理可变完成时间。
特性:
每个工作流在完成时单独记录标量
指标在工作器进程内的列表中累积
记录期间工作器之间无需同步
归约在导出时通过控制器完成
来自 RLVRWorkflow 的示例:
# areal/workflow/rlvr.py
async def _collect_samples(self, engine, req, prompt_str, task_data):
resp = await engine.agenerate(req)
reward = await self._compute_rewards(resp, prompt_str, task_data)
# 记录单个标量 - 追加到内部列表
# `workflow_context.stat_scope()` 自动区分评估/训练作用域
stats_tracker.get(workflow_context.stat_scope()).scalar(reward=reward)
return resp, reward
你可以在自定义工作流中记录任何其他标量,例如:
async def run(self, data, **extra_kwargs):
# `workflow_context.stat_scope()` 自动区分评估/训练作用域
stats_tracker.get(workflow_context.stat_scope()).scalar(num_turns=num_turns, max_tokens=max_tokens, reward=reward)
return reward
控制器聚合:
RolloutController 从所有工作器收集统计信息并计算加权平均值:
# areal/infra/controller/rollout_controller.py
def export_stats(self) -> dict[str, float]:
all_raw_stats = self._collective_rpc(method="export_stats")
# 使用计数作为权重进行聚合
stats, counts = defaultdict(float), defaultdict(int)
for raw_stats in all_raw_stats:
for k, v in raw_stats.items():
if k.endswith("__count"):
counts[k] += v
else:
stats[k] += v * raw_stats.get(k + "__count", 0)
# 计算加权平均值
return {k: v / counts[k + "__count"] for k, v in stats.items()
if counts.get(k + "__count", 0) > 0}
批量指标(训练引擎)#
训练引擎在数据并行 rank 之间同步处理批次。指标作为带布尔掩码的张量记录,在导出时跨所有 rank 归约。
特性:
记录带分母掩码的完整批次张量
支持每 token 和每序列统计
All-reduce 同步确保各 rank 统计信息一致
多种归约类型:
AVG_MIN_MAX、AVG、SUM、MIN、MAX
来自 PPOActor 的示例:
# areal/trainer/ppo/actor.py
def ppo_update(self, data):
loss_mask = data["loss_mask"].bool()
reward_score = data["rewards"]
# 定义分母(布尔掩码)
stats_tracker.denominator(
n_seqs=torch.ones_like(reward_score, dtype=torch.bool),
n_valid_tokens=loss_mask,
)
# 使用分母引用记录张量指标
stats_tracker.stat(
advantages=data["advantages"], # [batch, seq_len]
kl_rewards=data["kl_rewards"], # [batch, seq_len]
denominator="n_valid_tokens"
)
stats_tracker.stat(
task_reward=reward_score.float(), # [batch]
seq_len=seqlens.float(), # [batch]
denominator="n_seqs"
)
导出行为:
# areal/engine/fsdp_engine.py
def export_stats(self) -> dict[str, float]:
# 跨数据并行组 all-reduce
return stats_tracker.export_all(reduce_group=self.data_parallel_group)
# 所有 DP rank 接收相同的结果
API 参考#
记录方法#
方法 |
使用场景 |
示例 |
|---|---|---|
|
单个浮点值 |
|
|
定义布尔掩码 |
|
|
带掩码的张量指标 |
|
归约类型#
使用 stat() 时,指标默认为 AVG_MIN_MAX,生成三个输出键:
stats_tracker.stat(loss=tensor, denominator="valid")
# 导出:{"loss/avg": 0.5, "loss/min": 0.1, "loss/max": 0.9}
可用的归约类型:
类型 |
输出 |
描述 |
|---|---|---|
|
|
张量统计的默认值 |
|
|
仅加权平均值 |
|
|
所有元素求和 |
|
|
最小值 |
|
|
最大值 |
|
|
用于标量值 |
作用域#
使用层级作用域组织相关指标:
with stats_tracker.scope("ppo_actor"):
with stats_tracker.scope("update"):
stats_tracker.stat(loss=loss_tensor, denominator="valid")
# 键:"ppo_actor/update/loss/avg"
计时#
使用 timeperf/ 下的自动作用域测量执行时间:
with stats_tracker.record_timing("rollout"):
batch = actor.prepare_batch(dataloader, workflow)
# 键:"timeperf/rollout"
命名跟踪器#
为不同组件隔离指标:
# 训练指标(默认跟踪器)
stats_tracker.scalar(grad_norm=1.5)
# Rollout 指标
stats_tracker.get("rollout").scalar(reward=0.8)
# 评估指标
stats_tracker.get("eval-rollout").scalar(reward=0.9)
# 从所有跟踪器导出
all_stats = stats_tracker.export_all(reduce_group=group)
如果设置 evaluator.eval_before_train: true,系统会在第一个训练 step 开始前执行一次评估,以便在微调开始前估计初始模型的性能。
数据流#
从收集到记录的完整指标流程:
Rollout 工作器 训练工作器
─────────────── ───────────────
workflow.arun_episode() actor.ppo_update(batch)
│ │
▼ ▼
get("rollout").scalar(r=0.5) stat(tensor, denom=mask)
│ │
▼ ▼
export_stats(reduce_group=None) export_stats(reduce_group=dp_group)
{reward: 0.5, reward__count: 1} → all_reduce 跨 DP rank
│ │
▼ │
RolloutController.export_stats() │
→ 加权平均跨工作器 │
│ │
└────────────────┬───────────────────────┘
▼
PPOTrainer._export_and_commit_stats()
│
▼
StatsLogger.commit(stats)
│
┌────────────┼────────────┐
▼ ▼ ▼
wandb tensorboard swanlab
StatsLogger:日志后端#
StatsLogger
将聚合指标发送到外部日志后端。它由 PPOTrainer 自动管理,仅在 rank 0 运行以避免重复日志。
支持的后端#
后端 |
配置 |
描述 |
|---|---|---|
Weights & Biases |
|
云端实验跟踪 |
SwanLab |
|
替代实验跟踪 |
TensorBoard |
|
本地可视化 |
与 PPOTrainer 集成#
训练器在每个训练步结束时调用 StatsLogger.commit():
# areal/trainer/rl_trainer.py
def _export_and_commit_stats(self, epoch, epoch_step, global_step):
# 1. 从所有组件收集指标
stats = self.actor.export_stats() # 训练指标(all-reduced)
stats.update(self.rollout.export_stats()) # Rollout 指标(控制器聚合)
stats.update(self.eval_rollout.export_stats()) # 评估指标
# 2. 发送到日志后端(仅 rank 0)
self.stats_logger.commit(epoch, epoch_step, global_step, stats)
StatsLogger.commit()#
commit() 方法过滤掉内部计数键并记录到所有配置的后端:
# areal/utils/stats_logger.py
def commit(self, epoch, step, global_step, data):
if dist.is_initialized() and dist.get_rank() != 0:
return # 仅 rank 0 记录
# 过滤掉 __count 键(用于内部加权平均)
data = {k: v for k, v in data.items() if not k.endswith("__count")}
# 记录到所有后端
wandb.log(data, step=global_step)
swanlab.log(data, step=global_step)
if self.summary_writer:
for key, val in data.items():
self.summary_writer.add_scalar(key, val, global_step)
配置#
在实验配置中配置日志后端:
stats_logger:
experiment_name: "gsm8k_grpo"
trial_name: "run_001"
fileroot: "/path/to/logs"
wandb:
mode: "online" # "online"、"offline" 或 "disabled"
project: "my-project"
entity: "my-team"
swanlab:
mode: "online" # "online"、"local" 或 "disabled"
project: "my-project"
tensorboard:
path: "/path/to/tensorboard/logs" # null 禁用
最佳实践#
选择正确的范式:对标量使用
scalar(),对批量 PyTorch 张量(通常是训练指标)使用带分母的stat()。先定义分母:始终在
stat()之前调用denominator()来建立掩码关系。使用命名跟踪器:使用
stats_tracker.get(workflow_context.stat_scope()).scalar(...)将 Rollout("rollout")和评估("eval-rollout")指标与训练指标隔离。