The rapid adoption of Large Language Models has created a new category of production challenges. Moving from a Jupyter notebook that makes a few ChatGPT API calls to a production-grade LLM service requires careful attention to infrastructure, cost, latency, and reliability. This post shares patterns and practices for running LLMs at scale.

The Production LLM Stack

A production LLM system consists of multiple layers:

┌────────────────────────────────────────┐
│  Application Layer                     │
│  (Prompt Engineering, Orchestration)   │
└────────────────────────────────────────┘
                    ↓
┌────────────────────────────────────────┐
│  LLM Serving Layer                     │
│  (Inference, Caching, Load Balancing)  │
└────────────────────────────────────────┘
                    ↓
┌────────────────────────────────────────┐
│  Model Layer                           │
│  (GPT-4, Claude, Llama, Custom)        │
└────────────────────────────────────────┘
                    ↓
┌────────────────────────────────────────┐
│  Infrastructure Layer                  │
│  (GPUs, Memory, Storage)               │
└────────────────────────────────────────┘

Model Selection and Deployment Strategy

Not all LLM workloads require the largest, most expensive models:

from enum import Enum
from typing import Optional
from dataclasses import dataclass

class ModelSize(Enum):
    SMALL = "small"    # 7B parameters, fast, cheap
    MEDIUM = "medium"  # 13-30B parameters, balanced
    LARGE = "large"    # 70B+ parameters, high quality
    FRONTIER = "frontier"  # GPT-4, Claude 3, highest quality

@dataclass
class ModelConfig:
    name: str
    size: ModelSize
    cost_per_1k_tokens: float
    latency_p99: float  # milliseconds
    context_window: int
    capabilities: list[str]

class ModelRouter:
    """Route requests to appropriate model based on requirements"""

    def __init__(self):
        self.models = {
            ModelSize.SMALL: ModelConfig(
                name="llama-3-7b",
                size=ModelSize.SMALL,
                cost_per_1k_tokens=0.0002,
                latency_p99=500,
                context_window=4096,
                capabilities=["classification", "extraction", "simple_qa"]
            ),
            ModelSize.MEDIUM: ModelConfig(
                name="mixtral-8x7b",
                size=ModelSize.MEDIUM,
                cost_per_1k_tokens=0.002,
                latency_p99=1500,
                context_window=32000,
                capabilities=["reasoning", "code_generation", "summarization"]
            ),
            ModelSize.LARGE: ModelConfig(
                name="llama-3-70b",
                size=ModelSize.LARGE,
                cost_per_1k_tokens=0.01,
                latency_p99=3000,
                context_window=8192,
                capabilities=["complex_reasoning", "creative_writing"]
            ),
            ModelSize.FRONTIER: ModelConfig(
                name="gpt-4-turbo",
                size=ModelSize.FRONTIER,
                cost_per_1k_tokens=0.03,
                latency_p99=5000,
                context_window=128000,
                capabilities=["all"]
            ),
        }

    def select_model(
        self,
        task_type: str,
        max_latency: Optional[float] = None,
        max_cost: Optional[float] = None,
        required_context: int = 4096
    ) -> ModelConfig:
        """Select optimal model for task requirements"""

        candidates = []

        for size, config in self.models.items():
            # Check capability
            if task_type not in config.capabilities and "all" not in config.capabilities:
                continue

            # Check latency constraint
            if max_latency and config.latency_p99 > max_latency:
                continue

            # Check cost constraint
            if max_cost and config.cost_per_1k_tokens > max_cost:
                continue

            # Check context window
            if config.context_window < required_context:
                continue

            candidates.append(config)

        if not candidates:
            raise ValueError("No model satisfies constraints")

        # Return smallest/cheapest model that meets requirements
        return min(candidates, key=lambda c: c.cost_per_1k_tokens)
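
A quick usage sketch (the task names and constraints are illustrative and match the catalog above): a latency-sensitive extraction call lands on the smallest capable model, while a long-document summarization request is pushed to one with a bigger context window.

router = ModelRouter()

# Tight latency budget: only the small model qualifies
extraction_model = router.select_model(
    task_type="extraction",
    max_latency=1000,       # milliseconds
)

# Long documents: needs a context window of at least ~16k tokens
summary_model = router.select_model(
    task_type="summarization",
    required_context=16000,
)

print(extraction_model.name, summary_model.name)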

Efficient Inference with vLLM

For self-hosted models, vLLM provides state-of-the-art inference performance:

from vllm import LLM, SamplingParams
from typing import List, Dict
import asyncio

class LLMInferenceEngine:
    def __init__(self, model_name: str, tensor_parallel_size: int = 1):
        """
        Initialize vLLM with optimizations:
        - PagedAttention for efficient memory
        - Continuous batching for throughput
        - Tensor parallelism for large models
        """
        self.llm = LLM(
            model=model_name,
            tensor_parallel_size=tensor_parallel_size,
            gpu_memory_utilization=0.95,  # Use 95% of GPU memory
            max_num_seqs=256,  # Maximum batch size
            max_model_len=4096,  # Maximum sequence length
        )

    async def generate(
        self,
        prompts: List[str],
        temperature: float = 0.7,
        max_tokens: int = 512,
        top_p: float = 0.9,
    ) -> List[str]:
        """Generate responses with optimal batching"""

        sampling_params = SamplingParams(
            temperature=temperature,
            top_p=top_p,
            max_tokens=max_tokens,
        )

        # vLLM batches and schedules requests internally; LLM.generate is
        # synchronous, so run it in a worker thread to avoid blocking the event loop
        outputs = await asyncio.to_thread(self.llm.generate, prompts, sampling_params)

        return [output.outputs[0].text for output in outputs]

class BatchingQueue:
    """Dynamic batching for optimal throughput"""

    def __init__(
        self,
        engine: LLMInferenceEngine,
        max_batch_size: int = 32,
        max_wait_ms: int = 100
    ):
        self.engine = engine
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        self.queue: List[tuple[str, asyncio.Future]] = []

    async def enqueue(self, prompt: str) -> str:
        """Add prompt to queue and wait for result"""
        future = asyncio.Future()
        self.queue.append((prompt, future))

        # Trigger batch processing if needed
        if len(self.queue) >= self.max_batch_size:
            asyncio.create_task(self.process_batch())

        return await future

    async def process_batch(self):
        """Process accumulated requests as batch"""
        if not self.queue:
            return

        # Take current batch
        batch = self.queue[:self.max_batch_size]
        self.queue = self.queue[self.max_batch_size:]

        prompts = [p for p, _ in batch]
        futures = [f for _, f in batch]

        try:
            # Generate responses for batch
            responses = await self.engine.generate(prompts)

            # Resolve futures
            for future, response in zip(futures, responses):
                future.set_result(response)

        except Exception as e:
            # Reject all futures on error
            for future in futures:
                future.set_exception(e)

    async def run_batch_processor(self):
        """Background task to process batches periodically"""
        while True:
            await asyncio.sleep(self.max_wait_ms / 1000)
            if self.queue:
                await self.process_batch()
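
A sketch of how the pieces fit together at service startup; the model identifier and prompts are placeholders, not recommendations:

async def serve_example():
    engine = LLMInferenceEngine("meta-llama/Meta-Llama-3-8B-Instruct")
    queue = BatchingQueue(engine, max_batch_size=32, max_wait_ms=100)

    # Run the periodic batch processor in the background
    processor = asyncio.create_task(queue.run_batch_processor())

    # Concurrent callers share batches transparently
    prompts = ["Classify this support ticket: ...", "Summarize this document: ..."]
    results = await asyncio.gather(*(queue.enqueue(p) for p in prompts))

    processor.cancel()
    return results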

Caching for Cost and Latency Reduction

Caching LLM responses can dramatically reduce costs and latency:

import hashlib
import json
from typing import Optional
import redis.asyncio as redis

class SemanticCache:
    """Cache LLM responses with semantic similarity matching"""

    def __init__(self, redis_client: redis.Redis, embedding_model):
        self.redis = redis_client
        self.embedding_model = embedding_model
        self.similarity_threshold = 0.95

    async def get(self, prompt: str) -> Optional[str]:
        """Get cached response for an exact or semantically similar prompt"""

        # Try exact match first (fast path, no embedding needed)
        cache_key = self._get_cache_key(prompt)
        cached = await self.redis.get(f"llm:exact:{cache_key}")
        if cached:
            return cached.decode('utf-8')

        # Fall back to semantic search (slower but handles paraphrases)
        # In production, back this with a vector database
        prompt_embedding = self.embedding_model.encode(prompt)
        similar = await self._find_similar_prompt(prompt_embedding)
        if similar:
            return similar

        return None

    async def set(self, prompt: str, response: str, ttl: int = 3600):
        """Cache response with TTL"""
        cache_key = self._get_cache_key(prompt)

        # Store exact match
        await self.redis.setex(
            f"llm:exact:{cache_key}",
            ttl,
            response
        )

        # Store embedding for semantic search
        embedding = self.embedding_model.encode(prompt)
        await self._store_embedding(cache_key, embedding, response)

    def _get_cache_key(self, prompt: str) -> str:
        """Generate cache key from prompt"""
        # Normalize prompt
        normalized = prompt.lower().strip()

        # Hash for compact key
        return hashlib.sha256(normalized.encode()).hexdigest()[:16]

    async def _find_similar_prompt(self, embedding) -> Optional[str]:
        """Find a cached response for a semantically similar prompt"""
        # In production, query a vector database here
        # (simplified to a stub for illustration)
        return None

    async def _store_embedding(self, cache_key: str, embedding, response: str):
        """Persist the prompt embedding for later similarity search"""
        # In production, upsert into a vector database here
        pass

# Example usage
class CachedLLMService:
    def __init__(self, engine: LLMInferenceEngine, cache: SemanticCache):
        self.engine = engine
        self.cache = cache
        self.cache_hit_rate = 0.0

    async def generate(self, prompt: str) -> str:
        # Try cache first
        cached = await self.cache.get(prompt)
        if cached:
            self.cache_hit_rate = 0.99 * self.cache_hit_rate + 0.01
            return cached

        # Cache miss - generate
        self.cache_hit_rate = 0.99 * self.cache_hit_rate
        response = await self.engine.generate([prompt])

        # Cache for future
        await self.cache.set(prompt, response[0])

        return response[0]
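
The `_find_similar_prompt` and `_store_embedding` hooks are deliberately left as stubs because a real deployment would back them with a vector index (Redis with RediSearch, pgvector, FAISS, and similar). To make the idea concrete, here is a minimal in-memory sketch of the similarity lookup using brute-force cosine similarity; it is purely illustrative and assumes embeddings are NumPy vectors:

import numpy as np
from typing import Dict, Optional, Tuple

class InMemorySemanticIndex:
    """Illustrative brute-force index; swap in a vector database at scale"""

    def __init__(self, similarity_threshold: float = 0.95):
        self.similarity_threshold = similarity_threshold
        # cache_key -> (unit-normalized embedding, cached response)
        self.entries: Dict[str, Tuple[np.ndarray, str]] = {}

    def store(self, cache_key: str, embedding: np.ndarray, response: str):
        # Normalize once so lookup reduces to a dot product
        unit = embedding / np.linalg.norm(embedding)
        self.entries[cache_key] = (unit, response)

    def find_similar(self, embedding: np.ndarray) -> Optional[str]:
        if not self.entries:
            return None
        query = embedding / np.linalg.norm(embedding)
        best_score, best_response = -1.0, None
        for stored, response in self.entries.values():
            score = float(np.dot(query, stored))  # cosine similarity
            if score > best_score:
                best_score, best_response = score, response
        return best_response if best_score >= self.similarity_threshold else None

`SemanticCache._store_embedding` and `_find_similar_prompt` could delegate to an index like this until the cache grows large enough to justify a dedicated vector store.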

Rate Limiting and Cost Control

Protect against runaway costs:

from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import Dict
import asyncio

@dataclass
class CostBudget:
    max_hourly_cost: float
    max_daily_cost: float
    max_monthly_cost: float

class CostController:
    def __init__(self, budget: CostBudget, model_costs: Dict[str, float]):
        self.budget = budget
        self.model_costs = model_costs  # Cost per 1K tokens

        self.hourly_spend = 0.0
        self.daily_spend = 0.0
        self.monthly_spend = 0.0

        self.hour_start = datetime.now()
        self.day_start = datetime.now()
        self.month_start = datetime.now()

    def _reset_elapsed_periods(self):
        """Reset spend counters whose budget period has elapsed"""
        now = datetime.now()

        if now - self.hour_start > timedelta(hours=1):
            self.hourly_spend = 0.0
            self.hour_start = now

        if now - self.day_start > timedelta(days=1):
            self.daily_spend = 0.0
            self.day_start = now

        if now - self.month_start > timedelta(days=30):
            self.monthly_spend = 0.0
            self.month_start = now

    async def check_budget(self, model: str, estimated_tokens: int) -> bool:
        """Check if request is within budget"""

        self._reset_elapsed_periods()
        estimated_cost = (estimated_tokens / 1000) * self.model_costs[model]

        # Check hourly budget
        if self.hourly_spend + estimated_cost > self.budget.max_hourly_cost:
            return False

        # Check daily budget
        if self.daily_spend + estimated_cost > self.budget.max_daily_cost:
            return False

        # Check monthly budget
        if self.monthly_spend + estimated_cost > self.budget.max_monthly_cost:
            return False

        return True

    async def record_usage(self, model: str, tokens_used: int):
        """Record actual usage"""

        # Reset any elapsed periods before accumulating, so fresh usage isn't wiped
        self._reset_elapsed_periods()

        actual_cost = (tokens_used / 1000) * self.model_costs[model]

        self.hourly_spend += actual_cost
        self.daily_spend += actual_cost
        self.monthly_spend += actual_cost

class BudgetExceededError(Exception):
    """Raised when a request would exceed the configured cost budget"""
    pass

class RateLimitedLLMService:
    def __init__(
        self,
        engine: LLMInferenceEngine,
        cost_controller: CostController,
        max_concurrent_requests: int = 100
    ):
        self.engine = engine
        self.cost_controller = cost_controller
        self.semaphore = asyncio.Semaphore(max_concurrent_requests)

    async def generate(self, prompt: str, model: str) -> str:
        # Estimate token count (rough approximation)
        estimated_tokens = len(prompt) // 4 + 512  # Input + output estimate

        # Check budget
        if not await self.cost_controller.check_budget(model, estimated_tokens):
            raise BudgetExceededError("Request would exceed budget")

        # Acquire semaphore (limit concurrent requests)
        async with self.semaphore:
            response = await self.engine.generate([prompt])

            # Record actual usage
            actual_tokens = self.count_tokens(prompt + response[0])
            await self.cost_controller.record_usage(model, actual_tokens)

            return response[0]

    def count_tokens(self, text: str) -> int:
        # Use proper tokenizer in production
        return len(text) // 4
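
The `len(text) // 4` heuristic is fine for budget estimates but drifts on code, non-English text, and long outputs. If the target model uses an OpenAI-style encoding, tiktoken gives an exact count (other model families ship their own tokenizers); a minimal sketch, assuming the `cl100k_base` encoding:

import tiktoken

def count_tokens_exact(text: str, encoding_name: str = "cl100k_base") -> int:
    """Exact token count for OpenAI-style encodings; other models need their own tokenizer"""
    encoding = tiktoken.get_encoding(encoding_name)
    return len(encoding.encode(text))

# Compare the heuristic against the real count
sample = "Summarize the following incident report in three bullet points."
print(len(sample) // 4, count_tokens_exact(sample))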

Monitoring and Observability

Track LLM-specific metrics:

from prometheus_client import Counter, Histogram, Gauge

class LLMMetrics:
    def __init__(self):
        # Request metrics
        self.requests_total = Counter(
            'llm_requests_total',
            'Total LLM requests',
            ['model', 'status']
        )

        # Latency metrics
        self.latency = Histogram(
            'llm_latency_seconds',
            'LLM inference latency',
            ['model'],
            buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
        )

        self.time_to_first_token = Histogram(
            'llm_ttft_seconds',
            'Time to first token',
            ['model'],
            buckets=[0.1, 0.2, 0.5, 1.0, 2.0]
        )

        # Token metrics
        self.tokens_generated = Counter(
            'llm_tokens_total',
            'Total tokens generated',
            ['model', 'type']  # type: input or output
        )

        # Cost metrics
        self.estimated_cost = Counter(
            'llm_estimated_cost_dollars',
            'Estimated cost in dollars',
            ['model']
        )

        # Quality metrics
        self.cache_hit_rate = Gauge(
            'llm_cache_hit_rate',
            'Cache hit rate'
        )

        # Resource metrics
        self.gpu_utilization = Gauge(
            'llm_gpu_utilization',
            'GPU utilization percentage',
            ['gpu_id']
        )

        self.gpu_memory_used = Gauge(
            'llm_gpu_memory_bytes',
            'GPU memory used in bytes',
            ['gpu_id']
        )

    def record_request(
        self,
        model: str,
        latency: float,
        ttft: float,
        input_tokens: int,
        output_tokens: int,
        cost: float,
        success: bool
    ):
        status = 'success' if success else 'error'

        self.requests_total.labels(model=model, status=status).inc()
        self.latency.labels(model=model).observe(latency)
        self.time_to_first_token.labels(model=model).observe(ttft)
        self.tokens_generated.labels(model=model, type='input').inc(input_tokens)
        self.tokens_generated.labels(model=model, type='output').inc(output_tokens)
        self.estimated_cost.labels(model=model).inc(cost)
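
One way to wire these metrics in (a sketch reusing the RateLimitedLLMService from earlier; the token and cost figures are rough placeholders) is to wrap every inference call so the outcome is recorded even when the call fails:

import time

metrics = LLMMetrics()

async def generate_with_metrics(service: RateLimitedLLMService, prompt: str, model: str) -> str:
    start = time.monotonic()
    success = False
    response = ""
    try:
        response = await service.generate(prompt, model)
        success = True
        return response
    finally:
        latency = time.monotonic() - start
        metrics.record_request(
            model=model,
            latency=latency,
            ttft=latency,                   # placeholder: real TTFT requires streaming
            input_tokens=len(prompt) // 4,  # rough heuristic, as above
            output_tokens=len(response) // 4,
            cost=0.0,                       # plug in per-model pricing here
            success=success,
        )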

Error Handling and Retries

LLMs can fail in many ways, so handle failures gracefully:

from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
import logging

class LLMError(Exception):
    pass

class RateLimitError(LLMError):
    pass

class ModelOverloadedError(LLMError):
    pass

class InvalidResponseError(LLMError):
    pass

class ResilientLLMService:
    def __init__(self, primary_engine, fallback_engine=None):
        self.primary = primary_engine
        self.fallback = fallback_engine

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type((RateLimitError, ModelOverloadedError))
    )
    async def generate_with_retry(self, prompt: str) -> str:
        """Generate with automatic retries on transient failures"""

        try:
            responses = await self.primary.generate([prompt])
            return responses[0]

        except RateLimitError:
            logging.warning("Rate limit hit, retrying...")
            raise

        except ModelOverloadedError:
            logging.warning("Model overloaded, retrying...")
            raise

        except InvalidResponseError as e:
            # Don't retry on invalid responses
            logging.error(f"Invalid response: {e}")
            raise

    async def generate_with_fallback(self, prompt: str) -> str:
        """Generate with fallback to alternative model"""

        try:
            return await self.generate_with_retry(prompt)

        except Exception as e:
            if self.fallback:
                logging.warning(f"Primary failed: {e}, trying fallback")
                responses = await self.fallback.generate([prompt])
                return responses[0]
            raise
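
Putting it together, a common setup keeps a cheap self-hosted model as the primary and a second deployment (often a hosted API behind the same interface) as the fallback. A sketch using the vLLM engine from earlier; the model names are placeholders:

primary = LLMInferenceEngine("meta-llama/Meta-Llama-3-8B-Instruct")
fallback = LLMInferenceEngine("mistralai/Mixtral-8x7B-Instruct-v0.1", tensor_parallel_size=2)

service = ResilientLLMService(primary_engine=primary, fallback_engine=fallback)

async def handle_request(prompt: str) -> str:
    # Retries absorb transient errors; the fallback covers sustained outages
    return await service.generate_with_fallback(prompt)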

Conclusion

Running LLMs in production requires careful attention to:

  1. Model Selection - Right-size for task requirements
  2. Inference Optimization - vLLM, batching, caching
  3. Cost Control - Budgets, rate limiting, caching
  4. Reliability - Retries, fallbacks, circuit breakers
  5. Observability - Track latency, cost, quality metrics
  6. Security - Input validation, output filtering, PII protection

The key to success: Start simple, measure everything, optimize incrementally. Don't over-engineer upfront, but build with production requirements in mind from day one.

LLMs are powerful but operationally complex. Treat them with the same rigor as any other production system: monitoring, testing, capacity planning, and incident response all apply.