Pipeline Orchestration: Coordinating Data Workflows
Airflow, Dagster, and Prefect coordinate complex data workflows. Learn orchestration patterns, DAG design, and failure handling.
A daily pipeline runs extraction from PostgreSQL, transforms in dbt, loads to Snowflake, runs BI report generation, and sends a Slack notification. Some steps must run sequentially. Others can run in parallel. If the dbt step fails, the report generation should not run, but the Slack notification should still fire with a failure alert.
This is pipeline orchestration. Orchestration frameworks manage the execution order, dependencies, retries, and monitoring of complex workflows. They are the operating system for your data pipelines.
What Orchestration Provides
Without orchestration, you have scripts. Scripts can be scheduled with cron, but cron does not handle dependencies, retries, or failure recovery well.
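For comparison, a cron entry can only express *when* a script runs, not what it depends on (the paths here are illustrative):

```
# Run the ETL script daily at 2am -- cron knows nothing about
# upstream dependencies, retries, or failure notifications
0 2 * * * /opt/pipelines/run_etl.sh >> /var/log/etl.log 2>&1
```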
Orchestration frameworks provide:
Directed Acyclic Graphs (DAGs): Define tasks and their dependencies. The framework ensures tasks run in the correct order.
Scheduling: Trigger pipeline runs on schedules (daily at 2am), on events (new file arrives), or manually.
Retry logic: Automatic retry on failure with configurable backoff. A transient database timeout does not require manual intervention.
Logging and observability: Centralized logs for all tasks. Visibility into which tasks ran, how long they took, and what failed.
Alerting: Integration with PagerDuty, Slack, and email for failure notifications.
Resource management: Allocation of compute resources to tasks. Isolation between tasks that should not interfere.
Airflow: The Industry Standard
Apache Airflow is the dominant orchestration platform. Airflow DAGs are Python files that define the workflow structure.
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'daily_etl_pipeline',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # Daily at 2am
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:
    # The python_callable functions are defined elsewhere in the project
    extract_task = PythonOperator(
        task_id='extract_orders',
        python_callable=extract_orders_from_postgres,
    )
    transform_task = PythonOperator(
        task_id='transform_orders',
        python_callable=run_dbt_transform,
    )
    load_task = PythonOperator(
        task_id='load_to_warehouse',
        python_callable=load_to_snowflake,
    )
    report_task = PythonOperator(
        task_id='generate_reports',
        python_callable=generate_bi_reports,
        trigger_rule='all_success',  # Only run if all upstream succeeded
    )
    notify_task = PythonOperator(
        task_id='notify_slack',
        python_callable=send_slack_notification,
        trigger_rule='all_done',  # Run once upstream finishes, even on failure
    )

    extract_task >> transform_task >> load_task >> report_task
    [extract_task, transform_task, load_task, report_task] >> notify_task
```
Airflow’s strength is its ecosystem. Every data tool has an Airflow operator. Every monitoring platform integrates with Airflow. Airflow is the de facto standard for data pipeline orchestration.
The weakness is operational complexity. Airflow requires a scheduler, web server, database (metadata store), and workers. LocalExecutor, CeleryExecutor, and KubernetesExecutor offer different scalability trade-offs. Running Airflow reliably in production requires significant operational investment.
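The executor is selected in `airflow.cfg` (or via the `AIRFLOW__CORE__EXECUTOR` environment variable); a minimal sketch:

```ini
[core]
# LocalExecutor: one machine, tasks run in parallel subprocesses.
# CeleryExecutor and KubernetesExecutor distribute tasks across workers
# at the cost of more infrastructure to operate.
executor = LocalExecutor
```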
Dagster: Modern Orchestration for Data Assets
Dagster takes a different approach. Rather than orchestrating tasks, Dagster orchestrates data assets. An asset is a table, file, or ML model that your pipeline produces.
```python
from dagster import asset, Definitions

@asset
def orders_raw():
    """Extract raw orders from PostgreSQL."""
    return extract_orders()

@asset
def orders_cleaned(orders_raw):
    """Clean and deduplicate orders."""
    return clean_orders(orders_raw)

@asset
def dim_orders(orders_cleaned):
    """Build the orders dimension table."""
    return build_dim_orders(orders_cleaned)

@asset
def fact_orders(dim_orders):
    """Build the fact orders table."""
    return build_fact_orders(dim_orders)

defs = Definitions(assets=[orders_raw, orders_cleaned, dim_orders, fact_orders])
```
Dagster’s asset-centric model makes it natural to think about data lineage and data quality at the pipeline level. Each asset can have its own tests, freshness policies, and partitioning scheme.
Dagster also has a better operational story than Airflow. The Dagster web server and daemon can run as a single process for small deployments and scale to a hybrid model for larger ones.
Prefect: Pythonic and Flexible
Prefect is a Python-native orchestration framework that focuses on developer experience. Prefect 2.0 introduced a hybrid model where flows can run anywhere (local, cloud, or self-hosted) while still being orchestrated by Prefect Cloud.
```python
from prefect import flow, task

@task
def extract_orders():
    return query_database("SELECT * FROM orders WHERE ...")

@task
def transform_orders(orders):
    return clean_and_deduplicate(orders)

@task
def load_to_warehouse(orders):
    return snowflake_load(orders)

@flow
def daily_etl():
    orders = extract_orders()
    cleaned = transform_orders(orders)
    load_to_warehouse(cleaned)
```
Prefect’s hybrid model is appealing: you can run flows locally for testing without any cloud dependency, then register them with Prefect Cloud for production orchestration.
Trigger Rules and Conditional Logic
Pipeline tasks need more than simple sequential execution. They need conditional logic.
Trigger rules determine when a task should run based on upstream outcomes:
- `all_success` (default): all upstream tasks succeeded
- `all_failed`: all upstream tasks failed
- `none_failed`: no upstream task failed (all succeeded or were skipped)
- `none_skipped`: no upstream task was skipped
- `all_done`: all upstream tasks finished, regardless of outcome
- `always`: run regardless of upstream status
```python
# Only generate reports if the warehouse load succeeded
report_task = PythonOperator(
    task_id='generate_reports',
    python_callable=generate_reports,
    trigger_rule='all_success',
)

# Always send a notification, whether the pipeline succeeded or failed
notify_task = PythonOperator(
    task_id='notify',
    python_callable=send_notification,
    trigger_rule='all_done',
)
```
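The rule semantics can be sketched as a plain-Python evaluator (not Airflow's implementation, just an illustration), where each upstream state is `"success"`, `"failed"`, or `"skipped"`:

```python
def should_run(trigger_rule: str, upstream_states: list[str]) -> bool:
    """Decide whether a task runs, given its upstream tasks' final states."""
    if trigger_rule == "all_success":
        return all(s == "success" for s in upstream_states)
    if trigger_rule == "all_failed":
        return all(s == "failed" for s in upstream_states)
    if trigger_rule == "none_failed":
        return all(s != "failed" for s in upstream_states)
    if trigger_rule == "none_skipped":
        return all(s != "skipped" for s in upstream_states)
    if trigger_rule in ("always", "all_done"):
        return True  # all_done additionally waits until upstream has finished
    raise ValueError(f"unknown trigger rule: {trigger_rule}")

# A failed transform blocks an all_success report task,
# but an all_done notification still fires
states = ["success", "failed", "skipped"]
assert not should_run("all_success", states)
assert should_run("all_done", states)
```

Note the practical consequence: a failure-alerting task configured with `none_failed` would be skipped exactly when you need it most, which is why the notification examples above use `all_done`.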
Failure Handling and Retries
Transient failures (network timeouts, temporary database unavailability) should be handled with automatic retries, not human intervention.
```python
default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(hours=1),
}
```
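With exponential backoff, the wait before each attempt roughly doubles until it hits the cap; a sketch of the resulting schedule (real schedulers typically add jitter, omitted here):

```python
from datetime import timedelta

def retry_delay(attempt: int,
                base: timedelta = timedelta(minutes=5),
                cap: timedelta = timedelta(hours=1)) -> timedelta:
    """Delay before retry `attempt` (1-based): base * 2^(attempt-1), capped."""
    return min(base * 2 ** (attempt - 1), cap)

# Attempts 1..4 wait 5, 10, 20, 40 minutes; attempt 5 would be 80
# minutes but is capped at the 1-hour max_retry_delay
assert retry_delay(2) == timedelta(minutes=10)
assert retry_delay(5) == timedelta(hours=1)
```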
For non-transient failures, the pipeline should fail gracefully. Store partial results for debugging. Do not leave the destination in a corrupted state. Use staging tables and atomic commits so that a failed pipeline leaves the previous successful state intact.
```sql
-- Use a staging-table approach for atomic pipeline updates.
-- Run validation queries (row counts, null checks) against the
-- staging table BEFORE this transaction; if any check fails, the
-- swap never happens and production keeps its last good state.
BEGIN TRANSACTION;

-- Replace the production table's contents from staging
DELETE FROM dim_customers;
INSERT INTO dim_customers SELECT * FROM dim_customers_staging;

COMMIT;
-- On any error, issue ROLLBACK instead of COMMIT so the previous
-- successful state remains intact.
```
Observability: Beyond Logging
Pipeline observability means knowing the state of your pipelines without digging through logs.
Pipeline-level metrics: Duration, success/failure rate, SLAs met. These aggregate up to give a dashboard view of overall pipeline health.
Task-level metrics: Per-task duration, retry count, resource usage. These help identify bottlenecks and failures.
Data freshness: When was each table last updated? How stale is the data?
```yaml
# Example: Prometheus-style metrics exported from Airflow
metrics:
  - name: pipeline_duration_seconds
    type: gauge
    labels: [pipeline_name, status]
  - name: pipeline_tasks_total
    type: counter
    labels: [pipeline_name, task_name, status]
  - name: data_freshness_minutes
    type: gauge
    labels: [table_name]
```
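A freshness check reduces to comparing each table's last update against an SLA threshold; a minimal sketch, assuming you can query last-update timestamps (table names and thresholds here are illustrative):

```python
from datetime import datetime, timedelta, timezone

def stale_tables(last_updated: dict[str, datetime],
                 max_age: timedelta,
                 now: datetime) -> list[str]:
    """Return the tables whose data is older than max_age."""
    return [t for t, ts in last_updated.items() if now - ts > max_age]

now = datetime(2026, 1, 2, 12, 0, tzinfo=timezone.utc)
updates = {
    "dim_orders": datetime(2026, 1, 2, 3, 0, tzinfo=timezone.utc),   # 9h old
    "fact_orders": datetime(2026, 1, 1, 2, 0, tzinfo=timezone.utc),  # 34h old
}
assert stale_tables(updates, timedelta(hours=24), now) == ["fact_orders"]
```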
For more on monitoring, see Metrics, Monitoring, and Alerting.
When to Use Pipeline Orchestration
Use orchestration when:
- You have multiple pipeline steps with complex dependencies that are hard to manage in scripts
- Pipelines need retry logic, failure handling, and alerting beyond what cron provides
- Multiple teams need to understand pipeline state without digging through logs
- You need visibility into pipeline SLAs, freshness, and historical performance
Do not use orchestration when:
- Pipelines are simple and linear with no branching dependencies
- The work is a one-off data movement that will never run again
- The overhead of maintaining orchestration infrastructure outweighs the benefits for trivial pipelines
DAG Structure Example
A DAG defines task dependencies. Here is a typical ETL DAG with branching logic:
```mermaid
flowchart TD
    Start[Daily Trigger] --> Extract[Extract from PostgreSQL]
    Extract -->|raw data| Load[Load to Snowflake]
    Load -->|staged data| Transform1[dbt: stg_orders]
    Load -->|staged data| Transform2[dbt: stg_customers]
    Transform1 -->|cleaned| Transform3[dbt: dim_orders]
    Transform2 -->|cleaned| Transform3
    Transform3 -->|mart| Report[Generate BI Reports]
    Report --> Notify[Send Slack Notification]
    Extract -->|on failure| Alert[PagerDuty Alert]
    Transform3 -->|on failure| Alert
```
The DAG structure makes dependencies explicit. You can see at a glance that dim_orders depends on both stg_orders and stg_customers, which depend on the initial load.
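The execution order implied by such a graph is a topological sort; Python's standard-library `graphlib` can compute it from the dependency mapping (task names follow the diagram above):

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on
deps = {
    "load": {"extract"},
    "stg_orders": {"load"},
    "stg_customers": {"load"},
    "dim_orders": {"stg_orders", "stg_customers"},
    "report": {"dim_orders"},
    "notify": {"report"},
}

order = list(TopologicalSorter(deps).static_order())
# extract comes first; dim_orders only after both staging models
assert order[0] == "extract"
assert order.index("dim_orders") > order.index("stg_orders")
assert order.index("dim_orders") > order.index("stg_customers")
```

`TopologicalSorter` also exposes `prepare()`/`get_ready()`/`done()`, which is the shape an orchestrator uses to run independent tasks such as `stg_orders` and `stg_customers` in parallel.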
Trade-off Table: Airflow vs Dagster vs Prefect
| Aspect | Airflow | Dagster | Prefect |
|---|---|---|---|
| Mental model | Tasks in a DAG | Data assets | Tasks and flows |
| Scheduling | Built-in scheduler | Built-in scheduler | Built-in scheduler |
| UI quality | Functional, dated | Modern, asset-centric | Modern |
| Operational complexity | High (DB, workers, scheduler) | Medium | Low (local) to Medium (cloud) |
| dbt integration | Via operator | Native (asset) | Via task |
| Failure handling | Per-task retries | Per-asset retries | Per-task retries |
| Deployment | Standalone | Standalone or cloud | Standalone or cloud |
| Learning curve | Steep | Moderate | Easy |
| Best for | Large ecosystems, existing Airflow teams | Asset-centric pipelines, data quality focus | Python-first teams, developer experience |
Airflow wins for large existing deployments with extensive ecosystem integrations. Dagster wins for teams prioritizing data assets and quality. Prefect wins for Python-first teams who want to start local and scale to cloud.
Orchestration Anti-Patterns
Pipeline orchestration goes wrong in predictable ways:
Blocking sensors: A sensor task that polls for a file or condition every minute for hours holds a worker slot. Use async sensors or external triggering instead.
Excessive cross-DAG dependencies: DAG A waits for DAG B to complete. DAG B waits for DAG C. One failure cascades. Keep cross-DAG dependencies to a minimum and use callback-based triggering where possible.
Hard-coded resource assumptions: A task assumes 4GB of memory is always available. On a saturated worker, it fails repeatedly. Use resource declarations and let the orchestrator handle scheduling.
No pipeline SLAs: A pipeline is marked “done” when it finishes, not when its outputs are fit for use. Define SLAs for data freshness, not just task completion.
Over-granular tasks: Breaking a pipeline into tasks for every single SQL query creates overhead and makes the DAG unreadable. Group related operations into cohesive tasks.
Quick Recap
- Orchestration frameworks manage dependencies, scheduling, retries, and monitoring for complex pipelines.
- Airflow is the ecosystem standard with the steepest operational curve.
- Dagster takes an asset-centric approach that makes data quality explicit.
- Prefect prioritizes developer experience and local-to-cloud flexibility.
- Define trigger rules carefully: `all_success` vs `all_done` determines whether failure alerts actually fire.
- Treat pipelines as production systems: SLAs, alerting, and failure recovery are not optional.
Conclusion
Pipeline orchestration turns scripts into reliable, observable workflows. Airflow, Dagster, and Prefect each take different approaches, but all solve the same core problems: dependency management, scheduling, retry logic, and observability.
The choice between them depends on your team’s existing skills, operational capacity, and whether you prefer an asset-centric or task-centric mental model. For teams already using dbt, Dagster’s asset model is a natural fit. For teams with existing Airflow investments, the ecosystem argument is strong.
Whatever framework you choose, treat orchestration as the foundation of your data platform. A well-orchestrated pipeline fails gracefully, retries appropriately, and tells you when something is wrong before your users do.
For related reading on pipeline patterns, see Extract-Transform-Load and Data Quality.