Steven's Knowledge

Lakehouse Architecture

Data lake vs warehouse vs lakehouse, table formats (Delta Lake, Iceberg, Hudi), medallion architecture, schema evolution, time travel, and ACID on object storage.


The lakehouse combines the low-cost, open storage of a data lake with the transactions, governance, and query performance of a data warehouse on a single platform.

Data Lake vs Warehouse vs Lakehouse

Data Lake

A data lake stores raw data in its native format on cheap object storage (S3, GCS, ADLS). It is schema-on-read, meaning structure is applied when you query, not when you store.

Pros: Cheap storage, any format, any data type
Cons: No ACID, no schema enforcement, "data swamp" risk
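As a minimal sketch of schema-on-read, the snippet below stores records untouched and only applies types when they are read. The `raw_lines` list is a hypothetical stand-in for files in object storage, and the field names are illustrative only.

```python
import json

# Raw records land as-is; nothing validates them on write (schema-on-read).
raw_lines = [
    '{"order_id": "1", "total": "19.99"}',
    '{"order_id": "2", "total": "5.00", "coupon": "SPRING"}',
    '{"order_id": "oops"}',  # does not fit the schema, but is still stored
]

def read_with_schema(lines):
    """Apply a schema at read time; rows that do not fit are skipped."""
    rows = []
    for line in lines:
        record = json.loads(line)
        try:
            rows.append({
                "order_id": int(record["order_id"]),
                "total": float(record["total"]),
            })
        except (KeyError, ValueError):
            continue  # the reader, not the writer, pays for bad data
    return rows

orders = read_with_schema(raw_lines)
```

Every reader has to repeat this validation, which is one way a lake drifts toward a "data swamp".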

Data Warehouse

A data warehouse stores structured, curated data optimized for analytical queries. It is schema-on-write with strong governance.

Pros: ACID transactions, fast queries, governance
Cons: Expensive storage, only structured data, vendor lock-in

Lakehouse

A lakehouse adds a transactional metadata layer on top of object storage, giving you warehouse-like features at data lake cost.

Pros: ACID on object storage, open formats, unified batch + streaming
Cons: Newer ecosystem, requires careful architecture

Comparison

| Feature | Data Lake | Data Warehouse | Lakehouse |
| --- | --- | --- | --- |
| Storage cost | Low (object storage) | High (proprietary) | Low (object storage) |
| ACID transactions | No | Yes | Yes |
| Schema enforcement | No (schema-on-read) | Yes (schema-on-write) | Yes (schema-on-write) |
| Data formats | Any (Parquet, JSON, CSV) | Proprietary | Open (Parquet-based) |
| Query performance | Variable | High | High (with optimization) |
| Streaming support | Yes | Limited | Yes |
| ML/DS workloads | Yes (direct access) | Limited (export needed) | Yes (direct access) |
| Governance | Weak | Strong | Strong |
| Vendor lock-in | Low | High | Low |

Table Formats

Table formats are the metadata layer that makes a lakehouse possible. They track which files belong to a table, manage transactions, and enable features like time travel and schema evolution.
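To make the idea of a metadata layer concrete, here is a toy commit log in plain Python. It is a simplified sketch, not any format's real on-disk layout: the `Commit` class and file names are hypothetical. Replaying the log up to a version yields the set of data files that make up the table at that version.

```python
from dataclasses import dataclass, field

@dataclass
class Commit:
    version: int
    added: list = field(default_factory=list)    # data files added
    removed: list = field(default_factory=list)  # data files logically deleted

def files_at(log, version):
    """Replay commits up to `version` to get the table's live files."""
    live = set()
    for commit in log:  # log is assumed to be ordered by version
        if commit.version > version:
            break
        live -= set(commit.removed)
        live |= set(commit.added)
    return sorted(live)

log = [
    Commit(0, added=["part-000.parquet", "part-001.parquet"]),
    Commit(1, added=["part-002.parquet"]),
    # A compaction rewrites two files into one; the old files stay in storage.
    Commit(2, added=["part-003.parquet"],
           removed=["part-000.parquet", "part-001.parquet"]),
]

latest = files_at(log, 2)
as_of_v1 = files_at(log, 1)
```

Because removed files are only dropped from the metadata, an older version such as `as_of_v1` can still be read until those files are physically cleaned up.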

Delta Lake

Created by Databricks, Delta Lake uses a transaction log (_delta_log/) to track table changes.

from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.1.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Write a Delta table
(
    orders_df
    .write
    .format("delta")
    .mode("overwrite")
    .partitionBy("order_date")
    .save("s3://lakehouse/tables/orders/")
)

# Upsert (merge) into Delta table
delta_table = DeltaTable.forPath(spark, "s3://lakehouse/tables/orders/")

delta_table.alias("target").merge(
    new_orders.alias("source"),
    "target.order_id = source.order_id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

Apache Iceberg

Iceberg was created at Netflix and is now an Apache top-level project. It uses a tree of metadata files for snapshot isolation.

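# Configure Spark for Iceberg
# The SQL extensions enable Iceberg stored procedures (CALL ...) and extra DDL.
# Note: the "hadoop" catalog relies on atomic rename, so it suits demos;
# on S3, production setups typically use a catalog service (Hive, Glue, REST, Nessie).
spark = SparkSession.builder \
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0") \
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.lakehouse", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.lakehouse.type", "hadoop") \
    .config("spark.sql.catalog.lakehouse.warehouse", "s3://lakehouse/iceberg/") \
    .getOrCreate()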

# Create an Iceberg table
spark.sql("""
    CREATE TABLE lakehouse.analytics.orders (
        order_id    BIGINT,
        customer_id BIGINT,
        total       DECIMAL(12, 2),
        status      STRING,
        order_date  DATE
    )
    USING iceberg
    PARTITIONED BY (days(order_date))
""")

# Insert data
spark.sql("""
    INSERT INTO lakehouse.analytics.orders
    SELECT * FROM staging.raw_orders
""")

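The `days(order_date)` partition transform used in the table definition above maps each value to a whole number of days since 1970-01-01. The sketch below imitates that mapping and a simple pruning step in plain Python. The `prune` helper and the sample dates are hypothetical and only illustrate why queries can filter on `order_date` directly without a separate partition column.

```python
from datetime import date, datetime

EPOCH = date(1970, 1, 1)

def days_transform(value):
    """Map a date or datetime to whole days since 1970-01-01."""
    if isinstance(value, datetime):
        value = value.date()
    return (value - EPOCH).days

def prune(partitions, wanted_date):
    """Keep only the partition values a filter on order_date could match."""
    target = days_transform(wanted_date)
    return [p for p in partitions if p == target]

# Partition values recorded for three hypothetical data files.
file_partitions = [days_transform(date(2025, 1, d)) for d in (14, 15, 16)]
matching = prune(file_partitions, date(2025, 1, 15))
```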
Apache Hudi

Hudi (Hadoop Upserts Deletes and Incrementals) was created at Uber and excels at incremental processing and upserts.

# Write to Hudi table
(
    orders_df.write
    .format("hudi")
    .option("hoodie.table.name", "orders")
    .option("hoodie.datasource.write.recordkey.field", "order_id")
    .option("hoodie.datasource.write.precombine.field", "updated_at")
    .option("hoodie.datasource.write.operation", "upsert")
    .option("hoodie.datasource.write.partitionpath.field", "order_date")
    .mode("append")
    .save("s3://lakehouse/hudi/orders/")
)
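The upsert configured above is driven by a record key and a precombine field. As a rough sketch of that idea in plain Python (a simplification: Hudi's actual merge behavior depends on the table type and the configured merge settings), the `upsert` function below keeps, for each key, the row with the largest precombine value. The function and sample rows are hypothetical.

```python
def upsert(table, incoming, key="order_id", precombine="updated_at"):
    """Merge `incoming` rows into `table` (a dict keyed by record key).

    When several versions of a key compete, the row with the largest
    precombine value wins; ties go to the incoming row.
    """
    for row in incoming:
        existing = table.get(row[key])
        if existing is None or row[precombine] >= existing[precombine]:
            table[row[key]] = row
    return table

table = {1: {"order_id": 1, "status": "placed", "updated_at": 10}}
batch = [
    {"order_id": 1, "status": "shipped", "updated_at": 20},
    {"order_id": 1, "status": "packed",  "updated_at": 15},  # arrives late, older
    {"order_id": 2, "status": "placed",  "updated_at": 12},
]
upsert(table, batch)
```

The late "packed" row does not overwrite "shipped" because its `updated_at` is smaller, which is the purpose of choosing a monotonically increasing precombine field.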

Table Format Comparison

| Feature | Delta Lake | Apache Iceberg | Apache Hudi |
| --- | --- | --- | --- |
| Origin | Databricks | Netflix / Apache | Uber / Apache |
| Transaction log | JSON-based _delta_log | Snapshot metadata tree | Timeline-based |
| ACID transactions | Yes | Yes | Yes |
| Time travel | Yes | Yes | Yes |
| Schema evolution | Yes | Yes (full) | Yes |
| Partition evolution | Limited (overwrite) | Yes (hidden partitioning) | Limited |
| Engine support | Spark, Flink, Trino | Spark, Flink, Trino, Dremio | Spark, Flink, Presto |
| Streaming | Structured Streaming | Flink integration | Built-in incremental |
| Best for | Databricks users | Multi-engine environments | Upsert-heavy workloads |
| Community | Large (Databricks-led) | Rapidly growing | Moderate |

Medallion Architecture

The medallion (multi-hop) architecture organizes data into layers of increasing quality and structure.

┌──────────┐      ┌──────────┐      ┌──────────┐
│  Bronze  │  →   │  Silver  │  →   │   Gold   │
│  (Raw)   │      │ (Cleaned)│      │ (Curated)│
└──────────┘      └──────────┘      └──────────┘
  Raw ingestion     Validated,         Business-level
  Append-only       deduplicated,      aggregations,
  Any format        standardized       ready for BI

Bronze Layer (Raw)

# Bronze: ingest raw data as-is, append-only
raw_events = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "user-events")
    .load()
)

# Write to Bronze Delta table
(
    raw_events
    .selectExpr(
        "CAST(key AS STRING) AS event_key",
        "CAST(value AS STRING) AS event_payload",
        "topic",
        "partition",
        "offset",
        "timestamp AS kafka_timestamp",
        "current_timestamp() AS ingested_at"
    )
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "s3://lakehouse/checkpoints/bronze_events/")
    .toTable("bronze.user_events")
)

Silver Layer (Cleaned)

# Silver: clean, validate, and standardize
from pyspark.sql import functions as F

# event_schema is assumed to be a StructType describing the JSON payload
bronze_events = spark.readStream.table("bronze.user_events")

silver_events = (
    bronze_events
    .withColumn("event", F.from_json("event_payload", event_schema))
    .select(
        F.col("event.event_id").alias("event_id"),
        F.col("event.user_id").alias("user_id"),
        F.col("event.event_type").alias("event_type"),
        F.col("event.properties").alias("properties"),
        F.to_timestamp("event.timestamp").alias("event_timestamp"),
        F.col("ingested_at"),
    )
    # Filter invalid records first so they never enter the dedup state
    .filter(F.col("user_id").isNotNull())
    .filter(F.col("event_type").isNotNull())
    # Deduplicate within the watermark (Spark 3.5+). Plain dropDuplicates(["event_id"])
    # never expires state here, because the key omits the event-time column.
    .withWatermark("event_timestamp", "1 hour")
    .dropDuplicatesWithinWatermark(["event_id"])
)

(
    silver_events
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "s3://lakehouse/checkpoints/silver_events/")
    .toTable("silver.user_events")
)
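The Silver step relies on watermark-bounded deduplication. The following is a toy, single-process sketch of that mechanism, assuming integer event times and a per-event watermark update (Spark advances the watermark per micro-batch, so real timing differs). The function name and sample events are hypothetical.

```python
def dedupe_with_watermark(events, delay):
    """Yield first-seen events, forgetting ids older than the watermark.

    `events` is an iterable of (event_id, event_time) in arrival order.
    The watermark trails the maximum event time seen so far by `delay`.
    """
    seen = {}        # event_id -> event_time of first occurrence
    max_time = None
    for event_id, event_time in events:
        max_time = event_time if max_time is None else max(max_time, event_time)
        watermark = max_time - delay
        if event_time < watermark:
            continue  # too late: treated as late data and dropped
        # Evict state that can no longer match an on-time duplicate.
        for old_id in [i for i, t in seen.items() if t < watermark]:
            del seen[old_id]
        if event_id in seen:
            continue  # duplicate inside the watermark window
        seen[event_id] = event_time
        yield event_id, event_time

arrivals = [("a", 100), ("b", 110), ("a", 105), ("c", 200), ("a", 120)]
unique = list(dedupe_with_watermark(arrivals, delay=60))
```

The eviction step is what keeps state bounded: once the watermark passes an event's time, its id is forgotten, and any later copy is rejected as late data rather than matched against state.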

Gold Layer (Curated)

-- Gold: business-level aggregations for BI
-- This could be a dbt model or a Spark job

CREATE OR REPLACE TABLE gold.daily_user_engagement AS
SELECT
    CAST(event_timestamp AS DATE) AS activity_date,
    COUNT(DISTINCT user_id) AS dau,
    COUNT(*) AS total_events,
    COUNT(DISTINCT CASE WHEN event_type = 'purchase' THEN user_id END)
        AS purchasers,
    -- Assumes properties is stored as a JSON string
    SUM(CASE WHEN event_type = 'purchase'
        THEN CAST(get_json_object(properties, '$.amount') AS DECIMAL(12,2))
        ELSE 0 END) AS revenue,
    COUNT(*) / COUNT(DISTINCT user_id) AS avg_events_per_user
FROM silver.user_events
WHERE event_timestamp >= current_date() - INTERVAL 90 DAYS
GROUP BY 1;
-- Sort at query time; row order is not guaranteed to be preserved in the stored table

Schema Evolution

Table formats handle schema changes gracefully, so you do not need to rebuild tables when sources add new fields.
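As a rough sketch of what "merging" a schema means, the plain-Python helpers below extend a table schema with new columns and read old rows against the new schema. This is a toy model with hypothetical names, not how any table format stores schemas internally.

```python
def merge_schema(table_schema, incoming_schema):
    """Return the table schema extended with any new incoming columns.

    Schemas are dicts of column name -> type name. An existing column
    whose type differs is rejected rather than silently changed.
    """
    merged = dict(table_schema)
    for name, dtype in incoming_schema.items():
        if name in merged and merged[name] != dtype:
            raise ValueError(
                f"type conflict on {name!r}: {merged[name]} vs {dtype}")
        merged.setdefault(name, dtype)
    return merged

def read_row(row, schema):
    """Project a stored row onto the current schema; missing columns read as None."""
    return {name: row.get(name) for name in schema}

schema_v1 = {"order_id": "bigint", "total": "decimal(12,2)"}
schema_v2 = merge_schema(schema_v1, {"order_id": "bigint", "coupon": "string"})
old_row = read_row({"order_id": 1, "total": "19.99"}, schema_v2)
```

Old data files are never rewritten: rows written before the change simply surface the new column as null.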

Adding Columns

# Delta Lake: merge schema on write
(
    new_data_with_extra_columns
    .write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .save("s3://lakehouse/tables/orders/")
)

# Iceberg: explicit schema evolution
spark.sql("""
    ALTER TABLE lakehouse.analytics.orders
    ADD COLUMNS (
        shipping_address STRING,
        delivery_date    DATE
    )
""")

# Iceberg: rename columns (a metadata-only change, because Iceberg tracks columns by ID, not by name)
spark.sql("""
    ALTER TABLE lakehouse.analytics.orders
    RENAME COLUMN total TO total_amount
""")

Safe Type Changes

# Iceberg supports safe type promotion
spark.sql("""
    ALTER TABLE lakehouse.analytics.orders
    ALTER COLUMN total_amount TYPE DECIMAL(14, 2)  -- widen precision
""")

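Which type changes count as "safe" can be expressed as a small rule check. The sketch below encodes the widening promotions commonly documented for Iceberg (int to long, float to double, and a decimal gaining precision at the same scale). Treat the rule list as an assumption to verify against the spec for your table version. The type names are plain strings chosen for illustration.

```python
import re

DECIMAL = re.compile(r"decimal\((\d+),\s*(\d+)\)")

def is_safe_promotion(old, new):
    """Return True if changing a column from `old` to `new` only widens it."""
    if (old, new) in {("int", "long"), ("float", "double")}:
        return True
    old_dec, new_dec = DECIMAL.fullmatch(old), DECIMAL.fullmatch(new)
    if old_dec and new_dec:
        old_p, old_s = map(int, old_dec.groups())
        new_p, new_s = map(int, new_dec.groups())
        # Precision may grow, but the scale must stay the same.
        return new_s == old_s and new_p > old_p
    return False

widen_total = is_safe_promotion("decimal(12,2)", "decimal(14, 2)")
change_scale = is_safe_promotion("decimal(12,2)", "decimal(14,3)")
```

Narrowing changes, such as long to int, are rejected because existing values might no longer fit.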
Time Travel

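Table formats maintain a history of changes, allowing you to query data as it existed at any earlier version or timestamp that is still retained. Once snapshots are expired or old files are vacuumed, those versions can no longer be read.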
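Conceptually, a timestamp-based query resolves to the latest commit at or before the requested time. A minimal sketch of that lookup, using made-up integer timestamps and a hypothetical `history` list:

```python
from bisect import bisect_right

# (commit_timestamp, version) pairs in commit order; values are made up.
history = [(100, 0), (250, 1), (400, 2)]

def version_as_of(history, timestamp):
    """Return the latest version committed at or before `timestamp`."""
    commit_times = [ts for ts, _ in history]
    index = bisect_right(commit_times, timestamp)
    if index == 0:
        raise ValueError("timestamp is earlier than the first retained commit")
    return history[index - 1][1]

resolved = version_as_of(history, 399)
```

A timestamp earlier than the oldest retained commit has no answer, which is why retention settings bound how far back time travel reaches.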

Delta Lake Time Travel

-- Query data as of a specific version
SELECT * FROM orders VERSION AS OF 5;

-- Query data as of a specific timestamp
SELECT * FROM orders TIMESTAMP AS OF '2025-01-15 10:00:00';

-- View table history
DESCRIBE HISTORY orders;

-- Restore to a previous version
RESTORE TABLE orders TO VERSION AS OF 5;

Iceberg Time Travel

-- Query a specific snapshot
SELECT * FROM lakehouse.analytics.orders
FOR SYSTEM_TIME AS OF TIMESTAMP '2025-01-15 10:00:00';

-- Query by snapshot ID
SELECT * FROM lakehouse.analytics.orders
FOR SYSTEM_VERSION AS OF 7654321;

-- List snapshots
SELECT * FROM lakehouse.analytics.orders.snapshots;

-- Rollback to a snapshot
CALL lakehouse.system.rollback_to_snapshot('analytics.orders', 7654321);

Time Travel Use Cases

# 1. Audit: what did the data look like when the report was generated?
report_date_data = spark.read.format("delta") \
    .option("timestampAsOf", "2025-01-15") \
    .load("s3://lakehouse/tables/orders/")

# 2. Debugging: compare current vs previous state
current = spark.read.format("delta").load("s3://lakehouse/tables/orders/")
previous = spark.read.format("delta") \
    .option("versionAsOf", 10) \
    .load("s3://lakehouse/tables/orders/")

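# Rows that are new or modified since version 10
# (subtract is one-directional: use previous.subtract(current) to find removed rows)
changes = current.subtract(previous)
print(f"New or modified rows: {changes.count()}")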

# 3. Reproducible ML: train on exact same data
training_data = spark.read.format("delta") \
    .option("versionAsOf", 42) \
    .load("s3://lakehouse/tables/features/")

ACID on Object Storage

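Object stores provide atomic operations only on individual objects, with no multi-file transactions. Table formats achieve ACID transactions on top of them through optimistic concurrency control: writers prepare their files independently and then compete on a single atomic metadata commit.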

How It Works

Write Transaction:
1. Write new data files to object storage (Parquet files)
2. Create a new metadata file referencing the new + existing files
3. Atomically update the pointer to the latest metadata
   - Delta: add a new JSON commit to _delta_log/
   - Iceberg: atomically swap the metadata pointer
   - Hudi: update the timeline

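Conflict Resolution:
- If two writers try to commit simultaneously:
  - One succeeds (first to commit wins)
  - The other detects the conflict, re-reads the latest metadata, and retries;
    if the winning commit touched the same files, the write fails instead
  - File-level conflict detection (not row-level)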
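The commit protocol above can be sketched as a toy, single-process model. The `Table` class below is hypothetical. It stands in for a log with an atomic put-if-absent per version and checks file-level overlap against every commit that landed after the writer took its snapshot.

```python
class CommitConflict(Exception):
    pass

class Table:
    """Toy table whose log supports an atomic put-if-absent per version."""

    def __init__(self):
        self.log = {}  # version -> set of files the commit touched

    def latest_version(self):
        return max(self.log, default=-1)

    def try_commit(self, read_version, touched_files):
        # Check every commit that landed after this writer took its snapshot.
        for version in range(read_version + 1, self.latest_version() + 1):
            if self.log[version] & touched_files:
                raise CommitConflict(f"files changed in version {version}")
        new_version = self.latest_version() + 1
        self.log[new_version] = set(touched_files)  # the atomic step
        return new_version

table = Table()
snapshot = table.latest_version()         # both writers start from here
v_a = table.try_commit(snapshot, {"f1"})  # writer A wins the race
v_b = table.try_commit(snapshot, {"f2"})  # disjoint files: commits on top of A
try:
    table.try_commit(snapshot, {"f1"})    # overlaps with A: must re-read and retry
    conflicted = False
except CommitConflict:
    conflicted = True
```

Writers that touch disjoint files both succeed, while an overlapping writer has to start again from the latest snapshot. In a real system the atomic step is provided by the storage layer or catalog, not by in-process code.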

ACID Properties

| Property | How It Is Achieved |
| --- | --- |
| Atomicity | Commit is a single metadata update; partial writes are invisible |
| Consistency | Schema enforcement prevents invalid data |
| Isolation | Snapshot isolation via versioned metadata |
| Durability | Data and metadata files live on object storage (for example, Amazon S3 is designed for 11 nines of durability) |

Table Maintenance

-- Delta Lake: optimize small files (compaction)
OPTIMIZE orders;

-- Delta Lake: Z-order for multi-dimensional clustering
OPTIMIZE orders ZORDER BY (customer_id, order_date);

-- Iceberg: compact small files
CALL lakehouse.system.rewrite_data_files(
    table => 'analytics.orders',
    strategy => 'binpack',
    options => map('target-file-size-bytes', '134217728')  -- 128MB
);

-- Clean up old snapshots and unreferenced files (frees storage, but limits how far back time travel can go)
-- Delta Lake
VACUUM orders RETAIN 168 HOURS;  -- Keep 7 days of history

-- Iceberg
CALL lakehouse.system.expire_snapshots(
    table => 'analytics.orders',
    older_than => TIMESTAMP '2025-01-08 00:00:00',
    retain_last => 10
);
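To illustrate what a bin-pack compaction strategy does, here is a simplified planner in plain Python. It uses first-fit decreasing, which is an assumption made for illustration and not necessarily the algorithm any engine uses. The file names and sizes are made up.

```python
def plan_binpack(file_sizes, target_size):
    """Group small files into rewrite batches of at most `target_size` bytes.

    Uses first-fit decreasing; files already at or above the target are left alone.
    """
    bins = []  # each bin is [total_bytes, [file names]]
    small = {n: s for n, s in file_sizes.items() if s < target_size}
    for name, size in sorted(small.items(), key=lambda kv: kv[1], reverse=True):
        for b in bins:
            if b[0] + size <= target_size:
                b[0] += size
                b[1].append(name)
                break
        else:
            bins.append([size, [name]])
    # A bin with a single file would be rewritten for no gain, so skip it.
    return [names for _, names in bins if len(names) > 1]

MB = 1024 * 1024
files = {"a": 100 * MB, "b": 20 * MB, "c": 8 * MB, "d": 130 * MB, "e": 60 * MB}
plan = plan_binpack(files, target_size=128 * MB)
```

Here `a`, `b`, and `c` are combined into one 128 MB output file, while `d` (already large) and `e` (no partner that fits) are left untouched.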

Putting It All Together

A complete lakehouse architecture:

Data Sources                    Lakehouse Platform
─────────────                   ──────────────────────
APIs          ─┐
Databases      ├─→  Ingestion  ─→  Bronze (raw Delta/Iceberg tables)
Event Streams  │    (Fivetran,      │
Files         ─┘     Kafka,         ▼
                     custom)    Silver (cleaned, validated)
                                    │
                                    ▼
                                Gold (business aggregations)
                                    │
                          ┌─────────┼─────────┐
                          ▼         ▼         ▼
                        BI Tools   ML/DS    APIs
                        (Looker)  (Notebooks) (REST)

# Summary: lakehouse design principles
lakehouse_principles = {
    "open_formats":      "Delta Lake or Iceberg on Parquet",
    "object_storage":    "S3, GCS, or ADLS as the foundation",
    "medallion_layers":  "Bronze → Silver → Gold",
    "acid_transactions": "Table formats provide ACID guarantees",
    "schema_evolution":  "Handle upstream changes gracefully",
    "time_travel":       "Audit, debug, and reproduce with versioning",
    "unified_batch_streaming": "Same tables for batch and streaming",
    "governance":        "Unity Catalog, Polaris, or Nessie for access control",
}

The lakehouse is more than a buzzword: it is a real architectural shift. By combining open table formats with object storage, organizations get warehouse-grade reliability at data lake prices, with the flexibility to support BI, ML, and streaming workloads on a single platform.
