Steven's Knowledge

ETL vs ELT

ETL and ELT paradigms, modern ELT tools, Change Data Capture, data quality checks, idempotent pipelines, and incremental loading strategies.


The way we move and transform data has fundamentally shifted. Understanding when to use ETL (Extract-Transform-Load) versus ELT (Extract-Load-Transform) is a foundational decision for any data engineering team.

ETL: The Traditional Approach

ETL transforms data before loading it into the target system. This pattern dominated when storage was expensive and compute was limited.

Source → Extract → Transform (staging server) → Load → Data Warehouse

How ETL Works

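# Traditional ETL pattern
import pandas as pd
from sqlalchemy import create_engine, text

def extract(source_conn_string: str, query: str, params: dict) -> pd.DataFrame:
    """Extract data from source system."""
    engine = create_engine(source_conn_string)
    # text() lets SQLAlchemy bind named parameters such as :last_run
    return pd.read_sql(text(query), engine, params=params)

def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Clean and transform data before loading."""
    # Remove duplicates (copy so the column assignments below act on an independent frame)
    df = df.drop_duplicates(subset=["order_id"]).copy()
    # Standardize dates
    df["order_date"] = pd.to_datetime(df["order_date"]).dt.date
    # Derive new columns
    df["total_with_tax"] = df["subtotal"] * (1 + df["tax_rate"])
    # Filter invalid records
    df = df[df["total_with_tax"] > 0]
    return df

def load(df: pd.DataFrame, target_conn_string: str, table: str):
    """Load transformed data into the warehouse."""
    engine = create_engine(target_conn_string)
    df.to_sql(table, engine, if_exists="append", index=False)

# Pipeline execution (placeholder connection strings and checkpoint; substitute your own)
SOURCE_DB = "postgresql://user:password@source-host:5432/app"
WAREHOUSE_DB = "postgresql://user:password@warehouse-host:5432/analytics"
last_run = "2025-01-14 00:00:00"

raw_orders = extract(
    SOURCE_DB,
    "SELECT * FROM orders WHERE updated_at > :last_run",
    {"last_run": last_run},
)
clean_orders = transform(raw_orders)
# Appending means an order updated since the last run is inserted again;
# see Idempotent Pipelines for patterns that avoid duplicates.
load(clean_orders, WAREHOUSE_DB, "dim_orders")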

When ETL Makes Sense

  • Strict compliance requirements (data must be masked before landing)
  • Source systems with limited bandwidth (transform to reduce data volume)
  • Legacy warehouses with limited compute capacity
  • Simple, low-volume pipelines
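The compliance case can be sketched as a transform step that pseudonymizes PII before any row leaves the staging tier. This is a minimal sketch under stated assumptions: the `email` column, the hypothetical `SALT`, and the choice of a salted SHA-256 digest are illustrative, not a prescribed standard, and real deployments keep the salt in a secrets store.

```python
import hashlib

# Hypothetical salt for illustration; keep the real one in a secrets manager, not in code.
SALT = b"example-salt"

def pseudonymize(value: str) -> str:
    """Return a stable, salted SHA-256 hex digest of a normalized (trimmed, lowercased) value."""
    normalized = value.strip().lower()
    return hashlib.sha256(SALT + normalized.encode("utf-8")).hexdigest()

def mask_rows(rows: list, pii_columns: tuple = ("email",)) -> list:
    """Return copies of rows with PII columns pseudonymized; input rows are left unchanged."""
    masked = []
    for row in rows:
        out = dict(row)
        for col in pii_columns:
            if out.get(col) is not None:
                out[col] = pseudonymize(out[col])
        masked.append(out)
    return masked
```

Because the digest is deterministic, masked values can still be joined and counted in the warehouse; only the raw value never lands there.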

ELT: The Modern Approach

ELT loads raw data first, then transforms it inside the warehouse. This approach relies on the elastic compute of modern cloud warehouses such as Snowflake, BigQuery, and Databricks to run transformations as SQL.

Source → Extract → Load (raw) → Transform (in warehouse) → Analytics
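To make the ordering concrete, here is a minimal sketch that uses an in-memory SQLite database as a stand-in for the warehouse (an assumption purely for illustration; a real pipeline would target a cloud warehouse). Raw rows land untouched in a `raw_orders` table, and the transform is plain SQL run afterward, so it can be rewritten and re-run without re-extracting.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Load: land source rows exactly as extracted, with no cleaning yet
conn.execute(
    "CREATE TABLE raw_orders (order_id INTEGER, subtotal REAL, tax_rate REAL, order_date TEXT)"
)
conn.executemany("INSERT INTO raw_orders VALUES (?, ?, ?, ?)", [
    (1, 100.0, 0.15, "2025-01-15T10:00:00"),
    (1, 100.0, 0.15, "2025-01-15T10:00:00"),  # duplicate sent by the source
    (2, -5.0, 0.15, "2025-01-15T11:30:00"),   # invalid negative subtotal
    (3, 40.0, 0.10, "2025-01-16T09:15:00"),
])

def build_orders(conn: sqlite3.Connection) -> None:
    """Transform: rebuild the modeled table from the raw layer, inside the 'warehouse'."""
    conn.execute("DROP TABLE IF EXISTS orders")
    conn.execute("""
        CREATE TABLE orders AS
        SELECT DISTINCT
            order_id,
            date(order_date) AS order_date,
            subtotal * (1 + tax_rate) AS total_with_tax
        FROM raw_orders
        WHERE subtotal * (1 + tax_rate) > 0
    """)

build_orders(conn)
```

Because `raw_orders` is never modified, changing the business logic (say, keeping negative totals for a refunds report) only means editing the SQL and calling `build_orders` again.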

Why ELT Won

| Factor | ETL | ELT |
|---|---|---|
| Transform compute | Staging server (you manage) | Warehouse (managed, scalable) |
| Raw data preserved | No (transformed before load) | Yes (raw layer always available) |
| Transformation flexibility | Fixed at pipeline time | Iterate on transforms after loading |
| Debugging | Hard (raw data is not retained) | Easier (replay from raw) |
| Cost model | Server uptime | Usage-based (pay for warehouse compute consumed) |
| Scalability | Limited by staging infrastructure | Scales with warehouse compute |

Modern ELT in Practice

-- Step 1: Raw data lands in the warehouse (loaded by Fivetran/Airbyte)
-- The raw Stripe payments table (declared as the dbt source stripe.payments) holds the API records as loaded, untransformed

-- Step 2: Transform in the warehouse using dbt
-- models/staging/stg_payments.sql
WITH source AS (
    SELECT * FROM {{ source('stripe', 'payments') }}
),

renamed AS (
    SELECT
        id AS payment_id,
        order_id,
        amount / 100.0 AS amount_dollars,  -- Stripe amounts are in the smallest currency unit (cents for USD); assumes two-decimal currencies
        currency,
        status,
        created AS payment_created_at,
        _fivetran_synced AS loaded_at
    FROM source
    WHERE status != 'failed'
)

SELECT * FROM renamed
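The `amount / 100.0` line assumes every payment is in a two-decimal currency. Stripe reports amounts in the currency's smallest unit, and some currencies, such as JPY, have no minor unit at all. Below is a hedged Python sketch of a currency-aware conversion; the `ZERO_DECIMAL` set is a deliberately partial, illustrative list rather than Stripe's full list, and `Decimal` avoids float rounding on money.

```python
from decimal import Decimal

# Illustrative subset only; consult Stripe's currency documentation for the complete list.
ZERO_DECIMAL = {"jpy", "krw", "vnd", "clp"}

def to_major_units(amount_minor: int, currency: str) -> Decimal:
    """Convert an amount in the smallest currency unit to the major unit (e.g. cents to dollars)."""
    if currency.lower() in ZERO_DECIMAL:
        return Decimal(amount_minor)  # no minor unit: 1999 JPY is 1999 yen
    return Decimal(amount_minor) / Decimal(100)
```

In the dbt model, the same rule would typically become a `CASE` on `currency` or a join to a currency reference table.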

ELT Tools Landscape

Managed Ingestion

Fivetran - Fully managed connectors:

  • Hundreds of pre-built connectors
  • Automatic schema migrations
  • Best for teams that want to spend no time maintaining connectors

Airbyte - Open-source alternative:

  • Self-hosted or managed cloud
  • Growing connector library
  • Connector Development Kit (CDK) for building connectors to internal sources

Custom Ingestion - When you need full control:

# Custom extraction for an internal API
# Assumes the API accepts modified_after/page/page_size query parameters
# and returns JSON shaped like {"results": [...]}; adjust to your API.
import requests

def extract_api_incremental(api_url: str, last_checkpoint: str) -> list[dict]:
    """Incrementally extract records modified since last checkpoint."""
    params = {
        "modified_after": last_checkpoint,
        "page_size": 1000,
    }
    all_records = []
    page = 1

    while True:
        params["page"] = page
        response = requests.get(api_url, params=params, timeout=30)
        response.raise_for_status()  # fail loudly on HTTP errors instead of loading partial data
        data = response.json()

        if not data["results"]:  # an empty page means every page has been read
            break

        all_records.extend(data["results"])
        page += 1

    return all_records
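The function above returns records but leaves checkpoint bookkeeping to the caller. One hedged way to close that loop is to derive the next checkpoint from the data itself and persist it only after the load succeeds. The `modified_at` field and the JSON state file are hypothetical, and the sketch assumes timestamps are ISO 8601 strings in a single timezone so they sort correctly as text.

```python
import json
from pathlib import Path

def read_checkpoint(state_path: Path, default: str) -> str:
    """Return the saved checkpoint, or `default` on the very first run."""
    if not state_path.exists():
        return default
    return json.loads(state_path.read_text())["last_checkpoint"]

def next_checkpoint(records: list, current: str) -> str:
    """Advance to the newest `modified_at` seen (hypothetical field); never move backward."""
    return max([current, *(r["modified_at"] for r in records)])

def write_checkpoint(state_path: Path, checkpoint: str) -> None:
    """Write to a temp file, then rename it over the old state so a crash never leaves it half-written."""
    tmp = state_path.with_suffix(".tmp")
    tmp.write_text(json.dumps({"last_checkpoint": checkpoint}))
    tmp.replace(state_path)
```

The order matters in a run: read the checkpoint, extract, load, and only then call `write_checkpoint`. If the load fails, the next run re-extracts from the old checkpoint, which is safe as long as the load step is idempotent.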

Change Data Capture (CDC)

CDC captures row-level changes (inserts, updates, deletes) from source databases, enabling near-real-time data replication without full table scans.

CDC Methods

| Method | How It Works | Latency | Impact on Source |
|---|---|---|---|
| Log-based | Reads database transaction logs | Seconds | Minimal |
| Query-based | Polls with WHERE updated_at > X | Minutes | Moderate |
| Trigger-based | Database triggers write to a change table | Immediate | High |
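Query-based CDC is the easiest method to prototype, and a small sketch also exposes its main blind spot. The example uses an in-memory SQLite table as a hypothetical source; each poll selects rows whose `updated_at` is newer than the last value seen. A hard `DELETE` leaves no row to match, so the poller never observes it, which is one reason log-based CDC is preferred when deletes matter.

```python
import sqlite3

src = sqlite3.connect(":memory:")
src.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, email TEXT, updated_at TEXT)")

def poll_changes(conn: sqlite3.Connection, since: str) -> tuple:
    """Return (rows changed after `since`, new high-water mark)."""
    rows = conn.execute(
        "SELECT id, email, updated_at FROM customers WHERE updated_at > ? ORDER BY updated_at",
        (since,),
    ).fetchall()
    new_since = rows[-1][2] if rows else since
    return rows, new_since

src.executemany("INSERT INTO customers VALUES (?, ?, ?)", [
    (1, "alice@old.com", "2025-01-01T00:00:00"),
    (2, "bob@example.com", "2025-01-01T00:05:00"),
])
changes, mark = poll_changes(src, "")  # first poll: both inserts are captured

src.execute(
    "UPDATE customers SET email = 'alice@new.com', updated_at = '2025-01-02T00:00:00' WHERE id = 1"
)
src.execute("DELETE FROM customers WHERE id = 2")
changes2, mark2 = poll_changes(src, mark)  # the update is captured; the delete is invisible
```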

Log-Based CDC Example (Debezium)

{
  "schema": { "type": "struct", "fields": [...] },
  "payload": {
    "before": { "id": 1001, "name": "Alice", "email": "alice@old.com" },
    "after":  { "id": 1001, "name": "Alice", "email": "alice@new.com" },
    "source": {
      "version": "2.4.0",
      "connector": "postgresql",
      "ts_ms": 1700000000000,
      "db": "production",
      "table": "customers"
    },
    "op": "u",
    "ts_ms": 1700000000123
  }
}

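The op field indicates the operation: c (create), u (update), d (delete), r (read, emitted during the initial snapshot). For PostgreSQL sources, before carries the full previous row only when the table's REPLICA IDENTITY is FULL; with the default setting it is null for updates.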
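A consumer turns these events into table state by dispatching on op. The sketch below applies already-parsed Debezium-style payloads to an in-memory dict keyed by `id`; the dict stands in for a target table, and keying on `id` assumes it is the primary key. Deletes read the key from `before`, because `after` is null in a delete event, and `None` values (Kafka tombstones that follow deletes) are skipped.

```python
from typing import Optional

def apply_change(table: dict, payload: Optional[dict]) -> None:
    """Apply one Debezium-style change payload to `table` (keyed by primary key `id`)."""
    if payload is None:
        return  # tombstone record: nothing to apply
    op = payload["op"]
    if op in ("c", "r", "u"):
        row = payload["after"]
        table[row["id"]] = row  # upsert: the latest row image wins
    elif op == "d":
        table.pop(payload["before"]["id"], None)  # deleting twice is harmless
    else:
        raise ValueError(f"Unsupported op: {op!r}")
```

Because every branch either overwrites with the full row image or removes by key, replaying the same events yields the same table.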

Data Quality Checks

Data quality is not optional. Catching bad data early prevents cascading failures downstream.

dbt Tests

# schema.yml
models:
  - name: stg_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: order_total
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 100000
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'shipped', 'delivered', 'cancelled']
      - name: customer_id
        tests:
          - relationships:
              to: ref('stg_customers')
              field: customer_id
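dbt compiles each of these tests into a SQL query that selects failing rows; if any rows come back, the test fails. A plain-Python approximation over lists of dicts makes those semantics explicit. It is a hedged illustration of what the tests check, not how dbt runs them, and the sample data is hypothetical. As with dbt's built-in tests, only `not_null` flags NULLs; the other checks skip them.

```python
from collections import Counter

def unique(rows, col):
    """Values that appear more than once (NULLs ignored)."""
    counts = Counter(r[col] for r in rows if r[col] is not None)
    return [value for value, n in counts.items() if n > 1]

def not_null(rows, col):
    """Rows where the column is NULL."""
    return [r for r in rows if r[col] is None]

def accepted_range(rows, col, min_value, max_value):
    """Non-NULL rows outside [min_value, max_value]."""
    return [r for r in rows if r[col] is not None and not (min_value <= r[col] <= max_value)]

def accepted_values(rows, col, values):
    """Non-NULL rows whose value is not in the allowed set."""
    return [r for r in rows if r[col] is not None and r[col] not in values]

def relationships(rows, col, parents, parent_col):
    """Non-NULL foreign keys with no matching parent row (orphans)."""
    parent_keys = {p[parent_col] for p in parents}
    return [r for r in rows if r[col] is not None and r[col] not in parent_keys]

# Hypothetical sample data with exactly one problem per test
stg_customers = [{"customer_id": 10}]
stg_orders = [
    {"order_id": 1, "order_total": 50.0, "status": "shipped", "customer_id": 10},
    {"order_id": 1, "order_total": 50.0, "status": "shipped", "customer_id": 10},
    {"order_id": 2, "order_total": -3.0, "status": "lost", "customer_id": 99},
    {"order_id": None, "order_total": None, "status": "pending", "customer_id": 10},
]

failures = {
    "unique(order_id)": unique(stg_orders, "order_id"),
    "not_null(order_id)": not_null(stg_orders, "order_id"),
    "not_null(order_total)": not_null(stg_orders, "order_total"),
    "accepted_range(order_total)": accepted_range(stg_orders, "order_total", 0, 100000),
    "accepted_values(status)": accepted_values(
        stg_orders, "status", {"pending", "shipped", "delivered", "cancelled"}),
    "relationships(customer_id)": relationships(
        stg_orders, "customer_id", stg_customers, "customer_id"),
}
failed = [name for name, bad_rows in failures.items() if bad_rows]
```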

Great Expectations

import great_expectations as gx

context = gx.get_context()

# Define expectations for a payments dataset
# (uses the pre-1.0 Great Expectations fluent API; GX 1.x changed this interface)
validator = context.sources.pandas_default.read_csv("payments.csv")

validator.expect_column_values_to_not_be_null("payment_id")
validator.expect_column_values_to_be_unique("payment_id")
validator.expect_column_values_to_be_between("amount", min_value=0, max_value=50000)
validator.expect_column_values_to_be_in_set(
    "currency", ["USD", "EUR", "GBP", "NZD", "AUD"]
)
validator.expect_column_pair_values_a_to_be_greater_than_b(
    "paid_at", "created_at", or_equal=True
)

results = validator.validate()
if not results.success:
    raise ValueError(f"Data quality check failed: {results.statistics}")

Idempotent Pipelines

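An idempotent pipeline leaves the target in the same state whether it runs once or many times over the same input. This is critical for reliability: retries, reruns, and backfills become safe instead of producing duplicates.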
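A quick way to see the difference is to run the same load twice. In this plain-Python sketch, a list and a dict stand in for tables: a blind append duplicates rows on the retry, while a load keyed on a hypothetical `customer_id` primary key converges to the same state however many times it runs.

```python
def load_append(table: list, batch: list) -> None:
    """Not idempotent: every run adds the whole batch again."""
    table.extend(batch)

def load_upsert(table: dict, batch: list) -> None:
    """Idempotent: rows are keyed by primary key, so a rerun overwrites with identical values."""
    for row in batch:
        table[row["customer_id"]] = row

batch = [
    {"customer_id": 1, "email": "alice@new.com"},
    {"customer_id": 2, "email": "bob@example.com"},
]
appended = []
upserted = {}
for _ in range(2):  # simulate a retry after a failure
    load_append(appended, batch)
    load_upsert(upserted, batch)
```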

The Merge Pattern

-- Idempotent upsert using MERGE (works in Snowflake, BigQuery, Databricks)
-- Assumes staging.raw_customers has at most one row per customer_id;
-- several source rows matching one target row typically makes MERGE raise an error.
MERGE INTO warehouse.dim_customers AS target
USING staging.raw_customers AS source
ON target.customer_id = source.customer_id

-- Only overwrite when the source row is newer, so replaying a batch is a no-op
WHEN MATCHED AND source.updated_at > target.updated_at THEN
    UPDATE SET
        name = source.name,
        email = source.email,
        updated_at = source.updated_at

WHEN NOT MATCHED THEN
    INSERT (customer_id, name, email, created_at, updated_at)
    VALUES (source.customer_id, source.name, source.email,
            source.created_at, source.updated_at);
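SQLite has no MERGE, but its INSERT ... ON CONFLICT DO UPDATE upsert (available since SQLite 3.24) can express the same rule, which makes it handy for testing the logic locally. Treat this as an analogue, not the warehouse statement itself: the WHERE on the update plays the role of `AND source.updated_at > target.updated_at`, so replaying an old batch never overwrites newer data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE dim_customers (
        customer_id INTEGER PRIMARY KEY,
        name TEXT, email TEXT, created_at TEXT, updated_at TEXT
    )
""")

UPSERT = """
    INSERT INTO dim_customers (customer_id, name, email, created_at, updated_at)
    VALUES (?, ?, ?, ?, ?)
    ON CONFLICT (customer_id) DO UPDATE SET
        name = excluded.name,
        email = excluded.email,
        updated_at = excluded.updated_at
    WHERE excluded.updated_at > dim_customers.updated_at
"""

def merge_batch(conn: sqlite3.Connection, batch: list) -> None:
    """Upsert a batch in one transaction, keeping the newest version of each row."""
    with conn:
        conn.executemany(UPSERT, batch)

new = [(1, "Alice", "alice@new.com", "2024-01-01", "2025-01-10")]
old = [(1, "Alice", "alice@old.com", "2024-01-01", "2024-06-01")]

merge_batch(conn, new)
merge_batch(conn, new)  # rerun: no change
merge_batch(conn, old)  # stale replay: ignored by the WHERE guard
```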

Delete-and-Replace Pattern

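-- For partitioned data: delete the partition, then re-insert it.
-- Rerunning for the same date rebuilds the same partition, so this is naturally idempotent;
-- the transaction keeps readers from seeing the partition empty or half-loaded.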
BEGIN TRANSACTION;

DELETE FROM analytics.fact_orders
WHERE order_date = '2025-01-15';

INSERT INTO analytics.fact_orders
SELECT * FROM staging.orders_transformed
WHERE order_date = '2025-01-15';

COMMIT;
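Here is the same pattern in self-contained form, again using in-memory SQLite as a stand-in warehouse (an illustrative assumption). Python's `sqlite3` connection, used as a context manager, commits the block on success and rolls it back on an exception, so a failed insert never leaves the partition deleted.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fact_orders (order_id INTEGER, order_date TEXT, total REAL)")
conn.execute("CREATE TABLE orders_transformed (order_id INTEGER, order_date TEXT, total REAL)")
conn.executemany("INSERT INTO orders_transformed VALUES (?, ?, ?)", [
    (1, "2025-01-15", 115.0),
    (2, "2025-01-15", 44.0),
    (3, "2025-01-16", 12.5),
])

def replace_partition(conn: sqlite3.Connection, day: str) -> None:
    """Delete one day's rows and reinsert them from staging, atomically."""
    with conn:  # commit on success, roll back on any exception
        conn.execute("DELETE FROM fact_orders WHERE order_date = ?", (day,))
        conn.execute(
            "INSERT INTO fact_orders (order_id, order_date, total) "
            "SELECT order_id, order_date, total FROM orders_transformed WHERE order_date = ?",
            (day,),
        )

for _ in range(3):  # reruns and backfills leave the same result
    replace_partition(conn, "2025-01-15")
```

Listing columns explicitly, rather than using `SELECT *`, also guards against column-order drift between the staging and fact tables.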

Incremental Loading Strategies

Full table reloads are simple but scale poorly: every run rereads and rewrites every row, even when only a few have changed. Incremental loads process only new or changed data.

Strategy Comparison

| Strategy | Mechanism | Handles Deletes | Complexity |
|---|---|---|---|
| Full reload | Replace entire table | Yes | Low |
| Timestamp-based | WHERE updated_at > last_run | No | Medium |
| CDC (log-based) | Stream transaction log | Yes | High |
| Watermark | High-water mark column | No | Medium |
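A watermark load differs from a timestamp load mainly in the column it tracks, typically a monotonically increasing key such as an auto-increment ID. This minimal in-memory sketch treats `id` as a hypothetical append-only key. It also shows the limitation in the table: rows updated or deleted below the watermark are never revisited.

```python
def load_since_watermark(source: list, target: list, watermark: int) -> int:
    """Copy rows with id > watermark into target and return the new watermark."""
    new_rows = sorted((r for r in source if r["id"] > watermark), key=lambda r: r["id"])
    target.extend(dict(r) for r in new_rows)  # copy so later source changes don't leak in
    return new_rows[-1]["id"] if new_rows else watermark

source = [{"id": 1, "status": "new"}, {"id": 2, "status": "new"}]
target = []
wm = load_since_watermark(source, target, 0)   # loads ids 1 and 2

source[0]["status"] = "cancelled"              # update below the watermark
source.append({"id": 3, "status": "new"})
wm = load_since_watermark(source, target, wm)  # loads only id 3; the update is missed
```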

dbt Incremental Model

-- models/fact_events.sql
{{
  config(
    materialized='incremental',
    unique_key='event_id',
    incremental_strategy='merge',
    on_schema_change='append_new_columns'
  )
}}

SELECT
    event_id,
    user_id,
    event_type,
    event_properties,
    occurred_at

FROM {{ source('app', 'events') }}

{% if is_incremental() %}
    -- Only process events at or after the latest one already loaded;
    -- >= re-reads boundary rows, and the merge on event_id deduplicates them
    WHERE occurred_at >= (SELECT MAX(occurred_at) FROM {{ this }})
{% endif %}
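What `is_incremental()` combined with `unique_key` does on each run can be simulated in a few lines of Python. This is a conceptual sketch, not dbt's implementation, and it assumes `occurred_at` values are ISO 8601 strings that sort as text. Filtering with `>=` on the current maximum re-selects boundary rows, and merging on `event_id` makes that overlap harmless; a strict `>` would skip a late event whose timestamp equals the current maximum.

```python
def incremental_run(source: list, target: dict) -> int:
    """One simulated run: select a batch, merge it into target on event_id, return batch size."""
    if target:
        # Incremental run (stand-in for is_incremental(); an empty target counts as a first run)
        cutoff = max(row["occurred_at"] for row in target.values())
        batch = [row for row in source if row["occurred_at"] >= cutoff]
    else:
        # First run: build from the full source
        batch = list(source)
    for row in batch:
        target[row["event_id"]] = dict(row)  # merge on unique_key: insert or overwrite
    return len(batch)

events = [
    {"event_id": "e1", "occurred_at": "2025-01-15T10:00:00"},
    {"event_id": "e2", "occurred_at": "2025-01-15T10:05:00"},
]
fact_events = {}
first = incremental_run(events, fact_events)   # full build: e1, e2

# A late event shares the current max timestamp; a new event arrives after it
events.append({"event_id": "e3", "occurred_at": "2025-01-15T10:05:00"})
events.append({"event_id": "e4", "occurred_at": "2025-01-15T11:00:00"})
second = incremental_run(events, fact_events)  # re-reads e2, picks up e3 and e4
```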

Building a Robust ELT Pipeline

Here is a checklist for production-grade ELT pipelines:

  1. Idempotency - Re-running with the same input produces the same result
  2. Incremental loads - Process only changed data
  3. Data quality gates - Block bad data from reaching consumers
  4. Schema evolution - Handle new columns and type changes gracefully
  5. Monitoring - Alert on pipeline failures, data freshness, and row count anomalies
  6. Lineage - Track where data comes from and how it is transformed
  7. Documentation - Every model and column has a description

# A well-structured ELT pipeline checklist
pipeline_checklist = {
    "idempotent":         True,
    "incremental":        True,
    "quality_checks":     ["not_null", "unique", "freshness", "row_count"],
    "schema_evolution":   "append_new_columns",
    "monitoring":         ["slack_alerts", "pagerduty_oncall"],
    "lineage":            "dbt_docs + datahub",
    "documentation":      "schema.yml for every model",
}
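The monitoring item can start as two simple checks: freshness (how old is the newest loaded row?) and row-count anomaly (is today's volume far from the recent norm?). The thresholds here, a 6-hour freshness limit and a 50% deviation band around a trailing mean, are arbitrary illustrative assumptions to tune per table.

```python
from datetime import datetime, timedelta, timezone
from statistics import mean
from typing import Optional

def check_freshness(latest_loaded_at: datetime, max_age: timedelta = timedelta(hours=6),
                    now: Optional[datetime] = None) -> bool:
    """True if the newest row is no older than max_age (timestamps must be timezone-aware)."""
    now = now or datetime.now(timezone.utc)
    return now - latest_loaded_at <= max_age

def check_row_count(today: int, history: list, tolerance: float = 0.5) -> bool:
    """True if today's count is within +/- tolerance of the trailing mean of history."""
    if not history:
        return True  # nothing to compare against yet
    baseline = mean(history)
    if baseline == 0:
        return today == 0
    return abs(today - baseline) / baseline <= tolerance
```

A failing check would then feed whatever alerting the team already uses, such as the Slack and PagerDuty channels listed above.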

The shift from ETL to ELT reflects a broader trend: decouple ingestion from transformation, preserve raw data, and iterate on transformations without re-extracting. Modern tools like dbt, Fivetran, and cloud warehouses make this approach accessible to teams of any size.
