Orchestration with Prefect
Stand up the Prefect server, turn the bronze ingestion into a real flow, schedule it daily, wire retries and a Slack/email alert on failure, and trigger dbt from the same flow.
By the end of this module you will have:
- The Prefect server running on the GPU server, with its own Postgres database and a web UI.
- A work pool and a worker so flows actually execute, not just sit “scheduled.”
- The bronze ingestion from module 05 turned into a `@flow` with retries, idempotency, and a daily schedule.
- A `dbt build` step triggered from the same flow after ingestion succeeds.
- A failure alert (Slack, email, or webhook) wired to the flow.
- An ADR pinning the orchestrator choice.
If module 05 was “write the connector” and module 06 was “shape the data in SQL,” this module is what runs all of it without anyone watching.
Why Prefect over Airflow
Three real options for orchestration on a single server:
| Option | Strengths | Weaknesses |
|---|---|---|
| Prefect 2/3 | Pythonic, light, dynamic DAGs, friendly UI, easy local install | Smaller ecosystem than Airflow; some integrations Airflow ships as ready-made operators you write yourself |
| Airflow | Industry default, huge operator library, easy hiring signal | Heavy, opinionated about DAG style, painful in single-machine dev, container model is fiddly |
| Dagster | Strong asset-graph model, native lineage | Newer, smaller community; great if you go all-in, less great as a “schedule a script” tool |
For a ≤6-student course running on one server, Prefect wins on install simplicity and the Pythonic flow model. If your students are headed for Airflow shops, swap accordingly; the patterns in this module (idempotency, retries, sensors) transfer.
Step 1 — Install Prefect server
Prefect needs a database (reuse the existing Postgres) and a server process that serves both the API and the UI; as usual, the services go in docker-compose. First, create the database and role:
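```bash
# Separate -c flags run as separate statements: a single multi-statement -c string
# runs in one transaction, and CREATE DATABASE cannot run inside a transaction block.
# Owning the database (rather than GRANT ALL) lets prefect create tables in the
# public schema on PostgreSQL 15+.
docker exec -it gitea-postgres psql -U gitea -d postgres \
  -c "CREATE USER prefect WITH PASSWORD '__prefect_password__';" \
  -c "CREATE DATABASE prefect OWNER prefect;"
```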
/opt/prefect/docker-compose.yml:
```yaml
services:
  prefect-server:
    image: prefecthq/prefect:3-python3.12
    container_name: prefect-server
    restart: unless-stopped
    network_mode: host
    environment:
      - PREFECT_API_DATABASE_CONNECTION_URL=postgresql+asyncpg://prefect:__prefect_password__@localhost:5432/prefect
      # The UI runs in the browser, so this must be an address the browser can reach
      - PREFECT_UI_API_URL=http://<gpu-server>:4200/api
      - PREFECT_API_URL=http://localhost:4200/api
    command: prefect server start --host 0.0.0.0 --port 4200

  prefect-worker:
    image: prefecthq/prefect:3-python3.12
    container_name: prefect-worker
    restart: unless-stopped
    network_mode: host
    environment:
      - PREFECT_API_URL=http://localhost:4200/api
    # A process worker runs each flow run as a subprocess inside this container,
    # so the container also needs the flows' dependencies (s3fs, the project, uv/dbt)
    # and the MINIO_* / DBT_* variables they read.
    volumes:
      - /home:/home
      - /srv/shared:/srv/shared
      - /var/run/docker.sock:/var/run/docker.sock  # not used by a process worker
    command: prefect worker start --pool 'default' --type process
```
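The connection URL in `PREFECT_API_DATABASE_CONNECTION_URL` breaks if the real password contains URL delimiters such as `@`, `/`, `:` or a space. A small stdlib helper (hypothetical, not part of Prefect) that percent-encodes the credentials before you paste the result into the compose file:

```python
from urllib.parse import quote


def prefect_db_url(user: str, password: str, host: str, port: int, database: str) -> str:
    """Build a postgresql+asyncpg URL with user and password percent-encoded.

    Without encoding, an '@', '/', ':' or space in the password would be read
    as a URL delimiter and the server would fail to connect.
    """
    # safe="" encodes every reserved character, including '/';
    # quote() writes spaces as %20 rather than '+'.
    user_enc = quote(user, safe="")
    password_enc = quote(password, safe="")
    return f"postgresql+asyncpg://{user_enc}:{password_enc}@{host}:{port}/{database}"


if __name__ == "__main__":
    # Made-up password, for illustration only.
    print(prefect_db_url("prefect", "p@ss/w rd", "localhost", 5432, "prefect"))
```

A side benefit: with `safe=""` the encoded password can never contain `$`, so Compose's `$` variable interpolation cannot mangle it.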
Bring it up and confirm the UI:
```bash
sudo docker compose -f /opt/prefect/docker-compose.yml up -d
curl -s http://localhost:4200/api/health
# Open http://<gpu-server>:4200 in the browser
```
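Create the work pool (one-time). A worker started with `--type process` also creates its pool if it is missing, so this may simply report that `default` already exists. Running it through the server container reuses that container's CLI and `PREFECT_API_URL`:

```bash
sudo docker exec prefect-server prefect work-pool create default --type process
```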
Step 2 — A flow with retries and idempotency
Open the project from module 05 (the one with the nyc_tlc source connector). Add a flow alongside it:
```python
# src/<project>/flows/ingest_nyc_tlc.py
from __future__ import annotations

import datetime as dt
import os

import s3fs
from prefect import flow, get_run_logger, task
from prefect.tasks import exponential_backoff

from src.<project>.data.sources.nyc_tlc import ingest_month


@task(retries=3, retry_delay_seconds=exponential_backoff(60), retry_jitter_factor=0.5)
def ingest_one_month(year: int, month: int) -> str:
    logger = get_run_logger()
    fs = s3fs.S3FileSystem(
        key=os.environ["MINIO_ACCESS_KEY"],
        secret=os.environ["MINIO_SECRET_KEY"],
        client_kwargs={"endpoint_url": os.environ["MINIO_ENDPOINT_URL"]},
    )
    target = ingest_month(year, month, fs)  # skips months already in bronze
    logger.info("Wrote %s", target)
    return target


@flow(name="ingest-nyc-tlc")
def ingest_nyc_tlc(year: int | None = None) -> list[str]:
    """Ingest one year of yellow-taxi data to bronze. Defaults to last year."""
    if year is None:
        # Last calendar year: all twelve of its months are already published,
        # whereas the current year's later months don't exist yet.
        year = dt.date.today().year - 1
    futures = [ingest_one_month.submit(year, month) for month in range(1, 13)]
    return [f.result() for f in futures]
```
Two things that look small and matter a lot:
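- `@task(retries=3, retry_delay_seconds=exponential_backoff(60))`: a flaky source (HTTP timeouts) doesn't fail the whole flow; the task retries up to three times, waiting about 60 s, 120 s, then 240 s, with `retry_jitter_factor=0.5` randomizing each wait so simultaneous retries don't all fire at once.
- `.submit()` then `.result()`: the twelve months run concurrently on the flow's task runner, a thread pool inside the single flow-run process rather than something spread across the work pool. To cap the load on the source at, say, four months at a time, pass `task_runner=ThreadPoolTaskRunner(max_workers=4)` (imported from `prefect.task_runners`) to `@flow`.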
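The retry waits above follow a doubling schedule. A stdlib sketch of the arithmetic, assuming `exponential_backoff(60)` yields `60 * 2**n` for retry `n` (which matches the 60/120/240 s figures); the function names are hypothetical, and the jitter here is a simple uniform stretch, not necessarily the randomization Prefect itself applies:

```python
import random


def backoff_delays(backoff_factor: float, retries: int) -> list[float]:
    """Base wait before retry n (0-based): backoff_factor * 2**n."""
    return [backoff_factor * 2**n for n in range(retries)]


def jittered(delay: float, jitter_factor: float, rng: random.Random) -> float:
    """Stretch a delay by a random amount in [0, jitter_factor * delay).

    Illustrative only: Prefect applies its own randomization for
    retry_jitter_factor, not necessarily this uniform one.
    """
    return delay * (1 + jitter_factor * rng.random())


if __name__ == "__main__":
    rng = random.Random(42)  # seeded so the demo is repeatable
    for attempt, base in enumerate(backoff_delays(60, 3), start=1):
        print(f"retry {attempt}: base {base}s, with jitter {jittered(base, 0.5, rng):.0f}s")
```

Doubling keeps early retries quick for a blip while giving a struggling source several minutes to recover before the task finally fails.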
Idempotency is already in the underlying ingest_month (skip-if-exists). The flow inherits that property automatically — re-running it never duplicates data.
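The skip-if-exists property can be seen in isolation with a local directory in place of MinIO. This is a hypothetical stand-in, not the module 05 `ingest_month`: `fetch` replaces the HTTP download and the partition layout is invented for the example.

```python
from __future__ import annotations

import tempfile
from pathlib import Path
from typing import Callable


def ingest_month_local(
    year: int, month: int, root: Path, fetch: Callable[[int, int], bytes]
) -> str:
    """Skip-if-exists ingestion into a local directory standing in for the lake."""
    # Hypothetical layout, loosely modelled on the bronze paths in this module.
    target = root / "yellow_tripdata" / f"dt={year:04d}-{month:02d}" / "data.parquet"
    if target.exists():
        return str(target)  # already ingested: a re-run is a no-op
    target.parent.mkdir(parents=True, exist_ok=True)
    # Write under a temporary name, then rename, so a crash mid-write never
    # leaves a partial file that the exists() check would treat as done.
    tmp = target.with_name(target.name + ".tmp")
    tmp.write_bytes(fetch(year, month))
    tmp.replace(target)
    return str(target)


if __name__ == "__main__":
    calls: list[tuple[int, int]] = []

    def fake_fetch(year: int, month: int) -> bytes:
        calls.append((year, month))
        return b"parquet bytes"

    with tempfile.TemporaryDirectory() as d:
        for _ in range(2):  # run the "flow" twice
            ingest_month_local(2024, 1, Path(d), fake_fetch)
    print(calls)  # [(2024, 1)]: fetched once despite two runs
```

The write-then-rename step matters because skip-if-exists trusts any file it finds; a half-written file left by a crash would otherwise be skipped forever.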
Step 3 — Deploy and schedule
A deployment in Prefect is a flow plus a schedule plus a work pool. Create one:
```bash
prefect deploy src/<project>/flows/ingest_nyc_tlc.py:ingest_nyc_tlc \
  --name daily \
  --pool default \
  --cron "0 4 * * *" \
  --param year=2024
```
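A daily 04:00 schedule is enough for a monthly-grained source; note that Prefect reads the cron in UTC unless you also pass `--timezone` (for example `--timezone Europe/Berlin`). In practice, match the schedule to the source's update cadence: never schedule more often than the source actually produces new data.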
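To see when `0 4 * * *` actually fires, here is the arithmetic for a daily cron evaluated in UTC (the zone assumed above when no `--timezone` is given). It is a hand-rolled sketch for this one cron shape, with a hypothetical function name, not a general cron parser:

```python
from datetime import datetime, timedelta, timezone


def next_daily_run(after: datetime, hour: int = 4, minute: int = 0) -> datetime:
    """Next time the cron "<minute> <hour> * * *" fires strictly after `after`, in UTC."""
    if after.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime")
    after_utc = after.astimezone(timezone.utc)
    candidate = after_utc.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= after_utc:
        candidate += timedelta(days=1)  # today's slot has passed: use tomorrow's
    return candidate


if __name__ == "__main__":
    now = datetime(2024, 5, 1, 6, 30, tzinfo=timezone(timedelta(hours=2)))  # 04:30 UTC
    print(next_daily_run(now))  # 2024-05-02 04:00:00+00:00
```

The demo shows the usual surprise: 06:30 local time in a UTC+2 zone is already past 04:00 UTC, so the next run is tomorrow, not later today.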
In the UI, the new deployment is visible under Deployments, with the next-scheduled time and the cron expression. Trigger one manual run from the UI to confirm the worker is alive.
Step 4 — Chain dbt after ingestion
The flow is incomplete without rebuilding the warehouse. Append a dbt task:
```python
import os
import subprocess

from prefect import get_run_logger, task


@task
def dbt_build() -> None:
    logger = get_run_logger()
    # The subprocess inherits os.environ, so DBT_PG_PASSWORD and
    # DBT_PROFILES_DIR reach dbt without extra plumbing.
    result = subprocess.run(
        ["uv", "run", "dbt", "build", "--profiles-dir", os.environ["DBT_PROFILES_DIR"]],
        cwd="/home/<user>/work/<project>/analytics",
        capture_output=True,
        text=True,
    )
    logger.info(result.stdout)  # dbt prints model results (and most errors) here
    if result.returncode != 0:
        logger.error(result.stderr)
        raise RuntimeError(f"dbt build failed with exit code {result.returncode}")
```
Then in ingest_nyc_tlc, after the ingestion fan-out:
```python
@flow(name="ingest-nyc-tlc")
def ingest_nyc_tlc(year: int | None = None) -> None:
    if year is None:
        year = dt.date.today().year - 1  # last calendar year
    futures = [ingest_one_month.submit(year, m) for m in range(1, 13)]
    for f in futures:
        f.result()  # re-raises if that month failed, ending the flow here
    dbt_build()  # only reached when all twelve months succeeded
```
This is the gate pattern: warehouse rebuilds only happen on a green bronze refresh. A partial ingestion does not produce half-fresh marts.
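The gate is easy to see outside Prefect. A plain `concurrent.futures` sketch (hypothetical names; the thread pool plays the part of Prefect's task runner) that submits twelve months to a pool capped at four threads, waits on every `.result()`, and calls the downstream step only when none raised:

```python
from __future__ import annotations

import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def fan_out_then_gate(
    ingest: Callable[[int, int], str],
    downstream: Callable[[], None],
    year: int,
    max_workers: int = 4,
) -> list[str]:
    """Ingest 12 months with at most `max_workers` in flight; run `downstream`
    only if every month succeeded."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(ingest, year, m) for m in range(1, 13)]
        # result() re-raises a month's exception, so a single failure skips
        # everything below, including downstream().
        paths = [f.result() for f in futures]
    downstream()
    return paths


if __name__ == "__main__":
    lock = threading.Lock()
    in_flight = peak = 0

    def fake_ingest(year: int, month: int) -> str:
        global in_flight, peak
        with lock:
            in_flight += 1
            peak = max(peak, in_flight)
        time.sleep(0.05)  # pretend to download
        with lock:
            in_flight -= 1
        return f"{year}-{month:02d}"

    print(fan_out_then_gate(fake_ingest, lambda: print("dbt build"), 2024))
    print("peak concurrency:", peak)
```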
Step 5 — Wire a failure alert
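In the UI, open Automations from the sidebar and create one. Trigger: Flow run state, entering Failed (add Crashed as well, so infrastructure failures alert too). Action: Send a notification through a notification block (Slack, email, or a custom webhook). For Slack: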
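The Slack Webhook notification block ships with Prefect, so nothing extra needs to be installed or registered: create it under Blocks → + → Slack Webhook from your incoming-webhook URL, then select it in the automation's action. (`prefect block register -m prefect_slack` only matters if you install the separate prefect-slack integration and want its blocks.)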
A working alert routes from the flow’s failure straight to the channel the instructor watches. The wrong default is “I’ll check the UI in the morning.” Failures need to find you.
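Before trusting the automation, it helps to confirm the webhook URL itself works. Slack incoming webhooks accept a JSON POST with a `text` field; a stdlib sketch (hypothetical helper names, placeholder URL) that builds such a request and sends it only when asked:

```python
from __future__ import annotations

import json
import urllib.request


def build_slack_alert(webhook_url: str, flow_name: str, state: str, run_url: str) -> urllib.request.Request:
    """Build (without sending) a JSON POST for a Slack incoming webhook."""
    payload = {"text": f"Flow `{flow_name}` entered state *{state}*: {run_url}"}
    return urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request: urllib.request.Request, timeout: float = 10.0) -> int:
    """Send the request and return the HTTP status (Slack replies 200 on success)."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


if __name__ == "__main__":
    req = build_slack_alert(
        "https://hooks.slack.com/services/<your-webhook-path>",  # placeholder URL
        "ingest-nyc-tlc",
        "Failed",
        "http://<gpu-server>:4200",
    )
    print(req.get_method(), req.data.decode())
    # send(req)  # uncomment once the URL is real
```

If a manual `send` reaches the channel but the automation stays silent, the problem is in the trigger or block configuration rather than in Slack.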
Step 6 — Sensor flows for the lake
Some pipelines need to wait for something. A second flow that watches for a file’s arrival and only then kicks off processing:
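```python
import time

import s3fs
from prefect import flow, task

from src.<project>.flows.ingest_nyc_tlc import dbt_build  # the Step 4 task, if you appended it there


@task
def wait_for_partition(path: str, timeout_minutes: int = 60) -> str:
    fs = s3fs.S3FileSystem(...)  # same key/secret/endpoint as ingest_one_month
    # monotonic() can't jump backwards or forwards if the wall clock is adjusted
    deadline = time.monotonic() + timeout_minutes * 60
    while time.monotonic() < deadline:
        if fs.exists(path):
            return path
        time.sleep(30)  # poll every 30 s
    raise TimeoutError(f"{path} did not appear within {timeout_minutes}m")


@flow
def silver_after_bronze(dt: str):
    wait_for_partition(f"s3://datasets/bronze/nyc_tlc/yellow_tripdata/dt={dt}/data.parquet")
    dbt_build()
```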
This is the alternative to “schedule both jobs and hope the upstream finishes first” — explicit dependency, never a race.
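The polling loop can be factored so it is testable without MinIO. A sketch with hypothetical names: in the real task `exists` would be `fs.exists`, while the clock and sleep are injectable so a test can advance time instantly. It also never sleeps past the deadline, so the final check lands right at the timeout.

```python
from __future__ import annotations

import time
from typing import Callable


def wait_for(
    exists: Callable[[str], bool],
    path: str,
    timeout_s: float = 3600,
    poll_s: float = 30,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll exists(path) until it is True; raise TimeoutError after timeout_s."""
    deadline = clock() + timeout_s
    while True:
        if exists(path):
            return path
        remaining = deadline - clock()
        if remaining <= 0:
            raise TimeoutError(f"{path} did not appear within {timeout_s:.0f}s")
        # Never sleep past the deadline, so the last check lands right on it.
        sleep(min(poll_s, remaining))


if __name__ == "__main__":
    fake_now = [0.0]

    def fake_sleep(seconds: float) -> None:
        fake_now[0] += seconds  # advance the fake clock instead of waiting

    checks = iter([False, False, True])  # the object "lands" on the third check
    found = wait_for(lambda p: next(checks), "demo/data.parquet",
                     clock=lambda: fake_now[0], sleep=fake_sleep)
    print(found, fake_now[0])  # demo/data.parquet 60.0
```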
Step 7 — Backfills
The clean way to re-run history:
```bash
for year in 2020 2021 2022 2023 2024; do
  prefect deployment run "ingest-nyc-tlc/daily" --param year=$year
done
```
The flow is idempotent, so this is safe to run repeatedly. Backfills are something every pipeline will need exactly once, at the worst possible moment; make sure your flow supports them from day one.
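The same loop in Python, for when the list of years is computed rather than typed: a hypothetical helper that prints the `prefect deployment run` commands by default and executes them only with `dry_run=False`. Each command just queues a flow run and returns; if five years fanning out twelve months each is more load than the source tolerates, a work-pool concurrency limit (`prefect work-pool set-concurrency-limit default 1`) should make the runs go one after another.

```python
from __future__ import annotations

import subprocess
from collections.abc import Iterable


def backfill_commands(deployment: str, years: Iterable[int]) -> list[list[str]]:
    """One `prefect deployment run` argv per year (no shell, so no quoting issues)."""
    return [
        ["prefect", "deployment", "run", deployment, "--param", f"year={year}"]
        for year in years
    ]


def backfill(deployment: str, years: Iterable[int], dry_run: bool = True) -> None:
    """Print the commands by default; run them only when dry_run=False."""
    for argv in backfill_commands(deployment, years):
        if dry_run:
            print(" ".join(argv))
        else:
            subprocess.run(argv, check=True)  # stop at the first failed submission


if __name__ == "__main__":
    backfill("ingest-nyc-tlc/daily", range(2020, 2025))
```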
ADR 0007 — Prefect over Airflow
/srv/shared/adr/0007-orchestrator.md:
# ADR 0007 — Orchestrator: Prefect 3 over Airflow
## Status
Accepted, 2026-05-15.
## Context
The platform needs a scheduler that runs Python flows, retries on failure,
supports backfills, and integrates with dbt. The three real options were
Prefect 3, Airflow 2, and Dagster.
## Decision
Prefect 3. One container for the server, one for the worker, Postgres for
state, `process` worker type (flow runs execute as subprocesses of the worker,
inside its container).
## Consequences
- Pro: deployment is a docker-compose stanza; no separate `webserver`,
`scheduler`, `worker`, `triggerer` like Airflow's standard install.
- Pro: dynamic DAGs are first-class — no DAG-file scanning loop.
- Con: smaller operator ecosystem than Airflow; expect to write a few
glue tasks the Airflow community already published.
## Alternatives considered
- Airflow 2. The default in industry but heavy and awkward for a one-host
deployment with two GPUs. The Airflow API and CLI are also more
permissions-heavy than is justified at this scale.
- Dagster. Strong asset-graph model. We chose Prefect for the lighter
developer ergonomics; will revisit if asset-first thinking becomes a
pain point.
Recap and what’s next
You now have:
- A scheduled, retry-bounded, idempotent ingestion flow.
- A dbt build chained after a green ingestion.
- Failure alerting that routes to a channel a human reads.
- The sensor and backfill patterns for the things that will inevitably come up.
The warehouse is now self-maintaining. Time to put a face on it — Superset dashboards. That’s the data-engineering capstone.
Next: 08 — Analytics with Superset (Capstone 2).