Real-time AI applications demand single-digit millisecond latency. This post explores techniques for achieving ultra-low latency inference at scale, from model optimization to serving infrastructure.
Latency Budget Breakdown
Understanding where time goes in an AI request:
Total Latency (10ms target):
├── Network (2ms)
│   ├── Client → Load Balancer: 0.5ms
│   ├── Load Balancer → Service: 0.5ms
│   └── Response return: 1ms
├── Preprocessing (1ms)
│   ├── Input validation: 0.2ms
│   ├── Tokenization/normalization: 0.5ms
│   └── Batching: 0.3ms
├── Inference (5ms)
│   ├── Model execution: 4.5ms
│   └── Post-processing: 0.5ms
└── Overhead (2ms)
    ├── Serialization: 0.5ms
    ├── Logging/metrics: 0.5ms
    └── Buffer: 1ms
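Before optimizing anything, it is worth instrumenting each stage so the budget above reflects your own measurements rather than assumptions. A minimal sketch follows; handle_request, preprocess, model.run, and postprocess are illustrative placeholders, not part of any specific framework:

import time
from collections import defaultdict
from contextlib import contextmanager

stage_timings = defaultdict(list)

@contextmanager
def timed_stage(name: str):
    """Record wall-clock milliseconds spent in one stage of the request path."""
    start = time.perf_counter()
    try:
        yield
    finally:
        stage_timings[name].append((time.perf_counter() - start) * 1000)

def handle_request(raw_input):
    # preprocess / model.run / postprocess stand in for your own pipeline
    with timed_stage("preprocessing"):
        batch = preprocess(raw_input)
    with timed_stage("inference"):
        output = model.run(batch)
    with timed_stage("postprocessing"):
        return postprocess(output)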
Dynamic Batching
Maximize throughput while meeting latency SLAs:
import asyncio
import time
import uuid
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional

import numpy as np
@dataclass
class InferenceRequest:
    id: str
    input_data: Any
    future: asyncio.Future
    arrival_time: float
    deadline: float
class DynamicBatcher:
    """Dynamic batching with latency awareness"""

    def __init__(
        self,
        model_runner: Callable,
        max_batch_size: int = 32,
        max_wait_ms: float = 2.0,
        target_latency_ms: float = 10.0
    ):
        self.model_runner = model_runner
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms / 1000  # Convert to seconds
        self.target_latency_ms = target_latency_ms / 1000  # Convert to seconds
        self.pending_requests: List[InferenceRequest] = []
        self.processing = False

        # Adaptive parameters
        self.recent_latencies = []
        self.current_batch_size = max_batch_size
    async def infer(
        self,
        input_data: Any,
        deadline_ms: Optional[float] = None
    ) -> Any:
        """Submit inference request with deadline"""
        request_id = str(uuid.uuid4())
        arrival_time = time.time()

        # Calculate deadline
        if deadline_ms:
            deadline = arrival_time + (deadline_ms / 1000)
        else:
            deadline = arrival_time + self.target_latency_ms

        # Create future for result
        future = asyncio.Future()

        # Create request
        request = InferenceRequest(
            id=request_id,
            input_data=input_data,
            future=future,
            arrival_time=arrival_time,
            deadline=deadline
        )

        # Add to pending queue
        self.pending_requests.append(request)

        # Trigger batch processing if needed
        if self._should_process_batch():
            asyncio.create_task(self._process_batch())

        # Wait for the result, bounded by the request deadline
        try:
            return await asyncio.wait_for(future, timeout=max(deadline - time.time(), 0))
        except asyncio.TimeoutError:
            raise TimeoutError("Inference deadline exceeded")
    def _should_process_batch(self) -> bool:
        """Decide if batch should be processed now"""
        if not self.pending_requests:
            return False

        # Process if batch is full
        if len(self.pending_requests) >= self.current_batch_size:
            return True

        # Process if oldest request is approaching its deadline
        oldest = self.pending_requests[0]
        time_remaining = oldest.deadline - time.time()

        # Leave enough time for inference
        inference_budget = 0.005  # 5ms for inference
        if time_remaining <= inference_budget:
            return True

        # Process if we have waited long enough
        wait_time = time.time() - oldest.arrival_time
        if wait_time >= self.max_wait_ms:
            return True

        return False
    async def _process_batch(self):
        """Process accumulated batch"""
        if self.processing or not self.pending_requests:
            return

        self.processing = True
        try:
            # Extract batch
            batch = self.pending_requests[:self.current_batch_size]
            self.pending_requests = self.pending_requests[self.current_batch_size:]

            # Prepare batch input
            batch_input = np.array([req.input_data for req in batch])

            # Execute inference
            start_time = time.time()
            try:
                batch_output = await self.model_runner(batch_input)
                latency = time.time() - start_time

                # Update latency statistics
                self._update_latency_stats(latency, len(batch))

                # Distribute results
                for req, output in zip(batch, batch_output):
                    if not req.future.done():
                        req.future.set_result(output)
            except Exception as e:
                # Propagate error to all requests
                for req in batch:
                    if not req.future.done():
                        req.future.set_exception(e)
        finally:
            self.processing = False

            # Process next batch if requests are pending
            if self.pending_requests:
                asyncio.create_task(self._process_batch())
    def _update_latency_stats(self, latency: float, batch_size: int):
        """Update latency statistics and adapt batch size"""
        self.recent_latencies.append({
            'latency': latency,
            'batch_size': batch_size,
            'timestamp': time.time()
        })

        # Keep last 100 samples
        if len(self.recent_latencies) > 100:
            self.recent_latencies.pop(0)

        # Adapt batch size based on latency (both values below are in seconds)
        avg_latency = np.mean([l['latency'] for l in self.recent_latencies[-20:]])

        if avg_latency > self.target_latency_ms * 0.8:
            # Too slow - reduce batch size
            self.current_batch_size = max(1, self.current_batch_size - 2)
        elif avg_latency < self.target_latency_ms * 0.5:
            # Room to increase batch size
            self.current_batch_size = min(
                self.max_batch_size,
                self.current_batch_size + 2
            )
    def get_metrics(self) -> Dict:
        """Get batching metrics"""
        if not self.recent_latencies:
            return {}

        latencies = [l['latency'] * 1000 for l in self.recent_latencies]  # Convert to ms
        batch_sizes = [l['batch_size'] for l in self.recent_latencies]

        return {
            'latency_p50': np.percentile(latencies, 50),
            'latency_p95': np.percentile(latencies, 95),
            'latency_p99': np.percentile(latencies, 99),
            'avg_batch_size': np.mean(batch_sizes),
            'current_batch_size': self.current_batch_size,
            'pending_requests': len(self.pending_requests)
        }
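A minimal usage sketch: run_model below is a hypothetical stand-in for the real inference call (it just doubles the input), and the 100ms deadline is deliberately generous for this toy runner.

async def run_model(batch: np.ndarray) -> np.ndarray:
    # Placeholder model runner; off-load blocking inference to a thread
    # so the event loop stays responsive.
    return await asyncio.to_thread(lambda: batch * 2.0)

async def main():
    batcher = DynamicBatcher(model_runner=run_model, max_batch_size=32, max_wait_ms=2.0)

    # Fire 100 concurrent requests; the batcher groups them into batches under the SLA
    inputs = [np.random.randn(16).astype(np.float32) for _ in range(100)]
    results = await asyncio.gather(*[batcher.infer(x, deadline_ms=100.0) for x in inputs])

    print(len(results), batcher.get_metrics())

asyncio.run(main())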
Model Optimization for Latency
Aggressive optimization techniques:
class LatencyOptimizedModel:
    """Optimize model specifically for low latency"""

    def __init__(self, model_path: str, target_latency_ms: float):
        self.model_path = model_path
        self.target_latency_ms = target_latency_ms
    async def optimize(self) -> Dict:
        """Apply latency-focused optimizations"""
        optimization_results = {}

        # 1. Graph fusion
        fused_model = await self._apply_graph_fusion()
        optimization_results['graph_fusion'] = await self._benchmark(fused_model)

        # 2. Operator fusion
        op_fused = await self._fuse_operators(fused_model)
        optimization_results['operator_fusion'] = await self._benchmark(op_fused)

        # 3. Memory optimization
        mem_optimized = await self._optimize_memory_layout(op_fused)
        optimization_results['memory_opt'] = await self._benchmark(mem_optimized)

        # 4. Quantization (only if latency is still above target)
        current_model = mem_optimized
        current_latency = optimization_results['memory_opt']['latency_ms']
        if current_latency > self.target_latency_ms:
            current_model = await self._apply_quantization(mem_optimized)
            optimization_results['quantization'] = await self._benchmark(current_model)

        # 5. Kernel tuning
        tuned_model = await self._tune_kernels(current_model)
        optimization_results['kernel_tuning'] = await self._benchmark(tuned_model)

        return {
            'optimized_model': tuned_model,
            'optimization_results': optimization_results,
            'final_latency_ms': optimization_results['kernel_tuning']['latency_ms']
        }
    async def _apply_graph_fusion(self) -> str:
        """Fuse operations in the computation graph"""
        from onnxruntime.transformers import optimizer
        from onnxruntime.transformers.fusion_options import FusionOptions

        # Enable transformer-specific fusion passes
        fusion_options = FusionOptions('bert')  # or the appropriate model type
        fusion_options.enable_gelu = True
        fusion_options.enable_layer_norm = True
        fusion_options.enable_attention = True
        fusion_options.enable_skip_layer_norm = True
        fusion_options.enable_bias_skip_layer_norm = True
        fusion_options.enable_bias_gelu = True

        optimized = optimizer.optimize_model(
            self.model_path,
            model_type='bert',  # or appropriate model type
            optimization_options=fusion_options
        )

        fused_path = self.model_path.replace('.onnx', '_fused.onnx')
        optimized.save_model_to_file(fused_path)
        return fused_path
    async def _tune_kernels(self, model_path: str) -> str:
        """Auto-tune kernels for the target hardware"""
        # Use TVM or similar for kernel tuning
        import onnx
        import tvm
        from tvm import relay, autotvm

        # Load model and convert to Relay IR
        model = onnx.load(model_path)
        shape_dict = {'input': (1, 3, 224, 224)}
        mod, params = relay.frontend.from_onnx(model, shape_dict)

        # Target hardware
        target = tvm.target.Target("llvm -mcpu=core-avx2")

        # Tuning options; results are recorded to a log file
        tuning_log = model_path.replace('.onnx', '_tuning.log')
        tuning_option = {
            'n_trial': 2000,
            'early_stopping': 600,
            'measure_option': autotvm.measure_option(
                builder=autotvm.LocalBuilder(timeout=10),
                runner=autotvm.LocalRunner(number=20, repeat=3, timeout=4)
            ),
            'callbacks': [autotvm.callback.log_to_file(tuning_log)]
        }

        # Extract and tune tasks
        tasks = autotvm.task.extract_from_program(
            mod["main"],
            target=target,
            params=params
        )
        for task in tasks:
            tuner = autotvm.tuner.XGBTuner(task)
            tuner.tune(**tuning_option)

        # Compile with the best configs found during tuning
        with autotvm.apply_history_best(tuning_log):
            with tvm.transform.PassContext(opt_level=3):
                lib = relay.build(mod, target=target, params=params)

        tuned_path = model_path.replace('.onnx', '_tuned.so')
        lib.export_library(tuned_path)
        return tuned_path
    async def _benchmark(self, model_path: str) -> Dict:
        """Benchmark model latency"""
        # _load_runtime is an assumed helper that wraps the appropriate
        # runtime (ONNX Runtime session, TVM module, etc.) for the file type
        runtime = self._load_runtime(model_path)

        # Warm-up
        dummy_input = np.random.randn(1, 3, 224, 224).astype(np.float32)
        for _ in range(10):
            await runtime.infer(dummy_input)

        # Benchmark
        latencies = []
        for _ in range(100):
            start = time.perf_counter()
            await runtime.infer(dummy_input)
            latency = (time.perf_counter() - start) * 1000  # ms
            latencies.append(latency)

        return {
            'latency_ms': np.median(latencies),
            'latency_p95': np.percentile(latencies, 95),
            'latency_p99': np.percentile(latencies, 99)
        }
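The _apply_quantization step referenced in optimize() is not shown above. A minimal sketch using ONNX Runtime's dynamic quantization (INT8 weights) could look like the following; static, calibration-based quantization is an alternative when activation quantization and more latency headroom are needed:

    async def _apply_quantization(self, model_path: str) -> str:
        """Quantize weights to INT8 to reduce model size and CPU inference latency."""
        from onnxruntime.quantization import quantize_dynamic, QuantType

        quantized_path = model_path.replace('.onnx', '_int8.onnx')
        quantize_dynamic(
            model_input=model_path,
            model_output=quantized_path,
            weight_type=QuantType.QInt8
        )
        return quantized_path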
Hardware Acceleration
Leveraging specialized hardware:
from typing import Dict, Optional

import numpy as np

class AcceleratedInference:
    """Multi-backend inference with hardware acceleration"""

    def __init__(self):
        self.backends = self._initialize_backends()

    def _initialize_backends(self) -> Dict:
        """Initialize available accelerators"""
        backends = {}

        # CPU backend (always available)
        backends['cpu'] = CPUBackend()

        # GPU backend
        if self._has_cuda():
            backends['cuda'] = CUDABackend()

        # TPU backend
        if self._has_tpu():
            backends['tpu'] = TPUBackend()

        # Custom accelerators (e.g., AWS Inferentia, Google Coral)
        if self._has_inferentia():
            backends['inferentia'] = InferentiaBackend()

        return backends
    async def infer(
        self,
        model_id: str,
        input_data: np.ndarray,
        backend_hint: Optional[str] = None
    ) -> np.ndarray:
        """Infer with optimal backend selection"""
        # Select backend
        if backend_hint and backend_hint in self.backends:
            backend = self.backends[backend_hint]
        else:
            backend = await self._select_optimal_backend(
                model_id,
                input_data.shape
            )

        # Execute inference
        return await backend.infer(model_id, input_data)

    async def _select_optimal_backend(
        self,
        model_id: str,
        input_shape: tuple
    ) -> 'Backend':
        """Select optimal backend for model and input"""
        # Get backend performance characteristics
        candidates = []

        for name, backend in self.backends.items():
            # Estimate latency
            latency_estimate = await backend.estimate_latency(
                model_id,
                input_shape
            )

            candidates.append({
                'backend': backend,
                'latency': latency_estimate,
                'utilization': await backend.get_utilization()
            })

        # Sort by latency, penalized by current utilization
        candidates.sort(
            key=lambda x: x['latency'] * (1 + x['utilization'])
        )

        return candidates[0]['backend']
class CUDABackend:
    """CUDA-accelerated inference"""

    async def infer(
        self,
        model_id: str,
        input_data: np.ndarray
    ) -> np.ndarray:
        """Execute inference on GPU"""
        import pycuda.driver as cuda
        import pycuda.autoinit  # noqa: F401 - initializes the CUDA context

        # Get or build TensorRT engine
        engine = await self._get_engine(model_id)

        # Create execution context
        context = engine.create_execution_context()

        # Copy input to device
        d_input = cuda.mem_alloc(input_data.nbytes)
        cuda.memcpy_htod(d_input, input_data)

        # Allocate output
        output_shape = self._get_output_shape(engine)
        output = np.empty(output_shape, dtype=np.float32)
        d_output = cuda.mem_alloc(output.nbytes)

        # Execute
        bindings = [int(d_input), int(d_output)]
        context.execute_v2(bindings=bindings)

        # Copy output to host
        cuda.memcpy_dtoh(output, d_output)

        return output
    async def _get_engine(self, model_id: str):
        """Get or build TensorRT engine"""
        import os
        import tensorrt as trt

        cache_path = f"engines/{model_id}.trt"

        if os.path.exists(cache_path):
            # Load cached engine
            with open(cache_path, 'rb') as f:
                runtime = trt.Runtime(trt.Logger(trt.Logger.WARNING))
                return runtime.deserialize_cuda_engine(f.read())

        # Build new engine
        engine = await self._build_engine(model_id)

        # Cache for future use
        os.makedirs("engines", exist_ok=True)
        with open(cache_path, 'wb') as f:
            f.write(engine.serialize())

        return engine
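_build_engine is referenced but omitted above; a minimal sketch with the TensorRT Python API might look like the following (the models/{model_id}.onnx path convention and the FP16 flag are assumptions, not part of the original design):

    async def _build_engine(self, model_id: str):
        """Build a TensorRT engine from an ONNX model (path convention is hypothetical)."""
        import tensorrt as trt

        logger = trt.Logger(trt.Logger.WARNING)
        builder = trt.Builder(logger)
        network = builder.create_network(
            1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)
        )
        parser = trt.OnnxParser(network, logger)

        # Hypothetical location of the source ONNX model
        with open(f"models/{model_id}.onnx", 'rb') as f:
            if not parser.parse(f.read()):
                raise RuntimeError(f"Failed to parse ONNX model for {model_id}")

        config = builder.create_builder_config()
        config.set_flag(trt.BuilderFlag.FP16)  # FP16 often roughly halves latency on supported GPUs

        serialized = builder.build_serialized_network(network, config)
        runtime = trt.Runtime(logger)
        return runtime.deserialize_cuda_engine(serialized)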
Conclusion
Achieving real-time AI inference requires optimization at every layer: model architecture, serving infrastructure, and hardware utilization. Dynamic batching balances throughput against latency, while aggressive model optimization and hardware acceleration keep end-to-end latency within a single-digit millisecond budget.
The key is measuring everything and optimizing the bottlenecks. As models and hardware evolve, these techniques enable increasingly sophisticated AI in real-time applications.