FC-Redirect generates massive amounts of flow data: with 12K concurrent flows and statistics collected every second, we’re producing about a billion data points daily. Traditional analysis tools can’t handle this scale. Enter Apache Spark, the emerging big data processing framework that’s revolutionizing how we analyze our flow data.

The Data Deluge

Our monitoring infrastructure collects comprehensive flow statistics:

typedef struct flow_statistics {
    uint64_t timestamp;
    flow_id_t flow_id;
    wwpn_t src_wwpn;
    wwpn_t dst_wwpn;
    uint64_t packets;
    uint64_t bytes;
    uint32_t latency_us;
    uint8_t qos_class;
    // ... 20 more fields
} flow_statistics_t;

// Volume of data:
// 12,000 flows × 1 sample/sec × 86,400 sec/day = 1.04 billion samples/day
// At ~200 bytes per sample: ~208 GB/day

Per month, we’re collecting over 6TB of flow statistics. We need scalable analysis.
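The arithmetic behind these figures is easy to check. Here is a minimal sketch, assuming the ~200-byte sample size quoted above, decimal units (1 GB = 10^9 bytes), and a 30-day month; the function name and parameters are illustrative, not part of our tooling:

```python
def flow_data_volume(flows=12_000, samples_per_sec=1, bytes_per_sample=200, days=30):
    """Estimate sample counts and storage volume for flow statistics."""
    samples_per_day = flows * samples_per_sec * 86_400
    bytes_per_day = samples_per_day * bytes_per_sample
    return {
        "samples_per_day": samples_per_day,
        "gb_per_day": bytes_per_day / 1e9,
        "tb_per_period": bytes_per_day * days / 1e12,
    }

volume = flow_data_volume()
print(volume)
```

The unrounded result is slightly under the 208 GB/day in the comment above, which was computed from the rounded 1.04 billion figure. Either way the monthly total lands a little over 6 TB.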

Why Spark?

I evaluated several options:

Traditional SQL Databases:

  • Pros: Familiar query language
  • Cons: Don’t scale to TB datasets, slow for analytics

Hadoop MapReduce:

  • Pros: Scales to petabytes
  • Cons: Slow (disk-based), complex programming model

Apache Spark:

  • Pros: Fast (in-memory), scalable, expressive API
  • Cons: Relatively new (1.0 released mid-2014)

Spark emerged as the clear winner for our use case.

Spark Architecture for Flow Analytics

I set up a Spark cluster for FC-Redirect analytics:

┌─────────────────────────────────────┐
│      Spark Master (Coordinator)     │
└───────────┬─────────────────────────┘
            │
    ┌───────┴──────┬──────────┬──────────┐
    │              │          │          │
┌───▼────┐    ┌────▼───┐ ┌────▼───┐ ┌────▼───┐
│Worker 1│    │Worker 2│ │Worker 3│ │Worker 4│
│ 8 cores│    │ 8 cores│ │ 8 cores│ │ 8 cores│
│ 64GB   │    │ 64GB   │ │ 64GB   │ │ 64GB   │
└────────┘    └────────┘ └────────┘ └────────┘

Storage: HDFS cluster (100TB capacity)

This cluster can process our flow data in parallel across 32 cores and 256GB RAM.

Data Ingestion Pipeline

Flow statistics are written to HDFS in Parquet format:

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

sc = SparkContext("spark://master:7077", "FC-Redirect Analytics")
sqlContext = SQLContext(sc)

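# Expected schema. Parquet files embed their own schema, so this documents
# the layout rather than being passed to the reader below.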
flow_schema = StructType([
    StructField("timestamp", LongType(), False),
    StructField("flow_id", LongType(), False),
    StructField("src_wwpn", StringType(), False),
    StructField("dst_wwpn", StringType(), False),
    StructField("packets", LongType(), False),
    StructField("bytes", LongType(), False),
    StructField("latency_us", IntegerType(), False),
    StructField("qos_class", StringType(), False)
])

# Read from HDFS
flow_data = sqlContext.read.parquet("hdfs://namenode/fc_redirect/flows")

# Register as temp table for SQL queries
flow_data.registerTempTable("flows")

Parquet provides efficient columnar storage and compression (files about 5x smaller than the same data stored as JSON).

Analytics Use Cases

1. Top Talkers Analysis

Find flows consuming most bandwidth:

# Top 100 flows by bandwidth
top_flows = sqlContext.sql("""
    SELECT
        src_wwpn,
        dst_wwpn,
        SUM(bytes) as total_bytes,
        SUM(packets) as total_packets,
        AVG(latency_us) as avg_latency
    FROM flows
    WHERE timestamp >= (unix_timestamp() - 86400) * 1000  -- Last 24 hours (timestamp is in ms)
    GROUP BY src_wwpn, dst_wwpn
    ORDER BY total_bytes DESC
    LIMIT 100
""")

top_flows.show()

# Example output:
# +------------------+------------------+-------------+---------------+------------+
# |src_wwpn          |dst_wwpn          |total_bytes  |total_packets  |avg_latency |
# +------------------+------------------+-------------+---------------+------------+
# |20:00:00:25:b5:...|50:00:09:73:00:...|8589934592000|8589934592     |1850        |
# |20:00:00:25:b5:...|50:00:09:73:00:...|4294967296000|4294967296     |2100        |
# ...

This analysis used to take hours with a traditional SQL database. Spark completes it in seconds.
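To make the aggregation concrete, here is a minimal pure-Python sketch of what the query computes: group samples by (src_wwpn, dst_wwpn), sum bytes and packets, average the latency, and rank by total bytes. The function name and the sample records are made up for illustration; on real data volumes this is exactly the work Spark distributes across the cluster.

```python
from collections import defaultdict

def top_talkers(samples, n=100):
    """Aggregate samples per (src_wwpn, dst_wwpn) pair and rank by total bytes."""
    totals = defaultdict(lambda: {"bytes": 0, "packets": 0, "latency_sum": 0, "count": 0})
    for s in samples:
        agg = totals[(s["src_wwpn"], s["dst_wwpn"])]
        agg["bytes"] += s["bytes"]
        agg["packets"] += s["packets"]
        agg["latency_sum"] += s["latency_us"]
        agg["count"] += 1
    ranked = sorted(totals.items(), key=lambda kv: kv[1]["bytes"], reverse=True)
    return [
        (src, dst, a["bytes"], a["packets"], a["latency_sum"] / a["count"])
        for (src, dst), a in ranked[:n]
    ]

# Made-up samples for illustration only
samples = [
    {"src_wwpn": "A", "dst_wwpn": "X", "bytes": 1000, "packets": 10, "latency_us": 100},
    {"src_wwpn": "A", "dst_wwpn": "X", "bytes": 3000, "packets": 30, "latency_us": 300},
    {"src_wwpn": "B", "dst_wwpn": "Y", "bytes": 2500, "packets": 25, "latency_us": 150},
]
result = top_talkers(samples, n=2)
print(result)
```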

2. Latency Distribution Analysis

Understand latency patterns:

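# Import the module under an alias; a star import would shadow Python
# built-ins such as sum() and max()
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Latency percentiles per hour: truncate the millisecond timestamp to the
# start of its hour, then convert that to a readable time
latency_percentiles = flow_data.groupBy(
    F.from_unixtime(F.floor(col("timestamp") / 1000 / 3600) * 3600).alias("hour")
).agg(
    F.expr("percentile_approx(latency_us, 0.5)").alias("p50"),
    F.expr("percentile_approx(latency_us, 0.99)").alias("p99"),
    F.expr("percentile_approx(latency_us, 0.999)").alias("p999")
).orderBy("hour")  # sort so the plot below runs in time order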

latency_percentiles.show()

# Visualize with matplotlib
import matplotlib.pyplot as plt

df = latency_percentiles.toPandas()

plt.figure(figsize=(12, 6))
plt.plot(df['hour'], df['p50'], label='P50')
plt.plot(df['hour'], df['p99'], label='P99')
plt.plot(df['hour'], df['p999'], label='P999')
plt.xlabel('Hour')
plt.ylabel('Latency (μs)')
plt.title('Flow Latency Distribution Over Time')
plt.legend()
plt.savefig('latency_distribution.png')

This reveals latency trends and anomalies that would be invisible in aggregated metrics.
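Two details of this analysis are easy to get wrong: bucketing a millisecond timestamp by hour, and what a percentile means. The sketch below shows both in plain Python. It assumes timestamps are epoch milliseconds in UTC and uses an exact nearest-rank percentile, whereas Spark's percentile_approx returns an approximation; the helper names and the synthetic latencies are illustrative.

```python
import math
from datetime import datetime, timezone

def hour_bucket(timestamp_ms):
    """Truncate an epoch-millisecond timestamp to the start of its hour (UTC)."""
    seconds = (timestamp_ms // 1000 // 3600) * 3600
    return datetime.fromtimestamp(seconds, tz=timezone.utc)

def percentile(values, p):
    """Exact nearest-rank percentile: the value at rank ceil(p * n) in sorted order."""
    ordered = sorted(values)
    rank = max(1, math.ceil(p * len(ordered)))
    return ordered[rank - 1]

latencies = list(range(1, 1001))  # 1..1000 microseconds, synthetic
p50 = percentile(latencies, 0.5)
p99 = percentile(latencies, 0.99)
p999 = percentile(latencies, 0.999)
bucket = hour_bucket(1405681200123)
print(p50, p99, p999, bucket.isoformat())
```

Note that dividing by 3600 alone yields an hour count, not a timestamp; multiplying back by 3600 is what turns it into the start of the hour.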

3. Anomaly Detection

Detect unusual flow patterns:

# Machine learning for anomaly detection
from pyspark.mllib.clustering import KMeans
from numpy import array

# Extract features: packets, bytes, latency
# (go through .rdd: DataFrame.map was removed in Spark 2.0)
features = flow_data.rdd.map(lambda row: array([
    row['packets'],
    row['bytes'],
    row['latency_us']
]))

# Normalize features
from pyspark.mllib.feature import StandardScaler
scaler = StandardScaler(withMean=True, withStd=True).fit(features)
features_normalized = scaler.transform(features)

# Train KMeans clustering model
clusters = KMeans.train(features_normalized, k=5, maxIterations=20)

# Predict cluster for each flow. The fitted scaler wraps a JVM object, so it
# cannot be called inside a Python function running on the workers. Predict
# on the whole normalized RDD and zip the result back onto the rows.
cluster_ids = clusters.predict(features_normalized)
flow_clusters = flow_data.rdd.zip(cluster_ids)

# Find flows in anomalous clusters (clusters with few members)
cluster_counts = flow_clusters.map(lambda x: (x[1], 1)).reduceByKey(lambda a, b: a + b)

# Clusters with <1% of flows are anomalous
total_flows = flow_data.count()
anomalous_clusters = cluster_counts.filter(
    lambda x: x[1] < total_flows * 0.01
).map(lambda x: x[0]).collect()

# Get anomalous flows
anomalous_flows = flow_clusters.filter(
    lambda x: x[1] in anomalous_clusters
).map(lambda x: x[0])

anomalous_flows.take(10)

This machine learning approach detects unusual flow behavior automatically.
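The small-cluster rule used above (any cluster holding less than 1% of the points is treated as anomalous) can be tried without a cluster. This is a minimal sketch on synthetic cluster assignments; the function name and the data are illustrative, and the 1% threshold is the one from the code above.

```python
from collections import Counter

def anomalous_clusters(assignments, threshold=0.01):
    """Return IDs of clusters holding fewer than `threshold` of all points."""
    counts = Counter(assignments)
    total = len(assignments)
    return sorted(c for c, n in counts.items() if n < total * threshold)

# Synthetic assignments: cluster 4 holds 5 of 1,000 points (0.5%)
assignments = [0] * 500 + [1] * 300 + [2] * 150 + [3] * 45 + [4] * 5
rare = anomalous_clusters(assignments)
print(rare)
```

A small cluster is only a candidate anomaly. With k=5 chosen by hand, it is worth inspecting a few flows from each flagged cluster before acting on the result.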

4. Capacity Planning

Predict future capacity needs:

# Trend analysis for capacity planning
from pyspark.sql import functions as F

# Daily bandwidth, keyed by a numeric day index (days since the Unix epoch)
# so it can be used directly as a regression feature
daily_bandwidth = flow_data.groupBy(
    F.floor(F.col("timestamp") / 1000 / 86400).alias("day")
).agg(
    F.sum("bytes").alias("total_bytes")
)

# Linear regression for trend
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD

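# Prepare data for regression (label = total bytes, feature = day index)
training_data = daily_bandwidth.rdd.map(lambda row:
    LabeledPoint(row['total_bytes'], [row['day']])
)

# Train model. Fit an intercept, and note that SGD is sensitive to feature
# scale: rescale the day index and byte counts if training diverges.
model = LinearRegressionWithSGD.train(training_data, iterations=100, intercept=True)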

# Predict bandwidth for next 90 days
import time
current_day = time.time() / 86400

predictions = []
for i in range(90):
    future_day = current_day + i
    predicted_bytes = model.predict([future_day])
    predictions.append((future_day, predicted_bytes))

# Plot predictions
import matplotlib.pyplot as plt
import pandas as pd

df = pd.DataFrame(predictions, columns=['day', 'predicted_bytes'])
plt.figure(figsize=(12, 6))
plt.plot(df['day'], df['predicted_bytes'])
plt.xlabel('Day')
plt.ylabel('Bandwidth (bytes)')
plt.title('Predicted Bandwidth Growth')
plt.savefig('capacity_forecast.png')

This forecasting helps customers plan infrastructure upgrades.
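With only one feature (the day index), the same trend line can be computed in closed form, which is a handy sanity check on the SGD result. The sketch below swaps in ordinary least squares in plain Python rather than MLlib; the function names and the synthetic history (200 GB/day growing by 1 GB/day) are assumptions for illustration, not our measured data.

```python
def fit_trend(days, totals):
    """Ordinary least-squares line through (day, total_bytes) points."""
    n = len(days)
    mean_x = sum(days) / n
    mean_y = sum(totals) / n
    sxx = sum((x - mean_x) ** 2 for x in days)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(days, totals))
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    return slope, intercept

def forecast(slope, intercept, day):
    """Predicted total bytes for a given day index."""
    return slope * day + intercept

# Synthetic history: 30 days, starting at 200 GB/day and growing 1 GB/day
days = list(range(30))
totals = [200e9 + 1e9 * d for d in days]
slope, intercept = fit_trend(days, totals)
day_90 = forecast(slope, intercept, 29 + 90)  # 90 days past the last sample
print(slope, intercept, day_90)
```

Rebasing the day index to start at zero, as done here, also keeps the feature small, which helps gradient-based training converge.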

Real-Time Stream Processing

Spark Streaming enables real-time analytics:

from pyspark.streaming import StreamingContext

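# Create streaming context (1 second batches)
ssc = StreamingContext(sc, 1)

# updateStateByKey requires checkpointing (directory path is illustrative)
ssc.checkpoint("hdfs://namenode/fc_redirect/checkpoints")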

# Read flow statistics from Kafka
from pyspark.streaming.kafka import KafkaUtils

flow_stream = KafkaUtils.createStream(
    ssc,
    "zookeeper:2181",
    "fc-redirect-analytics",
    {"flow-stats": 1}
)

# Parse flow statistics
def parse_flow_stat(json_str):
    import json
    return json.loads(json_str)

flows = flow_stream.map(lambda x: parse_flow_stat(x[1]))

# Real-time top flows (running totals since startup, not a sliding window)
def update_top_flows(new_values, running_count):
    if running_count is None:
        running_count = 0
    return sum(new_values, running_count)

flow_bytes = flows.map(lambda f: ((f['src_wwpn'], f['dst_wwpn']), f['bytes']))

running_totals = flow_bytes.updateStateByKey(update_top_flows)

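# Print top 10 flows every second. transform() must return an RDD, so
# collect the top 10 on the driver with foreachRDD instead.
def print_top_flows(rdd):
    for flow, total_bytes in rdd.takeOrdered(10, key=lambda x: -x[1]):
        print(flow, total_bytes)

running_totals.foreachRDD(print_top_flows)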

ssc.start()
ssc.awaitTermination()

Real-time analytics enable immediate response to issues.
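The stateful part of this pipeline is easier to reason about outside Spark. The sketch below simulates, in plain Python, how an updateStateByKey-style update function folds each micro-batch into per-key running totals. The helper names (apply_batch, top_n) and the two batches are made up; only the shape of the update function mirrors the streaming code above.

```python
def update_running_total(new_values, running_total):
    """Same contract as the update function above: new values plus prior state."""
    return sum(new_values, running_total or 0)

def apply_batch(state, batch):
    """Fold one micro-batch of (key, bytes) pairs into the per-key state."""
    grouped = {}
    for key, value in batch:
        grouped.setdefault(key, []).append(value)
    # Keys absent from this batch are still updated, with an empty value list
    for key in set(state) | set(grouped):
        state[key] = update_running_total(grouped.get(key, []), state.get(key))
    return state

def top_n(state, n=10):
    """Keys with the largest running totals, largest first."""
    return sorted(state.items(), key=lambda kv: kv[1], reverse=True)[:n]

batches = [
    [(("A", "X"), 100), (("B", "Y"), 300)],
    [(("A", "X"), 500)],
]
state = {}
for batch in batches:
    apply_batch(state, batch)
top = top_n(state, n=2)
print(top)
```

Because the totals never expire, the state grows with the number of distinct flow pairs; a windowed aggregation would be the alternative if only recent traffic matters.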

Performance Optimizations

Several optimizations improved Spark job performance:

1. Partitioning

Partition data by day for efficient queries:

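# Derive a day column (e.g. 2014-07-18) and write data partitioned by it
from pyspark.sql import functions as F

flow_data.withColumn(
    "day", F.to_date(F.from_unixtime(F.floor(F.col("timestamp") / 1000)))
).write.partitionBy("day").parquet("hdfs://namenode/fc_redirect/flows")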

# Queries on specific days only read relevant partitions
todays_flows = sqlContext.read.parquet(
    "hdfs://namenode/fc_redirect/flows/day=2014-07-18"
)

# 100x faster than scanning all data
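Partitioned Parquet output is laid out as one directory per partition value, named column=value. A minimal sketch of how a sample's timestamp maps to its directory, assuming epoch-millisecond timestamps and UTC days (Spark itself uses the session time zone); the helper name is illustrative:

```python
from datetime import datetime, timezone

def partition_path(base, timestamp_ms):
    """Build the day=YYYY-MM-DD partition directory for an epoch-ms timestamp (UTC)."""
    day = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc).strftime("%Y-%m-%d")
    return f"{base}/day={day}"

path = partition_path("hdfs://namenode/fc_redirect/flows", 1405681200123)
print(path)
```

A query that filters on day only has to open the matching directories, which is where the speedup comes from.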

2. Caching

Cache frequently-accessed datasets:

# Cache flow data in memory
flow_data.cache()

# Subsequent queries are much faster
query1 = flow_data.filter(col("qos_class") == "high").count()
query2 = flow_data.filter(col("latency_us") > 10000).count()

# Both queries use cached data (no disk I/O)

3. Column Pruning

Only read needed columns:

# Instead of reading all columns
all_columns = flow_data.select("*")

# Read only needed columns (much faster)
needed_columns = flow_data.select("src_wwpn", "dst_wwpn", "bytes")

# Parquet's columnar format makes this efficient
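Why column pruning is cheap in a columnar format can be shown with a toy layout. In the sketch below, records are pivoted into one list per field, so selecting three fields never touches the others. This is a plain-Python illustration of the idea, not how Parquet is implemented; the helper names and records are made up.

```python
# Row-oriented: every record carries every field
rows = [
    {"src_wwpn": "A", "dst_wwpn": "X", "bytes": 1000, "packets": 10, "latency_us": 120},
    {"src_wwpn": "B", "dst_wwpn": "Y", "bytes": 2500, "packets": 25, "latency_us": 95},
]

def to_columns(rows):
    """Pivot a list of records into one list per field (a columnar layout)."""
    return {name: [row[name] for row in rows] for name in rows[0]}

def select(columns, *names):
    """Column pruning: return only the requested columns; the rest are never read."""
    return {name: columns[name] for name in names}

columns = to_columns(rows)
pruned = select(columns, "src_wwpn", "dst_wwpn", "bytes")
total_bytes = sum(pruned["bytes"])
print(sorted(pruned), total_bytes)
```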

Integration with Grafana

Visualize Spark analytics in Grafana dashboards:

# Export aggregated data to PostgreSQL for Grafana

# Hourly aggregated statistics (timestamp truncated to the start of its hour)
hourly_stats = sqlContext.sql("""
    SELECT
        from_unixtime(floor(timestamp / 1000 / 3600) * 3600) as hour,
        COUNT(*) as flow_count,
        SUM(bytes) as total_bytes,
        AVG(latency_us) as avg_latency,
        MAX(latency_us) as max_latency
    FROM flows
    GROUP BY from_unixtime(floor(timestamp / 1000 / 3600) * 3600)
""")

# Write to PostgreSQL
hourly_stats.write.jdbc(
    url="jdbc:postgresql://grafana-db:5432/metrics",
    table="flow_hourly_stats",
    mode="append",
    properties={
        "user": "grafana",
        "password": "password",  # placeholder; load real credentials from a secret store
        "driver": "org.postgresql.Driver"
    }
)

Grafana queries PostgreSQL for visualizations, powered by Spark analytics.

Results and Impact

Spark-based analytics delivered significant value:

Performance:

  • Analysis speed: 100x faster than SQL (hours → seconds)
  • Data volume: Analyzing 6TB/month easily
  • Real-time: Sub-second latency for streaming analytics

Insights:

  • Identified top 1% of flows consuming 60% of bandwidth
  • Detected anomalous flow patterns causing performance issues
  • Capacity forecasting accuracy: ±5% over 3 months

Cost Savings:

  • Reduced analysis infrastructure costs by 70% (vs SQL cluster)
  • Enabled proactive issue detection (preventing outages)
  • Better capacity planning (avoiding over-provisioning)

Lessons Learned

Working with Spark taught me:

1. In-Memory Processing Is Transformative

Spark’s in-memory processing is 10-100x faster than disk-based approaches. Worth the RAM investment.

2. Partitioning Is Critical

Good partitioning strategy (by date, by customer, etc.) makes or breaks performance.

3. Storage Format Matters

Columnar formats (Parquet, ORC) dramatically outperform row-oriented formats for analytics.

4. Machine Learning Is Accessible

Spark MLlib makes machine learning approachable for systems engineers, not just data scientists.

5. Real-Time + Batch = Powerful Combination

Combining batch analytics (deep historical analysis) with streaming (real-time monitoring) provides comprehensive insights.

Looking Forward

Spark opens new possibilities:

  • Predictive analytics (predict failures before they occur)
  • Customer behavior analysis (understand usage patterns)
  • Automated optimization (use ML to tune parameters)
  • Cross-customer benchmarking (anonymized comparison)

As data volumes grow, Spark’s scalability ensures we can keep up. The framework scales from gigabytes to petabytes without architectural changes.

Big data tools like Spark are no longer just for web companies. Infrastructure teams benefit enormously from scalable analytics. The insights from our flow data drive better products and happier customers.

Data is the new oil, but only if you can refine it. Spark is our refinery.