Real-time AI applications demand single-digit millisecond latency. This post explores techniques for achieving ultra-low latency inference at scale, from model optimization to serving infrastructure.

Latency Budget Breakdown

Understanding where time goes in an AI request:

Total Latency (10ms target):
├── Network (2ms)
│   ├── Client → Load Balancer: 0.5ms
│   ├── Load Balancer → Service: 0.5ms
│   └── Response return: 1ms
├── Preprocessing (1ms)
│   ├── Input validation: 0.2ms
│   ├── Tokenization/normalization: 0.5ms
│   └── Batching: 0.3ms
├── Inference (5ms)
│   ├── Model execution: 4.5ms
│   └── Post-processing: 0.5ms
└── Overhead (2ms)
    ├── Serialization: 0.5ms
    ├── Logging/metrics: 0.5ms
    └── Buffer: 1ms

Dynamic Batching

Maximize throughput while meeting latency SLAs:

import asyncio
import os
import time
import uuid
from collections import deque
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional

import numpy as np

@dataclass
class InferenceRequest:
    """One queued inference call waiting to be placed into a batch.

    Instances are created by ``DynamicBatcher.infer`` and resolved by
    ``DynamicBatcher._process_batch``.
    """

    # Unique identifier; ``DynamicBatcher.infer`` assigns a UUID4 string
    id: str
    # Single, un-batched model input; stacked with its peers via ``np.array``
    input_data: Any
    # Completed with this request's output, or with the batch's exception
    future: asyncio.Future
    # ``time.time()`` reading taken at submission (seconds)
    arrival_time: float
    # Absolute ``time.time()``-based instant (seconds) the result is wanted by
    deadline: float

class DynamicBatcher:
    """Group concurrent inference requests into batches under a latency budget.

    Requests submitted through :meth:`infer` are queued and handed to
    ``model_runner`` as one stacked NumPy array. A batch is started as soon
    as any of the following holds:

    * the queue holds ``current_batch_size`` requests,
    * the oldest queued request has waited ``max_wait_ms``,
    * the tightest pending deadline is within ``inference_budget_ms``.

    A timer is armed for the two time-based conditions, so a lone request is
    still flushed when no further traffic arrives. ``current_batch_size`` is
    adapted after every batch from the recently observed runner latencies.

    NOTE: ``max_wait_ms`` and ``target_latency_ms`` are *stored* in seconds
    even though the attribute names keep the constructor's ``_ms`` suffix.
    """

    def __init__(
        self,
        model_runner: Callable,
        max_batch_size: int = 32,
        max_wait_ms: float = 2.0,
        target_latency_ms: float = 10.0,
        inference_budget_ms: float = 5.0
    ):
        """Configure the batcher.

        Args:
            model_runner: Awaitable callable that receives the stacked batch
                (``np.ndarray``) and returns one output per input, in order.
            max_batch_size: Upper bound for the adaptive batch size.
            max_wait_ms: Longest time a request may sit in the queue before
                a partial batch is flushed, in milliseconds.
            target_latency_ms: Default per-request deadline and the reference
                for batch-size adaptation, in milliseconds.
            inference_budget_ms: Time reserved for the model run when
                deciding how close to a deadline a flush must happen, in
                milliseconds.

        Raises:
            ValueError: If ``max_batch_size`` is smaller than 1.
        """
        if max_batch_size < 1:
            # A zero-sized batch would never drain the queue
            raise ValueError("max_batch_size must be at least 1")

        self.model_runner = model_runner
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms / 1000  # Stored in seconds
        self.target_latency_ms = target_latency_ms / 1000  # Stored in seconds
        self.inference_budget = inference_budget_ms / 1000  # Seconds

        self.pending_requests: List[InferenceRequest] = []
        self.processing = False

        # Adaptive parameters; the deque drops the oldest sample by itself
        self.recent_latencies = deque(maxlen=100)
        self.current_batch_size = max_batch_size

        # Timer for the next time-based flush, if one is armed
        self._flush_handle: Optional[asyncio.TimerHandle] = None
        # Strong references to in-flight batch tasks
        self._batch_tasks = set()

    async def infer(
        self,
        input_data: Any,
        deadline_ms: Optional[float] = None
    ) -> Any:
        """Queue one input and wait for its result.

        Args:
            input_data: Single, un-batched model input.
            deadline_ms: Relative deadline in milliseconds. ``None`` uses the
                configured target latency; ``0`` requests an immediate flush.
                The deadline only drives batch scheduling here; the wait on
                the result is not cut off when it passes.

        Returns:
            The element of the runner's output that corresponds to this input.

        Raises:
            TimeoutError: If the runner raised ``asyncio.TimeoutError``.
            Exception: Any other exception raised by the runner for the batch
                this request was part of.
        """
        arrival_time = time.time()

        # ``is None`` (not truthiness) so that an explicit 0 is honoured
        if deadline_ms is None:
            budget = self.target_latency_ms
        else:
            budget = deadline_ms / 1000

        request = InferenceRequest(
            id=str(uuid.uuid4()),
            input_data=input_data,
            future=asyncio.get_running_loop().create_future(),
            arrival_time=arrival_time,
            deadline=arrival_time + budget
        )

        self.pending_requests.append(request)

        # Start a batch now if one is due, otherwise (re-)arm the flush timer
        self._schedule_flush()

        try:
            return await request.future
        except asyncio.TimeoutError as exc:
            raise TimeoutError("Inference deadline exceeded") from exc

    def _next_flush_time(self) -> float:
        """Return the ``time.time()`` instant at which the queue must flush.

        Must only be called while ``pending_requests`` is non-empty.
        """
        oldest = self.pending_requests[0]
        # A later arrival may carry a tighter deadline than the queue head
        earliest_deadline = min(req.deadline for req in self.pending_requests)

        return min(
            oldest.arrival_time + self.max_wait_ms,
            earliest_deadline - self.inference_budget
        )

    def _should_process_batch(self) -> bool:
        """Decide if a batch should be processed right now."""

        if not self.pending_requests:
            return False

        # Process if batch is full
        if len(self.pending_requests) >= self.current_batch_size:
            return True

        # Process if the wait window elapsed or a deadline is getting close
        return time.time() >= self._next_flush_time()

    def _schedule_flush(self) -> None:
        """Start a batch if one is due, else arm a timer for when it will be."""

        # While a batch runs, its ``finally`` clause picks up the queue
        if self.processing or not self.pending_requests:
            return

        if self._should_process_batch():
            self._spawn_batch()
            return

        delay = max(0.0, self._next_flush_time() - time.time())
        self._cancel_flush_timer()
        self._flush_handle = asyncio.get_running_loop().call_later(
            delay, self._on_flush_timer
        )

    def _on_flush_timer(self) -> None:
        """Timer callback: flush whatever is queued."""
        self._flush_handle = None

        if self.pending_requests and not self.processing:
            self._spawn_batch()

    def _cancel_flush_timer(self) -> None:
        """Disarm the pending flush timer, if any."""
        if self._flush_handle is not None:
            self._flush_handle.cancel()
            self._flush_handle = None

    def _spawn_batch(self) -> None:
        """Run ``_process_batch`` as a task that stays referenced until done."""
        task = asyncio.create_task(self._process_batch())

        # The event loop holds tasks weakly; keep a strong reference so the
        # batch cannot be garbage-collected mid-flight
        self._batch_tasks.add(task)
        task.add_done_callback(self._batch_tasks.discard)

    async def _process_batch(self):
        """Run the model on the head of the queue and resolve its futures."""

        if self.processing or not self.pending_requests:
            return

        self.processing = True
        self._cancel_flush_timer()

        try:
            # Extract batch
            taken = self.pending_requests[:self.current_batch_size]
            self.pending_requests = self.pending_requests[self.current_batch_size:]

            # Requests whose future is already done (e.g. cancelled by the
            # caller) need no inference
            batch = [req for req in taken if not req.future.done()]
            if not batch:
                return

            # Prepare batch input
            batch_input = np.array([req.input_data for req in batch])

            start_time = time.perf_counter()

            try:
                batch_output = await self.model_runner(batch_input)
                latency = time.perf_counter() - start_time

                outputs = list(batch_output)
                if len(outputs) != len(batch):
                    # A short result would leave some futures pending forever
                    raise ValueError(
                        f"model_runner returned {len(outputs)} outputs "
                        f"for a batch of {len(batch)}"
                    )

                # Update latency statistics
                self._update_latency_stats(latency, len(batch))

                # Distribute results
                for req, output in zip(batch, outputs):
                    if not req.future.done():
                        req.future.set_result(output)

            except Exception as e:
                # Propagate error to all requests of this batch
                for req in batch:
                    if not req.future.done():
                        req.future.set_exception(e)

        finally:
            self.processing = False

            # Requests that queued up during the run are processed right away
            if self.pending_requests:
                self._spawn_batch()

    def _update_latency_stats(self, latency: float, batch_size: int):
        """Record one batch latency (seconds) and adapt the batch size.

        The batch size shrinks by 2 when the mean of the last 20 samples
        exceeds 80% of the target latency and grows by 2 (up to
        ``max_batch_size``) when it is below 50%.
        """

        self.recent_latencies.append({
            'latency': latency,
            'batch_size': batch_size,
            'timestamp': time.time()
        })

        # Adapt batch size based on latency
        recent = list(self.recent_latencies)[-20:]
        avg_latency = np.mean([sample['latency'] for sample in recent])

        if avg_latency > self.target_latency_ms * 0.8:
            # Too slow - reduce batch size
            self.current_batch_size = max(1, self.current_batch_size - 2)

        elif avg_latency < self.target_latency_ms * 0.5:
            # Room to increase batch size
            self.current_batch_size = min(
                self.max_batch_size,
                self.current_batch_size + 2
            )

    def get_metrics(self) -> Dict:
        """Return latency percentiles (ms) and batch-size statistics.

        Returns an empty dict until at least one batch has completed.
        """

        if not self.recent_latencies:
            return {}

        latencies = [s['latency'] * 1000 for s in self.recent_latencies]  # Convert to ms
        batch_sizes = [s['batch_size'] for s in self.recent_latencies]

        return {
            'latency_p50': np.percentile(latencies, 50),
            'latency_p95': np.percentile(latencies, 95),
            'latency_p99': np.percentile(latencies, 99),
            'avg_batch_size': np.mean(batch_sizes),
            'current_batch_size': self.current_batch_size,
            'pending_requests': len(self.pending_requests)
        }

Model Optimization for Latency

Aggressive optimization techniques:

class LatencyOptimizedModel:
    """Run an ONNX model through a fixed pipeline of latency optimizations.

    The hooks ``_fuse_operators``, ``_optimize_memory_layout``,
    ``_apply_quantization`` and ``_load_runtime`` are called by this class
    but are not defined in it; they have to be supplied elsewhere (for
    example by a subclass).

    NOTE(review): the ONNX Runtime and TVM calls below are synchronous, so
    they block the event loop although the methods are coroutines.
    """

    # Number of untimed and timed runs performed by ``_benchmark``
    WARMUP_RUNS = 10
    BENCHMARK_RUNS = 100

    def __init__(
        self,
        model_path: str,
        target_latency_ms: float,
        input_shape: tuple = (1, 3, 224, 224)
    ):
        """Store the model location and the optimization target.

        Args:
            model_path: Path of the source ONNX model.
            target_latency_ms: Latency above which quantization is applied.
            input_shape: Shape of the tensor named ``input`` used for kernel
                tuning and of the random tensor used for benchmarking.
        """
        self.model_path = model_path
        self.target_latency_ms = target_latency_ms
        self.input_shape = tuple(input_shape)

    @staticmethod
    def _derived_path(model_path: str, suffix: str) -> str:
        """Return ``model_path`` with its file extension replaced by ``suffix``.

        Only the final extension is touched, so directory names containing
        ``.onnx`` stay intact and a path without an extension still yields a
        new file instead of overwriting the input.
        """
        stem, _extension = os.path.splitext(model_path)
        return stem + suffix

    async def optimize(self) -> Dict:
        """Apply the optimization stages in order, benchmarking after each.

        Returns:
            Dict with ``optimized_model`` (path returned by kernel tuning),
            ``optimization_results`` (benchmark dict per stage) and
            ``final_latency_ms`` (latency after kernel tuning).
        """

        optimization_results = {}

        # 1. Graph fusion
        fused_model = await self._apply_graph_fusion()
        optimization_results['graph_fusion'] = await self._benchmark(fused_model)

        # 2. Operator fusion
        op_fused = await self._fuse_operators(fused_model)
        optimization_results['operator_fusion'] = await self._benchmark(op_fused)

        # 3. Memory optimization
        mem_optimized = await self._optimize_memory_layout(op_fused)
        optimization_results['memory_opt'] = await self._benchmark(mem_optimized)

        # 4. Quantization (only if latency is still above target)
        tuning_input = mem_optimized
        if optimization_results['memory_opt']['latency_ms'] > self.target_latency_ms:
            tuning_input = await self._apply_quantization(mem_optimized)
            optimization_results['quantization'] = await self._benchmark(tuning_input)

        # 5. Kernel tuning on whichever model the previous stages produced
        tuned_model = await self._tune_kernels(tuning_input)
        optimization_results['kernel_tuning'] = await self._benchmark(tuned_model)

        return {
            'optimized_model': tuned_model,
            'optimization_results': optimization_results,
            'final_latency_ms': optimization_results['kernel_tuning']['latency_ms']
        }

    async def _apply_graph_fusion(self) -> str:
        """Fuse operations in the computation graph.

        Returns:
            Path of the fused model written next to the source model.
        """

        from onnxruntime.transformers import optimizer
        from onnxruntime.transformers.fusion_options import FusionOptions

        # optimize_model expects a FusionOptions object, not a plain dict
        fusion_options = FusionOptions('bert')  # or appropriate model type
        fusion_options.enable_gelu = True
        fusion_options.enable_layer_norm = True
        fusion_options.enable_attention = True
        fusion_options.enable_skip_layer_norm = True
        fusion_options.enable_bias_skip_layer_norm = True
        fusion_options.enable_bias_gelu = True

        # Apply fusion passes
        optimized = optimizer.optimize_model(
            self.model_path,
            model_type='bert',  # or appropriate model type
            optimization_options=fusion_options
        )

        fused_path = self._derived_path(self.model_path, '_fused.onnx')
        optimized.save_model_to_file(fused_path)

        return fused_path

    async def _tune_kernels(self, model_path: str) -> str:
        """Auto-tune kernels for the target hardware with TVM.

        Args:
            model_path: Path of the ONNX model to tune.

        Returns:
            Path of the compiled shared library.
        """

        # Use TVM or similar for kernel tuning
        import onnx
        import tvm
        from tvm import relay, autotvm

        # Load model
        model = onnx.load(model_path)

        # Convert to Relay IR
        shape_dict = {'input': self.input_shape}
        mod, params = relay.frontend.from_onnx(model, shape_dict)

        target = tvm.target.Target("llvm -mcpu=core-avx2")

        # Tuning records are written here and read back when compiling
        tuning_log = self._derived_path(model_path, '_tuning.log')

        measure_option = autotvm.measure_option(
            builder=autotvm.LocalBuilder(timeout=10),
            runner=autotvm.LocalRunner(number=20, repeat=3, timeout=4)
        )

        # Tune tasks
        tasks = autotvm.task.extract_from_program(
            mod["main"],
            target=target,
            params=params
        )

        for task in tasks:
            tuner = autotvm.tuner.XGBTuner(task)
            tuner.tune(
                # Never ask for more trials than the search space holds
                n_trial=min(2000, len(task.config_space)),
                early_stopping=600,
                measure_option=measure_option,
                callbacks=[autotvm.callback.log_to_file(tuning_log)]
            )

        def build():
            with tvm.transform.PassContext(opt_level=3):
                return relay.build(mod, target=target, params=params)

        # Compile with best configs when tuning produced any records
        if os.path.exists(tuning_log):
            with autotvm.apply_history_best(tuning_log):
                lib = build()
        else:
            lib = build()

        tuned_path = self._derived_path(model_path, '_tuned.so')
        lib.export_library(tuned_path)

        return tuned_path

    async def _benchmark(self, model_path: str) -> Dict:
        """Benchmark model latency on a random input of ``input_shape``.

        Returns:
            Dict with the median (``latency_ms``), ``latency_p95`` and
            ``latency_p99`` of the timed runs, all in milliseconds.
        """

        # Load model
        runtime = self._load_runtime(model_path)

        dummy_input = np.random.randn(*self.input_shape).astype(np.float32)

        # Warm-up runs are not timed
        for _ in range(self.WARMUP_RUNS):
            await runtime.infer(dummy_input)

        # Benchmark
        latencies = []

        for _ in range(self.BENCHMARK_RUNS):
            start = time.perf_counter()
            await runtime.infer(dummy_input)
            latencies.append((time.perf_counter() - start) * 1000)  # ms

        return {
            'latency_ms': np.median(latencies),
            'latency_p95': np.percentile(latencies, 95),
            'latency_p99': np.percentile(latencies, 99)
        }

Hardware Acceleration

Leveraging specialized hardware:

class AcceleratedInference:
    """Dispatch inference calls across the hardware backends found at start-up."""

    def __init__(self):
        self.backends = self._initialize_backends()

    def _initialize_backends(self) -> Dict:
        """Build the name -> backend mapping for every detected accelerator."""

        # The CPU backend is registered unconditionally
        available = {'cpu': CPUBackend()}

        # GPU
        if self._has_cuda():
            available['cuda'] = CUDABackend()

        # TPU
        if self._has_tpu():
            available['tpu'] = TPUBackend()

        # Custom accelerators (e.g., AWS Inferentia, Google Coral)
        if self._has_inferentia():
            available['inferentia'] = InferentiaBackend()

        return available

    async def infer(
        self,
        model_id: str,
        input_data: np.ndarray,
        backend_hint: Optional[str] = None
    ) -> np.ndarray:
        """Run ``model_id`` on ``input_data``.

        A non-empty ``backend_hint`` naming a registered backend is honoured;
        in every other case the backend is picked by
        ``_select_optimal_backend``.
        """

        hint_usable = bool(backend_hint) and backend_hint in self.backends

        if not hint_usable:
            chosen = await self._select_optimal_backend(
                model_id,
                input_data.shape
            )
        else:
            chosen = self.backends[backend_hint]

        return await chosen.infer(model_id, input_data)

    async def _select_optimal_backend(
        self,
        model_id: str,
        input_shape: tuple
    ) -> 'Backend':
        """Return the backend with the lowest utilization-weighted latency.

        Each backend is scored as ``latency * (1 + utilization)``; on a tie
        the backend registered first wins.
        """

        scored = []

        for backend in self.backends.values():
            expected_latency = await backend.estimate_latency(
                model_id,
                input_shape
            )
            load = await backend.get_utilization()
            scored.append((backend, expected_latency, load))

        # Stable sort keeps registration order among equal scores
        ranked = sorted(
            scored,
            key=lambda entry: entry[1] * (1 + entry[2])
        )

        return ranked[0][0]


class CUDABackend:
    """CUDA-accelerated inference through TensorRT engines.

    Engines are cached twice: serialized on disk under ``ENGINE_DIR`` and,
    once loaded, in memory per ``model_id`` so that the engine file is not
    read and deserialized again on every request.

    ``_get_output_shape`` and ``_build_engine`` are called by this class but
    are not defined in it; they have to be supplied elsewhere.

    NOTE(review): the TensorRT/PyCUDA calls and the file I/O are synchronous
    and therefore block the event loop while they run.
    """

    # Directory holding the serialized ``<model_id>.trt`` engine files
    ENGINE_DIR = "engines"

    def __init__(self):
        # model_id -> ready-to-use TensorRT engine
        self._engines: Dict[str, Any] = {}

    def _cache_path(self, model_id: str) -> str:
        """Return the on-disk location of the serialized engine."""
        return f"{self.ENGINE_DIR}/{model_id}.trt"

    async def infer(
        self,
        model_id: str,
        input_data: np.ndarray
    ) -> np.ndarray:
        """Execute inference on GPU.

        Args:
            model_id: Identifier of the engine to run.
            input_data: Host input array; copied to the device as-is.

        Returns:
            A new ``float32`` array of the shape reported by
            ``_get_output_shape``.
        """

        import pycuda.driver as cuda

        # Get or build TensorRT engine
        engine = await self._get_engine(model_id)
        context = engine.create_execution_context()

        # memcpy_htod needs a contiguous host buffer
        host_input = np.ascontiguousarray(input_data)
        output = np.empty(self._get_output_shape(engine), dtype=np.float32)

        d_input = cuda.mem_alloc(host_input.nbytes)
        try:
            d_output = cuda.mem_alloc(output.nbytes)
            try:
                # Copy input to device, execute, copy output back to host
                cuda.memcpy_htod(d_input, host_input)
                context.execute_v2(bindings=[int(d_input), int(d_output)])
                cuda.memcpy_dtoh(output, d_output)
            finally:
                # Release device memory now rather than waiting for the GC
                d_output.free()
        finally:
            d_input.free()

        return output

    async def _get_engine(self, model_id: str):
        """Get the engine from memory, from disk, or build and cache it."""

        engine = self._engines.get(model_id)
        if engine is not None:
            return engine

        import tensorrt as trt

        cache_path = self._cache_path(model_id)

        if os.path.exists(cache_path):
            # Load cached engine
            with open(cache_path, 'rb') as f:
                runtime = trt.Runtime(trt.Logger(trt.Logger.WARNING))
                engine = runtime.deserialize_cuda_engine(f.read())

        if engine is None:
            # No cache file, or it could not be deserialized: build anew
            engine = await self._build_engine(model_id)

            # Cache for future use
            os.makedirs(os.path.dirname(cache_path), exist_ok=True)
            with open(cache_path, 'wb') as f:
                f.write(engine.serialize())

        self._engines[model_id] = engine
        return engine

Conclusion

Achieving real-time AI inference requires optimization at every layer: model architecture, serving infrastructure, and hardware utilization. Dynamic batching balances throughput and latency, while aggressive model optimization and hardware acceleration bring end-to-end latency into the single-digit-millisecond range targeted by the budget above.

The key is measuring everything and optimizing the bottlenecks. As models and hardware evolve, these techniques enable increasingly sophisticated AI in real-time applications.