rLLM’s eval framework is built around two protocols: AgentFlow runs an agent on a task and produces an Episode, and Evaluator scores that Episode. Together they form a clean separation between agent logic and evaluation logic.

AgentFlow

An AgentFlow is any callable that takes a task and returns an Episode. It is the eval-side equivalent of a Workflow (used during training), but has no training dependencies — it only needs a base_url and model to make LLM calls.
@runtime_checkable
class AgentFlow(Protocol):
    def run(self, task: Task, config: AgentConfig) -> Episode: ...
An AgentFlow may orchestrate one or many agents internally. Each agent contributes one or more Trajectories to the returned Episode. Implementations can provide run (sync) or arun (async) — the runner prefers arun when available.
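The "prefer `arun` when available" behavior can be sketched as a small dispatch helper. This is a hypothetical illustration of the runner's dispatch logic, not its actual implementation; the `dispatch` name and the dummy flow classes are assumptions:

```python
import asyncio
import inspect

def dispatch(flow, task, config):
    """Call a flow, preferring the async arun when the flow defines one.

    Hypothetical sketch of how a runner might prefer arun over run;
    the real runner's internals may differ.
    """
    arun = getattr(flow, "arun", None)
    if arun is not None and inspect.iscoroutinefunction(arun):
        return asyncio.run(arun(task, config))
    return flow.run(task, config)

class SyncOnlyFlow:
    def run(self, task, config):
        return f"sync:{task}"

class AsyncCapableFlow:
    def run(self, task, config):
        return f"sync:{task}"

    async def arun(self, task, config):
        return f"async:{task}"
```

With both flows, `dispatch` routes a sync-only flow through `run` and an async-capable flow through `arun`.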

Task and AgentConfig

The two arguments passed to every AgentFlow call:
@dataclass
class Task:
    data: dict                  # Raw dataset row
    spec: TaskSpec | None       # Optional structured task specification

@dataclass
class AgentConfig:
    base_url: str               # LLM API endpoint
    model: str                  # Model name
    session_uid: str            # Unique session identifier
    metadata: dict              # Extra configuration
Task wraps a raw dataset row. Agents can use task.spec for structured rendering or fall back to reading task.data directly.

Example AgentFlow

A minimal math agent that solves a problem in one turn:
from rllm.types import Episode, Trajectory, Step
from openai import OpenAI

class MathAgentFlow:
    def run(self, task, config):
        client = OpenAI(base_url=config.base_url, api_key="unused")

        question = task.data["question"]
        response = client.chat.completions.create(
            model=config.model,
            messages=[{"role": "user", "content": question}],
        )
        answer = response.choices[0].message.content

        step = Step(input=question, output=answer)
        trajectory = Trajectory(steps=[step], task=task.data)
        return Episode(task=task.data, trajectories=[trajectory])

Evaluator

An Evaluator scores an Episode produced by an AgentFlow. It examines the task and the episode’s trajectories, then returns an EvalOutput with a reward and correctness flag.
@runtime_checkable
class Evaluator(Protocol):
    def evaluate(self, task: dict, episode: Episode) -> EvalOutput: ...

EvalOutput

The result of evaluating a single episode:
from dataclasses import dataclass, field

@dataclass
class EvalOutput:
    reward: float                                         # Scalar reward
    is_correct: bool                                      # Whether the agent succeeded
    signals: list[Signal] = field(default_factory=list)   # Named sub-scores
    metadata: dict = field(default_factory=dict)          # Extra data (e.g., extracted answer)

@dataclass
class Signal:
    name: str           # e.g., "accuracy", "format", "f1"
    value: float        # Typically 0.0–1.0
    metadata: dict = field(default_factory=dict)
signals lets an evaluator report multiple dimensions of quality. For example, an IFEval evaluator might return separate signals for instruction-following accuracy and format compliance.
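A multi-signal result of that shape might be built like this. The dataclasses are stand-ins mirroring `EvalOutput` and `Signal` above, and the particular reward and signal values are made up for illustration:

```python
from dataclasses import dataclass, field

# Stand-ins mirroring the Signal/EvalOutput definitions above.
@dataclass
class Signal:
    name: str
    value: float
    metadata: dict = field(default_factory=dict)

@dataclass
class EvalOutput:
    reward: float
    is_correct: bool
    signals: list = field(default_factory=list)
    metadata: dict = field(default_factory=dict)

# An IFEval-style result: one overall reward plus per-dimension sub-scores.
out = EvalOutput(
    reward=0.5,
    is_correct=False,
    signals=[
        Signal(name="accuracy", value=1.0),
        Signal(name="format", value=0.0),
    ],
)

# Downstream consumers can index signals by name.
by_name = {s.name: s.value for s in out.signals}
```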

Example Evaluator

A simple exact-match evaluator for math:
from rllm.experimental.eval.types import EvalOutput

class ExactMatchEvaluator:
    def evaluate(self, task, episode):
        expected = str(task["ground_truth"])
        actual = episode.trajectories[0].steps[-1].output or ""

        # Extract boxed answer if present
        if "\\boxed{" in actual:
            actual = actual.split("\\boxed{")[1].split("}")[0]

        is_correct = actual.strip() == expected.strip()
        return EvalOutput(
            reward=1.0 if is_correct else 0.0,
            is_correct=is_correct,
        )

Built-in Evaluators

| Evaluator | Method | Use case |
| --- | --- | --- |
| ExactMatchEvaluator | String comparison | Math, factoid QA |
| LLMJudgeEvaluator | LLM-as-judge | Open-ended generation, summarization |
| BFCLEvaluator | Function call matching | Tool-use benchmarks |
| IFEvalEvaluator | Instruction-following checks | Instruction-following benchmarks |

How they work together

The eval runner connects AgentFlow and Evaluator in a simple pipeline:
Dataset → AgentFlow.run(task) → Episode → Evaluator.evaluate(task, episode) → EvalOutput
1. Load dataset. The runner loads tasks from the dataset catalog or a custom source.
2. Run agent. For each task, the runner calls agent_flow.run(task, config) to produce an Episode.
3. Score results. The runner passes each Episode to evaluator.evaluate(task, episode) to get an EvalOutput.
4. Aggregate. Rewards and signals are aggregated into an EvalResult with the overall score, per-example breakdowns, and signal averages.
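The aggregation step can be sketched as follows. This is a hypothetical illustration of how an EvalResult-style summary might be computed; the record shape (plain dicts with `reward` and `signals` keys) is an assumption, not the runner's actual data model:

```python
from statistics import mean

# Hypothetical per-episode records: one reward plus named sub-scores each.
outputs = [
    {"reward": 1.0, "signals": {"accuracy": 1.0, "format": 1.0}},
    {"reward": 0.0, "signals": {"accuracy": 0.0, "format": 1.0}},
]

# Overall score: mean reward across episodes.
overall = mean(o["reward"] for o in outputs)

# Signal averages: mean of each named signal across the episodes that report it.
signal_names = {name for o in outputs for name in o["signals"]}
signal_avgs = {
    name: mean(o["signals"][name] for o in outputs if name in o["signals"])
    for name in sorted(signal_names)
}
```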
From the CLI, this entire pipeline runs with a single command:
rllm eval gsm8k --model Qwen/Qwen3-8B
The CLI resolves the appropriate AgentFlow and Evaluator from the benchmark name, pulls the dataset from HuggingFace, and runs the pipeline.

Connection to training

AgentFlow and Evaluator are designed to be reusable across eval and training. During training:
  • The AgentFlow logic is replaced by a Workflow that also captures token-level data (prompt IDs, logprobs) needed for policy gradients
  • The Evaluator logic maps directly to reward functions — the same scoring logic that produces EvalOutput.reward during eval produces the reward signal during training
  • The Episode structure is identical in both paths, so evaluation results are directly comparable to training metrics