Data pipeline architecture is one of the most critical decisions in building a modern data platform. The choice between Lambda, Kappa, and Delta architectures impacts everything from system complexity to latency to operational overhead. After implementing all three patterns at scale, I’ll share when to use each and how to implement them effectively.

The Problem: Serving Fresh Data at Scale

Modern applications need both:

  • Real-time data: For dashboards, recommendations, alerting (millisecond to second latency)
  • Historical data: For analytics, ML training, reporting (complete and accurate)

Traditional batch processing (running nightly jobs) doesn’t meet real-time needs. Pure streaming is hard to maintain and debug. The three main architectures solve this differently.

Lambda Architecture

Lambda architecture uses separate batch and streaming pipelines:

Data Sources → Batch Layer  → Batch Views  → Query
            ↓                                  ↑
            → Speed Layer → Real-time Views →  ↑

Implementation Example

# Assumed dependencies for this sketch: pyspark, kafka-python, redis
import json
from datetime import datetime
from typing import Dict

from kafka import KafkaConsumer
from redis import Redis
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    avg, col, count, countDistinct, lit, percentile_approx, sum
)

# Shared SparkSession used by the batch layer
spark = SparkSession.builder.getOrCreate()


class LambdaArchitecture:
    """
    Dual pipeline: batch for accuracy, streaming for speed
    """

    def __init__(self):
        # Nested classes are referenced via self so the names resolve at runtime
        self.batch_processor = self.BatchProcessor()
        self.stream_processor = self.StreamProcessor()
        self.serving_layer = self.ServingLayer()

    # Batch Layer: Complete and accurate, runs hourly/daily
    class BatchProcessor:
        def process_batch(self, start_time: datetime, end_time: datetime):
            """
            Process complete dataset for accuracy
            """
            # Read from data warehouse
            events = spark.read.parquet("s3://raw-events/") \
                .filter(col('timestamp').between(start_time, end_time))

            # Complex aggregations possible
            user_stats = events.groupBy('user_id').agg(
                count('*').alias('event_count'),
                countDistinct('session_id').alias('session_count'),
                sum('revenue').alias('total_revenue'),
                avg('engagement_score').alias('avg_engagement'),
                # Expensive calculations acceptable in batch
                percentile_approx('session_duration', 0.5).alias('median_duration')
            )

            # Stamp the batch run so the view can be partitioned by date
            user_stats = user_stats.withColumn(
                'processing_date', lit(end_time.date().isoformat()))

            # Write to batch view
            user_stats.write \
                .mode('overwrite') \
                .partitionBy('processing_date') \
                .parquet('s3://batch-views/user-stats/')

    # Speed Layer: Real-time, approximate
    class StreamProcessor:
        def __init__(self):
            self.kafka_consumer = KafkaConsumer('events')
            self.cache = Redis()

        def process_stream(self):
            """
            Process events in real-time for low latency
            """
            for message in self.kafka_consumer:
                event = json.loads(message.value)

                # Simple, fast aggregations
                user_id = event['user_id']
                cache_key = f"realtime:user:{user_id}"

                # Increment counters (hincrbyfloat because revenue may be fractional)
                self.cache.hincrby(cache_key, 'event_count', 1)
                self.cache.hincrbyfloat(cache_key, 'revenue',
                                        event.get('revenue', 0))

                # Set TTL - batch layer will take over
                self.cache.expire(cache_key, 3600 * 24)  # 24 hours

    # Serving Layer: Merge batch and real-time views
    class ServingLayer:
        def __init__(self):
            # decode_responses=True so hgetall returns str values, not bytes
            self.cache = Redis(decode_responses=True)

        def get_user_stats(self, user_id: str) -> Dict:
            """
            Merge batch (complete, old) and streaming (incomplete, fresh) data
            """
            # Get batch view (complete up to last batch run);
            # get_batch_stats is assumed to look up the Parquet batch view
            batch_stats = self.get_batch_stats(user_id)
            batch_time = batch_stats.get('as_of_time')

            # Get real-time view (events since last batch)
            realtime_stats = self.cache.hgetall(f"realtime:user:{user_id}")

            # Merge
            return {
                'event_count': (batch_stats.get('event_count', 0) +
                                int(realtime_stats.get('event_count', 0))),
                'total_revenue': (batch_stats.get('total_revenue', 0) +
                                  float(realtime_stats.get('revenue', 0))),
                'batch_as_of': batch_time,
                'realtime_as_of': datetime.now()
            }
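
The batch layer above is typically driven by a scheduler on an hourly or daily cadence. As a minimal sketch (assuming hourly runs aligned to the top of the hour, with the orchestration tool itself left out), the window passed to process_batch can be derived like this:

from datetime import datetime, timedelta, timezone

def run_hourly_batch(architecture: LambdaArchitecture):
    """Process the most recently completed hour in the batch layer."""
    now = datetime.now(timezone.utc)
    end_time = now.replace(minute=0, second=0, microsecond=0)  # top of the current hour
    start_time = end_time - timedelta(hours=1)                 # previous complete hour
    architecture.batch_processor.process_batch(start_time, end_time)

The 24-hour TTL on the speed-layer keys gives the batch job ample time to catch up before the real-time counters expire.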

Lambda Architecture Pros and Cons

Pros:

  • Batch layer provides accuracy and supports complex queries
  • Speed layer provides low latency
  • Can reprocess historical data easily

Cons:

  • Two codebases to maintain (batch and streaming)
  • Complex serving layer that merges views
  • Higher operational overhead
  • Data consistency challenges

Use Lambda when:

  • You need both complex analytics and real-time updates
  • You have legacy batch pipelines to maintain
  • Your team has expertise in both batch and stream processing

Kappa Architecture

Kappa architecture uses only streaming, with replayable event logs:

Data Sources → Event Log (Kafka) → Stream Processing → Materialized Views → Query
                 (replayable)

Implementation Example

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

/**
 * Kappa Architecture: Everything is a stream
 */
public class KappaArchitecture {

    private final StreamsBuilder builder = new StreamsBuilder();

    public void buildPipeline() {
        // Single pipeline for all processing
        KStream<String, Event> events = builder.stream("events");

        // Real-time aggregation
        KTable<String, UserStats> userStats = events
            .groupBy((key, event) -> event.getUserId())
            .aggregate(
                UserStats::new,
                (userId, event, stats) -> {
                    stats.incrementEventCount();
                    stats.addRevenue(event.getRevenue());
                    stats.updateLastSeen(event.getTimestamp());
                    return stats;
                },
                Materialized.<String, UserStats, KeyValueStore<Bytes, byte[]>>as(
                    "user-stats-store"
                )
                .withKeySerde(Serdes.String())
                .withValueSerde(new UserStatsSerde())
            );

        // Materialize to serving database
        userStats.toStream().to("user-stats-changelog",
            Produced.with(Serdes.String(), new UserStatsSerde()));
    }

    // Reprocessing: replay from the beginning under a new application id
    public void reprocessAllData() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG,
                 "user-stats-reprocessing");  // New consumer group
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        buildPipeline();
        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // This will reprocess all data from the beginning of the topic
        streams.start();
    }
}

Kappa with Change Data Capture

import json
from typing import Dict

from kafka import KafkaConsumer
from redis import Redis


class KappaWithCDC:
    """
    Use CDC (Change Data Capture) to stream database changes
    """

    def __init__(self):
        self.redis = Redis()

    def setup_cdc_pipeline(self):
        """
        Debezium captures database changes to Kafka
        """
        # Configure Debezium connector
        connector_config = {
            "name": "postgres-cdc",
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "database.hostname": "postgres-db",
            "database.port": "5432",
            "database.user": "debezium",
            "database.dbname": "app_db",
            "database.server.name": "app",
            "table.include.list": "public.users,public.orders",
            # Changes published to Kafka
            "topic.prefix": "cdc"
        }

        # Database changes flow to Kafka topics:
        # cdc.public.users
        # cdc.public.orders

    def process_cdc_stream(self):
        """
        Process database changes as events
        """
        consumer = KafkaConsumer('cdc.public.orders')

        for message in consumer:
            # Assumes the JSON converter has schemas disabled, so the
            # Debezium envelope (op/before/after) sits at the top level
            change = json.loads(message.value)

            operation = change['op']  # c=create, r=snapshot read, u=update, d=delete
            before = change.get('before')
            after = change.get('after')

            if operation in ('c', 'r', 'u'):
                # New, snapshot-read, or updated order
                self.update_analytics(after)
            elif operation == 'd':
                # Deleted order
                self.handle_deletion(before)

    def update_analytics(self, order: Dict):
        """
        Update real-time analytics from order change
        """
        user_id = order['user_id']
        order_total = order['total']

        # Update cached aggregates
        self.redis.hincrby(f"user:{user_id}", 'order_count', 1)
        self.redis.hincrbyfloat(f"user:{user_id}", 'lifetime_value',
                               order_total)
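
For reference, this is roughly what the Debezium envelope received by process_cdc_stream looks like (fields trimmed and values purely illustrative; again assuming the JSON converter has schemas disabled):

example_change_event = {
    "op": "u",                        # c=create, r=snapshot read, u=update, d=delete
    "ts_ms": 1623851520000,           # when Debezium processed the change
    "source": {"table": "orders"},    # provenance metadata (heavily trimmed)
    "before": {"id": 42, "user_id": "u-1", "total": 99.50},   # row state before the change
    "after":  {"id": 42, "user_id": "u-1", "total": 120.00},  # row state after the change
}

For inserts, before is null, and for deletes, after is null, which is why the handlers above read the appropriate side of the envelope.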

Kappa Architecture Pros and Cons

Pros:

  • Single codebase for all processing
  • Simpler architecture
  • Everything is replayable
  • No serving layer complexity

Cons:

  • All processing must be stream-compatible
  • Reprocessing can be slow (must replay entire log)
  • Limited to operations that work on streams
  • Kafka retention costs for long histories

Use Kappa when:

  • Your processing logic is relatively simple
  • You can express everything as stream transformations
  • You need operational simplicity
  • Your data volume allows retaining full history in Kafka (see the topic-config sketch below)
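
Retaining full history is a topic-level setting. A minimal sketch of creating such a topic with kafka-python's admin client (broker address, partition count, and replication factor are assumptions; retention.ms and retention.bytes of -1 disable deletion):

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")

# Keep the full event history so stream jobs can replay from offset 0
admin.create_topics([
    NewTopic(
        name="events",
        num_partitions=12,
        replication_factor=3,
        topic_configs={"retention.ms": "-1", "retention.bytes": "-1"},
    )
])

The trade-off is exactly the storage-cost con listed above; a compacted topic (cleanup.policy=compact) is a middle ground when only the latest state per key matters.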

Delta Architecture

Delta Architecture builds on Kappa but adds a data lake for efficiency:

Data Sources → Stream Processing → Delta Lake → Query
                                       ↓
                                 Batch Processing
                                   (on demand)

Implementation with Delta Lake

from delta import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, collect_list, count, countDistinct, from_json,
    percentile_approx, sum, window
)

class DeltaArchitecture:
    """
    Streaming writes, batch reads, ACID guarantees
    """

    def __init__(self):
        self.spark = SparkSession.builder \
            .config("spark.sql.extensions",
                   "io.delta.sql.DeltaSparkSessionExtension") \
            .config("spark.sql.catalog.spark_catalog",
                   "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
            .getOrCreate()

    def stream_to_delta(self):
        """
        Streaming ingestion with exactly-once semantics
        """
        events = self.spark.readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "localhost:9092") \
            .option("subscribe", "events") \
            .load()

        # Parse the JSON payload; event_schema is assumed to be a
        # StructType describing the event fields, defined elsewhere
        events_parsed = events.select(
            from_json(col("value").cast("string"), event_schema).alias("data")
        ).select("data.*")

        # Write to Delta Lake with streaming
        query = events_parsed.writeStream \
            .format("delta") \
            .outputMode("append") \
            .option("checkpointLocation", "/tmp/checkpoints/events") \
            .partitionBy("date") \
            .start("/delta/events")

    def serve_real_time_aggregates(self):
        """
        Real-time aggregation with state
        """
        events = self.spark.readStream \
            .format("delta") \
            .load("/delta/events")

        # Stateful windowed aggregation with a watermark for late data

        user_stats = events \
            .withWatermark("timestamp", "10 minutes") \
            .groupBy(
                col("user_id"),
                window("timestamp", "5 minutes", "1 minute")
            ) \
            .agg(
                count("*").alias("event_count"),
                sum("revenue").alias("total_revenue")
            )

        # Write to the serving table; Delta's streaming sink supports
        # "append" and "complete" output modes, so use "complete" for aggregations
        query = user_stats.writeStream \
            .format("delta") \
            .outputMode("complete") \
            .option("checkpointLocation", "/tmp/checkpoints/user-stats") \
            .start("/delta/user-stats")

    def batch_analytics(self):
        """
        Complex batch analytics on the same data
        """
        # Read same Delta table for batch processing
        events = self.spark.read \
            .format("delta") \
            .load("/delta/events")

        # Complex analytics (too expensive for streaming)
        ml_features = events \
            .groupBy("user_id") \
            .agg(
                count("*").alias("total_events"),
                countDistinct("session_id").alias("sessions"),
                sum("revenue").alias("lifetime_value"),
                # Expensive operations
                percentile_approx("session_duration", [0.5, 0.95]).alias("duration_percentiles"),
                collect_list("event_type").alias("event_sequence")
            )

        # Write for ML training
        ml_features.write \
            .format("delta") \
            .mode("overwrite") \
            .save("/delta/ml-features")

    def time_travel_query(self):
        """
        Query historical versions
        """
        # Read as of specific version
        df_v1 = self.spark.read \
            .format("delta") \
            .option("versionAsOf", 0) \
            .load("/delta/events")

        # Read as of timestamp
        df_yesterday = self.spark.read \
            .format("delta") \
            .option("timestampAsOf", "2021-06-16") \
            .load("/delta/events")

    def optimize_storage(self):
        """
        Delta Lake optimization
        """
        from delta.tables import DeltaTable

        delta_table = DeltaTable.forPath(self.spark, "/delta/events")

        # Compact small files
        delta_table.optimize().executeCompaction()

        # Z-ordering for faster queries
        delta_table.optimize().executeZOrderBy("user_id")

        # Vacuum old versions
        delta_table.vacuum(retentionHours=168)  # 7 days

Delta Architecture Pros and Cons

Pros:

  • Single storage layer (Delta Lake)
  • Both streaming and batch on same data
  • ACID transactions
  • Time travel and versioning
  • Schema evolution support (see the sketch below)
  • Efficient storage with compression

Cons:

  • Requires Delta Lake or similar (Iceberg, Hudi)
  • More complex than pure Kappa
  • Cloud storage costs for large datasets

Use Delta when:

  • You need both real-time and complex batch analytics
  • You want unified storage
  • ACID guarantees are important
  • You’re in a cloud environment (S3, Azure, GCS)
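
One of the pros listed above, schema evolution, is opt-in per write in Delta Lake. A minimal sketch, assuming spark is a SparkSession configured as in DeltaArchitecture and new_events stands in for a DataFrame that carries an extra column:

# Append a batch whose schema has a new column; mergeSchema evolves the
# table schema instead of failing the write
new_events.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("/delta/events")

# Alternatively, enable auto-merge session-wide (assumption: acceptable
# for every table written by this session)
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")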

Comparison Table

| Aspect       | Lambda                         | Kappa                     | Delta                    |
|--------------|--------------------------------|---------------------------|--------------------------|
| Codebase     | 2 (batch + stream)             | 1 (stream)                | 1 (unified)              |
| Complexity   | High                           | Medium                    | Medium                   |
| Latency      | Low (speed layer)              | Low                       | Low                      |
| Accuracy     | High (batch layer)             | Medium                    | High                     |
| Reprocessing | Easy (batch)                   | Slow (replay)             | Medium (batch on Delta)  |
| Storage cost | Medium                         | High (Kafka retention)    | Medium                   |
| ACID         | No                             | No                        | Yes                      |
| Best for     | Complex analytics + real-time  | Simple stream processing  | Cloud-native platforms   |

Hybrid Approach

In practice, you often combine elements:

class HybridDataPlatform:
    """
    Combine patterns based on use case.

    Conceptual sketch: the classes and methods referenced below are
    placeholders, not real library APIs.
    """

    def __init__(self):
        # Delta Lake for source of truth
        self.delta_lake = DeltaLake()

        # Kafka for real-time events
        self.kafka = KafkaCluster()

        # Redis for serving layer
        self.cache = Redis()

    def ingest_events(self):
        """
        Real-time ingestion (Kappa pattern)
        """
        # Stream to Delta Lake
        self.kafka.consume('events').write_to_delta('/delta/events')

    def real_time_aggregates(self):
        """
        Real-time aggregation (Kappa pattern)
        """
        # Stream processing -> cache
        for event in self.kafka.consume('events'):
            self.update_cache(event)

    def batch_analytics(self):
        """
        Complex analytics (Lambda batch layer)
        """
        # Batch processing on Delta Lake
        spark.read.delta('/delta/events').run_complex_analytics()

    def ml_features(self):
        """
        Feature engineering (Delta pattern)
        """
        # Both streaming and batch from Delta
        streaming_features = self.delta_lake.read_stream()
        batch_features = self.delta_lake.read_batch()

Choosing Your Architecture

Start with Kappa if:

  • Team is small
  • Processing is simple
  • Real-time is primary use case

Add Lambda if:

  • Need complex batch analytics
  • Have legacy batch systems
  • Team has both skill sets

Choose Delta if:

  • Cloud-native platform
  • Need ACID guarantees
  • Want unified batch/stream
  • Using Spark already

Conclusion

There’s no one-size-fits-all architecture. Consider:

  1. Team skills: What does your team know?
  2. Use cases: Real-time vs. analytics needs
  3. Scale: Data volume and query patterns
  4. Existing infrastructure: What do you already have?
  5. Complexity tolerance: How much operational overhead can you handle?

Most large platforms evolve to hybrid approaches, using the right pattern for each use case. Start simple and add complexity only when needed.