Serving and drift
Wrap the registered model in a FastAPI service, instrument it for Prometheus, monitor drift with Evidently, and ship a Grafana dashboard that an on-call engineer can read at 3 AM.
By the end of this module you will have:
- A FastAPI service loading the model from the MLflow Registry by stage (Staging or Production), not by hard-coded path.
- A Prometheus scrape endpoint exposing request rate, latency histogram, prediction distribution, and the live business-cost-weighted error rate.
- Grafana running on the server with a pre-built dashboard for the service.
- Evidently computing data-drift and target-drift reports on a daily cadence, written to MinIO and alerted on.
- An alerting rule that fires when drift crosses a threshold or the production model’s hit-rate falls more than 10% below its registered baseline.
This is the module that turns a trained model into something an SRE would not be afraid to put behind a customer-facing endpoint.
The five things a model service must do
Forget your favorite framework for a second. The minimum a model service has to do:
1. Load the model from a registry, not from disk. No hard-coded model.pkl paths.
2. Validate inputs. The feature schema is part of the contract; reject malformed inputs at the door.
3. Emit metrics: RED (request rate, errors, duration) for the endpoint, USE (utilization, saturation, errors) for the host and GPU, plus the business outcome.
4. Log every prediction. Sample in high-traffic services; log everything in low-traffic ones. Without prediction logs, drift detection is guessing.
5. Health and readiness probes that distinguish “process is up” from “model is loaded and warm.”
The MLflow registry gives you (1) almost for free, Pydantic makes (2) trivial inside FastAPI, and the FastAPI app is the right place to wire (3), (4), and (5).
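The difference between the two probes in (5) is easy to blur. Here is a minimal, framework-free sketch of the state a service might keep: liveness answers 200 whenever the process is running, while readiness stays at 503 until a model is loaded and has served one warm-up prediction. `ProbeState`, `load_and_warm`, and the warm-up call are illustrative assumptions, not part of this module's service code.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional

Predictor = Callable[[Any], Any]


@dataclass
class ProbeState:
    """What the probes need to know about the model (hypothetical helper)."""
    model: Optional[Predictor] = None
    warmed: bool = False


def load_and_warm(state: ProbeState, loader: Callable[[], Predictor], warmup_input: Any) -> None:
    # Load, then run one throwaway prediction so lazy initialization happens
    # before real traffic arrives; only then publish the model.
    model = loader()
    model(warmup_input)
    state.model = model
    state.warmed = True


def healthz_status() -> int:
    # Liveness: if this code runs at all, the process is up.
    return 200


def readyz_status(state: ProbeState) -> int:
    # Readiness: accept traffic only once the model is loaded and warm.
    return 200 if state.model is not None and state.warmed else 503
```

An orchestrator that restarts on failed liveness but only routes traffic on passing readiness will then leave a slow-loading replica alone instead of killing it.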
Step 1 — A FastAPI skeleton
```python
# src/<project>/serve/app.py
from __future__ import annotations

import os
from contextlib import asynccontextmanager
from typing import Literal

import mlflow
import mlflow.pyfunc
import polars as pl
from fastapi import FastAPI, HTTPException
from prometheus_client import Counter, Gauge, Histogram, make_asgi_app
from pydantic import BaseModel, Field

MODEL_NAME = os.environ.get("MODEL_NAME", "nyc-tip-classifier")
MODEL_STAGE = os.environ.get("MODEL_STAGE", "Production")
mlflow.set_tracking_uri(os.environ["MLFLOW_TRACKING_URI"])

# Metrics
PRED_COUNT = Counter("predictions_total", "Predictions made", ["outcome"])
LATENCY = Histogram("prediction_latency_seconds", "Inference latency",
                    buckets=[0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5])
PROB_DIST = Histogram("prediction_probability", "Distribution of predicted probabilities",
                      buckets=[round(0.05 * i, 2) for i in range(21)])
MODEL_GAUGE = Gauge("model_loaded", "1 if model is loaded")


class TripFeatures(BaseModel):
    distance_miles: float
    duration_min: float
    fare_amount: float
    passenger_count: int = Field(ge=0, le=8)
    pickup_hour: int = Field(ge=0, le=23)
    pickup_dow: int = Field(ge=0, le=6)
    pickup_month: int = Field(ge=1, le=12)
    payment_type: Literal[1, 2, 3, 4]
    pickup_borough: str
    dropoff_borough: str
    intra_borough: Literal[0, 1]


class TripPrediction(BaseModel):
    probability: float
    label: Literal[0, 1]
    threshold: float
    model_version: str


state: dict = {"model": None, "version": None, "threshold": None}


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Resolve whichever version currently holds MODEL_STAGE; this happens once, at startup.
    model = mlflow.pyfunc.load_model(f"models:/{MODEL_NAME}/{MODEL_STAGE}")
    run_id = model.metadata.run_id
    # Assumes the training run logged the decision threshold as the metric "chosen_threshold".
    run_metrics = mlflow.get_run(run_id).data.metrics
    state["model"] = model
    state["version"] = run_id  # the training run ID identifies what is being served
    state["threshold"] = float(run_metrics.get("chosen_threshold", 0.5))
    MODEL_GAUGE.set(1)
    yield
    state["model"] = None
    MODEL_GAUGE.set(0)


app = FastAPI(lifespan=lifespan)
app.mount("/metrics", make_asgi_app())


@app.get("/healthz")
def healthz():
    return {"ok": True}  # liveness: the process is up


@app.get("/readyz")
def readyz():
    # Readiness: the model has been loaded from the registry.
    if state["model"] is None:
        raise HTTPException(status_code=503, detail="model not loaded")
    return {"ok": True, "version": state["version"]}


@app.post("/predict", response_model=TripPrediction)
def predict(features: TripFeatures) -> TripPrediction:
    with LATENCY.time():
        df = pl.from_dicts([features.model_dump()]).to_pandas()
        # Assumes the registered pyfunc returns the positive-class probability
        # (e.g. a custom PythonModel wrapping predict_proba); for an sklearn-style
        # classifier, pyfunc predict() returns class labels instead.
        prob = float(state["model"].predict(df)[0])
    threshold = state["threshold"]
    label = int(prob >= threshold)
    PROB_DIST.observe(prob)
    PRED_COUNT.labels(outcome="positive" if label == 1 else "negative").inc()
    return TripPrediction(
        probability=prob, label=label,
        threshold=threshold, model_version=state["version"],
    )
```
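Notice: the service never hard-codes a model version. It loads models:/nyc-tip-classifier/Production and lets MLflow resolve that URI to whichever version currently holds the Production stage. To swap the served model, transition a different version to Production in the registry and restart the container; no rebuild or redeploy is needed. The restart matters because the URI is resolved only once, at startup.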
Step 2 — Run it via Slurm or just docker compose
For development:

```bash
uv run uvicorn src.<project>.serve.app:app --host 0.0.0.0 --port 8080
```
For “production” on the server:
`/opt/<project>/docker-compose.yml`:

```yaml
services:
  tip-classifier:
    image: <project>:latest
    container_name: tip-classifier
    restart: unless-stopped
    network_mode: host
    environment:
      - MLFLOW_TRACKING_URI=http://localhost:5000
      - MODEL_NAME=nyc-tip-classifier
      - MODEL_STAGE=Production
      - AWS_ACCESS_KEY_ID=platform
      - AWS_SECRET_ACCESS_KEY=__platform_secret__
      - MLFLOW_S3_ENDPOINT_URL=http://localhost:9000
    command: uvicorn src.<project>.serve.app:app --host 0.0.0.0 --port 8080
```
Load-test it:

```bash
ab -n 5000 -c 50 -p body.json -T 'application/json' http://localhost:8080/predict
```
Expect sub-25 ms p99 for a LightGBM model of this size. If latency is much higher, suspect per-request overhead first, such as building a one-row Polars frame and converting it to Pandas on every call. For high-throughput services, batch the inference: accept a list of rows per request instead of a single row.
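The `ab` command posts a `body.json` file that is not shown. One way to produce it is sketched below. The field values (an illustrative Manhattan trip) are assumptions; they only need to satisfy the `TripFeatures` constraints. The script re-checks those constraints locally, so a typo fails here rather than as thousands of 422 responses. The file name `make_body.py` is hypothetical.

```python
# make_body.py (hypothetical name): writes the request body used by `ab -p body.json`.
import json
from pathlib import Path

# Illustrative values only; any payload that satisfies TripFeatures will do.
SAMPLE_TRIP = {
    "distance_miles": 2.3,
    "duration_min": 14.0,
    "fare_amount": 13.5,
    "passenger_count": 1,
    "pickup_hour": 18,
    "pickup_dow": 4,
    "pickup_month": 6,
    "payment_type": 1,
    "pickup_borough": "Manhattan",
    "dropoff_borough": "Manhattan",
    "intra_borough": 1,
}

# Mirror the Field(ge=..., le=...) and Literal constraints on TripFeatures.
RANGES = {"passenger_count": (0, 8), "pickup_hour": (0, 23),
          "pickup_dow": (0, 6), "pickup_month": (1, 12)}
CHOICES = {"payment_type": (1, 2, 3, 4), "intra_borough": (0, 1)}


def check_trip(trip: dict) -> None:
    for name, (lo, hi) in RANGES.items():
        if not lo <= trip[name] <= hi:
            raise ValueError(f"{name}={trip[name]} outside [{lo}, {hi}]")
    for name, allowed in CHOICES.items():
        if trip[name] not in allowed:
            raise ValueError(f"{name}={trip[name]} not in {allowed}")


def write_body(path: str = "body.json", trip: dict = SAMPLE_TRIP) -> Path:
    check_trip(trip)
    out = Path(path)
    out.write_text(json.dumps(trip))
    return out


if __name__ == "__main__":
    print(f"wrote {write_body()}")
```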
Step 3 — Log every prediction
Predictions must be logged somewhere durable, together with the model version and a timestamp. Two reasonable destinations are a Postgres table, or Parquet files written in batches by an async worker (a Parquet file cannot be appended to in place). For a course we use Postgres for simplicity:
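```python
import asyncpg

# Create the pool once, inside lifespan() before `yield`:
#     app.state.db_pool = await asyncpg.create_pool(os.environ["WAREHOUSE_URI"])
# and close it after `yield`:
#     await app.state.db_pool.close()

@app.post("/predict", response_model=TripPrediction)
async def predict(features: TripFeatures) -> TripPrediction:
    # … inference as before …
    # In an `async def` endpoint a CPU-bound predict() call blocks the event loop;
    # consider `await run_in_threadpool(...)` from starlette.concurrency.
    db_pool = getattr(app.state, "db_pool", None)  # absent if no pool was created
    if db_pool is not None:
        async with db_pool.acquire() as conn:
            await conn.execute(
                "INSERT INTO ml.prediction_log "
                "(model_name, model_version, ts, features, probability, label) "
                "VALUES ($1, $2, now(), $3::jsonb, $4, $5)",
                MODEL_NAME, state["version"], features.model_dump_json(), prob, label,
            )
    return TripPrediction(
        probability=prob, label=label,
        threshold=threshold, model_version=state["version"],
    )
```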
The schema:
```sql
CREATE SCHEMA ml;

CREATE TABLE ml.prediction_log (
    id            bigserial PRIMARY KEY,
    model_name    text NOT NULL,
    model_version text NOT NULL,
    ts            timestamptz NOT NULL,
    features      jsonb NOT NULL,
    probability   double precision NOT NULL,
    label         int NOT NULL
);

CREATE INDEX ON ml.prediction_log (model_name, ts DESC);
```
A high-traffic service should batch and write to S3 instead. For a course, Postgres is fine.
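Batching here means buffering prediction records in memory and handing them off in one write once either a size limit or an age limit is hit, instead of issuing one INSERT per request. Below is a minimal, framework-free sketch of that buffer. The `sink` callable stands in for whatever does the real write (for example, one Parquet file per flush uploaded to S3, or a multi-row INSERT), and the default limits are placeholders, not tuned values. As written it is not thread-safe; in the async service you would guard it with a lock or drive it from a single background task.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class PredictionBuffer:
    """Collects prediction-log records and hands them to `sink` in batches (hypothetical helper)."""
    sink: Callable[[list], None]              # called once per flush with a list of records
    max_records: int = 500                    # flush when this many records are waiting...
    max_age_s: float = 5.0                    # ...or when the oldest waiting record is this old
    clock: Callable[[], float] = time.monotonic
    _records: list = field(default_factory=list, init=False)
    _oldest: Optional[float] = field(default=None, init=False)

    def add(self, record: dict) -> None:
        if not self._records:
            self._oldest = self.clock()
        self._records.append(record)
        too_many = len(self._records) >= self.max_records
        too_old = self.clock() - self._oldest >= self.max_age_s
        if too_many or too_old:
            self.flush()

    def flush(self) -> None:
        # Swap the buffer out before calling the sink so new records start a fresh batch.
        if self._records:
            batch, self._records, self._oldest = self._records, [], None
            self.sink(batch)
```

The limits are only checked when a record arrives, so a real writer would also call `flush()` on a timer and at shutdown (for example, after `yield` in the lifespan).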
Step 4 — Prometheus and Grafana
Prometheus scrapes the service every 15 seconds.
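Each scrape stores only the current value of every counter, such as predictions_total. The dashboard and alert queries turn those samples into per-second rates with rate(), which treats any drop in value as a counter reset (a container restart, for example). A simplified Python sketch of that calculation follows; real Prometheus also extrapolates to the edges of the query window, which this version deliberately skips. With a 15 s scrape interval a 1m window holds only about four samples, so much shorter windows leave rate() too few points to work with.

```python
def simple_rate(samples: list[tuple[float, float]]) -> float:
    """Per-second rate of a counter from (timestamp_seconds, value) samples.

    Simplified: unlike Prometheus, it does not extrapolate to the window edges.
    """
    if len(samples) < 2:
        raise ValueError("rate needs at least two samples")
    increase = 0.0
    for (_, prev), (_, cur) in zip(samples, samples[1:]):
        # A decrease means the counter restarted from zero (e.g. container restart).
        increase += cur - prev if cur >= prev else cur
    return increase / (samples[-1][0] - samples[0][0])
```

For example, samples 100, 130, 10, 40 taken 15 s apart count as 30 + 10 + 30 = 70 predictions over 45 s, not a negative rate.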
/opt/prometheus/prometheus.yml:
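```yaml
global:
  scrape_interval: 15s

# Load the alerting rules from Step 6 (/opt/prometheus/rules is mounted at /etc/prometheus/rules).
rule_files:
  - /etc/prometheus/rules/*.yml

scrape_configs:
  - job_name: 'tip-classifier'
    static_configs:
      - targets: ['localhost:8080']
        labels:
          service: 'tip-classifier'
  - job_name: 'node'
    static_configs:
      - targets: ['localhost:9100']
  - job_name: 'nvidia-dcgm'
    static_configs:
      - targets: ['localhost:9400']
```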
/opt/prometheus/docker-compose.yml:
```yaml
services:
  prometheus:
    image: prom/prometheus:v2.55.0
    container_name: prometheus
    restart: unless-stopped
    # Host networking, so the localhost:<port> targets in prometheus.yml reach the
    # host-network services rather than this container's own loopback. Serves on :9090.
    network_mode: host
    volumes:
      - /opt/prometheus:/etc/prometheus
      - /var/lib/prometheus:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--storage.tsdb.retention.time=30d'
  grafana:
    image: grafana/grafana-oss:11.3.0
    container_name: grafana
    restart: unless-stopped
    # Also on the host network; its Prometheus datasource URL is http://localhost:9090.
    network_mode: host
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=__grafana_password__
      - GF_SERVER_HTTP_PORT=3001   # keep the UI on host port 3001
    volumes:
      - /var/lib/grafana:/var/lib/grafana
  node_exporter:
    image: prom/node-exporter:v1.8.2
    container_name: node-exporter
    restart: unless-stopped
    network_mode: host
  dcgm_exporter:
    image: nvcr.io/nvidia/k8s/dcgm-exporter:3.3.7-3.5.0-ubuntu22.04
    container_name: dcgm-exporter
    restart: unless-stopped
    runtime: nvidia
    cap_add: [SYS_ADMIN]
    ports:
      - "9400:9400"
```
The DCGM exporter is what makes GPU metrics (utilization, temperature, power, VRAM use) flow into Grafana — essential for later modules when you’re sharing GPUs and someone asks “is that vLLM endpoint actually using a GPU?”
In Grafana, add Prometheus as a datasource, then build (or import) a starter dashboard with these panels:
| Panel | Query |
|---|---|
| Requests / sec | sum(rate(predictions_total[1m])) |
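| p50 latency | histogram_quantile(0.5, sum by (le) (rate(prediction_latency_seconds_bucket[5m]))) |
| p99 latency | histogram_quantile(0.99, sum by (le) (rate(prediction_latency_seconds_bucket[5m]))) |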
| Positive-prediction rate | sum(rate(predictions_total{outcome="positive"}[5m])) / sum(rate(predictions_total[5m])) |
| GPU utilization | DCGM_FI_DEV_GPU_UTIL |
| Slurm queued jobs | (from slurm_exporter if installed; else skip) |
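The latency panels never see individual request timings, only the cumulative bucket counts defined by the buckets= list in Step 1. histogram_quantile finds the bucket that contains the requested rank and interpolates linearly inside it, so its answer can only be as precise as the bucket boundaries. Below is a simplified Python re-implementation of that interpolation. It restricts q to the open interval (0, 1) and skips some of Prometheus's edge cases. The bucket counts in the usage example are made up.

```python
import math


def histogram_quantile(q: float, buckets: list[tuple[float, float]]) -> float:
    """Estimate the q-quantile from cumulative (upper_bound, count) buckets.

    Follows the same linear interpolation as PromQL's histogram_quantile for
    0 < q < 1; the last bucket must be the +Inf bucket.
    """
    if not 0.0 < q < 1.0:
        raise ValueError("q must be strictly between 0 and 1 in this sketch")
    if len(buckets) < 2 or buckets[-1][0] != math.inf:
        raise ValueError("need at least one finite bucket plus the +Inf bucket")
    total = buckets[-1][1]
    if total == 0:
        return math.nan
    rank = q * total
    prev_bound, prev_count = 0.0, 0.0
    for i, (bound, count) in enumerate(buckets):
        if count >= rank:
            if bound == math.inf:
                # Rank falls in the open-ended bucket: report the highest finite bound.
                return buckets[-2][0]
            if i == 0 and bound <= 0:
                return bound
            # Assume observations are spread evenly inside the bucket.
            return prev_bound + (bound - prev_bound) * (rank - prev_count) / (count - prev_count)
        prev_bound, prev_count = bound, count
    return math.nan  # unreachable when counts are cumulative


# 100 requests: 90 finished within 25 ms, 99 within 50 ms, all within 100 ms.
buckets = [(0.005, 10), (0.01, 50), (0.025, 90), (0.05, 99), (0.1, 100), (math.inf, 100)]
print(histogram_quantile(0.95, buckets))  # lands between 0.025 and 0.05
```

The interpolation is also why a p99 above the largest finite bucket (2.5 s here) is reported as 2.5 s: the histogram cannot say how much slower those requests were.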
Step 5 — Evidently for drift
Drift comes in three flavors that matter:
- Data drift. Input feature distributions shift (say, post-pandemic changes in when and where NYC trips start).
- Concept drift. The relationship between inputs and outputs shifts.
- Prediction drift. The model’s output distribution shifts even if inputs look similar.
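Evidently can compute data drift and prediction drift from your prediction log against a reference dataset (typically the training set). Concept drift also needs ground-truth labels joined back to the logged predictions, because the relationship between inputs and outcomes cannot be measured from inputs alone.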
A daily Prefect task:
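```python
# Uses the pre-0.7 Evidently Report API; newer releases restructure these imports and result keys.
import datetime as dt
import os

import boto3
import polars as pl
from evidently.metric_preset import DataDriftPreset
from evidently.report import Report
from prefect import flow, task


@task
def daily_drift_report() -> None:
    reference = pl.read_parquet("s3://datasets/silver/training_snapshot/2024-01-01.parquet")
    current = pl.read_database_uri(
        "SELECT features::text AS features FROM ml.prediction_log WHERE ts > now() - interval '1 day'",
        os.environ["WAREHOUSE_URI"],
    )
    # Expand the logged JSON features into columns; compare only columns present in both.
    current = current.select(pl.col("features").str.json_decode()).unnest("features")
    reference = reference.select(current.columns)
    report = Report(metrics=[DataDriftPreset()])
    report.run(reference_data=reference.to_pandas(), current_data=current.to_pandas())
    today = dt.date.today()
    report.save_html(f"/srv/shared/scratch/drift-{today}.html")
    s3 = boto3.client("s3", endpoint_url=os.environ.get("MLFLOW_S3_ENDPOINT_URL"))
    s3.put_object(Bucket="datasets", Key=f"silver/drift-reports/{today}.json", Body=report.json())
    # metrics[0] is DatasetDriftMetric, metrics[1] the per-column DataDriftTable.
    metrics = report.as_dict()["metrics"]
    drift_share = metrics[0]["result"]["share_of_drifted_columns"]
    if drift_share > 0.2:
        cols = [c for c, r in metrics[1]["result"]["drift_by_columns"].items() if r["drift_detected"]]
        notify_slack(f"Data drift in {drift_share:.0%} of columns: {', '.join(cols)}")  # your own helper


@flow(name="daily-drift")
def drift_flow() -> None:
    daily_drift_report()
```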
The 20% drifted-columns threshold is a starting point; tune it from experience. The point is that the alert is specific: it tells you which columns drifted instead of reporting a generic “something changed.”
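To make the 20% rule concrete: for each column, a drift test compares the reference and current samples, and the dataset-level number is the share of columns whose test flags drift. Below is a stdlib sketch that uses the two-sample Kolmogorov-Smirnov statistic with a fixed cutoff (0.1, an arbitrary illustrative value). Evidently chooses its own test per column type and sample size and reports p-values or distances rather than this raw statistic, so treat this as an explanation of the idea, not a reimplementation.

```python
def ks_statistic(reference: list[float], current: list[float]) -> float:
    """Two-sample Kolmogorov-Smirnov statistic: max gap between the empirical CDFs."""
    if not reference or not current:
        raise ValueError("both samples must be non-empty")
    a, b = sorted(reference), sorted(current)
    i = j = 0
    gap = 0.0
    while i < len(a) and j < len(b):
        x = min(a[i], b[j])
        # Step both empirical CDFs past every value equal to x.
        while i < len(a) and a[i] <= x:
            i += 1
        while j < len(b) and b[j] <= x:
            j += 1
        gap = max(gap, abs(i / len(a) - j / len(b)))
    return gap


def drifted_columns(reference: dict[str, list[float]], current: dict[str, list[float]],
                    cutoff: float = 0.1) -> tuple[float, list[str]]:
    """Share of columns whose KS statistic exceeds `cutoff`, plus their names."""
    drifted = [col for col in reference if ks_statistic(reference[col], current[col]) > cutoff]
    return len(drifted) / len(reference), drifted
```

Returning the column names alongside the share is what lets the notification say *which* features moved, for example `hour` but not `fare`.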
Step 6 — Alerting rules
In Prometheus, /opt/prometheus/rules/serving.yml:
```yaml
groups:
  - name: model-serving
    rules:
      - alert: HighLatency
        expr: histogram_quantile(0.99, sum by (le) (rate(prediction_latency_seconds_bucket[5m]))) > 0.1
        for: 5m
        labels: { severity: warning }
        annotations:
          summary: "Tip classifier p99 latency > 100ms"
      - alert: PredictionDistributionShift
        expr: |
          abs(
            sum(rate(predictions_total{outcome="positive"}[1h]))
              / sum(rate(predictions_total[1h]))
            -
            sum(rate(predictions_total{outcome="positive"}[24h]))
              / sum(rate(predictions_total[24h]))
          ) > 0.1
        for: 30m
        labels: { severity: warning }
        annotations:
          summary: "Positive rate is >10 percentage points away from its 24h baseline"
      - alert: ModelNotLoaded
        # A failed model load stops the process, so also page when the target is down.
        expr: up{job="tip-classifier"} == 0 or model_loaded{job="tip-classifier"} == 0
        for: 2m
        labels: { severity: page }
        annotations:
          summary: "Tip classifier is down or failed to load a model"
```
Route alerts via Alertmanager to Slack or email. The third one is the only page-severity rule in the platform — everything else is “look at it next morning.”
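The PredictionDistributionShift expression compares two ratios: the positive rate over the last hour and over the last 24 hours. Both are fractions, so the `> 0.1` threshold is an absolute gap of 10 percentage points, not a 10% relative change. A baseline of 0.30 fires below 0.20 or above 0.40. The Python sketch below does the same arithmetic on counter increases over each window. That is equivalent because rate() divides numerator and denominator by the same window length, which cancels.

```python
import math


def positive_rate(positive: float, total: float) -> float:
    # With no traffic the ratio is undefined; PromQL yields NaN and the alert stays quiet.
    return positive / total if total else math.nan


def prediction_shift(pos_1h: float, total_1h: float, pos_24h: float, total_24h: float,
                     threshold: float = 0.1) -> tuple[float, bool]:
    """Absolute gap between the 1h and 24h positive rates, and whether it exceeds threshold."""
    gap = abs(positive_rate(pos_1h, total_1h) - positive_rate(pos_24h, total_24h))
    return gap, gap > threshold
```

For example, 90 positives out of 500 in the last hour (0.18) against 3,000 out of 10,000 over the day (0.30) is a 0.12 gap and fires, while 130 out of 500 (0.26) does not.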
Step 7 — Build a one-page runbook
For every service the platform runs, write a one-page runbook in `runbooks/<service>.md`:

```markdown
# Runbook: tip-classifier

## What it does
Serves `nyc-tip-classifier` (currently version <id>) at /predict.

## Where it lives
- Container: `tip-classifier` on `<gpu-server>`
- MLflow tracking: http://<gpu-server>:5000
- Logs: `docker logs tip-classifier`

## SLOs
- Availability: 99.5% during weekdays 09:00–18:00 UTC.
- Latency: p99 < 100 ms.
- Freshness: model retrained quarterly; alert if drift > 20%.

## Page conditions
- `ModelNotLoaded` for >2 min
- Health check `/readyz` failing for >5 min

## Diagnosis playbook
1. Is the container up? `docker ps | grep tip-classifier`
2. Is the model loaded? `curl -fsS localhost:8080/readyz`
3. Is MLflow up? `curl -fsS localhost:5000/api/2.0/mlflow/registered-models/search`
4. Is MinIO reachable? `mc ls local/mlflow-artifacts/`

## Rollback
The previous model version is in MLflow Registry. Transition the prior
version to `Production` and restart the container.

## Owner
@instructor; escalate to #ml-platform Slack channel.
```
The runbook is graded for being useful at 3 AM, not for being thorough. “What command do I type first” beats “comprehensive overview of system architecture.”
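The availability SLO is easier to act on as a concrete budget. The small sketch below assumes the business-hours window means Monday to Friday, 09:00 to 18:00 UTC (9 hours a day). Under that assumption, 99.5% of 45 hours leaves roughly 13.5 minutes of allowed unavailability per week.

```python
def weekly_error_budget_minutes(slo: float, days_per_week: int = 5,
                                hours_per_day: float = 9.0) -> float:
    """Minutes of unavailability allowed per week inside the SLO window."""
    if not 0.0 < slo < 1.0:
        raise ValueError("slo must be a fraction such as 0.995")
    window_minutes = days_per_week * hours_per_day * 60
    return (1.0 - slo) * window_minutes


print(f"{weekly_error_budget_minutes(0.995):.1f} minutes/week")  # 45 h window, 0.5% budget
```

Seen this way, a single 15-minute outage during business hours already exceeds the week's budget, which is a useful calibration for how fast the `ModelNotLoaded` page needs to be answered.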
Recap and what’s next
You have a real service:
- Loaded from the registry by stage.
- Emitting metrics scraped by Prometheus.
- Logging every prediction for downstream drift analysis.
- Watched by drift reports and Prometheus alert rules.
- Documented with a one-page runbook.
The next four modules pivot to LLMs and computer vision. Module 11 stands up the self-hosted LLM endpoint that the RAG capstone will use.
Next: 11 — Self-hosted LLM.