In high-performance systems, the data path—the code that processes every request or packet—is the most critical component. A single microsecond of overhead multiplied by billions of requests becomes significant. This post explores techniques for optimizing data paths to achieve consistent microsecond-level latency.
Understanding the Latency Budget
At scale, every microsecond counts:
- Network latency: ~50-100μs (same datacenter)
- SSD read: ~100-200μs
- Memory access: ~100ns
- L1 cache hit: ~1ns
- Context switch: ~1-5μs
- System call: ~500ns-1μs
- Mutex lock/unlock: ~25ns (uncontended)
For a 100μs total latency budget, we can’t afford inefficiencies.
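As a rough illustration of how quickly that budget disappears (a sketch using the ballpark figures above, not measurements), the fixed costs of a single request can consume more than half of the 100μs before any data-path work begins:
// Back-of-the-envelope budget check using the ballpark figures above
// (illustrative constants only - measure on your own hardware)
const BUDGET_US: f64 = 100.0;
const NETWORK_US: f64 = 50.0; // same-datacenter hop, lower bound
const SYSCALL_US: f64 = 1.0; // per system call
const CONTEXT_SWITCH_US: f64 = 3.0;

fn main() {
    // e.g. one network hop, four system calls, one context switch
    let fixed = NETWORK_US + 4.0 * SYSCALL_US + CONTEXT_SWITCH_US;
    println!(
        "fixed costs: {fixed}μs, leaving {}μs for the data path",
        BUDGET_US - fixed
    );
}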
Zero-Copy Data Processing
Avoid copying data whenever possible:
use bytes::{Bytes, BytesMut};
use std::io::IoSlice;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
// ❌ Bad: Multiple copies
fn process_data_copied(data: &[u8]) -> Vec<u8> {
    // Copy 1: bytes copied into an owned, validated String
    let text = String::from_utf8(data.to_vec()).unwrap();
    // Copy 2: a second allocation for the transformed text
    let processed = text.to_uppercase();
    // The final move is free, but we have already copied twice
    processed.into_bytes()
}
// ✅ Good: Zero-copy with reference-counted Bytes
fn process_data_zerocopy(data: Bytes) -> Bytes {
    // Work directly with the shared buffer
    // No allocations or copies until absolutely necessary
    if is_already_processed(&data) {
        // Hand back the same reference-counted buffer: no copy at all
        return data;
    }
    // Only allocate when a transformation is actually needed
    let mut output = BytesMut::with_capacity(data.len());
    process_inplace(&mut output, &data);
    output.freeze()
}
// Zero-copy scatter/gather I/O
async fn send_multipart(socket: &mut TcpStream, parts: &[&[u8]]) -> std::io::Result<()> {
    // No staging buffer - write directly from multiple sources
    // (a single vectored write may be partial; production code should loop until done)
let io_slices: Vec<IoSlice> = parts
.iter()
.map(|part| IoSlice::new(part))
.collect();
socket.write_vectored(&io_slices).await?;
Ok(())
}
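As a usage sketch building on send_multipart (send_response is a hypothetical wrapper), a protocol header and payload can go out in a single vectored write without first being concatenated into a staging buffer:
async fn send_response(
    socket: &mut TcpStream,
    header: &[u8],
    body: &[u8],
) -> std::io::Result<()> {
    // Two separate buffers, one write call, no intermediate allocation
    send_multipart(socket, &[header, body]).await
}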
Lock-Free Data Structures
Avoid locks in the hot path:
use std::sync::atomic::{AtomicU64, Ordering};

// Lock-free statistics counters
pub struct LockFreeStats {
    requests: AtomicU64,
    bytes_processed: AtomicU64,
    errors: AtomicU64,
}
// Plain-value snapshot handed back to readers
#[derive(Debug, Clone, Copy)]
pub struct StatsSnapshot {
    pub requests: u64,
    pub bytes: u64,
    pub errors: u64,
}
impl LockFreeStats {
pub fn record_request(&self, bytes: u64, is_error: bool) {
// All atomic operations, no locks
self.requests.fetch_add(1, Ordering::Relaxed);
self.bytes_processed.fetch_add(bytes, Ordering::Relaxed);
if is_error {
self.errors.fetch_add(1, Ordering::Relaxed);
}
}
pub fn get_snapshot(&self) -> StatsSnapshot {
StatsSnapshot {
requests: self.requests.load(Ordering::Relaxed),
bytes: self.bytes_processed.load(Ordering::Relaxed),
errors: self.errors.load(Ordering::Relaxed),
}
}
}
// Lock-free queue for inter-thread communication
use crossbeam::queue::ArrayQueue;
use std::sync::Arc;
pub struct DataPipeline {
input_queue: Arc<ArrayQueue<Request>>,
output_queue: Arc<ArrayQueue<Response>>,
}
impl DataPipeline {
pub fn process_async(&self, request: Request) -> Result<(), Request> {
// Non-blocking enqueue
self.input_queue.push(request)
}
pub fn worker_loop(&self) {
loop {
// Non-blocking dequeue
if let Some(request) = self.input_queue.pop() {
                let mut response = self.process(request);
                // Non-blocking send to output; ArrayQueue::push hands the value
                // back on failure, so no Clone is needed to retry
                while let Err(rejected) = self.output_queue.push(response) {
                    // Queue full, yield and retry
                    response = rejected;
                    std::thread::yield_now();
                }
} else {
// No work, yield CPU
std::thread::yield_now();
}
}
}
}
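One way the pipeline might be wired up (the with_capacity constructor and start_workers helper below are illustrative assumptions, not shown above): size the bounded queues once at startup so the hot path never allocates, and give each worker its own thread.
impl DataPipeline {
    // Hypothetical constructor: queues are allocated once, up front
    pub fn with_capacity(capacity: usize) -> Arc<Self> {
        Arc::new(DataPipeline {
            input_queue: Arc::new(ArrayQueue::new(capacity)),
            output_queue: Arc::new(ArrayQueue::new(capacity)),
        })
    }
}
fn start_workers(pipeline: Arc<DataPipeline>, workers: usize) -> Vec<std::thread::JoinHandle<()>> {
    (0..workers)
        .map(|_| {
            let pipeline = Arc::clone(&pipeline);
            // Dedicated worker threads; pin them to cores for steadier tail latency
            std::thread::spawn(move || pipeline.worker_loop())
        })
        .collect()
}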
CPU Cache Optimization
Organize data for cache efficiency:
// ❌ Bad: Cache-unfriendly layout
#[repr(C)]
struct BadRequest {
    id: u64,         // 8 bytes
    priority: bool,  // 1 byte + 7 bytes padding
    timestamp: u64,  // 8 bytes
    processed: bool, // 1 byte + 7 bytes padding
    data: [u8; 32],  // 32 bytes
    // Total: 64 bytes, but 14 bytes of padding are interleaved with the
    // hot fields, and hot and cold fields are mixed together
}
// ✅ Good: Cache-optimized layout
#[repr(C, align(64))]
struct GoodRequest {
    // Hot fields accessed together, grouped at the front of the cache line
    id: u64,        // 8 bytes
    timestamp: u64, // 8 bytes
    data: [u8; 32], // 32 bytes
    // Cold fields (accessed rarely)
    priority: bool,  // 1 byte
    processed: bool, // 1 byte
    _padding: [u8; 14], // Explicit padding out to the 64-byte line
    // Total: 64 bytes, aligned to a cache-line boundary
}
// Structure-of-Arrays for better cache utilization
struct RequestBatch {
ids: Vec<u64>,
timestamps: Vec<u64>,
data: Vec<[u8; 32]>,
count: usize,
}
impl RequestBatch {
fn process_batch(&mut self) {
// Process in tight loops - better cache locality
for i in 0..self.count {
// All ids accessed sequentially - cache-friendly
if self.ids[i] % 2 == 0 {
// Process even IDs
self.process_id(i);
}
}
// Then process data - separate cache lines
for i in 0..self.count {
self.process_data(i);
}
}
}
// Prefetching for predictable access patterns
#[inline]
fn prefetch_next<T>(ptr: *const T) {
    #[cfg(target_arch = "x86_64")]
    unsafe {
        use std::arch::x86_64::{_mm_prefetch, _MM_HINT_T0};
        // Hint the CPU to pull this line into all cache levels (T0)
        _mm_prefetch::<{ _MM_HINT_T0 }>(ptr as *const i8);
    }
    #[cfg(not(target_arch = "x86_64"))]
    let _ = ptr;
}
fn process_array_with_prefetch(data: &[Request]) {
const PREFETCH_DISTANCE: usize = 4;
for i in 0..data.len() {
// Prefetch ahead
if i + PREFETCH_DISTANCE < data.len() {
prefetch_next(&data[i + PREFETCH_DISTANCE]);
}
// Process current
process_request(&data[i]);
}
}
SIMD Optimization
Use CPU vector instructions for parallel processing:
use std::arch::x86_64::*;
// Process 32 bytes in parallel using AVX2
#[target_feature(enable = "avx2")]
unsafe fn process_simd(input: &[u8], output: &mut [u8]) {
assert_eq!(input.len(), output.len());
assert_eq!(input.len() % 32, 0);
    // _mm256_cmpgt_epi8 is a signed byte compare, so the threshold must fit in an i8
    let threshold = _mm256_set1_epi8(64);
for i in (0..input.len()).step_by(32) {
// Load 32 bytes
let data = _mm256_loadu_si256(input.as_ptr().add(i) as *const __m256i);
// Compare with threshold (32 comparisons in parallel)
let mask = _mm256_cmpgt_epi8(data, threshold);
// Conditional processing based on mask
let processed = _mm256_and_si256(data, mask);
// Store 32 bytes
_mm256_storeu_si256(output.as_mut_ptr().add(i) as *mut __m256i, processed);
}
}
// Checksum calculation with SIMD
#[target_feature(enable = "avx2")]
unsafe fn checksum_simd(data: &[u8]) -> u64 {
    let zero = _mm256_setzero_si256();
    let mut sum = _mm256_setzero_si256();
    let chunks = data.chunks_exact(32);
    let remainder = chunks.remainder();
    for chunk in chunks {
        let values = _mm256_loadu_si256(chunk.as_ptr() as *const __m256i);
        // Sum unsigned bytes into four 64-bit lanes (sum of absolute
        // differences against zero), then accumulate
        sum = _mm256_add_epi64(sum, _mm256_sad_epu8(values, zero));
    }
    // Horizontal sum of the four lanes
    let lanes: [u64; 4] = std::mem::transmute(sum);
    let mut result = lanes.iter().fold(0u64, |acc, &v| acc.wrapping_add(v));
    // Fold in any tail bytes that did not fill a 32-byte chunk
    for &byte in remainder {
        result = result.wrapping_add(byte as u64);
    }
    result
}
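Both SIMD helpers are compiled with #[target_feature(enable = "avx2")], which makes them unsafe to call directly, so they belong behind runtime feature detection. A minimal dispatch sketch, with a hypothetical scalar fallback that computes the same byte sum:
// Scalar fallback for CPUs without AVX2
fn checksum_scalar(data: &[u8]) -> u64 {
    data.iter().fold(0u64, |acc, &b| acc.wrapping_add(b as u64))
}

pub fn checksum(data: &[u8]) -> u64 {
    #[cfg(target_arch = "x86_64")]
    {
        if is_x86_feature_detected!("avx2") {
            // Safety: AVX2 support was just verified at runtime
            return unsafe { checksum_simd(data) };
        }
    }
    checksum_scalar(data)
}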
Memory Pool and Object Reuse
Avoid allocations in the hot path:
use std::sync::Mutex;
struct MemoryPool<T> {
pool: Mutex<Vec<Box<T>>>,
factory: fn() -> T,
max_size: usize,
}
impl<T> MemoryPool<T> {
fn new(factory: fn() -> T, initial_size: usize, max_size: usize) -> Self {
let pool = (0..initial_size)
.map(|_| Box::new(factory()))
.collect();
MemoryPool {
pool: Mutex::new(pool),
factory,
max_size,
}
}
fn acquire(&self) -> PooledObject<T> {
let obj = {
let mut pool = self.pool.lock().unwrap();
pool.pop().unwrap_or_else(|| Box::new((self.factory)()))
};
PooledObject {
obj: Some(obj),
pool: self,
}
}
fn release(&self, mut obj: Box<T>) {
// Reset object state
self.reset(&mut obj);
let mut pool = self.pool.lock().unwrap();
if pool.len() < self.max_size {
pool.push(obj);
}
// Otherwise drop
}
fn reset(&self, _obj: &mut T) {
// Application-specific reset logic
}
}
struct PooledObject<'a, T> {
obj: Option<Box<T>>,
pool: &'a MemoryPool<T>,
}
impl<'a, T> Drop for PooledObject<'a, T> {
fn drop(&mut self) {
if let Some(obj) = self.obj.take() {
self.pool.release(obj);
}
}
}
impl<'a, T> std::ops::Deref for PooledObject<'a, T> {
type Target = T;
fn deref(&self) -> &T {
self.obj.as_ref().unwrap()
}
}
impl<'a, T> std::ops::DerefMut for PooledObject<'a, T> {
fn deref_mut(&mut self) -> &mut T {
self.obj.as_mut().unwrap()
}
}
// Usage
struct Request {
buffer: Vec<u8>,
// ... other fields
}
fn request_factory() -> Request {
Request {
buffer: Vec::with_capacity(4096),
}
}
use once_cell::sync::Lazy;

static REQUEST_POOL: Lazy<MemoryPool<Request>> = Lazy::new(|| {
MemoryPool::new(request_factory, 1000, 10000)
});
fn handle_request() {
let mut request = REQUEST_POOL.acquire();
// Use request...
// Automatically returned to pool on drop
}
Efficient Error Handling
Errors in the hot path must be fast:
// ❌ Bad: String allocations on error path
fn parse_header(data: &[u8]) -> Result<Header, String> {
if data.len() < 4 {
return Err(format!("Header too short: {}", data.len()));
}
// ...
}
// ✅ Good: Zero-allocation errors
#[derive(Debug, Clone, Copy)]
enum ParseError {
TooShort { actual: usize, expected: usize },
InvalidMagic { actual: u32 },
InvalidChecksum,
}
fn parse_header_fast(data: &[u8]) -> Result<Header, ParseError> {
if data.len() < 4 {
return Err(ParseError::TooShort {
actual: data.len(),
expected: 4,
});
}
// ...
}
// Early return pattern for common case
#[inline]
fn process_with_fast_path(data: &[u8]) -> Result<Output, Error> {
    // Fast path: the common case, behind a single well-predicted branch
if data.len() >= 4 && data[0] == MAGIC_BYTE {
return Ok(fast_process(data));
}
// Slow path: validation and error handling
slow_process(data)
}
Benchmarking and Profiling
Measure before and after optimization:
use criterion::{black_box, criterion_group, criterion_main, Criterion, BenchmarkId};
fn benchmark_data_processing(c: &mut Criterion) {
let mut group = c.benchmark_group("data_processing");
for size in [1024, 4096, 16384, 65536] {
let data = vec![0u8; size];
group.bench_with_input(
BenchmarkId::new("original", size),
&data,
|b, data| {
b.iter(|| process_data_original(black_box(data)))
},
);
group.bench_with_input(
BenchmarkId::new("optimized", size),
&data,
|b, data| {
b.iter(|| process_data_optimized(black_box(data)))
},
);
}
group.finish();
}
criterion_group!(benches, benchmark_data_processing);
criterion_main!(benches);
Performance profiling with perf:
# CPU profiling
perf record -g ./my_service
perf report
# Cache misses
perf stat -e cache-misses,cache-references ./my_service
# Off-core / remote memory traffic (raw event; exact event names vary by CPU model)
perf stat -e cpu/event=0xb7,umask=0x1,name=OCR.DEMAND_DATA_RD.L3_MISS.REMOTE_HIT_FORWARD/ ./my_service
# Flamegraph generation
perf record -F 99 -g ./my_service
perf script | ./FlameGraph/stackcollapse-perf.pl | ./FlameGraph/flamegraph.pl > flamegraph.svg
Real-World Results
After applying these optimizations to our data path:
Before Optimization:
- P50 latency: 250μs
- P99 latency: 2.1ms
- Throughput: 50K req/sec per core
- CPU usage: 80%
After Optimization:
- P50 latency: 45μs (5.5x improvement)
- P99 latency: 180μs (11.6x improvement)
- Throughput: 180K req/sec per core (3.6x improvement)
- CPU usage: 60%
Key wins:
- Zero-copy processing: -80μs
- Lock-free structures: -50μs
- Memory pooling: -40μs
- Cache optimization: -35μs
Continuous Performance Testing
Catch regressions before they ship by asserting latency and throughput SLOs in automated tests:
#[cfg(test)]
mod perf_tests {
use super::*;
    use std::time::{Duration, Instant};
#[test]
fn test_latency_regression() {
let data = generate_test_data(1024);
let start = Instant::now();
let iterations = 100_000;
for _ in 0..iterations {
let _ = process_data(&data);
}
let elapsed = start.elapsed();
let avg_latency = elapsed / iterations;
// Assert latency SLO
assert!(
avg_latency.as_micros() < 50,
"Average latency {}μs exceeds SLO of 50μs",
avg_latency.as_micros()
);
}
#[test]
fn test_throughput_regression() {
let data = generate_test_data(1024);
let duration = Duration::from_secs(1);
let start = Instant::now();
let mut count = 0;
while start.elapsed() < duration {
let _ = process_data(&data);
count += 1;
}
let throughput = count;
// Assert throughput SLO
assert!(
throughput > 150_000,
"Throughput {} req/sec below SLO of 150K req/sec",
throughput
);
}
}
Optimization Checklist
- Profile first - Identify actual bottlenecks, don’t guess
- Measure everything - Before/after benchmarks for every change
- Optimize the hot path - Focus on code that runs millions of times
- Avoid allocations - Memory pool, zero-copy, stack allocation
- Minimize locks - Lock-free data structures, atomic operations
- Cache-friendly layout - Struct packing, hot/cold field grouping, struct-of-arrays where batch access patterns favor it
- SIMD when applicable - Parallel processing for data transformations
- Efficient error handling - Zero-allocation errors
- Continuous testing - Performance tests in CI to catch regressions
Conclusion
Achieving microsecond latency requires attention to every detail:
- Understand your latency budget - Know where time is spent
- Eliminate waste - Every allocation, lock, and copy adds latency
- Use the right tools - Rust, SIMD, lock-free structures
- Measure constantly - Benchmarks, profiling, production monitoring
- Never stop optimizing - Performance is a journey, not a destination
The techniques here enabled us to reduce latency by 5x while increasing throughput by 3.6x. The key is systematic measurement, optimization, and validation of every change.