AgentFlow is the recommended way to author an agent in rLLM. An AgentFlow is a plain async function that takes a Task and an AgentConfig and returns an Episode, a single Trajectory, or None. The same function runs for both evaluation and training: in both cases the runner routes config.base_url through a model gateway that captures token IDs and logprobs transparently, so the flow code itself doesn’t change. For a conceptual walkthrough see AgentFlow & Evaluator; for worked examples see cookbooks/.

Eval and training share one engine

Both rllm eval and rllm train drive rllm.engine.agentflow_engine.AgentFlowEngine. The same _run_single loop is used end-to-end: gateway session → run flow → fetch traces → enrich Episode → evaluate. The eval-specific concerns (per-task verifier resolution, sandbox lifecycle) plug in via the engine’s optional TaskHooks parameter:
class TaskHooks(Protocol):
    def setup(self, task: Task, agent_flow: AgentFlow, uid: str) -> TaskContext: ...
Eval installs rllm.hooks.SandboxTaskHooks, which detects each task’s [verifier] block, builds a sandbox if needed, and resolves a per-task evaluator. Training leaves hooks=None and uses a single engine-bound evaluator. After the refactor that introduced the unified engine, rllm eval returns Episodes whose Steps are populated from gateway traces — flows that return None work identically at eval and training time. For training agents that need a sandbox per rollout (sandboxed code agents, harbor tasks), wire the same hook style at trainer construction time. The engine handles per-rollout setup/teardown in a try/finally so retries get fresh sandboxes automatically.
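The per-rollout setup/teardown pattern described above can be sketched as follows. This is a minimal illustration, not the engine's code: FakeSandbox, FakeHooks, its teardown method, and run_with_retries are all hypothetical names, and the hook signature is simplified relative to TaskHooks.setup.

```python
from dataclasses import dataclass, field


@dataclass
class FakeSandbox:
    """Hypothetical stand-in for a sandbox; not an rLLM class."""
    uid: str
    closed: bool = False

    def close(self) -> None:
        self.closed = True


@dataclass
class FakeHooks:
    """Hypothetical hooks object; the teardown method is an assumption."""
    created: list = field(default_factory=list)

    def setup(self, task: dict, uid: str) -> FakeSandbox:
        sandbox = FakeSandbox(uid=uid)
        self.created.append(sandbox)
        return sandbox

    def teardown(self, sandbox: FakeSandbox) -> None:
        sandbox.close()


def run_with_retries(task: dict, hooks: FakeHooks, flow, max_attempts: int = 3):
    """Run `flow` with a fresh sandbox per attempt and always tear it down."""
    last_error = None
    for attempt in range(max_attempts):
        sandbox = hooks.setup(task, uid=f"{task['id']}:{attempt}")
        try:
            return flow(task, sandbox)
        except RuntimeError as exc:  # this sketch retries only on RuntimeError
            last_error = exc
        finally:
            hooks.teardown(sandbox)  # runs on success, on failure, and before a retry
    raise RuntimeError("all attempts failed") from last_error


attempts = []


def flaky_flow(task: dict, sandbox: FakeSandbox) -> str:
    """Fails on the first attempt, succeeds on the second."""
    attempts.append(sandbox.uid)
    if len(attempts) < 2:
        raise RuntimeError("transient failure")
    return "ok"


hooks = FakeHooks()
result = run_with_retries({"id": "task1"}, hooks, flaky_flow)
```

Because setup runs inside the retry loop and teardown sits in the finally clause, a failed attempt cannot leak its sandbox into the next one.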

The protocol

from typing import Protocol, runtime_checkable

from rllm.types import AgentFlow, AgentConfig, Episode, Task, Trajectory

@runtime_checkable
class AgentFlow(Protocol):
    def run(self, task: Task, config: AgentConfig) -> Episode | Trajectory | None: ...
An implementation may provide either run (sync) or arun (async). The runner prefers arun when running inside an event loop. In practice you almost always write the async form. For single-agent flows, returning None is the simplest path — the framework builds an Episode with one Trajectory, and gateway-captured traces fill in the Steps. For multi-trajectory flows (e.g. solver / judge), return an explicit Episode with named trajectories so the trainer can group them for advantage computation.
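Since the protocol is runtime-checkable, any object with a matching method satisfies it structurally. The sketch below uses a hypothetical FlowLike protocol as a stand-in for AgentFlow (sync form only) so that it runs without rLLM installed; EchoFlow and NotAFlow are illustrative names.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class FlowLike(Protocol):
    """Hypothetical stand-in for the AgentFlow protocol (sync form only)."""

    def run(self, task: Any, config: Any) -> Any: ...


class EchoFlow:
    """Satisfies FlowLike structurally; no inheritance is needed."""

    def run(self, task: Any, config: Any) -> None:
        # A real flow would call the LLM here. Returning None leaves
        # Episode construction to the framework.
        return None


class NotAFlow:
    """Has no `run` method, so it does not satisfy FlowLike."""

    def execute(self, task: Any) -> None:
        return None


is_flow = isinstance(EchoFlow(), FlowLike)
is_not_flow = isinstance(NotAFlow(), FlowLike)
```

Note that isinstance against a runtime-checkable protocol only verifies that the method exists; it does not check the signature or return type.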

@rllm.rollout decorator

The simplest way to satisfy the AgentFlow protocol is to decorate a plain function:
import rllm
from rllm.types import AgentConfig, Episode, Task

@rllm.rollout(name="my-agent")
async def my_flow(task: Task, config: AgentConfig) -> Episode:
    # ...build messages, call the LLM, package an Episode
    ...
The decorator returns an AgentFlowFn object that exposes .run() (sync, blocks until done) and .arun() (async). Both are usable directly; the trainer/runner calls them automatically.

Bare and parameterized forms

@rllm.rollout                          # bare — uses default trajectory name "solver"
@rllm.rollout(name="solver")           # parameterized
@rllm.rollout(name="solver", register="my_agent")   # also auto-registers under entry-point group
The name is what shows up on Trajectory.name when the framework auto-builds a trajectory (i.e. when the function returns None or a Trajectory whose name is unset). It is also the role the trainer uses to group rollouts of the same task into a TrajectoryGroup for advantage computation, so it must be stable across rollouts.
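A decorator that works both bare and with keyword arguments is a common Python pattern. The sketch below shows one way to build it; the rollout function here is a hypothetical stand-in, not rLLM's implementation, and the trajectory_name attribute is purely illustrative.

```python
import asyncio
import functools
from typing import Any, Callable, Optional


def rollout(fn: Optional[Callable] = None, *, name: str = "solver"):
    """Hypothetical decorator usable as @rollout or @rollout(name=...)."""

    def wrap(func: Callable) -> Callable:
        @functools.wraps(func)
        async def arun(task: Any, config: Any) -> Any:
            return await func(task, config)

        arun.trajectory_name = name  # illustrative attribute, not rLLM's
        return arun

    if fn is not None:  # bare form: @rollout
        return wrap(fn)
    return wrap  # parameterized form: @rollout(name="...")


@rollout
async def bare_flow(task, config):
    return None


@rollout(name="judge")
async def judge_flow(task, config):
    return None


bare_result = asyncio.run(bare_flow({}, None))
```

Making name keyword-only is what lets the decorator tell the two forms apart: a positional argument can only be the decorated function.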

Return-value coercion

The same coercion applies whether you use @rllm.rollout or implement the AgentFlow protocol directly on a class — both go through rllm.types._coerce_to_episode.
| Function returns | Wrapped as |
| --- | --- |
| Episode | passed through (multi-trajectory flows must use this) |
| Trajectory | Episode(trajectories=[t]). The trajectory is left untouched; the evaluator parses whatever the user put on it. |
| None | Episode(trajectories=[Trajectory(name=…, steps=[])]). Gateway traces fill in the Steps during enrichment; the evaluator reads what it needs from those steps (e.g. step.model_response, step.chat_completions). |
Anything else raises TypeError. The canonical patterns are: return None for single-agent flows where the gateway captures everything, and return Episode(...) when you need explicit artifacts or multiple named trajectories — see cookbooks/solver_judge_flow/.
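The coercion rules above can be sketched as a small function. This is an illustration of the described behavior, not the body of rllm.types._coerce_to_episode; the Trajectory and Episode classes are simplified stand-ins and coerce_to_episode is a hypothetical name.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Trajectory:  # simplified stand-in, not rllm.types.Trajectory
    name: str = "solver"
    steps: list = field(default_factory=list)


@dataclass
class Episode:  # simplified stand-in, not rllm.types.Episode
    trajectories: list = field(default_factory=list)


def coerce_to_episode(result: Any, default_name: str = "solver") -> Episode:
    """Normalize a flow's return value into an Episode."""
    if isinstance(result, Episode):
        return result  # passed through unchanged
    if isinstance(result, Trajectory):
        return Episode(trajectories=[result])  # trajectory left untouched
    if result is None:
        # Empty trajectory; in rLLM the gateway traces fill in the steps later.
        return Episode(trajectories=[Trajectory(name=default_name, steps=[])])
    raise TypeError(f"unsupported flow return type: {type(result).__name__}")


from_none = coerce_to_episode(None, default_name="my-agent")
from_traj = coerce_to_episode(Trajectory(name="judge"))
```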

Task

The first argument to every AgentFlow.run:
@dataclass
class Task:
    id: str                              # Stable identifier (row index, task name, ...)
    instruction: str | list[dict]        # What the agent sees (text or multimodal blocks)
    metadata: dict[str, Any]             # Ground truth, MCQ choices, parsed task.toml, ...
    dataset_dir: Path                    # Where dataset.toml lives (for sandbox tasks)
    sub_dir: Path | None                 # Per-task subdir (sandbox); None for data tasks
Task is pure data. The instruction is rendered ahead of time (from a JSONL row, an instruction.md, or an instruction.md.tpl template). metadata carries everything the verifier or the flow needs at runtime — the source row for catalog datasets, the parsed task.toml for sandbox tasks, the gym-env config for cookbooks/frozenlake.

AgentConfig

The second argument:
@dataclass
class AgentConfig:
    base_url: str       # OpenAI-compatible endpoint URL
    model: str          # Model name to pass to chat.completions.create
    session_uid: str    # Unique session identifier (used for trace stitching at training)
    metadata: dict      # Extra configuration the flow may need
Construct an AsyncOpenAI(base_url=config.base_url, api_key="EMPTY") and call .chat.completions.create(model=config.model, …) — that’s the canonical wiring. Don’t hard-code a base_url or model in the flow body.

Evaluator protocol

from typing import Any, Protocol, runtime_checkable

from rllm.types import Evaluator, Episode
from rllm.eval.types import EvalOutput

@runtime_checkable
class Evaluator(Protocol):
    def evaluate(self, task: Any, episode: Episode) -> EvalOutput: ...
The Evaluator scores the Episode produced by an AgentFlow. Set traj.reward on each trajectory if you need per-trajectory rewards (e.g. solver vs judge in cookbooks/solver_judge_flow); set EvalOutput.reward for the episode-level scalar that rllm eval aggregates and rllm train feeds into advantage computation.

@rllm.evaluator decorator

import rllm
from rllm.eval.types import EvalOutput, Signal
from rllm.types import Episode

@rllm.evaluator
def my_evaluator(task: dict, episode: Episode) -> EvalOutput:
    answer = str(episode.artifacts.get("answer", ""))
    is_correct = answer == task["ground_truth"]
    return EvalOutput(
        reward=1.0 if is_correct else 0.0,
        is_correct=is_correct,
        signals=[Signal(name="accuracy", value=1.0 if is_correct else 0.0)],
    )
Like @rllm.rollout, it supports both bare and parameterized forms (e.g. @rllm.evaluator(register="my_eval")).

EvalOutput

@dataclass
class EvalOutput:
    reward: float
    is_correct: bool
    signals: list[Signal] = field(default_factory=list)
    metadata: dict = field(default_factory=dict)
signals is the right place for per-axis metrics that aggregate across the eval — accuracy, table-access rate, judge-correctness, etc. rllm eval reports the mean of each signal across the dataset.
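Averaging each signal across a dataset can be sketched as below. Signal is a simplified stand-in and mean_signals is a hypothetical helper; the sketch assumes each signal is averaged over the tasks that emitted it, which may differ from how rllm eval handles missing signals.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Signal:  # simplified stand-in, not rllm.eval.types.Signal
    name: str
    value: float


def mean_signals(per_task_signals: list[list[Signal]]) -> dict[str, float]:
    """Average every named signal over the tasks that reported it."""
    totals: dict[str, float] = defaultdict(float)
    counts: dict[str, int] = defaultdict(int)
    for signals in per_task_signals:
        for signal in signals:
            totals[signal.name] += signal.value
            counts[signal.name] += 1
    return {name: totals[name] / counts[name] for name in totals}


report = mean_signals([
    [Signal("accuracy", 1.0), Signal("judge_correct", 1.0)],
    [Signal("accuracy", 0.0), Signal("judge_correct", 1.0)],
])
```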

Return-value coercion

The decorator accepts EvalOutput, a plain float (treated as reward), or a (reward: float, is_correct: bool) tuple. Returning the explicit EvalOutput keeps the signal/metadata channels available.
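The three accepted return shapes could be normalized roughly as follows. EvalOutput is a simplified stand-in and coerce_eval_output is a hypothetical name. Two points are assumptions not stated on this page: how is_correct is derived from a bare float (here, reward > 0) and that other types raise TypeError.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class EvalOutput:  # simplified stand-in, not rllm.eval.types.EvalOutput
    reward: float
    is_correct: bool
    signals: list = field(default_factory=list)
    metadata: dict = field(default_factory=dict)


def coerce_eval_output(result: Any) -> EvalOutput:
    """Normalize an evaluator's return value into an EvalOutput."""
    if isinstance(result, EvalOutput):
        return result
    if isinstance(result, tuple) and len(result) == 2:
        reward, is_correct = result
        return EvalOutput(reward=float(reward), is_correct=bool(is_correct))
    if isinstance(result, (int, float)) and not isinstance(result, bool):
        # Assumption: a bare reward counts as correct only when positive.
        return EvalOutput(reward=float(result), is_correct=result > 0)
    # Assumption: unsupported types are rejected.
    raise TypeError(f"unsupported evaluator return type: {type(result).__name__}")


from_float = coerce_eval_output(0.5)
from_tuple = coerce_eval_output((1.0, False))
```

The float and tuple forms leave signals and metadata empty, which is why the explicit EvalOutput is preferable when those channels matter.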

run_agent_flow helper

For ad-hoc use outside the trainer / runner:
from rllm.types import run_agent_flow

episode = await run_agent_flow(my_flow, task, config)
The helper prefers arun when it is present and otherwise runs run in a thread executor, so sync flows don’t block the event loop.
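The dispatch logic can be sketched with the standard library alone. run_flow, SyncFlow, and AsyncFlow are hypothetical names, and asyncio.to_thread is one way to offload a sync call; the real helper may use a different executor.

```python
import asyncio
from typing import Any


async def run_flow(flow: Any, task: Any, config: Any) -> Any:
    """Prefer the async entry point; otherwise offload the sync one to a thread."""
    if hasattr(flow, "arun"):
        return await flow.arun(task, config)
    # The sync call runs in a worker thread, so the event loop stays responsive.
    return await asyncio.to_thread(flow.run, task, config)


class SyncFlow:
    def run(self, task, config):
        return f"sync:{task}"


class AsyncFlow:
    async def arun(self, task, config):
        return f"async:{task}"


sync_result = asyncio.run(run_flow(SyncFlow(), "t1", None))
async_result = asyncio.run(run_flow(AsyncFlow(), "t1", None))
```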

Data types

The shapes the protocols return and consume. All live in rllm.types and are re-exported from rllm.agents for backward compatibility.

Action

Wraps an arbitrary action emitted by an agent.
from rllm.types import Action

action = Action(action="move_forward")
action (Any): The action content (string, dict, or any type).

Step

A single LLM interaction. The first group of fields is what every flow populates; the second group is filled in transparently by the gateway during training.
from rllm.types import Step
Core fields
id (str): Auto-generated UUID.
input (Any | None): Optional structured input (rendered prompt, tool args, …).
output (Any | None): Optional structured output (parsed answer, return value, …).
action (Any | None): The action taken at this step (parsed answer, tool call, …).
reward (float, default 0.0): Per-step reward (set by the evaluator if you score per-step).
done (bool, default False): Whether the episode ended at this step.
metadata (dict | None): Arbitrary per-step metadata (also accessible as step.info).
chat_completions (list[dict]): The chat history at this step in OpenAI message format.
model_response (str): The raw assistant content from this step’s LLM call.
thought (str): Reasoning text (e.g. <think>…</think> content extracted from the response).
Training-side fields (populated by the gateway; default-empty in eval-only paths)
prompt_ids (list[int]): Prompt token IDs.
response_ids (list[int]): Response token IDs.
logprobs (list[float]): Per-token logprobs.
model_output (ModelOutput | None): The full structured output from the rollout engine.
advantage (list[float] | float | None): Per-token or scalar advantage, populated by the trainer.
weight_version (int | None): Model-weight version at generation time (used for async-staleness tracking).

Trajectory

A sequence of Steps with a name. The name is what the trainer uses to group trajectories across rollouts when computing advantages — see cookbooks/solver_judge_flow/ for an example with two named groups (solver / judge).
from rllm.types import Trajectory

trajectory = Trajectory(name="solver", steps=[step1, step2])
uid (str): Auto-generated UUID.
name (str): Trajectory role name. Used for advantage grouping. Default: "default_traj_name".
steps (list[Step]): Ordered list of steps in this trajectory.
reward (float | None): Trajectory-level reward (set by the evaluator for per-trajectory scoring).
output (Any): Optional final answer / return value.
metadata (dict | None): Arbitrary per-trajectory metadata (also accessible as traj.info).
is_cumulative(): returns True if every step’s chat_completions is a strict superset of the previous step’s — useful for trainers that need to know whether the trajectory shares a single growing context vs. independent turns.
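One plausible reading of the check is that each step's chat history must start with the previous step's history and add at least one message. The sketch below implements that reading on plain lists of message dicts; it is an assumption about the semantics, not the method's actual body, and the standalone function signature is hypothetical.

```python
def is_cumulative(histories: list[list[dict]]) -> bool:
    """Return True if every chat history strictly extends the previous one."""
    for prev, curr in zip(histories, histories[1:]):
        grows = len(curr) > len(prev)
        shares_prefix = curr[: len(prev)] == prev
        if not (grows and shares_prefix):
            return False
    return True


turn1 = [{"role": "user", "content": "hi"}]
turn2 = turn1 + [{"role": "assistant", "content": "hello"}]
turn3 = turn2 + [{"role": "user", "content": "tell me more"}]

growing = is_cumulative([turn1, turn2, turn3])
independent = is_cumulative([turn1, [{"role": "user", "content": "new topic"}]])
```

Under this reading, a trajectory with zero or one step is trivially cumulative.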

Episode

The top-level return shape of an AgentFlow. Bundles all trajectories from one rollout plus any artifacts the evaluator will read.
from rllm.types import Episode

episode = Episode(
    trajectories=[traj1, traj2],
    artifacts={"answer": final_answer},
)
id (str): Auto-generated UUID. The runner overrides this to f"{task.id}:{rollout_idx}".
task (Any): Task data (often task.id or the metadata dict, depending on the flow).
trajectories (list[Trajectory]): All trajectories produced during this rollout.
artifacts (dict): Free-form output bag the evaluator reads. Convention: store the agent’s final answer at artifacts["answer"].
is_correct (bool, default False): Whether this episode counts as a correct solve. The evaluator typically writes this.
termination_reason (TerminationReason | None): Why the episode ended (set by the trainer / runner, not usually by the flow).
metrics (dict): Optional per-episode metrics that the trainer logs.
metadata (dict): Arbitrary metadata.

TrajectoryGroup

The trainer reorganizes per-rollout Episode objects into per-task TrajectoryGroups for advantage computation — all solver trajectories for one task into one group, all judge trajectories into another, and so on. Most users don’t construct these directly; the trainer does.
from rllm.types import TrajectoryGroup
trajectories (list[Trajectory]): All trajectories in this group (typically same name, same task).
group_id (str): Identifier in the form {task_id}:{role} (e.g. "task1:solver").
metadata (list[dict]): Per-trajectory metadata aligned with trajectories.
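The regrouping step can be sketched as a dictionary keyed by the {task_id}:{role} identifier. The Trajectory and Episode classes are simplified stand-ins (the task_id field in particular is an assumption made for this sketch), and group_by_task_and_role is a hypothetical helper rather than the trainer's code.

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class Trajectory:  # simplified stand-in
    name: str
    reward: float = 0.0


@dataclass
class Episode:  # simplified stand-in; task_id is an assumed field
    task_id: str
    trajectories: list = field(default_factory=list)


def group_by_task_and_role(episodes: list[Episode]) -> dict[str, list[Trajectory]]:
    """Collect trajectories from many rollouts into per-task, per-role groups."""
    groups: dict[str, list[Trajectory]] = defaultdict(list)
    for episode in episodes:
        for traj in episode.trajectories:
            groups[f"{episode.task_id}:{traj.name}"].append(traj)
    return dict(groups)


# Two rollouts of the same task, each with a solver and a judge trajectory.
rollouts = [
    Episode("task1", [Trajectory("solver", 1.0), Trajectory("judge", 0.0)]),
    Episode("task1", [Trajectory("solver", 0.0), Trajectory("judge", 1.0)]),
]
groups = group_by_task_and_role(rollouts)
```

This is why the trajectory name must be stable across rollouts: a name that changes between rollouts would scatter trajectories of the same role into different groups.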

Episode artifacts convention

The convention across all rLLM cookbooks: the flow stores its final user-facing answer in episode.artifacts["answer"], and the evaluator reads it from there. This keeps reward computation outside the flow (so the same flow is reusable with different graders) and gives rllm.eval.reward_fns._helpers.extract_answer_text a single place to look.

See also

Cookbooks: Seven worked AgentFlow examples
AgentFlow & Evaluator: Conceptual walkthrough of the protocol
Workflows: The legacy Workflow path (uses BaseAgent + BaseEnv)
Trainer: Wire an AgentFlow + Evaluator into RL training