Trajectories represent multi-step workflows where each LLM call becomes a step with an assignable reward. The `@trajectory` decorator automatically converts function execution into structured `TrajectoryView` objects, making it easy to collect training data for reinforcement learning.
## Trajectory Basics

### The @trajectory Decorator

The `@trajectory` decorator transforms a function into a trajectory-tracked workflow:
```python
from rllm.sdk import trajectory, get_chat_client_async

llm = get_chat_client_async(api_key="sk-...")

@trajectory(name="solver")
async def solve_math_problem(problem: str):
    # Each LLM call automatically becomes a step
    response1 = await llm.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": f"Solve: {problem}"}]
    )
    response2 = await llm.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Is this correct?"}]
    )
    return response2.choices[0].message.content

# Returns TrajectoryView instead of string
traj = await solve_math_problem("What is 2+2?")
print(f"Steps: {len(traj.steps)}")  # 2
```
**Important:** The `@trajectory` decorator changes the return type of your function. Instead of returning the original value, it returns a `TrajectoryView` object, with the original return value stored in the `output` field.
### TrajectoryView Structure

The decorator returns a `TrajectoryView` with the following structure:
```python
class TrajectoryView(BaseModel):
    name: str = "agent"           # Trajectory name
    steps: list[StepView] = []    # List of steps (LLM calls)
    reward: float = 0.0           # Trajectory reward (set manually)
    input: dict | None = None     # Function arguments
    output: Any = None            # Function return value
    metadata: dict | None = None  # Additional tracking data
```
Access the original return value via `traj.output` or `traj.result`:

```python
traj = await solve_math_problem("What is 2+2?")
answer = traj.output  # Original return value: "4"
# or
answer = traj.result  # Backward-compatibility alias
```
## Working with Steps

### StepView Structure

Each step in a trajectory is a `StepView`, a wrapper around a trace with a `reward` field:
```python
class StepView(BaseModel):
    id: str                       # Trace ID
    input: Any | None = None      # LLM input (messages)
    output: Any | None = None     # LLM output (response)
    action: Any | None = None     # Parsed action (set manually)
    reward: float = 0.0           # Step reward (set manually)
    metadata: dict | None = None  # Model, tokens, latency, etc.
```
### Assigning Rewards

Rewards must be set manually based on your evaluation logic:
```python
@trajectory(name="solver")
async def solve_problem(problem: str, ground_truth: str):
    response = await llm.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": problem}]
    )
    return response.choices[0].message.content

# Execute trajectory
traj = await solve_problem("What is 2+2?", ground_truth="4")

# Assign step rewards
for step in traj.steps:
    # Evaluate based on your criteria
    if is_correct(step.output, ground_truth):
        step.reward = 1.0
    else:
        step.reward = 0.0

# Assign trajectory reward (e.g., sum or average)
traj.reward = sum(s.reward for s in traj.steps)
```
### Accessing Step Data
```python
traj = await solve_math_problem("What is 2+2?")

# Iterate over steps
for i, step in enumerate(traj.steps):
    print(f"Step {i}:")
    print(f"  Input: {step.input}")
    print(f"  Output: {step.output}")
    print(f"  Reward: {step.reward}")
    print(f"  Metadata: {step.metadata}")

# Access a specific step
first_step = traj.steps[0]
print(f"First step ID: {first_step.id}")
print(f"First step output: {first_step.output}")
```
### Trajectory Metadata

Pass metadata to the decorator to tag trajectories:
```python
@trajectory(
    name="solver",
    experiment="v1",
    model_version="gpt-4",
    task_type="math"
)
async def solve_problem(problem: str):
    # Implementation
    pass

traj = await solve_problem("What is 2+2?")
print(traj.metadata)  # {"experiment": "v1", "model_version": "gpt-4", ...}
```
### Capturing Function Arguments

The decorator automatically captures function arguments in the `input` field:
```python
@trajectory(name="solver")
async def solve_problem(problem: str, max_steps: int = 3):
    # Implementation
    pass

traj = await solve_problem("What is 2+2?", max_steps=5)
print(traj.input)  # {"problem": "What is 2+2?", "max_steps": 5}
```
## Synchronous vs Async

The `@trajectory` decorator works with both synchronous and asynchronous functions.

### Async Functions
```python
from rllm.sdk import trajectory, get_chat_client_async

llm = get_chat_client_async(api_key="sk-...")

@trajectory(name="async_solver")
async def async_solve(problem: str):
    response = await llm.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": problem}]
    )
    return response.choices[0].message.content

traj = await async_solve("What is 2+2?")
```
### Synchronous Functions
```python
from rllm.sdk import trajectory, get_chat_client

llm = get_chat_client(api_key="sk-...")

@trajectory(name="sync_solver")
def sync_solve(problem: str):
    response = llm.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": problem}]
    )
    return response.choices[0].message.content

traj = sync_solve("What is 2+2?")
```
## Multi-Step Workflows

### Sequential Steps
```python
@trajectory(name="solver_verifier")
async def solve_and_verify(problem: str):
    # Step 1: Generate solution
    solution = await llm.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": f"Solve: {problem}"}]
    )
    solution_text = solution.choices[0].message.content
    # Step 2: Verify solution
    verification = await llm.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": f"Is this correct? {solution_text}"}]
    )
    return verification.choices[0].message.content

traj = await solve_and_verify("What is 2+2?")
print(f"Total steps: {len(traj.steps)}")  # 2

# Assign rewards based on step position
traj.steps[0].reward = 0.5  # Partial reward for solution
traj.steps[1].reward = 1.0  # Full reward for verification
traj.reward = traj.steps[-1].reward  # Use last step's reward
```
### Parallel Steps
```python
import asyncio

@trajectory(name="multi_solver")
async def solve_multiple(problem: str, n_solutions: int = 3):
    # Generate multiple solutions in parallel
    tasks = [
        llm.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": f"Solve: {problem}"}]
        )
        for _ in range(n_solutions)
    ]
    solutions = await asyncio.gather(*tasks)
    return [s.choices[0].message.content for s in solutions]

traj = await solve_multiple("What is 2+2?", n_solutions=3)
print(f"Total steps: {len(traj.steps)}")  # 3
print(f"Solutions: {traj.output}")  # List of 3 solutions
```
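To reward parallel solutions like these, score each returned string against the ground truth and write the scores back onto the matching steps. A sketch, where `extract_answer` and `score_solutions` are hypothetical helpers, not part of the SDK:

```python
import re

def extract_answer(text: str) -> str:
    """Pull the last numeric token out of a free-form model response."""
    numbers = re.findall(r"-?\d+(?:\.\d+)?", text)
    return numbers[-1] if numbers else ""

def score_solutions(solutions: list[str], ground_truth: str) -> list[float]:
    """One reward per solution: 1.0 on an exact answer match, else 0.0."""
    return [1.0 if extract_answer(s) == ground_truth else 0.0 for s in solutions]

rewards = score_solutions(["The answer is 4", "I believe it is 5", "2+2 = 4"], "4")
print(rewards)  # [1.0, 0.0, 1.0]

# Write back onto the trajectory:
# for step, r in zip(traj.steps, rewards):
#     step.reward = r
# traj.reward = max(rewards)  # best-of-n: credit the trajectory if any solution succeeded
```

Taking the max for the trajectory reward is a best-of-n choice; sum or mean also work, depending on what signal you want to train on.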
## Real-World Example: Solver-Judge Workflow

Here's a complete example from the rLLM examples:
```python
import re
from rllm.sdk import trajectory, get_chat_client_async

llm = get_chat_client_async(base_url="http://localhost:4000/v1", api_key="EMPTY")

class Solver:
    @trajectory(name="solver")
    async def generate_solution(self, problem: str):
        messages = [
            {"role": "user", "content": f"{problem}. Output within <answer>...</answer>"}
        ]
        response = await llm.chat.completions.create(
            model="gpt-4",
            messages=messages,
            temperature=1.0,
            max_tokens=1000,
        )
        response_text = response.choices[0].message.content
        return self._parse_answer(response_text)

    def _parse_answer(self, response: str) -> str:
        match = re.search(r"<answer>(.*?)</answer>", response, re.IGNORECASE | re.DOTALL)
        return f"<answer>{match.group(1).strip()}</answer>" if match else "No solution"

class Judge:
    @trajectory(name="judge")
    async def judge_solutions(self, problem: str, solutions: list[str]):
        prompt = self._create_judge_prompt(problem, solutions)
        response = await llm.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=1.0,
        )
        return self._select_best_solution(response, solutions)

    def _create_judge_prompt(self, problem: str, solutions: list[str]) -> str:
        prompt = f"Problem: {problem}\n\nSolutions:\n"
        for i, sol in enumerate(solutions, 1):
            prompt += f"\nSolution {i}: {sol}"
        prompt += "\n\nSelect the best solution (output index in <answer>)."
        return prompt

    def _select_best_solution(self, response, solutions: list[str]) -> str:
        match = re.search(r"<answer>(.*?)</answer>", response.choices[0].message.content)
        if match:
            try:
                idx = int(match.group(1).strip()) - 1
                return solutions[idx]
            except (ValueError, IndexError):
                pass
        return ""

# Usage
solver = Solver()
judge = Judge()
problem = "What is 2+2?"

# Generate solutions
solver_traj = await solver.generate_solution(problem)
solver_traj.steps[0].reward = 1.0 if is_correct(solver_traj.output) else 0.0

# Judge solutions
judge_traj = await judge.judge_solutions(problem, [solver_traj.output])
judge_traj.steps[0].reward = 1.0 if is_correct(judge_traj.output) else 0.0

# Combine trajectories
all_trajectories = [solver_traj, judge_traj]
total_reward = sum(t.reward for t in all_trajectories)
```
## How It Works Internally

The `@trajectory` decorator:
- Creates a session internally using `session(trajectory_name=name, **metadata)`
- Executes the wrapped function within the session context
- Collects all LLM traces from the session
- Converts each trace to a `StepView` using `trace_to_step_view()`
- Captures function arguments in the `input` field
- Stores the function return value in the `output` field
- Returns a `TrajectoryView` instead of the original return value
Source code reference: rllm/sdk/decorators.py:12-112
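The argument-capture and wrapping mechanism can be approximated in plain Python. The sketch below is illustrative only: `trajectory_sketch` and its dict output are inventions for this example, and the real implementation in `rllm/sdk/decorators.py` integrates with sessions and trace collection rather than returning a dict.

```python
import functools
import inspect

def trajectory_sketch(name: str = "agent", **metadata):
    """Illustrative stand-in for @trajectory (not the real SDK implementation)."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            # 1. Bind call arguments to parameter names (becomes the `input` field)
            bound = inspect.signature(fn).bind(*args, **kwargs)
            bound.apply_defaults()
            # 2. Run the wrapped function (the real decorator does this inside a
            #    session and collects every LLM trace made during the call)
            result = fn(*args, **kwargs)
            # 3. Return a view object instead of the raw result
            return {
                "name": name,
                "input": dict(bound.arguments),
                "output": result,
                "metadata": metadata or None,
                "steps": [],  # populated from session traces in the real SDK
            }
        return wrapper
    return decorator

@trajectory_sketch(name="demo", experiment="v1")
def add(a: int, b: int = 1) -> int:
    return a + b

view = add(2, b=3)
print(view["input"], view["output"])  # {'a': 2, 'b': 3} 5
```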
## Advanced Patterns

### Nested Trajectories

Trajectories can be nested, though each creates its own session:
```python
@trajectory(name="outer")
async def outer_workflow(problem: str):
    # Calls another trajectory
    inner_traj = await inner_workflow(problem)
    # Continue with more steps
    result = await llm.chat.completions.create(...)
    return result

@trajectory(name="inner")
async def inner_workflow(problem: str):
    result = await llm.chat.completions.create(...)
    return result

# outer_workflow's trajectory includes its own steps;
# inner_workflow returns its own separate trajectory
traj = await outer_workflow("What is 2+2?")
```
### Conditional Steps
```python
@trajectory(name="conditional_solver")
async def conditional_solve(problem: str, max_attempts: int = 3):
    for attempt in range(max_attempts):
        response = await llm.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": problem}]
        )
        answer = response.choices[0].message.content
        if is_valid(answer):
            return answer
    return "No valid answer found"

traj = await conditional_solve("What is 2+2?")
# Number of steps depends on when a valid answer is found
print(f"Attempts: {len(traj.steps)}")
```
### Error Handling
```python
@trajectory(name="robust_solver")
async def robust_solve(problem: str):
    try:
        response = await llm.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": problem}]
        )
        return response.choices[0].message.content
    except Exception as e:
        # Still returns a trajectory, even on error
        return f"Error: {str(e)}"

traj = await robust_solve("What is 2+2?")
if traj.output.startswith("Error:"):
    traj.reward = 0.0  # Penalize errors
```
## Best Practices
- Always set rewards: the decorator defaults `reward` to 0.0; set it yourself based on your evaluation logic
- Use descriptive names: choose trajectory names that clearly indicate the workflow's purpose
- Leverage metadata: add experiment-tracking metadata to the decorator
- Access the original return value: use `traj.output` or `traj.result` to get the wrapped function's return value
- Consider step-level rewards: assign rewards to individual steps for more granular training signals
- Handle errors gracefully: trajectories are created even when errors occur; handle them appropriately
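For the step-level rewards mentioned above, one common pattern is to assign the outcome reward to the final step and discount earlier steps, so later steps carry more credit. A sketch, not an SDK feature:

```python
def discounted_step_rewards(final_reward: float, n_steps: int, gamma: float = 0.9) -> list[float]:
    """Propagate the final outcome reward backwards: each earlier step earns gamma^k less."""
    return [final_reward * gamma ** (n_steps - 1 - i) for i in range(n_steps)]

rewards = discounted_step_rewards(1.0, 3)
# rewards is approximately [0.81, 0.9, 1.0]

# Write back onto a trajectory:
# for step, r in zip(traj.steps, rewards):
#     step.reward = r
```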
## Trajectory vs Session
| Feature | Session | Trajectory |
|---|---|---|
| Use case | Manual trace collection | Automatic workflow tracking |
| Return value | Unchanged | TrajectoryView |
| Syntax | Context manager | Decorator |
| Reward field | Not included | Included in steps |
| Function args | Not captured | Captured in input |
| Nesting | Metadata inherited | Separate trajectories |
## Next Steps