Orchestration
Workflow orchestration with Airflow, Dagster, Prefect, and dbt - DAGs, operators, software-defined assets, models, tests, and best practices.
Orchestration is the coordination of data pipelines - deciding what runs, when, in what order, and what happens when things fail. The right orchestration tool turns fragile scripts into reliable, observable data systems.
Apache Airflow
Airflow is one of the most widely adopted orchestration tools. It defines workflows as Directed Acyclic Graphs (DAGs) written in Python.
Core Concepts
- DAG: A collection of tasks with dependencies (no cycles)
- Operator: A template for a task (BashOperator, PythonOperator, etc.)
- Task: A single unit of work (an instance of an operator)
- XCom: Cross-communication between tasks (small data passing)
- Connection: Stored credentials for external systems
- Pool: Limits concurrency for resource-constrained operations
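The "no cycles" property in the DAG definition above is what makes a run order computable. As a minimal sketch using only the Python standard library (this is not Airflow's scheduler, and the task names are hypothetical), a dependency map can be resolved into a valid order like this:

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical task names; each key maps to the set of tasks it depends on.
dependencies = {
    "extract": {"check_source"},
    "transform": {"extract"},
    "validate": {"transform"},
}


def execution_order(deps: dict[str, set[str]]) -> list[str]:
    """Return one valid run order, or raise CycleError if the graph has a cycle."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)

# A cycle has no valid run order, which is why DAGs forbid them.
try:
    execution_order({"a": {"b"}, "b": {"a"}})
    has_cycle = False
except CycleError:
    has_cycle = True
print("cycle detected:", has_cycle)
```

A real orchestrator adds scheduling, retries, and state tracking on top of this ordering step.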
A Production DAG
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.utils.task_group import TaskGroup
default_args = {
    "owner": "data-engineering",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["data-alerts@company.com"],
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(hours=2),
}
with DAG(
    dag_id="daily_ecommerce_pipeline",
    default_args=default_args,
    description="Daily pipeline for ecommerce data",
    schedule="0 6 * * *",  # 6 AM daily
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["ecommerce", "daily"],
    max_active_runs=1,
) as dag:
    # Task 1: Check source data freshness.
    # This operator only runs the query; it does not fail on its own
    # when is_fresh is 0.
    check_source = SQLExecuteQueryOperator(
        task_id="check_source_freshness",
        conn_id="source_postgres",
        sql="""
            SELECT CASE
                WHEN MAX(updated_at) > NOW() - INTERVAL '2 hours'
                THEN 1 ELSE 0
            END AS is_fresh
            FROM orders;
        """,
    )

    # Task 2: Extract data
    def extract_orders(**context):
        """Extract orders updated since last successful run."""
        from airflow.models import Variable
        import pandas as pd
        from sqlalchemy import create_engine, text

        last_run = context["prev_data_interval_end_success"] or "2025-01-01"
        engine = create_engine(Variable.get("source_db_conn"))
        # Bind the timestamp as a parameter instead of formatting it into the SQL
        df = pd.read_sql(
            text("SELECT * FROM orders WHERE updated_at >= :last_run"),
            engine,
            params={"last_run": last_run},
        )
        # Push row count to XCom for downstream validation
        context["ti"].xcom_push(key="extracted_rows", value=len(df))
        df.to_parquet("/tmp/orders_extract.parquet")

    extract = PythonOperator(
        task_id="extract_orders",
        python_callable=extract_orders,
    )

    # Task 3: Run dbt transformations
    with TaskGroup(group_id="dbt_transforms") as dbt_group:
        dbt_run = BashOperator(
            task_id="dbt_run",
            bash_command="cd /opt/dbt && dbt run --select tag:daily",
        )
        dbt_test = BashOperator(
            task_id="dbt_test",
            bash_command="cd /opt/dbt && dbt test --select tag:daily",
        )
        dbt_run >> dbt_test

    # Task 4: Validate results
    def validate_output(**context):
        """Ensure output tables meet quality thresholds."""
        extracted = context["ti"].xcom_pull(
            task_ids="extract_orders", key="extracted_rows"
        )
        # Treat a missing XCom value (None) the same as zero rows
        if not extracted:
            raise ValueError("No rows extracted - source may be stale")

    validate = PythonOperator(
        task_id="validate_output",
        python_callable=validate_output,
    )

    # Define task dependencies
    check_source >> extract >> dbt_group >> validate
Airflow Best Practices
- Keep DAGs idempotent - Every run should produce the same result for the same data interval
- Use logical_date - Process data based on the logical date (called execution_date before Airflow 2.2), not wall clock time
- Avoid heavy logic in DAG files - By default the scheduler re-parses DAG files roughly every 30 seconds; keep them lightweight
- Use TaskGroups - Organize related tasks visually
- Set timeouts - Prevent stuck tasks from blocking the scheduler
- Use Connections and Variables - Never hardcode credentials in DAG files
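The idempotency and logical-date points above can be sketched in plain Python. This is a minimal illustration, assuming a hypothetical in-memory `warehouse` dict stands in for a partitioned table: each run replaces the partition for its logical date rather than appending, so a retry leaves the same state behind.

```python
from datetime import date


def load_partition(warehouse: dict, logical_date: date, rows: list[dict]) -> None:
    """Replace the partition for logical_date instead of appending to it."""
    warehouse[logical_date.isoformat()] = list(rows)


# Hypothetical stand-in for a table partitioned by day.
warehouse: dict[str, list[dict]] = {}
rows = [{"order_id": 1, "total": 20.0}, {"order_id": 2, "total": 35.5}]

load_partition(warehouse, date(2025, 1, 1), rows)
first_state = {key: list(value) for key, value in warehouse.items()}

# Simulated retry of the same run: the state should not change.
load_partition(warehouse, date(2025, 1, 1), rows)
print(warehouse == first_state)
```

An append-based load would double the rows on retry; keying the write on the logical date is what keeps re-runs and backfills safe.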
Dagster
Dagster takes a fundamentally different approach with software-defined assets (SDAs). Instead of defining tasks that "do things," you define the data assets that should exist.
Software-Defined Assets
import dagster as dg
import pandas as pd
from sqlalchemy import create_engine


class SourceDatabase(dg.ConfigurableResource):
    """Connection settings for the source database."""

    url: str


@dg.asset(
    description="Raw orders from the ecommerce database",
    group_name="raw",
    metadata={"source": "postgres", "table": "orders"},
)
def raw_orders(
    context: dg.AssetExecutionContext, source_db: SourceDatabase
) -> pd.DataFrame:
    """Extract raw orders from the source database."""
    engine = create_engine(source_db.url)
    df = pd.read_sql("SELECT * FROM orders", engine)
    context.log.info(f"Extracted {len(df)} orders")
    return df


# The raw_orders parameter declares the upstream dependency and loads its
# value, so no separate deps= argument is needed.
@dg.asset(
    description="Cleaned and enriched orders",
    group_name="staging",
)
def stg_orders(
    context: dg.AssetExecutionContext, raw_orders: pd.DataFrame
) -> pd.DataFrame:
    """Clean and standardize order data."""
    df = raw_orders.copy()
    df = df.drop_duplicates(subset=["order_id"])
    df["order_date"] = pd.to_datetime(df["order_date"]).dt.date
    df["total_with_tax"] = df["subtotal"] * (1 + df["tax_rate"])
    context.log.info(f"Staged {len(df)} orders after cleaning")
    return df


@dg.asset(
    description="Daily revenue metrics by product category",
    group_name="marts",
)
def daily_revenue_by_category(stg_orders: pd.DataFrame) -> pd.DataFrame:
    """Aggregate daily revenue by product category."""
    return (
        stg_orders
        .groupby(["order_date", "category"])
        .agg(
            revenue=("total_with_tax", "sum"),
            order_count=("order_id", "nunique"),
        )
        .reset_index()
    )


# Register assets and resources; the database URL is read from the environment
defs = dg.Definitions(
    assets=[raw_orders, stg_orders, daily_revenue_by_category],
    resources={
        "source_db": SourceDatabase(url=dg.EnvVar("SOURCE_DATABASE_URL")),
    },
)
Why Dagster Over Airflow
| Feature | Airflow | Dagster |
|---|---|---|
| Core abstraction | Tasks (operations) | Assets (data) |
| Testability | Hard to unit test | First-class testing support |
| Local development | Requires full Airflow setup | dagster dev runs locally |
| Type checking | Runtime only | Python type hints, validated at runtime by Dagster types |
| Data lineage | Plugin-based | Built-in asset graph |
| Backfills | CLI-driven date-range backfills | Native partition backfills |
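The "Data lineage" row can be illustrated with a toy sketch of how an asset graph may be derived from function signatures, where a parameter name refers to the upstream asset of the same name. This is plain Python, not Dagster's implementation; `materialize_all` and the sample assets are hypothetical.

```python
import inspect
from graphlib import TopologicalSorter


def raw_orders() -> list[dict]:
    return [
        {"order_id": 1, "amount": 10},
        {"order_id": 1, "amount": 10},  # duplicate row
        {"order_id": 2, "amount": 5},
    ]


def stg_orders(raw_orders: list[dict]) -> list[dict]:
    """Drop duplicate order_ids, keeping the first occurrence."""
    seen, cleaned = set(), []
    for row in raw_orders:
        if row["order_id"] not in seen:
            seen.add(row["order_id"])
            cleaned.append(row)
    return cleaned


def total_revenue(stg_orders: list[dict]) -> int:
    return sum(row["amount"] for row in stg_orders)


def materialize_all(asset_fns):
    """Infer dependencies from parameter names, then compute assets in order."""
    by_name = {fn.__name__: fn for fn in asset_fns}
    graph = {
        name: set(inspect.signature(fn).parameters)
        for name, fn in by_name.items()
    }
    values = {}
    for name in TopologicalSorter(graph).static_order():
        upstream = {dep: values[dep] for dep in graph[name]}
        values[name] = by_name[name](**upstream)
    return graph, values


# Registration order does not matter; the graph determines execution order.
lineage, results = materialize_all([total_revenue, stg_orders, raw_orders])
print(lineage)
print(results["total_revenue"])
```

Because each asset is an ordinary function of its upstream values, it can also be unit tested by calling it directly with sample inputs.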
Prefect
Prefect positions itself as the simplest orchestration tool. It uses Python decorators to turn functions into observable workflows.
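As a rough mental model of what a task decorator with retries does, here is a standard-library sketch. It is not Prefect's implementation: `task_with_retries` and `flaky_extract` are hypothetical names, and the real library adds state tracking, caching, and a UI on top.

```python
import functools
import time


def task_with_retries(retries: int = 3, retry_delay_seconds: float = 0.0):
    """Wrap a function so failures are retried and each attempt is logged."""

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            attempts = 0
            while True:
                attempts += 1
                try:
                    result = fn(*args, **kwargs)
                    print(f"{fn.__name__} succeeded on attempt {attempts}")
                    return result
                except Exception as exc:
                    # Give up once the retry budget is exhausted.
                    if attempts > retries:
                        raise
                    print(f"{fn.__name__} failed ({exc}); retrying")
                    time.sleep(retry_delay_seconds)

        return wrapper

    return decorator


calls = {"count": 0}


@task_with_retries(retries=3, retry_delay_seconds=0)
def flaky_extract(source: str) -> dict:
    """Fail twice, then succeed, to exercise the retry path."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("source temporarily unavailable")
    return {"rows": 1000, "source": source}


result = flaky_extract("postgres")
print(result)
```

The function body stays ordinary Python; the decorator is where observability and failure handling are attached.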
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta


@task(retries=3, retry_delay_seconds=60, cache_key_fn=task_input_hash,
      cache_expiration=timedelta(hours=1))
def extract_data(source: str) -> dict:
    """Extract data with automatic retries and caching."""
    # Implementation here
    return {"rows": 1000, "source": source}


@task
def transform_data(raw_data: dict) -> dict:
    """Transform the extracted data."""
    return {"rows": raw_data["rows"], "status": "transformed"}


@task
def load_data(data: dict) -> None:
    """Load data into the warehouse."""
    print(f"Loaded {data['rows']} rows")


@flow(name="daily-etl", log_prints=True)
def daily_pipeline(source: str = "postgres"):
    """Main pipeline flow."""
    raw = extract_data(source)
    transformed = transform_data(raw)
    load_data(transformed)


# Run locally or deploy to Prefect Cloud
if __name__ == "__main__":
    daily_pipeline()
dbt (Data Build Tool)
dbt is not a general orchestrator - it specifically orchestrates SQL transformations inside your warehouse. It has become the de facto standard for the "T" in ELT.
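The ordering dbt applies to SQL models comes from the `ref()` calls between them. The sketch below illustrates the idea in plain Python under simplifying assumptions: it scans hypothetical model SQL with a regular expression, whereas dbt itself renders the Jinja templates to discover references.

```python
import re
from graphlib import TopologicalSorter

# Hypothetical model SQL, keyed by model name.
models = {
    "stg_orders": "SELECT * FROM {{ source('ecommerce', 'orders') }}",
    "stg_customers": "SELECT * FROM {{ source('ecommerce', 'customers') }}",
    "fct_orders": (
        "SELECT * FROM {{ ref('stg_orders') }} o "
        "JOIN {{ ref('stg_customers') }} c ON o.customer_id = c.customer_id"
    ),
}

# Matches {{ ref('model_name') }} and captures the model name.
REF_PATTERN = re.compile(r"\{\{\s*ref\(\s*['\"]([^'\"]+)['\"]\s*\)\s*\}\}")


def build_order(model_sql: dict[str, str]) -> list[str]:
    """Order models so that every ref() target is built before its consumer."""
    graph = {name: set(REF_PATTERN.findall(sql)) for name, sql in model_sql.items()}
    return list(TopologicalSorter(graph).static_order())


order = build_order(models)
print(order)
```

Both staging models have no `ref()` dependencies, so either may come first; the only guarantee is that `fct_orders` is built after both.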
Models
-- models/staging/stg_orders.sql
{{
    config(
        materialized='view',
        schema='staging'
    )
}}

WITH source AS (
    SELECT * FROM {{ source('ecommerce', 'orders') }}
),

cleaned AS (
    SELECT
        id AS order_id,
        user_id AS customer_id,
        CAST(total_cents AS DECIMAL(12,2)) / 100 AS total_dollars,
        status,
        created_at AS ordered_at,
        updated_at
    FROM source
    WHERE status != 'test'
)

SELECT * FROM cleaned
Incremental Models
-- models/marts/fct_page_views.sql
{{
    config(
        materialized='incremental',
        unique_key='page_view_id',
        incremental_strategy='merge'
    )
}}

SELECT
    {{ dbt_utils.generate_surrogate_key(['session_id', 'page_url', 'viewed_at']) }}
        AS page_view_id,
    session_id,
    user_id,
    page_url,
    referrer_url,
    duration_seconds,
    viewed_at
FROM {{ ref('stg_page_views') }}
{% if is_incremental() %}
WHERE viewed_at > (SELECT MAX(viewed_at) FROM {{ this }})
{% endif %}
Tests
# models/staging/schema.yml
version: 2

models:
  - name: stg_orders
    description: "Cleaned orders from the ecommerce system"
    columns:
      - name: order_id
        description: "Primary key"
        tests:
          - unique
          - not_null
      - name: total_dollars
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'processing', 'shipped', 'delivered', 'cancelled']

sources:
  - name: ecommerce
    database: raw
    schema: public
    freshness:
      warn_after: { count: 12, period: hour }
      error_after: { count: 24, period: hour }
    tables:
      - name: orders
        loaded_at_field: _loaded_at
Snapshots
-- snapshots/snap_customers.sql
{% snapshot snap_customers %}
{{
    config(
        target_schema='snapshots',
        unique_key='customer_id',
        strategy='timestamp',
        updated_at='updated_at',
        invalidate_hard_deletes=True,
    )
}}

SELECT * FROM {{ source('ecommerce', 'customers') }}
{% endsnapshot %}
dbt Project Configuration
# dbt_project.yml
name: ecommerce_analytics
version: '1.0.0'
config-version: 2
profile: 'warehouse'

model-paths: ["models"]
test-paths: ["tests"]
snapshot-paths: ["snapshots"]
macro-paths: ["macros"]

models:
  ecommerce_analytics:
    staging:
      +materialized: view
      +schema: staging
    intermediate:
      +materialized: ephemeral
    marts:
      +materialized: table
      +schema: analytics
Tool Comparison
| Feature | Airflow | Dagster | Prefect | dbt |
|---|---|---|---|---|
| Core use case | General orchestration | Asset-based pipelines | Simple workflows | SQL transformations |
| Language | Python | Python | Python | SQL + Jinja |
| Learning curve | Steep | Moderate | Low | Low-Moderate |
| Local dev | Complex setup | dagster dev | python flow.py | dbt run |
| UI | Web server | Dagster UI (formerly Dagit) | Prefect Cloud/Server | dbt Cloud / docs |
| Scaling | Celery/K8s executor | K8s, multi-process | Workers, K8s | Warehouse compute |
| Testing | Limited built-in | First-class | Basic | Built-in data tests |
| Best for | Complex multi-system pipelines | Data-asset-centric teams | Simple Python workflows | In-warehouse transforms |
Combining Tools
In practice, most teams combine these tools. A common production setup:
Airflow/Dagster (orchestrator)
├── Task: Run Fivetran sync (extract + load)
├── Task: dbt run --select staging (stage raw data)
├── Task: dbt test --select staging (validate staging)
├── Task: dbt run --select marts (build analytics models)
├── Task: dbt test --select marts (validate marts)
├── Task: Refresh BI dashboards
└── Task: Send Slack notification

# Airflow DAG that orchestrates dbt
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

# The dag_id, schedule, and start_date below are illustrative placeholders
with DAG(
    dag_id="dbt_pipeline",
    schedule="@daily",
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:
    dbt_staging = BashOperator(
        task_id="dbt_run_staging",
        bash_command="cd /opt/dbt && dbt run --select staging",
    )
    dbt_test_staging = BashOperator(
        task_id="dbt_test_staging",
        bash_command="cd /opt/dbt && dbt test --select staging",
    )
    dbt_marts = BashOperator(
        task_id="dbt_run_marts",
        bash_command="cd /opt/dbt && dbt run --select marts",
    )
    dbt_test_marts = BashOperator(
        task_id="dbt_test_marts",
        bash_command="cd /opt/dbt && dbt test --select marts",
    )

    # Test each layer before building the next one
    dbt_staging >> dbt_test_staging >> dbt_marts >> dbt_test_marts
Best Practices
- Idempotent tasks - Every task produces the same result when re-run
- Atomic operations - Tasks either fully succeed or fully fail
- Clear ownership - Every DAG has an owner and alert recipients
- Monitoring - Track run duration, success rate, and data freshness
- Version control - All pipeline code lives in Git
- Testing - Test transformations before deploying to production
- Documentation - Every pipeline has a description of what it does and why
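The "Atomic operations" item above can be sketched for file outputs with the standard library: write to a temporary file in the same directory, then swap it into place with `os.replace`, so readers see either the old file or the complete new one. The helper name `write_atomically` and the file names are hypothetical.

```python
import json
import os
import tempfile


def write_atomically(path: str, records: list[dict]) -> None:
    """Write records to a temp file, then rename it over the target path."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as tmp:
            json.dump(records, tmp)
        # The rename is the single step that publishes the new output.
        os.replace(tmp_path, path)
    except BaseException:
        # On failure, clean up the partial file and leave the target untouched.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


with tempfile.TemporaryDirectory() as workdir:
    target = os.path.join(workdir, "orders.json")
    write_atomically(target, [{"order_id": 1}])
    write_atomically(target, [{"order_id": 1}])  # re-run produces the same file
    with open(target) as fh:
        final_records = json.load(fh)
    leftover_files = sorted(os.listdir(workdir))

print(final_records, leftover_files)
```

In a warehouse, the analogous pattern is typically to build into a staging table and swap it in within a transaction.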
Choose your orchestration stack based on team size, complexity, and existing infrastructure. Start simple (dbt + cron or Prefect) and graduate to Airflow or Dagster as your pipelines grow in number and complexity.