~75 min read · updated 2026-05-15

Orchestration with Prefect

Stand up the Prefect server, turn the bronze ingestion into a real flow, schedule it daily, wire retries and a Slack/email alert on failure, and trigger dbt from the same flow.

By the end of this module you will have:

  • The Prefect server running on the GPU server, with its own Postgres database and a web UI.
  • A work pool and a worker so flows actually execute, not just sit “scheduled.”
  • The bronze ingestion from module 05 turned into a @flow with retries, idempotency, and a daily schedule.
  • A dbt build step triggered from the same flow after ingestion succeeds.
  • A failure alert (Slack, email, or webhook) wired to the flow.
  • An ADR pinning the orchestrator choice.

If module 05 was “write the connector” and module 06 was “shape the data in SQL,” this module is what runs all of it without anyone watching.

Why Prefect over Airflow

Three real options for orchestration on a single server:

Option | Strengths | Weaknesses
--- | --- | ---
Prefect 2/3 | Pythonic, light, dynamic DAGs, friendly UI, easy local install | Smaller ecosystem than Airflow; some operators you’d have in Airflow you write yourself
Airflow | Industry default, huge operator library, easy hiring signal | Heavy, opinionated about DAG style, painful in single-machine dev, container model is fiddly
Dagster | Strong asset-graph model, lineage native | Newer, smaller community; great if you go all-in, less great as a “schedule a script” tool

For a ≤6-student course running on one server, Prefect wins on install simplicity and the Pythonic flow model. If your students will join Airflow shops after, swap accordingly — the patterns in this module (idempotency, retries, sensors) transfer.

Step 1 — Install Prefect server

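Prefect needs a database (use the existing Postgres) and a UI service. Create the role and the database first. Each statement gets its own -c flag because CREATE DATABASE cannot run inside a multi-statement command string, and making prefect the owner lets it create its tables on Postgres 15 and later:

docker exec -it gitea-postgres psql -U gitea -d postgres \
  -c "CREATE USER prefect WITH PASSWORD '__prefect_password__';" \
  -c "CREATE DATABASE prefect OWNER prefect;"

Then, as usual, docker-compose. /opt/prefect/docker-compose.yml: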

services:
  prefect-server:
    image: prefecthq/prefect:3-python3.12
    container_name: prefect-server
    restart: unless-stopped
    network_mode: host
    environment:
      - PREFECT_API_DATABASE_CONNECTION_URL=postgresql+asyncpg://prefect:__prefect_password__@localhost:5432/prefect
      - PREFECT_UI_API_URL=http://<gpu-server>:4200/api
      - PREFECT_API_URL=http://localhost:4200/api
    command: prefect server start --host 0.0.0.0 --port 4200

  prefect-worker:
    image: prefecthq/prefect:3-python3.12
    container_name: prefect-worker
    restart: unless-stopped
    network_mode: host
    environment:
      - PREFECT_API_URL=http://localhost:4200/api
    volumes:
      - /home:/home
      - /srv/shared:/srv/shared
      - /var/run/docker.sock:/var/run/docker.sock
    command: prefect worker start --pool 'default' --type process

Bring it up and confirm the UI:

sudo docker compose -f /opt/prefect/docker-compose.yml up -d
curl -s http://localhost:4200/api/health
# Open http://<gpu-server>:4200 in the browser

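Create the work pool (one-time). Run this from a shell where the prefect CLI is installed and PREFECT_API_URL points at the server. The worker may already have created the pool on startup, in which case the command reports that it exists and you can move on: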

prefect work-pool create default --type process

Step 2 — A flow with retries and idempotency

Open the project from module 05 (the one with the nyc_tlc source connector). Add a flow alongside it:

# src/<project>/flows/ingest_nyc_tlc.py
from __future__ import annotations

import datetime as dt
import os

import s3fs
from prefect import flow, task, get_run_logger
from prefect.tasks import exponential_backoff

from src.<project>.data.sources.nyc_tlc import ingest_month


@task(retries=3, retry_delay_seconds=exponential_backoff(60), retry_jitter_factor=0.5)
def ingest_one_month(year: int, month: int) -> str:
    logger = get_run_logger()
    fs = s3fs.S3FileSystem(
        key=os.environ["MINIO_ACCESS_KEY"],
        secret=os.environ["MINIO_SECRET_KEY"],
        client_kwargs={"endpoint_url": os.environ["MINIO_ENDPOINT_URL"]},
    )
    target = ingest_month(year, month, fs)
    logger.info("Wrote %s", target)
    return target


@flow(name="ingest-nyc-tlc")
def ingest_nyc_tlc(year: int | None = None) -> list[str]:
    """Ingest one year of yellow-taxi data to bronze. Defaults to the year of the date 30 days ago."""
    if year is None:
        year = (dt.date.today() - dt.timedelta(days=30)).year

    out = []
    for month in range(1, 13):
        out.append(ingest_one_month.submit(year, month))
    return [t.result() for t in out]

Two things that look small and matter a lot:

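  • @task(retries=3, retry_delay_seconds=exponential_backoff(60)): a flaky source (HTTP timeouts) doesn’t fail the whole flow; the task retries up to three times, waiting roughly 60s, 120s, then 240s, each stretched a little by the jitter factor.
  • .submit() then .result(): .submit() hands each month to the flow’s task runner, so the twelve tasks run concurrently inside the one flow run. The work pool decides where the flow run executes, not how many tasks run at once. To land four months at a time in Prefect 3, set task_runner=ThreadPoolTaskRunner(max_workers=4) on the @flow.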
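
The retry waits quoted above follow from simple doubling. A minimal sketch of that arithmetic in plain Python, with no Prefect import: backoff_schedule and max_jittered_delays are illustrative helper names, not Prefect APIs, and the jitter bound assumes that a factor of 0.5 lets a delay stretch by up to 50%.

```python
def backoff_schedule(base_seconds: int, retries: int) -> list[int]:
    """Delay before each retry: base, 2*base, 4*base, ..."""
    return [base_seconds * 2**attempt for attempt in range(retries)]


def max_jittered_delays(delays: list[int], jitter_factor: float) -> list[float]:
    """Upper bound on each delay after jitter (assumed: delay * (1 + factor))."""
    return [delay * (1 + jitter_factor) for delay in delays]


delays = backoff_schedule(60, retries=3)
print(delays)                            # [60, 120, 240]
print(max_jittered_delays(delays, 0.5))  # [90.0, 180.0, 360.0]
```

In the worst case a month that fails every attempt therefore holds its slot for several minutes before the task is marked failed, which is worth remembering when reading a slow run in the UI.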

Idempotency is already in the underlying ingest_month (skip-if-exists). The flow inherits that property automatically — re-running it never duplicates data.
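
The real ingest_month lives in module 05 and is not reproduced here. As a rough sketch of what skip-if-exists means, the following uses a local directory in place of MinIO; ingest_month_sketch, the fetch callable, and the dt=YYYY-MM path layout are hypothetical stand-ins, not the module 05 code.

```python
import tempfile
from pathlib import Path
from typing import Callable


def ingest_month_sketch(
    year: int, month: int, root: Path, fetch: Callable[[int, int], bytes]
) -> tuple[Path, bool]:
    """Write one month's file unless it already exists.

    Returns (target, wrote) so callers can tell a fresh write from a skip.
    """
    target = root / f"dt={year:04d}-{month:02d}" / "data.parquet"
    if target.exists():
        return target, False              # already ingested: skip the download
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_bytes(fetch(year, month))
    return target, True


calls = []


def fake_fetch(year: int, month: int) -> bytes:
    """Stand-in for the HTTP download; records that it was called."""
    calls.append((year, month))
    return b"fake parquet bytes"


with tempfile.TemporaryDirectory() as d:
    first = ingest_month_sketch(2024, 1, Path(d), fake_fetch)
    second = ingest_month_sketch(2024, 1, Path(d), fake_fetch)
    print(first[1], second[1], len(calls))  # True False 1
```

The second call returns the same target without touching the source, which is the property the flow relies on when it is re-run or retried.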

Step 3 — Deploy and schedule

A deployment in Prefect is a flow plus a schedule plus a work pool. Create one:

prefect deploy src/<project>/flows/ingest_nyc_tlc.py:ingest_nyc_tlc \
  --name daily \
  --pool default \
  --cron "0 4 * * *" \
  --param year=2024

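A daily 04:00 run is cheap for a monthly-grained source because skip-if-exists turns most runs into no-ops. Real-world: match the schedule to the source’s update cadence; scheduling more often than the source publishes only burns worker time. Note that --param year=2024 pins every scheduled run to 2024; omit it if scheduled runs should use the flow’s default year.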
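
When a run starts without a year parameter, the flow falls back to its default. A small worked example of that rule, with the date passed in explicitly so the result can be checked; default_year is an illustrative helper that copies the expression from the flow body.

```python
import datetime as dt


def default_year(today: dt.date) -> int:
    """Same rule as the flow: the calendar year of the date 30 days ago."""
    return (today - dt.timedelta(days=30)).year


print(default_year(dt.date(2025, 1, 15)))  # 2024
print(default_year(dt.date(2025, 1, 31)))  # 2025
print(default_year(dt.date(2025, 6, 1)))   # 2025
```

For the first 30 days of January the default still points at the previous year, which gives December’s file time to be published before the flow moves on.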

In the UI, the new deployment is visible under Deployments, with the next-scheduled time and the cron expression. Trigger one manual run from the UI to confirm the worker is alive.

Step 4 — Chain dbt after ingestion

The flow is incomplete without rebuilding the warehouse. Append a dbt task:

import os
import subprocess

from prefect import get_run_logger, task


@task
def dbt_build() -> None:
    """Run `dbt build` in the analytics project; fail the task on a non-zero exit."""
    logger = get_run_logger()
    result = subprocess.run(
        ["uv", "run", "dbt", "build", "--profiles-dir", os.environ["DBT_PROFILES_DIR"]],
        cwd="/home/<user>/work/<project>/analytics",
        # Indexing os.environ fails fast with a KeyError if the password is not set.
        env={**os.environ, "DBT_PG_PASSWORD": os.environ["DBT_PG_PASSWORD"]},
        capture_output=True,
        text=True,
    )
    logger.info(result.stdout)
    if result.returncode != 0:
        logger.error(result.stderr)
        raise RuntimeError(f"dbt build failed with exit code {result.returncode}")

Then in ingest_nyc_tlc, after the ingestion fan-out:

@flow(name="ingest-nyc-tlc")
def ingest_nyc_tlc(year: int | None = None) -> None:
    if year is None:
        year = (dt.date.today() - dt.timedelta(days=30)).year

    futures = [ingest_one_month.submit(year, m) for m in range(1, 13)]
    for future in futures:
        future.result()                 # wait for each month; re-raises if it failed
    dbt_build()                         # only runs if all ingestion succeeded

This is the gate pattern: warehouse rebuilds only happen on a green bronze refresh. A partial ingestion does not produce half-fresh marts.
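
The gate works because asking a failed future for its result raises, so the line that starts the build is never reached. A standard-library analogue of that control flow, not Prefect’s implementation; run_gated, flaky_ingest, and the build callback are illustrative names.

```python
from concurrent.futures import ThreadPoolExecutor


def run_gated(months, ingest, build):
    """Run `ingest` for every month concurrently, then `build` only if all succeeded."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(ingest, m) for m in months]
        results = [f.result() for f in futures]  # re-raises the first failure met
    build()                                      # not reached if any result() raised
    return results


built = []


def flaky_ingest(month):
    """Stand-in ingestion that fails for one month."""
    if month == 3:
        raise IOError("source timed out")
    return f"month-{month:02d}"


try:
    run_gated(range(1, 5), flaky_ingest, lambda: built.append("dbt build"))
except IOError as exc:
    print("gate closed:", exc)  # gate closed: source timed out
print(built)                    # []
```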

Step 5 — Wire a failure alert

In the UI, open Automations and create a new one. Trigger on Flow run state changes → Failed. Action: send a Slack message, an email, or hit a webhook. For Slack, register the block types from the prefect-slack package first:

# Requires the prefect-slack package in the environment that runs this command
prefect block register -m prefect_slack
# Then in the UI, create a SlackWebhook block from your incoming-webhook URL

A working alert routes from the flow’s failure straight to the channel the instructor watches. The wrong default is “I’ll check the UI in the morning.” Failures need to find you.
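
For the plain-webhook route, the alert can also live in code. The sketch below assumes Prefect’s flow state hooks, which call a function with (flow, flow_run, state) when attached via @flow(on_failure=[...]), and a Slack incoming webhook that accepts a JSON body with a text field. The SLACK_WEBHOOK_URL variable and both function names are assumptions for illustration.

```python
import json
import os
import urllib.request
from types import SimpleNamespace


def failure_payload(flow, flow_run, state) -> dict:
    """Build the JSON body for a Slack incoming webhook."""
    return {
        "text": f":x: Flow {flow.name!r} run {flow_run.name!r} entered state {state.name}"
    }


def alert_on_failure(flow, flow_run, state) -> None:
    """POST the failure message; SLACK_WEBHOOK_URL is an assumed env var."""
    request = urllib.request.Request(
        os.environ["SLACK_WEBHOOK_URL"],
        data=json.dumps(failure_payload(flow, flow_run, state)).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    urllib.request.urlopen(request, timeout=10).close()


# Stand-in objects so the message format can be checked without Prefect or Slack.
payload = failure_payload(
    SimpleNamespace(name="ingest-nyc-tlc"),
    SimpleNamespace(name="daily-run"),
    SimpleNamespace(name="Failed"),
)
print(payload["text"])  # :x: Flow 'ingest-nyc-tlc' run 'daily-run' entered state Failed
```

A hook like this only fires while the flow process is alive to run it, so the UI automation remains the safer default for crashes.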

Step 6 — Sensor flows for the lake

Some pipelines need to wait for something. A second flow can watch for a file’s arrival and only then kick off processing:

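import os
import time

import s3fs
from prefect import flow, task

from src.<project>.flows.ingest_nyc_tlc import dbt_build  # the task from Step 4


@task
def wait_for_partition(prefix: str, timeout_minutes: int = 60) -> str:
    """Poll the lake every 30 s until `prefix` exists, or raise after the timeout."""
    # Same MinIO settings as ingest_one_month in Step 2.
    fs = s3fs.S3FileSystem(
        key=os.environ["MINIO_ACCESS_KEY"],
        secret=os.environ["MINIO_SECRET_KEY"],
        client_kwargs={"endpoint_url": os.environ["MINIO_ENDPOINT_URL"]},
    )
    deadline = time.monotonic() + timeout_minutes * 60
    while time.monotonic() < deadline:
        fs.invalidate_cache()           # do not trust a cached listing between polls
        if fs.exists(prefix):
            return prefix
        time.sleep(30)
    raise TimeoutError(f"{prefix} did not appear within {timeout_minutes}m")


@flow
def silver_after_bronze(dt: str):
    wait_for_partition(f"s3://datasets/bronze/nyc_tlc/yellow_tripdata/dt={dt}/data.parquet")
    dbt_build()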

This is the alternative to “schedule both jobs and hope the upstream finishes first” — explicit dependency, never a race.

Step 7 — Backfills

The clean way to re-run history:

for year in 2020 2021 2022 2023 2024; do
    prefect deployment run "ingest-nyc-tlc/daily" --param year=$year
done

The flow is idempotent, so this is safe to run repeatedly. Backfills are something every pipeline will need exactly once, at the worst possible moment; make sure your flow supports them from day one.
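
If the backfill is driven from Python rather than a shell, the same loop can be sketched with subprocess. backfill_commands and run_backfill are illustrative names; the CLI arguments are the ones used in the shell loop above.

```python
import subprocess


def backfill_commands(deployment: str, years: range) -> list[list[str]]:
    """One `prefect deployment run` argv per year, oldest first."""
    return [
        ["prefect", "deployment", "run", deployment, "--param", f"year={year}"]
        for year in years
    ]


def run_backfill(deployment: str, years: range) -> None:
    """Trigger each run in turn; stop at the first command that fails."""
    for argv in backfill_commands(deployment, years):
        subprocess.run(argv, check=True)


# Print the commands instead of running them, as a dry run.
for argv in backfill_commands("ingest-nyc-tlc/daily", range(2020, 2025)):
    print(" ".join(argv))
```

Keeping the command builder separate from the runner makes a dry run trivial, which helps when the backfill range is large.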

ADR 0007 — Prefect over Airflow

/srv/shared/adr/0007-orchestrator.md:

# ADR 0007 — Orchestrator: Prefect 3 over Airflow

## Status
Accepted, 2026-05-15.

## Context
The platform needs a scheduler that runs Python flows, retries on failure,
supports backfills, and integrates with dbt. The three real options were
Prefect 3, Airflow 2, and Dagster.

## Decision
Prefect 3. One container for the server, one for the worker, Postgres for
state, `process` worker type (jobs run as subprocesses on the host).

## Consequences
- Pro: deployment is a docker-compose stanza; no separate `webserver`,
  `scheduler`, `worker`, `triggerer` like Airflow's standard install.
- Pro: dynamic DAGs are first-class — no DAG-file scanning loop.
- Con: smaller operator ecosystem than Airflow; expect to write a few
  glue tasks the Airflow community already published.

## Alternatives considered
- Airflow 2. The default in industry but heavy and awkward for a one-host
  deployment with two GPUs. The Airflow API and CLI are also more
  permissions-heavy than is justified at this scale.
- Dagster. Strong asset-graph model. We chose Prefect for the lighter
  developer ergonomics; will revisit if asset-first thinking becomes a
  pain point.

Recap and what’s next

You now have:

  • A scheduled, retry-bounded, idempotent ingestion flow.
  • A dbt build chained after a green ingestion.
  • Failure alerting that routes to a channel a human reads.
  • The sensor and backfill patterns for the things that will inevitably come up.

The warehouse is now self-maintaining. Time to put a face on it — Superset dashboards. That’s the data-engineering capstone.


Next: 08 — Analytics with Superset (Capstone 2).