The shift from batch to real-time data processing fundamentally changes how systems are built. These are the lessons from migrating multiple batch pipelines to streaming systems that now process 100M+ events daily in real time.

Why Streaming?

Traditional batch processing has inherent limitations:

# Batch processing: Process data every hour
def batch_job():
    # Wait for data to accumulate
    data = load_data_from_last_hour()

    # Process in bulk
    results = process_batch(data)

    # Write results
    save_results(results)

# Problems:
# - Data is stale (up to 1 hour old)
# - Resource waste (idle between batches)
# - All-or-nothing failures
# - Hard to scale specific stages

Streaming solves these problems:

# Streaming: Process data as it arrives
def stream_processor():
    for event in event_stream:
        # Process immediately
        result = process_event(event)

        # Emit result
        output_stream.send(result)

# Benefits:
# - Fresh data (millisecond latency)
# - Continuous resource utilization
# - Granular error handling
# - Independent scaling per stage

Migration Strategy

Phase 1: Dual Write

Run batch and streaming in parallel:

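public class DualWriteService {

    // Assumed dependencies: SLF4J for logging, a Micrometer-style registry for metrics
    private static final Logger logger = LoggerFactory.getLogger(DualWriteService.class);

    private final BatchWriter batchWriter;
    private final StreamWriter streamWriter;
    private final FeatureToggle toggle;
    private final MeterRegistry metrics;

    public DualWriteService(BatchWriter batchWriter, StreamWriter streamWriter,
                            FeatureToggle toggle, MeterRegistry metrics) {
        this.batchWriter = batchWriter;
        this.streamWriter = streamWriter;
        this.toggle = toggle;
        this.metrics = metrics;
    }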

    public void processData(Data data) {
        // Continue writing to batch (existing system)
        batchWriter.write(data);

        // Also write to stream (new system)
        if (toggle.isEnabled("streaming-write")) {
            try {
                streamWriter.write(data);
            } catch (Exception e) {
                // Don't fail if streaming fails
                logger.error("Streaming write failed", e);
                metrics.counter("streaming.write.errors").increment();
            }
        }
    }
}

Phase 2: Dual Read

Verify streaming output matches batch:

class DualReadValidator:
    """
    Compare batch and streaming results
    """

    def validate_results(self, entity_id: str):
        # Get result from batch system
        batch_result = self.batch_store.get(entity_id)

        # Get result from streaming system
        stream_result = self.stream_store.get(entity_id)

        # Compare
        if not self.are_equivalent(batch_result, stream_result):
            self.log_discrepancy(entity_id, batch_result, stream_result)

            # Record metrics
            self.metrics.counter('validation.mismatches').inc()

            # Alert if mismatch rate is high
            if self.get_mismatch_rate() > 0.01:  # 1%
                self.alert_high_mismatch_rate()

    def are_equivalent(self, batch: dict, stream: dict) -> bool:
        """
        Check if results are equivalent
        Allow for small numerical differences due to floating point
        """
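        # A missing record on either side only matches another missing record
        if batch is None or stream is None:
            return batch is None and stream is None

        if set(batch.keys()) != set(stream.keys()):
            return False

        for key in batch.keys():
            batch_val = batch[key]
            stream_val = stream[key]

            if isinstance(batch_val, float) and isinstance(stream_val, (int, float)):
                # Allow small difference for floats
                if abs(batch_val - stream_val) > 1e-6:
                    return False
            elif batch_val != stream_val:
                return False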

        return True
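
The validator above calls get_mismatch_rate, which is not shown. One way to back it is a sliding window over the most recent comparisons, so a burst of old mismatches does not keep the alert firing forever. This is a minimal sketch under that assumption; the class name MismatchRateTracker and the default window size are hypothetical, not part of the original validator.

```python
from collections import deque


class MismatchRateTracker:
    """Track the mismatch rate over the most recent N comparisons (hypothetical helper)."""

    def __init__(self, window_size: int = 1000):
        # A deque with maxlen drops the oldest outcome automatically
        self.outcomes = deque(maxlen=window_size)

    def record(self, matched: bool) -> None:
        """Record the outcome of one batch-vs-stream comparison."""
        self.outcomes.append(matched)

    def mismatch_rate(self) -> float:
        """Fraction of recorded comparisons that did not match."""
        if not self.outcomes:
            return 0.0
        mismatches = sum(1 for matched in self.outcomes if not matched)
        return mismatches / len(self.outcomes)
```

A validator could call record(...) after every comparison and return mismatch_rate() from get_mismatch_rate.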

Phase 3: Full Cutover

Switch to streaming, keep batch as backup:

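type DataProcessor struct {
    // Store is an assumed interface exposing Get(id string) (Data, error)
    streamStore      Store
    batchStore       Store
    streamingEnabled bool
    batchFallback    bool
}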

func (p *DataProcessor) GetData(id string) (Data, error) {
    if p.streamingEnabled {
        // Try streaming first
        data, err := p.streamStore.Get(id)
        if err == nil {
            metrics.Counter("reads.streaming").Inc()
            return data, nil
        }

        // Fallback to batch if streaming fails
        if p.batchFallback {
            logger.Warn("Streaming read failed, falling back to batch",
                "id", id, "error", err)
            metrics.Counter("reads.batch_fallback").Inc()
            return p.batchStore.Get(id)
        }

        return Data{}, err
    }

    // Still on batch system
    metrics.Counter("reads.batch").Inc()
    return p.batchStore.Get(id)
}

Streaming Aggregations

Window operations are fundamental to streaming:

// Tumbling window: non-overlapping fixed windows
public class TumblingWindowAggregator {

    public KTable<Windowed<String>, Long> countByWindow(
            KStream<String, Event> events) {

        return events
            .groupByKey()
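            // TimeWindows.of(...) is deprecated as of Kafka Streams 3.0; the
            // replacement makes the grace period for late records explicit.
            .windowedBy(TimeWindows.ofSizeAndGrace(
                Duration.ofMinutes(5), Duration.ofMinutes(10)))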
            .count(Materialized.as("counts-store"));
    }

    // Query the count for the window that starts at windowStart.
    // Assumes `streams` is the running KafkaStreams instance.
    public Long getCount(String key, Instant windowStart) {
        ReadOnlyWindowStore<String, Long> store = streams.store(
            StoreQueryParameters.fromNameAndType(
                "counts-store", QueryableStoreTypes.windowStore()));

        // fetch() bounds are inclusive window start times, so passing
        // windowStart for both returns at most this one window.
        // The iterator holds store resources and must be closed.
        try (WindowStoreIterator<Long> iter =
                 store.fetch(key, windowStart, windowStart)) {
            if (iter.hasNext()) {
                return iter.next().value;
            }
        }

        return 0L;
    }
}
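
The arithmetic behind tumbling windows is small enough to show without a framework: each timestamp belongs to exactly one window, whose start is the timestamp rounded down to a multiple of the window size. The sketch below is framework-independent and assumes events arrive as (key, epoch-millisecond timestamp) pairs; the function names are illustrative.

```python
from collections import Counter

WINDOW_MS = 5 * 60 * 1000  # 5-minute windows, matching the example above


def window_start(timestamp_ms: int, size_ms: int = WINDOW_MS) -> int:
    """Return the start of the tumbling window that contains timestamp_ms."""
    return timestamp_ms - (timestamp_ms % size_ms)


def count_by_window(events):
    """Count (key, timestamp_ms) events per (key, window start)."""
    counts = Counter()
    for key, timestamp_ms in events:
        counts[(key, window_start(timestamp_ms))] += 1
    return counts
```

Because windows never overlap, an event at 299,999 ms lands in the window starting at 0 and an event at 300,000 ms opens the next one.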

// Session window: dynamic windows based on activity
public class SessionWindowAggregator {

    public KTable<Windowed<String>, Session> detectSessions(
            KStream<String, Activity> activities) {

        return activities
            .groupByKey()
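            // SessionWindows.with(...) is deprecated as of Kafka Streams 3.0; the
            // replacement makes the grace period for late records explicit.
            .windowedBy(SessionWindows.ofInactivityGapAndGrace(
                Duration.ofMinutes(30), Duration.ofMinutes(10)))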
            .aggregate(
                Session::new,
                (key, activity, session) -> {
                    session.addActivity(activity);
                    return session;
                },
                (key, session1, session2) -> {
                    // Merge sessions if gap < 30 minutes
                    return session1.merge(session2);
                },
                Materialized.as("sessions-store")
            );
    }
}
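
Session windows have no fixed boundaries: a session keeps growing as long as the next event arrives within the inactivity gap. A rough sketch of that grouping rule for a single key, assuming timestamps in seconds and treating a gap exactly equal to the limit as part of the same session (that boundary choice is an assumption here):

```python
def sessionize(timestamps, gap_seconds=30 * 60):
    """Group timestamps into sessions split wherever inactivity exceeds gap_seconds."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] <= gap_seconds:
            # Close enough to the previous event: extend the current session
            sessions[-1].append(ts)
        else:
            # Too much inactivity (or first event): start a new session
            sessions.append([ts])
    return sessions
```

Sorting first sidesteps out-of-order arrival; a streaming engine instead has to merge existing sessions when a late event bridges them, which is what the merger function in the aggregation above is for.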

Late Data Handling

Dealing with out-of-order events:

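import logging
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)

class LateDataHandler:
    """
    Handle late-arriving events gracefully.
    Subclasses are expected to implement process(event).
    """

    def __init__(self, metrics, late_events_topic,
                 allowed_lateness: timedelta = timedelta(minutes=10)):
        self.metrics = metrics                      # metrics client (assumed interface)
        self.late_events_topic = late_events_topic  # producer for the late-events topic
        self.allowed_lateness = allowed_lateness
        self.watermark = datetime.min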

    def process_event(self, event):
        """
        Process event with watermarking
        """
        event_time = event.timestamp

        # Update watermark (max event time seen)
        self.watermark = max(self.watermark, event_time)

        # Calculate how late this event is
        lateness = self.watermark - event_time

        if lateness > self.allowed_lateness:
            # Too late, send to late data topic
            self.metrics.counter('events.too_late').inc()
            self.late_events_topic.send(event)

            logger.info(f"Event too late: {event.id}, lateness: {lateness}")
            return None

        # Within allowed lateness, process normally
        return self.process(event)

    def get_current_watermark(self) -> datetime:
        """Expose watermark for downstream processors"""
        return self.watermark
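
To make the lateness rule concrete, here is a small worked example that applies the same logic (watermark = max event time seen, drop anything more than the allowed lateness behind it) to a list of timestamps in arrival order. The function name and labels are illustrative only.

```python
from datetime import datetime, timedelta


def classify_events(timestamps, allowed_lateness=timedelta(minutes=10)):
    """Label each event 'on_time' or 'too_late', processing in arrival order."""
    watermark = datetime.min
    labels = []
    for event_time in timestamps:
        # The watermark only moves forward
        watermark = max(watermark, event_time)
        lateness = watermark - event_time
        labels.append("too_late" if lateness > allowed_lateness else "on_time")
    return labels


base = datetime(2024, 1, 1, 12, 0)
arrivals = [
    base,                          # first event
    base + timedelta(minutes=20),  # advances the watermark to 12:20
    base + timedelta(minutes=15),  # 5 minutes behind the watermark
    base + timedelta(minutes=5),   # 15 minutes behind the watermark
]
labels = classify_events(arrivals)
```

With a 10-minute allowance, only the last event is routed to the late-data path. Note that a single event with a far-future timestamp would push the watermark ahead and mark everything after it as late, so it is worth validating event timestamps before they reach this step.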

Stateful Processing

Maintaining state in streaming:

public class StatefulProcessor {

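    private final String storeName = "user-state-store";
    private final StreamsBuilder builder;

    // Assumed wiring: the topology's StreamsBuilder is passed in by the caller
    public StatefulProcessor(StreamsBuilder builder) {
        this.builder = builder;
    }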

    public KStream<String, UserUpdate> processWithState(
            KStream<String, Event> events) {

        // Create state store
        StoreBuilder<KeyValueStore<String, UserState>> storeBuilder =
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(storeName),
                Serdes.String(),
                new UserStateSerde()
            )
            .withCachingEnabled()
            .withLoggingEnabled(Map.of(
                "cleanup.policy", "compact",
                "min.cleanable.dirty.ratio", "0.01"
            ));

        // Add store to topology
        builder.addStateStore(storeBuilder);

        // Transform with state. Note: transformValues is deprecated as of
        // Kafka Streams 3.3 in favor of processValues.
        return events.transformValues(
            () -> new ValueTransformerWithKey<String, Event, UserUpdate>() {

                private KeyValueStore<String, UserState> stateStore;

                @Override
                public void init(ProcessorContext context) {
                    this.stateStore = context.getStateStore(storeName);
                }

                @Override
                public UserUpdate transform(String key, Event event) {
                    // Get current state
                    UserState state = stateStore.get(key);
                    if (state == null) {
                        state = new UserState(key);
                    }

                    // Update state based on event
                    UserUpdate update = state.apply(event);

                    // Save updated state
                    stateStore.put(key, state);

                    return update;
                }

                @Override
                public void close() {}
            },
            storeName  // Attach to state store
        );
    }
}

Stream Processing Patterns

Pattern 1: Enrichment

class StreamEnricher:
    """
    Enrich stream with data from external sources
    """

    def __init__(self):
        self.user_cache = {}  # Local cache (unbounded and never expires; bound it in production)
        self.user_service = UserServiceClient()

    async def enrich_event(self, event):
        """
        Add user information to event
        """
        user_id = event['user_id']

        # Check cache
        if user_id in self.user_cache:
            user_info = self.user_cache[user_id]
        else:
            # Fetch from service
            user_info = await self.user_service.get_user(user_id)
            self.user_cache[user_id] = user_info

        # Create enriched event
        return {
            **event,
            'user_name': user_info['name'],
            'user_segment': user_info['segment'],
            'user_tier': user_info['tier']
        }

    def enrich_stream(self, input_stream):
        """
        Enrich entire stream
        """
        return input_stream.map_async(self.enrich_event)
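
A plain dict cache grows forever and never notices when a user changes segment or tier. One option is a small cache with a time-to-live and a size cap. The sketch below is an assumption about how that could look, not the cache used in the pipeline above; TTLCache, its defaults, and the oldest-inserted eviction policy are all hypothetical choices.

```python
import time


class TTLCache:
    """Small cache with per-entry expiry and a size cap (hypothetical helper)."""

    def __init__(self, ttl_seconds=300.0, max_entries=10_000, clock=time.monotonic):
        self.ttl_seconds = ttl_seconds
        self.max_entries = max_entries
        self.clock = clock  # injectable so tests can control time
        self._entries = {}  # key -> (expires_at, value); dicts keep insertion order

    def get(self, key):
        """Return the cached value, or None if missing or expired."""
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self.clock() >= expires_at:
            del self._entries[key]
            return None
        return value

    def put(self, key, value):
        """Store a value, evicting the oldest-inserted entry when full."""
        self._entries.pop(key, None)  # re-inserting moves the key to the newest position
        if len(self._entries) >= self.max_entries:
            oldest_key = next(iter(self._entries))
            del self._entries[oldest_key]
        self._entries[key] = (self.clock() + self.ttl_seconds, value)
```

In the enricher, a get that returns None would fall through to the user service call, followed by a put. The TTL bounds how stale enrichment data can get; the size cap bounds memory.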

Pattern 2: Filtering

type StreamFilter struct {
    rules []FilterRule
}

type FilterRule interface {
    Matches(event Event) bool
    Name() string
}

// Allow only specific event types
type EventTypeFilter struct {
    allowedTypes map[string]bool
}

func (f *EventTypeFilter) Matches(event Event) bool {
    return f.allowedTypes[event.Type]
}

// Filter by user segment
type UserSegmentFilter struct {
    allowedSegments map[string]bool
    userService     *UserService
}

func (f *UserSegmentFilter) Matches(event Event) bool {
    user, err := f.userService.GetUser(event.UserID)
    if err != nil {
        return false
    }
    return f.allowedSegments[user.Segment]
}

// Apply filters to stream
func (sf *StreamFilter) Filter(input <-chan Event) <-chan Event {
    output := make(chan Event, 1000)

    go func() {
        defer close(output)

        for event := range input {
            // Apply all filter rules
            pass := true
            for _, rule := range sf.rules {
                if !rule.Matches(event) {
                    metrics.Counter("events.filtered",
                        "rule", rule.Name()).Inc()
                    pass = false
                    break
                }
            }

            if pass {
                output <- event
            }
        }
    }()

    return output
}

Pattern 3: Fan-Out

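import asyncio
import logging

logger = logging.getLogger(__name__)

class StreamFanOut:
    """
    Send events to multiple downstream processors
    """

    def __init__(self, processors: list, metrics):
        self.processors = processors
        self.metrics = metrics  # metrics client (assumed interface)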

    async def fan_out(self, event):
        """
        Send to all processors concurrently
        """
        tasks = [
            processor.process(event)
            for processor in self.processors
        ]

        # Wait for all processors
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Check for errors
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                logger.error(
                    f"Processor {self.processors[i].name} failed: {result}"
                )
                self.metrics.counter(
                    'fanout.errors',
                    tags={'processor': self.processors[i].name}
                ).inc()

        return results
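
The error handling above depends on how asyncio.gather behaves with return_exceptions=True: exceptions are returned as ordinary results, in the same order as the awaitables, instead of the first one being raised. A self-contained demonstration, with two toy handlers (ok and broken) standing in for real processors:

```python
import asyncio


async def ok(event):
    """Toy handler that always succeeds."""
    return f"ok:{event}"


async def broken(event):
    """Toy handler that always fails."""
    raise ValueError(f"cannot handle {event}")


async def fan_out(event, handlers):
    # return_exceptions=True returns exceptions as results instead of raising
    # the first one, so every handler's outcome is visible to the caller.
    # Results come back in the same order as handlers.
    return await asyncio.gather(
        *(handler(event) for handler in handlers),
        return_exceptions=True,
    )


results = asyncio.run(fan_out("e1", [ok, broken, ok]))
failures = [r for r in results if isinstance(r, Exception)]
```

Here results holds two successful values with a ValueError between them, which is why the loop above can pair each result with its processor by index.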

Monitoring Streaming Pipelines

import logging
from datetime import datetime

logger = logging.getLogger(__name__)

class StreamingMetrics:
    """
    Comprehensive metrics for streaming pipelines
    """

    def __init__(self):
        self.metrics = MetricsCollector()

    def record_event_processed(self, stage: str, duration_ms: float):
        """Record successful processing"""
        self.metrics.histogram(
            'stream.processing.duration',
            duration_ms,
            tags={'stage': stage}
        )
        self.metrics.counter(
            'stream.events.processed',
            tags={'stage': stage}
        ).inc()

    def record_event_failed(self, stage: str, error_type: str):
        """Record processing failure"""
        self.metrics.counter(
            'stream.events.failed',
            tags={'stage': stage, 'error_type': error_type}
        ).inc()

    def record_watermark(self, stage: str, watermark: datetime):
        """Track watermark progress"""
        lag_seconds = (datetime.now() - watermark).total_seconds()
        self.metrics.gauge(
            'stream.watermark.lag',
            lag_seconds,
            tags={'stage': stage}
        )

    def record_state_size(self, store_name: str, size_bytes: int):
        """Track state store size"""
        self.metrics.gauge(
            'stream.state.size',
            size_bytes,
            tags={'store': store_name}
        )

    def check_pipeline_health(self) -> bool:
        """Overall pipeline health check"""
        checks = {
            'processing_rate': self.check_processing_rate(),
            'error_rate': self.check_error_rate(),
            'watermark_lag': self.check_watermark_lag(),
            'state_size': self.check_state_size()
        }

        all_healthy = all(checks.values())

        if not all_healthy:
            failed = [k for k, v in checks.items() if not v]
            logger.warning(f"Pipeline health check failed: {failed}")

        return all_healthy

    def check_processing_rate(self) -> bool:
        """Ensure we're processing enough events"""
        rate = self.metrics.get_rate('stream.events.processed', window_seconds=60)
        return rate > 100  # At least 100 events/second

    def check_error_rate(self) -> bool:
        """Error rate should be low"""
        processed = self.metrics.get_count('stream.events.processed')
        errors = self.metrics.get_count('stream.events.failed')

        # 'processed' counts successes only, so the denominator is all attempts
        total = processed + errors
        if total == 0:
            return True

        error_rate = errors / total
        return error_rate < 0.01  # Less than 1%

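    def check_watermark_lag(self) -> bool:
        """Watermark shouldn't fall too far behind"""
        lag = self.metrics.get_gauge('stream.watermark.lag')
        return lag < 300  # Less than 5 minutes

    def check_state_size(self) -> bool:
        """State stores shouldn't grow without bound"""
        size = self.metrics.get_gauge('stream.state.size')
        return size < 10 * 1024**3  # Under 10 GiB; illustrative threshold, tune per store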

Key Takeaways

  1. Migrate incrementally: Dual write → dual read → cutover
  2. Handle late data: Use watermarks and allowed lateness
  3. State management is critical: Choose appropriate storage
  4. Window wisely: Tumbling for simple aggregations, session for user behavior
  5. Enrich carefully: Cache external lookups
  6. Monitor comprehensively: Processing rate, errors, lag, state size
  7. Plan for failures: Streaming systems must handle partial failures gracefully

Real-time streaming enables new capabilities but requires different thinking than batch. Start with a single pipeline, learn the patterns, then scale to dozens of streams.