In this tutorial, you’ll build a two-agent system where:
- Solver: Generates candidate solutions to a problem
- Judge: Evaluates and selects the best solution
This pattern is powerful for training agents that can both generate and verify solutions.
## Overview
By the end of this tutorial, you will have:
- Built a Solver agent that generates multiple solution candidates
- Built a Judge agent that selects the best solution
- Assigned separate rewards to each agent using `@trajectory`
- Trained the multi-agent system end-to-end
**Dataset:** Countdown - given numbers, reach a target using arithmetic operations.
## Why Multi-Agent?
In a multi-agent system, you have multiple rollout functions (Solver and Judge), and each gets its own reward.
## Concepts
We will cover:
- `@trajectory` decorator: Automatic session management and trace capture
- `TrajectoryView`: Access to steps, results, and rewards
- Multi-agent workflows: Composing multiple agents with independent rewards
## Setup

### Install dependencies

Install rLLM if you haven’t already.

### Prepare the dataset

Download the Countdown dataset:

```bash
python -m rllm.data.prepare_countdown
```

### Launch a vLLM server

Start a vLLM server for testing:

```bash
vllm serve Qwen/Qwen3-4B-Instruct-2507 \
    --host 0.0.0.0 \
    --port 4000
```
## 1. Understanding `@trajectory`

The `@trajectory` decorator automatically:

- Tracks all LLM calls as steps
- Returns a `TrajectoryView` with steps and result
### 1.1 Basic usage

```python
from rllm.sdk import trajectory, get_chat_client_async

@trajectory(name="my_agent")
async def my_agent(prompt: str):
    client = get_chat_client_async(
        base_url="http://localhost:4000/v1",
        api_key="EMPTY",
        use_proxy=False,  # set to False when using a vLLM server directly
    )
    response = await client.chat.completions.create(
        model="Qwen/Qwen3-4B-Instruct-2507",
        messages=[{"role": "user", "content": prompt}],
    )
    return response.choices[0].message.content
```
### 1.2 What you get back

```python
traj = await my_agent("What is 2+2?")

# traj is a TrajectoryView with:
print("Agent Name:", traj.name)   # "my_agent"
print("Response:", traj.result)   # "4" (your return value)
print("Steps:", traj.steps)       # [StepView(...)] - one per LLM call
print("Reward:", traj.reward)     # 0.0 (default; you can set this)
```
## 2. Countdown Task

Given a target number and a list of numbers, create an equation using the given numbers to reach the target.

Example:

- Target: `150`
- Numbers: `[3, 50]`
- Valid solution: `3 * 50 = 150`
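To make these rules concrete, here is a minimal validity checker. This is a sketch only (the tutorial's actual scoring uses `countdown_reward_fn`, introduced later); the function name and the `eval`-based evaluation are illustrative choices:

```python
import re

def check_countdown(equation: str, numbers: list[int], target: int) -> bool:
    """Return True if `equation` (e.g. "3 * 50 = 150") uses exactly the
    given numbers, only basic arithmetic, and reaches the target."""
    lhs = equation.split("=")[0].strip()
    # Allow only digits, whitespace, +-*/ and parentheses before eval'ing,
    # which keeps the eval below safe enough for a sketch.
    if not re.fullmatch(r"[\d\s+\-*/()]+", lhs):
        return False
    # Each given number must be used exactly once
    if sorted(int(n) for n in re.findall(r"\d+", lhs)) != sorted(numbers):
        return False
    return abs(eval(lhs) - target) < 1e-6

print(check_countdown("3 * 50 = 150", [3, 50], 150))    # True
print(check_countdown("100 + 50 = 150", [3, 50], 150))  # False (100 was not given)
```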
## 3. Build the Solver Agent
The Solver generates solution candidates for Countdown puzzles.
### 3.1 Define the Solver class

```python
import asyncio
import re

from rllm.sdk import trajectory, get_chat_client_async

SOLVER_PROMPT = "{problem}. Output the final answer within <answer>...</answer>"

class Solver:
    def __init__(self, use_proxy: bool = False):
        self.client = get_chat_client_async(
            base_url="http://localhost:4000/v1",
            api_key="token-abc123",
            use_proxy=use_proxy,
        )
        self.model = "Qwen/Qwen3-4B-Instruct-2507"

    @trajectory(name="solver")
    async def generate_solution(self, problem: str):
        """Generate a single solution. Returns a TrajectoryView automatically."""
        prompt = SOLVER_PROMPT.format(problem=problem)
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=1.0,  # higher temperature for diverse solutions
            max_tokens=1000,
        )
        response_text = response.choices[0].message.content
        return self._parse_answer(response_text)

    def _parse_answer(self, response: str) -> str:
        """Extract the answer from <answer>...</answer> tags."""
        match = re.search(r"<answer>(.*?)</answer>", response, re.IGNORECASE | re.DOTALL)
        if match:
            return f"<answer>{match.group(1).strip()}</answer>"
        return ""

    async def generate_solutions(self, problem: str, n_solutions: int = 2):
        """Generate multiple solutions concurrently."""
        tasks = [
            asyncio.create_task(self.generate_solution(problem))
            for _ in range(n_solutions)
        ]
        return await asyncio.gather(*tasks)
```
**Why `<answer>` tags?** The reward function looks for `<answer>equation</answer>` to extract the solution. Without the tags, the reward function cannot find your answer, similar to `\boxed{}` in math problems.
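Because the tag extraction is plain regex, you can sanity-check the parsing behavior on a canned response without any model call. This standalone sketch mirrors the `_parse_answer` logic above:

```python
import re

def parse_answer(response: str) -> str:
    """Extract and re-wrap the content of <answer>...</answer> tags."""
    match = re.search(r"<answer>(.*?)</answer>", response, re.IGNORECASE | re.DOTALL)
    if match:
        return f"<answer>{match.group(1).strip()}</answer>"
    return ""

canned = "Let me think. 3 * 50 = 150, so:\n<answer> 3 * 50 = 150 </answer>"
print(parse_answer(canned))          # <answer>3 * 50 = 150</answer>
print(repr(parse_answer("no tags")))  # '' (empty string: reward would be 0)
```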
### 3.2 Test the Solver

```python
solver = Solver()

# Generate 2 solutions for a Countdown puzzle
problem = "Using numbers [3, 50], reach target 150"
trajs = await solver.generate_solutions(problem, n_solutions=2)

for i, traj in enumerate(trajs):
    print(f"Solution {i+1}: {traj.result}")
    print(f"Collected LLM Calls: {len(traj.steps)}")
```

Expected output:

```text
Solution 1: <answer>3 * 50 = 150</answer>
Collected LLM Calls: 1
Solution 2: <answer>3 * 50</answer>
Collected LLM Calls: 1
```
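`generate_solutions` fans the solver calls out with `asyncio.create_task` and collects them with `asyncio.gather`. The same pattern can be exercised offline as a standalone script, with a stub coroutine standing in for the LLM call (the names here are illustrative):

```python
import asyncio

async def fake_generate(i: int) -> str:
    """Stand-in for Solver.generate_solution with no model call."""
    await asyncio.sleep(0.01)  # simulate LLM latency
    return f"<answer>candidate {i}</answer>"

async def fake_generate_solutions(n: int) -> list[str]:
    # Same fan-out pattern as Solver.generate_solutions
    tasks = [asyncio.create_task(fake_generate(i)) for i in range(n)]
    return await asyncio.gather(*tasks)  # results keep task order

results = asyncio.run(fake_generate_solutions(3))
print(results)
```

All candidates run concurrently, so total latency is roughly one LLM call rather than `n`.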
## 4. Build the Judge Agent
The Judge evaluates solutions and selects the best one.
### 4.1 Define the Judge class

```python
JUDGE_PROMPT = """You are an expert verifier. Given a countdown problem and multiple solution attempts, select a correct solution.

Problem:
{problem}

Solutions to evaluate:
{solutions}

A correct solution must:
1. Use only the given numbers
2. Use each number exactly once
3. Use only basic arithmetic operations (+, -, *, /)
4. Result in the target number
5. Be marked within <answer>...</answer> tags

Output the index of your selected solution within <answer>...</answer> tags, e.g., <answer>1</answer> for the first solution."""

class Judge:
    def __init__(self, use_proxy: bool = False):
        self.client = get_chat_client_async(
            base_url="http://localhost:4000/v1",
            api_key="token-abc123",
            use_proxy=use_proxy,
        )
        self.model = "Qwen/Qwen3-4B-Instruct-2507"

    @trajectory(name="judge")
    async def judge_solutions(self, problem: str, solutions: list[str]):
        """Evaluate solutions and select the best one."""
        # Format the numbered list of candidate solutions
        solutions_text = ""
        for i, sol in enumerate(solutions, 1):
            solutions_text += f"\nSolution {i}:\n{sol}\n"

        prompt = JUDGE_PROMPT.format(problem=problem, solutions=solutions_text)
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=1.0,
            max_tokens=2000,
        )
        response_text = response.choices[0].message.content
        return self._parse_selection(response_text, solutions)

    def _parse_selection(self, response: str, solutions: list[str]) -> str:
        """Extract the selected solution by its 1-based index."""
        match = re.search(r"<answer>(\d+)</answer>", response)
        if match:
            idx = int(match.group(1)) - 1
            if 0 <= idx < len(solutions):
                return solutions[idx]
        return ""
```
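Since `_parse_selection` is pure string handling, it can be unit-tested offline against a mocked judge response. This standalone sketch mirrors its logic:

```python
import re

def parse_selection(response: str, solutions: list[str]) -> str:
    """Map a 1-based <answer>N</answer> index back to the chosen solution."""
    match = re.search(r"<answer>(\d+)</answer>", response)
    if match:
        idx = int(match.group(1)) - 1
        if 0 <= idx < len(solutions):
            return solutions[idx]
    return ""

solutions = ["<answer>3 + 50</answer>", "<answer>3 * 50 = 150</answer>"]
mock_judge_output = "Solution 2 uses both numbers and hits 150. <answer>2</answer>"
print(parse_selection(mock_judge_output, solutions))      # <answer>3 * 50 = 150</answer>
print(repr(parse_selection("<answer>7</answer>", solutions)))  # '' (index out of range)
```

Returning `""` on an out-of-range or missing index means a malformed judge output simply scores 0 downstream instead of crashing the workflow.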
## 5. Compose the Workflow
Now combine Solver and Judge, assigning rewards to each trajectory.
```python
from rllm.sdk import TrajectoryView
from rllm.rewards.countdown_reward import countdown_reward_fn

class SolverJudgeWorkflow:
    def __init__(self, n_solutions: int = 2, **kwargs):
        self.n_solutions = n_solutions
        self.reward_function = countdown_reward_fn
        self.solver = Solver(use_proxy=True)
        self.judge = Judge(use_proxy=True)

    async def run(self, task: dict, **kwargs) -> list[TrajectoryView]:
        """Run the full workflow and return all trajectories."""
        problem = task["question"]

        # Step 1: Generate multiple solutions
        solver_trajs = await self.solver.generate_solutions(problem, self.n_solutions)

        # Step 2: Assign a reward to each solver trajectory
        solutions = []
        for traj in solver_trajs:
            parsed_answer = traj.result
            reward = self.reward_function(task, parsed_answer).reward
            # Assign the reward to the trajectory AND its steps
            traj.steps[0].reward = reward
            traj.reward = reward
            solutions.append(parsed_answer)

        # Step 3: Judge selects the best solution
        judge_traj = await self.judge.judge_solutions(problem, solutions)
        selected = judge_traj.result

        # Judge reward is based on the quality of the final selection
        judge_reward = self.reward_function(task, selected).reward
        judge_traj.steps[0].reward = judge_reward
        judge_traj.reward = judge_reward

        # Return ALL trajectories for training
        return solver_trajs + [judge_traj]
```
### 5.1 Reward assignment strategy
Example run:

```text
┌─────────────────────────────────────────────────┐
│ Problem: Reach 150 with [3, 50]                 │
├─────────────────────────────────────────────────┤
│ Solver 1: "100 + 50 = 150" → reward = 0.0 ✗     │
│ Solver 2: "3 * 50 = 150"   → reward = 1.0 ✓     │
│ Judge:    selects Solver 2 → reward = 1.0 ✓     │
└─────────────────────────────────────────────────┘

Training signal:
• Solver 2 is reinforced (correct answer)
• Solver 1 learns to improve (wrong answer)
• Judge learns to identify correct solutions
```
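The dual write in the workflow (reward onto the trajectory and onto its step) can be seen in isolation with stub objects standing in for `TrajectoryView`. The class and field names below are illustrative stand-ins, not the SDK's:

```python
from dataclasses import dataclass, field

@dataclass
class StubStep:
    reward: float = 0.0

@dataclass
class StubTrajectory:
    """Minimal stand-in for TrajectoryView: result, steps, reward only."""
    result: str
    steps: list = field(default_factory=lambda: [StubStep()])
    reward: float = 0.0

def assign_rewards(trajs: list[StubTrajectory], rewards: list[float]) -> None:
    # Mirrors the workflow: write each reward onto the trajectory AND its step
    for traj, r in zip(trajs, rewards):
        traj.steps[0].reward = r
        traj.reward = r

solver_trajs = [
    StubTrajectory("<answer>100 + 50 = 150</answer>"),
    StubTrajectory("<answer>3 * 50 = 150</answer>"),
]
assign_rewards(solver_trajs, [0.0, 1.0])  # rewards from the example run above
print([t.reward for t in solver_trajs])   # [0.0, 1.0]
```

Writing the reward at both levels matters because the trainer consumes per-step rewards, while the trajectory-level reward is what you inspect when debugging.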
## 6. Run Training
```python
import hydra

from rllm.data.dataset import DatasetRegistry
from rllm.trainer.agent_trainer import AgentTrainer

async def run_workflow(**kwargs) -> list[TrajectoryView]:
    """Training wrapper that returns trajectories."""
    workflow = SolverJudgeWorkflow(n_solutions=2)
    return await workflow.run(kwargs)

@hydra.main(
    config_path="pkg://rllm.trainer.config",
    config_name="agent_ppo_trainer",
    version_base=None,
)
def main(config):
    train_dataset = DatasetRegistry.load_dataset("countdown", "train")
    test_dataset = DatasetRegistry.load_dataset("countdown", "test")

    trainer = AgentTrainer(
        config=config,
        train_dataset=train_dataset,
        val_dataset=test_dataset,
        agent_run_func=run_workflow,
    )
    trainer.train()

if __name__ == "__main__":
    main()
```
Launch training:

```bash
cd rllm
bash examples/sdk/solver_judge/train_decorator.sh
```