~180 min read · updated 2026-05-15

RAG application

Ship a RAG application against a real corpus on the platform's self-hosted LLM: hybrid retrieval (BM25 + dense) in pgvector, a cross-encoder reranker, Langfuse tracing, and an offline eval set you actually trust.

This is the LLM application capstone. By the end you will have:

  • A document ingestion pipeline that chunks, embeds, and indexes a real corpus into Postgres + pgvector.
  • Hybrid retrieval combining BM25 keyword scores and dense vector similarity, fused with Reciprocal Rank Fusion.
  • A cross-encoder reranker that re-orders the top-k candidates by query-document relevance.
  • A FastAPI chat endpoint wired to the vLLM endpoint from module 11, with grounded answers and source citations.
  • Langfuse tracing every step (retrieval, rerank, prompt, response) for offline analysis.
  • An offline eval set of ≥30 graded examples with a measured retrieval-and-answer quality baseline.

This is a long module. Plan two sittings.

Capstone 3 — Brief

Mission. You are building a RAG assistant for an internal policy/regulation/research corpus of your choice. The instructor will accept any corpus with these properties:

  • Real. A scrape from a website, an organization’s docs export, regulatory PDFs, your own blog archive — not an LLM-generated toy.
  • At least 500 documents, totaling at least 5 MB of text.
  • Has authoritative answers. The point of RAG is to be grounded; you need a way to actually verify groundedness.

Sample corpora that work well: SEC filings (EDGAR), NIH research papers, your country’s tax code, an open-source project’s full documentation, Wikipedia for a domain you know well.

Required artifacts.

  • An ingestion pipeline (Prefect flow) that takes the raw corpus to chunks in pgvector, idempotently.
  • A FastAPI service at /chat returning grounded answers with citations.
  • An eval set eval/cases.jsonl of ≥30 hand-graded question/answer pairs with retrieval ground truth.
  • An evaluation script that reports Recall@5, MRR, Faithfulness, and Answer Relevance — current numbers in EVALUATION.md.
  • Langfuse capturing every request, with a “session” linking retrieval → rerank → answer.

Grading rubric.

CriterionWeight
Reproducible ingestion from a clean clone15%
Retrieval is hybrid (not pure dense), with measurable improvement vs. dense-only20%
Reranker is wired and improves the eval over no-rerank15%
Answers cite sources and refuse to answer when unsupported15%
Eval set is real, ≥30 cases, with documented grading rubric15%
Langfuse traces show the full pipeline cleanly10%
EVALUATION.md is honest about failure modes10%

Step 1 — pgvector

You already have Postgres from the warehouse. Enable pgvector once:

CREATE DATABASE rag;
\c rag
CREATE EXTENSION vector;

CREATE TABLE documents (
  id          bigserial PRIMARY KEY,
  source_uri  text NOT NULL,
  title       text,
  metadata    jsonb,
  created_at  timestamptz DEFAULT now()
);

CREATE TABLE chunks (
  id           bigserial PRIMARY KEY,
  document_id  bigint REFERENCES documents(id) ON DELETE CASCADE,
  chunk_index  int NOT NULL,
  text         text NOT NULL,
  embedding    vector(1024),                                 -- bge-large-en-v1.5 dim
  metadata     jsonb,
  ts_text      tsvector
);

CREATE INDEX chunks_embedding_hnsw ON chunks
  USING hnsw (embedding vector_cosine_ops);
CREATE INDEX chunks_ts_gin ON chunks USING gin (ts_text);

Two indexes, two retrieval paths: HNSW for dense, GIN over tsvector for BM25-style keyword search.

Step 2 — Chunking that works

The biggest single quality win in RAG is chunking. The two reasonable strategies:

  • Recursive character splitting. Break by headings → paragraphs → sentences → characters, keeping chunks within a target size with overlap.
  • Semantic chunking. Embed every sentence, split when adjacent-sentence similarity drops below a threshold.

We use recursive splitting with token-aware sizing — 512 tokens with 64 tokens of overlap. The implementation:

# src/<project>/rag/ingest.py
from langchain_text_splitters import RecursiveCharacterTextSplitter
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("BAAI/bge-large-en-v1.5")

splitter = RecursiveCharacterTextSplitter.from_huggingface_tokenizer(
    tokenizer,
    chunk_size=512,
    chunk_overlap=64,
    separators=["\n\n", "\n", ". ", " ", ""],
)

Keep metadata per chunk: source URI, page or section, ordinal. The frontend cites by metadata; ungrounded answers are detectable when the metadata is empty.

Step 3 — Ingest the corpus

import os, requests
import polars as pl
from openai import OpenAI
from prefect import flow, task
import psycopg

emb_client = OpenAI(
    base_url=os.environ["EMBEDDING_URL"],            # http://<gpu-server>:8001/v1
    api_key="not-checked-by-tei",
)

def embed(texts: list[str]) -> list[list[float]]:
    """Batched embeddings against the TEI server from module 11."""
    resp = requests.post(
        os.environ["EMBEDDING_URL"].replace("/v1", "") + "/embed",
        json={"inputs": texts}, timeout=60,
    )
    resp.raise_for_status()
    return resp.json()

@task
def index_document(doc: dict, splitter, conn) -> int:
    with conn.cursor() as cur:
        cur.execute(
            "INSERT INTO documents (source_uri, title, metadata) VALUES (%s, %s, %s) RETURNING id",
            (doc["uri"], doc["title"], psycopg.types.json.Jsonb(doc.get("metadata", {}))),
        )
        doc_id = cur.fetchone()[0]
        chunks = splitter.split_text(doc["text"])
        if not chunks:
            return 0
        vectors = embed(chunks)
        rows = [
            (doc_id, i, c, v, psycopg.types.json.Jsonb({"i": i}))
            for i, (c, v) in enumerate(zip(chunks, vectors))
        ]
        cur.executemany(
            "INSERT INTO chunks (document_id, chunk_index, text, embedding, metadata)"
            " VALUES (%s, %s, %s, %s, %s)",
            rows,
        )
        cur.execute(
            "UPDATE chunks SET ts_text = to_tsvector('english', text)"
            " WHERE document_id = %s AND ts_text IS NULL",
            (doc_id,),
        )
    return len(chunks)

@flow
def ingest_corpus(corpus_dir: str):
    conn = psycopg.connect(os.environ["RAG_DB_URI"])
    for path in corpus_files(corpus_dir):
        doc = load_doc(path)
        index_document(doc, splitter, conn)
    conn.commit()

Run it once over the whole corpus. The tsvector update is one extra step that gives you BM25-quality keyword search for free.

Step 4 — Hybrid retrieval with RRF

Pure dense retrieval (HNSW over embeddings) loses to hybrid retrieval (dense + BM25) on most real corpora. The blunt truth: BM25 finds the chunk that contains the exact rare phrase in the question; dense finds the chunk that’s about the question. You want both.

def hybrid_search(query: str, k: int = 30) -> list[dict]:
    q_vec = embed([query])[0]

    with conn.cursor() as cur:
        cur.execute("""
            WITH dense AS (
              SELECT id, text, source_uri, metadata,
                     1 - (embedding <=> %s::vector) AS score,
                     row_number() OVER (ORDER BY embedding <=> %s::vector) AS rnk
              FROM chunks
              ORDER BY embedding <=> %s::vector
              LIMIT %s
            ),
            bm25 AS (
              SELECT id, text, source_uri, metadata,
                     ts_rank_cd(ts_text, plainto_tsquery('english', %s)) AS score,
                     row_number() OVER (ORDER BY ts_rank_cd(ts_text, plainto_tsquery('english', %s)) DESC) AS rnk
              FROM chunks
              WHERE ts_text @@ plainto_tsquery('english', %s)
              ORDER BY 6 DESC
              LIMIT %s
            )
            SELECT id, text, source_uri, metadata,
                   COALESCE(1.0 / (60 + d.rnk), 0) + COALESCE(1.0 / (60 + b.rnk), 0) AS rrf
            FROM dense d
            FULL OUTER JOIN bm25 b USING (id, text, source_uri, metadata)
            ORDER BY rrf DESC
            LIMIT %s
        """, (q_vec, q_vec, q_vec, k, query, query, query, k, k))
        return [dict(zip(["id","text","source_uri","metadata","score"], r)) for r in cur.fetchall()]

That’s RRF — Reciprocal Rank Fusion. It’s the cheapest, sturdiest hybrid fusion you can implement. Tune the 60 constant only if you’ve measured.

Step 5 — Cross-encoder rerank

Retrieval gives you 30 candidates that are probably relevant. A cross-encoder reads (query, candidate) pairs together and scores each — much better at relevance ranking than the bi-encoder embeddings, but too slow to run over the whole index.

from sentence_transformers import CrossEncoder

reranker = CrossEncoder("BAAI/bge-reranker-large", max_length=512, device="cuda")

def rerank(query: str, candidates: list[dict], top_k: int = 5) -> list[dict]:
    pairs = [(query, c["text"]) for c in candidates]
    scores = reranker.predict(pairs, show_progress_bar=False)
    for c, s in zip(candidates, scores):
        c["rerank_score"] = float(s)
    return sorted(candidates, key=lambda x: x["rerank_score"], reverse=True)[:top_k]

The reranker fits on a fraction of one L40S — run it on GPU 1 (so it doesn’t fight vLLM on GPU 0) by setting CUDA_VISIBLE_DEVICES=1 in the serving process’s environment.

Step 6 — Grounded answer generation

SYSTEM = """You are a precise assistant. Answer the user's question using
ONLY the context below. If the answer is not in the context, say
"I don't have that in my sources."

Format every fact-bearing sentence with a citation like [1], [2] using the
source ids from the context. Do not invent citations."""

def build_prompt(query: str, ctx: list[dict]) -> str:
    blocks = "\n\n".join(
        f"[{i+1}] (source: {c['source_uri']})\n{c['text']}"
        for i, c in enumerate(ctx)
    )
    return f"# Context\n{blocks}\n\n# Question\n{query}"

def answer(query: str) -> dict:
    candidates = hybrid_search(query, k=30)
    top = rerank(query, candidates, top_k=5)
    resp = llm_client.chat.completions.create(
        model="qwen2.5-14b",
        messages=[
            {"role": "system", "content": SYSTEM},
            {"role": "user", "content": build_prompt(query, top)},
        ],
        temperature=0.0,
    )
    return {
        "answer": resp.choices[0].message.content,
        "sources": [{"i": i+1, "uri": c["source_uri"], "snippet": c["text"][:200]}
                    for i, c in enumerate(top)],
    }

Three load-bearing decisions:

  • Temperature 0 for groundedness. Creativity is the enemy here.
  • System prompt forbids unsupported claims. “Say I don’t know” is a feature, not a failure.
  • Citations are numeric ids referencing the context block, not free-form URLs. This is what lets the frontend reliably render the right link.

Step 7 — FastAPI chat service

# src/<project>/rag/app.py
from fastapi import FastAPI
from pydantic import BaseModel
from src.<project>.rag.pipeline import answer

app = FastAPI()

class ChatRequest(BaseModel):
    question: str
    session_id: str | None = None

@app.post("/chat")
def chat(req: ChatRequest):
    return answer(req.question)

That’s the whole API surface. Authentication, rate-limiting, and streaming responses are good follow-ups — none are required for the capstone.

Step 8 — Langfuse tracing

Langfuse runs as a self-hosted container. Add it:

  langfuse:
    image: langfuse/langfuse:3
    container_name: langfuse
    restart: unless-stopped
    environment:
      - DATABASE_URL=postgresql://langfuse:__lf_pw__@localhost:5432/langfuse
      - NEXTAUTH_SECRET=__long_random__
      - NEXTAUTH_URL=https://langfuse.example.com
      - SALT=__another_long_random__
    ports:
      - "3002:3000"

Instrument the pipeline:

from langfuse import Langfuse, observe

langfuse = Langfuse(host=os.environ["LANGFUSE_HOST"], ...)

@observe(as_type="generation")
def generate_with_llm(query, ctx): ...

@observe()
def retrieve(query): ...

@observe()
def answer(query):
    candidates = retrieve(query)
    top = rerank(query, candidates)
    return generate_with_llm(query, top)

Every request now has a trace tree in Langfuse showing retrieval candidates, rerank scores, the prompt sent to the LLM, the response, and token usage. This is what makes debugging an answer go from “stare at logs” to “click the trace.”

Step 9 — The eval set

The eval set is the only thing that tells you whether your RAG is getting better. Write it by hand. Do not generate it with the same LLM you’re evaluating — that measures the LLM’s agreement with itself, not correctness.

eval/cases.jsonl:

{"q": "What is the maximum allowable concentration of lead in drinking water under <regulation X>?", "expected_answer_contains": ["15 µg/L", "fifteen micrograms per liter"], "expected_sources_contain": ["lead-and-copper-rule"]}
{"q": "Who chaired <committee Y> in 1998?", "expected_answer_contains": ["Jane Doe"], "expected_sources_contain": ["committee-roster-1998"]}

For a course capstone, 30 cases is the floor — 50 is better. The grading rubric expects them to be hand-written.

The eval script:

import json
from src.<project>.rag.pipeline import answer, hybrid_search, rerank

cases = [json.loads(l) for l in open("eval/cases.jsonl")]
hits = []
for c in cases:
    cands = hybrid_search(c["q"], k=30)
    top = rerank(c["q"], cands, top_k=5)
    a = answer(c["q"])
    retrieval_hit = any(s in t["source_uri"] for s in c.get("expected_sources_contain", []) for t in top)
    answer_hit = all(needle.lower() in a["answer"].lower() for needle in c["expected_answer_contains"])
    hits.append({"q": c["q"], "retrieval_hit": retrieval_hit, "answer_hit": answer_hit})

print("Recall@5:", sum(h["retrieval_hit"] for h in hits) / len(hits))
print("Answer match:", sum(h["answer_hit"] for h in hits) / len(hits))

Report both numbers in EVALUATION.md. If Recall@5 is under 0.8, retrieval is the bottleneck — try smaller chunks, different embedding model, or expand the BM25 lexicon. If Recall@5 is high but answer match is low, the LLM is failing to ground — tighten the prompt.

Recap

You’ve finished Capstone 3. You should have:

  • A working RAG service grounded in a real corpus.
  • Hybrid retrieval with measurable improvement over dense-only.
  • A reranker that further improves answer quality.
  • Langfuse traces and a hand-graded eval set.

Next, fine-tuning. The same vLLM endpoint will host the model you train on your own task.


Next: 13 — LoRA fine-tuning.