指标跟踪#

AReaL 提供统一的指标跟踪系统,处理分布式训练和 Rollout 工作器的统计信息收集。该系统支持针对其各自使用场景优化的两种不同范式:流式指标用于异步 Rollout 工作流,批量指标用于同步训练更新。

核心组件#

指标系统围绕 areal.utils.stats_tracker 构建,提供:

  • 命名跟踪器:不同组件的隔离指标命名空间

  • 层级作用域:将指标组织成逻辑组

  • 分布式聚合:跨工作器自动归约

  • 多种归约类型:支持平均值、求和、最小/最大值和标量

from areal.utils import stats_tracker

# 默认跟踪器(训练指标)
stats_tracker.scalar(learning_rate=0.001)

# 命名跟踪器(Rollout 指标)
stats_tracker.get("rollout").scalar(reward=0.5)

两种日志范式#

流式指标(Rollout 工作器)#

Rollout 工作器异步执行工作流,每个工作流独立记录指标。这种流式方法自然地处理可变完成时间。

特性:

  • 每个工作流在完成时单独记录标量

  • 指标在工作器进程内的列表中累积

  • 记录期间工作器之间无需同步

  • 归约在导出时通过控制器完成

来自 RLVRWorkflow 的示例:

# areal/workflow/rlvr.py
async def _collect_samples(self, engine, req, prompt_str, task_data):
    resp = await engine.agenerate(req)
    reward = await self._compute_rewards(resp, prompt_str, task_data)

    # 记录单个标量 - 追加到内部列表
    # `workflow_context.stat_scope()` 自动区分评估/训练作用域
    stats_tracker.get(workflow_context.stat_scope()).scalar(reward=reward)

    return resp, reward

你可以在自定义工作流中记录任何其他标量,例如:

async def run(self, data, **extra_kwargs):
    # `workflow_context.stat_scope()` 自动区分评估/训练作用域
    stats_tracker.get(workflow_context.stat_scope()).scalar(num_turns=num_turns, max_tokens=max_tokens, reward=reward)
    return reward

控制器聚合:

RolloutController 从所有工作器收集统计信息并计算加权平均值:

# areal/infra/controller/rollout_controller.py
def export_stats(self) -> dict[str, float]:
    all_raw_stats = self._collective_rpc(method="export_stats")

    # 使用计数作为权重进行聚合
    stats, counts = defaultdict(float), defaultdict(int)
    for raw_stats in all_raw_stats:
        for k, v in raw_stats.items():
            if k.endswith("__count"):
                counts[k] += v
            else:
                stats[k] += v * raw_stats.get(k + "__count", 0)

    # 计算加权平均值
    return {k: v / counts[k + "__count"] for k, v in stats.items()
            if counts.get(k + "__count", 0) > 0}

批量指标(训练引擎)#

训练引擎在数据并行 rank 之间同步处理批次。指标作为带布尔掩码的张量记录,在导出时跨所有 rank 归约。

特性:

  • 记录带分母掩码的完整批次张量

  • 支持每 token 和每序列统计

  • All-reduce 同步确保各 rank 统计信息一致

  • 多种归约类型:AVG_MIN_MAXAVGSUMMINMAX

来自 PPOActor 的示例:

# areal/trainer/ppo/actor.py
def ppo_update(self, data):
    loss_mask = data["loss_mask"].bool()
    reward_score = data["rewards"]

    # 定义分母(布尔掩码)
    stats_tracker.denominator(
        n_seqs=torch.ones_like(reward_score, dtype=torch.bool),
        n_valid_tokens=loss_mask,
    )

    # 使用分母引用记录张量指标
    stats_tracker.stat(
        advantages=data["advantages"],      # [batch, seq_len]
        kl_rewards=data["kl_rewards"],      # [batch, seq_len]
        denominator="n_valid_tokens"
    )

    stats_tracker.stat(
        task_reward=reward_score.float(),   # [batch]
        seq_len=seqlens.float(),            # [batch]
        denominator="n_seqs"
    )

导出行为:

# areal/engine/fsdp_engine.py
def export_stats(self) -> dict[str, float]:
    # 跨数据并行组 all-reduce
    return stats_tracker.export_all(reduce_group=self.data_parallel_group)
    # 所有 DP rank 接收相同的结果

API 参考#

记录方法#

方法

使用场景

示例

scalar(**kwargs)

单个浮点值

scalar(lr=0.001, eps=0.2)

denominator(**kwargs)

定义布尔掩码

denominator(valid=mask.bool())

stat(denominator, **kwargs)

带掩码的张量指标

stat(loss=tensor, denominator="valid")

归约类型#

使用 stat() 时,指标默认为 AVG_MIN_MAX,生成三个输出键:

stats_tracker.stat(loss=tensor, denominator="valid")
# 导出:{"loss/avg": 0.5, "loss/min": 0.1, "loss/max": 0.9}

可用的归约类型:

类型

输出

描述

AVG_MIN_MAX

key/avg, key/min, key/max

张量统计的默认值

AVG

key

仅加权平均值

SUM

key

所有元素求和

MIN

key

最小值

MAX

key

最大值

SCALAR

key, key__count

用于标量值

作用域#

使用层级作用域组织相关指标:

with stats_tracker.scope("ppo_actor"):
    with stats_tracker.scope("update"):
        stats_tracker.stat(loss=loss_tensor, denominator="valid")
        # 键:"ppo_actor/update/loss/avg"

计时#

使用 timeperf/ 下的自动作用域测量执行时间:

with stats_tracker.record_timing("rollout"):
    batch = actor.prepare_batch(dataloader, workflow)
# 键:"timeperf/rollout"

命名跟踪器#

为不同组件隔离指标:

# 训练指标(默认跟踪器)
stats_tracker.scalar(grad_norm=1.5)

# Rollout 指标
stats_tracker.get("rollout").scalar(reward=0.8)

# 评估指标
stats_tracker.get("eval-rollout").scalar(reward=0.9)

# 从所有跟踪器导出
all_stats = stats_tracker.export_all(reduce_group=group)

如果设置 evaluator.eval_before_train: true,系统会在第一个训练 step 开始前执行一次评估,以便在微调开始前估计初始模型的性能。

数据流#

从收集到记录的完整指标流程:

Rollout 工作器                          训练工作器
───────────────                          ───────────────
workflow.arun_episode()                  actor.ppo_update(batch)
        │                                        │
        ▼                                        ▼
get("rollout").scalar(r=0.5)             stat(tensor, denom=mask)
        │                                        │
        ▼                                        ▼
export_stats(reduce_group=None)          export_stats(reduce_group=dp_group)
{reward: 0.5, reward__count: 1}          → all_reduce 跨 DP rank
        │                                        │
        ▼                                        │
RolloutController.export_stats()                 │
→ 加权平均跨工作器                               │
        │                                        │
        └────────────────┬───────────────────────┘
                         ▼
          PPOTrainer._export_and_commit_stats()
                         │
                         ▼
              StatsLogger.commit(stats)
                         │
            ┌────────────┼────────────┐
            ▼            ▼            ▼
          wandb     tensorboard    swanlab

StatsLogger:日志后端#

StatsLogger 将聚合指标发送到外部日志后端。它由 PPOTrainer 自动管理,仅在 rank 0 运行以避免重复日志。

支持的后端#

后端

配置

描述

Weights & Biases

config.stats_logger.wandb

云端实验跟踪

SwanLab

config.stats_logger.swanlab

替代实验跟踪

TensorBoard

config.stats_logger.tensorboard

本地可视化

与 PPOTrainer 集成#

训练器在每个训练步结束时调用 StatsLogger.commit()

# areal/trainer/rl_trainer.py
def _export_and_commit_stats(self, epoch, epoch_step, global_step):
    # 1. 从所有组件收集指标
    stats = self.actor.export_stats()           # 训练指标(all-reduced)
    stats.update(self.rollout.export_stats())   # Rollout 指标(控制器聚合)
    stats.update(self.eval_rollout.export_stats())  # 评估指标

    # 2. 发送到日志后端(仅 rank 0)
    self.stats_logger.commit(epoch, epoch_step, global_step, stats)

StatsLogger.commit()#

commit() 方法过滤掉内部计数键并记录到所有配置的后端:

# areal/utils/stats_logger.py
def commit(self, epoch, step, global_step, data):
    if dist.is_initialized() and dist.get_rank() != 0:
        return  # 仅 rank 0 记录

    # 过滤掉 __count 键(用于内部加权平均)
    data = {k: v for k, v in data.items() if not k.endswith("__count")}

    # 记录到所有后端
    wandb.log(data, step=global_step)
    swanlab.log(data, step=global_step)
    if self.summary_writer:
        for key, val in data.items():
            self.summary_writer.add_scalar(key, val, global_step)

配置#

在实验配置中配置日志后端:

stats_logger:
  experiment_name: "gsm8k_grpo"
  trial_name: "run_001"
  fileroot: "/path/to/logs"

  wandb:
    mode: "online"  # "online"、"offline" 或 "disabled"
    project: "my-project"
    entity: "my-team"

  swanlab:
    mode: "online"  # "online"、"local" 或 "disabled"
    project: "my-project"

  tensorboard:
    path: "/path/to/tensorboard/logs"  # null 禁用

最佳实践#

  1. 选择正确的范式:对标量使用 scalar(),对批量 PyTorch 张量(通常是训练指标)使用带分母的 stat()

  2. 先定义分母:始终在 stat() 之前调用 denominator() 来建立掩码关系。

  3. 使用命名跟踪器:使用 stats_tracker.get(workflow_context.stat_scope()).scalar(...) 将 Rollout("rollout")和评估("eval-rollout")指标与训练指标隔离。