Building a Production RAG Pipeline with FastAPI and PostgreSQL
Retrieval-augmented generation has moved from research demos to production systems faster than most AI techniques. The core idea is straightforward: instead of relying on an LLM's training data alone, you retrieve relevant documents and include them in the prompt context.
The challenge is making this work reliably at scale. Most RAG tutorials stop at "call the embedding API and query a vector store." Production systems need chunking strategies, embedding pipelines, reranking, caching, and monitoring — all of which introduce failure modes that tutorials don't cover.
This guide covers a complete RAG implementation using FastAPI for the API layer, PostgreSQL with pgvector for storage, and practical patterns for handling the parts that break in production.
Problem
Teams building RAG systems encounter the same issues:
- Chunking strategies that work on demos fail on real documents
- Embedding API rate limits cause pipeline stalls
- Vector similarity search returns semantically irrelevant results
- No observability into what the retrieval step actually found
- Cold-start latency makes the system feel slow
Why RAG Instead of Fine-Tuning
Fine-tuning changes the model's weights. RAG changes the model's context. The tradeoffs are significant:
| Approach | Data freshness | Cost | Hallucination control |
|---|---|---|---|
| Fine-tuning | Stale after training | High (GPU hours) | Low |
| RAG | Real-time | Low (API calls) | High (verifiable sources) |
RAG wins when your data changes frequently, when you need source attribution, or when you can't afford to retrain models. At least one of those conditions holds for most production applications.
Architecture Overview
The pipeline has four stages:
- Ingestion — documents are chunked, embedded, and stored
- Retrieval — user queries are embedded, similar chunks are found
- Reranking — retrieved chunks are scored for relevance
- Generation — the LLM produces a response with retrieved context
```
User Query → Embed → Vector Search → Rerank → LLM → Response
                          ↓
                PostgreSQL + pgvector
```
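Before diving into each stage, the flow can be sketched end to end with stubbed components. The stubs below are placeholders for illustration, not the real implementations that follow:

```python
import asyncio

# Stubs standing in for the real components built later in this guide.
async def embed(text: str) -> list[float]:
    return [float(len(text))]

async def vector_search(vec: list[float], top_k: int = 5) -> list[tuple[str, float]]:
    return [("chunk about pgvector", 0.9)]

def rerank(query: str, hits: list[tuple[str, float]], top_k: int = 3) -> list[tuple[str, float]]:
    return hits[:top_k]

async def generate(query: str, context: list[tuple[str, float]]) -> str:
    return f"Answer based on {len(context)} chunks"

async def answer(query: str) -> str:
    # The four stages, composed in order
    vec = await embed(query)
    hits = await vector_search(vec)
    top = rerank(query, hits)
    return await generate(query, top)

result = asyncio.run(answer("What is RAG?"))
```

The rest of this guide replaces each stub with a production implementation.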
Setting Up pgvector
PostgreSQL with pgvector gives you vector storage without a separate database:
```sql
CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE documents (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    content TEXT NOT NULL,
    metadata JSONB DEFAULT '{}'::jsonb,
    embedding vector(1536),
    created_at TIMESTAMPTZ DEFAULT now()
);

CREATE INDEX ON documents USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
```
The ivfflat index trades exact results for speed; for most RAG applications, approximate nearest-neighbor search is good enough. At query time, `SET ivfflat.probes = 10;` scans more lists per query, trading speed for recall (the default is 1). If you want a deeper understanding of indexing, the patterns in PostgreSQL indexing strategies apply here too.
Document Chunking
Chunking is where most RAG pipelines fail silently. Too large and you waste context window. Too small and you lose meaning.
```python
from dataclasses import dataclass

@dataclass
class Chunk:
    content: str
    metadata: dict
    token_count: int

def chunk_document(
    text: str,
    max_tokens: int = 512,
    overlap_tokens: int = 50,
) -> list[Chunk]:
    """Split text into overlapping chunks by paragraph boundaries."""
    paragraphs = text.split("\n\n")
    chunks: list[Chunk] = []
    current: list[str] = []
    current_tokens = 0

    for para in paragraphs:
        # Word count is a cheap token estimate; swap in a real
        # tokenizer if you need exact budgets
        para_tokens = len(para.split())
        if current_tokens + para_tokens > max_tokens and current:
            chunks.append(Chunk(
                content="\n\n".join(current),
                metadata={},
                token_count=current_tokens,
            ))
            # Carry trailing paragraphs forward until overlap_tokens
            # is reached, so context at the boundary is preserved
            overlap: list[str] = []
            overlap_count = 0
            for prev in reversed(current):
                prev_tokens = len(prev.split())
                if overlap_count + prev_tokens > overlap_tokens:
                    break
                overlap.insert(0, prev)
                overlap_count += prev_tokens
            current = overlap
            current_tokens = overlap_count
        current.append(para)
        current_tokens += para_tokens

    if current:
        chunks.append(Chunk(
            content="\n\n".join(current),
            metadata={},
            token_count=current_tokens,
        ))
    return chunks
```
The key decision is splitting on paragraph boundaries rather than token counts. Splitting mid-sentence destroys context that the embedding model needs.
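To make the overlap behavior concrete, here is a stripped-down sketch of the same greedy packing idea, with made-up names and a fixed one-paragraph overlap rather than a token budget:

```python
def chunk_paragraphs(
    paragraphs: list[str],
    max_tokens: int = 10,
    overlap: int = 1,
) -> list[list[str]]:
    """Greedy paragraph packing with a fixed paragraph-count overlap (simplified)."""
    chunks: list[list[str]] = []
    current: list[str] = []
    count = 0
    for para in paragraphs:
        n = len(para.split())  # word count as token estimate
        if count + n > max_tokens and current:
            chunks.append(current)
            # Carry the last `overlap` paragraphs into the next chunk
            current = current[-overlap:]
            count = sum(len(p.split()) for p in current)
        current.append(para)
        count += n
    if current:
        chunks.append(current)
    return chunks

paras = [
    "alpha bravo charlie",
    "delta echo foxtrot",
    "golf hotel india",
    "juliet kilo lima",
]
chunks = chunk_paragraphs(paras, max_tokens=6, overlap=1)
# Each adjacent pair of chunks shares its boundary paragraph
```

Every chunk boundary repeats the preceding paragraph, so a sentence of context that straddles the boundary appears in both chunks.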
Embedding Pipeline
```python
import asyncio

from openai import AsyncOpenAI

client = AsyncOpenAI()

BATCH_SIZE = 100
EMBED_MODEL = "text-embedding-3-small"

async def embed_chunks(chunks: list[Chunk]) -> list[list[float]]:
    """Embed chunks with batching and rate limit handling."""
    embeddings: list[list[float]] = []
    for i in range(0, len(chunks), BATCH_SIZE):
        batch = chunks[i : i + BATCH_SIZE]
        texts = [c.content for c in batch]
        response = await client.embeddings.create(
            model=EMBED_MODEL,
            input=texts,
        )
        embeddings.extend([e.embedding for e in response.data])
        # Respect rate limits
        if i + BATCH_SIZE < len(chunks):
            await asyncio.sleep(0.5)
    return embeddings
```
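The fixed sleep between batches is a blunt instrument; transient failures still need retries. A minimal retry-with-exponential-backoff sketch, where `flaky_embed` is a hypothetical stand-in for a real embeddings call:

```python
import asyncio
import random

async def with_retries(coro_fn, max_attempts: int = 5, base_delay: float = 0.5):
    """Retry an async call with exponential backoff and a little jitter."""
    for attempt in range(max_attempts):
        try:
            return await coro_fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # exhausted: surface the error to the caller
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            await asyncio.sleep(delay)

# Demo: a fake embedding call that fails twice, then succeeds
calls = {"n": 0}

async def flaky_embed():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("rate limited")
    return [0.1, 0.2, 0.3]

result = asyncio.run(with_retries(flaky_embed, base_delay=0.01))
```

In production you would catch the provider's specific rate-limit exception rather than bare `Exception`, and log each retry.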
FastAPI Retrieval Endpoint
```python
import asyncpg
from fastapi import Depends, FastAPI
from pydantic import BaseModel

app = FastAPI()

class QueryRequest(BaseModel):
    query: str
    top_k: int = 5

class RetrievedChunk(BaseModel):
    content: str
    similarity: float
    metadata: dict

@app.post("/retrieve")
async def retrieve(
    req: QueryRequest,
    # get_db_pool is your dependency that yields a shared asyncpg pool
    pool: asyncpg.Pool = Depends(get_db_pool),
) -> list[RetrievedChunk]:
    # Embed the query (embed_text wraps the embeddings API)
    query_embedding = await embed_text(req.query)

    # Vector similarity search; pgvector accepts the '[...]' text form.
    # Note: register a JSON codec on the pool so JSONB decodes to dict.
    rows = await pool.fetch("""
        SELECT content, metadata,
               1 - (embedding <=> $1::vector) AS similarity
        FROM documents
        ORDER BY embedding <=> $1::vector
        LIMIT $2
    """, str(query_embedding), req.top_k)

    return [
        RetrievedChunk(
            content=row["content"],
            similarity=row["similarity"],
            metadata=row["metadata"],
        )
        for row in rows
    ]
```
The `<=>` operator computes cosine distance; subtracting it from 1 converts it to similarity. This pattern integrates well with FastAPI performance patterns for handling concurrent requests.
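To build intuition for the distance-to-similarity conversion, here is a toy reimplementation in plain Python (for illustration only; the real computation runs inside PostgreSQL):

```python
import math

def cosine_distance(a: list[float], b: list[float]) -> float:
    """Cosine distance, the quantity pgvector's <=> operator returns."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)

# Parallel vectors: distance 0.0, similarity 1.0
identical = cosine_distance([1.0, 0.0], [2.0, 0.0])

# Orthogonal vectors: distance 1.0, similarity 0.0
orthogonal = cosine_distance([1.0, 0.0], [0.0, 1.0])
```

Because cosine distance ignores magnitude, `[1.0, 0.0]` and `[2.0, 0.0]` are at distance zero, which is exactly what you want for embeddings of the same meaning at different lengths.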
Reranking
Vector similarity alone is not enough. A reranker scores the actual relevance of retrieved chunks to the query:
```python
from sentence_transformers import CrossEncoder

reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

def rerank_chunks(
    query: str,
    chunks: list[RetrievedChunk],
    top_k: int = 3,
) -> list[RetrievedChunk]:
    pairs = [(query, chunk.content) for chunk in chunks]
    scores = reranker.predict(pairs)
    ranked = sorted(
        zip(chunks, scores),
        key=lambda x: x[1],
        reverse=True,
    )
    return [chunk for chunk, _ in ranked[:top_k]]
```
Generation with Context
```python
async def generate_answer(
    query: str,
    context_chunks: list[RetrievedChunk],
) -> str:
    # Reuses the AsyncOpenAI client from the embedding pipeline
    context = "\n\n---\n\n".join(c.content for c in context_chunks)

    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": (
                    "Answer the question using only the provided context. "
                    "If the context does not contain enough information, "
                    "say so explicitly. Cite the relevant section."
                ),
            },
            {
                "role": "user",
                "content": f"Context:\n{context}\n\nQuestion: {query}",
            },
        ],
        temperature=0.1,
    )
    return response.choices[0].message.content
```
Low temperature reduces hallucination. The system prompt forces the model to stay within the retrieved context.
Caching Strategy
Embedding the same query repeatedly wastes money and adds latency:
```python
import hashlib

def query_cache_key(query: str) -> str:
    normalized = query.strip().lower()
    return hashlib.sha256(normalized.encode()).hexdigest()

# In-memory cache for development; use Redis in production
embedding_cache: dict[str, list[float]] = {}

async def embed_with_cache(query: str) -> list[float]:
    key = query_cache_key(query)
    if key in embedding_cache:
        return embedding_cache[key]
    embedding = await embed_text(query)
    embedding_cache[key] = embedding
    return embedding
```
Production Considerations
Monitoring retrieval quality. Log every query, the retrieved chunks, and the final response. You cannot improve what you cannot measure.
Handling embedding API failures. Wrap embedding calls in retry logic with exponential backoff. If the API is down, queue ingestion jobs rather than dropping documents.
Index maintenance. IVFFlat clusters are fixed when the index is built, so rebuild it periodically as your document count grows. pgvector's rule of thumb for the number of lists is roughly rows / 1000 up to about a million rows, and sqrt(rows) beyond that.
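That sizing rule can be captured in a small helper, useful in the maintenance job that decides when to rebuild (the function name is my own, not from any library):

```python
import math

def recommended_lists(row_count: int) -> int:
    """IVFFlat lists sizing: rows/1000 up to 1M rows, sqrt(rows) beyond."""
    if row_count <= 1_000_000:
        return max(1, row_count // 1000)
    return int(math.sqrt(row_count))

small = recommended_lists(100_000)      # 100
large = recommended_lists(4_000_000)    # 2000
```

Compare the current index's `lists` setting against this value; when they diverge significantly, schedule a rebuild during a low-traffic window.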
Chunk metadata. Store source document ID, page number, and section title in the metadata JSONB column. This enables source attribution in responses.
Common Mistakes
Mistake 1: Embedding entire documents. Large documents exceed embedding model token limits and produce diluted vectors. Always chunk first.
Mistake 2: Ignoring chunking overlap. Without overlap, context at chunk boundaries is lost. The reranker cannot recover meaning that was split across chunks.
Mistake 3: Skipping reranking. Vector similarity finds semantically similar text, not necessarily relevant answers. Reranking closes this gap.
Mistake 4: Not versioning embeddings. When you change the embedding model, old vectors are incompatible. Track the model version in metadata and re-embed when models change.
Takeaways
A production RAG pipeline is more than embeddings and a vector store. The chunking strategy determines retrieval quality. The reranker separates relevant results from merely similar ones. Caching and monitoring turn a prototype into a system you can operate.
PostgreSQL with pgvector handles the storage layer without adding another database to your stack. FastAPI handles the API layer with async support for embedding and LLM calls. The combination is simple enough to deploy and powerful enough for production traffic.