RAG application
Ship a RAG application against a real corpus on the platform's self-hosted LLM: hybrid retrieval (BM25 + dense) in pgvector, a cross-encoder reranker, Langfuse tracing, and an offline eval set you actually trust.
This is the LLM application capstone. By the end you will have:
- A document ingestion pipeline that chunks, embeds, and indexes a real corpus into Postgres + pgvector.
- Hybrid retrieval combining BM25 keyword scores and dense vector similarity, fused with Reciprocal Rank Fusion.
- A cross-encoder reranker that re-orders the top-k candidates by query-document relevance.
- A FastAPI chat endpoint wired to the vLLM endpoint from module 11, with grounded answers and source citations.
- Langfuse tracing every step (retrieval, rerank, prompt, response) for offline analysis.
- An offline eval set of ≥30 graded examples with a measured retrieval-and-answer quality baseline.
This is a long module. Plan two sittings.
Capstone 3 — Brief
Mission. You are building a RAG assistant for an internal policy/regulation/research corpus of your choice. The instructor will accept any corpus with these properties:
- Real. A scrape from a website, an organization’s docs export, regulatory PDFs, your own blog archive — not an LLM-generated toy.
- At least 500 documents, totaling at least 5 MB of text.
- Has authoritative answers. The point of RAG is to be grounded; you need a way to actually verify groundedness.
Sample corpora that work well: SEC filings (EDGAR), NIH research papers, your country’s tax code, an open-source project’s full documentation, Wikipedia for a domain you know well.
Required artifacts.
- An ingestion pipeline (Prefect flow) that takes the raw corpus to chunks in pgvector, idempotently.
- A FastAPI service at /chat returning grounded answers with citations.
- An eval set, eval/cases.jsonl, of ≥30 hand-graded question/answer pairs with retrieval ground truth.
- An evaluation script that reports Recall@5, MRR, Faithfulness, and Answer Relevance, with current numbers in EVALUATION.md.
- Langfuse capturing every request, with a “session” linking retrieval → rerank → answer.
Grading rubric.
| Criterion | Weight |
|---|---|
| Reproducible ingestion from a clean clone | 15% |
| Retrieval is hybrid (not pure dense), with measurable improvement vs. dense-only | 20% |
| Reranker is wired and improves the eval over no-rerank | 15% |
| Answers cite sources and refuse to answer when unsupported | 15% |
| Eval set is real, ≥30 cases, with documented grading rubric | 15% |
| Langfuse traces show the full pipeline cleanly | 10% |
| EVALUATION.md is honest about failure modes | 10% |
Step 1 — pgvector
You already have Postgres from the warehouse. Enable pgvector once:
CREATE DATABASE rag;
\c rag
CREATE EXTENSION vector;
CREATE TABLE documents (
    id          bigserial PRIMARY KEY,
    source_uri  text NOT NULL,
    title       text,
    metadata    jsonb,
    created_at  timestamptz DEFAULT now()
);
CREATE TABLE chunks (
    id           bigserial PRIMARY KEY,
    document_id  bigint REFERENCES documents(id) ON DELETE CASCADE,
    chunk_index  int NOT NULL,
    text         text NOT NULL,
    embedding    vector(1024),  -- bge-large-en-v1.5 dim
    metadata     jsonb,
    ts_text      tsvector
);
CREATE INDEX chunks_embedding_hnsw ON chunks
    USING hnsw (embedding vector_cosine_ops);
CREATE INDEX chunks_ts_gin ON chunks USING gin (ts_text);
Two indexes, two retrieval paths: HNSW for dense, GIN over tsvector for BM25-style keyword search.
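For intuition about what the dense path orders by: with vector_cosine_ops, pgvector's <=> operator returns cosine distance, which is 1 minus cosine similarity. A minimal pure-Python sketch of that quantity follows; the function name is illustrative and not part of pgvector.

```python
import math


def cosine_distance(a: list[float], b: list[float]) -> float:
    """Cosine distance (1 - cosine similarity) between two non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)
```

Identical directions give 0, orthogonal vectors give 1, and opposite directions give 2, so smaller means more similar.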
Step 2 — Chunking that works
The biggest single quality win in RAG is chunking. The two reasonable strategies:
- Recursive character splitting. Break by headings → paragraphs → sentences → characters, keeping chunks within a target size with overlap.
- Semantic chunking. Embed every sentence, split when adjacent-sentence similarity drops below a threshold.
We use recursive splitting with token-aware sizing — 512 tokens with 64 tokens of overlap. The implementation:
# src/<project>/rag/ingest.py
from langchain_text_splitters import RecursiveCharacterTextSplitter
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained("BAAI/bge-large-en-v1.5")
splitter = RecursiveCharacterTextSplitter.from_huggingface_tokenizer(
    tokenizer,
    chunk_size=512,
    chunk_overlap=64,
    separators=["\n\n", "\n", ". ", " ", ""],
)
Keep metadata per chunk: source URI, page or section, ordinal. The frontend cites by metadata; ungrounded answers are detectable when the metadata is empty.
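To make the size/overlap arithmetic and the per-chunk metadata concrete, here is a simplified sketch. It is not what RecursiveCharacterTextSplitter does internally (that splitter prefers natural separators); it only shows a fixed window sliding over a token list. The function name and metadata keys are hypothetical, and whitespace-separated words stand in for tokenizer tokens.

```python
def window_chunks(tokens: list[str], size: int = 512, overlap: int = 64) -> list[dict]:
    """Slide a fixed-size window over tokens; consecutive windows share `overlap` tokens."""
    if not 0 <= overlap < size:
        raise ValueError("overlap must be >= 0 and smaller than size")
    step = size - overlap  # each new window starts this many tokens later
    chunks = []
    for ordinal, start in enumerate(range(0, len(tokens), step)):
        window = tokens[start:start + size]
        chunks.append({
            "text": " ".join(window),
            # Metadata travels with the chunk so answers can cite it later.
            "metadata": {"ordinal": ordinal, "start_token": start, "n_tokens": len(window)},
        })
        if start + size >= len(tokens):
            break  # this window already reached the end of the document
    return chunks
```

With the defaults, each window starts 448 tokens after the previous one, so a 1,000-token document yields three chunks starting at tokens 0, 448, and 896.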
Step 3 — Ingest the corpus
import os
import requests
from prefect import flow, task
import psycopg
from psycopg.types.json import Jsonb
# EMBEDDING_URL looks like http://<gpu-server>:8001/v1; TEI's native endpoint is /embed.
EMBED_URL = os.environ["EMBEDDING_URL"].replace("/v1", "") + "/embed"
def embed(texts: list[str], batch_size: int = 32) -> list[list[float]]:
    """Batched embeddings against the TEI server from module 11."""
    vectors: list[list[float]] = []
    # Send small batches: TEI rejects requests above its client batch limit (32 by default).
    for i in range(0, len(texts), batch_size):
        resp = requests.post(EMBED_URL, json={"inputs": texts[i:i + batch_size]}, timeout=60)
        resp.raise_for_status()
        vectors.extend(resp.json())
    return vectors
@task
def index_document(doc: dict, splitter, conn) -> int:
    with conn.cursor() as cur:
        cur.execute(
            "INSERT INTO documents (source_uri, title, metadata) VALUES (%s, %s, %s) RETURNING id",
            (doc["uri"], doc["title"], Jsonb(doc.get("metadata", {}))),
        )
        doc_id = cur.fetchone()[0]
        chunks = splitter.split_text(doc["text"])
        if not chunks:
            return 0
        vectors = embed(chunks)
        rows = [
            (doc_id, i, c, v, Jsonb({"i": i}))
            for i, (c, v) in enumerate(zip(chunks, vectors))
        ]
        cur.executemany(
            "INSERT INTO chunks (document_id, chunk_index, text, embedding, metadata)"
            " VALUES (%s, %s, %s, %s, %s)",
            rows,
        )
        # Populate the keyword-search column for the rows just inserted.
        cur.execute(
            "UPDATE chunks SET ts_text = to_tsvector('english', text)"
            " WHERE document_id = %s AND ts_text IS NULL",
            (doc_id,),
        )
    return len(chunks)
@flow
def ingest_corpus(corpus_dir: str):
    # corpus_files() and load_doc() are corpus-specific helpers, not shown here.
    with psycopg.connect(os.environ["RAG_DB_URI"]) as conn:
        for path in corpus_files(corpus_dir):
            doc = load_doc(path)
            index_document(doc, splitter, conn)
        conn.commit()
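Run it once over the whole corpus. The tsvector update is one extra step that gives you indexed keyword search inside Postgres. Strictly speaking, ts_rank_cd is a cover-density ranking rather than true BM25, but it fills the same role in the hybrid pipeline.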
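The brief asks for idempotent ingestion, and plain INSERT statements would add duplicate rows on a second run. One possible approach, sketched here under assumptions, is to derive a deterministic key from each document's URI and text and skip anything already seen. The helper names and the in-memory `seen` set are hypothetical stand-ins; in Postgres the same idea would more likely be a unique column combined with ON CONFLICT DO NOTHING.

```python
import hashlib


def content_key(source_uri: str, text: str) -> str:
    """Stable key: the same URI and text always hash to the same value."""
    h = hashlib.sha256()
    h.update(source_uri.encode("utf-8"))
    h.update(b"\x00")  # separator so ("ab", "c") and ("a", "bc") hash differently
    h.update(text.encode("utf-8"))
    return h.hexdigest()


def docs_to_ingest(docs: list[dict], seen: set[str]) -> list[dict]:
    """Return only docs whose key is not yet in `seen`, recording new keys as we go."""
    fresh = []
    for doc in docs:
        key = content_key(doc["uri"], doc["text"])
        if key not in seen:
            seen.add(key)
            fresh.append(doc)
    return fresh
```

Running the filter twice over an unchanged corpus returns nothing the second time, while an edited document gets a new key and is picked up again. Removing the stale chunks of an edited document is a separate step this sketch does not cover.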
Step 4 — Hybrid retrieval with RRF
Pure dense retrieval (HNSW over embeddings) loses to hybrid retrieval (dense + BM25) on most real corpora. The blunt truth: BM25 finds the chunk that contains the exact rare phrase in the question; dense finds the chunk that’s about the question. You want both.
def hybrid_search(query: str, k: int = 30) -> list[dict]:
    # conn: a module-level psycopg connection to the rag database.
    q_vec = embed([query])[0]
    with conn.cursor() as cur:
        cur.execute("""
            WITH dense AS (
                SELECT id,
                       row_number() OVER (ORDER BY embedding <=> %(q_vec)s::vector) AS rnk
                FROM chunks
                ORDER BY embedding <=> %(q_vec)s::vector
                LIMIT %(k)s
            ),
            bm25 AS (
                SELECT id,
                       row_number() OVER (ORDER BY ts_rank_cd(ts_text, plainto_tsquery('english', %(query)s)) DESC) AS rnk
                FROM chunks
                WHERE ts_text @@ plainto_tsquery('english', %(query)s)
                ORDER BY rnk  -- best keyword match first
                LIMIT %(k)s
            ),
            fused AS (
                -- Fuse on chunk id only; a chunk missing from one list contributes 0 for it.
                SELECT id,
                       COALESCE(1.0 / (60 + d.rnk), 0) + COALESCE(1.0 / (60 + b.rnk), 0) AS rrf
                FROM dense d
                FULL OUTER JOIN bm25 b USING (id)
            )
            SELECT c.id, c.text, doc.source_uri, c.metadata, f.rrf
            FROM fused f
            JOIN chunks c ON c.id = f.id
            JOIN documents doc ON doc.id = c.document_id  -- source_uri lives on documents
            ORDER BY f.rrf DESC
            LIMIT %(k)s
        """, {"q_vec": q_vec, "query": query, "k": k})
        return [dict(zip(["id", "text", "source_uri", "metadata", "score"], r)) for r in cur.fetchall()]
That’s RRF — Reciprocal Rank Fusion. It’s the cheapest, sturdiest hybrid fusion you can implement. Tune the 60 constant only if you’ve measured.
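The fusion rule itself is small enough to show outside SQL. This sketch takes any number of ranked id lists and sums 1 / (k + rank) per id, using 1-based ranks; the function name is illustrative.

```python
from collections import defaultdict


def rrf_fuse(rankings: list[list[str]], k: int = 60) -> list[tuple[str, float]]:
    """Reciprocal Rank Fusion: each list contributes 1 / (k + rank) to an id's score."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first; ids absent from a list simply get nothing from it.
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)
```

For example, fusing a dense ranking ["a", "b", "c"] with a keyword ranking ["c", "a", "d"] puts "a" first (ranks 1 and 2), then "c" (ranks 3 and 1), ahead of "b" and "d", which each appear in only one list. Only ranks matter, so the two retrievers' raw scores never need to be on the same scale.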
Step 5 — Cross-encoder rerank
Retrieval gives you 30 candidates that are probably relevant. A cross-encoder reads (query, candidate) pairs together and scores each — much better at relevance ranking than the bi-encoder embeddings, but too slow to run over the whole index.
from sentence_transformers import CrossEncoder
reranker = CrossEncoder("BAAI/bge-reranker-large", max_length=512, device="cuda")
def rerank(query: str, candidates: list[dict], top_k: int = 5) -> list[dict]:
    pairs = [(query, c["text"]) for c in candidates]
    scores = reranker.predict(pairs, show_progress_bar=False)
    for c, s in zip(candidates, scores):
        c["rerank_score"] = float(s)
    return sorted(candidates, key=lambda x: x["rerank_score"], reverse=True)[:top_k]
The reranker fits on a fraction of one L40S — run it on GPU 1 (so it doesn’t fight vLLM on GPU 0) by setting CUDA_VISIBLE_DEVICES=1 in the serving process’s environment.
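If you want to unit-test the retrieve-then-rerank flow on a machine without a GPU, one option is to make the pair scorer injectable. The sketch below assumes that design; `rerank_with` and `overlap_scorer` are hypothetical names, and the word-overlap scorer is a toy stand-in, not a cross-encoder.

```python
from typing import Callable

PairScorer = Callable[[list[tuple[str, str]]], list[float]]


def rerank_with(score_pairs: PairScorer, query: str,
                candidates: list[dict], top_k: int = 5) -> list[dict]:
    """Score (query, text) pairs with the given scorer and keep the top_k candidates."""
    pairs = [(query, c["text"]) for c in candidates]
    scores = score_pairs(pairs)
    # Copy each candidate so the caller's list is left untouched.
    scored = [{**c, "rerank_score": float(s)} for c, s in zip(candidates, scores)]
    return sorted(scored, key=lambda c: c["rerank_score"], reverse=True)[:top_k]


def overlap_scorer(pairs: list[tuple[str, str]]) -> list[float]:
    """Toy scorer for tests: fraction of query words that appear in the passage."""
    out = []
    for query, passage in pairs:
        q_words = set(query.lower().split())
        p_words = set(passage.lower().split())
        out.append(len(q_words & p_words) / max(len(q_words), 1))
    return out
```

In the service you would pass the real model's predict method as `score_pairs`; in tests, `overlap_scorer` keeps the ordering logic checkable without loading any weights.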
Step 6 — Grounded answer generation
SYSTEM = """You are a precise assistant. Answer the user's question using
ONLY the context below. If the answer is not in the context, say
"I don't have that in my sources."
Format every fact-bearing sentence with a citation like [1], [2] using the
source ids from the context. Do not invent citations."""
def build_prompt(query: str, ctx: list[dict]) -> str:
    blocks = "\n\n".join(
        f"[{i+1}] (source: {c['source_uri']})\n{c['text']}"
        for i, c in enumerate(ctx)
    )
    return f"# Context\n{blocks}\n\n# Question\n{query}"
def answer(query: str) -> dict:
    # llm_client: an OpenAI-compatible client pointed at the vLLM endpoint from module 11.
    candidates = hybrid_search(query, k=30)
    top = rerank(query, candidates, top_k=5)
    resp = llm_client.chat.completions.create(
        model="qwen2.5-14b",
        messages=[
            {"role": "system", "content": SYSTEM},
            {"role": "user", "content": build_prompt(query, top)},
        ],
        temperature=0.0,
    )
    return {
        "answer": resp.choices[0].message.content,
        "sources": [{"i": i+1, "uri": c["source_uri"], "snippet": c["text"][:200]}
                    for i, c in enumerate(top)],
    }
Three load-bearing decisions:
- Temperature 0 for groundedness. Creativity is the enemy here.
- System prompt forbids unsupported claims. “Say I don’t know” is a feature, not a failure.
- Citations are numeric ids referencing the context block, not free-form URLs. This is what lets the frontend reliably render the right link.
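Because citations are numeric ids, they can be checked mechanically before the answer reaches the user. A minimal sketch, assuming the refusal sentence from the system prompt and the [n] citation format; the function name and the returned keys are hypothetical.

```python
import re

REFUSAL = "I don't have that in my sources."


def check_citations(answer_text: str, n_sources: int) -> dict:
    """Extract [n] citation ids and flag any that fall outside 1..n_sources."""
    cited = {int(m) for m in re.findall(r"\[(\d+)\]", answer_text)}
    return {
        "refused": REFUSAL.lower() in answer_text.lower(),
        "cited": sorted(cited),
        # An id the context never contained means the model invented a citation.
        "invalid": sorted(i for i in cited if not 1 <= i <= n_sources),
        "uncited": not cited,
    }
```

An answer that is neither a refusal nor cited, or that cites an id outside the context, is a candidate for logging as a groundedness failure.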
Step 7 — FastAPI chat service
# src/<project>/rag/app.py
from fastapi import FastAPI
from pydantic import BaseModel
from src.<project>.rag.pipeline import answer
app = FastAPI()
class ChatRequest(BaseModel):
    question: str
    session_id: str | None = None
@app.post("/chat")
def chat(req: ChatRequest):
    return answer(req.question)
That’s the whole API surface. Authentication, rate-limiting, and streaming responses are good follow-ups — none are required for the capstone.
Step 8 — Langfuse tracing
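Langfuse runs self-hosted. Be aware that the v3 image needs more than this one service: it also depends on ClickHouse, Redis, S3-compatible blob storage, and a separate worker container, so start from the official Langfuse compose file and adapt it. The web service entry looks like this:
langfuse:
  image: langfuse/langfuse:3
  container_name: langfuse
  restart: unless-stopped
  environment:
    # Inside the container, localhost is the container itself.
    # "postgres" is an assumed service name; use your Postgres host.
    - DATABASE_URL=postgresql://langfuse:__lf_pw__@postgres:5432/langfuse
    - NEXTAUTH_SECRET=__long_random__
    - NEXTAUTH_URL=https://langfuse.example.com
    - SALT=__another_long_random__
  ports:
    - "3002:3000"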
Instrument the pipeline:
from langfuse import Langfuse, observe
langfuse = Langfuse(host=os.environ["LANGFUSE_HOST"], ...)
@observe(as_type="generation")
def generate_with_llm(query, ctx): ...
@observe()
def retrieve(query): ...
@observe()
def answer(query):
    candidates = retrieve(query)
    top = rerank(query, candidates)
    return generate_with_llm(query, top)
Every request now has a trace tree in Langfuse showing retrieval candidates, rerank scores, the prompt sent to the LLM, the response, and token usage. This is what makes debugging an answer go from “stare at logs” to “click the trace.”
Step 9 — The eval set
The eval set is the only thing that tells you whether your RAG is getting better. Write it by hand. Do not generate it with the same LLM you’re evaluating — that measures the LLM’s agreement with itself, not correctness.
eval/cases.jsonl:
{"q": "What is the maximum allowable concentration of lead in drinking water under <regulation X>?", "expected_answer_contains": ["15 µg/L", "fifteen micrograms per liter"], "expected_sources_contain": ["lead-and-copper-rule"]}
{"q": "Who chaired <committee Y> in 1998?", "expected_answer_contains": ["Jane Doe"], "expected_sources_contain": ["committee-roster-1998"]}
For a course capstone, 30 cases is the floor — 50 is better. The grading rubric expects them to be hand-written.
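Hand-written JSONL is easy to break with a stray quote or a missing key, so it can help to validate the file before every eval run. This is a small sketch that assumes the three keys used in the examples above; the function name and the wording of the messages are hypothetical.

```python
import json

REQUIRED_KEYS = {"q", "expected_answer_contains", "expected_sources_contain"}


def validate_cases(lines: list[str]) -> list[str]:
    """Return human-readable problems; an empty list means the file looks sane."""
    problems = []
    seen_questions = set()
    for n, line in enumerate(lines, start=1):
        if not line.strip():
            continue  # tolerate blank lines
        try:
            case = json.loads(line)
        except json.JSONDecodeError as exc:
            problems.append(f"line {n}: invalid JSON ({exc.msg})")
            continue
        if not isinstance(case, dict):
            problems.append(f"line {n}: expected a JSON object")
            continue
        missing = REQUIRED_KEYS - case.keys()
        if missing:
            problems.append(f"line {n}: missing {sorted(missing)}")
            continue
        if not case["expected_answer_contains"]:
            problems.append(f"line {n}: no expected answer phrases")
        if case["q"] in seen_questions:
            problems.append(f"line {n}: duplicate question")
        seen_questions.add(case["q"])
    return problems
```

A typical call is `validate_cases(open("eval/cases.jsonl", encoding="utf-8").read().splitlines())`, failing the run if the result is non-empty.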
The eval script:
import json
from src.<project>.rag.pipeline import answer, hybrid_search, rerank
with open("eval/cases.jsonl", encoding="utf-8") as f:
    cases = [json.loads(line) for line in f if line.strip()]
hits = []
for c in cases:
    cands = hybrid_search(c["q"], k=30)
    top = rerank(c["q"], cands, top_k=5)
    a = answer(c["q"])
    # Hit if any expected source substring appears in any of the top-5 URIs.
    retrieval_hit = any(s in t["source_uri"] for s in c.get("expected_sources_contain", []) for t in top)
    # expected_answer_contains lists acceptable phrasings, so one match is enough.
    answer_hit = any(needle.lower() in a["answer"].lower() for needle in c["expected_answer_contains"])
    hits.append({"q": c["q"], "retrieval_hit": retrieval_hit, "answer_hit": answer_hit})
print("Recall@5:", sum(h["retrieval_hit"] for h in hits) / len(hits))
print("Answer match:", sum(h["answer_hit"] for h in hits) / len(hits))
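Report both numbers in EVALUATION.md. Note that this script covers only Recall@5 and a string-match proxy for answer quality; the brief also asks for MRR, Faithfulness, and Answer Relevance, which you still need to add. If Recall@5 is under 0.8, retrieval is the bottleneck: try smaller chunks, a different embedding model, or better keyword matching (for example, a text search configuration that suits your corpus's vocabulary). If Recall@5 is high but answer match is low, the LLM is failing to ground, so tighten the prompt.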
Recap
You’ve finished Capstone 3. You should have:
- A working RAG service grounded in a real corpus.
- Hybrid retrieval with measurable improvement over dense-only.
- A reranker that further improves answer quality.
- Langfuse traces and a hand-graded eval set.
Next, fine-tuning. The same vLLM endpoint will host the model you train on your own task.
Next: 13 — LoRA fine-tuning.