FC-Redirect generates massive amounts of flow data: with 12K concurrent flows and statistics collected every second, we're producing over a billion data points daily. Traditional analysis tools can't handle this scale. Enter Apache Spark, the emerging big data processing framework that's revolutionizing how we analyze our flow data.
The Data Deluge
Our monitoring infrastructure collects comprehensive flow statistics:
typedef struct flow_statistics {
uint64_t timestamp;
flow_id_t flow_id;
wwpn_t src_wwpn;
wwpn_t dst_wwpn;
uint64_t packets;
uint64_t bytes;
uint32_t latency_us;
uint8_t qos_class;
// ... 20 more fields
} flow_statistics_t;
// Volume of data:
// 12,000 flows × 1 sample/sec × 86,400 sec/day = 1.04 billion samples/day
// At ~200 bytes per sample: ~208 GB/day
Per month, we're collecting over 6TB of flow statistics. We need scalable analysis.
Why Spark?
I evaluated several options:
Traditional SQL Databases:
- Pros: Familiar query language
- Cons: Don't scale to TB datasets, slow for analytics
Hadoop MapReduce:
- Pros: Scales to petabytes
- Cons: Slow (disk-based), complex programming model
Apache Spark:
- Pros: Fast (in-memory), scalable, expressive API
- Cons: Relatively new (1.0 released mid-2014)
Spark emerged as the clear winner for our use case.
Spark Architecture for Flow Analytics
I set up a Spark cluster for FC-Redirect analytics:
       ┌────────────────────────────┐
       │ Spark Master (Coordinator) │
       └─────────────┬──────────────┘
                     │
     ┌──────────┬────┴─────┬──────────┐
     │          │          │          │
┌────┴───┐ ┌────┴───┐ ┌────┴───┐ ┌────┴───┐
│Worker 1│ │Worker 2│ │Worker 3│ │Worker 4│
│ 8 cores│ │ 8 cores│ │ 8 cores│ │ 8 cores│
│ 64GB   │ │ 64GB   │ │ 64GB   │ │ 64GB   │
└────────┘ └────────┘ └────────┘ └────────┘
Storage: HDFS cluster (100TB capacity)
This cluster can process our flow data in parallel across 32 cores and 256GB RAM.
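The resource settings that tie a job to this cluster aren't shown in this post, so here is a minimal sketch of how the context created in the next section could be configured explicitly. The executor sizing below is an assumption chosen to roughly match the 8-core / 64GB workers, not our production config:
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("spark://master:7077")         # standalone master from the diagram
        .setAppName("FC-Redirect Analytics")
        .set("spark.executor.memory", "48g")      # leave OS/HDFS headroom out of 64GB per node (assumed)
        .set("spark.cores.max", "32")             # use all 4 x 8 cores (assumed)
        .set("spark.default.parallelism", "64"))  # ~2 tasks per core (assumed)
sc = SparkContext(conf=conf)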
Data Ingestion Pipeline
Flow statistics are written to HDFS in Parquet format:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sc = SparkContext("spark://master:7077", "FC-Redirect Analytics")
sqlContext = SQLContext(sc)
# Flow record schema (the Parquet files already embed this; declared here for reference)
flow_schema = StructType([
StructField("timestamp", LongType(), False),
StructField("flow_id", LongType(), False),
StructField("src_wwpn", StringType(), False),
StructField("dst_wwpn", StringType(), False),
StructField("packets", LongType(), False),
StructField("bytes", LongType(), False),
StructField("latency_us", IntegerType(), False),
StructField("qos_class", StringType(), False)
])
# Read from HDFS
flow_data = sqlContext.read.parquet("hdfs://namenode/fc_redirect/flows")
# Register as temp table for SQL queries
flow_data.registerTempTable("flows")
Parquet provides efficient columnar storage and compression (5x better than JSON).
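The write side of the pipeline isn't shown above. As a rough sketch, if the collectors dumped raw samples as JSON lines on HDFS (an assumption about the landing format; the raw/ path is hypothetical), converting them to Parquet with the schema declared earlier might look like this:
raw_flows = sqlContext.read.json(
    "hdfs://namenode/fc_redirect/raw/",  # hypothetical landing directory for raw samples
    schema=flow_schema                    # reuse the schema declared above
)
raw_flows.write.mode("append").parquet("hdfs://namenode/fc_redirect/flows")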
Analytics Use Cases
1. Top Talkers Analysis
Find flows consuming most bandwidth:
# Top 100 flows by bandwidth
top_flows = sqlContext.sql("""
SELECT
src_wwpn,
dst_wwpn,
SUM(bytes) as total_bytes,
SUM(packets) as total_packets,
AVG(latency_us) as avg_latency
FROM flows
WHERE timestamp >= (unix_timestamp() - 86400) * 1000 -- Last 24 hours (timestamp is in ms)
GROUP BY src_wwpn, dst_wwpn
ORDER BY total_bytes DESC
LIMIT 100
""")
top_flows.show()
# Example output:
# +------------------+------------------+-------------+---------------+------------+
# |src_wwpn |dst_wwpn |total_bytes |total_packets |avg_latency |
# +------------------+------------------+-------------+---------------+------------+
# |20:00:00:25:b5:...|50:00:09:73:00:...|8589934592000|8589934592 |1850 |
# |20:00:00:25:b5:...|50:00:09:73:00:...|4294967296000|4294967296 |2100 |
# ...
This analysis used to take hours with SQL. Spark completes it in seconds.
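For readers who prefer the DataFrame API over SQL strings, the same top-talkers query can be sketched as follows; the explicit millisecond window is simply my phrasing of the WHERE clause above:
import time
from pyspark.sql.functions import col, avg, sum as sum_

day_ago_ms = (time.time() - 86400) * 1000  # last 24 hours, in ms to match the timestamp column
top_flows_df = (flow_data
    .filter(col("timestamp") >= day_ago_ms)
    .groupBy("src_wwpn", "dst_wwpn")
    .agg(sum_("bytes").alias("total_bytes"),
         sum_("packets").alias("total_packets"),
         avg("latency_us").alias("avg_latency"))
    .orderBy(col("total_bytes").desc())
    .limit(100))
top_flows_df.show()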
2. Latency Distribution Analysis
Understand latency patterns:
from pyspark.sql.functions import col, floor, expr
# Latency percentiles per hour (timestamp is in ms; "hour" is hours since the epoch)
latency_percentiles = flow_data.groupBy(
    floor(col("timestamp") / 1000 / 3600).alias("hour")
).agg(
    expr("percentile_approx(latency_us, 0.5)").alias("p50"),
    expr("percentile_approx(latency_us, 0.99)").alias("p99"),
    expr("percentile_approx(latency_us, 0.999)").alias("p999")
)
latency_percentiles.show()
# Visualize with matplotlib
import matplotlib.pyplot as plt
df = latency_percentiles.orderBy("hour").toPandas()
plt.figure(figsize=(12, 6))
plt.plot(df['hour'], df['p50'], label='P50')
plt.plot(df['hour'], df['p99'], label='P99')
plt.plot(df['hour'], df['p999'], label='P999')
plt.xlabel('Hour')
plt.ylabel('Latency (µs)')
plt.title('Flow Latency Distribution Over Time')
plt.legend()
plt.savefig('latency_distribution.png')
This reveals latency trends and anomalies that would be invisible in aggregated metrics.
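To make those anomalies actionable, a small follow-on query can flag the hours worth investigating; the 5,000 µs threshold below is an illustrative value, not a measured SLA:
from pyspark.sql.functions import col

# Hours whose p99 latency exceeds the (assumed) threshold
slow_hours = latency_percentiles.filter(col("p99") > 5000).orderBy("hour")
slow_hours.show()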
3. Anomaly Detection
Detect unusual flow patterns:
# Machine learning for anomaly detection
from pyspark.mllib.clustering import KMeans
from numpy import array
# Extract features per flow: packets, bytes, latency (work on the underlying RDD of Rows)
features = flow_data.rdd.map(lambda row: array([
    row['packets'],
    row['bytes'],
    row['latency_us']
]))
features.cache()  # reused by the scaler fit and by prediction
# Normalize features so each dimension contributes equally
from pyspark.mllib.feature import StandardScaler
scaler = StandardScaler(withMean=True, withStd=True).fit(features)
features_normalized = scaler.transform(features)
# Train a KMeans clustering model
clusters = KMeans.train(features_normalized, k=5, maxIterations=20)
# Predict a cluster for each flow and pair each row with its cluster ID
# (the scaler and model are driver-side wrappers, so we predict on the whole RDD
# rather than calling them inside a per-row closure)
predictions = clusters.predict(features_normalized)
flow_clusters = flow_data.rdd.zip(predictions)
# Find flows in anomalous clusters (clusters with few members)
cluster_counts = flow_clusters.map(lambda x: (x[1], 1)).reduceByKey(lambda a, b: a + b)
# Clusters with <1% of flows are anomalous
total_flows = flow_data.count()
anomalous_clusters = cluster_counts.filter(
lambda x: x[1] < total_flows * 0.01
).map(lambda x: x[0]).collect()
# Get anomalous flows
anomalous_flows = flow_clusters.filter(
lambda x: x[1] in anomalous_clusters
).map(lambda x: x[0])
anomalous_flows.take(10)
This machine learning approach detects unusual flow behavior automatically.
4. Capacity Planning
Predict future capacity needs:
# Trend analysis for capacity planning
from pyspark.sql.functions import *
# Daily bandwidth growth
daily_bandwidth = flow_data.groupBy(
floor(col("timestamp") / 1000 / 86400).alias("day")  # numeric day index since the epoch
).agg(
sum("bytes").alias("total_bytes")
)
# Linear regression for trend
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
# Prepare data for regression
training_data = daily_bandwidth.rdd.map(lambda row:
LabeledPoint(row['total_bytes'], [row['day']])
)
# Train model
model = LinearRegressionWithSGD.train(training_data, iterations=100)
# Predict bandwidth for next 90 days
import time
current_day = time.time() / 86400
predictions = []
for i in range(90):
future_day = current_day + i
predicted_bytes = model.predict([future_day])
predictions.append((future_day, predicted_bytes))
# Plot predictions
import pandas as pd
df = pd.DataFrame(predictions, columns=['day', 'predicted_bytes'])
plt.figure(figsize=(12, 6))
plt.plot(df['day'], df['predicted_bytes'])
plt.xlabel('Day')
plt.ylabel('Bandwidth (bytes)')
plt.title('Predicted Bandwidth Growth')
plt.savefig('capacity_forecast.png')
This forecasting helps customers plan infrastructure upgrades.
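One way to turn the forecast into a concrete recommendation is to find the first day the predicted load crosses a capacity ceiling. A sketch, continuing from the predictions above; the 16 Gbps fabric figure is purely illustrative:
# Assumed ceiling: 16 Gbps sustained for a full day, expressed in bytes (illustrative number)
DAILY_CAPACITY_BYTES = 16e9 / 8 * 86400

exhaustion_days = [day for day, predicted in predictions
                   if predicted > DAILY_CAPACITY_BYTES]
if exhaustion_days:
    print("Predicted capacity exhaustion in %d days" % (exhaustion_days[0] - current_day))
else:
    print("No capacity exhaustion predicted in the next 90 days")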
Real-Time Stream Processing
Spark Streaming enables real-time analytics:
from pyspark.streaming import StreamingContext
# Create streaming context (1 second batches)
ssc = StreamingContext(sc, 1)
# updateStateByKey (used below) requires a checkpoint directory (any HDFS path works)
ssc.checkpoint("hdfs://namenode/fc_redirect/checkpoints")
# Read flow statistics from Kafka
from pyspark.streaming.kafka import KafkaUtils
flow_stream = KafkaUtils.createStream(
ssc,
"zookeeper:2181",
"fc-redirect-analytics",
{"flow-stats": 1}
)
# Parse flow statistics
def parse_flow_stat(json_str):
import json
return json.loads(json_str)
flows = flow_stream.map(lambda x: parse_flow_stat(x[1]))
# Real-time top flows (sliding window)
def update_top_flows(new_values, running_count):
if running_count is None:
running_count = 0
return sum(new_values, running_count)
flow_bytes = flows.map(lambda f: ((f['src_wwpn'], f['dst_wwpn']), f['bytes']))
running_totals = flow_bytes.updateStateByKey(update_top_flows)
# Print the top 10 flows (by cumulative bytes) every batch
running_totals.transform(
    lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False)
).pprint(10)
ssc.start()
ssc.awaitTermination()
Real-time analytics enable immediate response to issues.
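The same stream can also drive simple alerting. A minimal sketch, assuming the streamed JSON carries the same fields as the batch schema and using 10,000 µs as an illustrative threshold:
# Flag individual samples whose latency exceeds the (assumed) alert threshold
high_latency = flows.filter(lambda f: f['latency_us'] > 10000)
high_latency.map(
    lambda f: "ALERT: %s -> %s latency %d us" % (f['src_wwpn'], f['dst_wwpn'], f['latency_us'])
).pprint()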
Performance Optimizations
Several optimizations improved Spark job performance:
1. Partitioning
Partition data by day for efficient queries:
# Write data partitioned by day (derive a "day" column from the millisecond timestamp)
flow_data.withColumn("day", from_unixtime(col("timestamp") / 1000, "yyyy-MM-dd")) \
    .write.partitionBy("day").parquet("hdfs://namenode/fc_redirect/flows")
# Queries on specific days only read relevant partitions
todays_flows = sqlContext.read.parquet(
"hdfs://namenode/fc_redirect/flows/day=2014-07-18"
)
# 100x faster than scanning all data
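Hard-coding the partition path works, but expressing the restriction as a filter lets Spark prune partitions while keeping query code uniform; a sketch of the same read:
from pyspark.sql.functions import col

# Only the matching day=... directories are scanned, thanks to partition pruning
flows_all = sqlContext.read.parquet("hdfs://namenode/fc_redirect/flows")
todays_flows = flows_all.filter(col("day") == "2014-07-18")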
2. Caching
Cache frequently-accessed datasets:
# Cache flow data in memory
flow_data.cache()
# Subsequent queries are much faster
query1 = flow_data.filter(col("qos_class") == "high").count()
query2 = flow_data.filter(col("latency_us") > 10000).count()
# Both queries use cached data (no disk I/O)
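When the working set outgrows the cluster's 256GB of memory, an explicit storage level keeps the benefit of caching without requiring everything to fit in RAM; a sketch:
from pyspark import StorageLevel

# Spill partitions that don't fit in memory to local disk instead of recomputing them
flow_data.persist(StorageLevel.MEMORY_AND_DISK)
# ... run the queries above ...
flow_data.unpersist()  # release the cache once the analysis is done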
3. Column Pruning
Only read needed columns:
# Instead of reading all columns
all_columns = flow_data.select("*")
# Read only needed columns (much faster)
needed_columns = flow_data.select("src_wwpn", "dst_wwpn", "bytes")
# Parquet's columnar format makes this efficient
Integration with Grafana
Visualize Spark analytics in Grafana dashboards:
# Export aggregated data to PostgreSQL for Grafana
from pyspark.sql import DataFrameWriter
# Hourly aggregated statistics
hourly_stats = sqlContext.sql("""
SELECT
from_unixtime(floor(timestamp / 1000 / 3600) * 3600) as hour,
COUNT(*) as flow_count,
SUM(bytes) as total_bytes,
AVG(latency_us) as avg_latency,
MAX(latency_us) as max_latency
FROM flows
GROUP BY from_unixtime(floor(timestamp / 1000 / 3600) * 3600)
""")
# Write to PostgreSQL
hourly_stats.write.jdbc(
url="jdbc:postgresql://grafana-db:5432/metrics",
table="flow_hourly_stats",
mode="append",
properties={
"user": "grafana",
"password": "password",
"driver": "org.postgresql.Driver"
}
)
Grafana queries PostgreSQL for visualizations, powered by Spark analytics.
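A quick sanity check on the export is to read the table back through the same JDBC connection and confirm the rows Grafana will chart; a minimal sketch:
from pyspark.sql.functions import col

exported = sqlContext.read.jdbc(
    url="jdbc:postgresql://grafana-db:5432/metrics",
    table="flow_hourly_stats",
    properties={
        "user": "grafana",
        "password": "password",
        "driver": "org.postgresql.Driver"
    }
)
exported.orderBy(col("hour").desc()).show(5)  # the most recent hours Grafana will chart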
Results and Impact
Spark-based analytics delivered significant value:
Performance:
- Analysis speed: 100x faster than SQL (hours → seconds)
- Data volume: Analyzing 6TB/month easily
- Real-time: Sub-second latency for streaming analytics
Insights:
- Identified top 1% of flows consuming 60% of bandwidth
- Detected anomalous flow patterns causing performance issues
- Capacity forecasting accuracy: ±5% over 3 months
Cost Savings:
- Reduced analysis infrastructure costs by 70% (vs SQL cluster)
- Enabled proactive issue detection (preventing outages)
- Better capacity planning (avoiding over-provisioning)
Lessons Learned
Working with Spark taught me:
1. In-Memory Processing Is Transformative
Spark's in-memory processing is 10-100x faster than disk-based approaches. Worth the RAM investment.
2. Partitioning Is Critical
Good partitioning strategy (by date, by customer, etc.) makes or breaks performance.
3. Schema Matters
Columnar formats (Parquet, ORC) dramatically outperform row-oriented formats for analytics.
4. Machine Learning Is Accessible
Spark MLlib makes machine learning approachable for systems engineers, not just data scientists.
5. Real-Time + Batch = Powerful Combination
Combining batch analytics (deep historical analysis) with streaming (real-time monitoring) provides comprehensive insights.
Looking Forward
Spark opens new possibilities:
- Predictive analytics (predict failures before they occur)
- Customer behavior analysis (understand usage patterns)
- Automated optimization (use ML to tune parameters)
- Cross-customer benchmarking (anonymized comparison)
As data volumes grow, Spark's scalability ensures we can keep up. The framework scales from gigabytes to petabytes without architectural changes.
Big data tools like Spark are no longer just for web companies. Infrastructure teams benefit enormously from scalable analytics. The insights from our flow data drive better products and happier customers.
Data is the new oil, but only if you can refine it. Spark is our refinery.