Modular RAG
Build a composable RAG framework with swappable retrievers, rerankers, and generators
| Property | Value |
|---|---|
| Difficulty | Advanced |
| Time | ~4 days |
| Code Size | ~800 LOC |
| Prerequisites | RAG with Reranking, Production RAG |
TL;DR
Build a composable RAG framework where retrievers, rerankers, and generators are swappable components. Define pipelines via YAML config, enabling A/B testing and easy experimentation without code changes.
Tech Stack
| Technology | Purpose |
|---|---|
| Python ABC | Component interfaces |
| Pydantic | Configuration & validation |
| ChromaDB | Default vector store |
| Pinecone | Alternative vector store |
| OpenAI | Default embeddings & LLM |
| Cohere | Alternative reranker |
| FastAPI | REST API |
Prerequisites
- Completed intermediate RAG projects
- Understanding of design patterns (Strategy, Factory)
- Experience with multiple vector databases
- Python 3.10+
What You'll Learn
- Design a modular RAG architecture with swappable components
- Implement abstract interfaces for retrievers, rerankers, and generators
- Build a pipeline configuration system
- Create component factories for dynamic instantiation
- Enable A/B testing between different RAG configurations
Why Modular RAG?
Production RAG systems need flexibility:
- Different data types need different chunking strategies
- Different queries benefit from different retrievers
- Different accuracy/speed tradeoffs require swappable rerankers
- A/B testing requires running multiple configurations
┌─────────────────────────────────────────────────────────────────────────────┐
│ MONOLITHIC RAG ❌ │
│ │
│ Query ───► Fixed Retriever ───► Fixed Reranker ───► Fixed Generator │
│ │
│ Problem: Want to try Pinecone? Cohere reranker? Claude? Rewrite code! ⚠️ │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ MODULAR RAG ✅ │
│ │
│ ┌────────────────┐ │
│ │ Router │ │
│ └───────┬────────┘ │
│ │ │
│ ┌──────────────────┼──────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │ ChromaDB │ │ Pinecone │ │ Hybrid │ │
│ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ │
│ │ │ │ │
│ └──────────────────┼──────────────────┘ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Reranker │ (Cohere / CrossEncoder / LLM) │
│ │ Pool │ │
│ └──────┬──────┘ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Generator │ (OpenAI / Anthropic) │
│ │ Pool │ │
│ └─────────────┘ │
│ │
│ Solution: Swap components via YAML config, A/B test instantly ✓ │
└─────────────────────────────────────────────────────────────────────────────┘
Project Structure
modular-rag/
├── core/
│ ├── __init__.py
│ ├── interfaces.py # Abstract base classes
│ ├── schemas.py # Pydantic models
│ └── registry.py # Component registry
├── retrievers/
│ ├── __init__.py
│ ├── base.py
│ ├── chroma.py # ChromaDB retriever
│ ├── pinecone.py # Pinecone retriever
│ └── hybrid.py # Hybrid search retriever
├── rerankers/
│ ├── __init__.py
│ ├── base.py
│ ├── cohere.py # Cohere reranker
│ ├── cross_encoder.py # Local cross-encoder
│ └── llm.py # LLM-based reranker
├── generators/
│ ├── __init__.py
│ ├── base.py
│ ├── openai.py # OpenAI generator
│ └── anthropic.py # Anthropic generator
├── chunkers/
│ ├── __init__.py
│ ├── base.py
│ ├── recursive.py # Recursive text splitter
│ └── semantic.py # Semantic chunker
├── pipeline/
│ ├── __init__.py
│ ├── builder.py # Pipeline builder
│ └── executor.py # Pipeline executor
├── config.py
├── app.py
└── requirements.txt
Step 1: Core Interfaces
Define abstract interfaces that all components must implement.
# core/interfaces.py
from abc import ABC, abstractmethod
from typing import Any, Generic, TypeVar
from pydantic import BaseModel
# Type variables for generic interfaces
TConfig = TypeVar("TConfig", bound=BaseModel)
TInput = TypeVar("TInput")
TOutput = TypeVar("TOutput")
class Component(ABC, Generic[TConfig]):
"""Base interface for all RAG components."""
def __init__(self, config: TConfig):
self.config = config
self._validate_config()
@abstractmethod
def _validate_config(self) -> None:
"""Validate component configuration."""
pass
@property
@abstractmethod
def name(self) -> str:
"""Unique component name."""
pass
@property
@abstractmethod
def version(self) -> str:
"""Component version."""
pass
class Document(BaseModel):
"""Standard document representation."""
id: str
content: str
metadata: dict[str, Any] = {}
embedding: list[float] | None = None
score: float | None = None
class Query(BaseModel):
"""Standard query representation."""
text: str
metadata: dict[str, Any] = {}
filters: dict[str, Any] = {}
class RetrievalResult(BaseModel):
"""Result from retrieval."""
query: Query
documents: list[Document]
retriever_name: str
latency_ms: float
class RerankResult(BaseModel):
"""Result from reranking."""
query: Query
documents: list[Document]
reranker_name: str
latency_ms: float
class GenerationResult(BaseModel):
"""Result from generation."""
query: Query
answer: str
sources: list[Document]
generator_name: str
latency_ms: float
tokens_used: int | None = None
class Retriever(Component[TConfig], ABC):
"""Interface for document retrievers."""
@abstractmethod
async def retrieve(
self,
query: Query,
k: int = 10
) -> RetrievalResult:
"""Retrieve relevant documents for a query."""
pass
@abstractmethod
async def add_documents(
self,
documents: list[Document]
) -> int:
"""Add documents to the index."""
pass
class Reranker(Component[TConfig], ABC):
"""Interface for document rerankers."""
@abstractmethod
async def rerank(
self,
query: Query,
documents: list[Document],
top_k: int = 5
) -> RerankResult:
"""Rerank documents by relevance."""
pass
class Generator(Component[TConfig], ABC):
"""Interface for answer generators."""
@abstractmethod
async def generate(
self,
query: Query,
documents: list[Document]
) -> GenerationResult:
"""Generate answer from documents."""
pass
class Chunker(Component[TConfig], ABC):
"""Interface for document chunkers."""
@abstractmethod
def chunk(
self,
text: str,
metadata: dict[str, Any] | None = None
) -> list[Document]:
"""Split text into chunks."""
pass
Step 2: Core Schemas
# core/schemas.py
from pydantic import BaseModel, Field
from typing import Any, Literal
from enum import Enum
class RetrieverType(str, Enum):
CHROMA = "chroma"
PINECONE = "pinecone"
HYBRID = "hybrid"
class RerankerType(str, Enum):
COHERE = "cohere"
CROSS_ENCODER = "cross_encoder"
LLM = "llm"
NONE = "none"
class GeneratorType(str, Enum):
OPENAI = "openai"
ANTHROPIC = "anthropic"
class ChunkerType(str, Enum):
RECURSIVE = "recursive"
SEMANTIC = "semantic"
# Component Configs
class ChromaConfig(BaseModel):
"""Configuration for ChromaDB retriever."""
collection_name: str = "documents"
persist_directory: str = "./chroma_db"
embedding_model: str = "text-embedding-3-small"
distance_metric: Literal["cosine", "l2", "ip"] = "cosine"
class PineconeConfig(BaseModel):
"""Configuration for Pinecone retriever."""
index_name: str
namespace: str = ""
embedding_model: str = "text-embedding-3-small"
api_key: str
class HybridConfig(BaseModel):
"""Configuration for hybrid retriever."""
vector_weight: float = Field(0.7, ge=0, le=1)
bm25_weight: float = Field(0.3, ge=0, le=1)
vector_config: ChromaConfig | PineconeConfig
class CohereRerankerConfig(BaseModel):
"""Configuration for Cohere reranker."""
model: str = "rerank-english-v3.0"
api_key: str
class CrossEncoderConfig(BaseModel):
"""Configuration for cross-encoder reranker."""
model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"
device: str = "cpu"
batch_size: int = 32
class LLMRerankerConfig(BaseModel):
"""Configuration for LLM-based reranker."""
model: str = "gpt-4o-mini"
api_key: str
class OpenAIGeneratorConfig(BaseModel):
"""Configuration for OpenAI generator."""
model: str = "gpt-4o-mini"
api_key: str | None = None  # falls back to the OPENAI_API_KEY env var
temperature: float = 0.7
max_tokens: int = 1024
system_prompt: str | None = None
class AnthropicGeneratorConfig(BaseModel):
"""Configuration for Anthropic generator."""
model: str = "claude-3-5-sonnet-20241022"
api_key: str | None = None  # falls back to the ANTHROPIC_API_KEY env var
temperature: float = 0.7
max_tokens: int = 1024
system_prompt: str | None = None
class RecursiveChunkerConfig(BaseModel):
"""Configuration for recursive chunker."""
chunk_size: int = 512
chunk_overlap: int = 50
separators: list[str] = ["\n\n", "\n", ". ", " "]
class SemanticChunkerConfig(BaseModel):
"""Configuration for semantic chunker."""
embedding_model: str = "text-embedding-3-small"
similarity_threshold: float = 0.8
min_chunk_size: int = 100
max_chunk_size: int = 1000
# Pipeline Config
class PipelineConfig(BaseModel):
"""Full pipeline configuration."""
name: str
description: str = ""
# Component selections
retriever_type: RetrieverType
retriever_config: dict[str, Any]
reranker_type: RerankerType = RerankerType.NONE
reranker_config: dict[str, Any] = {}
generator_type: GeneratorType = GeneratorType.OPENAI
generator_config: dict[str, Any] = {}
chunker_type: ChunkerType = ChunkerType.RECURSIVE
chunker_config: dict[str, Any] = {}
# Pipeline settings
retrieval_k: int = 20
rerank_k: int = 5
Step 3: Component Registry
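One payoff of these Pydantic configs is that bad settings fail loudly at pipeline-build time, not mid-query. A quick standalone check using a trimmed-down `HybridConfig` (weight fields only; the class name here is illustrative):

```python
from pydantic import BaseModel, Field, ValidationError

class HybridWeights(BaseModel):
    """Trimmed-down HybridConfig: just the bounded weight fields."""
    vector_weight: float = Field(0.7, ge=0, le=1)
    bm25_weight: float = Field(0.3, ge=0, le=1)

# Valid settings load silently; omitted fields take their defaults
cfg = HybridWeights(vector_weight=0.5)
assert cfg.bm25_weight == 0.3

# Out-of-range values are rejected before any component is built
try:
    HybridWeights(vector_weight=1.5)
    raise AssertionError("expected ValidationError")
except ValidationError:
    pass
```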
# core/registry.py
from typing import Type, TypeVar, Any
from core.interfaces import Component, Retriever, Reranker, Generator, Chunker
from core.schemas import (
RetrieverType, RerankerType, GeneratorType, ChunkerType
)
T = TypeVar("T", bound=Component)
class ComponentRegistry:
"""Registry for RAG components."""
_retrievers: dict[RetrieverType, Type[Retriever]] = {}
_rerankers: dict[RerankerType, Type[Reranker]] = {}
_generators: dict[GeneratorType, Type[Generator]] = {}
_chunkers: dict[ChunkerType, Type[Chunker]] = {}
@classmethod
def register_retriever(
cls,
type_: RetrieverType
):
"""Decorator to register a retriever."""
def decorator(retriever_cls: Type[Retriever]):
cls._retrievers[type_] = retriever_cls
return retriever_cls
return decorator
@classmethod
def register_reranker(
cls,
type_: RerankerType
):
"""Decorator to register a reranker."""
def decorator(reranker_cls: Type[Reranker]):
cls._rerankers[type_] = reranker_cls
return reranker_cls
return decorator
@classmethod
def register_generator(
cls,
type_: GeneratorType
):
"""Decorator to register a generator."""
def decorator(generator_cls: Type[Generator]):
cls._generators[type_] = generator_cls
return generator_cls
return decorator
@classmethod
def register_chunker(
cls,
type_: ChunkerType
):
"""Decorator to register a chunker."""
def decorator(chunker_cls: Type[Chunker]):
cls._chunkers[type_] = chunker_cls
return chunker_cls
return decorator
@classmethod
def get_retriever(
cls,
type_: RetrieverType,
config: dict[str, Any]
) -> Retriever:
"""Get a retriever instance."""
if type_ not in cls._retrievers:
raise ValueError(f"Unknown retriever type: {type_}")
return cls._retrievers[type_](config)
@classmethod
def get_reranker(
cls,
type_: RerankerType,
config: dict[str, Any]
) -> Reranker | None:
"""Get a reranker instance."""
if type_ == RerankerType.NONE:
return None
if type_ not in cls._rerankers:
raise ValueError(f"Unknown reranker type: {type_}")
return cls._rerankers[type_](config)
@classmethod
def get_generator(
cls,
type_: GeneratorType,
config: dict[str, Any]
) -> Generator:
"""Get a generator instance."""
if type_ not in cls._generators:
raise ValueError(f"Unknown generator type: {type_}")
return cls._generators[type_](config)
@classmethod
def get_chunker(
cls,
type_: ChunkerType,
config: dict[str, Any]
) -> Chunker:
"""Get a chunker instance."""
if type_ not in cls._chunkers:
raise ValueError(f"Unknown chunker type: {type_}")
return cls._chunkers[type_](config)
@classmethod
def list_components(cls) -> dict:
"""List all registered components."""
return {
"retrievers": list(cls._retrievers.keys()),
"rerankers": list(cls._rerankers.keys()),
"generators": list(cls._generators.keys()),
"chunkers": list(cls._chunkers.keys())
}
Step 4: Retriever Implementations
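Before implementing real retrievers, the registry mechanics are worth a sanity check. A stripped-down mirror of `core/registry.py` with a stub component (standalone sketch, no project imports):

```python
from enum import Enum

class RetrieverType(str, Enum):   # mirrors core/schemas.py
    CHROMA = "chroma"

class ComponentRegistry:          # stripped-down core/registry.py
    _retrievers: dict = {}

    @classmethod
    def register_retriever(cls, type_):
        def decorator(retriever_cls):
            cls._retrievers[type_] = retriever_cls
            return retriever_cls
        return decorator

    @classmethod
    def get_retriever(cls, type_, config):
        if type_ not in cls._retrievers:
            raise ValueError(f"Unknown retriever type: {type_}")
        return cls._retrievers[type_](config)

@ComponentRegistry.register_retriever(RetrieverType.CHROMA)
class StubRetriever:
    def __init__(self, config: dict):
        self.config = config

# The decorator registered the class at import time; lookup instantiates it
retriever = ComponentRegistry.get_retriever(RetrieverType.CHROMA, {"collection_name": "docs"})
assert isinstance(retriever, StubRetriever)
```

The import-time registration is why `app.py` later imports each component module purely for its side effects.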
# retrievers/chroma.py
import os
import time
import chromadb
from chromadb.utils import embedding_functions
from core.interfaces import Retriever, Document, Query, RetrievalResult
from core.schemas import ChromaConfig, RetrieverType
from core.registry import ComponentRegistry
@ComponentRegistry.register_retriever(RetrieverType.CHROMA)
class ChromaRetriever(Retriever[ChromaConfig]):
"""ChromaDB-based retriever."""
def __init__(self, config: dict):
self._config = ChromaConfig(**config)
super().__init__(self._config)
# Initialize ChromaDB
self.client = chromadb.PersistentClient(
path=self._config.persist_directory
)
# OpenAI embedding function (key read from the OPENAI_API_KEY env var)
self.embedding_fn = embedding_functions.OpenAIEmbeddingFunction(
api_key=os.environ.get("OPENAI_API_KEY"),
model_name=self._config.embedding_model
)
self.collection = self.client.get_or_create_collection(
name=self._config.collection_name,
embedding_function=self.embedding_fn,
metadata={"hnsw:space": self._config.distance_metric}
)
def _validate_config(self) -> None:
pass
@property
def name(self) -> str:
return f"chroma-{self._config.collection_name}"
@property
def version(self) -> str:
return "1.0.0"
async def retrieve(
self,
query: Query,
k: int = 10
) -> RetrievalResult:
start = time.time()
# Build where clause from filters
where = query.filters if query.filters else None
results = self.collection.query(
query_texts=[query.text],
n_results=k,
where=where,
include=["documents", "metadatas", "distances"]
)
documents = []
for i in range(len(results["documents"][0])):
doc = Document(
id=results["ids"][0][i],
content=results["documents"][0][i],
metadata=results["metadatas"][0][i] or {},
score=1 - results["distances"][0][i] # Convert distance to score
)
documents.append(doc)
latency = (time.time() - start) * 1000
return RetrievalResult(
query=query,
documents=documents,
retriever_name=self.name,
latency_ms=latency
)
async def add_documents(
self,
documents: list[Document]
) -> int:
self.collection.add(
ids=[d.id for d in documents],
documents=[d.content for d in documents],
metadatas=[d.metadata for d in documents]
)
return len(documents)
# retrievers/hybrid.py
import time
from rank_bm25 import BM25Okapi
import numpy as np
from core.interfaces import Retriever, Document, Query, RetrievalResult
from core.schemas import HybridConfig, RetrieverType
from core.registry import ComponentRegistry
from retrievers.chroma import ChromaRetriever
@ComponentRegistry.register_retriever(RetrieverType.HYBRID)
class HybridRetriever(Retriever[HybridConfig]):
"""Hybrid retriever combining vector and BM25 search."""
def __init__(self, config: dict):
self._config = HybridConfig(**config)
super().__init__(self._config)
# Initialize vector retriever
self.vector_retriever = ChromaRetriever(
self._config.vector_config.model_dump()
)
# BM25 index (built from documents)
self.bm25: BM25Okapi | None = None
self.documents: list[Document] = []
def _validate_config(self) -> None:
total = self._config.vector_weight + self._config.bm25_weight
if total <= 0:
raise ValueError("vector_weight and bm25_weight must not both be zero")
if abs(total - 1.0) > 1e-9:
# Normalize so the weights sum to 1 (avoids exact float comparison)
self._config.vector_weight /= total
self._config.bm25_weight /= total
@property
def name(self) -> str:
return "hybrid-retriever"
@property
def version(self) -> str:
return "1.0.0"
async def retrieve(
self,
query: Query,
k: int = 10
) -> RetrievalResult:
start = time.time()
# Vector search
vector_results = await self.vector_retriever.retrieve(query, k=k*2)
# BM25 search
bm25_results = self._bm25_search(query.text, k=k*2)
# Reciprocal Rank Fusion
fused = self._fuse_results(
vector_results.documents,
bm25_results,
k=k
)
latency = (time.time() - start) * 1000
return RetrievalResult(
query=query,
documents=fused,
retriever_name=self.name,
latency_ms=latency
)
def _bm25_search(
self,
query: str,
k: int
) -> list[Document]:
if not self.bm25 or not self.documents:
return []
tokenized_query = query.lower().split()
scores = self.bm25.get_scores(tokenized_query)
top_indices = np.argsort(scores)[::-1][:k]
results = []
for idx in top_indices:
if scores[idx] > 0:
doc = self.documents[idx].model_copy()
doc.score = float(scores[idx])
results.append(doc)
return results
def _fuse_results(
self,
vector_docs: list[Document],
bm25_docs: list[Document],
k: int
) -> list[Document]:
"""Reciprocal Rank Fusion."""
scores: dict[str, float] = {}
doc_map: dict[str, Document] = {}
rrf_k = 60 # RRF constant
for rank, doc in enumerate(vector_docs):
scores[doc.id] = scores.get(doc.id, 0) + \
self._config.vector_weight / (rank + rrf_k)
doc_map[doc.id] = doc
for rank, doc in enumerate(bm25_docs):
scores[doc.id] = scores.get(doc.id, 0) + \
self._config.bm25_weight / (rank + rrf_k)
if doc.id not in doc_map:
doc_map[doc.id] = doc
sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
results = []
for doc_id in sorted_ids[:k]:
doc = doc_map[doc_id].model_copy()
doc.score = scores[doc_id]
results.append(doc)
return results
async def add_documents(
self,
documents: list[Document]
) -> int:
# Add to vector store
await self.vector_retriever.add_documents(documents)
# Update BM25 index
self.documents.extend(documents)
tokenized = [doc.content.lower().split() for doc in self.documents]
self.bm25 = BM25Okapi(tokenized)
return len(documents)
Step 5: Reranker Implementations
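Before the rerankers, one part of the hybrid retriever above is worth verifying in isolation: the weighted Reciprocal Rank Fusion. A standalone sketch of `_fuse_results` operating on bare ID lists (same `rrf_k = 60` constant):

```python
def weighted_rrf(vector_ids: list[str], bm25_ids: list[str],
                 vector_weight: float = 0.7, bm25_weight: float = 0.3,
                 rrf_k: int = 60, k: int = 10) -> list[str]:
    """Weighted RRF: each list contributes weight / (rank + rrf_k) per doc."""
    scores: dict[str, float] = {}
    for rank, doc_id in enumerate(vector_ids):
        scores[doc_id] = scores.get(doc_id, 0.0) + vector_weight / (rank + rrf_k)
    for rank, doc_id in enumerate(bm25_ids):
        scores[doc_id] = scores.get(doc_id, 0.0) + bm25_weight / (rank + rrf_k)
    return sorted(scores, key=scores.get, reverse=True)[:k]

# "a" ranks first in vector search and third in BM25; it beats "c",
# which ranks first in BM25 but only third in the (heavier) vector list
fused = weighted_rrf(["a", "b", "c"], ["c", "d", "a"])
```

Note that rank position, not raw score, drives the fusion, which is what makes RRF robust to the incomparable score scales of cosine similarity and BM25.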
# rerankers/cohere.py
import cohere
import time
from core.interfaces import Reranker, Document, Query, RerankResult
from core.schemas import CohereRerankerConfig, RerankerType
from core.registry import ComponentRegistry
@ComponentRegistry.register_reranker(RerankerType.COHERE)
class CohereReranker(Reranker[CohereRerankerConfig]):
"""Cohere-based reranker."""
def __init__(self, config: dict):
self._config = CohereRerankerConfig(**config)
super().__init__(self._config)
self.client = cohere.Client(self._config.api_key)
def _validate_config(self) -> None:
pass
@property
def name(self) -> str:
return f"cohere-{self._config.model}"
@property
def version(self) -> str:
return "1.0.0"
async def rerank(
self,
query: Query,
documents: list[Document],
top_k: int = 5
) -> RerankResult:
start = time.time()
response = self.client.rerank(
model=self._config.model,
query=query.text,
documents=[d.content for d in documents],
top_n=top_k
)
reranked = []
for result in response.results:
doc = documents[result.index].model_copy()
doc.score = result.relevance_score
reranked.append(doc)
latency = (time.time() - start) * 1000
return RerankResult(
query=query,
documents=reranked,
reranker_name=self.name,
latency_ms=latency
)
# rerankers/cross_encoder.py
from sentence_transformers import CrossEncoder
import time
from core.interfaces import Reranker, Document, Query, RerankResult
from core.schemas import CrossEncoderConfig, RerankerType
from core.registry import ComponentRegistry
@ComponentRegistry.register_reranker(RerankerType.CROSS_ENCODER)
class CrossEncoderReranker(Reranker[CrossEncoderConfig]):
"""Local cross-encoder reranker."""
def __init__(self, config: dict):
self._config = CrossEncoderConfig(**config)
super().__init__(self._config)
self.model = CrossEncoder(
self._config.model_name,
device=self._config.device
)
def _validate_config(self) -> None:
pass
@property
def name(self) -> str:
return f"cross-encoder-{self._config.model_name.split('/')[-1]}"
@property
def version(self) -> str:
return "1.0.0"
async def rerank(
self,
query: Query,
documents: list[Document],
top_k: int = 5
) -> RerankResult:
start = time.time()
# Create query-document pairs
pairs = [[query.text, doc.content] for doc in documents]
# Score all pairs
scores = self.model.predict(
pairs,
batch_size=self._config.batch_size
)
# Sort by score
scored_docs = list(zip(documents, scores))
scored_docs.sort(key=lambda x: x[1], reverse=True)
reranked = []
for doc, score in scored_docs[:top_k]:
doc_copy = doc.model_copy()
doc_copy.score = float(score)
reranked.append(doc_copy)
latency = (time.time() - start) * 1000
return RerankResult(
query=query,
documents=reranked,
reranker_name=self.name,
latency_ms=latency
)
Step 6: Generator Implementations
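Both generators below feed the model the same context format: retrieved chunks joined with `---` separators, each tagged with its source (falling back to the chunk id). The formatting can be checked in isolation, with plain dicts standing in for `Document`:

```python
def build_context(docs: list[dict]) -> str:
    """Mirror the prompt-context formatting used by both generators."""
    return "\n\n---\n\n".join(
        f"[Source: {d.get('source', d['id'])}]\n{d['content']}"
        for d in docs
    )

context = build_context([
    {"id": "c1", "source": "faq.md", "content": "RAG pairs retrieval with generation."},
    {"id": "c2", "content": "Reranking trims the context window."},
])
```

Keeping this format identical across generators is what makes them truly swappable: an A/B test compares models, not prompt templates.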
# generators/openai.py
from openai import OpenAI
import time
from core.interfaces import Generator, Document, Query, GenerationResult
from core.schemas import OpenAIGeneratorConfig, GeneratorType
from core.registry import ComponentRegistry
@ComponentRegistry.register_generator(GeneratorType.OPENAI)
class OpenAIGenerator(Generator[OpenAIGeneratorConfig]):
"""OpenAI-based generator."""
DEFAULT_SYSTEM_PROMPT = """You are a helpful assistant. Answer questions based
on the provided context. If the context doesn't contain enough information, say so.
Be concise and cite sources when possible."""
def __init__(self, config: dict):
self._config = OpenAIGeneratorConfig(**config)
super().__init__(self._config)
self.client = OpenAI(api_key=self._config.api_key)
def _validate_config(self) -> None:
pass
@property
def name(self) -> str:
return f"openai-{self._config.model}"
@property
def version(self) -> str:
return "1.0.0"
async def generate(
self,
query: Query,
documents: list[Document]
) -> GenerationResult:
start = time.time()
# Build context from documents
context = "\n\n---\n\n".join([
f"[Source: {doc.metadata.get('source', doc.id)}]\n{doc.content}"
for doc in documents
])
system_prompt = self._config.system_prompt or self.DEFAULT_SYSTEM_PROMPT
response = self.client.chat.completions.create(
model=self._config.model,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query.text}"}
],
temperature=self._config.temperature,
max_tokens=self._config.max_tokens
)
answer = response.choices[0].message.content
tokens = response.usage.total_tokens if response.usage else None
latency = (time.time() - start) * 1000
return GenerationResult(
query=query,
answer=answer,
sources=documents,
generator_name=self.name,
latency_ms=latency,
tokens_used=tokens
)
# generators/anthropic.py
import anthropic
import time
from core.interfaces import Generator, Document, Query, GenerationResult
from core.schemas import AnthropicGeneratorConfig, GeneratorType
from core.registry import ComponentRegistry
@ComponentRegistry.register_generator(GeneratorType.ANTHROPIC)
class AnthropicGenerator(Generator[AnthropicGeneratorConfig]):
"""Anthropic-based generator."""
DEFAULT_SYSTEM_PROMPT = """You are a helpful assistant. Answer questions based
on the provided context. If the context doesn't contain enough information, say so.
Be concise and cite sources when possible."""
def __init__(self, config: dict):
self._config = AnthropicGeneratorConfig(**config)
super().__init__(self._config)
self.client = anthropic.Anthropic(api_key=self._config.api_key)
def _validate_config(self) -> None:
pass
@property
def name(self) -> str:
return f"anthropic-{self._config.model}"
@property
def version(self) -> str:
return "1.0.0"
async def generate(
self,
query: Query,
documents: list[Document]
) -> GenerationResult:
start = time.time()
context = "\n\n---\n\n".join([
f"[Source: {doc.metadata.get('source', doc.id)}]\n{doc.content}"
for doc in documents
])
system_prompt = self._config.system_prompt or self.DEFAULT_SYSTEM_PROMPT
response = self.client.messages.create(
model=self._config.model,
max_tokens=self._config.max_tokens,
system=system_prompt,
messages=[
{"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query.text}"}
]
)
answer = response.content[0].text
tokens = response.usage.input_tokens + response.usage.output_tokens
latency = (time.time() - start) * 1000
return GenerationResult(
query=query,
answer=answer,
sources=documents,
generator_name=self.name,
latency_ms=latency,
tokens_used=tokens
)
Step 7: Pipeline Builder & Executor
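The builder and executor below are plain composition. Before reading them, the control flow they implement — wide retrieval, optional narrowing, generation — can be sketched with stub async stages (standalone; all names here are illustrative):

```python
import asyncio

# Stub stages standing in for retriever / reranker / generator
async def retrieve(query: str, k: int) -> list[str]:
    return [f"doc{i}" for i in range(k)]

async def rerank(query: str, docs: list[str], top_k: int) -> list[str]:
    return docs[:top_k]   # a real reranker would reorder before slicing

async def generate(query: str, docs: list[str]) -> str:
    return f"answer from {len(docs)} docs"

async def execute(query: str, retrieval_k: int = 20, rerank_k: int = 5) -> str:
    # Mirrors PipelineExecutor.execute: retrieve wide, narrow, then generate
    docs = await retrieve(query, retrieval_k)
    docs = await rerank(query, docs, rerank_k)
    return await generate(query, docs)

result = asyncio.run(execute("what is RAG?"))  # → "answer from 5 docs"
```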
# pipeline/builder.py
from core.interfaces import Retriever, Reranker, Generator, Chunker
from core.schemas import PipelineConfig
from core.registry import ComponentRegistry
class RAGPipeline:
"""A configured RAG pipeline."""
def __init__(
self,
config: PipelineConfig,
retriever: Retriever,
reranker: Reranker | None,
generator: Generator,
chunker: Chunker
):
self.config = config
self.retriever = retriever
self.reranker = reranker
self.generator = generator
self.chunker = chunker
@property
def name(self) -> str:
return self.config.name
class PipelineBuilder:
"""Builds RAG pipelines from configuration."""
@staticmethod
def build(config: PipelineConfig) -> RAGPipeline:
"""Build a pipeline from configuration."""
retriever = ComponentRegistry.get_retriever(
config.retriever_type,
config.retriever_config
)
reranker = ComponentRegistry.get_reranker(
config.reranker_type,
config.reranker_config
)
generator = ComponentRegistry.get_generator(
config.generator_type,
config.generator_config
)
chunker = ComponentRegistry.get_chunker(
config.chunker_type,
config.chunker_config
)
return RAGPipeline(
config=config,
retriever=retriever,
reranker=reranker,
generator=generator,
chunker=chunker
)
@staticmethod
def build_from_yaml(yaml_path: str) -> RAGPipeline:
"""Build a pipeline from a YAML configuration file."""
import os
import yaml
with open(yaml_path) as f:
raw = f.read()
# Expand ${VAR} placeholders (e.g. API keys) from the environment
config_dict = yaml.safe_load(os.path.expandvars(raw))
config = PipelineConfig(**config_dict)
return PipelineBuilder.build(config)
# pipeline/executor.py
import time
from pydantic import BaseModel
from core.interfaces import Document, Query, GenerationResult
from pipeline.builder import RAGPipeline
class PipelineMetrics(BaseModel):
"""Metrics from pipeline execution."""
total_latency_ms: float
retrieval_latency_ms: float
rerank_latency_ms: float | None
generation_latency_ms: float
documents_retrieved: int
documents_after_rerank: int
tokens_used: int | None
class PipelineResult(BaseModel):
"""Result from pipeline execution."""
answer: str
sources: list[dict]
pipeline_name: str
metrics: PipelineMetrics
class PipelineExecutor:
"""Executes RAG pipelines."""
def __init__(self, pipeline: RAGPipeline):
self.pipeline = pipeline
async def execute(self, query_text: str) -> PipelineResult:
"""Execute the pipeline for a query."""
start = time.time()
query = Query(text=query_text)
# Retrieval
retrieval_result = await self.pipeline.retriever.retrieve(
query=query,
k=self.pipeline.config.retrieval_k
)
documents = retrieval_result.documents
rerank_latency = None
# Reranking (optional)
if self.pipeline.reranker:
rerank_result = await self.pipeline.reranker.rerank(
query=query,
documents=documents,
top_k=self.pipeline.config.rerank_k
)
documents = rerank_result.documents
rerank_latency = rerank_result.latency_ms
else:
# Just take top k without reranking
documents = documents[:self.pipeline.config.rerank_k]
# Generation
generation_result = await self.pipeline.generator.generate(
query=query,
documents=documents
)
total_latency = (time.time() - start) * 1000
metrics = PipelineMetrics(
total_latency_ms=total_latency,
retrieval_latency_ms=retrieval_result.latency_ms,
rerank_latency_ms=rerank_latency,
generation_latency_ms=generation_result.latency_ms,
documents_retrieved=len(retrieval_result.documents),
documents_after_rerank=len(documents),
tokens_used=generation_result.tokens_used
)
return PipelineResult(
answer=generation_result.answer,
sources=[
{"id": d.id, "source": d.metadata.get("source", ""), "score": d.score}
for d in documents
],
pipeline_name=self.pipeline.name,
metrics=metrics
)
async def add_documents(
self,
texts: list[str],
sources: list[str]
) -> int:
"""Add documents to the pipeline's retriever."""
import uuid
documents = []
for text, source in zip(texts, sources):
# Chunk the text
chunks = self.pipeline.chunker.chunk(
text,
metadata={"source": source}
)
for chunk in chunks:
# Random ids avoid collisions across repeated add_documents calls
chunk.id = f"doc_{uuid.uuid4().hex[:12]}"
chunk.metadata["source"] = source
documents.append(chunk)
return await self.pipeline.retriever.add_documents(documents)
Step 8: FastAPI Application
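One gap worth flagging before the API layer: `app.py` imports `chunkers.recursive`, which the steps above never show. The core splitting logic might look like this — a standalone sketch of the algorithm, consistent with `RecursiveChunkerConfig`; the real module would wrap it in the `Chunker` interface and register it under `ChunkerType.RECURSIVE`:

```python
def recursive_split(text: str, chunk_size: int = 512, chunk_overlap: int = 50,
                    separators: tuple = ("\n\n", "\n", ". ", " ")) -> list[str]:
    """Greedy splitter: pack pieces split on the coarsest separator that
    fires, carrying a chunk_overlap-character tail between chunks."""
    if len(text) <= chunk_size:
        return [text.strip()] if text.strip() else []
    for sep in separators:
        parts = text.split(sep)
        if len(parts) == 1:
            continue  # separator absent; try a finer one
        chunks, current = [], ""
        for part in parts:
            piece = part + sep
            if current and len(current) + len(piece) > chunk_size:
                chunks.append(current.strip())
                # keep a tail of the previous chunk as overlap
                current = current[-chunk_overlap:] if chunk_overlap else ""
            current += piece
        if current.strip():
            chunks.append(current.strip())
        return chunks
    # No separator present at all: hard character split
    step = chunk_size - chunk_overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

chunks = recursive_split(("word " * 30 + "\n\n") * 5, chunk_size=200, chunk_overlap=50)
```

Chunks may slightly exceed `chunk_size` by up to the overlap length, since the carried tail is prepended before the size check.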
# app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from contextlib import asynccontextmanager
from typing import Any
from core.schemas import PipelineConfig, RetrieverType, GeneratorType, ChunkerType
from core.registry import ComponentRegistry
from pipeline.builder import PipelineBuilder
from pipeline.executor import PipelineExecutor, PipelineResult
# Import components to register them
import retrievers.chroma
import retrievers.hybrid
import rerankers.cohere
import rerankers.cross_encoder
import generators.openai
import generators.anthropic
import chunkers.recursive
# Global state
pipelines: dict[str, PipelineExecutor] = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
# Create default pipeline
default_config = PipelineConfig(
name="default",
description="Default RAG pipeline",
retriever_type=RetrieverType.CHROMA,
retriever_config={
"collection_name": "default_docs",
"persist_directory": "./chroma_db"
},
generator_type=GeneratorType.OPENAI,
generator_config={
"model": "gpt-4o-mini"
},
chunker_type=ChunkerType.RECURSIVE,
chunker_config={
"chunk_size": 512,
"chunk_overlap": 50
}
)
pipeline = PipelineBuilder.build(default_config)
pipelines["default"] = PipelineExecutor(pipeline)
yield
pipelines.clear()
app = FastAPI(
title="Modular RAG API",
description="Composable RAG framework with swappable components",
lifespan=lifespan
)
class QueryRequest(BaseModel):
query: str
pipeline: str = "default"
class DocumentsRequest(BaseModel):
documents: list[str]
sources: list[str]
pipeline: str = "default"
class CreatePipelineRequest(BaseModel):
config: dict[str, Any]
@app.post("/query", response_model=PipelineResult)
async def query(request: QueryRequest):
"""Query a RAG pipeline."""
if request.pipeline not in pipelines:
raise HTTPException(
status_code=404,
detail=f"Pipeline not found: {request.pipeline}"
)
executor = pipelines[request.pipeline]
result = await executor.execute(request.query)
return result
@app.post("/documents")
async def add_documents(request: DocumentsRequest):
"""Add documents to a pipeline."""
if request.pipeline not in pipelines:
raise HTTPException(
status_code=404,
detail=f"Pipeline not found: {request.pipeline}"
)
if len(request.documents) != len(request.sources):
raise HTTPException(
status_code=400,
detail="Documents and sources must have same length"
)
executor = pipelines[request.pipeline]
count = await executor.add_documents(request.documents, request.sources)
return {"status": "success", "chunks_added": count}
@app.post("/pipelines")
async def create_pipeline(request: CreatePipelineRequest):
"""Create a new RAG pipeline."""
try:
config = PipelineConfig(**request.config)
pipeline = PipelineBuilder.build(config)
pipelines[config.name] = PipelineExecutor(pipeline)
return {"status": "success", "pipeline": config.name}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/pipelines")
async def list_pipelines():
"""List all available pipelines."""
return {
"pipelines": list(pipelines.keys()),
"components": ComponentRegistry.list_components()
}
@app.delete("/pipelines/{name}")
async def delete_pipeline(name: str):
"""Delete a pipeline."""
if name == "default":
raise HTTPException(status_code=400, detail="Cannot delete default pipeline")
if name not in pipelines:
raise HTTPException(status_code=404, detail=f"Pipeline not found: {name}")
del pipelines[name]
return {"status": "success", "deleted": name}
@app.get("/health")
async def health():
return {"status": "healthy", "service": "modular-rag"}
Step 9: Requirements
# requirements.txt
openai>=1.12.0
anthropic>=0.18.0
chromadb>=0.4.22
pinecone-client>=3.0.0
cohere>=4.0.0
sentence-transformers>=2.2.0
rank-bm25>=0.2.2
pydantic>=2.0.0
pydantic-settings>=2.0.0
fastapi>=0.109.0
uvicorn>=0.27.0
pyyaml>=6.0.0
numpy>=1.24.0
Usage Examples
Create Custom Pipeline via YAML
# pipelines/hybrid_cohere.yaml
name: hybrid-cohere
description: Hybrid search with Cohere reranking
retriever_type: hybrid
retriever_config:
vector_weight: 0.7
bm25_weight: 0.3
vector_config:
collection_name: hybrid_docs
persist_directory: ./chroma_hybrid
reranker_type: cohere
reranker_config:
model: rerank-english-v3.0
api_key: ${COHERE_API_KEY}
generator_type: openai
generator_config:
model: gpt-4o
temperature: 0.3
api_key: ${OPENAI_API_KEY}
chunker_type: recursive
chunker_config:
chunk_size: 512
chunk_overlap: 50
retrieval_k: 20
rerank_k: 5
Python Usage
import asyncio

from pipeline.builder import PipelineBuilder
from pipeline.executor import PipelineExecutor

async def main():
    # Build from YAML
    pipeline = PipelineBuilder.build_from_yaml("pipelines/hybrid_cohere.yaml")
    executor = PipelineExecutor(pipeline)

    # Add documents
    await executor.add_documents(
        texts=["Document content..."],
        sources=["source_name"]
    )

    # Query
    result = await executor.execute("What is machine learning?")
    print(f"Answer: {result.answer}")
    print(f"Total latency: {result.metrics.total_latency_ms:.2f}ms")
    print(f"Retrieval: {result.metrics.retrieval_latency_ms:.2f}ms")
    print(f"Rerank: {result.metrics.rerank_latency_ms:.2f}ms")
    print(f"Generation: {result.metrics.generation_latency_ms:.2f}ms")

asyncio.run(main())

A/B Testing Pipelines
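Production A/B testing also needs consistent traffic assignment, so a given user always hits the same pipeline variant across requests. A minimal deterministic splitter (a hypothetical helper, not part of the framework):

```python
import hashlib

def assign_variant(user_id: str, variants=("variant_a", "variant_b")) -> str:
    """Deterministically map a user ID to a pipeline variant.

    Hashing (rather than random choice) keeps assignment stable across
    requests and across processes, which is what makes per-user metrics
    comparable between variants.
    """
    digest = hashlib.sha256(user_id.encode("utf-8")).hexdigest()
    return variants[int(digest, 16) % len(variants)]
```

A request handler would call `assign_variant(user_id)` and route the query to the matching entry in the `pipelines` dict. The side-by-side comparison below instead runs both variants on the same query, which is useful for offline evaluation: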
import asyncio

from pipeline.builder import PipelineBuilder
from pipeline.config import PipelineConfig, RetrieverType, RerankerType
from pipeline.executor import PipelineExecutor

# Create two pipeline variants
config_a = PipelineConfig(
    name="variant_a",
    retriever_type=RetrieverType.CHROMA,
    reranker_type=RerankerType.COHERE,
    # ...
)
config_b = PipelineConfig(
    name="variant_b",
    retriever_type=RetrieverType.HYBRID,
    reranker_type=RerankerType.CROSS_ENCODER,
    # ...
)
pipeline_a = PipelineExecutor(PipelineBuilder.build(config_a))
pipeline_b = PipelineExecutor(PipelineBuilder.build(config_b))

# Run both variants on the same query and compare
async def compare(query: str):
    return await asyncio.gather(
        pipeline_a.execute(query),
        pipeline_b.execute(query)
    )

result_a, result_b = asyncio.run(compare("Explain neural networks"))
print(f"Pipeline A: {result_a.metrics.total_latency_ms:.2f}ms")
print(f"Pipeline B: {result_b.metrics.total_latency_ms:.2f}ms")

Architecture Overview
┌─────────────────────────────────────────────────────────────────────────────┐
│ MODULAR RAG ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ CONFIGURATION LAYER │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ YAML Config│ │Environment │ │ API Request│ │
│ └──────┬─────┘ └──────┬─────┘ └──────┬─────┘ │
│ └─────────────────┼─────────────────┘ │
│ ▼ │
│ ─────────────────────────────────────────────────────────────────────────│
│ │
│ COMPONENT REGISTRY │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │Retriever Registry│ │Reranker Registry│ │Generator Registry│ │
│ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘ │
│ │ │ │ │
│ ─────────────────────────────────────────────────────────────────────────│
│ │ │ │ │
│ PLUGGABLE COMPONENTS │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Retrievers │ │ Rerankers │ │ Generators │ │
│ │ • ChromaDB │ │ • Cohere │ │ • OpenAI │ │
│ │ • Pinecone │ │ • CrossEncoder │ │ • Anthropic │ │
│ │ • Hybrid │ │ • LLM-based │ │ • Local LLM │ │
│ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘ │
│ │ │ │ │
│ ─────────────────────────────────────────────────────────────────────────│
│ │ │ │ │
│ PIPELINE EXECUTION │ │ │
│ └───────────────────┼───────────────────┘ │
│ ▼ │
│ Pipeline Builder ───► Pipeline Executor │
│ │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

Key Concepts Recap
| Concept | What It Is | Why It Matters |
|---|---|---|
| Abstract Interfaces | Base classes for each component type | Guarantees consistent API across implementations |
| Component Registry | Central catalog of available components | Factory pattern for dynamic instantiation |
| YAML Configuration | Declarative pipeline definition | Non-developers can experiment, version control configs |
| Pipeline Builder | Creates pipelines from config | Validates and wires components together |
| Retriever Swapping | ChromaDB ↔ Pinecone ↔ Hybrid | Test performance without code changes |
| Reranker Options | Cohere ↔ CrossEncoder ↔ LLM | Balance speed vs accuracy per use case |
| A/B Testing | Run multiple configs simultaneously | Data-driven component selection |
| Metrics Collection | Track latency, tokens, quality | Compare configurations quantitatively |
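To make the registry concept concrete, here is a stripped-down sketch of a decorator-based component registry. The framework's actual `ComponentRegistry` from earlier steps is richer; the class and method names below are illustrative:

```python
class ComponentRegistry:
    """Minimal registry: maps (component_kind, name) -> component class."""

    _components: dict = {}

    @classmethod
    def register(cls, kind: str, name: str):
        """Decorator that records a component class under (kind, name)."""
        def decorator(component_cls):
            cls._components[(kind, name)] = component_cls
            return component_cls
        return decorator

    @classmethod
    def create(cls, kind: str, name: str, **kwargs):
        """Factory: instantiate a registered component from config values."""
        try:
            component_cls = cls._components[(kind, name)]
        except KeyError:
            raise ValueError(f"Unknown {kind}: {name}") from None
        return component_cls(**kwargs)

# Example registration (a stub, standing in for a real retriever):
@ComponentRegistry.register("retriever", "chroma")
class ChromaRetriever:
    def __init__(self, collection_name: str = "docs"):
        self.collection_name = collection_name
```

The payoff is that the pipeline builder never imports concrete classes: a YAML field like `retriever_type: chroma` becomes `ComponentRegistry.create("retriever", "chroma", **retriever_config)`, and adding a new backend is just another `@register` decorator.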
Next Steps
- Add caching layer as a component
- Implement evaluation component for quality metrics
- Create pipeline versioning for reproducibility
- Build streaming generators for real-time responses
- Explore Agentic RAG for autonomous pipelines
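As a starting point for the first item above, a caching layer can be added without touching any component: wrap the executor. A minimal in-memory sketch (the class name and interface are assumptions; the wrapped object only needs an async `execute(query)` method):

```python
class CachingExecutor:
    """Wrap a pipeline executor with a simple in-memory answer cache.

    Repeated identical queries skip retrieval, reranking, and generation
    entirely. A production version would add TTLs, size bounds, and
    invalidation on document updates.
    """

    def __init__(self, executor):
        self.executor = executor
        self._cache = {}

    async def execute(self, query: str):
        if query not in self._cache:
            self._cache[query] = await self.executor.execute(query)
        return self._cache[query]
```

Because it exposes the same `execute` interface, a `CachingExecutor` can be dropped in anywhere a `PipelineExecutor` is expected, including the FastAPI `pipelines` dict.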