The shift from batch to real-time data processing fundamentally changes how systems are built. After migrating multiple batch pipelines to streaming systems that now process 100M+ events per day, here's what actually works.
Why Streaming?
Traditional batch processing has inherent limitations:
# Batch processing: Process data every hour
def batch_job():
    # Wait for data to accumulate
    data = load_data_from_last_hour()
    # Process in bulk
    results = process_batch(data)
    # Write results
    save_results(results)

# Problems:
# - Data is stale (up to 1 hour old)
# - Resource waste (idle between batches)
# - All-or-nothing failures
# - Hard to scale specific stages
Streaming solves these problems:
# Streaming: Process data as it arrives
def stream_processor():
    for event in event_stream:
        # Process immediately
        result = process_event(event)
        # Emit result
        output_stream.send(result)

# Benefits:
# - Fresh data (millisecond latency)
# - Continuous resource utilization
# - Granular error handling
# - Independent scaling per stage
Migration Strategy
Phase 1: Dual Write
Run batch and streaming in parallel:
public class DualWriteService {
    private final BatchWriter batchWriter;
    private final StreamWriter streamWriter;
    private final FeatureToggle toggle;

    public void processData(Data data) {
        // Continue writing to batch (existing system)
        batchWriter.write(data);

        // Also write to stream (new system)
        if (toggle.isEnabled("streaming-write")) {
            try {
                streamWriter.write(data);
            } catch (Exception e) {
                // Don't fail if streaming fails
                logger.error("Streaming write failed", e);
                metrics.counter("streaming.write.errors").increment();
            }
        }
    }
}
Phase 2: Dual Read
Verify streaming output matches batch:
class DualReadValidator:
    """
    Compare batch and streaming results
    """
    def validate_results(self, entity_id: str):
        # Get result from batch system
        batch_result = self.batch_store.get(entity_id)
        # Get result from streaming system
        stream_result = self.stream_store.get(entity_id)

        # Compare
        if not self.are_equivalent(batch_result, stream_result):
            self.log_discrepancy(entity_id, batch_result, stream_result)
            # Record metrics
            self.metrics.counter('validation.mismatches').inc()
            # Alert if mismatch rate is high
            if self.get_mismatch_rate() > 0.01:  # 1%
                self.alert_high_mismatch_rate()

    def are_equivalent(self, batch: dict, stream: dict) -> bool:
        """
        Check if results are equivalent.
        Allow for small numerical differences due to floating point.
        """
        # A result missing on one side is a mismatch; missing on both is a match
        if batch is None or stream is None:
            return batch is stream
        if set(batch.keys()) != set(stream.keys()):
            return False
        for key in batch.keys():
            batch_val = batch[key]
            stream_val = stream[key]
            if isinstance(batch_val, float):
                # Allow small difference for floats
                if abs(batch_val - stream_val) > 1e-6:
                    return False
            elif batch_val != stream_val:
                return False
        return True
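In practice the validator is driven by a sampling loop over recently written entities rather than a full scan of both stores. A minimal sketch of that driver follows; the recent_entity_ids source, sample rate, and interval are illustrative assumptions, not part of the original design:

import random
import time

def run_validation_loop(validator, recent_entity_ids, sample_rate=0.05, interval_s=60):
    """Continuously validate a random sample of recently written entities."""
    while True:
        candidates = recent_entity_ids()          # e.g. IDs written in the last hour
        sample = [e for e in candidates if random.random() < sample_rate]
        for entity_id in sample:
            validator.validate_results(entity_id)
        time.sleep(interval_s)                    # pace the comparison load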
Phase 3: Full Cutover
Switch to streaming, keep batch as backup:
type DataProcessor struct {
    streamingEnabled bool
    batchFallback    bool
}

func (p *DataProcessor) GetData(id string) (Data, error) {
    if p.streamingEnabled {
        // Try streaming first
        data, err := p.streamStore.Get(id)
        if err == nil {
            metrics.Counter("reads.streaming").Inc()
            return data, nil
        }

        // Fallback to batch if streaming fails
        if p.batchFallback {
            logger.Warn("Streaming read failed, falling back to batch",
                "id", id, "error", err)
            metrics.Counter("reads.batch_fallback").Inc()
            return p.batchStore.Get(id)
        }
        return Data{}, err
    }

    // Still on batch system
    metrics.Counter("reads.batch").Inc()
    return p.batchStore.Get(id)
}
Streaming Aggregations
Window operations are fundamental to streaming:
// Tumbling window: non-overlapping fixed windows
public class TumblingWindowAggregator {

    public KTable<Windowed<String>, Long> countByWindow(
            KStream<String, Event> events) {
        return events
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
            .count(Materialized.as("counts-store"));
    }

    // Query the count for a key in the window starting at windowStart
    public Long getCount(String key, Instant windowStart) {
        ReadOnlyWindowStore<String, Long> store =
            streams.store("counts-store", QueryableStoreTypes.windowStore());
        try (WindowStoreIterator<Long> iter = store.fetch(
                key,
                windowStart.toEpochMilli(),
                windowStart.plus(Duration.ofMinutes(5)).toEpochMilli())) {
            if (iter.hasNext()) {
                return iter.next().value;
            }
            return 0L;
        }
    }
}
// Session window: dynamic windows based on activity
public class SessionWindowAggregator {

    public KTable<Windowed<String>, Session> detectSessions(
            KStream<String, Activity> activities) {
        return activities
            .groupByKey()
            .windowedBy(SessionWindows.with(Duration.ofMinutes(30)))
            .aggregate(
                Session::new,
                (key, activity, session) -> {
                    session.addActivity(activity);
                    return session;
                },
                (key, session1, session2) -> {
                    // Merge sessions if gap < 30 minutes
                    return session1.merge(session2);
                },
                Materialized.as("sessions-store")
            );
    }
}
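The Kafka Streams API hides most of the mechanics. For intuition, here is a framework-agnostic sketch of the same tumbling-window count in plain Python; the event shape (a key plus an epoch-seconds timestamp) is assumed for illustration and not tied to any particular engine:

from collections import defaultdict

class TumblingWindowCounter:
    """Count events per key in fixed, non-overlapping windows."""
    def __init__(self, window_size_s=300):
        self.window_size_s = window_size_s
        self.counts = defaultdict(int)   # (key, window_start) -> count

    def add(self, key, timestamp_s):
        # Align the event time to the start of its window
        window_start = int(timestamp_s) - int(timestamp_s) % self.window_size_s
        self.counts[(key, window_start)] += 1

    def get(self, key, window_start):
        return self.counts.get((key, window_start), 0)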
Late Data Handling
Dealing with out-of-order events:
import logging
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)

class LateDataHandler:
    """
    Handle late-arriving events gracefully
    """
    def __init__(self, allowed_lateness: timedelta = timedelta(minutes=10)):
        self.allowed_lateness = allowed_lateness
        self.watermark = datetime.min

    def process_event(self, event):
        """
        Process event with watermarking
        """
        event_time = event.timestamp
        # Update watermark (max event time seen)
        self.watermark = max(self.watermark, event_time)
        # Calculate how late this event is
        lateness = self.watermark - event_time
        if lateness > self.allowed_lateness:
            # Too late, send to late data topic
            self.metrics.counter('events.too_late').inc()
            self.late_events_topic.send(event)
            logger.info(f"Event too late: {event.id}, lateness: {lateness}")
            return None
        # Within allowed lateness, process normally
        return self.process(event)

    def get_current_watermark(self) -> datetime:
        """Expose watermark for downstream processors"""
        return self.watermark
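To make the watermark behaviour concrete, here is a small usage sketch with out-of-order events. The Event dataclass, the timestamps, and the stubbed collaborators (metrics, late topic, process function) are demo-only assumptions standing in for whatever the real pipeline wires in:

from dataclasses import dataclass
from datetime import datetime, timedelta
from types import SimpleNamespace

@dataclass
class Event:
    id: str
    timestamp: datetime

handler = LateDataHandler(allowed_lateness=timedelta(minutes=10))
# Stub the collaborators the handler expects
handler.metrics = SimpleNamespace(counter=lambda name: SimpleNamespace(inc=lambda: None))
handler.late_events_topic = SimpleNamespace(send=lambda e: print(f"routed to late topic: {e.id}"))
handler.process = lambda e: e  # identity "processing" for the demo

handler.process_event(Event("a", datetime(2024, 1, 1, 12, 0)))   # watermark -> 12:00
handler.process_event(Event("b", datetime(2024, 1, 1, 12, 30)))  # watermark -> 12:30
handler.process_event(Event("c", datetime(2024, 1, 1, 12, 25)))  # 5 min late, still processed
handler.process_event(Event("d", datetime(2024, 1, 1, 12, 5)))   # 25 min late, sent to late topic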
Stateful Processing
Maintaining state in streaming:
public class StatefulProcessor {
    private final String storeName = "user-state-store";

    public KStream<String, UserUpdate> processWithState(
            KStream<String, Event> events) {
        // Create state store
        StoreBuilder<KeyValueStore<String, UserState>> storeBuilder =
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(storeName),
                Serdes.String(),
                new UserStateSerde()
            )
            .withCachingEnabled()
            .withLoggingEnabled(Map.of(
                "cleanup.policy", "compact",
                "min.cleanable.dirty.ratio", "0.01"
            ));

        // Add store to topology
        builder.addStateStore(storeBuilder);

        // Transform with state
        return events.transformValues(
            () -> new ValueTransformerWithKey<String, Event, UserUpdate>() {
                private KeyValueStore<String, UserState> stateStore;

                @Override
                public void init(ProcessorContext context) {
                    this.stateStore = context.getStateStore(storeName);
                }

                @Override
                public UserUpdate transform(String key, Event event) {
                    // Get current state
                    UserState state = stateStore.get(key);
                    if (state == null) {
                        state = new UserState(key);
                    }
                    // Update state based on event
                    UserUpdate update = state.apply(event);
                    // Save updated state
                    stateStore.put(key, state);
                    return update;
                }

                @Override
                public void close() {}
            },
            storeName // Attach to state store
        );
    }
}
Stream Processing Patterns
Pattern 1: Enrichment
class StreamEnricher:
    """
    Enrich stream with data from external sources
    """
    def __init__(self):
        self.user_cache = {}  # Local cache
        self.user_service = UserServiceClient()

    async def enrich_event(self, event):
        """
        Add user information to event
        """
        user_id = event['user_id']
        # Check cache
        if user_id in self.user_cache:
            user_info = self.user_cache[user_id]
        else:
            # Fetch from service
            user_info = await self.user_service.get_user(user_id)
            self.user_cache[user_id] = user_info
        # Create enriched event
        return {
            **event,
            'user_name': user_info['name'],
            'user_segment': user_info['segment'],
            'user_tier': user_info['tier']
        }

    def enrich_stream(self, input_stream):
        """
        Enrich entire stream
        """
        return input_stream.map_async(self.enrich_event)
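One caveat with the enricher above: the plain dict cache grows without bound and never expires, so memory climbs and user data goes stale. A minimal TTL cache that could stand in for it looks like the sketch below; the size limit and TTL are illustrative assumptions, and libraries such as cachetools offer the same idea off the shelf:

import time

class TTLCache:
    """Tiny time-bounded cache for enrichment lookups."""
    def __init__(self, max_size=10_000, ttl_s=300):
        self.max_size = max_size
        self.ttl_s = ttl_s
        self._entries = {}  # key -> (value, expiry_time)

    def get(self, key):
        entry = self._entries.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() > expires_at:
            del self._entries[key]  # expired
            return None
        return value

    def put(self, key, value):
        if len(self._entries) >= self.max_size:
            # Crude eviction: drop the oldest-inserted entry
            self._entries.pop(next(iter(self._entries)))
        self._entries[key] = (value, time.monotonic() + self.ttl_s)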
Pattern 2: Filtering
type StreamFilter struct {
    rules []FilterRule
}

type FilterRule interface {
    Matches(event Event) bool
    Name() string
}

// Allow only specific event types
type EventTypeFilter struct {
    allowedTypes map[string]bool
}

func (f *EventTypeFilter) Matches(event Event) bool {
    return f.allowedTypes[event.Type]
}

func (f *EventTypeFilter) Name() string { return "event_type" }

// Filter by user segment
type UserSegmentFilter struct {
    allowedSegments map[string]bool
    userService     *UserService
}

func (f *UserSegmentFilter) Matches(event Event) bool {
    user, err := f.userService.GetUser(event.UserID)
    if err != nil {
        return false
    }
    return f.allowedSegments[user.Segment]
}

func (f *UserSegmentFilter) Name() string { return "user_segment" }

// Apply filters to stream
func (sf *StreamFilter) Filter(input <-chan Event) <-chan Event {
    output := make(chan Event, 1000)
    go func() {
        defer close(output)
        for event := range input {
            // Apply all filter rules
            pass := true
            for _, rule := range sf.rules {
                if !rule.Matches(event) {
                    metrics.Counter("events.filtered",
                        "rule", rule.Name()).Inc()
                    pass = false
                    break
                }
            }
            if pass {
                output <- event
            }
        }
    }()
    return output
}
Pattern 3: Fan-Out
import asyncio

class StreamFanOut:
    """
    Send events to multiple downstream processors
    """
    def __init__(self, processors: list):
        self.processors = processors

    async def fan_out(self, event):
        """
        Send to all processors concurrently
        """
        tasks = [
            processor.process(event)
            for processor in self.processors
        ]
        # Wait for all processors
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Check for errors
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                logger.error(
                    f"Processor {self.processors[i].name} failed: {result}"
                )
                self.metrics.counter(
                    'fanout.errors',
                    tags={'processor': self.processors[i].name}
                ).inc()
        return results
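A quick usage sketch of the fan-out: two toy processors handle the same event concurrently, and results come back in processor order (exceptions included rather than raised, because of return_exceptions=True). The PrintProcessor class and the metrics stub are demo-only assumptions:

import asyncio
from types import SimpleNamespace

class PrintProcessor:
    """Toy downstream processor used only for this demo."""
    def __init__(self, name):
        self.name = name

    async def process(self, event):
        await asyncio.sleep(0.01)  # simulate I/O
        return f"{self.name} handled {event['id']}"

fan_out = StreamFanOut([PrintProcessor("indexer"), PrintProcessor("notifier")])
# Stub the metrics collaborator the class expects
fan_out.metrics = SimpleNamespace(
    counter=lambda name, tags=None: SimpleNamespace(inc=lambda: None))

results = asyncio.run(fan_out.fan_out({"id": "evt-1"}))
print(results)  # one result (or exception) per processor, in order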
Monitoring Streaming Pipelines
class StreamingMetrics:
    """
    Comprehensive metrics for streaming pipelines
    """
    def __init__(self):
        self.metrics = MetricsCollector()

    def record_event_processed(self, stage: str, duration_ms: float):
        """Record successful processing"""
        self.metrics.histogram(
            'stream.processing.duration',
            duration_ms,
            tags={'stage': stage}
        )
        self.metrics.counter(
            'stream.events.processed',
            tags={'stage': stage}
        ).inc()

    def record_event_failed(self, stage: str, error_type: str):
        """Record processing failure"""
        self.metrics.counter(
            'stream.events.failed',
            tags={'stage': stage, 'error_type': error_type}
        ).inc()

    def record_watermark(self, stage: str, watermark: datetime):
        """Track watermark progress"""
        lag_seconds = (datetime.now() - watermark).total_seconds()
        self.metrics.gauge(
            'stream.watermark.lag',
            lag_seconds,
            tags={'stage': stage}
        )

    def record_state_size(self, store_name: str, size_bytes: int):
        """Track state store size"""
        self.metrics.gauge(
            'stream.state.size',
            size_bytes,
            tags={'store': store_name}
        )

    def check_pipeline_health(self) -> bool:
        """Overall pipeline health check"""
        checks = {
            'processing_rate': self.check_processing_rate(),
            'error_rate': self.check_error_rate(),
            'watermark_lag': self.check_watermark_lag(),
            'state_size': self.check_state_size()
        }
        all_healthy = all(checks.values())
        if not all_healthy:
            failed = [k for k, v in checks.items() if not v]
            logger.warning(f"Pipeline health check failed: {failed}")
        return all_healthy

    def check_processing_rate(self) -> bool:
        """Ensure we're processing enough events"""
        rate = self.metrics.get_rate('stream.events.processed', window_seconds=60)
        return rate > 100  # At least 100 events/second

    def check_error_rate(self) -> bool:
        """Error rate should be low"""
        total = self.metrics.get_count('stream.events.processed')
        errors = self.metrics.get_count('stream.events.failed')
        if total == 0:
            return True
        error_rate = errors / total
        return error_rate < 0.01  # Less than 1%

    def check_watermark_lag(self) -> bool:
        """Watermark shouldn't fall too far behind"""
        lag = self.metrics.get_gauge('stream.watermark.lag')
        return lag < 300  # Less than 5 minutes

    def check_state_size(self) -> bool:
        """State stores shouldn't grow without bound (threshold here is illustrative)"""
        size = self.metrics.get_gauge('stream.state.size')
        return size < 50 * 1024 ** 3  # Less than 50 GB
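How the health check gets used matters as much as the checks themselves. A simple pattern is a loop that evaluates it periodically and pages only after repeated failures; the interval, failure budget, and page_oncall hook below are illustrative assumptions:

import time

def health_check_loop(metrics: StreamingMetrics, page_oncall,
                      interval_s=60, failures_before_alert=3):
    """Evaluate pipeline health every interval; alert only on sustained failure."""
    consecutive_failures = 0
    while True:
        if metrics.check_pipeline_health():
            consecutive_failures = 0
        else:
            consecutive_failures += 1
            if consecutive_failures >= failures_before_alert:
                page_oncall(f"Streaming pipeline unhealthy for "
                            f"{consecutive_failures} consecutive checks")
        time.sleep(interval_s)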
Key Takeaways
- Migrate incrementally: Dual write → dual read → cutover
- Handle late data: Use watermarks and allowed lateness
- State management is critical: Choose appropriate storage
- Window wisely: Tumbling for simple aggregations, session for user behavior
- Enrich carefully: Cache external lookups
- Monitor comprehensively: Processing rate, errors, lag, state size
- Plan for failures: Streaming systems must handle partial failures gracefully
Real-time streaming enables new capabilities but requires different thinking than batch. Start with a single pipeline, learn the patterns, then scale to dozens of streams.