The RolloutEngine provides an abstraction for model inference across different backends (OpenAI, Fireworks, vLLM, etc.).

RolloutEngine

Base class for all rollout engines.
from rllm.engine.rollout import RolloutEngine

Methods

get_model_response

Generate a model response for the given messages.
output = await engine.get_model_response(
    messages=[{"role": "user", "content": "Hello"}],
    application_id="task_0",
    temperature=0.7,
    max_tokens=2048
)
Parameters:

messages (list[dict]): List of chat messages in OpenAI format.
application_id (str): Unique identifier for tracking requests.
**kwargs: Additional sampling parameters (temperature, top_p, max_tokens, etc.).

Returns:

output (ModelOutput): Model output containing text, tokens, and metadata.

wake_up

Initialize or warm up the engine (implementation-specific).
await engine.wake_up()

sleep

Shutdown or clean up the engine (implementation-specific).
await engine.sleep()
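Since wake_up and sleep bracket an engine's lifetime, an async context manager can guarantee cleanup even when generation raises. A minimal sketch of the pattern using a stub engine; engine_session and DummyEngine are illustrative names, not part of rllm:

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def engine_session(engine):
    """Wake the engine up on entry and put it to sleep on exit."""
    await engine.wake_up()
    try:
        yield engine
    finally:
        # Runs even if the body raises, so the engine is always cleaned up.
        await engine.sleep()

# Stub standing in for a RolloutEngine, tracking only its lifecycle state.
class DummyEngine:
    def __init__(self):
        self.awake = False
    async def wake_up(self):
        self.awake = True
    async def sleep(self):
        self.awake = False

async def main():
    engine = DummyEngine()
    async with engine_session(engine):
        assert engine.awake        # engine is live inside the block
    assert not engine.awake        # and shut down on exit

asyncio.run(main())
```

The same wrapper applies unchanged to any RolloutEngine subclass, since it only relies on the wake_up/sleep interface above.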

ModelOutput

Dataclass containing model generation output.
from rllm.engine.rollout import ModelOutput

Fields

text (str | None): Complete generated text (may include reasoning).
content (str | None): Content portion of the response (excluding reasoning).
reasoning (str | None): Reasoning or thought process (if the model supports it).
tool_calls (list[ToolCall] | None): List of tool calls made by the model.
prompt_ids (list[int] | None): Token IDs for the input prompt.
completion_ids (list[int] | None): Token IDs for the completion.
multi_modal_inputs (dict[str, list] | None): Multimodal inputs (e.g., images).
logprobs (list[float] | None): Log probabilities for completion tokens.
prompt_logprobs (list[float] | None): Log probabilities for prompt tokens (aligned to prompt_ids).
prompt_length (int, default 0): Length of the prompt in tokens.
completion_length (int, default 0): Length of the completion in tokens.
finish_reason (str | None): Reason generation stopped ("stop", "length", etc.).

Methods

# Serialize
output_dict = model_output.to_dict()

# Deserialize
model_output = ModelOutput.from_dict(output_dict)

OpenAIEngine

Rollout engine using OpenAI-compatible APIs.
from rllm.engine.rollout import OpenAIEngine

engine = OpenAIEngine(
    base_url="http://localhost:8000/v1",
    api_key="EMPTY",
    model="Qwen/Qwen3-4B"
)

Constructor

base_url (str): Base URL for the API endpoint.
api_key (str): API key for authentication.
model (str): Model identifier.

FireworksEngine

Rollout engine using Fireworks AI API.
from rllm.engine.rollout import FireworksEngine

engine = FireworksEngine(
    api_key="your_fireworks_key",
    model="accounts/fireworks/models/deepseek-r1"
)
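API-backed engines can fail transiently (rate limits, timeouts), so calls to get_model_response are often wrapped in a retry loop with backoff. A minimal sketch; with_retries is an illustrative helper, not part of rllm, demonstrated here against a flaky stand-in coroutine:

```python
import asyncio

async def with_retries(call, *, attempts: int = 3, base_delay: float = 0.1):
    """Await `call()` up to `attempts` times, backing off exponentially."""
    for attempt in range(attempts):
        try:
            return await call()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(base_delay * 2 ** attempt)

# Demonstration: a coroutine that fails twice before succeeding.
calls = {"n": 0}

async def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"

result = asyncio.run(with_retries(flaky))
print(result)  # prints "ok" after two retried failures
```

In real use, `call` would be a lambda closing over `engine.get_model_response(...)`; catching a narrower exception type than `Exception` is usually preferable.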

Example: Basic Usage

import asyncio
from rllm.engine.rollout import OpenAIEngine

engine = OpenAIEngine(
    base_url="http://localhost:4000/v1",
    api_key="EMPTY",
    model="deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B"
)

async def generate():
    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "What is the capital of France?"}
    ]
    
    output = await engine.get_model_response(
        messages,
        temperature=0.7,
        max_tokens=512
    )
    
    print(f"Content: {output.content}")
    print(f"Reasoning: {output.reasoning}")
    print(f"Tokens: {output.completion_length}")
    print(f"Finish reason: {output.finish_reason}")

asyncio.run(generate())

Example: Batch Generation

import asyncio
from rllm.engine.rollout import OpenAIEngine

engine = OpenAIEngine(
    base_url="http://localhost:8000/v1",
    api_key="EMPTY",
    model="Qwen/Qwen3-4B"
)

async def generate_batch():
    tasks = [
        [{"role": "user", "content": "What is 2+2?"}],
        [{"role": "user", "content": "What is the speed of light?"}],
        [{"role": "user", "content": "Who wrote Romeo and Juliet?"}]
    ]
    
    # Generate concurrently
    results = await asyncio.gather(*[
        engine.get_model_response(messages, application_id=f"task_{i}")
        for i, messages in enumerate(tasks)
    ])
    
    for i, output in enumerate(results):
        print(f"\nTask {i}:")
        print(f"Response: {output.content}")

asyncio.run(generate_batch())
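asyncio.gather launches every request at once; against a shared inference server it is usually worth capping in-flight requests with a semaphore. A sketch of the pattern using a stub engine (bounded_generate and StubEngine are illustrative names, not part of rllm):

```python
import asyncio

# Stub standing in for a RolloutEngine; returns a canned response.
class StubEngine:
    async def get_model_response(self, messages, application_id=None):
        await asyncio.sleep(0.01)  # simulate network latency
        return f"reply to {messages[0]['content']}"

async def bounded_generate(engine, tasks, max_concurrency=2):
    sem = asyncio.Semaphore(max_concurrency)

    async def one(i, messages):
        async with sem:  # at most max_concurrency requests in flight
            return await engine.get_model_response(
                messages, application_id=f"task_{i}"
            )

    # gather preserves input order, so results[i] matches tasks[i].
    return await asyncio.gather(*(one(i, m) for i, m in enumerate(tasks)))

tasks = [[{"role": "user", "content": f"question {i}"}] for i in range(5)]
results = asyncio.run(bounded_generate(StubEngine(), tasks))
print(results[0])  # prints "reply to question 0"
```

Swapping StubEngine for a real OpenAIEngine leaves bounded_generate unchanged, since it only relies on the get_model_response interface.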