The rapid adoption of Large Language Models has created a new category of production challenges. Moving from a Jupyter notebook that makes ChatGPT API calls to a production-grade LLM service requires careful attention to infrastructure, cost, latency, and reliability. This post shares patterns and practices for running LLMs at scale.
The Production LLM Stack
A production LLM system consists of multiple layers:
┌──────────────────────────────────────┐
│          Application Layer           │
│ (Prompt Engineering, Orchestration)  │
└──────────────────────────────────────┘
                   │
┌──────────────────────────────────────┐
│          LLM Serving Layer           │
│ (Inference, Caching, Load Balancing) │
└──────────────────────────────────────┘
                   │
┌──────────────────────────────────────┐
│             Model Layer              │
│    (GPT-4, Claude, Llama, Custom)    │
└──────────────────────────────────────┘
                   │
┌──────────────────────────────────────┐
│         Infrastructure Layer         │
│       (GPUs, Memory, Storage)        │
└──────────────────────────────────────┘
Model Selection and Deployment Strategy
Not all LLM workloads require the largest, most expensive models:
from enum import Enum
from typing import Optional
from dataclasses import dataclass
class ModelSize(Enum):
SMALL = "small" # 7B parameters, fast, cheap
MEDIUM = "medium" # 13-30B parameters, balanced
LARGE = "large" # 70B+ parameters, high quality
FRONTIER = "frontier" # GPT-4, Claude 3, highest quality
@dataclass
class ModelConfig:
name: str
size: ModelSize
cost_per_1k_tokens: float
latency_p99: float # milliseconds
context_window: int
capabilities: list[str]
class ModelRouter:
"""Route requests to appropriate model based on requirements"""
def __init__(self):
self.models = {
ModelSize.SMALL: ModelConfig(
name="llama-3-7b",
size=ModelSize.SMALL,
cost_per_1k_tokens=0.0002,
latency_p99=500,
context_window=4096,
capabilities=["classification", "extraction", "simple_qa"]
),
ModelSize.MEDIUM: ModelConfig(
name="mixtral-8x7b",
size=ModelSize.MEDIUM,
cost_per_1k_tokens=0.002,
latency_p99=1500,
context_window=32000,
capabilities=["reasoning", "code_generation", "summarization"]
),
ModelSize.LARGE: ModelConfig(
name="llama-3-70b",
size=ModelSize.LARGE,
cost_per_1k_tokens=0.01,
latency_p99=3000,
context_window=8192,
capabilities=["complex_reasoning", "creative_writing"]
),
ModelSize.FRONTIER: ModelConfig(
name="gpt-4-turbo",
size=ModelSize.FRONTIER,
cost_per_1k_tokens=0.03,
latency_p99=5000,
context_window=128000,
capabilities=["all"]
),
}
def select_model(
self,
task_type: str,
max_latency: Optional[float] = None,
max_cost: Optional[float] = None,
required_context: int = 4096
) -> ModelConfig:
"""Select optimal model for task requirements"""
candidates = []
for size, config in self.models.items():
# Check capability
if task_type not in config.capabilities and "all" not in config.capabilities:
continue
# Check latency constraint
if max_latency and config.latency_p99 > max_latency:
continue
# Check cost constraint
if max_cost and config.cost_per_1k_tokens > max_cost:
continue
# Check context window
if config.context_window < required_context:
continue
candidates.append(config)
if not candidates:
raise ValueError("No model satisfies constraints")
# Return smallest/cheapest model that meets requirements
return min(candidates, key=lambda c: c.cost_per_1k_tokens)
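As a quick usage sketch (the task names and constraints below are illustrative), a latency-sensitive classification request lands on the small model, while a long-context summarization falls through to a larger one:
router = ModelRouter()
# Tight latency budget: the cheapest capable model that fits wins
fast = router.select_model("classification", max_latency=1000)
print(fast.name)  # llama-3-8b
# Long documents force a larger context window
long_ctx = router.select_model("summarization", required_context=16000)
print(long_ctx.name)  # mixtral-8x7b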
Efficient Inference with vLLM
For self-hosted models, vLLM provides state-of-the-art inference performance:
from vllm import LLM, SamplingParams
from typing import List, Dict
import asyncio
class LLMInferenceEngine:
def __init__(self, model_name: str, tensor_parallel_size: int = 1):
"""
Initialize vLLM with optimizations:
- PagedAttention for efficient memory
- Continuous batching for throughput
- Tensor parallelism for large models
"""
self.llm = LLM(
model=model_name,
tensor_parallel_size=tensor_parallel_size,
gpu_memory_utilization=0.95, # Use 95% of GPU memory
max_num_seqs=256, # Maximum batch size
max_model_len=4096, # Maximum sequence length
)
async def generate(
self,
prompts: List[str],
temperature: float = 0.7,
max_tokens: int = 512,
top_p: float = 0.9,
) -> List[str]:
"""Generate responses with optimal batching"""
sampling_params = SamplingParams(
temperature=temperature,
top_p=top_p,
max_tokens=max_tokens,
)
        # vLLM batches and schedules these prompts internally. Note that the
        # synchronous LLM.generate() call blocks the event loop; for fully
        # async serving, vLLM's AsyncLLMEngine is a better fit.
        outputs = self.llm.generate(prompts, sampling_params)
return [output.outputs[0].text for output in outputs]
class BatchingQueue:
"""Dynamic batching for optimal throughput"""
def __init__(
self,
engine: LLMInferenceEngine,
max_batch_size: int = 32,
max_wait_ms: int = 100
):
self.engine = engine
self.max_batch_size = max_batch_size
self.max_wait_ms = max_wait_ms
self.queue: List[tuple[str, asyncio.Future]] = []
async def enqueue(self, prompt: str) -> str:
"""Add prompt to queue and wait for result"""
future = asyncio.Future()
self.queue.append((prompt, future))
# Trigger batch processing if needed
if len(self.queue) >= self.max_batch_size:
asyncio.create_task(self.process_batch())
return await future
async def process_batch(self):
"""Process accumulated requests as batch"""
if not self.queue:
return
# Take current batch
batch = self.queue[:self.max_batch_size]
self.queue = self.queue[self.max_batch_size:]
prompts = [p for p, _ in batch]
futures = [f for _, f in batch]
try:
# Generate responses for batch
responses = await self.engine.generate(prompts)
# Resolve futures
for future, response in zip(futures, responses):
future.set_result(response)
except Exception as e:
# Reject all futures on error
for future in futures:
future.set_exception(e)
async def run_batch_processor(self):
"""Background task to process batches periodically"""
while True:
await asyncio.sleep(self.max_wait_ms / 1000)
if self.queue:
await self.process_batch()
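Wiring this together means starting the background processor once and having request handlers simply await enqueue(); a minimal sketch, with an illustrative model name:
async def serve_example():
    engine = LLMInferenceEngine("meta-llama/Meta-Llama-3-8B-Instruct")
    queue = BatchingQueue(engine, max_batch_size=32, max_wait_ms=100)
    # Background task drains partially filled batches every max_wait_ms
    processor = asyncio.create_task(queue.run_batch_processor())
    try:
        # Concurrent callers are transparently batched together
        return await asyncio.gather(
            queue.enqueue("Summarize this support ticket: ..."),
            queue.enqueue("Classify the sentiment of: ..."),
        )
    finally:
        processor.cancel()
# asyncio.run(serve_example())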
Caching for Cost and Latency Reduction
Caching LLM responses can dramatically reduce costs and latency:
import hashlib
import json
from typing import Optional
import redis.asyncio as redis
class SemanticCache:
"""Cache LLM responses with semantic similarity matching"""
def __init__(self, redis_client: redis.Redis, embedding_model):
self.redis = redis_client
self.embedding_model = embedding_model
self.similarity_threshold = 0.95
    async def get(self, prompt: str) -> Optional[str]:
        """Get cached response for semantically similar prompt"""
        cache_key = self._get_cache_key(prompt)
        # Try exact match first (fast path, no embedding required)
        cached = await self.redis.get(f"llm:exact:{cache_key}")
        if cached:
            return cached.decode('utf-8')
        # Fall back to semantic search (slower, but handles paraphrases)
        # In production, back this with a vector database
        prompt_embedding = self.embedding_model.encode(prompt)
        similar = await self._find_similar_prompt(prompt_embedding)
        if similar:
            return similar
        return None
async def set(self, prompt: str, response: str, ttl: int = 3600):
"""Cache response with TTL"""
cache_key = self._get_cache_key(prompt)
# Store exact match
await self.redis.setex(
f"llm:exact:{cache_key}",
ttl,
response
)
# Store embedding for semantic search
embedding = self.embedding_model.encode(prompt)
await self._store_embedding(cache_key, embedding, response)
def _get_cache_key(self, prompt: str) -> str:
"""Generate cache key from prompt"""
# Normalize prompt
normalized = prompt.lower().strip()
# Hash for compact key
return hashlib.sha256(normalized.encode()).hexdigest()[:16]
async def _find_similar_prompt(self, embedding) -> Optional[str]:
"""Find semantically similar cached prompt"""
# In production, use vector database
# This is simplified for illustration
pass
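To make the semantic path concrete, here is a rough sketch of the lookup that _find_similar_prompt would delegate to: brute-force cosine similarity over an in-memory list. A real deployment would use a vector database instead, and the class below is hypothetical:
import numpy as np
from typing import Optional
class InMemorySemanticIndex:
    """Brute-force cosine-similarity lookup; a stand-in for a vector database"""
    def __init__(self, similarity_threshold: float = 0.95):
        self.similarity_threshold = similarity_threshold
        self.entries: list[tuple[np.ndarray, str]] = []  # (unit embedding, response)
    def add(self, embedding: np.ndarray, response: str) -> None:
        norm = np.linalg.norm(embedding)
        if norm > 0:
            # Normalize once so lookup is a plain dot product
            self.entries.append((embedding / norm, response))
    def find_similar(self, embedding: np.ndarray) -> Optional[str]:
        norm = np.linalg.norm(embedding)
        if not self.entries or norm == 0:
            return None
        query = embedding / norm
        best_score, best_response = -1.0, None
        for stored, response in self.entries:
            score = float(np.dot(query, stored))
            if score > best_score:
                best_score, best_response = score, response
        return best_response if best_score >= self.similarity_threshold else None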
# Example usage
class CachedLLMService:
def __init__(self, engine: LLMInferenceEngine, cache: SemanticCache):
self.engine = engine
self.cache = cache
self.cache_hit_rate = 0.0
async def generate(self, prompt: str) -> str:
# Try cache first
cached = await self.cache.get(prompt)
if cached:
self.cache_hit_rate = 0.99 * self.cache_hit_rate + 0.01
return cached
# Cache miss - generate
self.cache_hit_rate = 0.99 * self.cache_hit_rate
response = await self.engine.generate([prompt])
# Cache for future
await self.cache.set(prompt, response[0])
return response[0]
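Wiring the cache into a service is then straightforward; a sketch, assuming a local Redis instance and a sentence-transformers embedding model (the model name is just an example):
from sentence_transformers import SentenceTransformer
def build_cached_service(engine: LLMInferenceEngine) -> CachedLLMService:
    redis_client = redis.Redis(host="localhost", port=6379)
    embedder = SentenceTransformer("all-MiniLM-L6-v2")  # any embedding model works
    return CachedLLMService(engine, SemanticCache(redis_client, embedder))
# service = build_cached_service(engine)
# response = await service.generate("What is your refund policy?")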
Rate Limiting and Cost Control
Protect against runaway costs:
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import Dict
import asyncio
@dataclass
class CostBudget:
max_hourly_cost: float
max_daily_cost: float
max_monthly_cost: float
class CostController:
def __init__(self, budget: CostBudget, model_costs: Dict[str, float]):
self.budget = budget
self.model_costs = model_costs # Cost per 1K tokens
self.hourly_spend = 0.0
self.daily_spend = 0.0
self.monthly_spend = 0.0
self.hour_start = datetime.now()
self.day_start = datetime.now()
self.month_start = datetime.now()
async def check_budget(self, model: str, estimated_tokens: int) -> bool:
"""Check if request is within budget"""
estimated_cost = (estimated_tokens / 1000) * self.model_costs[model]
# Check hourly budget
if self.hourly_spend + estimated_cost > self.budget.max_hourly_cost:
return False
# Check daily budget
if self.daily_spend + estimated_cost > self.budget.max_daily_cost:
return False
# Check monthly budget
if self.monthly_spend + estimated_cost > self.budget.max_monthly_cost:
return False
return True
    async def record_usage(self, model: str, tokens_used: int):
        """Record actual usage"""
        actual_cost = (tokens_used / 1000) * self.model_costs[model]
        # Reset counters first if a period has elapsed, so the new cost is
        # attributed to the current window rather than discarded
        now = datetime.now()
        if now - self.hour_start > timedelta(hours=1):
            self.hourly_spend = 0.0
            self.hour_start = now
        if now - self.day_start > timedelta(days=1):
            self.daily_spend = 0.0
            self.day_start = now
        if now - self.month_start > timedelta(days=30):
            self.monthly_spend = 0.0
            self.month_start = now
        self.hourly_spend += actual_cost
        self.daily_spend += actual_cost
        self.monthly_spend += actual_cost
class BudgetExceededError(Exception):
    """Raised when a request would push spend over the configured budget"""
    pass
class RateLimitedLLMService:
def __init__(
self,
engine: LLMInferenceEngine,
cost_controller: CostController,
max_concurrent_requests: int = 100
):
self.engine = engine
self.cost_controller = cost_controller
self.semaphore = asyncio.Semaphore(max_concurrent_requests)
async def generate(self, prompt: str, model: str) -> str:
# Estimate token count (rough approximation)
estimated_tokens = len(prompt) // 4 + 512 # Input + output estimate
# Check budget
if not await self.cost_controller.check_budget(model, estimated_tokens):
raise BudgetExceededError("Request would exceed budget")
# Acquire semaphore (limit concurrent requests)
async with self.semaphore:
response = await self.engine.generate([prompt])
# Record actual usage
actual_tokens = self.count_tokens(prompt + response[0])
await self.cost_controller.record_usage(model, actual_tokens)
return response[0]
def count_tokens(self, text: str) -> int:
# Use proper tokenizer in production
return len(text) // 4
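A usage sketch, with purely illustrative budget figures and per-model pricing:
from typing import Optional
budget = CostBudget(max_hourly_cost=5.0, max_daily_cost=50.0, max_monthly_cost=1000.0)
controller = CostController(budget, model_costs={"llama-3-8b": 0.0002, "gpt-4-turbo": 0.03})
service = RateLimitedLLMService(engine, controller, max_concurrent_requests=50)
async def handle_request(prompt: str) -> Optional[str]:
    try:
        return await service.generate(prompt, model="llama-3-8b")
    except BudgetExceededError:
        # Degrade gracefully: queue for later, route to a cheaper model, or alert
        return None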
Monitoring and Observability
Track LLM-specific metrics:
from prometheus_client import Counter, Histogram, Gauge
class LLMMetrics:
def __init__(self):
# Request metrics
self.requests_total = Counter(
'llm_requests_total',
'Total LLM requests',
['model', 'status']
)
# Latency metrics
self.latency = Histogram(
'llm_latency_seconds',
'LLM inference latency',
['model'],
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
)
self.time_to_first_token = Histogram(
'llm_ttft_seconds',
'Time to first token',
['model'],
buckets=[0.1, 0.2, 0.5, 1.0, 2.0]
)
# Token metrics
self.tokens_generated = Counter(
'llm_tokens_total',
'Total tokens generated',
['model', 'type'] # type: input or output
)
# Cost metrics
self.estimated_cost = Counter(
'llm_estimated_cost_dollars',
'Estimated cost in dollars',
['model']
)
# Quality metrics
self.cache_hit_rate = Gauge(
'llm_cache_hit_rate',
'Cache hit rate'
)
# Resource metrics
self.gpu_utilization = Gauge(
'llm_gpu_utilization',
'GPU utilization percentage',
['gpu_id']
)
self.gpu_memory_used = Gauge(
'llm_gpu_memory_bytes',
'GPU memory used in bytes',
['gpu_id']
)
def record_request(
self,
model: str,
latency: float,
ttft: float,
input_tokens: int,
output_tokens: int,
cost: float,
success: bool
):
status = 'success' if success else 'error'
self.requests_total.labels(model=model, status=status).inc()
self.latency.labels(model=model).observe(latency)
self.time_to_first_token.labels(model=model).observe(ttft)
self.tokens_generated.labels(model=model, type='input').inc(input_tokens)
self.tokens_generated.labels(model=model, type='output').inc(output_tokens)
self.estimated_cost.labels(model=model).inc(cost)
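The serving path can wrap each call and feed these metrics; a sketch in which the wrapper name and the rough token and cost estimates are placeholders:
import time
from prometheus_client import start_http_server
metrics = LLMMetrics()
start_http_server(9090)  # expose /metrics for Prometheus to scrape
async def timed_generate(service, prompt: str, model: str) -> str:
    start = time.perf_counter()
    response, success = "", False
    try:
        response = await service.generate(prompt, model)
        success = True
        return response
    finally:
        metrics.record_request(
            model=model,
            latency=time.perf_counter() - start,
            ttft=time.perf_counter() - start,  # no streaming here, so TTFT ~= total latency
            input_tokens=len(prompt) // 4,     # rough estimate; use a real tokenizer
            output_tokens=len(response) // 4,
            cost=0.0,                          # plug in real per-model pricing
            success=success,
        )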
Error Handling and Retries
LLMs can fail in various ways; handle these failures gracefully:
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
import logging
class LLMError(Exception):
pass
class RateLimitError(LLMError):
pass
class ModelOverloadedError(LLMError):
pass
class InvalidResponseError(LLMError):
pass
class ResilientLLMService:
def __init__(self, primary_engine, fallback_engine=None):
self.primary = primary_engine
self.fallback = fallback_engine
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type((RateLimitError, ModelOverloadedError))
)
async def generate_with_retry(self, prompt: str) -> str:
"""Generate with automatic retries on transient failures"""
try:
            return (await self.primary.generate([prompt]))[0]
except RateLimitError:
logging.warning("Rate limit hit, retrying...")
raise
except ModelOverloadedError:
logging.warning("Model overloaded, retrying...")
raise
except InvalidResponseError as e:
# Don't retry on invalid responses
logging.error(f"Invalid response: {e}")
raise
async def generate_with_fallback(self, prompt: str) -> str:
"""Generate with fallback to alternative model"""
try:
return await self.generate_with_retry(prompt)
except Exception as e:
if self.fallback:
logging.warning(f"Primary failed: {e}, trying fallback")
                return (await self.fallback.generate([prompt]))[0]
raise
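A usage sketch, pairing a large self-hosted primary with a smaller standby engine (model names and parallelism are illustrative):
primary = LLMInferenceEngine("meta-llama/Meta-Llama-3-70B-Instruct", tensor_parallel_size=4)
fallback = LLMInferenceEngine("mistralai/Mixtral-8x7B-Instruct-v0.1")
resilient = ResilientLLMService(primary, fallback_engine=fallback)
# answer = await resilient.generate_with_fallback("Draft a status update for ...")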
Conclusion
Running LLMs in production requires careful attention to:
- Model Selection - Right-size for task requirements
- Inference Optimization - vLLM, batching, caching
- Cost Control - Budgets, rate limiting, caching
- Reliability - Retries, fallbacks, circuit breakers
- Observability - Track latency, cost, quality metrics
- Security - Input validation, output filtering, PII protection
The key to success: Start simple, measure everything, optimize incrementally. Don't over-engineer upfront, but build with production requirements in mind from day one.
LLMs are powerful but operationally complex. Treat them with the same rigor as any other production system: monitoring, testing, capacity planning, and incident response all apply.