Stream processing with Kafka has evolved from a niche technology to a critical component of modern data architectures. After building and operating stream processing pipelines handling billions of events, I’ve learned that the gap between proof-of-concept and production-ready systems is substantial. This post bridges that gap with practical patterns and hard-won lessons.
## Understanding Stream Processing Fundamentals
Stream processing differs fundamentally from batch processing. Instead of waiting for complete datasets, stream processors react to events as they arrive:
```java
// Batch mindset: pull a complete dataset, then process it
List<Event> events = database.fetchAll();
var results = events.stream()
    .filter(e -> e.timestamp > cutoff)
    .map(this::transform)
    .collect(Collectors.toList());

// Stream mindset: react to each event as it arrives
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Event> stream = builder.stream("events-topic");
stream
    .filter((key, event) -> event.timestamp > cutoff)
    .mapValues(this::transform)
    .to("processed-events");
```
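A topology does nothing until it is wrapped in a `KafkaStreams` instance and started. A minimal harness for the snippet above (broker address and application id are placeholder values):

```java
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "event-processor");   // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Close cleanly on shutdown so offsets commit and state flushes
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
```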
## Architecture Patterns for Stream Processing

### 1. Stateless vs Stateful Processing
Stateless transformations are straightforward:
```java
public class StatelessProcessor {
    public KStream<String, Event> process(KStream<String, Event> input) {
        return input
            .filter((k, v) -> v.isValid())
            .mapValues(event -> {
                event.enrichWithMetadata();
                return event;
            });
    }
}
```
Stateful processing requires careful state management:
```java
public class StatefulAggregator {
    public KTable<String, UserStats> aggregateUserActivity(
            KStream<String, Activity> activities) {
        return activities
            .groupByKey()
            .aggregate(
                UserStats::new, // Initializer
                (key, activity, stats) -> {
                    // The backing state store is managed by Kafka Streams
                    stats.addActivity(activity);
                    return stats;
                },
                Materialized.<String, UserStats, KeyValueStore<Bytes, byte[]>>as(
                        "user-stats-store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(new UserStatsSerde())
            );
    }
}
```
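Naming the store pays off beyond readability: any materialized store can be served through interactive queries. A minimal sketch, assuming `streams` is the running `KafkaStreams` instance hosting this topology and `"user-42"` is a hypothetical key:

```java
// Point lookups against the materialized aggregate
ReadOnlyKeyValueStore<String, UserStats> store = streams.store(
    StoreQueryParameters.fromNameAndType(
        "user-stats-store",
        QueryableStoreTypes.keyValueStore()
    )
);
UserStats stats = store.get("user-42"); // hypothetical key
```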
### 2. Windowing Strategies
Windows are essential for time-based aggregations. Choose wisely:
```java
public class WindowedAnalytics {
    // Tumbling windows: non-overlapping, fixed size
    public KTable<Windowed<String>, Long> tumblingCount(
            KStream<String, Event> stream) {
        return stream
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
            .count();
    }

    // Hopping windows: overlapping, fixed size
    public KTable<Windowed<String>, Long> hoppingCount(
            KStream<String, Event> stream) {
        return stream
            .groupByKey()
            .windowedBy(
                TimeWindows.of(Duration.ofMinutes(5))
                    .advanceBy(Duration.ofMinutes(1))
            )
            .count();
    }

    // Session windows: dynamic size based on activity gaps
    public KTable<Windowed<String>, Long> sessionCount(
            KStream<String, Event> stream) {
        return stream
            .groupByKey()
            .windowedBy(SessionWindows.with(Duration.ofMinutes(30)))
            .count();
    }
}
```
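Each variant returns a `KTable` keyed by `Windowed<String>`; before writing results downstream, the window boundaries usually get flattened into the key. A sketch using the tumbling count above (the output topic name and key format are arbitrary choices):

```java
tumblingCount(stream)
    .toStream()
    .map((windowedKey, count) -> KeyValue.pair(
        // Embed the window start so consumers can tell windows apart
        windowedKey.key() + "@" + windowedKey.window().start(),
        count
    ))
    .to("event-counts", Produced.with(Serdes.String(), Serdes.Long()));
```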
### 3. Stream-Table Joins
Joining streams with tables is a common pattern for enrichment:
```python
from kafka import KafkaConsumer, KafkaProducer
import json

class StreamTableJoiner:
    def __init__(self):
        self.user_cache = {}  # Local cache of the user table
        self.consumer = KafkaConsumer('activity-stream')
        self.producer = KafkaProducer()

    def process(self):
        for message in self.consumer:
            activity = json.loads(message.value)
            user_id = activity['userId']
            # Look up user data (cached or fetched)
            user_data = self.get_user_data(user_id)
            # Enrich the activity with user data
            enriched = {
                **activity,
                'userName': user_data['name'],
                'userSegment': user_data['segment'],
            }
            self.producer.send('enriched-activity',
                               json.dumps(enriched).encode())

    def get_user_data(self, user_id):
        if user_id not in self.user_cache:
            # Populate from a compacted topic or database
            self.user_cache[user_id] = self.fetch_user(user_id)
        return self.user_cache[user_id]
```
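For comparison, Kafka Streams has this pattern built in: a `KStream`-`KTable` join with the table backed by a compacted topic, no hand-rolled cache required. A hedged sketch mirroring the Python example (the `User` type and its accessors are assumed); note that the stream and table must be co-partitioned on the join key:

```java
StreamsBuilder builder = new StreamsBuilder();
// Table view of a compacted user topic; updates apply automatically
KTable<String, User> users = builder.table("users");
KStream<String, Activity> activities = builder.stream("activity-stream");

activities
    .join(users, (activity, user) -> {
        // Enrich the activity with the current user record
        activity.setUserName(user.getName());
        activity.setUserSegment(user.getSegment());
        return activity;
    })
    .to("enriched-activity");
```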
## Performance Optimization Techniques

### 1. Parallelism and Partitioning
Kafka Streams parallelism is capped by the number of input partitions: each partition becomes a stream task, and tasks are distributed across threads and instances:
```java
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processor");
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4); // Threads per instance
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
          StreamsConfig.EXACTLY_ONCE_V2);

// With 12 partitions and 3 instances of 4 threads each:
// 12 partitions / 12 threads = 1 partition per thread (optimal)
```
### 2. State Store Configuration
State stores impact performance significantly:
```java
public class OptimizedStateStore {
    public StoreBuilder<KeyValueStore<String, Event>> createStore() {
        return Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("events-store"),
                Serdes.String(),
                new EventSerde()
            )
            .withCachingEnabled() // Reduces disk I/O
            .withLoggingEnabled(Map.of(
                // Compact the changelog topic
                "cleanup.policy", "compact",
                "segment.ms", "3600000",
                "min.cleanable.dirty.ratio", "0.01"
            ));
    }
}
```
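`withCachingEnabled()` only pays off when the record cache actually has memory behind it; cache size and flush cadence are application-level settings. A sketch of the related knobs (values are illustrative; newer Streams versions rename the cache setting to `statestore.cache.max.bytes`):

```java
Properties props = new Properties();
// Record-cache memory shared across all threads (illustrative: 64 MB)
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 64 * 1024 * 1024L);
// Caches flush downstream at most this often, bounding update latency
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
```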
### 3. Batch Processing Optimization
Tune producer batching for throughput:
```python
class OptimizedProcessor:
    def __init__(self):
        # Producer tuning knobs (standard Kafka producer config names)
        self.config = {
            'linger.ms': 100,              # Wait up to 100ms to fill a batch
            'batch.size': 16384,           # 16KB batches
            'buffer.memory': 33554432,     # 32MB send buffer
            'compression.type': 'snappy',  # Fast compression
        }
```
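For reference, the same knobs in a Java producer (a sketch; serializers and bootstrap servers are omitted):

```java
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, 100);           // Wait up to 100ms to batch
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // 16KB batches
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L); // 32MB buffer
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
```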
## Handling Late and Out-of-Order Events
The distinction between event time (when an event occurred) and processing time (when it is observed) is critical:
```java
public class LateEventHandler {
    public KStream<Windowed<String>, EventAggregate> handleLateEvents(
            KStream<String, Event> stream) {
        // Grace period: how long after a window closes we still accept events
        Duration gracePeriod = Duration.ofMinutes(10);

        // Streams silently drops events that miss even the grace period
        // before they reach the aggregator, so divert them upstream
        stream
            .filter((key, event) -> event.isLaterThan(gracePeriod))
            .foreach(this::sendToLateEventTopic);

        return stream
            .groupByKey()
            .windowedBy(
                TimeWindows.of(Duration.ofMinutes(5))
                    .grace(gracePeriod) // Accept late events within the grace period
            )
            .aggregate(
                EventAggregate::new,
                (key, event, aggregate) -> {
                    aggregate.add(event);
                    return aggregate;
                },
                Materialized.as("windowed-aggregates")
            )
            .toStream();
    }
}
```
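Windows are assigned by event time, and Streams obtains it from a `TimestampExtractor`; the default uses the record's broker/producer timestamp. If the payload carries its own timestamp, a custom extractor keeps window assignment honest. A sketch, assuming a hypothetical `getEventTimeMs()` accessor on `Event`:

```java
public class EventTimeExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        if (record.value() instanceof Event) {
            // Use the timestamp embedded in the payload (hypothetical accessor)
            return ((Event) record.value()).getEventTimeMs();
        }
        // Fall back to the last valid timestamp seen on this partition
        return partitionTime;
    }
}

// Registered via:
// props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
//           EventTimeExtractor.class);
```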
## Error Handling and Reliability

### 1. Deserialization Error Handling
```java
public class RobustDeserializer implements Deserializer<Event> {
    private final ObjectMapper objectMapper = new ObjectMapper();
    private static final Logger logger =
        LoggerFactory.getLogger(RobustDeserializer.class);

    @Override
    public Event deserialize(String topic, byte[] data) {
        try {
            return objectMapper.readValue(data, Event.class);
        } catch (Exception e) {
            // Log and send to the dead letter queue
            logger.error("Deserialization failed", e);
            sendToDeadLetterQueue(topic, data, e);
            return null; // Skip this record
        }
    }
}
```

```java
// Or let Streams log and skip undeserializable records
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
          LogAndContinueExceptionHandler.class);
```
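`LogAndContinueExceptionHandler` skips poison pills but discards them. A custom handler can forward the raw bytes to a dead letter topic first; a minimal sketch (the DLQ topic name is a placeholder and producer construction is elided):

```java
public class DeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {
    private Producer<byte[], byte[]> dlqProducer; // built in configure() (elided)

    @Override
    public DeserializationHandlerResponse handle(ProcessorContext context,
            ConsumerRecord<byte[], byte[]> record, Exception exception) {
        // Preserve the poison pill, then keep the pipeline moving
        dlqProducer.send(new ProducerRecord<>("events-dlq", record.key(), record.value()));
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Construct the DLQ producer from the supplied configs (elided)
    }
}
```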
### 2. Processing Error Handling
```go
type StreamProcessor struct {
    errorHandler ErrorHandler
    metrics      MetricsCollector
}

func (p *StreamProcessor) ProcessEvent(event Event) error {
    defer func() {
        if r := recover(); r != nil {
            p.errorHandler.HandlePanic(event, r)
            p.metrics.RecordPanicRecovery()
        }
    }()

    if err := p.validate(event); err != nil {
        p.errorHandler.HandleValidationError(event, err)
        return nil // Continue processing
    }

    if err := p.transform(event); err != nil {
        // Decide: retry, skip, or DLQ
        if isRetriable(err) {
            return err // Caller retries
        }
        p.errorHandler.SendToDLQ(event, err)
        return nil // Continue
    }

    return nil
}
```
## Monitoring and Observability

### Critical Metrics
```python
import time

# Counter / Gauge / Histogram come from your metrics library
# (e.g. prometheus_client)
class StreamProcessorMetrics:
    def __init__(self):
        self.metrics = {
            # Throughput metrics
            'records_processed_total': Counter(),
            'processing_rate': Gauge(),
            # Latency metrics
            'processing_latency_ms': Histogram(
                buckets=[10, 50, 100, 500, 1000, 5000]
            ),
            'end_to_end_latency_ms': Histogram(),
            # State metrics
            'state_store_size_bytes': Gauge(),
            'state_store_entries': Gauge(),
            # Error metrics
            'deserialization_errors': Counter(),
            'processing_errors': Counter(),
            'dlq_messages': Counter(),
            # Kafka-specific
            'consumer_lag': Gauge(),
            'rebalances_total': Counter(),
        }

    def record_processing(self, event, duration_ms):
        self.metrics['records_processed_total'].inc()
        self.metrics['processing_latency_ms'].observe(duration_ms)
        # End-to-end latency: now minus the event's epoch timestamp,
        # converted to milliseconds
        e2e_latency_ms = (time.time() - event.timestamp) * 1000
        self.metrics['end_to_end_latency_ms'].observe(e2e_latency_ms)
```
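Consumer lag is the one metric here that Kafka itself is authoritative for: scrape it from the consumer's JMX metrics, or compute it with the admin client. A sketch of the admin-client route (the group id equals the Streams `application.id`):

```java
// Hypothetical helper: print lag per partition for the Streams app's group
static void printConsumerLag(Properties adminProps) throws Exception {
    try (AdminClient admin = AdminClient.create(adminProps)) {
        Map<TopicPartition, OffsetAndMetadata> committed = admin
            .listConsumerGroupOffsets("stream-processor") // application.id
            .partitionsToOffsetAndMetadata()
            .get();
        // Fetch the latest (end) offsets for the same partitions
        Map<TopicPartition, OffsetSpec> latest = committed.keySet().stream()
            .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
        admin.listOffsets(latest).all().get().forEach((tp, end) -> {
            long lag = end.offset() - committed.get(tp).offset();
            System.out.printf("%s lag=%d%n", tp, lag);
        });
    }
}
```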
### Alerting Strategy
```yaml
# Production-tested alert rules
alerts:
  - name: HighConsumerLag
    condition: consumer_lag > 100000
    duration: 5m
    severity: critical

  - name: ProcessingLatencyHigh
    condition: p99_latency_ms > 5000
    duration: 10m
    severity: warning

  - name: ErrorRateHigh
    condition: error_rate > 0.01  # 1%
    duration: 5m
    severity: critical

  - name: StateStoreGrowing
    condition: rate(state_store_size_bytes[1h]) > threshold
    duration: 30m
    severity: warning
```
## Deployment and Operational Best Practices

### 1. Rolling Deployments
```bash
#!/bin/bash
# Deploy stream processors with zero downtime

# Scale up the new version
kubectl scale deployment stream-processor-v2 --replicas=3

# Wait for new pods to be ready and consuming
sleep 60

# Scale down the old version gradually
kubectl scale deployment stream-processor-v1 --replicas=2
sleep 30
kubectl scale deployment stream-processor-v1 --replicas=1
sleep 30
kubectl scale deployment stream-processor-v1 --replicas=0
```
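Each restart in that loop can trigger a rebalance. Static group membership lets a restarted pod rejoin without one, provided it returns within the session timeout; a hedged config sketch (deriving the ID from a pod-name environment variable is an assumption):

```java
Properties props = new Properties();
// Stable per-pod instance ID (assumes a POD_NAME env var is set)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG),
          "stream-processor-" + System.getenv("POD_NAME"));
// Give restarted pods time to rejoin before partitions are reassigned
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
          60000);
```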
### 2. State Store Management
```java
public class StateStoreManager {
    // Enable standby replicas for faster recovery after failover
    public Properties configureHighAvailability() {
        Properties props = new Properties();
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 10000L);
        return props;
    }

    // Clean up state on application reset (use with caution!)
    public void resetApplication() {
        KafkaStreams streams = new KafkaStreams(topology, props);
        // Deletes local state stores; only valid before start() or after close()
        streams.cleanUp();
    }
}
```
## Testing Stream Processing Applications
```java
public class StreamProcessorTest {
    private TopologyTestDriver testDriver;
    private TestInputTopic<String, Event> inputTopic;
    private TestOutputTopic<String, ProcessedEvent> outputTopic;

    @Before
    public void setup() {
        testDriver = new TopologyTestDriver(
            buildTopology(),
            getTestConfig()
        );
        inputTopic = testDriver.createInputTopic(
            "input-topic",
            Serdes.String().serializer(),
            new EventSerializer()
        );
        outputTopic = testDriver.createOutputTopic(
            "output-topic",
            Serdes.String().deserializer(),
            new ProcessedEventDeserializer()
        );
    }

    @After
    public void tearDown() {
        testDriver.close(); // Release state stores between tests
    }

    @Test
    public void testEventProcessing() {
        // Given
        Event event = new Event("user-1", "login", Instant.now());

        // When
        inputTopic.pipeInput("user-1", event);

        // Then
        ProcessedEvent result = outputTopic.readValue();
        assertEquals("user-1", result.getUserId());
        assertTrue(result.isProcessed());
    }
}
```
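`TopologyTestDriver` also controls time: records can be piped with explicit event timestamps, and `advanceWallClockTime()` drives wall-clock punctuators. A sketch of exercising a hypothetical 5-minute tumbling-window topology (assertions depend on that topology, so they are left as comments):

```java
@Test
public void testWindowedCount() {
    Instant start = Instant.parse("2024-01-01T00:00:00Z");

    // Two events inside the same 5-minute window...
    inputTopic.pipeInput("user-1", new Event("user-1", "click", start), start);
    inputTopic.pipeInput("user-1",
        new Event("user-1", "click", start.plusSeconds(60)),
        start.plusSeconds(60));

    // ...and one that falls into the next window
    inputTopic.pipeInput("user-1",
        new Event("user-1", "click", start.plusSeconds(360)),
        start.plusSeconds(360));

    // The driver flushes eagerly, so a windowed count would emit
    // updates of 1 and 2 for the first window, then 1 for the second
}
```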
## Conclusion
Stream processing with Kafka enables real-time data processing at massive scale, but production success requires attention to parallelism, state management, error handling, and observability. The patterns and practices shared here come from operating pipelines processing billions of events daily.
Key takeaways:
- Choose the right windowing strategy for your use case
- Monitor consumer lag religiously
- Plan for late events and out-of-order data
- Implement comprehensive error handling with DLQs
- Test thoroughly with TopologyTestDriver
- Deploy carefully with proper state management
Start simple, measure everything, and scale incrementally as you understand your workload characteristics.