Steven's Knowledge

Orchestration

Workflow orchestration with Airflow, Dagster, Prefect, and dbt - DAGs, operators, software-defined assets, models, tests, and best practices.

Orchestration is the coordination of data pipelines - deciding what runs, when, in what order, and what happens when things fail. The right orchestration tool turns fragile scripts into reliable, observable data systems.

Apache Airflow

Airflow is one of the most widely adopted orchestration tools. It defines workflows as Directed Acyclic Graphs (DAGs) written in Python.

Core Concepts

  • DAG: A collection of tasks with dependencies (no cycles)
  • Operator: A template for a task (BashOperator, PythonOperator, etc.)
  • Task: A single unit of work (an instance of an operator)
  • XCom: Cross-communication between tasks (small data passing)
  • Connection: Stored credentials for external systems
  • Pool: Limits concurrency for resource-constrained operations
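
To make the "no cycles" rule concrete, here is a minimal plain-Python sketch. It is not Airflow code. It models tasks as a dependency graph and uses the standard library's graphlib to derive a valid execution order. The task names mirror the production DAG on this page. The execution_order helper is hypothetical.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(dependencies: dict[str, set[str]]) -> list[str]:
    """Return tasks in an order that respects every upstream dependency.

    `dependencies` maps each task to the set of tasks that must finish first.
    Raises ValueError if the graph contains a cycle (i.e. it is not a DAG).
    """
    try:
        return list(TopologicalSorter(dependencies).static_order())
    except CycleError as exc:
        # CycleError stores the offending cycle as its second argument
        raise ValueError(f"Not a DAG, cycle detected: {exc.args[1]}") from exc


pipeline = {
    "check_source_freshness": set(),
    "extract_orders": {"check_source_freshness"},
    "dbt_run": {"extract_orders"},
    "dbt_test": {"dbt_run"},
    "validate_output": {"dbt_test"},
}

print(execution_order(pipeline))
# ['check_source_freshness', 'extract_orders', 'dbt_run', 'dbt_test', 'validate_output']
```

A scheduler that respects this ordering can still run independent branches in parallel. A linear chain like this one simply has exactly one valid order.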

A Production DAG

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.common.sql.operators.sql import SQLCheckOperator
from airflow.utils.task_group import TaskGroup

default_args = {
    "owner": "data-engineering",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["data-alerts@company.com"],
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(hours=2),
}

with DAG(
    dag_id="daily_ecommerce_pipeline",
    default_args=default_args,
    description="Daily pipeline for ecommerce data",
    schedule="0 6 * * *",  # 6 AM daily
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["ecommerce", "daily"],
    max_active_runs=1,
) as dag:

    # Task 1: Check source data freshness.
    # SQLCheckOperator fails the task (blocking everything downstream) if any
    # value in the first returned row is falsy, so a 0 here stops the run.
    check_source = SQLCheckOperator(
        task_id="check_source_freshness",
        conn_id="source_postgres",
        sql="""
            SELECT CASE
                WHEN MAX(updated_at) > NOW() - INTERVAL '2 hours'
                THEN 1 ELSE 0
            END AS is_fresh
            FROM orders
        """,
    )

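    # Task 2: Extract data
    def extract_orders(**context):
        """Extract orders updated within this run's data interval.

        Filtering on the data interval rather than wall-clock time or the
        last successful run keeps reruns and backfills idempotent.
        """
        # Import inside the callable to keep DAG-file parsing fast
        import pandas as pd
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        from sqlalchemy import text

        start = context["data_interval_start"]
        end = context["data_interval_end"]
        # Reuse the Airflow Connection instead of a raw connection string
        engine = PostgresHook(postgres_conn_id="source_postgres").get_sqlalchemy_engine()
        # Bound parameters instead of f-string interpolation avoid SQL injection
        df = pd.read_sql(
            text("SELECT * FROM orders WHERE updated_at >= :start AND updated_at < :end"),
            engine,
            params={"start": start, "end": end},
        )
        # Push row count to XCom for downstream validation
        context["ti"].xcom_push(key="extracted_rows", value=len(df))
        # /tmp is local to one worker; use shared storage on multi-worker deployments
        df.to_parquet(f"/tmp/orders_extract_{context['ds']}.parquet")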

    extract = PythonOperator(
        task_id="extract_orders",
        python_callable=extract_orders,
    )

    # Task 3: Run dbt transformations
    with TaskGroup(group_id="dbt_transforms") as dbt_group:
        dbt_run = BashOperator(
            task_id="dbt_run",
            bash_command="cd /opt/dbt && dbt run --select tag:daily",
        )

        dbt_test = BashOperator(
            task_id="dbt_test",
            bash_command="cd /opt/dbt && dbt test --select tag:daily",
        )

        dbt_run >> dbt_test

    # Task 4: Validate results
    def validate_output(**context):
        """Fail the run if the extract step produced no rows."""
        extracted = context["ti"].xcom_pull(
            task_ids="extract_orders", key="extracted_rows"
        )
        # Treat a missing XCom (None) the same as zero rows
        if not extracted:
            raise ValueError("No rows extracted - source may be stale")

    validate = PythonOperator(
        task_id="validate_output",
        python_callable=validate_output,
    )

    # Define task dependencies
    check_source >> extract >> dbt_group >> validate
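
A freshness query only protects downstream tasks if a stale result actually fails the task. Airflow's SQLCheckOperator, for example, fails when any value in the first returned row is falsy. The plain-Python sketch below reproduces that check semantics with the standard library's sqlite3. The run_sql_check helper and the in-memory orders table are illustrative assumptions. SQLite's datetime('now', '-2 hours') stands in for Postgres's NOW() - INTERVAL '2 hours'.

```python
import sqlite3


class DataQualityError(Exception):
    """Raised when a check query signals a problem."""


def run_sql_check(conn: sqlite3.Connection, sql: str) -> None:
    """Fail unless the query returns a first row whose values are all truthy."""
    row = conn.execute(sql).fetchone()
    if row is None:
        raise DataQualityError("Check query returned no rows")
    if not all(row):
        raise DataQualityError(f"Check failed, first row was {row}")


FRESHNESS_SQL = """
    SELECT CASE
        WHEN MAX(updated_at) > datetime('now', '-2 hours') THEN 1 ELSE 0
    END AS is_fresh
    FROM orders
"""

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, updated_at TEXT)")

# A row updated 30 minutes ago passes the check
conn.execute("INSERT INTO orders VALUES (1, datetime('now', '-30 minutes'))")
run_sql_check(conn, FRESHNESS_SQL)

# If the newest row is 5 hours old, the check raises, so downstream tasks would not run
conn.execute("DELETE FROM orders")
conn.execute("INSERT INTO orders VALUES (2, datetime('now', '-5 hours'))")
try:
    run_sql_check(conn, FRESHNESS_SQL)
except DataQualityError as exc:
    print(exc)  # Check failed, first row was (0,)
```

An empty table also fails, because MAX(updated_at) is NULL and the CASE falls through to 0.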

Airflow Best Practices

  1. Keep DAGs idempotent - Every run should produce the same result for the same data interval
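  2. Use the logical date - Process data for the run's logical date and data interval (data_interval_start / data_interval_end), not wall-clock time; execution_date is the deprecated name for the logical date
  3. Avoid heavy logic in DAG files - The scheduler re-parses DAG files continuously (every 30 seconds per file by default, set by min_file_process_interval); keep top-level code lightweight
  4. Use TaskGroups - Organize related tasks visually
  5. Set timeouts - Prevent stuck tasks from holding worker slots and blocking downstream tasks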
  6. Use Connections and Variables - Never hardcode credentials in DAG files
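
Airflow 2's default timetable gives a cron schedule like "0 6 * * *" a fixed data interval for each run. The run starts only after that interval has ended, so a given logical date always selects the same slice of data. The plain-Python sketch below computes that daily window. The helper names are hypothetical. In a real DAG, read data_interval_start and data_interval_end from the task context instead of computing them.

```python
from datetime import datetime, timedelta


def daily_data_interval(logical_date: datetime) -> tuple[datetime, datetime]:
    """Return the half-open [start, end) window a daily run is responsible for.

    Assumes a once-a-day cron schedule and Airflow 2's default behaviour,
    where the logical date equals the start of the data interval.
    """
    return logical_date, logical_date + timedelta(days=1)


def extraction_query(logical_date: datetime) -> tuple[str, dict]:
    """Build a parameterized query that selects only this run's slice."""
    start, end = daily_data_interval(logical_date)
    sql = "SELECT * FROM orders WHERE updated_at >= :start AND updated_at < :end"
    return sql, {"start": start.isoformat(), "end": end.isoformat()}


# Re-running (or backfilling) the 2025-01-01 run always selects the same window,
# no matter what time it actually executes
sql, params = extraction_query(datetime(2025, 1, 1, 6, 0))
print(params)
# {'start': '2025-01-01T06:00:00', 'end': '2025-01-02T06:00:00'}
```

The half-open interval means consecutive runs never overlap and never leave gaps. That property is what makes reruns and backfills safe.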

Dagster

Dagster takes a fundamentally different approach with software-defined assets (SDAs). Instead of defining tasks that "do things," you define the data assets that should exist.

Software-Defined Assets

import dagster as dg
import pandas as pd
from sqlalchemy import create_engine

class SourceDatabase(dg.ConfigurableResource):
    """Source database settings, supplied when the Definitions are loaded."""
    url: str

@dg.asset(
    description="Raw orders from the ecommerce database",
    group_name="raw",
    metadata={"source": "postgres", "table": "orders"},
)
def raw_orders(context: dg.AssetExecutionContext, source_db: SourceDatabase) -> pd.DataFrame:
    """Extract raw orders from the source database."""
    engine = create_engine(source_db.url)
    df = pd.read_sql("SELECT * FROM orders", engine)
    context.log.info(f"Extracted {len(df)} orders")
    return df

# Naming a parameter after an upstream asset declares the dependency and loads its
# value, so deps= (meant for dependencies whose data is not passed in) is not needed
@dg.asset(
    description="Cleaned and enriched orders",
    group_name="staging",
)
def stg_orders(context: dg.AssetExecutionContext, raw_orders: pd.DataFrame) -> pd.DataFrame:
    """Clean and standardize order data."""
    df = raw_orders.copy()
    df = df.drop_duplicates(subset=["order_id"])
    df["order_date"] = pd.to_datetime(df["order_date"]).dt.date
    df["total_with_tax"] = df["subtotal"] * (1 + df["tax_rate"])
    context.log.info(f"Staged {len(df)} orders after cleaning")
    return df

@dg.asset(
    description="Daily revenue metrics by product category",
    group_name="marts",
)
def daily_revenue_by_category(stg_orders: pd.DataFrame) -> pd.DataFrame:
    """Aggregate daily revenue by product category."""
    return (
        stg_orders
        .groupby(["order_date", "category"])
        .agg(
            revenue=("total_with_tax", "sum"),
            order_count=("order_id", "nunique"),
        )
        .reset_index()
    )

# Register the assets and the resources they require
defs = dg.Definitions(
    assets=[raw_orders, stg_orders, daily_revenue_by_category],
    resources={
        # EnvVar pulls the URL from SOURCE_DATABASE_URL instead of hardcoding it
        "source_db": SourceDatabase(url=dg.EnvVar("SOURCE_DATABASE_URL")),
    },
)
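
The central idea of software-defined assets is that a function's parameter names declare its upstream assets. The toy sketch below infers the asset graph with inspect.signature and materializes assets in dependency order. It is plain Python, not Dagster's implementation. The asset decorator, the ASSETS registry and materialize_all are hypothetical.

```python
import inspect
from graphlib import TopologicalSorter
from typing import Any, Callable

# Hypothetical registry standing in for a Definitions object
ASSETS: dict[str, Callable[..., Any]] = {}


def asset(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Register fn as an asset whose name is the function name."""
    ASSETS[fn.__name__] = fn
    return fn


def materialize_all() -> dict[str, Any]:
    """Compute every registered asset, upstream assets first."""
    # Each parameter name is read as the name of an upstream asset
    graph = {name: set(inspect.signature(fn).parameters) for name, fn in ASSETS.items()}
    unknown = set().union(*graph.values()) - set(ASSETS)
    if unknown:
        raise KeyError(f"Parameters that are not assets: {sorted(unknown)}")

    values: dict[str, Any] = {}
    for name in TopologicalSorter(graph).static_order():
        upstream = {param: values[param] for param in graph[name]}
        values[name] = ASSETS[name](**upstream)
    return values


@asset
def raw_orders() -> list[dict]:
    return [
        {"order_id": 1, "amount": 10.0},
        {"order_id": 1, "amount": 10.0},  # duplicate row
        {"order_id": 2, "amount": 5.5},
    ]


@asset
def stg_orders(raw_orders: list[dict]) -> list[dict]:
    deduped = {row["order_id"]: row for row in raw_orders}
    return list(deduped.values())


@asset
def total_revenue(stg_orders: list[dict]) -> float:
    return sum(row["amount"] for row in stg_orders)


print(materialize_all()["total_revenue"])  # 15.5
```

Because the graph comes from the function signatures, adding a new asset never requires editing a separate dependency list. This is the main ergonomic difference from wiring tasks together with >>.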

Why Dagster Over Airflow

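| Feature | Airflow | Dagster |
|---|---|---|
| Core abstraction | Tasks (operations) | Assets (data) |
| Testability | Harder to unit test (dag.test() helps) | First-class testing support |
| Local development | Requires a full Airflow setup (scheduler, metadata database) | dagster dev runs locally |
| Type checking | No checks on data passed between tasks | Python type hints, checked at run time as Dagster types |
| Data lineage | Plugin-based (e.g. OpenLineage) | Built-in asset graph |
| Backfills | Run-based (catchup, airflow dags backfill) | Native partition backfills |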
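
Partition-based backfills treat each day, or other time window, as an independently materializable slice. A backfill then only needs the list of partition keys that are still missing. The plain-Python sketch below shows that bookkeeping. It assumes daily "YYYY-MM-DD" keys, and the helper names are hypothetical.

```python
from datetime import date, timedelta


def daily_partition_keys(start: date, end: date) -> list[str]:
    """All daily partition keys from start up to, but not including, end."""
    days = (end - start).days
    return [(start + timedelta(days=i)).isoformat() for i in range(days)]


def partitions_to_backfill(start: date, end: date, materialized: set[str]) -> list[str]:
    """Keys in the range that have not been materialized yet, oldest first."""
    return [key for key in daily_partition_keys(start, end) if key not in materialized]


done = {"2025-01-01", "2025-01-03"}
print(partitions_to_backfill(date(2025, 1, 1), date(2025, 1, 5), done))
# ['2025-01-02', '2025-01-04']
```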

Prefect

Prefect positions itself as the simplest orchestration tool. It uses Python decorators to turn functions into observable workflows.

from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(retries=3, retry_delay_seconds=60, cache_key_fn=task_input_hash,
      cache_expiration=timedelta(hours=1))
def extract_data(source: str) -> dict:
    """Extract data with automatic retries and caching."""
    # Implementation here
    return {"rows": 1000, "source": source}

@task
def transform_data(raw_data: dict) -> dict:
    """Transform the extracted data."""
    return {"rows": raw_data["rows"], "status": "transformed"}

@task
def load_data(data: dict) -> None:
    """Load data into the warehouse."""
    print(f"Loaded {data['rows']} rows")

@flow(name="daily-etl", log_prints=True)
def daily_pipeline(source: str = "postgres"):
    """Main pipeline flow."""
    raw = extract_data(source)
    transformed = transform_data(raw)
    load_data(transformed)

# Run locally or deploy to Prefect Cloud
if __name__ == "__main__":
    daily_pipeline()
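
The retries and caching options on extract_data boil down to two ideas:

  • Re-invoke a failing function after a delay.
  • Skip the work when the same inputs were seen recently.

The plain-Python decorator below sketches both ideas. It is not Prefect's implementation, and the name with_retries_and_cache is hypothetical. Like task_input_hash, it keys the cache on a hash of the call's arguments. It assumes those arguments are JSON-serializable.

```python
import hashlib
import json
import time
from functools import wraps
from typing import Any, Callable


def with_retries_and_cache(retries: int = 3, retry_delay_seconds: float = 60,
                           cache_expiration_seconds: float = 3600) -> Callable:
    """Retry failed calls and cache results per set of inputs (illustrative)."""
    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        cache: dict[str, tuple[float, Any]] = {}

        @wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            # Cache key: a hash of the call's inputs (assumed JSON-serializable)
            payload = json.dumps([args, kwargs], sort_keys=True, default=str)
            key = hashlib.sha256(payload.encode()).hexdigest()
            hit = cache.get(key)
            if hit is not None and time.monotonic() - hit[0] < cache_expiration_seconds:
                return hit[1]

            # One initial attempt plus up to `retries` retries
            for attempt in range(retries + 1):
                try:
                    result = fn(*args, **kwargs)
                    break
                except Exception:
                    if attempt == retries:
                        raise  # out of retries: surface the last error
                    time.sleep(retry_delay_seconds)
            cache[key] = (time.monotonic(), result)
            return result
        return wrapper
    return decorator


calls = {"n": 0}


@with_retries_and_cache(retries=2, retry_delay_seconds=0)
def flaky_extract(source: str) -> dict:
    calls["n"] += 1
    if calls["n"] < 2:
        raise ConnectionError("transient failure")  # first attempt fails
    return {"rows": 1000, "source": source}


print(flaky_extract("postgres"))  # succeeds on the second attempt
print(flaky_extract("postgres"))  # served from the cache, no new call
print(calls["n"])  # 2
```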

dbt (Data Build Tool)

dbt is not a general orchestrator - it specifically orchestrates SQL transformations inside your warehouse. It has become the de facto standard for the "T" in ELT.

Models

-- models/staging/stg_orders.sql
{{
  config(
    materialized='view',
    schema='staging'
  )
}}

WITH source AS (
    SELECT * FROM {{ source('ecommerce', 'orders') }}
),

cleaned AS (
    SELECT
        id AS order_id,
        user_id AS customer_id,
        CAST(total_cents AS DECIMAL(12,2)) / 100 AS total_dollars,
        status,
        created_at AS ordered_at,
        updated_at
    FROM source
    WHERE status != 'test'
)

SELECT * FROM cleaned
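
dbt builds its own DAG by reading the ref() and source() calls in each model. That is why stg_orders can select from source('ecommerce', 'orders') without any explicit dependency configuration. The regex-based sketch below approximates that idea in plain Python. Real dbt renders the Jinja rather than pattern-matching it. The sample model texts are hypothetical.

```python
import re
from graphlib import TopologicalSorter

REF_PATTERN = re.compile(r"\{\{\s*ref\(\s*['\"]([\w.]+)['\"]\s*\)\s*\}\}")
SOURCE_PATTERN = re.compile(
    r"\{\{\s*source\(\s*['\"](\w+)['\"]\s*,\s*['\"](\w+)['\"]\s*\)\s*\}\}"
)


def model_dependencies(sql: str) -> set[str]:
    """Upstream nodes of a model: referenced models plus 'source.table' names."""
    refs = set(REF_PATTERN.findall(sql))
    sources = {f"{src}.{table}" for src, table in SOURCE_PATTERN.findall(sql)}
    return refs | sources


models = {
    "stg_orders": "SELECT * FROM {{ source('ecommerce', 'orders') }}",
    "stg_customers": "SELECT * FROM {{ source('ecommerce', 'customers') }}",
    "fct_orders": """
        SELECT o.*, c.region
        FROM {{ ref('stg_orders') }} o
        JOIN {{ ref('stg_customers') }} c USING (customer_id)
    """,
}

graph = {name: model_dependencies(sql) for name, sql in models.items()}
# Sources have no upstream nodes; keep only models in the build order
run_order = [node for node in TopologicalSorter(graph).static_order() if node in models]
print(sorted(graph["fct_orders"]))  # ['stg_customers', 'stg_orders']
print(run_order[-1])  # fct_orders
```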

Incremental Models

-- models/marts/fct_page_views.sql
{{
  config(
    materialized='incremental',
    unique_key='page_view_id',
    incremental_strategy='merge'
  )
}}

SELECT
    {{ dbt_utils.generate_surrogate_key(['session_id', 'page_url', 'viewed_at']) }}
        AS page_view_id,
    session_id,
    user_id,
    page_url,
    referrer_url,
    duration_seconds,
    viewed_at

FROM {{ ref('stg_page_views') }}

{% if is_incremental() %}
WHERE viewed_at > (SELECT MAX(viewed_at) FROM {{ this }})
{% endif %}
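
On an incremental run, the is_incremental() filter limits the query to rows newer than the table's current high-water mark. The merge strategy then upserts the selected rows on unique_key, so a row whose key already exists is updated instead of inserted twice. The plain-Python sketch below mimics both steps.

The surrogate key follows the general approach of dbt_utils.generate_surrogate_key: an MD5 hash of the null-coalesced values joined with '-'. String casting differs between Python and each warehouse, so treat the hashes as illustrative. incremental_merge is a hypothetical helper.

```python
import hashlib
from typing import Any

NULL_PLACEHOLDER = "_dbt_utils_surrogate_key_null_"


def surrogate_key(*values: Any) -> str:
    """MD5 of the values joined with '-', with NULLs replaced by a placeholder."""
    parts = [NULL_PLACEHOLDER if v is None else str(v) for v in values]
    return hashlib.md5("-".join(parts).encode("utf-8")).hexdigest()


def incremental_merge(target: dict[str, dict], source_rows: list[dict]) -> dict[str, dict]:
    """Apply one incremental run of fct_page_views to `target` (keyed by page_view_id)."""
    # Simplification: an empty target is treated as a first, non-incremental run
    high_water = max((row["viewed_at"] for row in target.values()), default=None)
    for row in source_rows:
        if high_water is not None and row["viewed_at"] <= high_water:
            continue  # WHERE viewed_at > (SELECT MAX(viewed_at) FROM {{ this }})
        key = surrogate_key(row["session_id"], row["page_url"], row["viewed_at"])
        target[key] = {**row, "page_view_id": key}  # merge: insert or overwrite by key
    return target


day1 = [
    {"session_id": "s1", "page_url": "/home", "viewed_at": "2025-01-01T10:00:00"},
    {"session_id": "s1", "page_url": "/cart", "viewed_at": "2025-01-01T10:05:00"},
]
day2 = day1 + [
    {"session_id": "s2", "page_url": "/home", "viewed_at": "2025-01-02T09:00:00"},
]

table: dict[str, dict] = {}
incremental_merge(table, day1)  # first run loads everything
incremental_merge(table, day2)  # only the new row passes the filter
print(len(table))  # 3
```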

Tests

# models/staging/schema.yml
version: 2

models:
  - name: stg_orders
    description: "Cleaned orders from the ecommerce system"
    columns:
      - name: order_id
        description: "Primary key"
        tests:
          - unique
          - not_null
      - name: total_dollars
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'processing', 'shipped', 'delivered', 'cancelled']

sources:
  - name: ecommerce
    database: raw
    schema: public
    freshness:
      warn_after: { count: 12, period: hour }
      error_after: { count: 24, period: hour }
    tables:
      - name: orders
        loaded_at_field: _loaded_at
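
Each generic test compiles to a query that selects the rows violating the rule. The test passes when that query returns nothing. The plain-Python sketch below applies the same idea to in-memory rows for the tests declared on stg_orders. It also classifies source freshness against warn_after and error_after thresholds. The helper names and sample rows are hypothetical.

```python
from collections import Counter
from datetime import datetime, timedelta


def unique(rows: list[dict], column: str) -> list[dict]:
    """Rows whose non-null value appears more than once."""
    counts = Counter(row[column] for row in rows if row[column] is not None)
    return [row for row in rows if row[column] is not None and counts[row[column]] > 1]


def not_null(rows: list[dict], column: str) -> list[dict]:
    return [row for row in rows if row[column] is None]


def accepted_values(rows: list[dict], column: str, values: set) -> list[dict]:
    # NULLs are not flagged, mirroring SQL's NOT IN semantics
    return [row for row in rows if row[column] is not None and row[column] not in values]


def accepted_range(rows: list[dict], column: str, min_value: float) -> list[dict]:
    # Inclusive lower bound; NULLs are left to the not_null test
    return [row for row in rows if row[column] is not None and row[column] < min_value]


def freshness_status(max_loaded_at: datetime, now: datetime,
                     warn_after: timedelta, error_after: timedelta) -> str:
    """Classify a source by the age of its newest _loaded_at value."""
    age = now - max_loaded_at
    if age > error_after:
        return "error"
    if age > warn_after:
        return "warn"
    return "pass"


STATUSES = {"pending", "processing", "shipped", "delivered", "cancelled"}
rows = [
    {"order_id": 1, "total_dollars": 19.99, "status": "shipped"},
    {"order_id": 2, "total_dollars": -5.00, "status": "refunded"},
    {"order_id": 2, "total_dollars": None, "status": "pending"},
]
results = {
    "unique(order_id)": unique(rows, "order_id"),
    "not_null(total_dollars)": not_null(rows, "total_dollars"),
    "accepted_range(total_dollars)": accepted_range(rows, "total_dollars", 0),
    "accepted_values(status)": accepted_values(rows, "status", STATUSES),
}
for test_name, failing in results.items():
    print(test_name, "PASS" if not failing else f"FAIL ({len(failing)} rows)")

now = datetime(2025, 1, 2, 12, 0)
print(freshness_status(datetime(2025, 1, 1, 18, 0), now,
                       warn_after=timedelta(hours=12), error_after=timedelta(hours=24)))  # warn
```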

Snapshots

-- snapshots/snap_customers.sql
{% snapshot snap_customers %}

{{
  config(
    target_schema='snapshots',
    unique_key='customer_id',
    strategy='timestamp',
    updated_at='updated_at',
    invalidate_hard_deletes=True,
  )
}}

SELECT * FROM {{ source('ecommerce', 'customers') }}

{% endsnapshot %}
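
With the timestamp strategy, a snapshot compares each source row's updated_at with the current version it holds. A newer timestamp closes the old version (setting dbt_valid_to) and inserts a new one, which yields a Type 2 slowly changing dimension. The simplified plain-Python sketch below follows that logic, including how invalidate_hard_deletes closes rows that disappear from the source.

dbt_valid_from and dbt_valid_to are the columns dbt adds. apply_snapshot and the sample data are hypothetical. Real snapshots also track further metadata columns such as dbt_scd_id and dbt_updated_at.

```python
from datetime import datetime


def apply_snapshot(history: list[dict], source: list[dict], now: datetime) -> None:
    """Update `history` in place from the current `source` rows (keyed by customer_id)."""
    # Open versions are the ones with no dbt_valid_to yet
    current = {row["customer_id"]: row for row in history if row["dbt_valid_to"] is None}
    source_by_key = {row["customer_id"]: row for row in source}

    for key, row in source_by_key.items():
        existing = current.get(key)
        if existing is not None and row["updated_at"] <= existing["updated_at"]:
            continue  # unchanged: keep the open version
        if existing is not None:
            existing["dbt_valid_to"] = row["updated_at"]  # close the old version
        history.append({**row, "dbt_valid_from": row["updated_at"], "dbt_valid_to": None})

    # invalidate_hard_deletes: close open versions whose key vanished from the source
    for key, existing in current.items():
        if key not in source_by_key:
            existing["dbt_valid_to"] = now


history: list[dict] = []
apply_snapshot(history, [
    {"customer_id": 1, "tier": "silver", "updated_at": datetime(2025, 1, 1)},
    {"customer_id": 2, "tier": "gold", "updated_at": datetime(2025, 1, 1)},
], now=datetime(2025, 1, 1, 12))

# Customer 1 is upgraded; customer 2 is deleted from the source
apply_snapshot(history, [
    {"customer_id": 1, "tier": "gold", "updated_at": datetime(2025, 1, 5)},
], now=datetime(2025, 1, 6))

for version in history:
    print(version["customer_id"], version["tier"], version["dbt_valid_to"])
```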

dbt Project Configuration

# dbt_project.yml
name: ecommerce_analytics
version: '1.0.0'
config-version: 2
profile: 'warehouse'

model-paths: ["models"]
test-paths: ["tests"]
snapshot-paths: ["snapshots"]
macro-paths: ["macros"]

models:
  ecommerce_analytics:
    staging:
      +materialized: view
      +schema: staging
    intermediate:
      +materialized: ephemeral
    marts:
      +materialized: table
      +schema: analytics
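
Folder-level configs in dbt_project.yml cascade down the models/ tree. A more specific setting, such as a subfolder and then the model's own config() block, overrides a broader one. Separately, dbt's default generate_schema_name macro appends a custom schema to the target schema rather than replacing it. So +schema: staging builds into a schema named like <target_schema>_staging.

The plain-Python sketch below illustrates both rules. resolve_config is a hypothetical helper, 'analytics_dev' is an assumed target schema, and the project-name level of the YAML is omitted.

```python
from typing import Any, Optional

# The models: block from dbt_project.yml, minus the project-name level
PROJECT_MODELS = {
    "staging": {"+materialized": "view", "+schema": "staging"},
    "intermediate": {"+materialized": "ephemeral"},
    "marts": {"+materialized": "table", "+schema": "analytics"},
}


def resolve_config(folders: list[str], model_config: Optional[dict] = None,
                   tree: dict = PROJECT_MODELS) -> dict[str, Any]:
    """Merge +configs from the outermost folder inward, then the model's config()."""
    resolved: dict[str, Any] = {}
    node = tree
    for folder in folders:
        node = node.get(folder, {})
        # Keys starting with '+' are configs; other keys would be nested folders
        resolved.update({k[1:]: v for k, v in node.items() if k.startswith("+")})
    resolved.update(model_config or {})
    return resolved


def generate_schema_name(target_schema: str, custom_schema: Optional[str]) -> str:
    """Mirror of dbt's default rule: append the custom schema, don't replace."""
    return target_schema if custom_schema is None else f"{target_schema}_{custom_schema.strip()}"


cfg = resolve_config(["marts"], model_config={"materialized": "incremental"})
print(cfg)  # {'materialized': 'incremental', 'schema': 'analytics'}
print(generate_schema_name("analytics_dev", cfg.get("schema")))  # analytics_dev_analytics
```

Teams that want +schema to be used verbatim typically override generate_schema_name in their own macros folder.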

Tool Comparison

| Feature | Airflow | Dagster | Prefect | dbt |
|---|---|---|---|---|
| Core use case | General orchestration | Asset-based pipelines | Simple workflows | SQL transformations |
| Language | Python | Python | Python | SQL + Jinja |
| Learning curve | Steep | Moderate | Low | Low-Moderate |
| Local dev | Complex setup | dagster dev | python flow.py | dbt run |
| UI | Web server | Dagster UI (formerly Dagit) | Prefect Cloud/Server | dbt Cloud / docs |
| Scaling | Celery/K8s executor | K8s, multi-process | Workers, K8s | Warehouse compute |
| Testing | Limited built-in | First-class | Basic | Built-in data tests |
| Best for | Complex multi-system pipelines | Data-asset-centric teams | Simple Python workflows | In-warehouse transforms |

Combining Tools

In practice, many teams combine these tools. A common production setup:

Airflow/Dagster (orchestrator)
├── Task: Run Fivetran sync (extract + load)
├── Task: dbt run --select staging (stage raw data)
├── Task: dbt test --select staging (validate staging)
├── Task: dbt run --select marts (build analytics models)
├── Task: dbt test --select marts (validate marts)
├── Task: Refresh BI dashboards
└── Task: Send Slack notification
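
# Airflow DAG that orchestrates dbt
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Operators need a DAG (here via the with-block) to be scheduled;
# the dag_id and schedule below are illustrative
with DAG(
    dag_id="dbt_layered_build",
    schedule="0 6 * * *",
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:
    dbt_staging = BashOperator(
        task_id="dbt_run_staging",
        bash_command="cd /opt/dbt && dbt run --select staging",
    )

    dbt_test_staging = BashOperator(
        task_id="dbt_test_staging",
        bash_command="cd /opt/dbt && dbt test --select staging",
    )

    dbt_marts = BashOperator(
        task_id="dbt_run_marts",
        bash_command="cd /opt/dbt && dbt run --select marts",
    )

    dbt_test_marts = BashOperator(
        task_id="dbt_test_marts",
        bash_command="cd /opt/dbt && dbt test --select marts",
    )

    # Build marts only after the staging models pass their tests
    dbt_staging >> dbt_test_staging >> dbt_marts >> dbt_test_marts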
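
The four dbt tasks differ only in layer and command. A common pattern is therefore to generate them from a list instead of writing each operator by hand. The plain-Python sketch below builds the (task_id, bash_command) pairs in dependency order. Inside a DAG, each pair would become a BashOperator, and consecutive tasks would be chained with >>. The build_dbt_steps name and the layers list are assumptions.

```python
def build_dbt_steps(layers: list[str], project_dir: str = "/opt/dbt") -> list[tuple[str, str]]:
    """Return (task_id, bash_command) pairs: run then test for each layer, in order."""
    steps = []
    for layer in layers:
        for command in ("run", "test"):
            steps.append((
                f"dbt_{command}_{layer}",
                f"cd {project_dir} && dbt {command} --select {layer}",
            ))
    return steps


steps = build_dbt_steps(["staging", "marts"])
for task_id, bash_command in steps:
    print(f"{task_id}: {bash_command}")

# Chaining consecutive steps reproduces: staging run -> test -> marts run -> test
print(" >> ".join(task_id for task_id, _ in steps))
# dbt_run_staging >> dbt_test_staging >> dbt_run_marts >> dbt_test_marts
```

Adding an intermediate layer then becomes a one-word change to the list rather than two new operators and an edited dependency chain.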

Best Practices

  1. Idempotent tasks - Every task produces the same result when re-run
  2. Atomic operations - Tasks either fully succeed or fully fail
  3. Clear ownership - Every DAG has an owner and alert recipients
  4. Monitoring - Track run duration, success rate, and data freshness
  5. Version control - All pipeline code lives in Git
  6. Testing - Test transformations before deploying to production
  7. Documentation - Every pipeline has a description of what it does and why
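
The first two practices often come together as a delete-then-insert of one partition inside a single transaction. Re-running the same day replaces that day's rows instead of appending duplicates, and a failure midway rolls back to the previous state. The minimal sketch below uses the standard library's sqlite3. The daily_orders table and the load_partition helper are hypothetical.

```python
import sqlite3


def load_partition(conn: sqlite3.Connection, ds: str, rows: list[tuple]) -> None:
    """Replace all rows for one day in a single transaction."""
    with conn:  # commits if the block succeeds, rolls back if it raises
        # Delete-then-insert makes re-runs for the same ds idempotent
        conn.execute("DELETE FROM daily_orders WHERE ds = ?", (ds,))
        conn.executemany(
            "INSERT INTO daily_orders (ds, order_id, amount) VALUES (?, ?, ?)",
            [(ds, order_id, amount) for order_id, amount in rows],
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_orders (ds TEXT, order_id INTEGER, amount REAL NOT NULL)")

load_partition(conn, "2025-01-01", [(1, 10.0), (2, 5.5)])
load_partition(conn, "2025-01-01", [(1, 10.0), (2, 5.5)])  # re-run: no duplicates

try:
    # The NULL amount violates NOT NULL partway through the load...
    load_partition(conn, "2025-01-01", [(3, 1.0), (4, None)])
except sqlite3.IntegrityError:
    pass  # ...so the DELETE and the first INSERT are rolled back together

print(conn.execute("SELECT COUNT(*) FROM daily_orders").fetchone()[0])  # 2
```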

Choose your orchestration stack based on team size, complexity, and existing infrastructure. Start simple (dbt + cron or Prefect) and graduate to Airflow or Dagster as your pipelines grow in number and complexity.
