~75 min read · updated 2026-05-15

The data lake: bronze ingestion

Land raw data on MinIO as partitioned Parquet, write the first source connector, and adopt the bronze/silver/gold convention every later module assumes.

By the end of this module you will have:

  • A bronze/silver/gold zone layout on MinIO that the rest of the course (warehouse, ML, RAG, vision) writes through.
  • A Python source connector that ingests a public dataset to bronze as partitioned Parquet, idempotently.
  • A working understanding of partitioning, compaction, and the small-files problem — three things that bite every data lake within a quarter of going live.
  • The convention for schema evolution — adding a column to a source without breaking the readers downstream.

The dataset we use throughout the data-engineering capstone is the NYC TLC trip records. It’s public, big enough to be interesting (tens of millions of rows per year), small enough to fit on one server, and has a real schema that has evolved over the years: exactly the texture you want to teach against.

The zone layout

A working data lake organizes data by trust level, not by source:

s3://datasets/bronze/<source>/<table>/dt=YYYY-MM-DD/*.parquet
s3://datasets/silver/<table>/dt=YYYY-MM-DD/*.parquet
s3://datasets/gold/<mart>/*.parquet
  • Bronze: raw, append-only, exactly what the source gave us. Type-coerced to Parquet (no CSV in your lake — see below) but otherwise unprocessed. The audit log.
  • Silver: cleaned, deduplicated, conformed types, but still row-level. The thing analytics joins to.
  • Gold: aggregated, business-meaningful, dimensional. The thing the dashboard reads.

The convention is that silver and gold are reproducible from bronze. If silver is corrupted, you re-run silver from bronze. Bronze is the only thing whose loss is unrecoverable, so bronze is the only thing you back up like it matters.
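
One way to keep every connector on the same layout is to build paths through a small helper instead of formatting them by hand. The sketch below is a minimal illustration: the function names (bronze_prefix, silver_prefix, gold_prefix) are hypothetical and not part of the course code, and only the bucket name and path shapes come from the layout above.

```python
from datetime import date

BUCKET = "s3://datasets"  # bucket name taken from the layout above


def bronze_prefix(source: str, table: str, day: date) -> str:
    """Partition prefix for raw data from one source table."""
    return f"{BUCKET}/bronze/{source}/{table}/dt={day.isoformat()}"


def silver_prefix(table: str, day: date) -> str:
    """Partition prefix for a cleaned, conformed table."""
    return f"{BUCKET}/silver/{table}/dt={day.isoformat()}"


def gold_prefix(mart: str) -> str:
    """Prefix for an aggregated mart (not date-partitioned in this layout)."""
    return f"{BUCKET}/gold/{mart}"


print(bronze_prefix("nyc_tlc", "yellow_tripdata", date(2024, 1, 15)))
# s3://datasets/bronze/nyc_tlc/yellow_tripdata/dt=2024-01-15
```

Note that silver and gold drop the source segment: by that point the data is organized by what it means, not by where it came from.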

Why Parquet, and never CSV

Quick justification, then we move on:

| Property | CSV | Parquet |
|---|---|---|
| Typed | No (everything is a string) | Yes |
| Compressed | No, or whole-file gzip | Yes, column-aware (snappy/zstd) |
| Column projection | Read the whole row, throw away 90% | Read only the columns you need |
| Predicate pushdown | No | Yes (a filter such as dt = '2024-01-15' skips non-matching partitions and row groups) |
| Streaming-friendly | Yes | Yes (row-group level) |
| Self-describing | No | Yes (schema in the footer) |

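For a 50 GB dataset queried by analysts every day, Parquet is commonly on the order of 10× faster to scan and 5× smaller on disk than CSV, because readers touch only the columns and row groups a query needs and never re-parse text into types. The exact ratios depend on the data and the queries. There is no scenario in this course where CSV is the right output format.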
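
The "Typed" row is easy to see with the standard library alone. The snippet below is only an illustration, using a made-up two-row CSV whose column names are borrowed from the trip schema: every value comes back as a string, so each reader has to parse it again on every scan.

```python
import csv
import io

# A two-row CSV with a numeric column and a timestamp column (made-up values).
raw = (
    "trip_distance,tpep_pickup_datetime\n"
    "1.5,2023-07-01 08:15:00\n"
    "12.0,2023-07-01 09:40:00\n"
)

rows = list(csv.DictReader(io.StringIO(raw)))

# Every value is a str, including the number and the timestamp.
value_types = {type(v).__name__ for row in rows for v in row.values()}
print(value_types)  # {'str'}

# Aggregating the column needs an explicit cast on every read.
total = sum(float(row["trip_distance"]) for row in rows)
print(total)  # 13.5
```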

Step 1 — Set the buckets up

You created the datasets bucket in module 03. The bronze/silver/gold split is just a prefix convention, not separate buckets, which keeps access policies simple.

A small bit of lifecycle housekeeping helps. On the datasets bucket, set a rule that transitions objects older than 30 days to cool storage (if your MinIO is multi-tier) and expires bronze objects older than 2 years. Two years is long enough that any silver/gold rebuild succeeds, short enough that you’re not paying to keep five years of raw trips. Since bronze is the one zone you cannot rebuild, let the rule expire only partitions that the out-of-cluster backup already holds.

mc ilm rule add local/datasets \
  --prefix "bronze/" --expire-days 730

If your MinIO is single-tier (most are), skip the transition rule.

Step 2 — A typed source connector

The connector pattern this course uses everywhere:

# src/<project>/data/sources/nyc_tlc.py
from __future__ import annotations

import io

import polars as pl
import requests
import s3fs

BASE = "https://d37ci6vzurychx.cloudfront.net/trip-data"
BRONZE = "s3://datasets/bronze/nyc_tlc/yellow_tripdata"

# Stable schema we coerce *every* file into. Source has drifted across years;
# coercion at the bronze edge is what keeps silver readable.
SCHEMA = {
    "VendorID": pl.Int32,
    "tpep_pickup_datetime": pl.Datetime("us"),
    "tpep_dropoff_datetime": pl.Datetime("us"),
    "passenger_count": pl.Int32,
    "trip_distance": pl.Float64,
    "RatecodeID": pl.Int32,
    "store_and_fwd_flag": pl.Utf8,
    "PULocationID": pl.Int32,
    "DOLocationID": pl.Int32,
    "payment_type": pl.Int32,
    "fare_amount": pl.Float64,
    "tip_amount": pl.Float64,
    "total_amount": pl.Float64,
}


def ingest_month(year: int, month: int, fs: s3fs.S3FileSystem) -> str:
    """Download one month of yellow taxi data, coerce, write to bronze.

    Returns the S3 URI of the written Parquet.
    """
    month_tag = f"{year:04d}-{month:02d}"
    url = f"{BASE}/yellow_tripdata_{month_tag}.parquet"

    target = f"{BRONZE}/dt={month_tag}/data.parquet"
    if fs.exists(target):
        return target  # idempotent: skip if already landed

    # Download the whole file into memory (a single month is ~80 MB compressed)
    resp = requests.get(url, timeout=60)
    resp.raise_for_status()
    df = pl.read_parquet(io.BytesIO(resp.content))

    # Coerce to the stable schema. Extra source columns are dropped; columns the
    # source lacks are added as typed nulls so every partition has the same shape.
    df = df.select([
        (pl.col(c) if c in df.columns else pl.lit(None)).cast(t).alias(c)
        for c, t in SCHEMA.items()
    ])

    with fs.open(target, "wb") as f:
        df.write_parquet(f, compression="zstd")

    return target

Three things this code does that the naïve version does not:

  • Idempotency. A second run of the same (year, month) is a no-op. This is what makes Prefect retries safe (module 07).
  • Schema coercion at the edge. Bronze is “raw,” but raw with a stable column set and stable types. If the source adds a column next year, you decide whether to carry it forward; you don’t get surprised at the silver layer.
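  • Partitioning by dt=<month>. Hive-style partition keys. Polars, DuckDB, and Trino all understand this convention; a query with WHERE dt = '2024-03' reads exactly one file. Because the source publishes monthly files, dt carries a YYYY-MM value here rather than the YYYY-MM-DD of the general layout.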
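
To make the partitioning point concrete, here is a minimal sketch of what a query engine does with Hive-style keys before it opens any file. The object keys are shaped like the ones ingest_month writes; the helper names (partition_values, prune) are hypothetical and stand in for logic that engines such as DuckDB implement internally.

```python
import re

# Object keys shaped like the ones the connector writes for 2023.
KEYS = [
    f"datasets/bronze/nyc_tlc/yellow_tripdata/dt=2023-{m:02d}/data.parquet"
    for m in range(1, 13)
]

# A path segment of the form /name=value is a Hive-style partition key.
_PARTITION = re.compile(r"/([A-Za-z_][A-Za-z0-9_]*)=([^/]+)")


def partition_values(key: str) -> dict[str, str]:
    """Extract Hive-style key=value segments from an object key."""
    return dict(_PARTITION.findall(key))


def prune(keys: list[str], column: str, value: str) -> list[str]:
    """Keep only the keys whose partition column equals `value`."""
    return [k for k in keys if partition_values(k).get(column) == value]


print(prune(KEYS, "dt", "2023-07"))
# ['datasets/bronze/nyc_tlc/yellow_tripdata/dt=2023-07/data.parquet']
```

The filter is answered from the path alone, which is why the other eleven files are never opened.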

Run it once from a notebook:

import s3fs
from src.churn.data.sources.nyc_tlc import ingest_month

fs = s3fs.S3FileSystem(
    key="platform",
    secret="__platform_secret__",
    client_kwargs={"endpoint_url": "http://<gpu-server>:9000"},
)

for month in range(1, 13):
    ingest_month(2023, month, fs)

You should now have twelve files in MinIO under bronze/nyc_tlc/yellow_tripdata/dt=2023-01/... through dt=2023-12/..., about 1 GB in total and queryable in DuckDB without moving them.
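
The loop above covers a single calendar year. For a backfill that crosses year boundaries, a small generator keeps the call site just as short. This is a sketch; month_range is a hypothetical helper, not part of the connector.

```python
from collections.abc import Iterator


def month_range(
    start: tuple[int, int], end: tuple[int, int]
) -> Iterator[tuple[int, int]]:
    """Yield (year, month) pairs from `start` to `end`, both inclusive."""
    year, month = start
    while (year, month) <= end:
        yield year, month
        month += 1
        if month > 12:  # roll over into January of the next year
            year, month = year + 1, 1


print(list(month_range((2022, 11), (2023, 2))))
# [(2022, 11), (2022, 12), (2023, 1), (2023, 2)]
```

Used as `for year, month in month_range((2022, 1), (2023, 12)): ingest_month(year, month, fs)`, a rerun stays cheap because ingest_month skips partitions that already exist.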

Step 3 — Verify with DuckDB

DuckDB reads Parquet from S3 natively. Two queries that should both work from any notebook:

import duckdb

con = duckdb.connect()
con.execute("""
    INSTALL httpfs;
    LOAD httpfs;
    SET s3_endpoint='<gpu-server>:9000';
    SET s3_url_style='path';
    SET s3_use_ssl=false;
    SET s3_access_key_id='platform';
    SET s3_secret_access_key='__platform_secret__';
""")

# Whole year
print(con.execute("""
    SELECT count(*), avg(trip_distance)
    FROM read_parquet('s3://datasets/bronze/nyc_tlc/yellow_tripdata/dt=*/data.parquet',
                      hive_partitioning=1)
""").fetchall())

# One partition — predicate pushdown means this reads ~one file
print(con.execute("""
    SELECT count(*), avg(fare_amount)
    FROM read_parquet('s3://datasets/bronze/nyc_tlc/yellow_tripdata/dt=*/data.parquet',
                      hive_partitioning=1)
    WHERE dt = '2023-07'
""").fetchall())

The second query should return in well under a second. If it doesn’t, partition pruning isn’t working: check that the files really sit under dt=... prefixes and that hive_partitioning=1 is set in the read_parquet call, since that option is what exposes dt as a filterable column.

Step 4 — The small-files problem (and how to fight it)

A naïve ingestion writes one file per source request. If your source is daily, you have 365 tiny files per year, 365 round-trips per scan, and every reader spends most of its time on S3 list-and-open overhead instead of actual reading.

The two patterns that fix this:

  • Partition by an intentional grain. Daily for fast-moving data, monthly for slow-moving. Don’t partition by minute “because you can” — you’ll create the small-files problem yourself.
  • Compact periodically. Once a month, rewrite the partition as a single file. Compaction is just read partition → write one big file → atomic replace.
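
A quick back-of-the-envelope calculation shows why the grain matters. The sketch below assumes the roughly 1 GB per year quoted earlier for the 2023 data and, as a simplification, spreads it evenly across files; the names are illustrative only.

```python
TOTAL_BYTES = 1_000_000_000  # ~1 GB per year; assumed evenly spread across files

FILES_PER_YEAR = {
    "monthly": 12,
    "daily": 365,
    "hourly": 365 * 24,
    "per-minute": 365 * 24 * 60,
}


def average_file_size(total_bytes: int, files: int) -> float:
    """Average object size in bytes if the data is split into `files` objects."""
    return total_bytes / files


for grain, files in FILES_PER_YEAR.items():
    size_mb = average_file_size(TOTAL_BYTES, files) / 1_000_000
    print(f"{grain:>10}: {files:>7,} files, ~{size_mb:,.3f} MB each")
```

Monthly files average roughly 83 MB, daily files under 3 MB, and per-minute files about 2 KB across more than half a million objects, at which point the per-object overhead costs far more than reading the data itself.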

A minimal compaction routine for a monthly partition:

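def compact_partition(fs: s3fs.S3FileSystem, prefix: str) -> None:
    """Rewrite all Parquet files under `prefix` as a single sorted Parquet."""
    # fs.ls returns keys without the scheme, so normalise the prefix the same way.
    prefix = prefix.removeprefix("s3://").rstrip("/")
    # Only touch Parquet objects; sidecars such as _metadata.json stay put.
    files = [f for f in fs.ls(prefix) if f.endswith(".parquet")]
    frames = []
    for path in files:
        with fs.open(path, "rb") as src:  # read through fs so its credentials apply
            frames.append(pl.read_parquet(src))
    df = pl.concat(frames).sort("tpep_pickup_datetime")
    target = f"{prefix}/data.parquet"
    with fs.open(target, "wb") as dst:
        df.write_parquet(dst, compression="zstd")
    # Until these deletes finish, readers can see both the old and new files.
    for path in files:
        if path != target:
            fs.rm(path)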

Sorting before writing is a cheap win: Parquet’s row-group statistics let queries skip row groups whose min/max ranges don’t match the predicate.
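
The effect of sorting on row-group skipping can be simulated without Parquet at all. The sketch below is a toy model, not Parquet's actual reader: it splits synthetic integer "timestamps" into fixed-size groups, records min/max per group, and counts how many groups a range predicate would force a reader to open.

```python
import random


def row_group_stats(values: list[int], group_size: int) -> list[tuple[int, int]]:
    """Split values into fixed-size row groups and record (min, max) per group."""
    groups = [values[i : i + group_size] for i in range(0, len(values), group_size)]
    return [(min(g), max(g)) for g in groups]


def groups_to_read(stats: list[tuple[int, int]], lo: int, hi: int) -> int:
    """Count row groups whose [min, max] range overlaps the predicate [lo, hi]."""
    return sum(1 for g_min, g_max in stats if g_max >= lo and g_min <= hi)


rng = random.Random(0)  # fixed seed so the run is repeatable
pickups = [rng.randrange(0, 1_000_000) for _ in range(100_000)]  # synthetic values

unsorted_stats = row_group_stats(pickups, group_size=10_000)
sorted_stats = row_group_stats(sorted(pickups), group_size=10_000)

# Predicate: pickup between 500_000 and 510_000 (about 1% of the value range).
print("unsorted:", groups_to_read(unsorted_stats, 500_000, 510_000))
print("sorted:  ", groups_to_read(sorted_stats, 500_000, 510_000))
```

With unsorted data every group spans nearly the whole value range, so all ten must be read; after sorting, only the one or two groups around the predicate overlap it.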

Step 5 — Schema evolution as a first-class event

Sources drift. A column is added, a column is renamed, a column’s type changes. The wrong response is “let it break and fix it.” The right response, in order:

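  1. Detect the drift at the bronze edge. A strict Polars cast raises when values cannot be converted to the target type, which catches type changes. Added or removed columns do not raise, so compare the incoming column names against SCHEMA explicitly.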
  2. Decide. Add the new column to SCHEMA? Coerce a renamed column? Drop a deprecated one?
  3. Bump a version. Write a _metadata.json next to the data with {"schema_version": 7, "added": ["airport_fee"], "removed": []}. Silver readers check this and decide whether they need to be re-run for older partitions.
  4. Add a test. Great Expectations or just a pytest that loads the latest partition and asserts the schema matches. Run it in Prefect every day (module 07).
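
Steps 1 and 3 can be sketched with plain dictionaries. In the example below, EXPECTED is a cut-down stand-in for SCHEMA with type names as strings, and diff_schema and metadata_document are hypothetical helpers. With Polars, the incoming mapping could plausibly be built from the frame's schema, but that wiring is an assumption left out here.

```python
import json

# Cut-down stand-in for SCHEMA, with dtypes written as plain strings.
EXPECTED = {"VendorID": "Int32", "trip_distance": "Float64", "fare_amount": "Float64"}


def diff_schema(expected: dict[str, str], incoming: dict[str, str]) -> dict[str, list[str]]:
    """Report columns that were added, removed, or changed type."""
    return {
        "added": sorted(incoming.keys() - expected.keys()),
        "removed": sorted(expected.keys() - incoming.keys()),
        "retyped": sorted(
            c for c in expected.keys() & incoming.keys() if expected[c] != incoming[c]
        ),
    }


def metadata_document(schema_version: int, drift: dict[str, list[str]]) -> str:
    """Render the sidecar JSON described in step 3."""
    return json.dumps(
        {"schema_version": schema_version, "added": drift["added"], "removed": drift["removed"]}
    )


# A source file that widened VendorID and gained airport_fee.
incoming = {
    "VendorID": "Int64",
    "trip_distance": "Float64",
    "fare_amount": "Float64",
    "airport_fee": "Float64",
}
drift = diff_schema(EXPECTED, incoming)
print(drift)
# {'added': ['airport_fee'], 'removed': [], 'retyped': ['VendorID']}
print(metadata_document(7, drift))
# {"schema_version": 7, "added": ["airport_fee"], "removed": []}
```

A daily test can then be as small as asserting that all three lists are empty for the latest partition.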

This pattern is unglamorous. It is also the single biggest reason real pipelines outlive their authors.

ADR 0005 — Bronze/silver/gold over staged warehouses

/srv/shared/adr/0005-lake-zones.md:

# ADR 0005 — Data lake zones: bronze / silver / gold

## Status
Accepted, 2026-05-15.

## Context
Raw data lands from multiple sources and is consumed by analytics (Superset),
ML (training pipelines), and downstream services. We need a single convention
for where data lives at each stage of trust.

## Decision
Three zones in one MinIO bucket, by prefix:

- `s3://datasets/bronze/<source>/<table>/dt=YYYY-MM-DD/` — raw, typed Parquet
- `s3://datasets/silver/<table>/dt=YYYY-MM-DD/`        — cleaned, deduplicated
- `s3://datasets/gold/<mart>/`                          — aggregated, business-ready

Silver and gold must be reproducible from bronze. Bronze is the only zone
backed up out-of-cluster.

## Consequences
- A bug in silver costs a re-run, not a re-ingestion.
- Storage cost is ~2× the raw footprint (bronze + silver kept side by side).
- Sources that go away no longer leave us stranded — bronze is the source of
  truth in perpetuity.

## Alternatives considered
- Staged warehouses in Postgres only. Rejected: doesn't scale to the file
  sizes we'll hit in modules 13 and 14 (model weights, image datasets).
- Lakehouse format (Delta / Iceberg). Strongly considered. Deferred: adds
  one more service (the metastore) for marginal benefit at this scale. We
  may revisit if/when concurrent writers become a real problem.

Recap and what’s next

You now have:

  • The zone convention every later module assumes.
  • A working source connector with idempotency, typed coercion, and partitioning.
  • A read path verified through DuckDB with predicate pushdown actually working.
  • The discipline for schema evolution and the routine for compaction.

The lake holds raw and shaped row-level data. The warehouse is where you ask questions of it. That’s module 06.


Next: 06 — The warehouse: Postgres + dbt.