Apache Spark
Spark architecture, RDD to DataFrame evolution, transformations vs actions, Spark SQL, join strategies, performance tuning, and PySpark patterns.
Apache Spark is the dominant engine for large-scale distributed data processing. Whether you are processing terabytes of log data or building feature pipelines for ML, understanding Spark's internals is essential for writing efficient jobs.
Architecture
Spark runs as a distributed system with a driver-executor model:
```text
┌─────────────────────────────────────────────────┐
│                 Driver Program                  │
│  ┌──────────────┐ ┌───────────┐ ┌───────────┐   │
│  │ SparkContext │ │    DAG    │ │   Task    │   │
│  │              │ │ Scheduler │ │ Scheduler │   │
│  └──────────────┘ └───────────┘ └───────────┘   │
└────────────────────────┬────────────────────────┘
                         │
           ┌─────────────┼─────────────┐
           ▼             ▼             ▼
     ┌──────────┐  ┌──────────┐  ┌──────────┐
     │ Executor │  │ Executor │  │ Executor │
     │ ┌──────┐ │  │ ┌──────┐ │  │ ┌──────┐ │
     │ │Task 1│ │  │ │Task 3│ │  │ │Task 5│ │
     │ │Task 2│ │  │ │Task 4│ │  │ │Task 6│ │
     │ └──────┘ │  │ └──────┘ │  │ └──────┘ │
     │  Cache   │  │  Cache   │  │  Cache   │
     └──────────┘  └──────────┘  └──────────┘
```
Key Components
- Driver: The process running your main program. It creates the SparkSession (and its underlying SparkContext), builds the execution plan, and schedules tasks on executors.
- Executors: Worker processes that run tasks and cache data. Each executor is a separate JVM process.
- Partitions: The unit of parallelism. Data is split into partitions, and each task processes one partition.
- Cluster Manager: Allocates resources (YARN, Kubernetes, Standalone, or Mesos, which is deprecated as of Spark 3.2).
Evolution: RDD to DataFrame to Dataset
RDD (Resilient Distributed Dataset)
The original Spark abstraction. It is low-level and type-safe in Scala/Java, but gets no query optimization, because Spark cannot look inside the functions you pass to it.
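```python
# RDD example: prefer the DataFrame API in modern Spark
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext  # the RDD API is accessed through the SparkContext

rdd = sc.textFile("hdfs:///logs/access.log")
errors = (
    rdd
    .filter(lambda line: "ERROR" in line)
    .map(lambda line: line.split("\t"))
    .filter(lambda parts: len(parts) > 3)     # skip malformed lines
    .map(lambda parts: (parts[0], parts[3]))  # (timestamp, message)
)
error_count = errors.count()
```
DataFrame (Spark 2.0+)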
Schema-aware, optimized by Catalyst query optimizer. The recommended API for most workloads.
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("DataEngineering") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Read data (Parquet stores its schema in the files, so no inference pass is needed)
orders = spark.read.parquet("s3://data-lake/raw/orders/")

# DataFrame transformations (optimized by Catalyst)
daily_revenue = (
    orders
    .filter(F.col("status").isin("completed", "shipped"))
    .withColumn("order_date", F.to_date("created_at"))
    .groupBy("order_date", "product_category")
    .agg(
        F.sum("total_amount").alias("revenue"),
        F.countDistinct("order_id").alias("order_count"),
        F.avg("total_amount").alias("avg_order_value"),
    )
    .orderBy("order_date")
)
daily_revenue.show(5)
```
Dataset (Scala/Java only)
Combines the type safety of RDDs with the optimization of DataFrames. Not available in PySpark since Python is dynamically typed.
API Evolution:
```text
RDD (Spark 1.x)  →  DataFrame (Spark 2.x)  →  DataFrame + Catalyst + AQE (Spark 3.x)
Low-level           Optimized                 Auto-optimized
No optimizer        Catalyst optimizer        Adaptive Query Execution
Manual tuning       Schema-aware              Runtime optimization
```
Transformations vs Actions
Understanding this distinction is critical for Spark performance.
Transformations (Lazy)
Transformations define a computation but do not execute it. Spark builds a DAG of transformations.
```python
# None of these lines trigger computation
filtered = orders.filter(F.col("amount") > 100)                   # Narrow
projected = filtered.select("order_id", "customer_id", "amount")  # Narrow (keeps the grouping key)
grouped = projected.groupBy("customer_id").sum("amount")          # Wide (shuffle)
```
Narrow transformations: Each output partition depends on one input partition (filter, select, map). No data shuffle.
Wide transformations: Each output partition depends on multiple input partitions (groupBy, join, repartition). Requires shuffle.
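A plain-Python model with no Spark involved makes the difference concrete. Partitions are lists. A narrow step maps each input partition to exactly one output partition. A wide step must route every record to a new partition chosen by hashing its key, which is what a shuffle does. The function names are illustrative, and `crc32` stands in for the Murmur3 hash Spark actually uses.

```python
from collections import defaultdict
from zlib import crc32

# Input: 3 partitions of (customer_id, amount) records
partitions = [
    [("c1", 50), ("c2", 150)],
    [("c1", 200), ("c3", 120)],
    [("c2", 300), ("c3", 10)],
]


def narrow_filter(parts, predicate):
    """Narrow: output partition i is computed from input partition i only."""
    return [[rec for rec in part if predicate(rec)] for part in parts]


def wide_group_sum(parts, num_out):
    """Wide: any output partition may need records from every input partition."""
    shuffled = [defaultdict(int) for _ in range(num_out)]
    for part in parts:                                  # every input partition...
        for key, amount in part:
            target = crc32(key.encode()) % num_out      # ...sends each record by key hash
            shuffled[target][key] += amount
    return [dict(p) for p in shuffled]


filtered = narrow_filter(partitions, lambda rec: rec[1] > 100)
grouped = wide_group_sum(filtered, num_out=2)
print(filtered)
print(grouped)
```

The narrow step never looks outside its own partition, so Spark can pipeline it. The wide step needs all inputs before any output partition is complete, which is why Spark ends a stage at every shuffle.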
Actions (Eager)
Actions trigger execution of the entire DAG.
```python
# These trigger actual computation.
# Without caching, each action re-runs the DAG from the source.
grouped.count()                   # Returns a number
grouped.show(10)                  # Prints to console
grouped.write.parquet("output/")  # Writes to storage
grouped.collect()                 # Returns all rows to the driver (careful with large data!)
```
Spark SQL
Spark SQL lets you query DataFrames using SQL syntax. It shares the same Catalyst optimizer.
```python
# Register DataFrames as temp views (assumes `orders` and `customers` are already loaded)
orders.createOrReplaceTempView("orders")
customers.createOrReplaceTempView("customers")

# Query with SQL
result = spark.sql("""
    WITH customer_orders AS (
        SELECT
            c.customer_id,
            c.name,
            c.segment,
            COUNT(o.order_id) AS order_count,
            COALESCE(SUM(o.total_amount), 0) AS lifetime_value,
            MIN(o.created_at) AS first_order,
            MAX(o.created_at) AS last_order
        FROM customers c
        -- Filter in the ON clause: a WHERE on o.status would drop customers
        -- without completed orders and turn the LEFT JOIN into an inner join
        LEFT JOIN orders o
            ON c.customer_id = o.customer_id
            AND o.status = 'completed'
        GROUP BY c.customer_id, c.name, c.segment
    )
    SELECT
        segment,
        COUNT(*) AS customer_count,
        AVG(lifetime_value) AS avg_ltv,
        AVG(order_count) AS avg_orders
    FROM customer_orders
    GROUP BY segment
    ORDER BY avg_ltv DESC
""")
result.show()
```
Joins and Shuffle Strategies
Joins are often the most expensive operation in Spark. Choosing the right strategy matters.
Join Types and Their Cost
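```python
# DataFrame names below are placeholders for your own tables
from pyspark.sql.functions import broadcast

# Sort-Merge Join (default for large-large joins)
# Both sides are shuffled by the join key, then sorted within each partition
large_orders.join(large_customers, "customer_id", "inner")

# Broadcast Join (small table broadcast to all executors)
# Avoids a shuffle entirely; much faster when one side is small
large_orders.join(broadcast(small_dim_table), "product_id", "left")

# Shuffle Hash Join
# Both sides are shuffled, then each partition of the smaller side is built into a hash table.
# Good when one side is much smaller but too large to broadcast.
large_orders.join(medium_table.hint("shuffle_hash"), "customer_id")  # join hints: Spark 3.0+
# Or stop Spark from preferring sort-merge joins globally
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
```
Join Strategy Comparison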
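| Strategy | When Used | Shuffle | Memory | Best For |
|---|---|---|---|---|
| Broadcast Hash | One side below `spark.sql.autoBroadcastJoinThreshold` (10MB default) | None | Small side must fit in driver and executor memory | Dimension lookups |
| Sort-Merge | Both sides large | Both sides | Moderate (can spill to disk) | Large-large joins |
| Shuffle Hash | One side much smaller, but too large to broadcast | Both sides | Each partition of the smaller side held in memory | Medium-sized build side; avoids sorting |
| Bucket Join | Both tables pre-bucketed on the join key | None (when bucket counts match) | Low | Repeated joins on the same key |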
Handling Skewed Joins
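A plain-Python model shows why a hot key hurts and how salting helps, before touching Spark. This sketch places keys the way a shuffle would, with `crc32` standing in for Spark's Murmur3 hash, and compares the largest partition before and after salting. The deterministic round-robin salt (`i % SALT_FACTOR`) simplifies the random salt used in the Spark code that follows. The data and constants are made up.

```python
from collections import Counter
from zlib import crc32

NUM_PARTITIONS = 16
SALT_FACTOR = 10

# 1,000 rows share one hot key; 9 ordinary keys have 10 rows each
keys = ["hot"] * 1000 + [f"k{n}" for n in range(9) for _ in range(10)]


def partition_of(key: str) -> int:
    """Shuffle-style placement: hash the key, then take it modulo the partition count."""
    return crc32(key.encode()) % NUM_PARTITIONS


# Unsalted: every "hot" row hashes to the same partition
before = Counter(partition_of(k) for k in keys)

# Salted: row i gets the key "<key>_<i % SALT_FACTOR>", so the hot key is split 10 ways
salted_keys = [f"{k}_{i % SALT_FACTOR}" for i, k in enumerate(keys)]
after = Counter(partition_of(k) for k in salted_keys)

print("largest partition before salting:", max(before.values()))
print("largest partition after salting: ", max(after.values()))

# The small side is replicated once per salt value so each salted key still has a match
small_side_keys = ["hot", "k0"]
replicated_keys = [f"{k}_{s}" for k in small_side_keys for s in range(SALT_FACTOR)]
```

With random salts the split is only approximately even. The replicated small side grows by a factor of `SALT_FACTOR`, which is the cost of the technique.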
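```python
# Problem: some join keys have millions of rows (skew), so a few tasks do most of the work
# Solution 1: Salt the join key
from pyspark.sql import functions as F

salt_factor = 10

# Salt the large side: add a random suffix 0..salt_factor-1 to each row's key
skewed_df = large_table.withColumn(
    "salt", (F.rand() * salt_factor).cast("int")
)
skewed_df = skewed_df.withColumn(
    "salted_key",
    F.concat(F.col("join_key").cast("string"), F.lit("_"), F.col("salt").cast("string")),
)

# Replicate the small side once per salt value so every salted key finds its match
replicated = small_table.crossJoin(
    spark.range(salt_factor).withColumnRenamed("id", "salt")
)
replicated = replicated.withColumn(
    "salted_key",
    F.concat(F.col("join_key").cast("string"), F.lit("_"), F.col("salt").cast("string")),
)

# Join on the salted key, which spreads the skewed rows across salt_factor tasks.
# Drop the helper columns from the small side to avoid duplicate column names.
result = (
    skewed_df
    .join(replicated.drop("join_key", "salt"), "salted_key")
    .drop("salted_key", "salt")
)
```
Performance Tuning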
Partitioning
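The 128MB-per-partition rule of thumb in the snippet below reduces to a ceiling division. The small helper here (the name `target_partitions` is hypothetical) makes the arithmetic explicit. It assumes data is spread evenly, which skewed data is not.

```python
MiB = 1024 ** 2
GiB = 1024 ** 3


def target_partitions(total_bytes: int, target_bytes: int = 128 * MiB) -> int:
    """Smallest partition count that keeps each partition at or under target_bytes,
    assuming rows are spread evenly across partitions."""
    if total_bytes <= 0:
        return 1
    return (total_bytes + target_bytes - 1) // target_bytes  # integer ceiling division


print(target_partitions(100 * GiB))  # 100GB of data at 128MB per partition
print(target_partitions(129 * MiB))  # just over one partition's worth
```

The result can feed `df.repartition(n)` or `spark.sql.shuffle.partitions`.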
```python
# Repartition: full shuffle, creates exactly N partitions.
# With a column, rows are hash-partitioned by it, so partitions are only
# as even as that column's value distribution.
df = df.repartition(200, "date")

# Coalesce: merges existing partitions WITHOUT a full shuffle.
# Use it only to reduce the partition count; it cannot increase it.
df = df.coalesce(50)

# Good rule of thumb: target ~128MB per partition
# For 100GB of data: 100GB / 128MB ≈ 800 partitions
```
Caching
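```python
# Cache when a DataFrame is reused multiple times
from pyspark import StorageLevel

# .cache() uses the default storage level, MEMORY_AND_DISK
frequently_used_df = expensive_computation.cache()
frequently_used_df.count()  # caching is lazy; the first action fills the cache

# For more control, call persist() with an explicit level INSTEAD of cache();
# persisting an already-cached DataFrame again does not change its level.
# (PySpark has no *_SER levels such as MEMORY_AND_DISK_SER.)
# frequently_used_df = expensive_computation.persist(StorageLevel.DISK_ONLY)

# ALWAYS unpersist when done
frequently_used_df.unpersist()
```
Broadcast Joins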
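```python
from pyspark.sql.functions import broadcast

# Increase the auto-broadcast threshold (default 10MB) for larger dimension tables.
# The table is collected to the driver and copied to every executor, so both need headroom.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)  # 100MB

# Force a broadcast with a hint, regardless of the threshold
result = fact_table.join(broadcast(dim_table), "key")
```
Adaptive Query Execution (AQE)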
AQE optimizes query plans at runtime based on actual data statistics.
```python
# Enable AQE (default in Spark 3.2+)
spark.conf.set("spark.sql.adaptive.enabled", "true")

# AQE features:
# 1. Coalescing shuffle partitions (merges many small post-shuffle partitions)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# 2. Switching join strategies at runtime: a sort-merge join becomes a broadcast join
#    when a shuffled side turns out to be small, and the local shuffle reader then
#    avoids extra network reads
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")

# 3. Optimizing skewed joins automatically (splits oversized partitions)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
```
Memory Configuration
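Under the unified memory model, the heap is divided by two fractions, so the usable regions are smaller than `spark.executor.memory` suggests. This arithmetic sketch uses Spark's documented defaults: 300MB reserved, `spark.memory.fraction` = 0.6, and `spark.memory.storageFraction` = 0.5. The helper name `unified_memory_mb` is hypothetical.

```python
RESERVED_MB = 300  # fixed reservation for Spark's internal objects


def unified_memory_mb(heap_mb: float,
                      memory_fraction: float = 0.6,
                      storage_fraction: float = 0.5) -> dict:
    """Split an executor heap (MB) into Spark's unified-memory regions (MB)."""
    usable = heap_mb - RESERVED_MB
    unified = usable * memory_fraction              # shared by execution and storage
    protected_storage = unified * storage_fraction  # cached blocks execution cannot evict
    user = usable - unified                         # user data structures, UDF objects
    return {"unified": unified, "protected_storage": protected_storage, "user": user}


regions = unified_memory_mb(8 * 1024)  # spark.executor.memory = 8g
for name, mb in regions.items():
    print(f"{name:>17}: {mb:,.1f} MB")
```

Execution can borrow idle storage memory and storage can borrow idle execution memory. Only the protected storage share is safe from eviction by execution.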
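```python
# Executor memory layout (defaults)
# spark.executor.memory = 8g (JVM heap)
# ├── Reserved: 300MB
# ├── Unified (spark.memory.fraction = 0.6 of heap minus reserved): execution
# │   (joins, sorts, aggregations) and storage (cached data) borrow from each other;
# │   spark.memory.storageFraction = 0.5 of it is storage that execution cannot evict
# └── User memory: the remaining 40% (user data structures, UDF objects)
# spark.executor.memoryOverhead = 2g (off-heap: Python workers, native libs)

# Common settings for a balanced workload. Executor resources are fixed at startup,
# so set them on the builder (or spark-submit --conf), not with spark.conf.set()
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.executor.memory", "8g")
         .config("spark.executor.memoryOverhead", "2g")
         .config("spark.executor.cores", "4")
         .getOrCreate())
spark.conf.set("spark.sql.shuffle.partitions", "200")  # SQL options can change at runtime
```
PySpark Patterns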
Reading and Writing Data
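```python
# Read from multiple formats
parquet_df = spark.read.parquet("s3://bucket/data/")
csv_df = spark.read.option("header", True).option("inferSchema", True).csv("data.csv")
json_df = spark.read.json("events/")
delta_df = spark.read.format("delta").load("s3://bucket/delta-table/")  # needs the delta-spark package

# Write with partitioning. "overwrite" replaces the whole output path unless
# spark.sql.sources.partitionOverwriteMode is set to "dynamic"
(
    daily_revenue
    .write
    .mode("overwrite")
    .partitionBy("order_date")
    .parquet("s3://bucket/output/daily_revenue/")
)

# Append to Delta Lake; mergeSchema lets new columns extend the table schema
(
    daily_revenue
    .write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .save("s3://bucket/delta/daily_revenue/")
)

# Upserts are not a save mode: use the DeltaTable merge API
from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "s3://bucket/delta/daily_revenue/")
(
    target.alias("t")
    .merge(
        daily_revenue.alias("s"),
        "t.order_date = s.order_date AND t.product_category = s.product_category",
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)
```
Window Functions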
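```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Running total and order sequence within each customer
window_spec = Window.partitionBy("customer_id").orderBy("order_date")

# DataFrame.withColumns needs Spark 3.3+. With orderBy and no explicit frame, the
# frame runs from the partition start to the current row, so orders on the same
# order_date share one running_total value.
orders_with_metrics = orders.withColumns({
    "running_total": F.sum("amount").over(window_spec),
    "order_number": F.row_number().over(window_spec),
    "prev_order_amount": F.lag("amount", 1).over(window_spec),
    "days_since_prev": F.datediff(
        F.col("order_date"),
        F.lag("order_date", 1).over(window_spec)
    ),
})
```
UDFs (Use Sparingly)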
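A Pandas UDF body is ordinary pandas code. One option, a design suggestion rather than something Spark requires, is to keep the logic in a plain function that can be unit-tested without a SparkSession. You then wrap it with `pandas_udf(calculate_discount_logic, "double")` when building the job. The function name `calculate_discount_logic` is hypothetical, and its formula mirrors the `calculate_discount` example that follows.

```python
import pandas as pd


def calculate_discount_logic(amounts: pd.Series, quantities: pd.Series) -> pd.Series:
    """5% base discount, plus an extra 10% when quantity is above 10."""
    base_discount = 0.05
    volume_discount = (quantities > 10).astype(float) * 0.1
    return amounts * (base_discount + volume_discount)


sample = calculate_discount_logic(pd.Series([100.0, 200.0]), pd.Series([5, 20]))
print(sample.tolist())
```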
```python
# Pandas UDFs (vectorized) are usually much faster than row-at-a-time Python UDFs,
# because data moves between the JVM and Python in Arrow batches
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf("double")  # type-hinted Pandas UDFs require Spark 3.0+
def calculate_discount(amounts: pd.Series, quantities: pd.Series) -> pd.Series:
    """Vectorized discount calculation."""
    base_discount = 0.05
    volume_discount = (quantities > 10).astype(float) * 0.1
    return amounts * (base_discount + volume_discount)

# Apply the Pandas UDF
orders = orders.withColumn(
    "discount",
    calculate_discount(F.col("amount"), F.col("quantity"))
)
```
Performance Checklist
Before deploying a Spark job to production, verify:
```python
production_checklist = {
    "partitioning": "Target ~128MB per partition, use partitionBy for output",
    "broadcast": "Broadcast small dimension tables (default threshold 10MB; raise it, e.g. to 100MB, only with memory headroom)",
    "aqe": "Enable Adaptive Query Execution (default since Spark 3.2)",
    "caching": "Cache only reused DataFrames, unpersist after",
    "serialization": "Use the Kryo serializer for RDD-heavy code (DataFrames use Spark's internal binary format)",
    "shuffle": "Minimize wide transformations, watch shuffle spill",
    "skew": "Handle skewed keys with salting or AQE",
    "file_format": "Use Parquet or Delta (columnar, compressed)",
    "predicate_push": "Filter early, let Spark push predicates to source",
    "avoid_collect": "Never collect() large datasets to the driver",
}
```
Spark rewards those who understand its execution model. Profile your jobs with the Spark UI, watch for shuffle spill, and iterate on partitioning strategies. The difference between a naive and an optimized Spark job can be 10-100x in cost and runtime.