Overview
rLLM uses Ray and VERL for distributed training, enabling you to scale experiments across multiple GPUs and nodes. This guide covers configuration, resource allocation, and best practices for distributed training.
Training Architecture
rLLM’s distributed training separates computation into specialized workers:
- Actor: Runs policy model for generation
- Critic: Runs value model (for actor-critic methods)
- Rollout Workers: Execute environment interactions in parallel
- Trainer: Performs gradient updates
Each component can be independently scaled based on your workload.
Basic Configuration
Single-GPU Training
Default configuration for single-GPU setup:
```python
import hydra

from rllm.data.dataset import DatasetRegistry
from rllm.trainer.agent_trainer import AgentTrainer

# MathAgent, SingleTurnEnvironment, and math_reward_fn are defined in the
# example's own modules (see the source file referenced below).


@hydra.main(
    config_path="pkg://rllm.trainer.config",
    config_name="agent_ppo_trainer",
    version_base=None,
)
def main(config):
    train_dataset = DatasetRegistry.load_dataset("gsm8k", "train")
    test_dataset = DatasetRegistry.load_dataset("gsm8k", "test")

    trainer = AgentTrainer(
        agent_class=MathAgent,
        agent_args={},
        env_class=SingleTurnEnvironment,
        env_args={"reward_fn": math_reward_fn},
        config=config,
        train_dataset=train_dataset,
        val_dataset=test_dataset,
    )
    trainer.train()


if __name__ == "__main__":
    main()
```
From examples/gsm8k_lora/train_gsm8k_with_lora.py:10.
Configuration Files
rLLM uses Hydra for configuration management. The default config is at:
```yaml
# rllm/trainer/config/agent_ppo_trainer.yaml
defaults:
  - ppo_trainer
  - _self_

actor_rollout_ref:
  rollout:
    mode: async
    agent:
      num_workers: 0
    val_kwargs:
      do_sample: True

data:
  gen_batch_size: ${mul:${data.train_batch_size},${rllm.rejection_sample.multiplier}}
  return_multi_modal_inputs: False

rllm:
  agent:
    name: math_agent
    max_steps: 20
    trajectory_timeout: null
    agent_args: {}
    engine_args: {}
  workflow:
    use_workflow: False
    n_parallel_tasks: 256
    retry_limit: 3

trainer:
  log_episodes: false
  episode_log_dir: logs/${trainer.project_name}/${trainer.experiment_name}
```
From rllm/trainer/config/agent_ppo_trainer.yaml:1.
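Note the `${mul:...}` interpolation above: the generation batch size resolves to the train batch size times the rejection-sampling multiplier. With illustrative values (not the config defaults), the arithmetic works out like this:

```python
# Illustrative values: what the ${mul:...} interpolation computes for
# data.gen_batch_size at config-resolution time.
train_batch_size = 64      # data.train_batch_size (example value)
rejection_multiplier = 2   # rllm.rejection_sample.multiplier (example value)

gen_batch_size = train_batch_size * rejection_multiplier
print(gen_batch_size)  # 128
```

Rejection sampling generates more trajectories than the train batch needs, then keeps only a subset, so generation must be oversized by the multiplier.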
Scaling Configuration
Set batch sizes
Configure batch sizes for throughput:

```python
@hydra.main(
    config_path="pkg://rllm.trainer.config",
    config_name="agent_ppo_trainer"
)
def main(config):
    # Override batch sizes
    config.data.train_batch_size = 128
    config.data.val_batch_size = 64

    trainer = AgentTrainer(
        agent_class=MyAgent,
        env_class=MyEnv,
        config=config,
        train_dataset=train_dataset,
    )
    trainer.train()
```
Or via the command line:

```shell
python train.py data.train_batch_size=128 data.val_batch_size=64
```
Configure parallelism
Set the number of parallel tasks:

```python
config.rllm.workflow.n_parallel_tasks = 256  # Parallel rollouts
config.actor_rollout_ref.rollout.agent.num_workers = 4  # Rollout workers
```
Higher values increase throughput but require more memory.

Set GPU allocation

Configure GPU resources in the base VERL config:

```yaml
# Inherited from verl.trainer.config.ppo_trainer
actor_rollout_ref:
  actor:
    model:
      path: meta-llama/Llama-3.1-8B-Instruct
    ppo_mini_batch_size: 256
    ppo_micro_batch_size: 64
    fsdp_config:
      param_offload: false
      grad_offload: false
      optimizer_offload: false
  rollout:
    log_prob_micro_batch_size: 256
    tensor_model_parallel_size: 1

critic:
  model:
    enable_gradient_checkpointing: true
  optim:
    lr: 1e-6
```
Enable multi-node training
For multi-node setups, first form the Ray cluster: run `ray start --head` on the head node and `ray start --address=<head-node-ip>:6379` on each worker node. Then connect from your training script:

```python
import ray

# In the training script, run on the head node of an existing cluster
ray.init(address="auto")

# Or connect from a remote machine via Ray Client
ray.init(address="ray://<head-node-ip>:10001")
```

Then run training as usual; Ray distributes the workers across the cluster automatically.
Configuration Overrides
You can override config values in multiple ways:
Python Dictionary
```python
config_overrides = {
    "data.train_batch_size": 256,
    "rllm.workflow.n_parallel_tasks": 512,
    "actor_rollout_ref.actor.ppo_mini_batch_size": 128,
}

trainer = AgentTrainer(
    agent_class=MyAgent,
    env_class=MyEnv,
    config=config_overrides,
    train_dataset=train_dataset,
)
```
Command Line Arguments
```shell
python train.py \
    data.train_batch_size=256 \
    rllm.workflow.n_parallel_tasks=512 \
    actor_rollout_ref.actor.ppo_mini_batch_size=128
```
Config List
```python
config_overrides = [
    "data.train_batch_size=256",
    "rllm.workflow.n_parallel_tasks=512",
]

trainer = AgentTrainer(
    agent_class=MyAgent,
    env_class=MyEnv,
    config=config_overrides,
    train_dataset=train_dataset,
)
```
Ray Resource Management
Monitoring Resources
View Ray dashboard:
```shell
# Ray dashboard runs on http://localhost:8265 by default
ray start --head --dashboard-host=0.0.0.0
```
Resource Allocation
Control GPU placement:
```python
# Set environment variable before importing Ray
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3"  # Use GPUs 0-3

import ray
ray.init(num_gpus=4)
```
Memory Management
For large models, enable offloading:
```yaml
actor_rollout_ref:
  actor:
    fsdp_config:
      param_offload: true      # Offload parameters to CPU
      grad_offload: true       # Offload gradients to CPU
      optimizer_offload: true  # Offload optimizer states to CPU
```
Enabling offloading reduces memory usage but increases training time. Use it only when you run out of GPU memory.
Rollout Parallelism
Increase parallel tasks for faster data collection:
```yaml
rllm:
  workflow:
    n_parallel_tasks: 512  # Higher = more parallelism
```
Rule of thumb: `n_parallel_tasks ≈ batch_size / avg_trajectory_length`
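A worked instance of the rule of thumb, with illustrative numbers:

```python
# Illustrative numbers, not defaults: size parallelism so that one wave of
# rollouts roughly fills one training batch.
train_batch_size = 512
avg_trajectory_length = 2  # average env steps (samples) per trajectory

n_parallel_tasks = train_batch_size // avg_trajectory_length
print(n_parallel_tasks)  # 256
```

If trajectories are long, fewer concurrent tasks are needed to produce a batch; if they are short, raise `n_parallel_tasks` to keep the rollout engine busy.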
Async Rollouts
Enable asynchronous rollout mode:
```yaml
actor_rollout_ref:
  rollout:
    mode: async  # vs 'sync'
```
Async mode overlaps generation and environment execution for better GPU utilization.
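The overlap can be pictured with a small `asyncio` sketch. This is a conceptual illustration only, not rLLM internals; `generate` and `env_step` are stand-ins for model inference and environment execution:

```python
import asyncio


async def generate(prompt):
    await asyncio.sleep(0.01)  # stands in for LLM generation
    return f"action-for-{prompt}"


async def env_step(action):
    await asyncio.sleep(0.01)  # stands in for environment execution
    return f"obs-after-{action}"


async def rollout(task_id):
    action = await generate(f"task{task_id}")
    return await env_step(action)


async def main():
    # Many rollouts in flight at once: while one awaits its environment,
    # another can occupy the generation engine.
    return await asyncio.gather(*(rollout(i) for i in range(4)))


results = asyncio.run(main())
print(results[0])  # obs-after-action-for-task0
```

In sync mode the same four rollouts would run back to back; interleaving them is what keeps the GPU from idling during environment calls.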
Gradient Accumulation
For large batch sizes with limited memory:
```yaml
actor_rollout_ref:
  actor:
    ppo_mini_batch_size: 256   # Total batch size
    ppo_micro_batch_size: 64   # Per-GPU batch size
```
Gradient accumulation steps = `ppo_mini_batch_size / ppo_micro_batch_size`
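The accumulation count is a derived quantity, not a config key; you only set the two batch sizes. With the values above:

```python
# Derived quantity, shown for illustration: how many forward/backward passes
# are accumulated before each optimizer step.
ppo_mini_batch_size = 256   # samples per PPO optimizer update
ppo_micro_batch_size = 64   # samples per forward/backward pass

# The mini batch must divide evenly into micro batches.
assert ppo_mini_batch_size % ppo_micro_batch_size == 0
grad_accum_steps = ppo_mini_batch_size // ppo_micro_batch_size
print(grad_accum_steps)  # 4
```

Shrinking `ppo_micro_batch_size` trades memory for more accumulation steps at the same effective batch size.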
Model Parallelism
For models that don’t fit on a single GPU:
```yaml
actor_rollout_ref:
  rollout:
    tensor_model_parallel_size: 2  # Split model across 2 GPUs
```
Common Patterns
Multi-GPU Training (Single Node)
```yaml
# 4 GPUs, batch size 256
data:
  train_batch_size: 256

actor_rollout_ref:
  actor:
    ppo_mini_batch_size: 256
    ppo_micro_batch_size: 64  # 64 per GPU
  rollout:
    tensor_model_parallel_size: 1

rllm:
  workflow:
    n_parallel_tasks: 256
```
Large Model Training (70B+)
```yaml
actor_rollout_ref:
  actor:
    model:
      path: meta-llama/Llama-3.1-70B-Instruct
    ppo_micro_batch_size: 16  # Smaller batches
    fsdp_config:
      param_offload: true
      grad_offload: true
  rollout:
    tensor_model_parallel_size: 4  # Split across 4 GPUs
    log_prob_micro_batch_size: 64

rllm:
  workflow:
    n_parallel_tasks: 128  # Reduce for memory
```
High-Throughput Training
```yaml
# Optimize for speed over memory
data:
  train_batch_size: 512

actor_rollout_ref:
  actor:
    ppo_mini_batch_size: 512
    ppo_micro_batch_size: 128
  rollout:
    mode: async
    log_prob_micro_batch_size: 512

rllm:
  workflow:
    n_parallel_tasks: 1024  # High parallelism
```
Troubleshooting
Out of Memory (OOM)
- Reduce `ppo_micro_batch_size`
- Enable gradient checkpointing: `enable_gradient_checkpointing: true`
- Enable offloading: `param_offload: true`
- Reduce `n_parallel_tasks`
- Use tensor parallelism: `tensor_model_parallel_size: 2`
Slow Training
- Increase `n_parallel_tasks` for better GPU utilization
- Enable async rollouts: `mode: async`
- Increase `ppo_micro_batch_size` if you have memory headroom
- Check Ray dashboard for bottlenecks
- Disable offloading if not needed
Ray Connection Issues
```python
# Increase retries for slow networks
import ray
ray.init(address="auto", _redis_max_retries=10)
```
Distributed Training Not Working
- Ensure all nodes can reach each other
- Check firewall settings (Ray uses ports 6379, 8265, 10001)
- Verify same Ray version on all nodes
- Check Ray dashboard for node status
The `from_dict` method in your environment and agent classes must be properly implemented for distributed training to work: Ray serializes and deserializes these objects across workers.
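A minimal sketch of the pattern, assuming a classmethod that rebuilds the object from a plain dict of args; the class and argument names here are hypothetical, not rLLM APIs:

```python
class MyEnv:
    """Hypothetical environment used only to illustrate the from_dict pattern."""

    def __init__(self, max_steps=10, reward_fn=None):
        self.max_steps = max_steps
        self.reward_fn = reward_fn

    @classmethod
    def from_dict(cls, env_args: dict) -> "MyEnv":
        # Reconstruct the environment from plain-dict args so workers can
        # rebuild it after it crosses a process boundary.
        return cls(
            max_steps=env_args.get("max_steps", 10),
            reward_fn=env_args.get("reward_fn"),
        )


env = MyEnv.from_dict({"max_steps": 20})
print(env.max_steps)  # 20
```

Keeping `from_dict` restricted to plain, picklable values (numbers, strings, callables defined at module level) avoids the most common serialization failures.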
Best Practices
- Start small: Test on single GPU before scaling
- Monitor metrics: Use Ray dashboard and TensorBoard
- Profile first: Identify bottlenecks before optimizing
- Batch size scaling: Increase batch size with number of GPUs
- Checkpoint frequently: Distributed training can be unstable
- Test serialization: Ensure `from_dict` works correctly
Next Steps