After another year of operating event streaming systems processing over 100 million events daily, I've refined our practices and learned new lessons. This post goes beyond the basics and shares advanced patterns that hold up at scale.

Schema Evolution at Scale

Schema changes are inevitable. Handle them gracefully:

from dataclasses import dataclass
from typing import Optional
import json

@dataclass
class EventV1:
    """Original schema"""
    user_id: str
    action: str
    timestamp: int

@dataclass
class EventV2:
    """Evolved schema - added optional fields"""
    user_id: str
    action: str
    timestamp: int
    metadata: Optional[dict] = None  # New field
    source: str = "web"  # New field with default

class CompatibleEventSerializer:
    """
    Serialize events with version information
    """

    def serialize(self, event: EventV2) -> bytes:
        """Add schema version to every event"""
        envelope = {
            '__schema_version': 2,
            '__event_type': 'user_action',
            'payload': {
                'user_id': event.user_id,
                'action': event.action,
                'timestamp': event.timestamp,
                'metadata': event.metadata,
                'source': event.source
            }
        }
        return json.dumps(envelope).encode('utf-8')

    def deserialize(self, data: bytes) -> EventV2:
        """Handle multiple schema versions, upgrading older events to EventV2"""
        envelope = json.loads(data.decode('utf-8'))
        # Events written before versioning may have no envelope at all,
        # so a missing version means "v1", possibly unwrapped
        version = envelope.get('__schema_version', 1)
        payload = envelope.get('payload', envelope)

        if version == 1:
            # Migrate v1 to v2 on read
            return EventV2(
                user_id=payload['user_id'],
                action=payload['action'],
                timestamp=payload['timestamp'],
                metadata=None,  # Not present in v1
                source='web'    # Default for v1 events
            )
        elif version == 2:
            return EventV2(**payload)
        else:
            raise ValueError(f"Unknown schema version: {version}")
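The deserializer above handles old events read by new code. The reverse case, an older consumer reading events from a newer producer, breaks `EventV2(**payload)` as soon as a v3 field appears. A minimal sketch of a tolerant reader, assuming new fields are purely additive and can be dropped by old consumers; `from_payload` is a hypothetical helper:

```python
import json
from dataclasses import dataclass, fields
from typing import Optional


@dataclass
class EventV2:
    user_id: str
    action: str
    timestamp: int
    metadata: Optional[dict] = None
    source: str = "web"


def from_payload(cls, payload: dict):
    """Build cls from payload, ignoring keys the dataclass does not declare."""
    known = {f.name for f in fields(cls)}
    return cls(**{k: v for k, v in payload.items() if k in known})


# A v3 producer added "region"; this v2 consumer skips it instead of crashing
raw = (b'{"__schema_version": 3, "payload": {"user_id": "u1", '
       b'"action": "click", "timestamp": 1, "region": "eu"}}')
event = from_payload(EventV2, json.loads(raw)["payload"])
print(event)
```

Using this for versions above 2 means "read what you know" instead of raising; whether that is safe depends on every new field being optional for old readers.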

Dead Letter Queue Patterns

DLQs are essential but often implemented poorly:

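import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

// ErrorType, MetricsCollector, AlertingService, DatabaseException, getDLQRate()
// and the JsonParseException/TimeoutException types come from your own code
// and libraries; they are assumed here.
public class SmartDLQHandler {

    private static final int MAX_ATTEMPTS = 3;
    private static final double DLQ_ALERT_THRESHOLD = 0.01; // 1%

    private final KafkaProducer<String, byte[]> dlqProducer;
    private final KafkaProducer<String, byte[]> retryProducer;
    private final MetricsCollector metrics;
    private final AlertingService alerting;

    public SmartDLQHandler(KafkaProducer<String, byte[]> dlqProducer,
                           KafkaProducer<String, byte[]> retryProducer,
                           MetricsCollector metrics,
                           AlertingService alerting) {
        this.dlqProducer = dlqProducer;
        this.retryProducer = retryProducer;
        this.metrics = metrics;
        this.alerting = alerting;
    }

    public void handleFailedEvent(ConsumerRecord<String, byte[]> record,
                                  Exception error,
                                  int attemptCount) {

        // Classify the error
        ErrorType errorType = classifyError(error);

        // Add metadata about the failure
        Map<String, String> headers = new HashMap<>();
        headers.put("original-topic", record.topic());
        headers.put("original-partition", String.valueOf(record.partition()));
        headers.put("original-offset", String.valueOf(record.offset()));
        headers.put("error-type", errorType.name());
        // getMessage() may be null; String.valueOf avoids an NPE later
        headers.put("error-message", String.valueOf(error.getMessage()));
        headers.put("attempt-count", String.valueOf(attemptCount));
        headers.put("failed-at", Instant.now().toString());
        headers.put("stack-trace", getStackTrace(error));

        if (errorType.isRetriable() && attemptCount < MAX_ATTEMPTS) {
            // Send to retry topic with exponential backoff; the retry consumer
            // must not reprocess the event before "retry-after"
            long delayMs = calculateBackoff(attemptCount);
            headers.put("retry-after", String.valueOf(
                System.currentTimeMillis() + delayMs
            ));

            // The record timestamp is left to the producer; the delay travels
            // in the retry-after header
            retryProducer.send(new ProducerRecord<>(
                "retry-topic",
                null,              // partition: let the partitioner use the key
                record.key(),
                record.value(),
                createHeaders(headers)
            ));

            metrics.counter("events.retried",
                "error_type", errorType.name()).increment();

        } else {
            // Send to DLQ for manual review
            dlqProducer.send(new ProducerRecord<>(
                "dlq-topic",
                null,              // partition: let the partitioner use the key
                record.key(),
                record.value(),
                createHeaders(headers)
            ));

            metrics.counter("events.dlq",
                "error_type", errorType.name()).increment();

            // Alert on high DLQ rate
            double dlqRate = getDLQRate();
            if (dlqRate > DLQ_ALERT_THRESHOLD) {
                alerting.sendAlert("High DLQ rate: " + dlqRate);
            }
        }
    }

    private ErrorType classifyError(Exception error) {
        if (error instanceof JsonParseException) {
            return ErrorType.MALFORMED_DATA; // Not retriable
        } else if (error instanceof TimeoutException) {
            return ErrorType.TIMEOUT; // Retriable
        } else if (error instanceof DatabaseException) {
            return ErrorType.DATABASE_ERROR; // Retriable
        } else {
            return ErrorType.UNKNOWN; // Retriable with caution
        }
    }

    private long calculateBackoff(int attemptCount) {
        // Exponential backoff: 1s, 2s, 4s for attempts 0, 1, 2
        return 1000L << attemptCount;
    }

    private static Iterable<Header> createHeaders(Map<String, String> headers) {
        List<Header> result = new ArrayList<>();
        headers.forEach((key, value) -> result.add(
            new RecordHeader(key, value.getBytes(StandardCharsets.UTF_8))));
        return result;
    }

    private static String getStackTrace(Exception error) {
        StringWriter out = new StringWriter();
        error.printStackTrace(new PrintWriter(out));
        return out.toString();
    }
}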
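The handler stamps each retried event with a `retry-after` header (epoch milliseconds) but leaves honoring it to the retry-topic consumer. A minimal sketch of that consumer-side check, in Python for brevity. The header names come from the handler above; `decode_headers` and `retry_delay_seconds` are hypothetical helpers, and headers are assumed to arrive as `(str, bytes)` pairs, the shape kafka-python uses for `ConsumerRecord.headers`:

```python
import time
from typing import Optional, Sequence, Tuple


def decode_headers(headers: Sequence[Tuple[str, bytes]]) -> dict:
    """Turn (key, bytes) header pairs into a str -> str dict (last value wins)."""
    return {key: value.decode("utf-8") for key, value in headers}


def retry_delay_seconds(headers: Sequence[Tuple[str, bytes]],
                        now_ms: Optional[int] = None) -> float:
    """Seconds to wait before reprocessing; 0.0 if the event is already due."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    retry_after = decode_headers(headers).get("retry-after")
    if retry_after is None:
        return 0.0  # No delay requested
    return max(0, int(retry_after) - now_ms) / 1000.0


headers = [("attempt-count", b"1"), ("retry-after", b"1700000002000")]
print(retry_delay_seconds(headers, now_ms=1700000000000))
```

Records in a partition are consumed in order, so with a single `retry-topic` a 4 s record at the head of a partition also holds back 1 s records behind it. One retry topic per backoff tier is a common way around that.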

Exactly-Once Semantics in Practice

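True end-to-end exactly-once is hard. Here's what works: idempotent processing, with results, dedup markers, and consumer offsets committed in a single database transaction: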

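from datetime import datetime, timezone

from kafka import KafkaConsumer  # kafka-python

# Database (with transaction()/savepoint()), BloomFilter and IntegrityError
# stand in for your DB driver and a Bloom filter library; process_event()
# is your business logic.

class ExactlyOnceProcessor:
    """
    Idempotent event processing with deduplication: results, dedup markers
    and consumer offsets commit together in one DB transaction
    """

    def __init__(self):
        self.kafka_consumer = KafkaConsumer(
            enable_auto_commit=False  # Manual commit control
        )
        self.db = Database()
        # Per-process hint only: it is empty after a restart and knows nothing
        # about other consumers, so the DB remains the source of truth
        self.processed_events = BloomFilter()

    def process_batch(self, events):
        """
        Process a batch so that each event's result is stored at most once
        """
        processed_count = 0
        # Next offset to read for every (topic, partition) in the batch
        next_offsets = {}

        # The context manager commits on success and rolls back on error
        with self.db.transaction() as tx:
            for event in events:
                next_offsets[(event.topic, event.partition)] = event.offset + 1

                # Fast path: a Bloom filter hit may be a false positive,
                # so confirm in the DB before skipping
                if event.id in self.processed_events and self.db.is_processed(event.id):
                    continue  # Skip duplicate

                try:
                    # Savepoint so a duplicate-key error undoes only this event;
                    # PostgreSQL would otherwise abort the whole transaction
                    with tx.savepoint():
                        # Claim the event first; if another consumer already
                        # claimed it, this INSERT raises IntegrityError
                        tx.execute(
                            "INSERT INTO processed_events (event_id, processed_at) VALUES (?, ?)",
                            event.id, datetime.now(timezone.utc)
                        )
                        result = self.process_event(event)
                        tx.execute(
                            "INSERT INTO results (event_id, data) VALUES (?, ?)",
                            event.id, result
                        )
                except IntegrityError:
                    # Event already processed by another consumer
                    # (race between the duplicate check and the insert)
                    continue

                # If the transaction later rolls back, a stale entry here only
                # costs an extra DB lookup
                self.processed_events.add(event.id)
                processed_count += 1

            # Store offsets in the same transaction as the results
            for (topic, partition), next_offset in next_offsets.items():
                tx.execute(
                    "INSERT INTO kafka_offsets (topic, partition, next_offset) VALUES (?, ?, ?) "
                    "ON CONFLICT (topic, partition) DO UPDATE SET next_offset = excluded.next_offset",
                    topic, partition, next_offset
                )

        # DB transaction is committed at this point. The Kafka commit is a
        # convenience: on restart, seek to the offsets stored in kafka_offsets.
        self.kafka_consumer.commit()

        return processed_count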
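To see the dedup-plus-offsets idea end to end without Kafka or a database server, here is a minimal sketch on Python's built-in `sqlite3` (the upsert needs SQLite 3.24 or newer). It assumes events are plain `(event_id, partition, offset)` tuples and uses a trivial `upper()` as the processing step; table and column names are illustrative. Replaying the same batch, as a consumer would after crashing before its Kafka commit, changes nothing:

```python
import sqlite3

SCHEMA = """
CREATE TABLE processed_events (event_id TEXT PRIMARY KEY);
CREATE TABLE results (event_id TEXT PRIMARY KEY, data TEXT);
CREATE TABLE kafka_offsets (
    topic TEXT, partition_id INTEGER, next_offset INTEGER,
    PRIMARY KEY (topic, partition_id)
);
"""


def process_batch(conn, topic, events):
    """Apply a batch idempotently; returns how many events were new."""
    processed = 0
    next_offsets = {}
    with conn:  # one transaction: commit on success, rollback on error
        for event_id, partition, offset in events:
            next_offsets[partition] = offset + 1
            # Claim the event; INSERT OR IGNORE changes 0 rows for a duplicate
            cur = conn.execute(
                "INSERT OR IGNORE INTO processed_events (event_id) VALUES (?)",
                (event_id,))
            if cur.rowcount == 0:
                continue  # Already processed
            conn.execute("INSERT INTO results (event_id, data) VALUES (?, ?)",
                         (event_id, event_id.upper()))
            processed += 1
        for partition, next_offset in next_offsets.items():
            conn.execute(
                "INSERT INTO kafka_offsets (topic, partition_id, next_offset) "
                "VALUES (?, ?, ?) ON CONFLICT (topic, partition_id) "
                "DO UPDATE SET next_offset = excluded.next_offset",
                (topic, partition, next_offset))
    return processed


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
batch = [("e1", 0, 10), ("e2", 0, 11), ("e3", 1, 4)]
first = process_batch(conn, "orders", batch)
replay = process_batch(conn, "orders", batch)  # redelivery after a crash
print(first, replay)
```

`INSERT OR IGNORE` takes the place of catching the duplicate-key error; the PostgreSQL analogue is `INSERT ... ON CONFLICT DO NOTHING`.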

Stream Partitioning Strategies

Choosing the right partitioning key is critical:

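import (
    "hash/fnv"
    "time"
)

// Event is assumed to expose EntityID (string), Timestamp (time.Time)
// and GetField(name string) string
type PartitioningStrategy interface {
    GetPartition(event Event, numPartitions int) int
}

// hashToPartition maps a hash onto [0, numPartitions); converting the uint32
// to int before the modulo can go negative on 32-bit platforms
func hashToPartition(h uint32, numPartitions int) int {
    return int(h % uint32(numPartitions))
}

// Strategy 1: By Entity ID (preserves ordering per entity)
type EntityIDPartitioner struct{}

func (p *EntityIDPartitioner) GetPartition(event Event, numPartitions int) int {
    // All events for same entity go to same partition
    hash := fnv.New32a()
    hash.Write([]byte(event.EntityID))
    return hashToPartition(hash.Sum32(), numPartitions)
}

// Strategy 2: By Time Window (consecutive windows land on different partitions)
type TimeWindowPartitioner struct {
    WindowSize time.Duration
}

func (p *TimeWindowPartitioner) GetPartition(event Event, numPartitions int) int {
    // Use the window number, not its start second: truncated Unix seconds are
    // all multiples of the window size, so "% numPartitions" can pin every
    // window to one partition. All traffic in the current window still goes
    // to a single partition.
    window := event.Timestamp.UnixNano() / int64(p.WindowSize)
    return int(window % int64(numPartitions))
}

// Strategy 3: By Hash of Multiple Fields (distribute hot keys)
type CompositeKeyPartitioner struct {
    Fields []string
}

func (p *CompositeKeyPartitioner) GetPartition(event Event, numPartitions int) int {
    // Combine multiple fields to distribute load; the separator byte keeps
    // ("ab", "c") and ("a", "bc") from hashing identically
    hash := fnv.New32a()
    for _, field := range p.Fields {
        hash.Write([]byte(event.GetField(field)))
        hash.Write([]byte{0})
    }
    return hashToPartition(hash.Sum32(), numPartitions)
}

// Strategy 4: Sticky Partitioning (maximize batching; no per-key ordering)
// Not safe for concurrent use: guard it with a mutex if goroutines share it.
type StickyPartitioner struct {
    currentPartition int
    messageCount     int
    rotateAfter      int
}

func (p *StickyPartitioner) GetPartition(event Event, numPartitions int) int {
    // Move on once rotateAfter messages have gone to the current partition
    if p.messageCount >= p.rotateAfter {
        p.currentPartition = (p.currentPartition + 1) % numPartitions
        p.messageCount = 0
    }
    p.messageCount++

    // Modulo again in case the partition count shrank since the last call
    return p.currentPartition % numPartitions
}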
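Strategy 3 spreads load only if the extra fields vary within a hot entity. When a single key is hot on its own, a common alternative is key salting: append a small bucket number so one entity spans a few partitions, at the cost of strict per-entity ordering. A minimal Python sketch using 32-bit FNV-1a (the hash behind Go's `fnv.New32a` above); `SALT_BUCKETS`, `salted_key` and `partition_for` are hypothetical names:

```python
FNV32_OFFSET = 0x811C9DC5
FNV32_PRIME = 0x01000193


def fnv1a_32(data: bytes) -> int:
    """32-bit FNV-1a, the same function as Go's hash/fnv New32a."""
    h = FNV32_OFFSET
    for byte in data:
        h ^= byte
        h = (h * FNV32_PRIME) & 0xFFFFFFFF
    return h


def partition_for(key: str, num_partitions: int) -> int:
    return fnv1a_32(key.encode("utf-8")) % num_partitions


def salted_key(entity_id: str, event_id: str, salt_buckets: int) -> str:
    """Spread one entity over salt_buckets keys; an event always maps to the same bucket."""
    bucket = fnv1a_32(event_id.encode("utf-8")) % salt_buckets
    return f"{entity_id}#{bucket}"


SALT_BUCKETS = 4  # hypothetical: consumers must merge up to 4 sub-streams per entity
hot = {partition_for(salted_key("tenant-42", f"evt-{i}", SALT_BUCKETS), 12)
       for i in range(1000)}
print(sorted(hot))
```

Deriving the bucket from the event ID rather than at random keeps retries and replays of the same event on the same partition.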

Consumer Lag Management

Lag monitoring and auto-scaling:

from kafka import KafkaAdminClient, KafkaConsumer  # kafka-python

# MetricsCollector, ConsumerAutoscaler and the alert/get_* helpers are
# application-specific and assumed to exist.

class LagMonitor:
    """
    Monitor and respond to consumer lag
    """

    HIGH_LAG = 100_000  # Scale-up threshold (messages)
    LOW_LAG = 1_000     # Scale-down threshold (messages)

    def __init__(self, bootstrap_servers: str):
        self.kafka_admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
        # Used only to read log-end offsets; it never joins a consumer group
        self.offset_reader = KafkaConsumer(bootstrap_servers=bootstrap_servers)
        self.metrics = MetricsCollector()
        self.autoscaler = ConsumerAutoscaler()

    def monitor_lag(self, group_id: str, topic: str):
        """
        Monitor lag and take action
        """
        # Committed offsets come back keyed by TopicPartition
        committed = self.kafka_admin.list_consumer_group_offsets(group_id)
        partitions = [tp for tp in committed
                      if tp.topic == topic and committed[tp].offset >= 0]
        if not partitions:
            return  # Nothing committed for this topic yet
        end_offsets = self.offset_reader.end_offsets(partitions)

        # Lag per partition = log-end offset - committed offset
        partition_lags = {
            tp.partition: max(0, end_offsets[tp] - committed[tp].offset)
            for tp in partitions
        }
        total_lag = sum(partition_lags.values())
        max_lag = max(partition_lags.values())

        # Record metrics
        tags = {'group': group_id, 'topic': topic}
        self.metrics.gauge('consumer.lag.total', total_lag, tags=tags)
        self.metrics.gauge('consumer.lag.max', max_lag, tags=tags)

        # Check for hot partitions (more than 2x the average lag)
        avg_lag = total_lag / len(partition_lags)
        for partition, lag in partition_lags.items():
            if lag > avg_lag * 2:
                self.alert_hot_partition(group_id, topic, partition, lag)

        # Auto-scale based on lag; consumers beyond the partition count sit idle
        num_consumers = self.get_consumer_count(group_id)
        num_partitions = len(partition_lags)

        if total_lag > self.HIGH_LAG and num_consumers < num_partitions:
            self.autoscaler.scale_up(group_id, min(num_partitions, num_consumers + 2))
            self.alert('Scaling up consumers due to high lag')
        elif total_lag < self.LOW_LAG and num_consumers > 1:
            # Low lag, can scale down
            self.autoscaler.scale_down(group_id, num_consumers - 1)

    def calculate_time_to_catch_up(self, group_id: str, topic: str) -> float:
        """
        Estimate seconds to catch up. Producers keep writing while we drain
        the backlog, so only consumption in excess of production shrinks it.
        """
        lag = self.get_total_lag(group_id, topic)
        consume_rate = self.get_processing_rate(group_id, topic)  # events/s
        produce_rate = self.get_production_rate(topic)            # events/s

        net_rate = consume_rate - produce_rate
        if net_rate <= 0:
            return float('inf')  # Backlog is not shrinking

        return lag / net_rate  # seconds
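The lag arithmetic is easier to test when it is separated from the Kafka calls. A minimal sketch, assuming committed and log-end offsets have already been fetched into plain `{partition: offset}` dicts; `summarize_lag` and `LagSummary` are hypothetical names, and the hot-partition rule mirrors the 2x-average check above:

```python
from typing import Dict, List, NamedTuple


class LagSummary(NamedTuple):
    total: int
    max_lag: int
    hot_partitions: List[int]


def summarize_lag(committed: Dict[int, int], end: Dict[int, int],
                  hot_factor: float = 2.0) -> LagSummary:
    """Lag per partition is log-end offset minus committed offset."""
    lags = {p: max(0, end[p] - committed[p]) for p in committed}
    total = sum(lags.values())
    avg = total / len(lags) if lags else 0.0
    hot = sorted(p for p, lag in lags.items() if lag > avg * hot_factor)
    return LagSummary(total, max(lags.values(), default=0), hot)


summary = summarize_lag(committed={0: 900, 1: 950, 2: 100, 3: 1000},
                        end={0: 1000, 1: 1000, 2: 1000, 3: 1000})
print(summary)
```

Here partition 2 carries 900 of the 1,050 messages of lag, well above twice the 262.5 average, so it is the only one flagged. A skew like that often points at a hot key rather than at too few consumers, and adding consumers will not fix it.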

Multi-Datacenter Replication

Replicate streams across datacenters:

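import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Event, ReplicationConfig and getTargetDCs() are application-specific.
public class MultiDCReplication {

    private static final Logger logger = LoggerFactory.getLogger(MultiDCReplication.class);

    private final Map<String, KafkaProducer<String, Event>> producersByDC;
    private final ReplicationConfig config;

    public MultiDCReplication(Map<String, KafkaProducer<String, Event>> producersByDC,
                              ReplicationConfig config) {
        this.producersByDC = producersByDC;
        this.config = config;
    }

    // incomingHeaders are the consumed record's headers; passing them along
    // lets hop information survive each replication step
    public void replicateEvent(Event event, Headers incomingHeaders, String sourceDC) {
        if (isReplicationLoop(event, incomingHeaders)) {
            return; // Drop instead of bouncing between DCs forever
        }

        // Don't replicate back to source
        Set<String> targetDCs = getTargetDCs(sourceDC);

        // Replicate to each DC in parallel
        List<CompletableFuture<RecordMetadata>> futures = targetDCs.stream()
            .map(dc -> replicateToDatacenter(event, incomingHeaders, sourceDC, dc))
            .collect(Collectors.toList());

        // Wait for all replications (or timeout)
        try {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .get(config.getReplicationTimeout(), TimeUnit.MILLISECONDS);

        } catch (TimeoutException e) {
            // Some DCs didn't respond in time
            // Log and continue - replication will catch up
            logger.warn("Replication timeout for event {}", event.getId());
        } catch (ExecutionException e) {
            logger.error("Replication failed for event {}", event.getId(), e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private CompletableFuture<RecordMetadata> replicateToDatacenter(
            Event event, Headers incomingHeaders, String sourceDC, String dc) {

        KafkaProducer<String, Event> producer = producersByDC.get(dc);

        ProducerRecord<String, Event> record = new ProducerRecord<>(
            config.getTopicName(),
            event.getId(),
            event
        );

        // Carry earlier hops forward, then add metadata about this one
        for (Header header : incomingHeaders) {
            if (header.key().equals("replicated-at")) {
                record.headers().add(header);
            }
        }
        record.headers().add("source-dc", sourceDC.getBytes(StandardCharsets.UTF_8));
        record.headers().add("replicated-at",
            String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));

        // Complete the future from the producer callback instead of blocking
        // a pool thread on send().get()
        CompletableFuture<RecordMetadata> result = new CompletableFuture<>();
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                result.completeExceptionally(exception);
            } else {
                result.complete(metadata);
            }
        });
        return result;
    }

    // Detect replication loops: every hop adds one "replicated-at" header
    private boolean isReplicationLoop(Event event, Headers headers) {
        int replicationCount = 0;
        for (Header header : headers) {
            if (header.key().equals("replicated-at")) {
                replicationCount++;
            }
        }

        if (replicationCount > config.getMaxReplicationHops()) {
            logger.error("Replication loop detected for event {}", event.getId());
            return true;
        }

        return false;
    }
}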
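Counting hops catches a loop only after an event has already bounced a few times. An alternative is to carry in a header every DC that has received, or is about to receive, a copy; a DC then forwards only to DCs missing from that list, so (assuming every DC runs the same rule) each DC sees the event once and no loop can start. A minimal sketch in Python; the `visited-dcs` header and `plan_replication` helper are hypothetical, and headers are modeled as `(str, bytes)` pairs like kafka-python's `ConsumerRecord.headers`:

```python
from typing import List, Sequence, Tuple

Header = Tuple[str, bytes]


def visited_dcs(headers: Sequence[Header]) -> List[str]:
    for key, value in headers:
        if key == "visited-dcs":
            return value.decode("utf-8").split(",")
    return []


def plan_replication(headers: Sequence[Header], local_dc: str,
                     all_dcs: Sequence[str]) -> Tuple[List[str], List[Header]]:
    """Return (target DCs, headers to attach) for an event seen in local_dc."""
    seen = visited_dcs(headers)
    if local_dc not in seen:
        seen.append(local_dc)
    targets = [dc for dc in all_dcs if dc not in seen]
    # Targets count as visited too, so they will not forward to each other
    path = seen + targets
    out = [(k, v) for k, v in headers if k != "visited-dcs"]
    out.append(("visited-dcs", ",".join(path).encode("utf-8")))
    return targets, out


DCS = ["us-east", "eu-west", "ap-south"]
targets, hdrs = plan_replication([], "us-east", DCS)
print(targets)
```

The trade-off is a header that grows with the number of DCs, which stays small for typical deployments.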

Performance Tuning

Real-world performance optimization:

# Producer optimization (Kafka Java client property names)
producer_config = {
    # Batching
    'linger.ms': 100,  # Wait up to 100ms to batch
    'batch.size': 32768,  # 32KB batches

    # Compression
    'compression.type': 'lz4',  # Fast compression

    # Memory
    'buffer.memory': 67108864,  # 64MB buffer

    # Acks
    'acks': 'all',  # Wait for all replicas (durability)
    'max.in.flight.requests.per.connection': 5,  # With idempotence, ordering holds for up to 5

    # Idempotence
    'enable.idempotence': True,

    # Retries
    'retries': 2147483647,  # Integer.MAX_VALUE; delivery.timeout.ms caps total retry time
    'delivery.timeout.ms': 120000,  # 2 minutes
}

# Consumer optimization
consumer_config = {
    # Fetching
    'fetch.min.bytes': 1024,  # Wait for 1KB
    'fetch.max.wait.ms': 500,  # Or 500ms

    # Max records per poll
    'max.poll.records': 500,

    # Session timeout
    'session.timeout.ms': 30000,
    'heartbeat.interval.ms': 3000,

    # Processing timeout
    'max.poll.interval.ms': 300000,  # 5 minutes

    # Auto offset reset
    'auto.offset.reset': 'earliest',

    # Disable auto commit (manual commit for control)
    'enable.auto.commit': False,
}
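`linger.ms` and `batch.size` interact: roughly, a partition's batch is sent when it fills or when the linger time expires, whichever comes first. A rough sketch for checking which limit your traffic hits, assuming a steady per-partition message rate and uniform message size, and ignoring compression; `expected_batch_bytes` is a hypothetical helper:

```python
def expected_batch_bytes(msgs_per_sec: float, avg_msg_bytes: int,
                         linger_ms: int, batch_size: int) -> int:
    """Approximate bytes in one batch for a single partition."""
    accumulated = msgs_per_sec * avg_msg_bytes * linger_ms / 1000
    return int(min(batch_size, accumulated))


# 1,000 msg/s of 500-byte messages into one partition, settings as above
busy = expected_batch_bytes(1000, 500, linger_ms=100, batch_size=32768)
# A quieter partition at 50 msg/s
quiet = expected_batch_bytes(50, 500, linger_ms=100, batch_size=32768)
print(busy, quiet)
```

In the busy case a 32 KB batch fills in about 66 ms (32,768 B at 500,000 B/s), before linger expires, so `batch.size` is the limit to raise for throughput. In the quiet case each batch waits the full 100 ms and carries about 2.5 KB, so lowering `linger.ms` trades some of that batching for lower latency.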

Key Takeaways

  1. Schema evolution is inevitable: Plan for it from day one
  2. DLQs are essential: But categorize errors and retry appropriately
  3. Exactly-once is hard: Use idempotent processing plus transactional offset storage
  4. Partition wisely: The partition key determines scalability
  5. Monitor lag religiously: It’s your canary
  6. Test multi-DC carefully: Replication loops are real
  7. Tune for your workload: Default configs are rarely optimal

Event streaming at scale requires careful attention to detail, comprehensive monitoring, and defensive programming. These patterns have proven effective across billions of events.