The AgentExecutionEngine provides an asynchronous execution system for running agents in environments with efficient batching and parallelization.
AgentExecutionEngine
from rllm.engine import AgentExecutionEngine
Constructor
# Constructor signature of AgentExecutionEngine (shown without `self`).
def __init__(
    engine_name: str = "openai",  # rollout backend: "openai", "verl", or "tinker"
    tokenizer = None,  # PreTrainedTokenizer | None; encodes/decodes text
    rollout_engine = None,  # pre-initialized rollout engine; if None, one is built from engine_name
    chat_parser = None,  # chat-format <-> tokens parser; auto-detected if None
    n_parallel_agents: int = 128,  # number of agents run concurrently
    trajectory_timeout: int | None = None,  # per-trajectory timeout in seconds; None means 1e9
    gamma: float = 0.2,  # discount factor for Monte Carlo returns
    api_retries: int = 3,  # retries for a failed API call
    retry_limit: int = 3,  # retry attempts for a failed trajectory
    max_steps: int = 5,  # steps per episode before termination
    max_response_length: int = 8192,  # in tokens
    max_prompt_length: int = 1024,  # in tokens
    config = None,  # additional settings
    agent_class: type | None = None,  # agent class to instantiate
    env_class: type | None = None,  # environment class to instantiate
    agent_args: dict | None = None,  # passed to the agent constructor
    rollout_engine_args: dict | None = None,  # rollout engine init args, e.g. base_url, api_key
    env_args: dict | None = None,  # passed to the environment constructor
    max_workers: int = 64,  # concurrent environment operations
    enforce_max_prompt_length: bool = False,  # if True, apply the max prompt check at every step
    overlong_filter: bool = False,  # if True, drop TRUNCATION / MAX_STEPS / TIMEOUT trajectories
    **kwargs
)
engine_name
Rollout engine backend: "openai", "verl", or "tinker".
tokenizer
PreTrainedTokenizer | None
Tokenizer for encoding/decoding text.
rollout_engine
Pre-initialized rollout engine. If None, one is created based on engine_name.
chat_parser
Parser for converting between chat format and tokens. Auto-detected if None.
n_parallel_agents
Number of agents to run concurrently.
trajectory_timeout
Timeout in seconds for a single trajectory. Defaults to 1e9 if None.
gamma
Discount factor for computing Monte Carlo returns.
api_retries
Number of retries for a failed API call.
retry_limit
Maximum number of retry attempts for a failed trajectory.
max_steps
Maximum number of steps per episode before termination.
max_response_length
Maximum response length in tokens.
max_prompt_length
Maximum prompt length in tokens.
config
Configuration dictionary for additional settings.
agent_class
Agent class to instantiate.
env_class
Environment class to instantiate.
agent_args
Arguments passed to the agent constructor.
rollout_engine_args
Arguments for rollout engine initialization (e.g., base_url, api_key).
env_args
Arguments passed to the environment constructor.
max_workers
Maximum number of concurrent environment operations.
enforce_max_prompt_length
If True, applies the max prompt length check at every step.
overlong_filter
If True, filters out trajectories whose termination reason is TRUNCATION, MAX_STEPS, or TIMEOUT.
Methods
execute_tasks
Execute agent-environment interactions on a batch of tasks.
# execute_tasks is a coroutine: await it from inside an async function,
# or drive it from synchronous code with asyncio.run(...).
results = await engine.execute_tasks(
    tasks=task_list,
    num_rollouts_per_task=1,
    **sampling_params
)
tasks
List of task dictionaries.
num_rollouts_per_task
Number of rollouts to generate per task.
**sampling_params
Additional sampling parameters (temperature, top_p, etc.).
Returns
List of completed trajectories.
Example: Search Agent Execution
import asyncio
from transformers import AutoTokenizer
from rllm.engine import AgentExecutionEngine
from rllm.agents import ToolAgent
from rllm.environments import ToolEnvironment
from rllm.rewards import search_reward_fn

# Initialize tokenizer
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen3-4B")

# Define tools.
# NOTE: MySearchTool is a placeholder that this snippet does not define;
# replace it with your own search tool class before running.
tool_map = {"search": MySearchTool}

# Create engine. The same tool_map is handed to both the agent and the
# environment.
engine = AgentExecutionEngine(
    agent_class=ToolAgent,
    agent_args={
        "tool_map": tool_map,
        "system_prompt": "You are a helpful search assistant.",
    },
    env_class=ToolEnvironment,
    env_args={
        "tool_map": tool_map,
        "reward_fn": search_reward_fn,
    },
    engine_name="openai",
    # Assumes a server is already listening at base_url.
    rollout_engine_args={
        "base_url": "http://localhost:8000/v1",
        "api_key": "EMPTY",
    },
    tokenizer=tokenizer,
    n_parallel_agents=64,
    max_steps=5,
    max_response_length=4096,
)

# Define tasks
tasks = [
    {"question": "What is the capital of France?", "answer": "Paris"},
    {"question": "Who wrote Hamlet?", "answer": "William Shakespeare"},
]

# Execute: 2 tasks x 2 rollouts per task. execute_tasks is a coroutine,
# so asyncio.run drives it from this synchronous script.
results = asyncio.run(
    engine.execute_tasks(
        tasks,
        num_rollouts_per_task=2,
        temperature=0.7,
        top_p=0.95,
    )
)

print(f"Generated {len(results)} trajectories")
for traj in results:
    print(f"Task: {traj.task}")
    print(f"Reward: {traj.reward}")
    print(f"Steps: {len(traj.steps)}")
Example: Math Problem Solving
import asyncio
from transformers import AutoTokenizer
from rllm.engine import AgentExecutionEngine
from rllm.agents import MathAgent
from rllm.environments import SingleTurnEnvironment
from rllm.rewards import math_reward_fn

tokenizer = AutoTokenizer.from_pretrained("deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B")

# Single-turn math solving: max_steps=1 pairs with SingleTurnEnvironment.
engine = AgentExecutionEngine(
    agent_class=MathAgent,
    env_class=SingleTurnEnvironment,
    env_args={"reward_fn": math_reward_fn},
    engine_name="openai",
    # Assumes a server is already listening at base_url.
    rollout_engine_args={
        "base_url": "http://localhost:4000/v1",
        "api_key": "EMPTY",
    },
    tokenizer=tokenizer,
    n_parallel_agents=128,
    max_steps=1,
    max_response_length=2048,
)

tasks = [
    {"question": "Solve 2x + 5 = 13", "answer": "4"},
    {"question": "What is 15% of 80?", "answer": "12"},
]

# num_rollouts_per_task is left at its default here.
results = asyncio.run(engine.execute_tasks(tasks, temperature=0.6))

for traj in results:
    print(f"Question: {traj.task['question']}")
    # The last step's action is printed as the model's answer.
    print(f"Answer: {traj.steps[-1].action}")
    print(f"Reward: {traj.reward}")