Ray for LLM Inference
Preface
Ray is a distributed execution engine. Its job is to take a messy cluster of machines and make it feel like one giant computer.
- The head node is the control plane. It tracks resources, schedules actors, and handles autoscaling.
- Worker nodes are the execution plane. Each contributes CPUs, GPUs, and memory into a cluster-wide pool.
- A shared object store lets actors pass results around without you writing sockets or wiring Kafka/Redis.
From Python, you just ask for resources (num_cpus=4, num_gpus=1), and Ray figures out where to run your code. That’s why it works as the backbone for pipelines, training jobs, and inference services.
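As a minimal sketch (the task name and the 4-CPU/1-GPU request are just illustrative), that looks like:
import ray

ray.init()

# Ray places this task on any node with 4 free CPUs and 1 free GPU.
@ray.remote(num_cpus=4, num_gpus=1)
def preprocess_shard(rows):
    return [r.lower() for r in rows]

futures = [preprocess_shard.remote(shard) for shard in (["Hello"], ["Ray"])]
print(ray.get(futures))  # results come back through the shared object store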
Why Not Just FastAPI (or Kubernetes)?
If you’ve shipped services with FastAPI or Kubernetes, the gaps are familiar:
FastAPI
- Stateless — nothing persists across requests.
- No cluster view — every process is isolated.
- No resource awareness — you pin workers to CPUs/GPUs yourself.
- No backpressure — concurrency blows up latency tails.
Kubernetes
- YAML-first — deployments, HPA rules, crash loops.
- Resources are generic labels — not first-class CPU/GPU scheduling.
- No shared memory — you need Redis, Kafka, or S3 for data passing.
- Long-lived workers? Still your responsibility to manage.
What Ray Actually Gives You
Ray closes those gaps in Python:
- Actors → long-lived workers that persist across calls.
- Placement groups → smart scheduling across CPUs + GPUs.
- Shared object store → cluster-wide memory for passing results.
- Unified cluster view → one pool of CPUs + GPUs.
- Autoscaling & backpressure → scale with traffic, stay fair under load.
That’s the core difference: Ray is a Python-native control plane for distributed workloads.
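As a minimal sketch of the first and third items (the TokenizerActor here is illustrative, not a real library class):
import ray

ray.init()

# An actor is a long-lived worker: its state persists across calls.
@ray.remote
class TokenizerActor:
    def __init__(self):
        self.calls = 0

    def tokenize(self, text):
        self.calls += 1
        return {"tokens": text.split(), "calls_so_far": self.calls}

actor = TokenizerActor.remote()
ref = actor.tokenize.remote("hello ray")  # the result lives in the shared object store
print(ray.get(ref))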
Failure Modes
- Preemption → VM disappears, Ray reschedules actors.
- OOM → actor dies, Ray restarts it, other replicas keep serving.
- Cold start → Ray can pre-warm or scale gradually.
- Tail latency → Serve router queues requests fairly.
Failures aren’t eliminated, but they’re contained.
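A hedged sketch of how that containment is configured (the retry counts are arbitrary):
import ray

# If the replica process dies (OOM, preempted VM), Ray restarts the actor up to
# 3 times and retries interrupted method calls up to 2 times.
@ray.remote(max_restarts=3, max_task_retries=2)
class Replica:
    def handle(self, request):
        return f"ok: {request}"

replica = Replica.remote()
print(ray.get(replica.handle.remote("ping")))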
Example: Ray Data pipeline
What We'll Do
Let’s build a simple data processing pipeline that looks like what you’d do before training or serving an LLM:
- Tokenize raw text into tokens.
- Convert tokens to IDs using a vocabulary.
- Generate embeddings (toy example, just computing vector norms).
We’ll use both map (per-row) and map_batches (per-batch) along the way, and see how Ray automatically shards the dataset across the cluster so tasks run in parallel.
Example
import ray
import numpy as np
import pandas as pd
from typing import Any, Dict

ray.init()

# 0) Create the dataset (Ray splits it into blocks automatically)
docs = [{"id": i, "text": f"doc {i}"} for i in range(200_000)]
ds = ray.data.from_items(docs)
print("Initial blocks:", ds.num_blocks())

# 1) Per-row transform: map() passes one row dict at a time
def tokenize(row: Dict[str, Any]) -> Dict[str, Any]:
    row["tokens"] = row["text"].split()
    return row

ds = ds.map(tokenize)

# 2) Batch transform: map_batches() hands each task a whole batch
#    (requested here as a pandas DataFrame via batch_format="pandas")
VOCAB = {"doc": 1, **{str(i): i + 2 for i in range(200_000)}}

def to_ids(batch: pd.DataFrame) -> pd.DataFrame:
    batch["ids"] = batch["tokens"].apply(lambda toks: [VOCAB.get(t, 0) for t in toks])
    return batch

ds = ds.map_batches(to_ids, batch_size=4096, batch_format="pandas")

# 3) Batch "embedding" (toy example: just the norm of the ID vector)
def embed(batch: pd.DataFrame) -> pd.DataFrame:
    batch["emb"] = batch["ids"].apply(
        lambda ids: float(np.linalg.norm(np.asarray(ids, dtype=np.float32)))
    )
    return batch

ds = ds.map_batches(embed, batch_size=4096, batch_format="pandas")

print("Final blocks:", ds.num_blocks())
print(ds.take(3))
How It Works
- Sharding: Ray automatically splits the dataset into blocks (shards). Each block is stored in a worker’s object store.
- Execution: For every operator (map, map_batches), Ray spawns per-block tasks and runs them on the nodes that hold the data (data locality).
- Parallelism: More blocks = more tasks. If you add worker nodes, Ray spreads blocks across them automatically — your code doesn’t change.
- Why map_batches: Each worker processes a batch slice at once, which cuts overhead and lets you use vectorized libraries like NumPy or Pandas.
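If the default block count is too low to keep every core busy, you can repartition the dataset; a quick sketch (200 is an arbitrary target, not a recommendation):
# More blocks -> more parallel tasks; 200 here is arbitrary.
ds = ds.repartition(200)
print("Blocks after repartition:", ds.num_blocks())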
LLM Inference with Ray Serve
For production inference, you need a router, replicas, autoscaling, and fairness. That’s what Ray Serve adds.
import ray
from ray import serve
from transformers import pipeline

ray.init(address="auto")  # connect to the running cluster

@serve.deployment(num_replicas=2, ray_actor_options={"num_gpus": 1})
class LLMWorker:
    def __init__(self):
        # Each replica loads its own copy of the model onto its GPU.
        self.pipe = pipeline("text-generation", model="gpt2", device=0)

    async def __call__(self, request):
        body = await request.json()
        prompt = body.get("prompt", "")
        out = self.pipe(prompt, max_new_tokens=64)[0]["generated_text"]
        return {"output": out}

serve.run(LLMWorker.bind(), route_prefix="/LLMWorker")
Serve gives you /LLMWorker on port 8000, with a proxy router balancing requests across replicas.
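Assuming Serve’s default HTTP host and port, a client call might look like this (the prompt is just an example):
import requests

resp = requests.post(
    "http://127.0.0.1:8000/LLMWorker",
    json={"prompt": "Ray makes distributed Python"},
)
print(resp.json()["output"])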
Ray Serve with vLLM
Below is a minimal, production-shaped pattern: Ray Serve routes requests across replicas, and each replica hosts a vLLM engine bound to a GPU. vLLM handles dynamic batching, prefill, and streaming; Ray gives you horizontal scale, autoscaling, health checks, and colocated pre/post-processing.
import asyncio
import uuid
from typing import List

import ray
from ray import serve

# vLLM imports
from vllm import AsyncLLMEngine, SamplingParams
from vllm.engine.arg_utils import AsyncEngineArgs

ray.init()  # or ray.init(address="auto") to join an existing cluster

MODEL_NAME = "Qwen/Qwen2.5-7B-Instruct"  # pick any vLLM-supported model
SAMPLING = SamplingParams(temperature=0.2, max_tokens=256)

@serve.deployment(
    num_replicas=2,
    ray_actor_options={"num_gpus": 1},
)
class VLLMApp:
    def __init__(self):
        engine_args = AsyncEngineArgs(
            model=MODEL_NAME,
            tensor_parallel_size=1,
            max_num_seqs=512,
            gpu_memory_utilization=0.9,
        )
        self.engine = AsyncLLMEngine.from_engine_args(engine_args)

    async def __call__(self, request):
        body = await request.json()
        prompts: List[str] = body.get("prompts", [])
        results = await self._generate(prompts)
        return {"outputs": results}

    async def _generate_one(self, prompt: str) -> str:
        # AsyncLLMEngine.generate() is an async generator that streams partial
        # outputs; here we keep only the final result for each request.
        final = None
        async for output in self.engine.generate(prompt, SAMPLING, str(uuid.uuid4())):
            final = output
        return final.outputs[0].text

    async def _generate(self, prompts: List[str]) -> List[str]:
        return await asyncio.gather(*[self._generate_one(p) for p in prompts])

serve.run(VLLMApp.bind(), route_prefix="/generate")
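A matching client sketch (the endpoint follows the route_prefix above; the prompts are illustrative):
import requests

resp = requests.post(
    "http://127.0.0.1:8000/generate",
    json={"prompts": [
        "Summarize Ray Serve in one sentence.",
        "What does paged attention buy you?",
    ]},
)
print(resp.json()["outputs"])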
Why this combo rocks
- Two-level performance:
  - vLLM maximizes a single GPU with paged attention, async scheduling, and dynamic batching.
  - Ray Serve spreads replicas across many nodes/GPUs and handles routing, autoscaling, and retries.
- Simple scale-up: Increase num_replicas or spin up more GPU nodes; no app rewrite needed.
- Colocation: Pre/post-processing, embeddings, or tool calls can run in Ray actors near your vLLM replicas to cut network overhead.
- Observability & backpressure: Ray gives you metrics, health checks, and queueing while vLLM tunes batch sizes to keep GPUs hot.
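For the autoscaling piece, a hedged sketch of the deployment config (replica bounds are arbitrary, and the body is elided since it matches VLLMApp above):
from ray import serve

# Replica count now tracks traffic instead of being fixed by num_replicas.
@serve.deployment(
    ray_actor_options={"num_gpus": 1},
    autoscaling_config={"min_replicas": 1, "max_replicas": 8},
)
class AutoscaledVLLMApp:
    ...  # same body as VLLMApp above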
Benchmarks
Benchmark results will be published once they're ready.
What's next
We've covered inference; now it's time to cover training!