Building a Production RAG Pipeline with FastAPI and PostgreSQL

14 min read · AI & Backend

Build a production RAG pipeline using FastAPI and PostgreSQL with pgvector, covering chunking, embeddings, reranking, caching, and monitoring.

Retrieval-augmented generation has moved from research demos to production systems faster than most AI techniques. The core idea is straightforward: instead of relying on an LLM's training data alone, you retrieve relevant documents and include them in the prompt context.

The challenge is making this work reliably at scale. Most RAG tutorials stop at "call the embedding API and query a vector store." Production systems need chunking strategies, embedding pipelines, reranking, caching, and monitoring — all of which introduce failure modes that tutorials don't cover.

This guide covers a complete RAG implementation using FastAPI for the API layer, PostgreSQL with pgvector for storage, and practical patterns for handling the parts that break in production.

Problem

Teams building RAG systems encounter the same issues:

  • Chunking strategies that work on demos fail on real documents
  • Embedding API rate limits cause pipeline stalls
  • Vector similarity search returns semantically irrelevant results
  • No observability into what the retrieval step actually found
  • Cold-start latency makes the system feel slow

Why RAG Instead of Fine-Tuning

Fine-tuning changes the model's weights. RAG changes the model's context. The tradeoffs are significant:

Approach    | Data freshness       | Cost             | Hallucination control
Fine-tuning | Stale after training | High (GPU hours) | Low
RAG         | Real-time            | Low (API calls)  | High (verifiable sources)

RAG wins when your data changes frequently, when you need source attribution, or when you can't afford to retrain models. For most production applications, at least one of those conditions holds.

Architecture Overview

The pipeline has four stages:

  1. Ingestion — documents are chunked, embedded, and stored
  2. Retrieval — user queries are embedded, similar chunks are found
  3. Reranking — retrieved chunks are scored for relevance
  4. Generation — the LLM produces a response with retrieved context

User Query → Embed → Vector Search → Rerank → LLM → Response
                           ↓
                 PostgreSQL + pgvector
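The four stages can be wired together in one place. Here is a minimal orchestration sketch with the stage functions injected as parameters (embed_fn, search_fn, rerank_fn, and generate_fn are placeholder names for the implementations covered in the sections below):

```python
from typing import Awaitable, Callable

async def answer_query(
    query: str,
    embed_fn: Callable[[str], Awaitable[list[float]]],
    search_fn: Callable[[list[float]], Awaitable[list[str]]],
    rerank_fn: Callable[[str, list[str]], list[str]],
    generate_fn: Callable[[str, list[str]], Awaitable[str]],
) -> str:
    # 1. Ingestion happens offline; at query time we embed the user query
    vector = await embed_fn(query)
    # 2. Retrieval: approximate nearest-neighbor search over stored chunks
    candidates = await search_fn(vector)
    # 3. Reranking: keep only the chunks actually relevant to the query
    context = rerank_fn(query, candidates)
    # 4. Generation: answer from the selected context
    return await generate_fn(query, context)
```

Keeping the stages as separate functions makes each one independently testable and swappable.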

Setting Up pgvector

PostgreSQL with pgvector gives you vector storage without a separate database:

CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE documents (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    content TEXT NOT NULL,
    metadata JSONB DEFAULT '{}'::jsonb,
    embedding vector(1536),
    created_at TIMESTAMPTZ DEFAULT now()
);

CREATE INDEX ON documents USING ivfflat (embedding vector_cosine_ops)
    WITH (lists = 100);

The ivfflat index trades exact results for speed: it clusters vectors into lists and searches only the nearest clusters at query time. You can raise recall by increasing the ivfflat.probes setting (the number of clusters searched, default 1) at the cost of latency. For most RAG applications, approximate nearest neighbors is good enough. If you need to understand indexing deeper, the patterns in PostgreSQL indexing strategies apply here too.

Document Chunking

Chunking is where most RAG pipelines fail silently. Too large and you waste context window. Too small and you lose meaning.

from dataclasses import dataclass

@dataclass
class Chunk:
    content: str
    metadata: dict
    token_count: int

def chunk_document(
    text: str,
    max_tokens: int = 512,
    overlap_tokens: int = 50,
) -> list[Chunk]:
    """Split text into overlapping chunks on paragraph boundaries.

    Token counts are approximated by whitespace-separated words; swap in
    a real tokenizer (e.g. tiktoken) if you need exact budgets.
    """
    paragraphs = [p for p in text.split("\n\n") if p.strip()]
    chunks: list[Chunk] = []
    current: list[str] = []
    current_tokens = 0

    for para in paragraphs:
        para_tokens = len(para.split())
        if current_tokens + para_tokens > max_tokens and current:
            chunks.append(Chunk(
                content="\n\n".join(current),
                metadata={},
                token_count=current_tokens,
            ))
            # Carry trailing paragraphs into the next chunk until the
            # overlap budget is met (always at least one paragraph)
            overlap: list[str] = []
            overlap_count = 0
            for prev in reversed(current):
                prev_tokens = len(prev.split())
                if overlap and overlap_count + prev_tokens > overlap_tokens:
                    break
                overlap.insert(0, prev)
                overlap_count += prev_tokens
            current = overlap
            current_tokens = overlap_count

        current.append(para)
        current_tokens += para_tokens

    if current:
        chunks.append(Chunk(
            content="\n\n".join(current),
            metadata={},
            token_count=current_tokens,
        ))

    return chunks

The key decision is splitting on paragraph boundaries rather than token counts. Splitting mid-sentence destroys context that the embedding model needs.

Embedding Pipeline

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()

BATCH_SIZE = 100
EMBED_MODEL = "text-embedding-3-small"

async def embed_chunks(chunks: list[Chunk]) -> list[list[float]]:
    """Embed chunks with batching and rate limit handling."""
    embeddings: list[list[float]] = []

    for i in range(0, len(chunks), BATCH_SIZE):
        batch = chunks[i : i + BATCH_SIZE]
        texts = [c.content for c in batch]

        response = await client.embeddings.create(
            model=EMBED_MODEL,
            input=texts,
        )
        embeddings.extend([e.embedding for e in response.data])

        # Respect rate limits
        if i + BATCH_SIZE < len(chunks):
            await asyncio.sleep(0.5)

    return embeddings
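The fixed sleep above is a blunt rate limiter. An alternative is to bound concurrency with a semaphore, which keeps throughput up while capping simultaneous requests. A sketch, with embed_batch standing in for the actual API call:

```python
import asyncio
from typing import Awaitable, Callable

async def embed_batches_bounded(
    batches: list[list[str]],
    embed_batch: Callable[[list[str]], Awaitable[list[list[float]]]],
    max_concurrency: int = 4,
) -> list[list[float]]:
    """Embed batches concurrently, capped by a semaphore."""
    sem = asyncio.Semaphore(max_concurrency)

    async def run(batch: list[str]) -> list[list[float]]:
        async with sem:
            return await embed_batch(batch)

    results = await asyncio.gather(*(run(b) for b in batches))
    # gather preserves input order, so the flattened vectors line up with the chunks
    return [vec for batch_vecs in results for vec in batch_vecs]
```

Tune max_concurrency to sit under your provider's rate limit rather than guessing a sleep interval.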

FastAPI Retrieval Endpoint

import os

from fastapi import FastAPI, Depends
from pydantic import BaseModel
import asyncpg

app = FastAPI()

@app.on_event("startup")
async def init_pool() -> None:
    # Connection string supplied via the environment
    app.state.pool = await asyncpg.create_pool(dsn=os.environ["DATABASE_URL"])

async def get_db_pool() -> asyncpg.Pool:
    return app.state.pool

async def embed_text(text: str) -> list[float]:
    # Reuses the AsyncOpenAI client and EMBED_MODEL from the embedding pipeline
    response = await client.embeddings.create(model=EMBED_MODEL, input=[text])
    return response.data[0].embedding

class QueryRequest(BaseModel):
    query: str
    top_k: int = 5

class RetrievedChunk(BaseModel):
    content: str
    similarity: float
    metadata: dict

@app.post("/retrieve")
async def retrieve(
    req: QueryRequest,
    pool: asyncpg.Pool = Depends(get_db_pool),
) -> list[RetrievedChunk]:
    # Embed the query
    query_embedding = await embed_text(req.query)

    # Vector similarity search
    rows = await pool.fetch("""
        SELECT content, metadata,
               1 - (embedding <=> $1::vector) AS similarity
        FROM documents
        ORDER BY embedding <=> $1::vector
        LIMIT $2
    """, str(query_embedding), req.top_k)

    return [
        RetrievedChunk(
            content=row["content"],
            similarity=row["similarity"],
            metadata=row["metadata"],
        )
        for row in rows
    ]

The <=> operator computes cosine distance. Subtracting from 1 converts it to similarity. This pattern integrates well with FastAPI performance patterns for handling concurrent requests.
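The distance-to-similarity conversion is easy to sanity-check in plain Python (a toy reimplementation of cosine distance, not pgvector itself):

```python
import math

def cosine_distance(a: list[float], b: list[float]) -> float:
    """Cosine distance, as pgvector's <=> operator computes it."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return 1.0 - dot / (norm_a * norm_b)

def similarity(a: list[float], b: list[float]) -> float:
    """The 1 - distance conversion used in the SQL above."""
    return 1.0 - cosine_distance(a, b)
```

Vectors pointing the same direction get distance 0 (similarity 1); orthogonal vectors get distance 1 (similarity 0).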

Reranking

Vector similarity alone is not enough. A reranker scores the actual relevance of retrieved chunks to the query:

from sentence_transformers import CrossEncoder

reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

def rerank_chunks(
    query: str,
    chunks: list[RetrievedChunk],
    top_k: int = 3,
) -> list[RetrievedChunk]:
    pairs = [(query, chunk.content) for chunk in chunks]
    scores = reranker.predict(pairs)

    ranked = sorted(
        zip(chunks, scores),
        key=lambda x: x[1],
        reverse=True,
    )
    return [chunk for chunk, _ in ranked[:top_k]]

Generation with Context

async def generate_answer(
    query: str,
    context_chunks: list[RetrievedChunk],
) -> str:
    context = "\n\n---\n\n".join(c.content for c in context_chunks)

    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": (
                    "Answer the question using only the provided context. "
                    "If the context does not contain enough information, "
                    "say so explicitly. Cite the relevant section."
                ),
            },
            {
                "role": "user",
                "content": f"Context:\n{context}\n\nQuestion: {query}",
            },
        ],
        temperature=0.1,
    )
    return response.choices[0].message.content

Low temperature reduces hallucination. The system prompt forces the model to stay within the retrieved context.

Caching Strategy

Embedding the same query repeatedly wastes money and adds latency:

import hashlib

def query_cache_key(query: str) -> str:
    normalized = query.strip().lower()
    return hashlib.sha256(normalized.encode()).hexdigest()

# In-memory cache for development; use Redis in production
embedding_cache: dict[str, list[float]] = {}

async def embed_with_cache(query: str) -> list[float]:
    key = query_cache_key(query)
    if key in embedding_cache:
        return embedding_cache[key]

    embedding = await embed_text(query)
    embedding_cache[key] = embedding
    return embedding

Production Considerations

Monitoring retrieval quality. Log every query, the retrieved chunks, and the final response. You cannot improve what you cannot measure.

Handling embedding API failures. Wrap embedding calls in retry logic with exponential backoff. If the API is down, queue ingestion jobs rather than dropping documents.
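The backoff pattern can be sketched as a small helper (retry_with_backoff is an illustrative name, not a library function):

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")

async def retry_with_backoff(
    fn: Callable[[], Awaitable[T]],
    max_attempts: int = 5,
    base_delay: float = 0.5,
) -> T:
    """Retry an async call with exponential backoff and jitter."""
    for attempt in range(max_attempts):
        try:
            return await fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the original error
            # Double the delay each attempt; jitter avoids thundering herds
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            await asyncio.sleep(delay)
    raise RuntimeError("unreachable")
```

Wrap the embeddings.create call in this helper, and catch only the transient error types (rate limits, timeouts) rather than bare Exception in real code.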

Index maintenance. As your document count grows, rebuild the IVFFlat index periodically so the cluster centroids reflect the current data. For the number of lists, the pgvector docs suggest roughly rows / 1000 up to about a million rows, and sqrt(rows) beyond that.

Chunk metadata. Store source document ID, page number, and section title in the metadata JSONB column. This enables source attribution in responses.

Common Mistakes

Mistake 1: Embedding entire documents. Large documents exceed embedding model token limits and produce diluted vectors. Always chunk first.

Mistake 2: Ignoring chunking overlap. Without overlap, context at chunk boundaries is lost. The reranker cannot recover meaning that was split across chunks.

Mistake 3: Skipping reranking. Vector similarity finds semantically similar text, not necessarily relevant answers. Reranking closes this gap.

Mistake 4: Not versioning embeddings. When you change the embedding model, old vectors are incompatible. Track the model version in metadata and re-embed when models change.
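A lightweight way to do this tracking is to stamp the model name into each chunk's metadata at ingestion and compare it during maintenance (the helper names here are illustrative):

```python
EMBED_MODEL = "text-embedding-3-small"

def stamp_embedding_version(metadata: dict, model: str = EMBED_MODEL) -> dict:
    """Record which model produced the vector stored alongside this chunk."""
    return {**metadata, "embedding_model": model}

def needs_reembedding(metadata: dict, current_model: str = EMBED_MODEL) -> bool:
    """True when the stored vector came from a different (or unknown) model."""
    return metadata.get("embedding_model") != current_model
```

A periodic job can then select chunks where needs_reembedding is true and queue them for re-embedding, since vectors from different models are not comparable.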

Takeaways

A production RAG pipeline is more than embeddings and a vector store. The chunking strategy determines retrieval quality. The reranker separates relevant results from merely similar ones. Caching and monitoring turn a prototype into a system you can operate.

PostgreSQL with pgvector handles the storage layer without adding another database to your stack. FastAPI handles the API layer with async support for embedding and LLM calls. The combination is simple enough to deploy and powerful enough for production traffic.