Skip to main content
The rLLM SDK integrates seamlessly with popular agent frameworks and infrastructure tools. This guide shows you how to use the SDK with LangGraph, SmolAgent, Strands, and the LiteLLM proxy.

LangGraph Integration

LangGraph is a library for building stateful, multi-actor applications with LLMs. The rLLM SDK tracks all LLM calls made through LangGraph agents.

Basic Setup

import os
from langchain_openai import ChatOpenAI
from langgraph.graph import END, START, MessagesState, StateGraph
from langgraph.prebuilt import ToolNode, tools_condition

from rllm.sdk import get_chat_client, get_chat_client_async, session
from rllm.sdk.session.base import _ensure_tracer_initialized

# Initialize rLLM SDK chat clients
sync_client = get_chat_client(
    api_key=os.getenv("OPENAI_API_KEY"),
    base_url="http://localhost:4000/v1",  # LiteLLM proxy
    use_proxy=True,
)

async_client = get_chat_client_async(
    api_key=os.getenv("OPENAI_API_KEY"),
    base_url="http://localhost:4000/v1",
    use_proxy=True,
)

# Configure tracer (optional, for distributed tracing)
_ensure_tracer_initialized("search_agent")

# Pass both clients to ChatOpenAI
response_model = ChatOpenAI(
    model="gpt-4",
    temperature=0.7,
    max_tokens=2048,
    client=sync_client,        # For sync operations
    async_client=async_client, # For async operations
)

Complete LangGraph Agent Example

async def agent_step(state: MessagesState):
    """Agent decides whether to call tools or provide final answer."""
    response = await response_model.bind_tools([retriever_tool]).ainvoke(state["messages"])
    return {"messages": [response]}

# Build the workflow
workflow = StateGraph(MessagesState)
workflow.add_node("agent", agent_step)
workflow.add_node("tools", ToolNode([retriever_tool]))

workflow.add_edge(START, "agent")
workflow.add_conditional_edges(
    "agent",
    tools_condition,
    {
        "tools": "tools",
        END: END,
    },
)
workflow.add_edge("tools", "agent")

graph = workflow.compile()

# Run with session tracking
async def run_agent(question: str):
    with session(experiment="v1", task="qa") as sess:
        async for chunk in graph.astream(
            {"messages": [{"role": "user", "content": question}]},
            {"recursion_limit": 20},
        ):
            # Process chunks
            pass
        
        # Access collected traces
        print(f"Collected {len(sess.llm_calls)} traces")
        return sess.llm_calls

await run_agent("What is the capital of France?")
Key Points:
  • Pass both client and async_client to ChatOpenAI
  • Use get_chat_client() and get_chat_client_async() from rLLM SDK
  • Wrap execution in session() context to collect traces
  • All LLM calls are automatically tracked
Source: examples/sdk/langgraph/search_agent_langgraph.py

SmolAgent Integration

SmolAgent is HuggingFace’s lightweight agent framework. The rLLM SDK can track SmolAgent executions.

Basic Setup

from rllm.sdk import session, get_chat_client
from smolagents import CodeAgent, HfApiModel

# Initialize rLLM SDK client
llm = get_chat_client(
    api_key="hf_...",
    base_url="https://api-inference.huggingface.co/v1",
)

# Create SmolAgent with tracked client
agent = CodeAgent(
    tools=[],
    model=HfApiModel(client=llm),
)

# Track agent execution
with session(experiment="smol_v1", agent="code") as sess:
    result = agent.run("Calculate 2+2")
    print(f"Collected {len(sess.llm_calls)} traces")

Advanced SmolAgent Usage

from rllm.sdk import trajectory

@trajectory(name="smol_agent")
async def run_smol_agent(task: str):
    agent = CodeAgent(
        tools=[search_tool, calculator_tool],
        model=HfApiModel(client=llm),
    )
    
    result = agent.run(task)
    return result

# Returns TrajectoryView with all agent steps
traj = await run_smol_agent("Search for Python tutorials")
print(f"Steps: {len(traj.steps)}")
print(f"Result: {traj.output}")

# Assign rewards
for step in traj.steps:
    step.reward = evaluate_step(step)
traj.reward = sum(s.reward for s in traj.steps)

Strands Integration

Strands is a framework for building production-grade AI agents. The rLLM SDK integrates via tracked chat clients.

Basic Setup

from rllm.sdk import session, get_chat_client_async
from strands import Agent, Tool

# Initialize rLLM SDK client
llm = get_chat_client_async(
    api_key="sk-...",
    base_url="https://api.openai.com/v1",
)

# Create Strands agent with tracked client
agent = Agent(
    name="research_assistant",
    model=llm,
    tools=[search_tool, summarize_tool],
)

# Track execution
with session(experiment="strands_v1") as sess:
    result = await agent.run("Research quantum computing")
    print(f"Collected {len(sess.llm_calls)} traces")

Multi-Agent Strands Workflow

from rllm.sdk import trajectory

@trajectory(name="multi_agent_workflow")
async def run_multi_agent(task: str):
    # Create multiple agents
    researcher = Agent(name="researcher", model=llm, tools=[search_tool])
    writer = Agent(name="writer", model=llm, tools=[summarize_tool])
    
    # Research phase
    research_results = await researcher.run(f"Research: {task}")
    
    # Writing phase
    final_output = await writer.run(f"Write report: {research_results}")
    
    return final_output

# Track entire workflow
traj = await run_multi_agent("AI safety research")
print(f"Total steps: {len(traj.steps)}")
print(f"Final output: {traj.output}")

LiteLLM Proxy Integration

The rLLM SDK includes deep integration with LiteLLM proxy for metadata routing and trace collection.

Proxy Architecture

The SDK uses metadata slug encoding to route session context through the proxy:
Client (SDK) ──▶ Metadata Slug ──▶ LiteLLM Proxy ──▶ LLM Provider
                      │
               Session Context
               (session_name, experiment, etc.)

Metadata Slug Encoding

The SDK encodes metadata into the URL path for proxy routing:
from rllm.sdk.proxy import (
    encode_metadata_slug,
    decode_metadata_slug,
    build_proxied_base_url,
)

# Encode metadata into URL
metadata = {
    "session_name": "sess_123",
    "experiment": "v1",
    "task": "math"
}

slug = encode_metadata_slug(metadata)
print(slug)  # "rllm1:eyJleHBlcmltZW50IjoidjEiLCAic2Vzc2lvbl9uYW1lIjoic2Vzc18xMjMiLCAidGFzayI6Im1hdGgifQ"

# Build proxied base URL
proxied_url = build_proxied_base_url(
    "http://localhost:4000/v1",
    metadata
)
print(proxied_url)
# "http://localhost:4000/meta/rllm1:eyJleHBlcmltZW50IjoidjEiLCAic2Vzc2lvbl9uYW1lIjoic2Vzc18xMjMiLCAidGFzayI6Im1hdGgifQ/v1"

# Decode metadata from slug
decoded = decode_metadata_slug(slug)
print(decoded)  # {"session_name": "sess_123", "experiment": "v1", "task": "math"}

Automatic Metadata Routing

The SDK automatically routes metadata when use_proxy=True:
from rllm.sdk import session, get_chat_client

llm = get_chat_client(
    api_key="EMPTY",
    base_url="http://localhost:4000/v1",
    use_proxy=True,  # Enable automatic metadata routing
)

with session(experiment="v1", task="math"):
    # Metadata automatically encoded into URL:
    # http://localhost:4000/meta/rllm1:eyJleHBlcmltZW50IjoidjEiLCAidGFzayI6Im1hdGgifQ/v1/chat/completions
    response = llm.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Hello"}]
    )

Proxy Middleware

Add the MetadataRoutingMiddleware to your ASGI app to decode metadata slugs:
from fastapi import FastAPI, Request
from rllm.sdk.proxy import MetadataRoutingMiddleware

app = FastAPI()

@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
    # Metadata automatically extracted from URL
    # and available in request scope
    pass

# Wrap the app so metadata slugs are decoded from the URL.
# Note: wrap AFTER registering routes — rebinding `app` to the middleware
# first would leave `@app.post` pointing at the wrapper, not the FastAPI app.
app = MetadataRoutingMiddleware(app)
The middleware:
  • Extracts metadata slug from request path
  • Decodes metadata and injects into request scope
  • Cleans the path before forwarding to LiteLLM

LiteLLM Callbacks

The SDK provides callbacks for LiteLLM proxy integration:
from rllm.sdk.proxy import TracingCallback, SamplingParametersCallback

# TracingCallback: Collect traces from LLM calls
tracing_callback = TracingCallback()

# SamplingParametersCallback: Handle sampling parameters
sampling_callback = SamplingParametersCallback()

# Use with LiteLLM
callbacks = [tracing_callback, sampling_callback]
TracingCallback: Automatically collects traces and stores them based on session context. SamplingParametersCallback: Handles custom sampling parameters (temperature, top_p, etc.).

Starting the Proxy

Start the LiteLLM proxy with rLLM configuration:
#!/bin/bash

# Set ulimit and connection limits
ulimit -n 65536
export AIOHTTP_CONNECTOR_LIMIT=4096
export AIOHTTP_KEEPALIVE_TIMEOUT=60

# Start LiteLLM proxy
python -m rllm.sdk.proxy.litellm_server \
  --config litellm_proxy_config.yaml \
  --host 127.0.0.1 \
  --port 4000 \
  --state-dir /tmp/litellm_proxy \
  --cs-endpoint http://localhost:8000 \
  --cs-api-key "your-api-key" \
  --project rllm-agent-sdk \
  --admin-token my-shared-secret
Proxy Configuration (litellm_proxy_config.yaml):
model_list:
  - model_name: gpt-4
    litellm_params:
      model: openai/gpt-4
      api_key: sk-...
  
  - model_name: claude-3-opus
    litellm_params:
      model: anthropic/claude-3-opus-20240229
      api_key: sk-ant-...

general_settings:
  master_key: my-shared-secret
  database_url: sqlite:///litellm_proxy.db

Proxy Manager (Subprocess Mode)

The SDK can automatically manage the proxy lifecycle:
from rllm.sdk.proxy import ProxyManager

# Automatic proxy management
with ProxyManager(
    config_path="litellm_proxy_config.yaml",
    host="127.0.0.1",
    port=4000,
) as proxy:
    # Proxy automatically started
    llm = get_chat_client(
        base_url="http://localhost:4000/v1",
        use_proxy=True,
    )
    
    with session(experiment="v1"):
        llm.chat.completions.create(...)
    
    # Proxy automatically stopped on exit
This is useful for training scripts that need temporary proxy instances. Source: examples/sdk/README.md

OpenTelemetry Integration

For distributed tracing across multiple services, use the OpenTelemetry backend:

Configuration

Edit rllm/sdk/config.yaml:
session_backend: "opentelemetry"
Install dependencies:
pip install rllm[otel]

Basic Usage

from rllm.sdk.session import otel_session, configure_default_tracer
from rllm.sdk import get_chat_client_async

# Configure tracer once per process
configure_default_tracer(service_name="my-agent")

llm = get_chat_client_async(
    api_key="sk-...",
    base_url="https://api.openai.com/v1",
)

# Client service
with otel_session(name="client") as client_sess:
    # Make HTTP call to server
    # Session context automatically propagated via W3C baggage headers
    httpx.post("http://server/api", ...)

# Server service
with otel_session(name="handler") as server_sess:
    # Automatically inherits client's session context
    await llm.chat.completions.create(...)
    
    # Access traces (async)
    traces = await server_sess.llm_calls_async()

Cross-Service Context Propagation

OpenTelemetry sessions use W3C baggage for automatic context propagation:
# Service A (client)
with otel_session(experiment="v1", user="alice"):
    # HTTP request automatically includes baggage headers:
    # baggage: rllm-session={"metadata":{"experiment":"v1","user":"alice"},...}
    httpx.post("http://service-b/api", ...)

# Service B (server)
with otel_session(task="processing"):
    # Automatically inherits experiment="v1", user="alice"
    # Merges with task="processing"
    metadata = get_current_metadata()
    print(metadata)  # {"experiment": "v1", "user": "alice", "task": "processing"}
Key Features:
  • Baggage is the single source of truth
  • Automatic HTTP header propagation
  • Compatible with OpenTelemetry observability tools
  • Works across process boundaries

Framework Comparison

| Framework     | Integration Method  | Session Support | Trajectory Support | Use Case                |
|---------------|---------------------|-----------------|--------------------|-------------------------|
| LangGraph     | Tracked chat client | ✓               | ✓                  | Multi-agent workflows   |
| SmolAgent     | Tracked chat client | ✓               | ✓                  | Code generation agents  |
| Strands       | Tracked chat client | ✓               | ✓                  | Production AI apps      |
| LiteLLM Proxy | Metadata routing    | ✓               | ✓                  | Infrastructure layer    |
| OpenTelemetry | W3C baggage         | ✓               | ✓                  | Distributed tracing     |

Best Practices

  1. Use tracked clients: Always initialize LLM clients with get_chat_client() or get_chat_client_async()
  2. Enable proxy for training: Set use_proxy=True when routing through LiteLLM proxy
  3. Wrap execution in sessions: Use session() context manager to collect traces
  4. Leverage trajectories for RL: Use @trajectory decorator for reinforcement learning workflows
  5. Configure OpenTelemetry for distributed systems: Use otel_session for multi-service architectures
  6. Set metadata strategically: Add experiment tracking metadata at the session level

Example: End-to-End LangGraph Training

Complete example combining LangGraph, LiteLLM proxy, and trajectory tracking:
import asyncio
from langchain_openai import ChatOpenAI
from langgraph.graph import END, START, MessagesState, StateGraph

from rllm.sdk import get_chat_client_async, trajectory
from rllm.sdk.session.base import _ensure_tracer_initialized

# Initialize
_ensure_tracer_initialized("rag_agent")
llm = get_chat_client_async(
    base_url="http://localhost:4000/v1",
    api_key="EMPTY",
    use_proxy=True,
)

model = ChatOpenAI(model="gpt-4", async_client=llm)

# Build LangGraph workflow
workflow = StateGraph(MessagesState)
# Use an async node so the model call is awaited — a plain lambda would
# store an un-awaited coroutine in the message list.
async def agent_node(state: MessagesState):
    return {"messages": [await model.ainvoke(state["messages"])]}

workflow.add_node("agent", agent_node)
workflow.add_edge(START, "agent")
workflow.add_edge("agent", END)
graph = workflow.compile()

# Trajectory-tracked execution
@trajectory(name="rag_agent")
async def run_rag_agent(question: str):
    result = await graph.ainvoke(
        {"messages": [{"role": "user", "content": question}]}
    )
    return result["messages"][-1].content

# Execute and collect training data
async def main():
    questions = [
        "What is quantum computing?",
        "Explain neural networks",
        "What is reinforcement learning?",
    ]
    
    trajectories = []
    for q in questions:
        traj = await run_rag_agent(q)
        
        # Assign rewards
        for step in traj.steps:
            step.reward = evaluate_quality(step.output)
        traj.reward = sum(s.reward for s in traj.steps) / len(traj.steps)
        
        trajectories.append(traj)
    
    # Use trajectories for training
    return trajectories

trajectories = asyncio.run(main())
print(f"Collected {len(trajectories)} trajectories for training")

Next Steps