# AgentExecutionEngine

The `AgentExecutionEngine` provides an asynchronous execution system for running agents in environments with efficient batching and parallelization.

from rllm.engine import AgentExecutionEngine

Constructor

def __init__(
    engine_name: str = "openai",
    tokenizer = None,
    rollout_engine = None,
    chat_parser = None,
    n_parallel_agents: int = 128,
    trajectory_timeout: int | None = None,
    gamma: float = 0.2,
    api_retries: int = 3,
    retry_limit: int = 3,
    max_steps: int = 5,
    max_response_length: int = 8192,
    max_prompt_length: int = 1024,
    config = None,
    agent_class: type | None = None,
    env_class: type | None = None,
    agent_args: dict | None = None,
    rollout_engine_args: dict | None = None,
    env_args: dict | None = None,
    max_workers: int = 64,
    enforce_max_prompt_length: bool = False,
    overlong_filter: bool = False,
    **kwargs
)
engine_name
str
default:"openai"
Rollout engine backend: "openai", "verl", or "tinker".
tokenizer
PreTrainedTokenizer | None
Tokenizer for encoding/decoding text.
rollout_engine
RolloutEngine | None
Pre-initialized rollout engine. If None, creates one based on engine_name.
chat_parser
ChatParser | None
Parser for converting between chat format and tokens. Auto-detected if None.
n_parallel_agents
int
default:"128"
Number of active agents to run in parallel.
trajectory_timeout
int | None
Timeout in seconds for a single trajectory. Defaults to 1e9 if None.
gamma
float
default:"0.2"
Discount factor for computing Monte Carlo returns.
api_retries
int
default:"3"
Number of API call retries on failure.
retry_limit
int
default:"3"
Maximum retry attempts for failed trajectories.
max_steps
int
default:"5"
Maximum steps per episode before termination.
max_response_length
int
default:"8192"
Maximum response length in tokens.
max_prompt_length
int
default:"1024"
Maximum prompt length in tokens.
config
dict | None
Configuration dictionary for additional settings.
agent_class
type | None
Agent class to instantiate.
env_class
type | None
Environment class to instantiate.
agent_args
dict | None
Arguments passed to agent constructor.
rollout_engine_args
dict | None
Arguments for rollout engine initialization (e.g., base_url, api_key).
env_args
dict | None
Arguments passed to environment constructor.
max_workers
int
default:"64"
Number of concurrent environment operations.
enforce_max_prompt_length
bool
default:"False"
If True, enforces the maximum prompt length check at every step.
overlong_filter
bool
default:"False"
If True, filters out trajectories with termination reasons: TRUNCATION, MAX_STEPS, TIMEOUT.

Methods

execute_tasks

Execute agent-environment interactions on a batch of tasks.
results = await engine.execute_tasks(
    tasks=task_list,
    num_rollouts_per_task=1,
    **sampling_params
)
tasks
list[dict]
List of task dictionaries.
num_rollouts_per_task
int
default:"1"
Number of rollouts to generate per task.
**sampling_params
dict
Additional sampling parameters (temperature, top_p, etc.).
results
list[Trajectory]
List of completed trajectories.

Example: Search Agent Execution

import asyncio
from transformers import AutoTokenizer
from rllm.engine import AgentExecutionEngine
from rllm.agents import ToolAgent
from rllm.environments import ToolEnvironment
from rllm.rewards import search_reward_fn

# Initialize tokenizer
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen3-4B")

# Define tools (MySearchTool is a user-defined tool class; define or import it before use)
tool_map = {"search": MySearchTool}

# Create engine
engine = AgentExecutionEngine(
    agent_class=ToolAgent,
    agent_args={
        "tool_map": tool_map,
        "system_prompt": "You are a helpful search assistant."
    },
    env_class=ToolEnvironment,
    env_args={
        "tool_map": tool_map,
        "reward_fn": search_reward_fn
    },
    engine_name="openai",
    rollout_engine_args={
        "base_url": "http://localhost:8000/v1",
        "api_key": "EMPTY"
    },
    tokenizer=tokenizer,
    n_parallel_agents=64,
    max_steps=5,
    max_response_length=4096
)

# Define tasks
tasks = [
    {"question": "What is the capital of France?", "answer": "Paris"},
    {"question": "Who wrote Hamlet?", "answer": "William Shakespeare"}
]

# Execute
results = asyncio.run(engine.execute_tasks(
    tasks,
    num_rollouts_per_task=2,
    temperature=0.7,
    top_p=0.95
))

print(f"Generated {len(results)} trajectories")
for traj in results:
    print(f"Task: {traj.task}")
    print(f"Reward: {traj.reward}")
    print(f"Steps: {len(traj.steps)}")

Example: Math Problem Solving

import asyncio
from transformers import AutoTokenizer
from rllm.engine import AgentExecutionEngine
from rllm.agents import MathAgent
from rllm.environments import SingleTurnEnvironment
from rllm.rewards import math_reward_fn

tokenizer = AutoTokenizer.from_pretrained("deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B")

engine = AgentExecutionEngine(
    agent_class=MathAgent,
    env_class=SingleTurnEnvironment,
    env_args={"reward_fn": math_reward_fn},
    engine_name="openai",
    rollout_engine_args={
        "base_url": "http://localhost:4000/v1",
        "api_key": "EMPTY"
    },
    tokenizer=tokenizer,
    n_parallel_agents=128,
    max_steps=1,
    max_response_length=2048
)

tasks = [
    {"question": "Solve 2x + 5 = 13", "answer": "4"},
    {"question": "What is 15% of 80?", "answer": "12"}
]

results = asyncio.run(engine.execute_tasks(tasks, temperature=0.6))

for traj in results:
    print(f"Question: {traj.task['question']}")
    print(f"Answer: {traj.steps[-1].action}")
    print(f"Reward: {traj.reward}")