Steven's Knowledge

Apache Spark

Spark architecture, RDD to DataFrame evolution, transformations vs actions, Spark SQL, join strategies, performance tuning, and PySpark patterns.


Apache Spark is the dominant engine for large-scale distributed data processing. Whether you are processing terabytes of log data or building feature pipelines for ML, understanding Spark's internals is essential for writing efficient jobs.

Architecture

Spark runs as a distributed system with a driver-executor model:

┌─────────────────────────────────────────────────┐
│                  Driver Program                  │
│  ┌─────────────┐  ┌──────────┐  ┌────────────┐  │
│  │ SparkContext │  │ DAG      │  │ Task       │  │
│  │             │  │ Scheduler│  │ Scheduler  │  │
│  └─────────────┘  └──────────┘  └────────────┘  │
└───────────────────────┬─────────────────────────┘
                        │
          ┌─────────────┼─────────────┐
          ▼             ▼             ▼
   ┌──────────┐  ┌──────────┐  ┌──────────┐
   │ Executor │  │ Executor │  │ Executor │
   │ ┌──────┐ │  │ ┌──────┐ │  │ ┌──────┐ │
   │ │Task 1│ │  │ │Task 3│ │  │ │Task 5│ │
   │ │Task 2│ │  │ │Task 4│ │  │ │Task 6│ │
   │ └──────┘ │  │ └──────┘ │  │ └──────┘ │
   │  Cache   │  │  Cache   │  │  Cache   │
   └──────────┘  └──────────┘  └──────────┘

Key Components

  • Driver: The process running your main program. Creates the SparkSession (and its SparkContext), builds the execution plan, and schedules tasks on executors.
  • Executors: Worker processes that run tasks and cache data. Each executor runs in its own JVM.
  • Partitions: The unit of parallelism. Data is split into partitions, and each task processes one partition.
  • Cluster Manager: Allocates resources for executors (YARN, Kubernetes, Standalone, or Mesos, which is deprecated since Spark 3.2).

Evolution: RDD to DataFrame to Dataset

RDD (Resilient Distributed Dataset)

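The original Spark abstraction. It is low-level and, in Scala/Java, type-safe. It is also opaque to the optimizer: Spark cannot look inside your lambdas, so there is no query optimization.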

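# RDD example - avoid in modern Spark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RDDExample").getOrCreate()
sc = spark.sparkContext  # RDD APIs hang off the SparkContext

rdd = sc.textFile("hdfs:///logs/access.log")
errors = (
    rdd
    .filter(lambda line: "ERROR" in line)
    .map(lambda line: line.split("\t"))
    .map(lambda parts: (parts[0], parts[3]))  # (timestamp, message), assuming tab-separated fields
)
error_count = errors.count()  # action: triggers the job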

DataFrame (Spark 1.3+, unified with Dataset in 2.0)

Schema-aware and optimized by the Catalyst query optimizer. The recommended API for most workloads.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("DataEngineering") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Read data (Parquet stores its schema in the files, so no inference pass is needed)
orders = spark.read.parquet("s3://data-lake/raw/orders/")

# DataFrame transformations (optimized by Catalyst)
daily_revenue = (
    orders
    .filter(F.col("status").isin("completed", "shipped"))
    .withColumn("order_date", F.to_date("created_at"))
    .groupBy("order_date", "product_category")
    .agg(
        F.sum("total_amount").alias("revenue"),
        F.countDistinct("order_id").alias("order_count"),
        F.avg("total_amount").alias("avg_order_value"),
    )
    .orderBy("order_date")
)

daily_revenue.show(5)

Dataset (Scala/Java only)

Combines the type safety of RDDs with the optimization of DataFrames. Not available in PySpark since Python is dynamically typed.

API Evolution:
RDD (Spark 1.0)      →  DataFrame (Spark 1.3+)  →  DataFrame + Catalyst + AQE (Spark 3.x)
  Low-level               Optimized                  Auto-optimized
  No optimizer            Catalyst optimizer         Adaptive Query Execution
  Manual tuning           Schema-aware               Runtime optimization

Transformations vs Actions

Understanding this distinction is critical for Spark performance.

Transformations (Lazy)

Transformations define a computation but do not execute it. Spark builds a DAG of transformations.

# None of these lines trigger computation
filtered = orders.filter(F.col("amount") > 100)                     # Narrow
projected = filtered.select("order_id", "customer_id", "amount")    # Narrow (keeps the grouping key)
grouped = projected.groupBy("customer_id").sum("amount")            # Wide (shuffle)

Narrow transformations: Each input partition feeds at most one output partition (filter, select, map), so no data is shuffled.

Wide transformations: Each output partition needs data from many input partitions (groupBy, join, repartition), so rows must be shuffled across the network.
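The narrow/wide distinction can be modelled in plain Python. This is a toy and not Spark's implementation. Partitions are plain lists, and `zlib.crc32` stands in for Spark's Murmur3-based hash partitioner. A narrow map touches each partition on its own. A key-based aggregation first has to route every row to the partition that owns its key. Real Spark also pre-aggregates on the map side before the shuffle.

```python
import zlib

def partition_of(key, num_partitions):
    """Deterministic stand-in for Spark's hash partitioner (Spark uses Murmur3)."""
    return zlib.crc32(str(key).encode()) % num_partitions

def narrow_map(partitions, fn):
    """Narrow: each output partition is computed from exactly one input partition."""
    return [[fn(row) for row in part] for part in partitions]

def shuffle_by_key(partitions, num_out):
    """Wide: any input partition may send rows to any output partition."""
    out = [[] for _ in range(num_out)]
    for part in partitions:
        for key, value in part:
            out[partition_of(key, num_out)].append((key, value))
    return out

def sum_by_key(partitions, num_out=2):
    """groupBy(key).sum(): shuffle so each key lives in one partition, then sum locally."""
    shuffled = shuffle_by_key(partitions, num_out)
    totals = {}
    for part in shuffled:
        for key, value in part:
            totals[key] = totals.get(key, 0) + value
    return shuffled, totals

# Two input partitions; key "a" appears in both, so summing it needs data movement
inputs = [[("a", 1), ("b", 2)], [("a", 3), ("c", 4)]]
doubled = narrow_map(inputs, lambda kv: (kv[0], kv[1] * 2))
shuffled, totals = sum_by_key(inputs)
print(doubled, shuffled, totals)
```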

Actions (Eager)

Actions trigger execution of the entire DAG.

# These trigger actual computation
grouped.count()                    # Returns a number
grouped.show(10)                   # Prints to console
grouped.write.parquet("output/")   # Writes to storage
grouped.collect()                  # Returns data to driver (careful with large data!)
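Laziness also means every action re-runs the whole plan from the source unless something is cached. A toy plain-Python model shows both points by counting how often the source is read. The `LazyFrame` class is hypothetical and not a Spark API.

```python
class LazyFrame:
    """Toy model: transformations record steps; only actions execute them."""

    def __init__(self, source, steps=()):
        self._source = source        # callable that produces the input rows
        self._steps = tuple(steps)   # recorded (kind, fn) pairs: the "plan"

    # Transformations: return a new LazyFrame with one more step, do no work
    def filter(self, predicate):
        return LazyFrame(self._source, self._steps + (("filter", predicate),))

    def map(self, fn):
        return LazyFrame(self._source, self._steps + (("map", fn),))

    def _execute(self):
        rows = self._source()        # every execution starts from the source again
        for kind, fn in self._steps:
            if kind == "filter":
                rows = [r for r in rows if fn(r)]
            else:
                rows = [fn(r) for r in rows]
        return rows

    # Actions: run the whole recorded plan
    def count(self):
        return len(self._execute())

    def collect(self):
        return self._execute()


reads = []

def read_source():
    reads.append("scan")             # log each full read of the source
    return list(range(10))

lf = LazyFrame(read_source).filter(lambda x: x % 2 == 0).map(lambda x: x * 10)
reads_before_action = len(reads)     # building the plan read nothing

n = lf.count()                       # action 1: reads the source
result = lf.collect()                # action 2: reads the source again (no cache)
print(reads_before_action, n, result, len(reads))
```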

Spark SQL

Spark SQL lets you query DataFrames using SQL syntax. It shares the same Catalyst optimizer.

# Register DataFrames as temp views (customers is assumed to have customer_id, name, segment)
orders.createOrReplaceTempView("orders")
customers.createOrReplaceTempView("customers")

# Query with SQL
result = spark.sql("""
    WITH customer_orders AS (
        SELECT
            c.customer_id,
            c.name,
            c.segment,
            COUNT(o.order_id) AS order_count,
            SUM(o.total_amount) AS lifetime_value,
            MIN(o.created_at) AS first_order,
            MAX(o.created_at) AS last_order
        FROM customers c
        LEFT JOIN orders o
            ON c.customer_id = o.customer_id
            AND o.status = 'completed'  -- in ON, not WHERE, so customers without completed orders are kept
        GROUP BY c.customer_id, c.name, c.segment
    )
    SELECT
        segment,
        COUNT(*) AS customer_count,
        AVG(lifetime_value) AS avg_ltv,
        AVG(order_count) AS avg_orders
    FROM customer_orders
    GROUP BY segment
    ORDER BY avg_ltv DESC
""")

result.show()

Joins and Shuffle Strategies

Joins are often the most expensive operation in Spark. Choosing the right strategy matters.

Join Types and Their Cost

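# Sort-Merge Join (default for large-large joins)
# Both sides are shuffled by join key, then sorted within each partition and merged
large_orders.join(large_customers, "customer_id", "inner")

# Broadcast Join (small table broadcast to all executors)
# The large side is not shuffled at all - usually much faster when one side is small
from pyspark.sql.functions import broadcast
large_orders.join(broadcast(small_dim_table), "product_id", "left")

# Shuffle Hash Join
# Both sides are shuffled, then each partition of the smaller side is built into a hash table (no sort)
# Good when one side is much smaller (but too large to broadcast)
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")  # lets the planner pick it when sizes allow
large_orders.join(large_customers.hint("shuffle_hash"), "customer_id")  # or request it (Spark 3.0+)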

Join Strategy Comparison

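Strategy       | When Used                                        | Shuffle    | Memory                             | Best For
Broadcast Hash | One side < 10MB (default threshold)              | None       | Small side must fit in memory      | Dimension lookups
Sort-Merge     | Both sides large                                 | Both sides | Moderate (sorts can spill to disk) | Large-large joins
Shuffle Hash   | One side much smaller but too large to broadcast | Both sides | One side (per partition) in memory | Avoiding the sort when one side is small
Bucket Join    | Both tables pre-bucketed on the join key         | None       | Low                                | Repeated joins on the same key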
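The core algorithms behind the first two strategies can be sketched in plain Python on lists of dicts. This illustrates the techniques under simplifying assumptions: in-memory lists and inner joins only. It is not Spark's implementation, which works per partition across executors and adds spilling and code generation. Both helper functions are hypothetical.

```python
from collections import defaultdict

def broadcast_hash_join(large_partitions, small_rows, key):
    """Inner join: the small side becomes one hash table that every large-side partition probes."""
    table = defaultdict(list)
    for row in small_rows:
        table[row[key]].append(row)
    out = []
    for partition in large_partitions:   # each partition joins locally: the large side never moves
        for row in partition:
            for match in table.get(row[key], []):
                out.append({**row, **match})
    return out

def sort_merge_join(left, right, key):
    """Inner join: sort both sides on the key, then walk them in step."""
    left = sorted(left, key=lambda r: r[key])
    right = sorted(right, key=lambda r: r[key])
    i = j = 0
    out = []
    while i < len(left) and j < len(right):
        lk, rk = left[i][key], right[j][key]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of equal keys on the right and pair it with each equal-key left row
            j_end = j
            while j_end < len(right) and right[j_end][key] == lk:
                j_end += 1
            while i < len(left) and left[i][key] == lk:
                for match in right[j:j_end]:
                    out.append({**left[i], **match})
                i += 1
            j = j_end
    return out

orders = [
    {"order_id": 1, "product_id": "a"},
    {"order_id": 2, "product_id": "b"},
    {"order_id": 3, "product_id": "a"},
    {"order_id": 4, "product_id": "z"},   # no matching product: dropped by an inner join
]
products = [{"product_id": "a", "name": "Apple"}, {"product_id": "b", "name": "Bread"}]

bhj = broadcast_hash_join([orders[:2], orders[2:]], products, "product_id")
smj = sort_merge_join(orders, products, "product_id")
print(bhj)
print(smj)
```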

Handling Skewed Joins

# Problem: some join keys have millions of rows (skew), so a few tasks do most of the work
# Solution 1: Salt the join key
from pyspark.sql import functions as F

salt_factor = 10

# Salt the large side: each row gets a random salt in [0, salt_factor)
skewed_df = large_table.withColumn(
    "salt", (F.rand() * salt_factor).cast("int")
)

# Replicate the small side once per salt value so every (join_key, salt) pair has a match
replicated = small_table.crossJoin(
    spark.range(salt_factor).select(F.col("id").cast("int").alias("salt"))
)

# Join on (join_key, salt) - rows of a hot key are spread over salt_factor partitions
result = skewed_df.join(replicated, ["join_key", "salt"]).drop("salt")
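A plain-Python model shows why salting helps and why the small side must be replicated. It assumes 8 shuffle partitions, a salt factor of 8 and hypothetical keys, and uses `zlib.crc32` in place of Spark's Murmur3 hash partitioner:

```python
import random
import zlib
from collections import Counter

def partition_of(key, num_partitions):
    """Deterministic stand-in for Spark's hash partitioner (Spark uses Murmur3)."""
    return zlib.crc32(repr(key).encode()) % num_partitions

def partition_loads(keys, num_partitions):
    """Rows per shuffle partition when rows are hash-partitioned on the given keys."""
    counts = Counter(partition_of(k, num_partitions) for k in keys)
    return [counts.get(p, 0) for p in range(num_partitions)]

rng = random.Random(42)
num_partitions, salt_factor = 8, 8

# Large side: one hot key with 1000 rows plus 100 rows over 10 other keys
large_keys = ["hot"] * 1000 + [f"k{i % 10}" for i in range(100)]
small_keys = ["hot"] + [f"k{i}" for i in range(10)]

# Unsalted: every "hot" row hashes to the same partition
unsalted = partition_loads(large_keys, num_partitions)

# Salted: each large-side row gets a random salt; the composite (key, salt) is hashed
salted_rows = [(k, rng.randrange(salt_factor)) for k in large_keys]
salted = partition_loads(salted_rows, num_partitions)

# The small side is replicated once per salt value, so every salted row still finds its match
replicated = {(k, s) for k in small_keys for s in range(salt_factor)}
joined = [row for row in salted_rows if row in replicated]

print("max partition load, unsalted:", max(unsalted))
print("max partition load, salted:  ", max(salted))
print("joined rows:", len(joined))
```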

Performance Tuning

Partitioning

# Repartition: full shuffle, creates exactly N partitions
# With a column, rows are hash-partitioned by that column, so sizes are only even when it has
# many well-spread values (e.g. 30 distinct dates fill at most 30 of 200 partitions)
df = df.repartition(200, "date")

# Coalesce: merges existing partitions WITHOUT a full shuffle
# Use only to reduce partitions (asking for more than the current count is a no-op)
df = df.coalesce(50)

# Good rule of thumb: target ~128MB per partition
# For 100GB of data: 100GB / 128MB ≈ 800 partitions
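The rule of thumb turns into a small helper, `target_partitions`, whose name is hypothetical. It can also round up to a multiple of total executor cores so the final wave of tasks is not mostly idle. That rounding is a common heuristic added here as an assumption; Spark does not apply it automatically.

```python
import math

MB = 1024 * 1024
GB = 1024 * MB

def target_partitions(total_bytes, target_bytes=128 * MB, total_cores=None):
    """Partitions needed for ~target_bytes each, optionally rounded up to whole waves of tasks."""
    n = max(1, math.ceil(total_bytes / target_bytes))
    if total_cores:
        # Round up to a multiple of the cluster's task slots (heuristic, not Spark behaviour)
        n = math.ceil(n / total_cores) * total_cores
    return n

print(target_partitions(100 * GB))                   # 800, the 100GB example above
print(target_partitions(100 * GB, total_cores=48))  # 816, e.g. 12 executors x 4 cores
```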

Caching

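# Cache when a DataFrame is reused by more than one action
from pyspark import StorageLevel

# .cache() = persist() at the default level (MEMORY_AND_DISK_DESER in PySpark 3.x)
frequently_used_df = expensive_computation.cache()

# Or pick the storage level explicitly, calling persist() instead of cache()
# (PySpark has no *_SER levels; its MEMORY_AND_DISK already stores data serialized)
# frequently_used_df = expensive_computation.persist(StorageLevel.MEMORY_AND_DISK)

# ALWAYS unpersist when done
frequently_used_df.unpersist()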

Broadcast Joins

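# Raise the broadcast threshold for larger dimension tables (default 10MB; -1 disables)
# The broadcast side must fit in driver and executor memory
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)  # 100MB

# Force a broadcast with a hint, regardless of the threshold
from pyspark.sql.functions import broadcast
result = fact_table.join(broadcast(dim_table), "key")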

Adaptive Query Execution (AQE)

AQE re-optimizes query plans at runtime, between stages, using statistics from the shuffle stages that have already completed.

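# Enable AQE (default in Spark 3.2+)
spark.conf.set("spark.sql.adaptive.enabled", "true")

# AQE features:
# 1. Coalescing shuffle partitions (merges many small partitions into fewer, larger ones)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# 2. Switching join strategies at runtime (e.g. sort-merge to broadcast when one side turns out small).
#    The switch itself is automatic; the local shuffle reader then avoids re-reading shuffle data over the network
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")

# 3. Optimizing skewed joins automatically: a partition is split when it is larger than both
#    skewedPartitionFactor (default 5) x the median partition size and this threshold (256MB is the default)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")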
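The two AQE rules above can be approximated in plain Python. This is a simplified model, not Spark's code. The real coalescing also honours `spark.sql.adaptive.advisoryPartitionSizeInBytes` (64MB by default) and a minimum partition size. Real AQE also splits skewed partitions, where this model only reports them. The 64MB target, 5x factor and 256MB threshold below mirror those defaults; the helper names are hypothetical.

```python
from statistics import median

MB = 1024 * 1024

def coalesce_partitions(sizes, target_bytes=64 * MB):
    """Greedily merge adjacent shuffle partitions until a group would exceed target_bytes."""
    groups, current, current_bytes = [], [], 0
    for idx, size in enumerate(sizes):
        if current and current_bytes + size > target_bytes:
            groups.append(current)
            current, current_bytes = [], 0
        current.append(idx)
        current_bytes += size
    if current:
        groups.append(current)
    return groups

def skewed_partitions(sizes, factor=5.0, threshold_bytes=256 * MB):
    """Indices of partitions larger than both factor x median size and threshold_bytes."""
    med = median(sizes)
    return [i for i, s in enumerate(sizes) if s > factor * med and s > threshold_bytes]

groups = coalesce_partitions([10 * MB] * 10)
skewed = skewed_partitions([50 * MB] * 7 + [1000 * MB])
print(groups)   # [[0, 1, 2, 3, 4, 5], [6, 7, 8, 9]]
print(skewed)   # [7]
```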

Memory Configuration

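# Executor memory layout (unified memory manager)
# spark.executor.memory = 8g (JVM heap)
#   ├── Reserved: 300MB
#   ├── Unified region: spark.memory.fraction = 0.6 of (heap - 300MB)
#   │     Execution (joins, sorts, aggregations) and storage (cached data) borrow from
#   │     each other; spark.memory.storageFraction = 0.5 of it is safe from eviction
#   └── User memory: the remaining 40% (your data structures, UDF objects)
# spark.executor.memoryOverhead = 2g (off-heap: JVM overhead, native libs, Python workers)

# Executor sizing is read at startup: set it in the builder (or spark-submit),
# not with spark.conf.set() on a running session
spark = (
    SparkSession.builder
    .config("spark.executor.memory", "8g")
    .config("spark.executor.memoryOverhead", "2g")
    .config("spark.executor.cores", "4")
    .getOrCreate()  # these only take effect if no session is running yet
)

# SQL settings such as shuffle partitions can be changed at runtime
spark.conf.set("spark.sql.shuffle.partitions", "200")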
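The layout is easy to work through for the 8g example. Below is a plain-Python sketch of the unified memory manager's arithmetic. It assumes the defaults shown (300MB reserved, `spark.memory.fraction` 0.6, `spark.memory.storageFraction` 0.5). When no overhead is given, it assumes the 10% overhead factor (minimum 384MB) used for JVM workloads. `executor_memory_layout` is a hypothetical helper.

```python
RESERVED_MB = 300  # fixed system reservation in the unified memory manager

def executor_memory_layout(heap_mb, overhead_mb=None,
                           memory_fraction=0.6, storage_fraction=0.5):
    """Approximate per-executor memory regions, in MB, under the stated defaults."""
    if overhead_mb is None:
        overhead_mb = max(384, int(0.10 * heap_mb))  # assumed default overhead for JVM workloads
    usable = heap_mb - RESERVED_MB
    unified = usable * memory_fraction              # shared by execution and storage
    return {
        "unified_mb": unified,
        "storage_protected_mb": unified * storage_fraction,  # cached blocks safe from eviction
        "user_mb": usable - unified,                 # user data structures, UDF objects
        # What the cluster manager must grant (ignores optional off-heap / pyspark.memory)
        "container_mb": heap_mb + overhead_mb,
    }

layout = executor_memory_layout(8 * 1024, overhead_mb=2 * 1024)
for name, mb in layout.items():
    print(f"{name:22s} {mb:8.1f}")
```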

PySpark Patterns

Reading and Writing Data

# Read from multiple formats
parquet_df = spark.read.parquet("s3://bucket/data/")
# inferSchema costs an extra pass over the file; pass an explicit schema in production
csv_df = spark.read.option("header", True).option("inferSchema", True).csv("data.csv")
json_df = spark.read.json("events/")
delta_df = spark.read.format("delta").load("s3://bucket/delta-table/")  # needs the delta-spark package

# Write with partitioning
(
    daily_revenue
    .write
    .mode("overwrite")
    .partitionBy("order_date")
    .parquet("s3://bucket/output/daily_revenue/")
)

# Write to Delta Lake, allowing new columns to be added to the table schema
# (DataFrameWriter has no "merge" mode; upserts use DeltaTable.merge() or SQL MERGE INTO)
(
    daily_revenue
    .write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .save("s3://bucket/delta/daily_revenue/")
)

Window Functions

from pyspark.sql.window import Window

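# Running total and order sequence number within each customer
window_spec = Window.partitionBy("customer_id").orderBy("order_date")

# withColumns (one dict of new columns) requires Spark 3.3+; chain withColumn() on older versions
orders_with_metrics = orders.withColumns({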
    "running_total": F.sum("amount").over(window_spec),
    "order_number": F.row_number().over(window_spec),
    "prev_order_amount": F.lag("amount", 1).over(window_spec),
    "days_since_prev": F.datediff(
        F.col("order_date"),
        F.lag("order_date", 1).over(window_spec)
    ),
})

UDFs (Use Sparingly)

# Pandas UDFs (vectorized) - much faster than row-at-a-time UDFs
from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf("double")
def calculate_discount(amounts: pd.Series, quantities: pd.Series) -> pd.Series:
    """Vectorized discount calculation."""
    base_discount = 0.05
    volume_discount = (quantities > 10).astype(float) * 0.1
    return amounts * (base_discount + volume_discount)

# Apply the Pandas UDF
orders = orders.withColumn(
    "discount",
    calculate_discount(F.col("amount"), F.col("quantity"))
)

Performance Checklist

Before deploying a Spark job to production, verify:

production_checklist = {
    "partitioning":    "Target ~128MB per partition, use partitionBy for output",
    "broadcast":       "Broadcast tables under ~100MB in joins (the default threshold is 10MB)",
    "aqe":             "Keep Adaptive Query Execution enabled (default since Spark 3.2)",
    "caching":         "Cache only reused DataFrames, unpersist after",
    "serialization":   "Use the Kryo serializer for RDD-heavy jobs (DataFrames use Tungsten's binary format)",
    "shuffle":         "Minimize wide transformations, watch shuffle spill",
    "skew":            "Handle skewed keys with salting or AQE",
    "file_format":     "Use Parquet or Delta (columnar, compressed)",
    "predicate_push":  "Filter early, let Spark push predicates to source",
    "avoid_collect":   "Never collect() large datasets to the driver",
}

Spark rewards those who understand its execution model. Profile your jobs with the Spark UI, watch for shuffle spill, and iterate on partitioning strategies. The difference between a naive and optimized Spark job can be 10-100x in cost and runtime.
