SLM-Powered RAG
Build efficient RAG pipelines with local small models
TL;DR
Build local RAG with ChromaDB for vector storage, BM25 for sparse search, and combine them using Reciprocal Rank Fusion (RRF). Use cross-encoder reranking for quality and local SLMs (Phi-3, Qwen2.5) for generation. Result: $0/query, 100-500ms latency, complete privacy.
Build a complete Retrieval-Augmented Generation system that runs entirely locally using small language models. Create privacy-preserving, cost-effective RAG pipelines optimized for latency and accuracy.
Project Overview
| Aspect | Details |
|---|---|
| Difficulty | Intermediate |
| Time | 6-8 hours |
| Prerequisites | Local SLM Setup, RAG Basics |
| What You'll Build | Complete local RAG system with hybrid search and reranking |
What You'll Learn
- Local embedding models (BGE, E5, GTE)
- ChromaDB for vector storage
- Complete RAG pipeline with SLMs
- Hybrid search (dense + sparse)
- Local reranking for quality
- Latency optimization techniques
- Privacy-preserving architecture
Architecture Overview
┌─────────────────────────────────────────────────────────────────────────────┐
│ Local SLM RAG Pipeline │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ INGESTION PHASE │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────────────┐ │
│ │ Documents │───►│ Chunker │───►│ Local Embeddings │ │
│ └─────────────┘ └──────┬──────┘ └─────────────┬───────────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌───────────────┐ ┌───────────────┐ │
│ │ BM25 Index │ │ ChromaDB │ │
│ └───────────────┘ └───────────────┘ │
│ │
│ QUERY PHASE │
│ ┌─────────────┐ │
│ │ User Query │─────────────────────────────┐ │
│ └──────┬──────┘ │ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────┐ │
│ │ Query Embedding │ │Sparse Search│ │
│ └────────┬────────┘ └──────┬──────┘ │
│ ▼ │ │
│ ┌─────────────────┐ │ │
│ │ Dense Search │ │ │
│ └────────┬────────┘ │ │
│ │ ┌─────────────────┐ │ │
│ └────────►│ RRF Fusion │◄─────┘ │
│ └────────┬────────┘ │
│ ▼ │
│ ┌─────────────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Local Reranker │───►│Local SLM │───►│ Response │ │
│ └─────────────────┘ └──────────┘ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Why Local RAG?
| Aspect | Cloud RAG | Local SLM RAG |
|---|---|---|
| Cost | $0.01-0.10/query | $0 after setup |
| Latency | 500-2000ms | 100-500ms |
| Privacy | Data leaves device | Data stays local |
| Offline | Requires internet | Works offline |
| Control | Limited | Full control |
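The cost row compounds with volume. A quick back-of-envelope using the table's per-query figures (the 10,000 queries/month volume is an illustrative assumption):

```python
# Hypothetical monthly volume; cloud per-query costs taken from the table above.
queries_per_month = 10_000

cloud_low = queries_per_month * 0.01   # $0.01/query lower bound
cloud_high = queries_per_month * 0.10  # $0.10/query upper bound

print(f"Cloud RAG: ${cloud_low:,.0f}-${cloud_high:,.0f}/month")
print("Local RAG: $0/month after setup")
```

At that volume the cloud pipeline costs roughly $100-$1,000/month, while the local pipeline's marginal cost stays at zero.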
Project Setup
Install Dependencies
# Create project directory
mkdir slm-rag && cd slm-rag
# Create virtual environment
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
# Install dependencies
pip install chromadb sentence-transformers ollama langchain langchain-community
pip install rank_bm25 fastapi uvicorn python-multipart
pip install pypdf docx2txt tiktoken rich
Pull Required Models
# Embedding model (run in Ollama)
ollama pull nomic-embed-text # 274MB - Good quality/speed balance
ollama pull mxbai-embed-large # 669MB - Higher quality
# Generation models
ollama pull phi3:mini # 2.3GB
ollama pull qwen2.5:3b # 2.0GB
ollama pull llama3.2:3b # 2.0GB
# Reranker (optional - we'll use cross-encoder)
pip install sentence-transformers
Part 1: Local Embedding Models
Embedding Provider
Create a unified interface for different embedding sources.
# embeddings.py
from typing import List, Union
from abc import ABC, abstractmethod
import numpy as np
class EmbeddingProvider(ABC):
"""Base class for embedding providers."""
@abstractmethod
def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""Embed multiple documents."""
pass
@abstractmethod
def embed_query(self, text: str) -> List[float]:
"""Embed a single query."""
pass
@property
@abstractmethod
def dimension(self) -> int:
"""Return embedding dimension."""
pass
class OllamaEmbeddings(EmbeddingProvider):
"""Embeddings using Ollama."""
def __init__(self, model: str = "nomic-embed-text"):
self.model = model
self._dimension = None
import ollama
self.client = ollama
def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""Embed multiple documents."""
embeddings = []
for text in texts:
response = self.client.embeddings(
model=self.model,
prompt=text
)
embeddings.append(response["embedding"])
return embeddings
def embed_query(self, text: str) -> List[float]:
"""Embed a single query."""
response = self.client.embeddings(
model=self.model,
prompt=text
)
return response["embedding"]
@property
def dimension(self) -> int:
if self._dimension is None:
# Get dimension from a test embedding
test = self.embed_query("test")
self._dimension = len(test)
return self._dimension
class SentenceTransformerEmbeddings(EmbeddingProvider):
"""Embeddings using sentence-transformers."""
def __init__(self, model: str = "BAAI/bge-small-en-v1.5"):
from sentence_transformers import SentenceTransformer
self.model = SentenceTransformer(model)
self._dimension = self.model.get_sentence_embedding_dimension()
def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""Embed multiple documents with batching."""
embeddings = self.model.encode(
texts,
batch_size=32,
show_progress_bar=True,
convert_to_numpy=True
)
return embeddings.tolist()
def embed_query(self, text: str) -> List[float]:
"""Embed a single query."""
embedding = self.model.encode(text, convert_to_numpy=True)
return embedding.tolist()
@property
def dimension(self) -> int:
return self._dimension
# Recommended models for different use cases
EMBEDDING_MODELS = {
# Ollama models
"nomic-embed-text": {
"provider": "ollama",
"dimension": 768,
"description": "Good balance of quality and speed"
},
"mxbai-embed-large": {
"provider": "ollama",
"dimension": 1024,
"description": "Higher quality, larger"
},
# Sentence-transformers models
"BAAI/bge-small-en-v1.5": {
"provider": "sentence-transformers",
"dimension": 384,
"description": "Fast, good for English"
},
"BAAI/bge-base-en-v1.5": {
"provider": "sentence-transformers",
"dimension": 768,
"description": "Balanced quality/speed"
},
"thenlper/gte-small": {
"provider": "sentence-transformers",
"dimension": 384,
"description": "Good general-purpose"
},
}
def get_embeddings(model_name: str = "nomic-embed-text") -> EmbeddingProvider:
"""Factory function to get embedding provider."""
if model_name.startswith("BAAI/") or model_name.startswith("thenlper/"):
return SentenceTransformerEmbeddings(model_name)
else:
return OllamaEmbeddings(model_name)
What's Happening Here?
The Embedding Provider creates a unified interface for different local embedding models:
┌─────────────────────────────────────────────────────────────────────────────┐
│ Embedding Provider Architecture │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ get_embeddings("nomic-embed-text") ──► OllamaEmbeddings │
│ get_embeddings("BAAI/bge-small") ──► SentenceTransformerEmbeddings │
│ │
│ Both implement the same interface: │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ embed_documents([texts]) ──► [[0.1, 0.2, ...], [0.3, 0.1, ...]] ││
│ │ embed_query("question") ──► [0.2, 0.4, ...] ││
│ │ dimension ──► 768 (or 384, 1024, etc.) ││
│ └─────────────────────────────────────────────────────────────────────────┘│
│ │
│ Why this matters: Swap embedding models without changing any other code! │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Embedding Model Selection Guide:
| Model | Dimension | Speed | Quality | Best For |
|---|---|---|---|---|
| nomic-embed-text | 768 | Fast | Good | General purpose, default choice |
| mxbai-embed-large | 1024 | Medium | High | When quality matters most |
| BAAI/bge-small | 384 | Very Fast | Medium | Speed-critical, resource-constrained |
| BAAI/bge-base | 768 | Medium | Good | Balanced alternative to nomic |
| thenlper/gte-small | 384 | Very Fast | Good | Limited-memory environments |
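Whatever the model, each provider returns plain Python float lists, and dense retrieval reduces to cosine similarity between them. A minimal sketch with hand-made 4-dimensional vectors (real embeddings have 384-1024 dimensions; the vectors here are invented for illustration):

```python
import math

def cosine_similarity(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

query_vec = [0.1, 0.3, 0.5, 0.1]
doc_vecs = {
    "doc_ml":  [0.1, 0.3, 0.5, 0.2],   # points in nearly the same direction
    "doc_tax": [0.9, 0.1, 0.0, 0.0],   # unrelated topic, different direction
}

for name, vec in doc_vecs.items():
    print(name, round(cosine_similarity(query_vec, vec), 3))
```

ChromaDB performs exactly this comparison (as 1 - cosine distance) against every stored vector when you call `collection.query`.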
Understanding Embedding Dimensions:
┌─────────────────────────────────────────────────────────────────────────────┐
│ Dimension Impact on Performance │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Higher Dimension (1024) Lower Dimension (384) │
│ ──────────────────────── ───────────────────── │
│ ✓ More semantic nuance ✓ Faster similarity computation │
│ ✓ Better for complex queries ✓ Less memory per vector │
│ ✗ Slower similarity search ✓ Smaller index size on disk │
│ ✗ More RAM/VRAM needed ✗ May lose subtle distinctions │
│ │
│ Memory Formula: │
│ Storage = num_vectors × dimension × 4 bytes (float32) │
│ │
│ Example: 100,000 documents │
│ • 384-dim: 100K × 384 × 4 = ~150 MB │
│ • 768-dim: 100K × 768 × 4 = ~300 MB │
│ • 1024-dim: 100K × 1024 × 4 = ~400 MB │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Part 2: Document Processing
Chunking Strategy
# chunker.py
from typing import List, Dict, Any
from dataclasses import dataclass
import re
@dataclass
class Chunk:
"""A document chunk with metadata."""
text: str
metadata: Dict[str, Any]
chunk_id: str
class DocumentChunker:
"""Smart document chunking with overlap."""
def __init__(
self,
chunk_size: int = 512,
chunk_overlap: int = 50,
min_chunk_size: int = 100,
):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.min_chunk_size = min_chunk_size
def chunk_text(
self,
text: str,
metadata: Dict[str, Any] = None
) -> List[Chunk]:
"""Chunk text into overlapping pieces."""
metadata = metadata or {}
# Clean text
text = self._clean_text(text)
# Split into sentences
sentences = self._split_sentences(text)
chunks = []
current_chunk = []
current_length = 0
chunk_idx = 0
for sentence in sentences:
sentence_length = len(sentence)
if current_length + sentence_length > self.chunk_size and current_chunk:
# Save current chunk
chunk_text = " ".join(current_chunk)
if len(chunk_text) >= self.min_chunk_size:
chunks.append(Chunk(
text=chunk_text,
metadata={
**metadata,
"chunk_index": chunk_idx,
},
chunk_id=f"{metadata.get('doc_id', 'doc')}_{chunk_idx}"
))
chunk_idx += 1
# Start new chunk with overlap
overlap_sentences = self._get_overlap(current_chunk)
current_chunk = overlap_sentences
current_length = sum(len(s) for s in current_chunk)
current_chunk.append(sentence)
current_length += sentence_length
# Don't forget last chunk
if current_chunk:
chunk_text = " ".join(current_chunk)
if len(chunk_text) >= self.min_chunk_size:
chunks.append(Chunk(
text=chunk_text,
metadata={
**metadata,
"chunk_index": chunk_idx,
},
chunk_id=f"{metadata.get('doc_id', 'doc')}_{chunk_idx}"
))
return chunks
def _clean_text(self, text: str) -> str:
"""Clean and normalize text."""
# Replace multiple newlines with single
text = re.sub(r'\n+', '\n', text)
# Replace multiple spaces with single
text = re.sub(r' +', ' ', text)
return text.strip()
def _split_sentences(self, text: str) -> List[str]:
"""Split text into sentences."""
# Simple sentence splitting
sentences = re.split(r'(?<=[.!?])\s+', text)
return [s.strip() for s in sentences if s.strip()]
def _get_overlap(self, sentences: List[str]) -> List[str]:
"""Get overlap sentences for next chunk."""
overlap_length = 0
overlap_sentences = []
for sentence in reversed(sentences):
if overlap_length >= self.chunk_overlap:
break
overlap_sentences.insert(0, sentence)
overlap_length += len(sentence)
return overlap_sentences
class DocumentLoader:
"""Load documents from various formats."""
@staticmethod
def load_pdf(filepath: str) -> str:
"""Load PDF file."""
import pypdf
reader = pypdf.PdfReader(filepath)
text = ""
for page in reader.pages:
text += page.extract_text() + "\n"
return text
@staticmethod
def load_txt(filepath: str) -> str:
"""Load text file."""
with open(filepath, 'r', encoding='utf-8') as f:
return f.read()
@staticmethod
def load_docx(filepath: str) -> str:
"""Load Word document."""
import docx2txt
return docx2txt.process(filepath)
@classmethod
def load(cls, filepath: str) -> str:
"""Load document based on extension."""
ext = filepath.lower().split('.')[-1]
loaders = {
'pdf': cls.load_pdf,
'txt': cls.load_txt,
'md': cls.load_txt,
'docx': cls.load_docx,
}
loader = loaders.get(ext)
if not loader:
raise ValueError(f"Unsupported file type: {ext}")
return loader(filepath)
What's Happening Here?
The DocumentChunker splits documents into semantically meaningful pieces:
┌─────────────────────────────────────────────────────────────────────────────┐
│ Sentence-Based Chunking with Overlap │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Original Document: │
│ "Machine learning is a field of AI. It enables systems to learn from │
│ data. Deep learning uses neural networks. These networks have many │
│ layers. They can learn complex patterns." │
│ │
│ Split into sentences: │
│ [S1] "Machine learning is a field of AI." │
│ [S2] "It enables systems to learn from data." │
│ [S3] "Deep learning uses neural networks." │
│ [S4] "These networks have many layers." │
│ [S5] "They can learn complex patterns." │
│ │
│ With chunk_size=100 and overlap=30: │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ Chunk 0: [S1] + [S2] = "Machine learning is a field of AI. It enables │││
│ │ systems to learn from data." │││
│ └─────────────────────────────────────────────────────────────────────────┘│
│ ↓ │
│ [S2] is in BOTH chunks (overlap!) │
│ ↓ │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ Chunk 1: [S2] + [S3] + [S4] = "It enables systems to learn from data. │││
│ │ Deep learning uses neural networks. These networks have..." │││
│ └─────────────────────────────────────────────────────────────────────────┘│
│ │
│ Why overlap? If someone asks "What enables systems to learn?" the answer │
│ context is preserved in Chunk 1 even though S2 started in Chunk 0. │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Chunking Parameters and Their Effects:
| Parameter | Default | Effect of Increasing | Effect of Decreasing |
|---|---|---|---|
| chunk_size | 512 | More context per chunk, fewer chunks | More chunks, finer granularity |
| chunk_overlap | 50 | Better context preservation | Less redundancy |
| min_chunk_size | 100 | Filters short fragments | Keeps all content |
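The overlap mechanics are easiest to see in isolation. This sketch re-implements just the sentence-splitting and tail-overlap helpers from DocumentChunker above, with sizes counted in characters as in the class:

```python
import re

def split_sentences(text):
    """Split on sentence-ending punctuation followed by whitespace."""
    return [s.strip() for s in re.split(r'(?<=[.!?])\s+', text) if s.strip()]

def tail_overlap(sentences, overlap_chars):
    """Take sentences from the end of a chunk until overlap_chars is covered."""
    out, total = [], 0
    for s in reversed(sentences):
        if total >= overlap_chars:
            break
        out.insert(0, s)
        total += len(s)
    return out

sents = split_sentences(
    "Machine learning is a field of AI. It enables systems to learn from data. "
    "Deep learning uses neural networks."
)
print(sents)                    # three sentences
print(tail_overlap(sents, 30))  # sentence(s) carried into the start of the next chunk
```

With `overlap_chars=30`, only the final sentence is carried over; a larger overlap would pull in earlier sentences as well.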
Choosing Chunk Size:
┌─────────────────────────────────────────────────────────────────────────────┐
│ Chunk Size Trade-offs │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Small chunks (100-256 chars): │
│ ✓ Precise retrieval for specific facts │
│ ✓ Works well with limited context windows │
│ ✗ May lose surrounding context │
│ ✗ More chunks = slower search │
│ │
│ Large chunks (512-1024 chars): │
│ ✓ Preserves context and relationships │
│ ✓ Fewer chunks = faster search │
│ ✗ May include irrelevant content in retrieval │
│ ✗ Consumes more of generation context window │
│ │
│ Rule of thumb: chunk_size ≈ (context_window / num_retrieved) / 2 │
│   Example: 4096 tokens, retrieve 5 chunks → ~400 tokens/chunk              │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Part 3: Vector Store with ChromaDB
ChromaDB Integration
# vector_store.py
from typing import List, Dict, Any, Optional
import chromadb
from chromadb.config import Settings
from dataclasses import dataclass
import uuid
from embeddings import EmbeddingProvider, get_embeddings
from chunker import Chunk
@dataclass
class SearchResult:
"""A search result with score."""
text: str
metadata: Dict[str, Any]
score: float
chunk_id: str
class LocalVectorStore:
"""ChromaDB-based local vector store."""
def __init__(
self,
collection_name: str = "documents",
persist_directory: str = "./chroma_db",
embedding_model: str = "nomic-embed-text",
):
self.collection_name = collection_name
self.embeddings = get_embeddings(embedding_model)
# Initialize ChromaDB
self.client = chromadb.PersistentClient(
path=persist_directory,
settings=Settings(anonymized_telemetry=False)
)
# Get or create collection
self.collection = self.client.get_or_create_collection(
name=collection_name,
metadata={"hnsw:space": "cosine"}
)
def add_chunks(self, chunks: List[Chunk]) -> int:
"""Add chunks to the vector store."""
if not chunks:
return 0
texts = [c.text for c in chunks]
ids = [c.chunk_id for c in chunks]
metadatas = [c.metadata for c in chunks]
# Generate embeddings
embeddings = self.embeddings.embed_documents(texts)
# Add to collection
self.collection.add(
embeddings=embeddings,
documents=texts,
metadatas=metadatas,
ids=ids
)
return len(chunks)
def search(
self,
query: str,
k: int = 5,
filter: Dict[str, Any] = None,
) -> List[SearchResult]:
"""Search for similar documents."""
# Embed query
query_embedding = self.embeddings.embed_query(query)
# Search
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=k,
where=filter,
include=["documents", "metadatas", "distances"]
)
# Convert to SearchResult objects
search_results = []
for i in range(len(results["documents"][0])):
# Convert distance to similarity score
distance = results["distances"][0][i]
score = 1 - distance # Cosine distance to similarity
search_results.append(SearchResult(
text=results["documents"][0][i],
metadata=results["metadatas"][0][i],
score=score,
chunk_id=results["ids"][0][i]
))
return search_results
def delete_collection(self):
"""Delete the collection."""
self.client.delete_collection(self.collection_name)
def get_stats(self) -> Dict[str, Any]:
"""Get collection statistics."""
return {
"name": self.collection_name,
"count": self.collection.count(),
"embedding_model": type(self.embeddings).__name__,
}
Part 4: Hybrid Search
BM25 Sparse Search
# sparse_search.py
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import pickle
from pathlib import Path
import re
from rank_bm25 import BM25Okapi
from chunker import Chunk
from vector_store import LocalVectorStore, SearchResult
@dataclass
class SparseSearchResult:
"""Sparse search result."""
text: str
metadata: Dict[str, Any]
score: float
chunk_id: str
class BM25Index:
"""BM25 sparse retrieval index."""
def __init__(self, persist_path: str = "./bm25_index"):
self.persist_path = Path(persist_path)
self.persist_path.mkdir(exist_ok=True)
self.chunks: List[Chunk] = []
self.bm25: Optional[BM25Okapi] = None
self.tokenized_corpus: List[List[str]] = []
def _tokenize(self, text: str) -> List[str]:
"""Simple tokenization."""
# Lowercase and split on non-alphanumeric
text = text.lower()
tokens = re.findall(r'\b\w+\b', text)
return tokens
def add_chunks(self, chunks: List[Chunk]):
"""Add chunks to the index."""
for chunk in chunks:
self.chunks.append(chunk)
tokens = self._tokenize(chunk.text)
self.tokenized_corpus.append(tokens)
# Rebuild BM25 index
if self.tokenized_corpus:
self.bm25 = BM25Okapi(self.tokenized_corpus)
def search(self, query: str, k: int = 5) -> List[SparseSearchResult]:
"""Search using BM25."""
if not self.bm25:
return []
query_tokens = self._tokenize(query)
scores = self.bm25.get_scores(query_tokens)
# Get top-k indices
top_indices = sorted(
range(len(scores)),
key=lambda i: scores[i],
reverse=True
)[:k]
results = []
for idx in top_indices:
if scores[idx] > 0:
chunk = self.chunks[idx]
results.append(SparseSearchResult(
text=chunk.text,
metadata=chunk.metadata,
score=float(scores[idx]),
chunk_id=chunk.chunk_id
))
return results
def save(self):
"""Save index to disk."""
data = {
"chunks": self.chunks,
"tokenized_corpus": self.tokenized_corpus,
}
with open(self.persist_path / "bm25_data.pkl", 'wb') as f:
pickle.dump(data, f)
def load(self):
"""Load index from disk."""
path = self.persist_path / "bm25_data.pkl"
if path.exists():
with open(path, 'rb') as f:
data = pickle.load(f)
self.chunks = data["chunks"]
self.tokenized_corpus = data["tokenized_corpus"]
if self.tokenized_corpus:
self.bm25 = BM25Okapi(self.tokenized_corpus)
class HybridSearcher:
"""Combine dense and sparse search."""
def __init__(
self,
dense_store: 'LocalVectorStore',
sparse_index: BM25Index,
dense_weight: float = 0.7,
):
self.dense_store = dense_store
self.sparse_index = sparse_index
self.dense_weight = dense_weight
self.sparse_weight = 1 - dense_weight
def search(
self,
query: str,
k: int = 5,
dense_k: int = 10,
sparse_k: int = 10,
) -> List[SearchResult]:
"""Hybrid search with reciprocal rank fusion."""
# Get results from both
dense_results = self.dense_store.search(query, k=dense_k)
sparse_results = self.sparse_index.search(query, k=sparse_k)
# Create score maps
scores = {}
# Add dense scores with RRF
for rank, result in enumerate(dense_results, start=1):
rrf_score = 1 / (60 + rank)  # RRF constant k = 60; ranks start at 1
scores[result.chunk_id] = {
"dense_rrf": rrf_score * self.dense_weight,
"text": result.text,
"metadata": result.metadata,
}
# Add sparse scores with RRF
for rank, result in enumerate(sparse_results, start=1):
rrf_score = 1 / (60 + rank)
if result.chunk_id in scores:
scores[result.chunk_id]["sparse_rrf"] = rrf_score * self.sparse_weight
else:
scores[result.chunk_id] = {
"sparse_rrf": rrf_score * self.sparse_weight,
"dense_rrf": 0,
"text": result.text,
"metadata": result.metadata,
}
# Calculate final scores
final_results = []
for chunk_id, data in scores.items():
total_score = data.get("dense_rrf", 0) + data.get("sparse_rrf", 0)
final_results.append(SearchResult(
text=data["text"],
metadata=data["metadata"],
score=total_score,
chunk_id=chunk_id
))
# Sort by score and return top k
final_results.sort(key=lambda x: x.score, reverse=True)
return final_results[:k]
What's Happening Here?
Hybrid Search combines dense (semantic) and sparse (keyword) retrieval for better results:
┌─────────────────────────────────────────────────────────────────────────────┐
│ Hybrid Search Flow │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Query: "What is the BM25 algorithm?" │
│ │
│ ┌─────────────────────────┐ ┌──────────────────────────┐ │
│ │ Dense Search (ChromaDB) │ │ Sparse Search (BM25) │ │
│ ├─────────────────────────┤ ├──────────────────────────┤ │
│ │ Finds semantically │ │ Finds exact keyword │ │
│ │ similar content │ │ matches │ │
│ │ │ │ │ │
│ │ Results: │ │ Results: │ │
│ │ 1. Doc about ranking │ │ 1. Doc with "BM25" term │ │
│ │ 2. Doc about retrieval │ │ 2. Doc about algorithms │ │
│ │ 3. Doc about search │ │ 3. Doc about scoring │ │
│ └────────────┬────────────┘ └────────────┬─────────────┘ │
│ │ │ │
│ └──────────┬─────────────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ Reciprocal Rank Fusion (RRF) │ │
│ │ │ │
│ │ RRF formula: score = 1 / (k + rank) │ │
│ │ where k = 60 (constant to prevent division │ │
│ │ by small numbers) │ │
│ │ │ │
│ │ Dense rank 1: 1/(60+1) = 0.0164 │ │
│ │ Dense rank 2: 1/(60+2) = 0.0161 │ │
│ │ Sparse rank 1: 1/(60+1) = 0.0164 │ │
│ │ │ │
│ │ Doc in both lists gets BOTH scores added! │ │
│ └─────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Why Hybrid Search Works Better:
| Query Type | Dense Search | Sparse Search | Hybrid |
|---|---|---|---|
| "machine learning" | ✓ Finds related concepts | ✓ Exact match | ✓✓ Both |
| "What's similar to ML?" | ✓ Semantic understanding | ✗ No exact terms | ✓ Dense helps |
| "BM25Okapi class" | ✗ Rare term not trained | ✓ Exact keyword match | ✓ Sparse helps |
| Typos: "machien lerning" | ✓ Still similar | ✗ Wrong tokens | ✓ Dense helps |
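The fusion step is small enough to verify by hand. Below is a standalone sketch of weighted RRF as HybridSearcher combines the two result lists (ranks counted from 1, matching the diagram's 1/(60+rank) examples; the document ids are made up):

```python
def weighted_rrf(dense_ids, sparse_ids, dense_weight=0.7, k=60):
    """Fuse two ranked id lists with weighted Reciprocal Rank Fusion."""
    sparse_weight = 1 - dense_weight
    scores = {}
    for rank, doc_id in enumerate(dense_ids, start=1):
        scores[doc_id] = scores.get(doc_id, 0.0) + dense_weight / (k + rank)
    for rank, doc_id in enumerate(sparse_ids, start=1):
        scores[doc_id] = scores.get(doc_id, 0.0) + sparse_weight / (k + rank)
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)

# "A" ranks high in both lists; "C" appears only in the dense results.
fused = weighted_rrf(dense_ids=["A", "C", "B"], sparse_ids=["B", "X", "A"])
print(fused)
```

"A" wins because it collects both a dense and a sparse contribution, beating "C" even though "C" out-ranked "B" on the dense side alone.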
Understanding RRF Weighting:
┌─────────────────────────────────────────────────────────────────────────────┐
│ RRF Score Calculation Example │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ dense_weight = 0.7, sparse_weight = 0.3 │
│ │
│ Document A: │
│ • Dense rank: 1 → RRF = 1/61 = 0.0164 × 0.7 = 0.0115 │
│ • Sparse rank: 3 → RRF = 1/63 = 0.0159 × 0.3 = 0.0048 │
│ • Total: 0.0115 + 0.0048 = 0.0163 │
│ │
│ Document B: │
│ • Dense rank: 5 → RRF = 1/65 = 0.0154 × 0.7 = 0.0108 │
│ • Sparse rank: 1 → RRF = 1/61 = 0.0164 × 0.3 = 0.0049 │
│ • Total: 0.0108 + 0.0049 = 0.0157 │
│ │
│ Document C (only in dense): │
│ • Dense rank: 2 → RRF = 1/62 = 0.0161 × 0.7 = 0.0113 │
│ • Sparse rank: - → 0 │
│ • Total: 0.0113 │
│ │
│   Final ranking: A (0.0163) > B (0.0157) > C (0.0113)                      │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Part 5: Local Reranking
Cross-Encoder Reranker
# reranker.py
from typing import List, Tuple
from sentence_transformers import CrossEncoder
from dataclasses import dataclass
from vector_store import SearchResult
@dataclass
class RerankedResult:
"""Result after reranking."""
text: str
metadata: dict
original_score: float
rerank_score: float
chunk_id: str
class LocalReranker:
"""Local cross-encoder reranker."""
# Lightweight reranker models
MODELS = {
"tiny": "cross-encoder/ms-marco-TinyBERT-L-2-v2", # 17MB
"mini": "cross-encoder/ms-marco-MiniLM-L-6-v2", # 90MB
"small": "BAAI/bge-reranker-base", # 278MB
}
def __init__(self, model_size: str = "mini"):
model_name = self.MODELS.get(model_size, self.MODELS["mini"])
self.model = CrossEncoder(model_name)
def rerank(
self,
query: str,
results: List[SearchResult],
top_k: int = 5,
) -> List[RerankedResult]:
"""Rerank search results using cross-encoder."""
if not results:
return []
# Prepare pairs for scoring
pairs = [(query, r.text) for r in results]
# Get reranking scores
scores = self.model.predict(pairs)
# Create reranked results
reranked = []
for result, score in zip(results, scores):
reranked.append(RerankedResult(
text=result.text,
metadata=result.metadata,
original_score=result.score,
rerank_score=float(score),
chunk_id=result.chunk_id
))
# Sort by rerank score
reranked.sort(key=lambda x: x.rerank_score, reverse=True)
return reranked[:top_k]
class LLMReranker:
"""Use SLM for reranking (slower but more flexible)."""
def __init__(self, model: str = "phi3:mini"):
import ollama
self.client = ollama
self.model = model
def rerank(
self,
query: str,
results: List[SearchResult],
top_k: int = 5,
) -> List[RerankedResult]:
"""Rerank using LLM scoring."""
if not results:
return []
reranked = []
for result in results:
# Score each result
prompt = f"""Rate the relevance of the following passage to the query on a scale of 0-10.
Query: {query}
Passage: {result.text[:500]}
Return ONLY a number from 0-10. Score:"""
response = self.client.chat(
model=self.model,
messages=[{"role": "user", "content": prompt}],
options={"temperature": 0}
)
try:
score = float(response["message"]["content"].strip())
score = max(0, min(10, score)) # Clamp to 0-10
except ValueError:
score = 5.0 # Default score
reranked.append(RerankedResult(
text=result.text,
metadata=result.metadata,
original_score=result.score,
rerank_score=score / 10.0, # Normalize to 0-1
chunk_id=result.chunk_id
))
reranked.sort(key=lambda x: x.rerank_score, reverse=True)
return reranked[:top_k]
What's Happening Here?
Reranking uses a cross-encoder to score query-document pairs directly:
┌─────────────────────────────────────────────────────────────────────────────┐
│ Bi-Encoder vs Cross-Encoder │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ BI-ENCODER (used for retrieval): │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ ││
│ │ Query ──► Encoder ──► [0.2, 0.4, ...] ─┐ ││
│ │ ├──► cosine similarity = 0.85 ││
│ │ Doc ────► Encoder ──► [0.3, 0.5, ...] ─┘ ││
│ │ ││
│ │ ✓ Fast: encode once, compare many ││
│ │ ✗ Can't see query-doc interaction ││
│ └─────────────────────────────────────────────────────────────────────────┘│
│ │
│ CROSS-ENCODER (used for reranking): │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ ││
│ │ [CLS] Query [SEP] Document [SEP] ││
│ │ │ ││
│ │ ▼ ││
│ │ BERT/Transformer (sees BOTH at once) ││
│ │ │ ││
│ │ ▼ ││
│ │ Relevance Score: 0.92 ││
│ │ ││
│ │ ✓ Sees interaction between query and doc ││
│ │ ✓ More accurate relevance scoring ││
│ │ ✗ Slower: must process each pair separately ││
│ └─────────────────────────────────────────────────────────────────────────┘│
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Cross-Encoder Model Selection:
| Model | Size | Speed | Quality | Best For |
|---|---|---|---|---|
| TinyBERT-L-2 | 17MB | Very Fast | Good | Resource-constrained, high volume |
| MiniLM-L-6 | 90MB | Fast | Better | Balanced (recommended default) |
| BGE-Reranker-Base | 278MB | Medium | Best | When quality is priority |
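Running a real cross-encoder requires downloading one of the models above, but the reordering step itself is plain sorting. A sketch with invented rerank scores (in the real pipeline these come from `CrossEncoder.predict` on (query, passage) pairs):

```python
# Retrieval order vs. reranked order; the scores are made up for illustration.
retrieved = ["A", "B", "C", "D", "E"]  # already ranked by retrieval score
rerank_scores = {"A": 0.91, "B": 0.83, "C": 0.95, "D": 0.40, "E": 0.87}

top_k = 3
reranked = sorted(retrieved, key=lambda d: rerank_scores[d], reverse=True)[:top_k]
print(reranked)  # "C" overtakes "A"; only the top_k survive
```

This is exactly what LocalReranker.rerank does: score every pair, sort descending by rerank score, and truncate to top_k before the context reaches the SLM.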
Reranking Flow:
┌─────────────────────────────────────────────────────────────────────────────┐
│ Reranking Improves Retrieval Quality │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Initial Retrieval (top 10): After Reranking (top 5): │
│ ┌──────────────────────────────┐ ┌──────────────────────────────┐ │
│ │ 1. Doc A (score: 0.92) │ │ 1. Doc C (rerank: 0.95) ★ │ │
│ │ 2. Doc B (score: 0.88) │ │ 2. Doc A (rerank: 0.91) │ │
│ │ 3. Doc C (score: 0.85) ★ │ ──► │ 3. Doc E (rerank: 0.87) ★ │ │
│ │ 4. Doc D (score: 0.82) │ │ 4. Doc B (rerank: 0.83) │ │
│ │ 5. Doc E (score: 0.80) ★ │ │ 5. Doc G (rerank: 0.79) │ │
│ │ 6. Doc F (score: 0.78) │ └──────────────────────────────┘ │
│ │ 7. Doc G (score: 0.75) │ │
│ │ 8. Doc H (score: 0.72) │ ★ = Actually relevant documents │
│ │ 9. Doc I (score: 0.70) │ │
│ │ 10. Doc J (score: 0.68) │ Relevant docs moved from positions │
│ └──────────────────────────────┘ 3,5 to positions 1,3! │
│ │
│ Why this matters: LLM only sees top K documents. Better ranking = │
│ more relevant context = better answers. │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Part 6: Complete RAG Pipeline
RAG System
# rag_pipeline.py
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import ollama
from pathlib import Path
from embeddings import get_embeddings
from chunker import DocumentChunker, DocumentLoader, Chunk
from vector_store import LocalVectorStore, SearchResult
from sparse_search import BM25Index, HybridSearcher
from reranker import LocalReranker, RerankedResult
@dataclass
class RAGResponse:
"""Response from RAG pipeline."""
answer: str
sources: List[Dict[str, Any]]
query: str
latency_ms: float
class LocalRAGPipeline:
"""Complete local RAG pipeline."""
def __init__(
self,
collection_name: str = "documents",
embedding_model: str = "nomic-embed-text",
generation_model: str = "phi3:mini",
persist_dir: str = "./rag_data",
use_hybrid_search: bool = True,
use_reranking: bool = True,
):
self.persist_dir = Path(persist_dir)
self.persist_dir.mkdir(exist_ok=True)
# Initialize components
self.chunker = DocumentChunker(chunk_size=512, chunk_overlap=50)
self.vector_store = LocalVectorStore(
collection_name=collection_name,
persist_directory=str(self.persist_dir / "chroma"),
embedding_model=embedding_model,
)
self.use_hybrid = use_hybrid_search
if use_hybrid_search:
self.bm25_index = BM25Index(str(self.persist_dir / "bm25"))
self.bm25_index.load()
self.hybrid_searcher = HybridSearcher(
self.vector_store,
self.bm25_index,
dense_weight=0.7
)
self.use_reranking = use_reranking
if use_reranking:
self.reranker = LocalReranker(model_size="mini")
self.generation_model = generation_model
def ingest_document(
self,
filepath: str,
metadata: Dict[str, Any] = None
) -> int:
"""Ingest a document into the RAG system."""
import hashlib
from pathlib import Path
path = Path(filepath)
doc_id = hashlib.md5(path.name.encode()).hexdigest()[:8]
# Load document
text = DocumentLoader.load(filepath)
# Chunk
base_metadata = metadata or {}
base_metadata["doc_id"] = doc_id
base_metadata["source"] = path.name
chunks = self.chunker.chunk_text(text, base_metadata)
# Add to vector store
self.vector_store.add_chunks(chunks)
# Add to BM25 index
if self.use_hybrid:
self.bm25_index.add_chunks(chunks)
self.bm25_index.save()
return len(chunks)
def ingest_text(
self,
text: str,
source_name: str = "text",
metadata: Dict[str, Any] = None
) -> int:
"""Ingest raw text into the RAG system."""
import hashlib
doc_id = hashlib.md5(text[:100].encode()).hexdigest()[:8]
base_metadata = metadata or {}
base_metadata["doc_id"] = doc_id
base_metadata["source"] = source_name
chunks = self.chunker.chunk_text(text, base_metadata)
self.vector_store.add_chunks(chunks)
if self.use_hybrid:
self.bm25_index.add_chunks(chunks)
self.bm25_index.save()
return len(chunks)
def _retrieve(self, query: str, k: int = 5) -> List[SearchResult]:
"""Retrieve relevant chunks."""
if self.use_hybrid:
results = self.hybrid_searcher.search(query, k=k * 2)
else:
results = self.vector_store.search(query, k=k * 2)
# Rerank if enabled
if self.use_reranking and results:
reranked = self.reranker.rerank(query, results, top_k=k)
# Convert back to SearchResult
results = [
SearchResult(
text=r.text,
metadata=r.metadata,
score=r.rerank_score,
chunk_id=r.chunk_id
)
for r in reranked
]
else:
results = results[:k]
return results
def _build_prompt(
self,
query: str,
context: List[SearchResult],
system_prompt: str = None
) -> str:
"""Build the generation prompt."""
if system_prompt is None:
system_prompt = """You are a helpful assistant that answers questions based on the provided context.
Use only the information from the context to answer. If the context doesn't contain
enough information, say so. Be concise and accurate."""
# Build context string
context_parts = []
for i, result in enumerate(context, 1):
source = result.metadata.get("source", "Unknown")
context_parts.append(f"[{i}] (Source: {source})\n{result.text}")
context_str = "\n\n".join(context_parts)
prompt = f"""{system_prompt}
Context:
{context_str}
Question: {query}
Answer based on the context above:"""
return prompt
def query(
self,
question: str,
k: int = 5,
system_prompt: str = None,
temperature: float = 0.1,
) -> RAGResponse:
"""Query the RAG system."""
import time
start_time = time.time()
# Retrieve
results = self._retrieve(question, k=k)
if not results:
return RAGResponse(
answer="I couldn't find any relevant information to answer your question.",
sources=[],
query=question,
latency_ms=(time.time() - start_time) * 1000
)
# Build prompt
prompt = self._build_prompt(question, results, system_prompt)
# Generate
response = ollama.chat(
model=self.generation_model,
messages=[{"role": "user", "content": prompt}],
options={"temperature": temperature}
)
answer = response["message"]["content"]
# Prepare sources
sources = [
{
"text": r.text[:200] + "..." if len(r.text) > 200 else r.text,
"source": r.metadata.get("source", "Unknown"),
"score": r.score,
}
for r in results
]
latency = (time.time() - start_time) * 1000
return RAGResponse(
answer=answer,
sources=sources,
query=question,
latency_ms=latency
)
def get_stats(self) -> Dict[str, Any]:
"""Get system statistics."""
stats = self.vector_store.get_stats()
stats["hybrid_search"] = self.use_hybrid
stats["reranking"] = self.use_reranking
stats["generation_model"] = self.generation_model
return stats
# Example usage
if __name__ == "__main__":
# Initialize RAG
rag = LocalRAGPipeline(
collection_name="my_docs",
embedding_model="nomic-embed-text",
generation_model="phi3:mini",
use_hybrid_search=True,
use_reranking=True,
)
# Ingest some text
sample_text = """
Machine learning is a subset of artificial intelligence that enables
systems to learn and improve from experience without being explicitly
programmed. It focuses on developing computer programs that can access
data and use it to learn for themselves.
Deep learning is a subset of machine learning that uses neural networks
with many layers. These deep neural networks can learn complex patterns
in large amounts of data, enabling breakthroughs in areas like computer
vision and natural language processing.
"""
rag.ingest_text(sample_text, source_name="ml_overview.txt")
# Query
response = rag.query("What is the relationship between deep learning and machine learning?")
print(f"Answer: {response.answer}")
print(f"\nLatency: {response.latency_ms:.0f}ms")
print(f"\nSources:")
for source in response.sources:
        print(f"  - {source['source']} (score: {source['score']:.3f})")

Part 7: FastAPI Application
REST API
# api.py
from fastapi import FastAPI, HTTPException, UploadFile, File
from pydantic import BaseModel
from typing import List, Optional
import uvicorn
import tempfile
import os
from rag_pipeline import LocalRAGPipeline, RAGResponse
app = FastAPI(
title="Local SLM RAG API",
description="Privacy-preserving RAG with local models",
version="1.0.0"
)
# Initialize RAG pipeline
rag = LocalRAGPipeline(
collection_name="documents",
embedding_model="nomic-embed-text",
generation_model="phi3:mini",
use_hybrid_search=True,
use_reranking=True,
)
class QueryRequest(BaseModel):
question: str
k: int = 5
system_prompt: Optional[str] = None
temperature: float = 0.1
class QueryResponse(BaseModel):
answer: str
sources: List[dict]
query: str
latency_ms: float
class TextIngestRequest(BaseModel):
text: str
source_name: str = "text"
metadata: Optional[dict] = None
@app.get("/health")
async def health_check():
"""Health check endpoint."""
stats = rag.get_stats()
return {"status": "healthy", **stats}
@app.post("/query", response_model=QueryResponse)
async def query_rag(request: QueryRequest):
"""Query the RAG system."""
try:
response = rag.query(
question=request.question,
k=request.k,
system_prompt=request.system_prompt,
temperature=request.temperature,
)
return QueryResponse(
answer=response.answer,
sources=response.sources,
query=response.query,
latency_ms=response.latency_ms,
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/ingest/text")
async def ingest_text(request: TextIngestRequest):
"""Ingest text into the RAG system."""
try:
num_chunks = rag.ingest_text(
text=request.text,
source_name=request.source_name,
metadata=request.metadata,
)
return {"status": "success", "chunks_created": num_chunks}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/ingest/file")
async def ingest_file(file: UploadFile = File(...)):
    """Upload and ingest a document."""
    # Save to a temp file so the loader can read it from disk
    with tempfile.NamedTemporaryFile(delete=False, suffix=f".{file.filename.split('.')[-1]}") as tmp:
        content = await file.read()
        tmp.write(content)
        tmp_path = tmp.name
    try:
        num_chunks = rag.ingest_document(
            filepath=tmp_path,
            metadata={"original_filename": file.filename}
        )
        return {
            "status": "success",
            "filename": file.filename,
            "chunks_created": num_chunks
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Clean up the temp file even if ingestion fails
        os.unlink(tmp_path)
@app.get("/stats")
async def get_stats():
"""Get system statistics."""
return rag.get_stats()
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

Usage
# Start the server
python api.py
# Ingest text
curl -X POST http://localhost:8000/ingest/text \
-H "Content-Type: application/json" \
-d '{"text": "Your document content here...", "source_name": "doc1.txt"}'
# Upload file
curl -X POST http://localhost:8000/ingest/file \
-F "file=@document.pdf"
# Query
curl -X POST http://localhost:8000/query \
-H "Content-Type: application/json" \
  -d '{"question": "What is machine learning?"}'

Performance Optimization
Latency Breakdown
| Component | Typical Latency | Optimization |
|---|---|---|
| Embedding | 10-50ms | Use quantized models |
| Vector Search | 5-20ms | Tune HNSW parameters |
| BM25 Search | 5-15ms | Pre-tokenize |
| Reranking | 50-150ms | Use tiny model |
| Generation | 100-500ms | Smaller model, lower tokens |
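To see where time actually goes in your own setup, it helps to time each stage separately rather than relying on the end-to-end `latency_ms`. A minimal sketch (the `StageTimer` helper is illustrative, not part of the pipeline above):

```python
import time
from contextlib import contextmanager


class StageTimer:
    """Collect per-stage wall-clock timings in milliseconds."""

    def __init__(self):
        self.timings_ms = {}

    @contextmanager
    def stage(self, name: str):
        start = time.perf_counter()
        try:
            yield
        finally:
            # Record elapsed time even if the stage raises
            self.timings_ms[name] = (time.perf_counter() - start) * 1000


timer = StageTimer()
with timer.stage("embedding"):
    time.sleep(0.01)  # stand-in for embedding the query
print(timer.timings_ms)
```

Wrapping the retrieve, rerank, and generate phases of `query()` this way gives you a breakdown you can compare against the table.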
Configuration Tips
# Fast configuration (lower quality)
rag = LocalRAGPipeline(
embedding_model="nomic-embed-text",
generation_model="gemma2:2b",
use_hybrid_search=False,
use_reranking=False,
)
# Balanced configuration
rag = LocalRAGPipeline(
embedding_model="nomic-embed-text",
generation_model="phi3:mini",
use_hybrid_search=True,
use_reranking=True,
)
# Quality configuration (slower)
rag = LocalRAGPipeline(
embedding_model="mxbai-embed-large",
generation_model="qwen2.5:3b",
use_hybrid_search=True,
use_reranking=True,
)

Exercises
- Streaming Responses: Add streaming generation for better UX
- Query Expansion: Implement query expansion using the SLM
- Caching: Add semantic caching for repeated queries
- Multi-Collection: Support multiple document collections
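For the caching exercise, one approach is to embed each incoming query and reuse a stored answer when it is similar enough to a previously cached query. A minimal sketch with the embedding function injected so it works with any backend (the `SemanticCache` class and its `threshold` default are illustrative, not a fixed design):

```python
import math
from typing import Callable, List, Optional, Tuple


class SemanticCache:
    """Cache answers keyed by query-embedding similarity."""

    def __init__(self, embed_fn: Callable[[str], List[float]], threshold: float = 0.95):
        self.embed_fn = embed_fn
        self.threshold = threshold
        self.entries: List[Tuple[List[float], str]] = []  # (embedding, answer)

    @staticmethod
    def _cosine(a: List[float], b: List[float]) -> float:
        dot = sum(x * y for x, y in zip(a, b))
        na = math.sqrt(sum(x * x for x in a))
        nb = math.sqrt(sum(y * y for y in b))
        return dot / (na * nb) if na and nb else 0.0

    def get(self, query: str) -> Optional[str]:
        """Return a cached answer if a stored query is similar enough."""
        q = self.embed_fn(query)
        best = max(self.entries, key=lambda e: self._cosine(q, e[0]), default=None)
        if best is not None and self._cosine(q, best[0]) >= self.threshold:
            return best[1]
        return None

    def put(self, query: str, answer: str) -> None:
        self.entries.append((self.embed_fn(query), answer))
```

In the pipeline, `query()` would check `cache.get(question)` before retrieval and `cache.put(question, answer)` after generation; the embedding function can be the same model the vector store already uses.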
Key Concepts Recap
| Concept | What It Is | Why It Matters |
|---|---|---|
| ChromaDB | Local vector database with HNSW index | Persistent storage, cosine similarity search |
| BM25 | Sparse retrieval using term frequency | Catches exact matches dense search misses |
| Hybrid Search | Combine dense + sparse retrieval | Best of both semantic and keyword matching |
| RRF (Reciprocal Rank Fusion) | Merge ranked lists: 1/(k+rank) | Combines rankings without score calibration |
| Cross-Encoder Reranker | Score (query, doc) pairs directly | More accurate than bi-encoder similarity |
| Chunk Overlap | Repeat text between chunks | Preserves context at chunk boundaries |
| nomic-embed-text | 768-dim Ollama embedding model | Good quality/speed balance for local use |
| Embedding Dimension | Vector size (384-1024 typical) | Trade-off between quality and memory |
| Context Window | Max input tokens for generation | Limits how many chunks you can include |
| Local Privacy | All processing on-device | No data leaves your machine |
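The RRF formula from the table fits in a few lines; this sketch fuses any number of ranked id lists, using the conventional constant k=60 (the function name and signature are illustrative):

```python
from collections import defaultdict
from typing import List


def rrf_fuse(rankings: List[List[str]], k: int = 60) -> List[str]:
    """Merge ranked lists of doc ids with Reciprocal Rank Fusion.

    Each doc scores sum(1 / (k + rank)) over the lists it appears in,
    so documents ranked highly by several retrievers rise to the top.
    """
    scores = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)
```

Because RRF only uses ranks, the dense cosine scores and BM25 scores never need to be calibrated against each other, which is why it is the usual choice for hybrid search.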
Next Steps
- Edge Deployment - Deploy RAG on edge devices
- SLM Agents - Add agentic capabilities
- Production SLM - Scale to production