After another year of operating event streaming systems processing over 100 million events daily, I’ve refined our practices and learned new lessons. This post shares advanced patterns that work at scale—beyond the basics.
Schema Evolution at Scale
Schema changes are inevitable. Handle them gracefully:
from dataclasses import dataclass
from typing import Optional
import json
@dataclass
class EventV1:
"""Original schema"""
user_id: str
action: str
timestamp: int
@dataclass
class EventV2:
"""Evolved schema - added optional fields"""
user_id: str
action: str
timestamp: int
metadata: Optional[dict] = None # New field
source: str = "web" # New field with default
class CompatibleEventSerializer:
"""
Serialize events with version information
"""
def serialize(self, event: EventV2) -> bytes:
"""Add schema version to every event"""
envelope = {
'__schema_version': 2,
'__event_type': 'user_action',
'payload': {
'user_id': event.user_id,
'action': event.action,
'timestamp': event.timestamp,
'metadata': event.metadata,
'source': event.source
}
}
return json.dumps(envelope).encode('utf-8')
def deserialize(self, data: bytes):
"""Handle multiple schema versions"""
envelope = json.loads(data.decode('utf-8'))
version = envelope.get('__schema_version', 1)
payload = envelope['payload']
if version == 1:
# Migrate v1 to v2 on read
return EventV2(
user_id=payload['user_id'],
action=payload['action'],
timestamp=payload['timestamp'],
metadata=None, # Not present in v1
source='web' # Default for v1 events
)
elif version == 2:
return EventV2(**payload)
else:
raise ValueError(f"Unknown schema version: {version}")
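A quick usage sketch (the IDs and values are made up): a v1 envelope with no version marker still deserializes, picking up the v2 defaults on read.

serializer = CompatibleEventSerializer()

# A v2 event round-trips as-is
v2_bytes = serializer.serialize(
    EventV2(user_id="u-123", action="click", timestamp=1700000000,
            metadata={"page": "/home"})
)

# An old v1 envelope (no __schema_version marker) is upgraded on read
v1_bytes = json.dumps({
    'payload': {'user_id': 'u-456', 'action': 'login', 'timestamp': 1690000000}
}).encode('utf-8')

event = serializer.deserialize(v1_bytes)
assert event.source == 'web' and event.metadata is None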
Dead Letter Queue Patterns
DLQs are essential but often implemented poorly:
public class SmartDLQHandler {
    private final KafkaProducer<String, byte[]> dlqProducer;
    private final KafkaProducer<String, byte[]> retryProducer;
    private final MetricsCollector metrics;
    private final AlertingService alerting;  // assumed alerting hook; used for the DLQ-rate alert below
public void handleFailedEvent(ConsumerRecord<String, byte[]> record,
Exception error,
int attemptCount) {
// Classify the error
ErrorType errorType = classifyError(error);
// Add metadata about the failure
Map<String, String> headers = new HashMap<>();
headers.put("original-topic", record.topic());
headers.put("original-partition", String.valueOf(record.partition()));
headers.put("original-offset", String.valueOf(record.offset()));
headers.put("error-type", errorType.name());
headers.put("error-message", error.getMessage());
headers.put("attempt-count", String.valueOf(attemptCount));
headers.put("failed-at", Instant.now().toString());
headers.put("stack-trace", getStackTrace(error));
if (errorType.isRetriable() && attemptCount < 3) {
// Send to retry topic with exponential backoff
long delayMs = calculateBackoff(attemptCount);
headers.put("retry-after", String.valueOf(
System.currentTimeMillis() + delayMs
));
retryProducer.send(new ProducerRecord<>(
"retry-topic",
null,
System.currentTimeMillis() + delayMs,
record.key(),
record.value(),
createHeaders(headers)
));
metrics.counter("events.retried",
"error_type", errorType.name()).increment();
} else {
// Send to DLQ for manual review
            dlqProducer.send(new ProducerRecord<>(
                "dlq-topic",
                null,  // no explicit partition; needed so the (topic, partition, key, value, headers) constructor resolves
                record.key(),
                record.value(),
                createHeaders(headers)
            ));
metrics.counter("events.dlq",
"error_type", errorType.name()).increment();
// Alert on high DLQ rate
if (getDLQRate() > 0.01) { // 1%
alerting.sendAlert("High DLQ rate: " + getDLQRate());
}
}
}
private ErrorType classifyError(Exception error) {
if (error instanceof JsonParseException) {
return ErrorType.MALFORMED_DATA; // Not retriable
} else if (error instanceof TimeoutException) {
return ErrorType.TIMEOUT; // Retriable
} else if (error instanceof DatabaseException) {
return ErrorType.DATABASE_ERROR; // Retriable
} else {
return ErrorType.UNKNOWN; // Retriable with caution
}
}
private long calculateBackoff(int attemptCount) {
// Exponential backoff: 1s, 2s, 4s
return (long) Math.pow(2, attemptCount) * 1000;
}
}
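On the consuming side of the retry topic, something has to honor the retry-after header before reprocessing. Here is a minimal sketch using kafka-python; the topic name, bootstrap address, and reprocess() are placeholders, and a production consumer would pause the partition rather than sleep.

import time
from kafka import KafkaConsumer

consumer = KafkaConsumer('retry-topic',
                         bootstrap_servers='localhost:9092',
                         enable_auto_commit=False)

def reprocess(record):
    ...  # hand the payload back to the normal processing path

for record in consumer:
    headers = dict(record.headers or [])
    retry_at_ms = int(headers.get('retry-after', b'0'))
    delay_s = retry_at_ms / 1000.0 - time.time()
    if delay_s > 0:
        time.sleep(delay_s)  # crude; pausing the partition is the better production option
    reprocess(record)
    consumer.commit()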
Exactly-Once Semantics in Practice
True exactly-once is hard. Here’s what works:
class ExactlyOnceProcessor:
"""
Idempotent event processing with deduplication
"""
def __init__(self):
self.kafka_consumer = KafkaConsumer(
enable_auto_commit=False # Manual commit control
)
self.db = Database()
self.processed_events = BloomFilter() # Quick dedup check
def process_batch(self, events):
"""
Process batch with exactly-once guarantees
"""
# Start transaction
with self.db.transaction() as tx:
processed_count = 0
for event in events:
# Fast path: check bloom filter
if event.id in self.processed_events:
# Likely already processed, verify in DB
if self.db.is_processed(event.id):
continue # Skip duplicate
try:
# Process event
result = self.process_event(event)
# Store result and mark as processed atomically
tx.execute(
"INSERT INTO results (event_id, data) VALUES (?, ?)",
event.id, result
)
tx.execute(
"INSERT INTO processed_events (event_id, processed_at) VALUES (?, ?)",
event.id, datetime.now()
)
# Update bloom filter
self.processed_events.add(event.id)
processed_count += 1
except IntegrityError:
# Event already processed by another consumer
# (race condition between bloom filter check and DB insert)
continue
            # Commit Kafka offsets within the same transaction,
            # recording the highest offset seen per (topic, partition)
            max_offsets = {}
            for e in events:
                key = (e.topic, e.partition)
                max_offsets[key] = max(max_offsets.get(key, -1), e.offset)
            for (topic, partition), offset in max_offsets.items():
                tx.execute(
                    "INSERT INTO kafka_offsets (topic, partition, offset) VALUES (?, ?, ?) " +
                    "ON CONFLICT (topic, partition) DO UPDATE SET offset = ?",
                    topic, partition, offset, offset
                )
# Commit transaction (DB + Kafka offsets together)
tx.commit()
        # Also commit offsets to Kafka (best effort; on restart, the offsets
        # stored in the database are the source of truth)
        self.kafka_consumer.commit()
return processed_count
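The deduplication above hinges on a stable event.id. If producers don't attach one, a deterministic ID can be derived from the record's Kafka coordinates, which uniquely identify a record. A sketch (the helper name is mine):

import hashlib

def derive_event_id(topic: str, partition: int, offset: int) -> str:
    """Deterministic id from a record's coordinates: same record, same id."""
    raw = f"{topic}:{partition}:{offset}".encode('utf-8')
    return hashlib.sha256(raw).hexdigest()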
Stream Partitioning Strategies
Choosing the right partitioning key is critical:
type PartitioningStrategy interface {
GetPartition(event Event, numPartitions int) int
}
// Strategy 1: By Entity ID (preserves ordering per entity)
type EntityIDPartitioner struct{}
func (p *EntityIDPartitioner) GetPartition(event Event, numPartitions int) int {
// All events for same entity go to same partition
hash := fnv.New32a()
hash.Write([]byte(event.EntityID))
return int(hash.Sum32()) % numPartitions
}
// Strategy 2: By Time Window (co-locates each window's events; note that live traffic lands on one partition at a time)
type TimeWindowPartitioner struct {
WindowSize time.Duration
}
func (p *TimeWindowPartitioner) GetPartition(event Event, numPartitions int) int {
// Events in same time window go to same partition
window := event.Timestamp.Truncate(p.WindowSize).Unix()
return int(window) % numPartitions
}
// Strategy 3: By Hash of Multiple Fields (distribute hot keys)
type CompositeKeyPartitioner struct {
Fields []string
}
func (p *CompositeKeyPartitioner) GetPartition(event Event, numPartitions int) int {
// Combine multiple fields to distribute load
hash := fnv.New32a()
for _, field := range p.Fields {
hash.Write([]byte(event.GetField(field)))
}
return int(hash.Sum32()) % numPartitions
}
// Strategy 4: Sticky Partitioning (maximize batching)
type StickyPartitioner struct {
currentPartition int
messageCount int
rotateAfter int
}
func (p *StickyPartitioner) GetPartition(event Event, numPartitions int) int {
p.messageCount++
if p.messageCount >= p.rotateAfter {
p.currentPartition = (p.currentPartition + 1) % numPartitions
p.messageCount = 0
}
return p.currentPartition
}
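Before committing to a key, measure how it actually distributes. A rough skew check in Python (CRC32 stands in for FNV, and the sample keys are made up): hash a sample of keys the way the producer would and compare each partition's share to the average.

import zlib
from collections import Counter

def partition_for(key: str, num_partitions: int) -> int:
    return zlib.crc32(key.encode('utf-8')) % num_partitions

def skew_report(keys, num_partitions=12):
    counts = Counter(partition_for(k, num_partitions) for k in keys)
    avg = len(keys) / num_partitions
    return {p: round(c / avg, 2) for p, c in sorted(counts.items())}  # 1.0 = perfectly even

# One hot entity producing half the traffic shows up as one oversized bucket
keys = ['entity-hot'] * 5000 + [f'entity-{i}' for i in range(5000)]
print(skew_report(keys))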
Consumer Lag Management
Lag monitoring and auto-scaling:
class LagMonitor:
"""
Monitor and respond to consumer lag
"""
def __init__(self):
self.kafka_admin = KafkaAdminClient()
self.metrics = MetricsCollector()
self.autoscaler = ConsumerAutoscaler()
def monitor_lag(self, group_id: str, topic: str):
"""
Monitor lag and take action
"""
# Get current lag for each partition
offsets = self.kafka_admin.list_consumer_group_offsets(group_id)
end_offsets = self.kafka_admin.list_offsets(topic)
total_lag = 0
max_lag = 0
partition_lags = {}
for partition, offset_info in offsets[topic].items():
current_offset = offset_info.offset
end_offset = end_offsets[topic][partition]
lag = end_offset - current_offset
partition_lags[partition] = lag
total_lag += lag
max_lag = max(max_lag, lag)
# Record metrics
self.metrics.gauge('consumer.lag.total', total_lag,
tags={'group': group_id, 'topic': topic})
self.metrics.gauge('consumer.lag.max', max_lag,
tags={'group': group_id, 'topic': topic})
# Check for hot partitions
avg_lag = total_lag / len(partition_lags)
for partition, lag in partition_lags.items():
if lag > avg_lag * 2: # 2x average
self.alert_hot_partition(group_id, topic, partition, lag)
        # Auto-scale based on lag
        num_consumers = self.get_consumer_count(group_id)
        num_partitions = len(partition_lags)
        if total_lag > 100000:  # High lag threshold
            if num_consumers < num_partitions:
                # Can scale up (consumers beyond the partition count would sit idle)
                self.autoscaler.scale_up(group_id, min(num_partitions, num_consumers + 2))
                self.alert('Scaling up consumers due to high lag')
        elif total_lag < 1000 and num_consumers > 1:
            # Low lag, can scale down
            self.autoscaler.scale_down(group_id, max(1, num_consumers - 1))
def calculate_time_to_catch_up(self, group_id: str, topic: str) -> float:
"""
Estimate time to catch up based on current processing rate
"""
lag = self.get_total_lag(group_id, topic)
processing_rate = self.get_processing_rate(group_id, topic)
if processing_rate == 0:
return float('inf')
return lag / processing_rate # seconds
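The processing rate used in calculate_time_to_catch_up can be estimated by sampling committed offsets twice and dividing by the elapsed time. A sketch, where get_committed_total is a placeholder wrapping the admin-client calls above:

import time

def measure_processing_rate(get_committed_total, interval_s: float = 30.0) -> float:
    """Events per second, estimated from two committed-offset samples."""
    start = get_committed_total()
    time.sleep(interval_s)
    end = get_committed_total()
    return max(0.0, (end - start) / interval_s)

# Example: a lag of 100,000 events at 500 events/s means roughly 200 seconds to catch up.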
Multi-Datacenter Replication
Replicate streams across datacenters:
public class MultiDCReplication {
private final Map<String, KafkaProducer> producersByDC;
private final ReplicationConfig config;
public void replicateEvent(Event event, String sourceDC) {
// Don't replicate back to source
Set<String> targetDCs = getTargetDCs(sourceDC);
// Replicate to each DC in parallel
List<CompletableFuture<RecordMetadata>> futures = targetDCs.stream()
            .map(dc -> replicateToDatacenter(event, sourceDC, dc))
.collect(Collectors.toList());
// Wait for all replications (or timeout)
try {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.get(config.getReplicationTimeout(), TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
// Some DCs didn't respond in time
// Log and continue - replication will catch up
logger.warn("Replication timeout for event {}", event.getId());
}
}
    private CompletableFuture<RecordMetadata> replicateToDatacenter(
            Event event, String sourceDC, String dc) {
KafkaProducer producer = producersByDC.get(dc);
// Add metadata about replication
ProducerRecord<String, Event> record = new ProducerRecord<>(
config.getTopicName(),
event.getId(),
event
);
record.headers().add("source-dc", sourceDC.getBytes());
record.headers().add("replicated-at",
String.valueOf(System.currentTimeMillis()).getBytes());
return CompletableFuture.supplyAsync(() -> {
try {
return producer.send(record).get();
} catch (Exception e) {
throw new CompletionException(e);
}
});
}
// Detect replication loops
private boolean isReplicationLoop(Event event) {
String sourceDC = new String(
event.headers().lastHeader("source-dc").value()
);
// Count how many times this event has been replicated
int replicationCount = 0;
for (Header header : event.headers()) {
if (header.key().equals("replicated-at")) {
replicationCount++;
}
}
if (replicationCount > config.getMaxReplicationHops()) {
logger.error("Replication loop detected for event {}", event.getId());
return true;
}
return false;
}
}
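The same check can run on the consumer side before an event is forwarded at all. A Python sketch of that guard (header names mirror the Java code; MAX_HOPS is an illustrative stand-in for config.getMaxReplicationHops()):

MAX_HOPS = 2  # illustrative limit

def should_replicate(headers, local_dc: str) -> bool:
    """headers: list of (key, value) pairs as Kafka clients expose them (values may be bytes)."""
    header_map = {k: (v.decode() if isinstance(v, bytes) else v) for k, v in headers}
    if header_map.get('source-dc') == local_dc:
        return False  # never forward an event back to the DC it started in
    hops = sum(1 for k, _ in headers if k == 'replicated-at')
    return hops < MAX_HOPS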
Performance Tuning
Real-world performance optimization:
# Producer optimization
producer_config = {
# Batching
'linger.ms': 100, # Wait up to 100ms to batch
'batch.size': 32768, # 32KB batches
# Compression
'compression.type': 'lz4', # Fast compression
# Memory
'buffer.memory': 67108864, # 64MB buffer
# Acks
'acks': 'all', # Wait for all replicas (durability)
'max.in.flight.requests.per.connection': 5,
# Idempotence
'enable.idempotence': True,
# Retries
    'retries': 2147483647,  # Integer.MAX_VALUE; delivery.timeout.ms bounds total retry time
'delivery.timeout.ms': 120000, # 2 minutes
}
# Consumer optimization
consumer_config = {
# Fetching
'fetch.min.bytes': 1024, # Wait for 1KB
'fetch.max.wait.ms': 500, # Or 500ms
# Max records per poll
'max.poll.records': 500,
# Session timeout
'session.timeout.ms': 30000,
'heartbeat.interval.ms': 3000,
# Processing timeout
'max.poll.interval.ms': 300000, # 5 minutes
# Auto offset reset
'auto.offset.reset': 'earliest',
# Disable auto commit (manual commit for control)
'enable.auto.commit': False,
}
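A quick sanity check on the batching settings (the event size and rate are made-up numbers): whichever limit is hit first, batch.size or linger.ms, sets the batch size, and that in turn sets how many produce requests per second the brokers see.

avg_event_bytes = 1_000    # assume ~1 KB per serialized event
events_per_sec = 1_500     # hypothetical per-producer rate
batch_size = 32_768        # from producer_config
linger_s = 0.100           # from producer_config

events_per_batch = min(batch_size // avg_event_bytes,          # batch fills up first, or
                       max(1, int(events_per_sec * linger_s)))  # linger expires first
requests_per_sec = events_per_sec / events_per_batch
print(f"~{events_per_batch} events per batch, ~{requests_per_sec:.0f} produce requests/s")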
Key Takeaways
- Schema evolution is inevitable: Plan for it from day one
- DLQs are essential: But categorize errors and retry appropriately
- Exactly-once is hard: Use idempotent processing plus transactional offset commits
- Partition wisely: The partition key determines scalability
- Monitor lag religiously: It’s your canary
- Test multi-DC carefully: Replication loops are real
- Tune for your workload: Default configs are rarely optimal
Event streaming at scale requires careful attention to detail, comprehensive monitoring, and defensive programming. These patterns have proven effective across billions of events.