Data pipeline architecture is one of the most critical decisions in building a modern data platform. The choice between Lambda, Kappa, and Delta architectures affects everything from system complexity to latency to operational overhead. After implementing all three patterns at scale, I'll share when to use each and how to implement them effectively.
The Problem: Serving Fresh Data at Scale
Modern applications need both:
- Real-time data: For dashboards, recommendations, alerting (millisecond to second latency)
- Historical data: For analytics, ML training, reporting (complete and accurate)
Traditional batch processing (running nightly jobs) doesn't meet real-time needs. Pure streaming is hard to maintain and debug. The three main architectures solve this tension differently.
Lambda Architecture
Lambda architecture uses separate batch and streaming pipelines:
Data Sources ──→ Batch Layer ──→ Batch Views ────┐
     │                                           ├──→ Query
     └─────────→ Speed Layer ──→ Real-time Views ┘
Implementation Example
# Assumed imports for the sketches below
import json
from datetime import datetime
from typing import Dict

from kafka import KafkaConsumer   # kafka-python client
from redis import Redis           # redis-py client

class LambdaArchitecture:
    """
    Dual pipeline: batch for accuracy, streaming for speed
    """
    def __init__(self):
        self.batch_processor = BatchProcessor()
        self.stream_processor = StreamProcessor()
        self.serving_layer = ServingLayer()
# Batch Layer: Complete and accurate, runs hourly/daily
class BatchProcessor:
    def process_batch(self, start_time: datetime, end_time: datetime):
        """
        Process the complete dataset for accuracy
        """
        from pyspark.sql.functions import (
            col, count, countDistinct, sum, avg, percentile_approx, lit
        )
        # Read from the data lake (assumes an existing SparkSession named `spark`)
        events = spark.read.parquet("s3://raw-events/") \
            .filter(col('timestamp').between(start_time, end_time))

        # Complex aggregations are possible here
        user_stats = events.groupBy('user_id').agg(
            count('*').alias('event_count'),
            countDistinct('session_id').alias('session_count'),
            sum('revenue').alias('total_revenue'),
            avg('engagement_score').alias('avg_engagement'),
            # Expensive calculations are acceptable in batch
            percentile_approx('session_duration', 0.5).alias('median_duration')
        )

        # Tag with the processing date so the batch view can be partitioned
        user_stats = user_stats.withColumn(
            'processing_date', lit(end_time.date().isoformat())
        )

        # Write to the batch view
        user_stats.write \
            .mode('overwrite') \
            .partitionBy('processing_date') \
            .parquet('s3://batch-views/user-stats/')
# Speed Layer: Real-time, approximate
class StreamProcessor:
    def __init__(self):
        self.kafka_consumer = KafkaConsumer('events')
        self.cache = Redis(decode_responses=True)

    def process_stream(self):
        """
        Process events in real time for low latency
        """
        for message in self.kafka_consumer:
            event = json.loads(message.value)

            # Simple, fast aggregations
            user_id = event['user_id']
            cache_key = f"realtime:user:{user_id}"

            # Increment counters (hincrbyfloat so fractional revenue works)
            self.cache.hincrby(cache_key, 'event_count', 1)
            self.cache.hincrbyfloat(cache_key, 'revenue',
                                    event.get('revenue', 0))

            # Set a TTL - the batch layer will take over
            self.cache.expire(cache_key, 3600 * 24)  # 24 hours
# Serving Layer: Merge batch and real-time views
class ServingLayer:
    def __init__(self):
        self.cache = Redis(decode_responses=True)  # same Redis the speed layer writes to

    def get_user_stats(self, user_id: str) -> Dict:
        """
        Merge batch (complete, older) and streaming (incomplete, fresh) data
        """
        # Get the batch view (complete up to the last batch run)
        batch_stats = self.get_batch_stats(user_id)  # sketched below
        batch_time = batch_stats.get('as_of_time')

        # Get the real-time view (events since the last batch)
        realtime_stats = self.cache.hgetall(f"realtime:user:{user_id}")

        # Merge the two views
        return {
            'event_count': (batch_stats.get('event_count', 0) +
                            int(realtime_stats.get('event_count', 0))),
            'total_revenue': (batch_stats.get('total_revenue', 0) +
                              float(realtime_stats.get('revenue', 0))),
            'batch_as_of': batch_time,
            'realtime_as_of': datetime.now()
        }
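The `get_batch_stats` helper above is referenced but not defined. Here is a minimal sketch, assuming the batch job also publishes its per-user results into the same Redis instance under a `batch:user:<id>` hash; the key layout and `as_of_time` field are illustrative, and a real deployment might query Cassandra, DynamoDB, or a warehouse instead.

    # Continues the ServingLayer class above
    def get_batch_stats(self, user_id: str) -> Dict:
        """
        Read the user's row from the batch view.
        Assumes the batch job loads its output into Redis under batch:user:<id>.
        """
        row = self.cache.hgetall(f"batch:user:{user_id}")
        return {
            'event_count': int(row.get('event_count', 0)),
            'total_revenue': float(row.get('total_revenue', 0)),
            'as_of_time': row.get('as_of_time')
        }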
Lambda Architecture Pros and Cons
Pros:
- Batch layer provides accuracy and supports complex queries
- Speed layer provides low latency
- Can reprocess historical data easily
Cons:
- Two codebases to maintain (batch and streaming)
- Complex serving layer that merges views
- Higher operational overhead
- Data consistency challenges
Use Lambda when:
- You need both complex analytics and real-time updates
- You have legacy batch pipelines to maintain
- Your team has expertise in both batch and stream processing
Kappa Architecture
Kappa architecture uses only streaming, with replayable event logs:
Data Sources → Stream Processing → Materialized Views → Query
                      ↑
               Event Log (Kafka)
                 (replayable)
Implementation Example
/**
 * Kappa Architecture: Everything is a stream
 */
public class KappaArchitecture {

    private final StreamsBuilder builder = new StreamsBuilder();

    public void buildPipeline() {
        // Single pipeline for all processing
        KStream<String, Event> events = builder.stream("events");

        // Real-time aggregation
        KTable<String, UserStats> userStats = events
            .groupBy((key, event) -> event.getUserId())
            .aggregate(
                UserStats::new,
                (userId, event, stats) -> {
                    stats.incrementEventCount();
                    stats.addRevenue(event.getRevenue());
                    stats.updateLastSeen(event.getTimestamp());
                    return stats;
                },
                Materialized.<String, UserStats, KeyValueStore<Bytes, byte[]>>as(
                        "user-stats-store"
                    )
                    .withKeySerde(Serdes.String())
                    .withValueSerde(new UserStatsSerde())
            );

        // Materialize to the serving database via a changelog topic
        userStats.toStream().to("user-stats-changelog",
            Produced.with(Serdes.String(), new UserStatsSerde()));
    }

    // Reprocessing: replay the log from the beginning
    public void reprocessAllData() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG,
            "user-stats-reprocessing"); // New application id => new consumer group
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Build the same topology defined in buildPipeline()
        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // This will reprocess all data from the beginning of the topic
        streams.start();
    }
}
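The aggregated view lands in the user-stats-changelog topic, so serving queries just means sinking that topic into a fast store. A minimal consumer sketch (in Python, to match the rest of the post), assuming UserStatsSerde emits JSON with userId, eventCount, and revenue fields - adjust to whatever your serde actually produces:

import json
from kafka import KafkaConsumer
from redis import Redis

# Sink the changelog produced by the Kafka Streams pipeline into Redis.
# The field names below are assumptions about what UserStatsSerde emits.
consumer = KafkaConsumer(
    'user-stats-changelog',
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
)
cache = Redis(decode_responses=True)

for message in consumer:
    stats = message.value
    cache.hset(f"user:{stats['userId']}", mapping={
        'event_count': stats['eventCount'],
        'total_revenue': stats['revenue'],
    })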
Kappa with Change Data Capture
class KappaWithCDC:
    """
    Use CDC (Change Data Capture) to stream database changes
    """
    def __init__(self):
        self.redis = Redis(decode_responses=True)

    def setup_cdc_pipeline(self):
        """
        Debezium captures database changes and publishes them to Kafka
        """
        # Debezium connector configuration
        # (registering it with Kafka Connect is sketched after this class)
        connector_config = {
            "name": "postgres-cdc",
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "database.hostname": "postgres-db",
            "database.port": "5432",
            "database.user": "debezium",
            "database.dbname": "app_db",
            "database.server.name": "app",
            "table.include.list": "public.users,public.orders",
            # Changes are published to Kafka under this prefix
            "topic.prefix": "cdc"
        }
        # Database changes flow to Kafka topics:
        #   cdc.public.users
        #   cdc.public.orders
        return connector_config

    def process_cdc_stream(self):
        """
        Process database changes as events
        """
        consumer = KafkaConsumer('cdc.public.orders')
        for message in consumer:
            change = json.loads(message.value)
            operation = change['op']  # c=create, u=update, d=delete, r=snapshot read
            before = change.get('before')
            after = change.get('after')

            if operation in ('c', 'u', 'r'):
                # New or updated order (or initial snapshot row)
                self.update_analytics(after)
            elif operation == 'd':
                # Deleted order
                self.handle_deletion(before)

    def update_analytics(self, order: Dict):
        """
        Update real-time analytics from an order change
        """
        user_id = order['user_id']
        order_total = order['total']

        # Update cached aggregates
        self.redis.hincrby(f"user:{user_id}", 'order_count', 1)
        self.redis.hincrbyfloat(f"user:{user_id}", 'lifetime_value',
                                order_total)
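The connector config above still has to be registered with Kafka Connect, which exposes a REST API for this. A minimal sketch, assuming Connect is reachable at a placeholder kafka-connect:8083 host:

import requests

def register_connector(connector_config: dict):
    """Register a Debezium connector via the Kafka Connect REST API."""
    # Kafka Connect expects {"name": ..., "config": {...}}; the host below
    # is a placeholder for wherever your Connect workers actually run.
    payload = {
        "name": connector_config["name"],
        "config": {k: v for k, v in connector_config.items() if k != "name"},
    }
    response = requests.post("http://kafka-connect:8083/connectors", json=payload)
    response.raise_for_status()
    return response.json()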
Kappa Architecture Pros and Cons
Pros:
- Single codebase for all processing
- Simpler architecture
- Everything is replayable
- No serving layer complexity
Cons:
- All processing must be stream-compatible
- Reprocessing can be slow (must replay entire log)
- Limited to operations that work on streams
- Kafka retention costs for long histories (see the topic-config sketch after this list)
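Replaying the full history only works if the source topics keep it. Here is a minimal sketch of the relevant Kafka topic-level settings, shown as a plain dict to apply with whatever admin tooling you use; the values are illustrative, and infinite retention is exactly where the storage cost comes from.

# Topic-level configs that make a Kafka topic replayable long-term.
# Apply via your admin tooling of choice; values are illustrative.
replayable_topic_config = {
    "retention.ms": "-1",         # keep records indefinitely (this is the storage cost)
    "retention.bytes": "-1",      # no size-based deletion
    "cleanup.policy": "compact",  # or "compact,delete" to keep only the latest value per key
}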
Use Kappa when:
- Your processing logic is relatively simple
- You can express everything as stream transformations
- You need operational simplicity
- Your data volume allows retaining full history in Kafka
Delta Architecture
Delta Architecture builds on Kappa but adds a data lake for efficiency:
Data Sources → Stream Processing → Delta Lake → Query
                                       ↑
                               Batch Processing
                                  (on demand)
Implementation with Delta Lake
from delta import *
from pyspark.sql import SparkSession

class DeltaArchitecture:
    """
    Streaming writes, batch reads, ACID guarantees
    """
    def __init__(self):
        self.spark = SparkSession.builder \
            .config("spark.sql.extensions",
                    "io.delta.sql.DeltaSparkSessionExtension") \
            .config("spark.sql.catalog.spark_catalog",
                    "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
            .getOrCreate()
    def stream_to_delta(self):
        """
        Streaming ingestion with exactly-once semantics
        """
        events = self.spark.readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "localhost:9092") \
            .option("subscribe", "events") \
            .load()

        # Parse the JSON payload (event_schema is a StructType defined elsewhere)
        from pyspark.sql.functions import from_json, col
        events_parsed = events.select(
            from_json(col("value").cast("string"), event_schema).alias("data")
        ).select("data.*")

        # Write to Delta Lake as a stream
        query = events_parsed.writeStream \
            .format("delta") \
            .outputMode("append") \
            .option("checkpointLocation", "/tmp/checkpoints/events") \
            .partitionBy("date") \
            .start("/delta/events")
        return query
    def serve_real_time_aggregates(self):
        """
        Real-time aggregation with state
        """
        events = self.spark.readStream \
            .format("delta") \
            .load("/delta/events")

        # Stateful, windowed aggregation
        from pyspark.sql.functions import col, window, sum, count
        user_stats = events \
            .withWatermark("timestamp", "10 minutes") \
            .groupBy(
                col("user_id"),
                window("timestamp", "5 minutes", "1 minute")
            ) \
            .agg(
                count("*").alias("event_count"),
                sum("revenue").alias("total_revenue")
            )

        # Write to the serving table (the Delta sink accepts append/complete,
        # so emit each window once its watermark has passed)
        query = user_stats.writeStream \
            .format("delta") \
            .outputMode("append") \
            .option("checkpointLocation", "/tmp/checkpoints/user-stats") \
            .start("/delta/user-stats")
        return query
    def batch_analytics(self):
        """
        Complex batch analytics on the same data
        """
        from pyspark.sql.functions import (
            count, countDistinct, sum, percentile_approx, collect_list
        )
        # Read the same Delta table for batch processing
        events = self.spark.read \
            .format("delta") \
            .load("/delta/events")

        # Complex analytics (too expensive for streaming)
        ml_features = events \
            .groupBy("user_id") \
            .agg(
                count("*").alias("total_events"),
                countDistinct("session_id").alias("sessions"),
                sum("revenue").alias("lifetime_value"),
                # Expensive operations
                percentile_approx("session_duration", [0.5, 0.95]).alias("duration_percentiles"),
                collect_list("event_type").alias("event_sequence")
            )

        # Write features for ML training
        ml_features.write \
            .format("delta") \
            .mode("overwrite") \
            .save("/delta/ml-features")
    def time_travel_query(self):
        """
        Query historical versions
        """
        # Read as of a specific version
        df_v1 = self.spark.read \
            .format("delta") \
            .option("versionAsOf", 0) \
            .load("/delta/events")

        # Read as of a timestamp
        df_yesterday = self.spark.read \
            .format("delta") \
            .option("timestampAsOf", "2021-06-16") \
            .load("/delta/events")
    def optimize_storage(self):
        """
        Delta Lake optimization
        """
        from delta.tables import DeltaTable
        delta_table = DeltaTable.forPath(self.spark, "/delta/events")

        # Compact small files
        delta_table.optimize().executeCompaction()

        # Z-ordering for faster queries
        delta_table.optimize().executeZOrderBy("user_id")

        # Vacuum old versions
        delta_table.vacuum(retentionHours=168)  # 7 days
Delta Architecture Pros and Cons
Pros:
- Single storage layer (Delta Lake)
- Both streaming and batch on same data
- ACID transactions
- Time travel and versioning
- Schema enforcement and evolution (see the sketch after this list)
- Efficient storage with compression
Cons:
- Requires Delta Lake or similar (Iceberg, Hudi)
- More complex than pure Kappa
- Cloud storage costs for large datasets
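Schema evolution in Delta is opt-in rather than fully automatic: a write with new columns fails unless you ask Delta to merge the schema. A minimal sketch, assuming a `new_events` DataFrame (illustrative name) whose schema adds a column to the `/delta/events` table from earlier:

# Append a DataFrame whose schema adds a new column to an existing Delta table.
# mergeSchema tells Delta to evolve the table schema instead of rejecting the write.
new_events.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("/delta/events")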
Use Delta when:
- You need both real-time and complex batch analytics
- You want unified storage
- ACID guarantees are important
- You're in a cloud environment (S3, Azure, GCS)
Comparison Table
| Aspect | Lambda | Kappa | Delta |
|---|---|---|---|
| Codebase | 2 (batch + stream) | 1 (stream) | 1 (unified) |
| Complexity | High | Medium | Medium |
| Latency | Low (speed layer) | Low | Low |
| Accuracy | High (batch layer) | Medium | High |
| Reprocessing | Easy (batch) | Slow (replay) | Medium (batch on Delta) |
| Storage Cost | Medium | High (Kafka retention) | Medium |
| ACID | No | No | Yes |
| Best For | Complex analytics + real-time | Simple stream processing | Cloud-native platforms |
Hybrid Approach
In practice, you often combine elements:
class HybridDataPlatform:
    """
    Combine patterns based on use case.
    (DeltaLake, KafkaCluster, and their methods below are illustrative
    wrappers, not a specific library API.)
    """
    def __init__(self):
        # Delta Lake as the source of truth
        self.delta_lake = DeltaLake()
        # Kafka for real-time events
        self.kafka = KafkaCluster()
        # Redis for the serving layer
        self.cache = Redis()

    def ingest_events(self):
        """
        Real-time ingestion (Kappa pattern)
        """
        # Stream events into Delta Lake
        self.kafka.consume('events').write_to_delta('/delta/events')

    def real_time_aggregates(self):
        """
        Real-time aggregation (Kappa pattern)
        """
        # Stream processing -> cache
        for event in self.kafka.consume('events'):
            self.update_cache(event)

    def batch_analytics(self):
        """
        Complex analytics (Lambda batch layer)
        """
        # Batch processing on Delta Lake
        events = spark.read.format('delta').load('/delta/events')
        self.run_complex_analytics(events)

    def ml_features(self):
        """
        Feature engineering (Delta pattern)
        """
        # Both streaming and batch views of the same Delta data
        streaming_features = self.delta_lake.read_stream()
        batch_features = self.delta_lake.read_batch()
Choosing Your Architecture
Start with Kappa if:
- Team is small
- Processing is simple
- Real-time is primary use case
Add Lambda if:
- Need complex batch analytics
- Have legacy batch systems
- Team has both skill sets
Choose Delta if:
- Cloud-native platform
- Need ACID guarantees
- Want unified batch/stream
- Using Spark already
Conclusion
There's no one-size-fits-all architecture. Consider:
- Team skills: What does your team know?
- Use cases: Real-time vs. analytics needs
- Scale: Data volume and query patterns
- Existing infrastructure: What do you already have?
- Complexity tolerance: How much operational overhead can you handle?
Most large platforms evolve to hybrid approaches, using the right pattern for each use case. Start simple and add complexity only when needed.