Multi-Document RAG System
Build a RAG system that handles multiple documents with metadata filtering and source attribution
TL;DR
Upgrade from single-document to multi-document RAG with metadata filtering. Learn to organize documents by category, filter searches by metadata (category, tags, date), and provide clear source attribution in answers. Essential for production RAG systems.
| Property | Value |
|---|---|
| Difficulty | Intermediate |
| Time | ~5 hours |
| Code Size | ~350 LOC |
| Prerequisites | Intelligent Document Q&A |
Tech Stack
| Technology | Purpose |
|---|---|
| LangChain | RAG orchestration |
| OpenAI | Embeddings (text-embedding-3-small) + LLM (gpt-4o-mini) |
| ChromaDB | Vector storage with metadata |
| FastAPI | REST API |
| PyPDF / Unstructured | Document loaders |
Prerequisites
- Completed Intelligent Document Q&A tutorial
- Python 3.10+ with async understanding
- OpenAI API key
What You'll Learn
- Design document collections with rich metadata schemas
- Implement multi-document ingestion with automatic categorization
- Build cross-document retrieval with source attribution
- Create metadata filters for precise document targeting
- Handle document updates and versioning
- Manage large document collections efficiently
From Single to Multi-Document RAG
In the basic tutorial, we built a system for one document. But real applications need to handle hundreds or thousands of documents. This introduces new challenges:
The Problems with Scale
| Challenge | What Goes Wrong | Example |
|---|---|---|
| Source confusion | LLM mixes facts from different documents | "Revenue was $50M" (but from which year's report?) |
| Irrelevant results | Search returns docs outside user's scope | User asks about 2024, gets 2020 results |
| No filtering | Can't narrow search to specific categories | Legal question returns technical docs |
| Attribution | Can't tell which document answered the question | "According to... some document" |
The Solution: Metadata
The key insight is that metadata enables precise retrieval. By tagging each chunk with structured metadata (category, date, tags), we can:
- Filter - Only search relevant documents
- Attribute - Know exactly where answers came from
- Organize - Group documents logically
- Version - Handle document updates
Without metadata:
┌─────────────────────────────────────────┐
│              Vector Store               │
│ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │
│ │chunk│ │chunk│ │chunk│ │chunk│ │chunk│ │  ← All chunks mixed together
│ └─────┘ └─────┘ └─────┘ └─────┘ └─────┘ │
└─────────────────────────────────────────┘
With metadata:
┌─────────────────────────────────────────┐
│              Vector Store               │
│  ┌─────────────┐       ┌─────────────┐  │
│  │  Financial  │       │  Technical  │  │
│  │ ┌───┐ ┌───┐ │       │ ┌───┐ ┌───┐ │  │  ← Logical organization
│  │ │ Q1│ │ Q2│ │       │ │API│ │Doc│ │  │
│  │ └───┘ └───┘ │       │ └───┘ └───┘ │  │
│  └─────────────┘       └─────────────┘  │
└─────────────────────────────────────────┘
System Architecture
┌─────────────────────────────────────────────────────────────────────────┐
│ MULTI-DOCUMENT RAG ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ DOCUMENT INGESTION │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Upload │─►│ Parser │─►│Metadata │─►│ Chunker │─►│ Embed │ │
│ └─────────┘ └─────────┘ └────┬────┘ └─────────┘ └────┬────┘ │
│ │ │ │
│ ════════════════════════════════╪══════════════════════════╪═══════ │
│ │ │ │
│ VECTOR STORAGE ▼ ▼ │
│ ┌───────────────────────────────────────────────────────────────────┐ │
│ │ ┌─────────────────────────────────────────────────────────────┐ │ │
│ │ │ Collections: │ Legal │ Technical │ Financial │ ... │ │ │
│ │ └─────────────────────────────────────────────────────────────┘ │ │
│ │ ┌─────────────────┐ ┌─────────────────┐ │ │
│ │ │ Vector Index │ │ Metadata Store │ │ │
│ │ └────────┬────────┘ └────────┬────────┘ │ │
│ └───────────┼────────────────────┼──────────────────────────────────┘ │
│ │ │ │
│ ════════════╪════════════════════╪═════════════════════════════════ │
│ │ │ │
│ QUERY PIPELINE │ │
│ ┌──────────┐ ┌──────────┐ │ │
│ │ Query │──►│ Filter │─────┼──────┐ │
│ └──────────┘ └──────────┘ │ ▼ │
│ │ ┌──────────────┐ │
│ └─►│Multi-Search │ │
│ └──────┬───────┘ │
│ ▼ │
│ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │Response│◄──│ LLM │◄──│ Cite │◄─│ Rank │ │
│ └────────┘ └────────┘ └────────┘ └────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
Document Flow
┌─────────────────────────────────────────────────────────────────────────┐
│ DOCUMENT FLOW SEQUENCE │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ User FastAPI DocManager VectorStore OpenAI │
│ │ │ │ │ │ │
│ │ ═══════════════════ DOCUMENT UPLOAD ═══════════════════ │
│ │ │ │ │ │ │
│ │─Upload PDF►│ │ │ │ │
│ │ +metadata │─Process───►│ │ │ │
│ │ │ │─Extract───►│ │ │
│ │ │ │ text & │ │ │
│ │ │ │ metadata │ │ │
│ │ │ │─Chunk─────►│ │ │
│ │ │ │─Store──────────────────►│ │
│ │ │◄────Doc ID─│ │ │ │
│ │◄──Success──│ │ │ │ │
│ │ │ │ │ │ │
│ │ ═══════════════════ QUERY WITH FILTERS ═══════════════════ │
│ │ │ │ │ │ │
│ │─Query─────►│ │ │ │ │
│ │ +filters │─Search with filter─────►│ │ │
│ │ │◄───Filtered chunks──────│ │ │
│ │ │─Generate with citations────────────►│ │
│ │ │◄──────Answer────────────────────────│ │
│ │◄─Response──│ │ │ │ │
│ │ +sources │ │ │ │ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
Metadata Schema Design
A well-designed metadata schema is crucial for multi-document RAG. Here's what we'll use:
# Example metadata schema
{
    "document_id": "doc_abc123",
    "filename": "annual_report_2024.pdf",
    "title": "Annual Financial Report 2024",
    "category": "financial",
    "subcategory": "annual_reports",
    "author": "Finance Team",
    "created_date": "2024-01-15",
    "version": "1.0",
    "tags": ["finance", "annual", "2024"],
    "page_count": 45,
    "chunk_index": 12,
    "total_chunks": 89
}
Why These Fields?
| Field Type | Fields | Purpose |
|---|---|---|
| Identity | document_id, filename | Unique identification, deduplication |
| Classification | category, subcategory | Enable category-based filtering |
| Temporal | created_date, version | Filter by time, handle updates |
| Discovery | tags, author | Flexible search, ownership tracking |
| Structure | chunk_index, page_count | Know where content came from |
Schema Design Tip
Design your schema based on how users will search. If users often ask "Show me Q1 2024 financial reports", you need category, date, and tags fields to filter effectively.
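To make the tip concrete, here is a minimal sketch of how a request like "Show me Q1 2024 financial reports" maps onto schema fields. The sample records and the `matches` helper are hypothetical and run in plain Python, independent of any vector store; they only illustrate which fields each part of the request touches.

```python
from datetime import date

# Hypothetical chunk metadata records that follow the schema above.
CHUNKS = [
    {"document_id": "doc_a", "category": "financial",
     "created_date": "2024-04-10", "tags": ["finance", "Q1", "2024"]},
    {"document_id": "doc_b", "category": "financial",
     "created_date": "2020-04-12", "tags": ["finance", "Q1", "2020"]},
    {"document_id": "doc_c", "category": "technical",
     "created_date": "2024-02-01", "tags": ["api", "2024"]},
]


def matches(meta, category=None, tags=None, year=None):
    """Return True when a record satisfies every criterion that was supplied."""
    if category is not None and meta["category"] != category:
        return False
    if tags and not set(tags).issubset(meta["tags"]):
        return False
    if year is not None and date.fromisoformat(meta["created_date"]).year != year:
        return False
    return True


# "Q1 2024 financial reports" -> category + tag + date criteria
hits = [m["document_id"] for m in CHUNKS
        if matches(m, category="financial", tags=["Q1"], year=2024)]
print(hits)
```

If a criterion cannot be expressed with the fields you store, that is a sign the schema is missing a field.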
Project Structure
multi-doc-rag/
├── src/
│ ├── __init__.py
│ ├── config.py # Settings
│ ├── models.py # Pydantic data models
│ ├── chunking.py # Smart document chunking
│ ├── document_manager.py # Document ingestion
│ ├── vector_store.py # ChromaDB with filtering
│ ├── rag_engine.py # Query + generation
│ └── api.py # REST endpoints
├── tests/
│ └── test_multi_doc.py
├── data/
│ └── documents/
├── .env
└── pyproject.toml
Implementation
Step 1: Project Setup
mkdir multi-doc-rag && cd multi-doc-rag
uv init
uv venv && source .venv/bin/activate
Install dependencies:
uv add langchain langchain-openai langchain-chroma
uv add chromadb python-dotenv
uv add fastapi uvicorn python-multipart
uv add pypdf unstructured
Create .env file:
OPENAI_API_KEY=sk-your-key-here
CHROMA_PERSIST_DIR=./chroma_db
CHUNK_SIZE=800
CHUNK_OVERLAP=100
TOP_K=6
Step 2: Configuration and Models
This step defines our data structures. We use Pydantic models for validation and type safety.
Create src/config.py:
"""Configuration management."""
import os
from dataclasses import dataclass

from dotenv import load_dotenv

load_dotenv()


@dataclass
class Config:
    """Application configuration."""
    OPENAI_API_KEY: str = os.getenv("OPENAI_API_KEY", "")
    CHROMA_PERSIST_DIR: str = os.getenv("CHROMA_PERSIST_DIR", "./chroma_db")
    CHUNK_SIZE: int = int(os.getenv("CHUNK_SIZE", "800"))
    CHUNK_OVERLAP: int = int(os.getenv("CHUNK_OVERLAP", "100"))
    TOP_K: int = int(os.getenv("TOP_K", "6"))
    EMBEDDING_MODEL: str = "text-embedding-3-small"
    LLM_MODEL: str = "gpt-4o-mini"

    def validate(self) -> None:
        if not self.OPENAI_API_KEY:
            raise ValueError("OPENAI_API_KEY is required")


config = Config()

Create src/models.py:
"""Data models for multi-document RAG."""
from datetime import datetime
from typing import Optional, List
from pydantic import BaseModel, Field
from enum import Enum


class DocumentCategory(str, Enum):
    """Document categories for filtering."""
    TECHNICAL = "technical"
    LEGAL = "legal"
    FINANCIAL = "financial"
    RESEARCH = "research"
    GENERAL = "general"


class DocumentMetadata(BaseModel):
    """Metadata for a document."""
    document_id: str
    filename: str
    title: Optional[str] = None
    category: DocumentCategory = DocumentCategory.GENERAL
    subcategory: Optional[str] = None
    author: Optional[str] = None
    created_date: datetime = Field(default_factory=datetime.now)
    version: str = "1.0"
    tags: List[str] = Field(default_factory=list)
    page_count: int = 0


class ChunkMetadata(DocumentMetadata):
    """Metadata for a document chunk."""
    chunk_index: int
    total_chunks: int
    page_number: Optional[int] = None
    section: Optional[str] = None


class DocumentFilter(BaseModel):
    """Filters for document retrieval."""
    categories: Optional[List[DocumentCategory]] = None
    tags: Optional[List[str]] = None
    date_from: Optional[datetime] = None
    date_to: Optional[datetime] = None
    document_ids: Optional[List[str]] = None


class SearchResult(BaseModel):
    """A search result with source attribution."""
    content: str
    score: float
    document_id: str
    filename: str
    category: str
    page_number: Optional[int]
    chunk_index: int


class QueryResponse(BaseModel):
    """Response to a query with citations."""
    answer: str
    sources: List[SearchResult]
    query: str
    filters_applied: Optional[DocumentFilter] = None

Understanding the Data Models
Let's break down the key design decisions:
Why use Enum for categories?
class DocumentCategory(str, Enum):
    TECHNICAL = "technical"
    LEGAL = "legal"
    ...

Enums provide:
- Type safety - Only valid categories allowed
- Autocomplete - IDEs suggest valid options
- Validation - Invalid categories rejected automatically
- Documentation - Clear list of supported categories
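The validation behavior can be seen with the standard library alone. The sketch below repeats the enum from src/models.py; `parse_category` is a hypothetical helper added only for illustration.

```python
from enum import Enum


class DocumentCategory(str, Enum):
    TECHNICAL = "technical"
    LEGAL = "legal"
    FINANCIAL = "financial"
    RESEARCH = "research"
    GENERAL = "general"


def parse_category(raw: str) -> DocumentCategory:
    """Normalize user input; raises ValueError for an unknown category."""
    return DocumentCategory(raw.strip().lower())


category = parse_category(" Financial ")
print(category.value)

# Because the enum subclasses str, members compare equal to plain strings
print(DocumentCategory.LEGAL == "legal")

try:
    parse_category("marketing")
except ValueError as exc:
    print(f"rejected: {exc}")
```

Pydantic applies the same check when a field is typed as `DocumentCategory`, which is why the API can reject invalid categories without extra code.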
Why ChunkMetadata inherits from DocumentMetadata?
class ChunkMetadata(DocumentMetadata):
    chunk_index: int
    total_chunks: int
    ...

Every chunk needs all document-level metadata (so we can filter and attribute), plus chunk-specific fields. Inheritance avoids duplication.
The DocumentFilter pattern:
class DocumentFilter(BaseModel):
    categories: Optional[List[DocumentCategory]] = None
    tags: Optional[List[str]] = None
    ...

All fields are optional - users can filter by any combination. This gives flexibility:
- categories=["financial"] - Only financial docs
- tags=["2024", "Q1"] - Only Q1 2024 content
- Both together - Financial docs from Q1 2024
Step 3: Smart Document Chunking
The chunker splits documents while preserving metadata and detecting structure.
Create src/chunking.py:
"""Intelligent document chunking with metadata preservation."""
from typing import List, Optional
import re

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document

from src.config import config
from src.models import DocumentMetadata, ChunkMetadata


class SmartChunker:
    """
    Chunks documents while preserving structure and metadata.
    """

    def __init__(
        self,
        chunk_size: Optional[int] = None,
        chunk_overlap: Optional[int] = None
    ):
        self.chunk_size = chunk_size or config.CHUNK_SIZE
        self.chunk_overlap = chunk_overlap or config.CHUNK_OVERLAP
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            length_function=len,
            separators=[
                "\n\n\n",  # Major section breaks
                "\n\n",    # Paragraphs
                "\n",      # Lines
                ". ",      # Sentences
                ", ",      # Clauses
                " ",       # Words
                ""
            ]
        )

    def _detect_section(self, text: str) -> Optional[str]:
        """Detect section header from text."""
        # Common section patterns
        patterns = [
            r'^#{1,3}\s+(.+)$',              # Markdown headers
            r'^([A-Z][A-Z\s]+)$',            # ALL CAPS headers
            r'^\d+\.\s+([A-Z].+)$',          # Numbered sections
            r'^(?:Chapter|Section)\s+\d+',   # Chapter/Section (no capture: keep full line)
        ]
        first_line = text.split('\n')[0].strip()
        for pattern in patterns:
            match = re.match(pattern, first_line)
            if match:
                # Use the captured title when there is one, else the whole line
                return match.group(1) if match.groups() else first_line
        return None

    def _estimate_page(
        self,
        chunk_index: int,
        total_chunks: int,
        total_pages: int
    ) -> int:
        """Estimate page number based on chunk position."""
        if total_pages == 0:
            return 1
        ratio = chunk_index / max(total_chunks, 1)
        return max(1, int(ratio * total_pages) + 1)

    def chunk_document(
        self,
        text: str,
        metadata: DocumentMetadata
    ) -> List[Document]:
        """
        Chunk a document with rich metadata.
        Returns LangChain Documents with ChunkMetadata.
        """
        # Split into chunks
        chunks = self.text_splitter.split_text(text)
        total_chunks = len(chunks)
        documents = []
        for i, chunk_text in enumerate(chunks):
            # Detect section if possible
            section = self._detect_section(chunk_text)
            # Estimate page number
            page_num = self._estimate_page(
                i, total_chunks, metadata.page_count
            )
            # Create chunk metadata
            chunk_meta = ChunkMetadata(
                **metadata.model_dump(),
                chunk_index=i,
                total_chunks=total_chunks,
                page_number=page_num,
                section=section
            )
            # Chroma generally expects scalar metadata values, so serialize
            # enums and datetimes to strings, drop None fields, and store
            # tags as a comma-separated string
            flat_meta = chunk_meta.model_dump(mode="json", exclude_none=True)
            flat_meta["tags"] = ",".join(chunk_meta.tags)
            # Create LangChain Document
            doc = Document(
                page_content=chunk_text,
                metadata=flat_meta
            )
            documents.append(doc)
        return documents

How Smart Chunking Works
This chunker does more than basic splitting. Let's understand the extra features:
1. Section Detection
def _detect_section(self, text: str) -> Optional[str]:
    patterns = [
        r'^#{1,3}\s+(.+)$',      # Markdown headers
        r'^([A-Z][A-Z\s]+)$',    # ALL CAPS headers
        ...
    ]

When a chunk starts with a header (like "## Revenue Analysis"), we capture that as the section field. This helps users understand the context:
"According to the Revenue Analysis section..." vs "According to..."
2. Page Estimation
def _estimate_page(self, chunk_index, total_chunks, total_pages):
    ratio = chunk_index / max(total_chunks, 1)
    return max(1, int(ratio * total_pages) + 1)

If a 50-page document creates 100 chunks, chunk index 50 sits halfway through, so the formula returns int(0.5 × 50) + 1 = 26. The estimate assumes text is spread evenly across pages, so treat it as approximate. It helps with citations even when exact page tracking isn't available.
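A standalone copy of the formula makes it easy to check a few positions by hand. The function below mirrors `_estimate_page` without the class; the 50-page, 100-chunk document is just an illustrative input.

```python
def estimate_page(chunk_index: int, total_chunks: int, total_pages: int) -> int:
    """Map a zero-based chunk index to an approximate one-based page number."""
    if total_pages == 0:
        return 1
    ratio = chunk_index / max(total_chunks, 1)
    return max(1, int(ratio * total_pages) + 1)


# First, middle, and last chunk of a 50-page document split into 100 chunks
for idx in (0, 50, 99):
    print(idx, estimate_page(idx, total_chunks=100, total_pages=50))
```

The first chunk lands on page 1 and the last chunk on page 50, so the estimate never falls outside the document.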
3. Metadata Propagation
chunk_meta = ChunkMetadata(
    **metadata.model_dump(),    # All document metadata
    chunk_index=i,              # Plus chunk-specific data
    total_chunks=total_chunks,
    ...
)

Every chunk carries the full document metadata. This is crucial - when we retrieve a chunk, we immediately know which document it came from, its category, tags, etc.
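One practical caveat: vector stores such as Chroma have generally expected metadata values to be plain scalars (str, int, float, bool), while our models contain a list (`tags`), a datetime, an enum, and optional fields that may be None. The sketch below shows one way to flatten a metadata dict before storage. `flatten_metadata` is a hypothetical helper, and the comma-separated tag encoding is an assumption that matches the comment in src/vector_store.py. LangChain also offers `filter_complex_metadata` in `langchain_community.vectorstores.utils`, which drops unsupported values instead of converting them.

```python
from datetime import datetime
from enum import Enum


def flatten_metadata(meta: dict) -> dict:
    """Convert metadata values to scalars; drop keys whose value is None."""
    flat = {}
    for key, value in meta.items():
        if value is None:
            continue  # omit missing optional fields entirely
        if isinstance(value, Enum):
            flat[key] = value.value  # checked first: str-based enums are also str
        elif isinstance(value, datetime):
            flat[key] = value.isoformat()
        elif isinstance(value, (list, tuple)):
            flat[key] = ",".join(str(item) for item in value)
        elif isinstance(value, (str, int, float, bool)):
            flat[key] = value
        else:
            flat[key] = str(value)  # last resort for unexpected types
    return flat


class Category(str, Enum):  # stand-in for DocumentCategory
    FINANCIAL = "financial"


example = flatten_metadata({
    "category": Category.FINANCIAL,
    "created_date": datetime(2024, 1, 15),
    "tags": ["finance", "2024"],
    "author": None,
    "page_count": 45,
})
print(example)
```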
Step 4: Document Manager
The document manager handles file loading, ID generation, and automatic categorization.
Create src/document_manager.py:
"""Document ingestion and management."""
import hashlib
from pathlib import Path
from typing import List, Optional, Dict
from datetime import datetime

from langchain_community.document_loaders import (
    PyPDFLoader,
    TextLoader,
    UnstructuredMarkdownLoader
)

from src.models import DocumentMetadata, DocumentCategory
from src.chunking import SmartChunker


class DocumentManager:
    """Manages document ingestion and metadata."""

    SUPPORTED_EXTENSIONS = {'.pdf', '.txt', '.md'}

    def __init__(self):
        self.chunker = SmartChunker()
        self._documents: Dict[str, DocumentMetadata] = {}

    def _generate_doc_id(self, content: str, filename: str) -> str:
        """Generate unique document ID from content hash."""
        hash_input = f"{filename}:{content[:1000]}"
        return hashlib.sha256(hash_input.encode()).hexdigest()[:12]

    def _detect_category(self, filename: str, text: str) -> DocumentCategory:
        """Auto-detect document category from content."""
        filename_lower = filename.lower()
        text_lower = text[:5000].lower()
        # Simple keyword-based detection
        if any(kw in filename_lower or kw in text_lower
               for kw in ['legal', 'contract', 'agreement', 'terms']):
            return DocumentCategory.LEGAL
        if any(kw in filename_lower or kw in text_lower
               for kw in ['financial', 'revenue', 'profit', 'budget']):
            return DocumentCategory.FINANCIAL
        if any(kw in filename_lower or kw in text_lower
               for kw in ['technical', 'api', 'documentation', 'code']):
            return DocumentCategory.TECHNICAL
        if any(kw in filename_lower or kw in text_lower
               for kw in ['research', 'study', 'analysis', 'paper']):
            return DocumentCategory.RESEARCH
        return DocumentCategory.GENERAL

    def _load_file(self, file_path: Path) -> tuple[str, int]:
        """Load file content and return text with page count."""
        suffix = file_path.suffix.lower()
        if suffix == '.pdf':
            loader = PyPDFLoader(str(file_path))
            pages = loader.load()
            text = "\n\n".join(page.page_content for page in pages)
            return text, len(pages)
        elif suffix == '.md':
            loader = UnstructuredMarkdownLoader(str(file_path))
            docs = loader.load()
            text = "\n\n".join(doc.page_content for doc in docs)
            return text, 1
        else:  # .txt
            loader = TextLoader(str(file_path))
            docs = loader.load()
            text = docs[0].page_content if docs else ""
            return text, 1

    def ingest_document(
        self,
        file_path: str | Path,
        title: Optional[str] = None,
        category: Optional[DocumentCategory] = None,
        tags: Optional[List[str]] = None,
        author: Optional[str] = None
    ) -> tuple[str, List]:
        """
        Ingest a document and return its ID and chunks.

        Args:
            file_path: Path to the document
            title: Optional document title
            category: Optional category (auto-detected if not provided)
            tags: Optional tags for filtering
            author: Optional author name

        Returns:
            Tuple of (document_id, list of chunked Documents)
        """
        file_path = Path(file_path)
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")
        if file_path.suffix.lower() not in self.SUPPORTED_EXTENSIONS:
            raise ValueError(
                f"Unsupported file type: {file_path.suffix}. "
                f"Supported: {self.SUPPORTED_EXTENSIONS}"
            )
        # Load document
        text, page_count = self._load_file(file_path)
        if not text.strip():
            raise ValueError(f"Document is empty: {file_path}")
        # Generate document ID
        doc_id = self._generate_doc_id(text, file_path.name)
        # Auto-detect category if not provided
        if category is None:
            category = self._detect_category(file_path.name, text)
        # Create metadata
        metadata = DocumentMetadata(
            document_id=doc_id,
            filename=file_path.name,
            title=title or file_path.stem.replace('_', ' ').title(),
            category=category,
            author=author,
            tags=tags or [],
            page_count=page_count,
            created_date=datetime.now()
        )
        # Store document reference
        self._documents[doc_id] = metadata
        # Chunk document
        chunks = self.chunker.chunk_document(text, metadata)
        return doc_id, chunks

    def get_document(self, doc_id: str) -> Optional[DocumentMetadata]:
        """Get document metadata by ID."""
        return self._documents.get(doc_id)

    def list_documents(self) -> List[DocumentMetadata]:
        """List all ingested documents."""
        return list(self._documents.values())

    def delete_document(self, doc_id: str) -> bool:
        """Remove document from manager."""
        if doc_id in self._documents:
            del self._documents[doc_id]
            return True
        return False

Key Document Manager Features
1. Content-Based Document IDs
def _generate_doc_id(self, content: str, filename: str) -> str:
    hash_input = f"{filename}:{content[:1000]}"
    return hashlib.sha256(hash_input.encode()).hexdigest()[:12]

Why hash-based IDs?
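- Deterministic - The same filename and content always produce the same ID (enables deduplication)
- Collision-resistant - SHA-256 makes accidental collisions between unrelated files very unlikely. Note that only the filename and the first 1000 characters are hashed, so two versions of a file that share both receive the same ID
- Short - 12 hex characters (48 bits) is enough for a collection of this size and easy to display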
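If documents are revised in place (same filename, same opening pages), hashing only the first 1000 characters cannot tell the versions apart. The sketch below contrasts the tutorial's scheme with a hypothetical `full_doc_id` variant that hashes the entire content. It is an illustration of the trade-off, not a required change.

```python
import hashlib


def prefix_doc_id(content: str, filename: str) -> str:
    """Scheme used in this tutorial: filename plus the first 1000 characters."""
    hash_input = f"{filename}:{content[:1000]}"
    return hashlib.sha256(hash_input.encode()).hexdigest()[:12]


def full_doc_id(content: str, filename: str) -> str:
    """Hypothetical variant: hash the filename and the whole content."""
    digest = hashlib.sha256()
    digest.update(filename.encode("utf-8"))
    digest.update(b"\x00")  # separator so filename and content cannot blur together
    digest.update(content.encode("utf-8"))
    return digest.hexdigest()[:12]


shared_intro = "Annual Report. " * 100  # 1500 identical leading characters
version_1 = shared_intro + "Revenue was $50M."
version_2 = shared_intro + "Revenue was $55M."

# Same ID under the prefix scheme, different IDs when hashing everything
print(prefix_doc_id(version_1, "report.pdf") == prefix_doc_id(version_2, "report.pdf"))
print(full_doc_id(version_1, "report.pdf") == full_doc_id(version_2, "report.pdf"))
```

Hashing the full content costs a little more time on large files but lets the ID double as a change detector for versioning.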
2. Automatic Category Detection
def _detect_category(self, filename: str, text: str) -> DocumentCategory:
    text_lower = text[:5000].lower()
    if any(kw in text_lower for kw in ['financial', 'revenue', 'profit']):
        return DocumentCategory.FINANCIAL
    ...

This is a simple keyword-based approach. For production:
- Use ML classification (fine-tuned BERT)
- Or always require explicit category on upload
- Or use LLM to classify
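Before reaching for a model, note one weakness of the first-match approach: the order of the `if` blocks decides the result, so a financial document that mentions an "agreement" is labeled legal. A small step up, sketched below under the same keyword lists, is to count hits per category and pick the highest score. `score_categories` and `detect_category` are hypothetical names, and substring counting still has false positives (for example "api" inside "capital").

```python
KEYWORDS = {
    "legal": ["legal", "contract", "agreement", "terms"],
    "financial": ["financial", "revenue", "profit", "budget"],
    "technical": ["technical", "api", "documentation", "code"],
    "research": ["research", "study", "analysis", "paper"],
}


def score_categories(text: str) -> dict[str, int]:
    """Count keyword occurrences per category in the first 5000 characters."""
    lowered = text[:5000].lower()
    return {
        category: sum(lowered.count(keyword) for keyword in keywords)
        for category, keywords in KEYWORDS.items()
    }


def detect_category(text: str) -> str:
    """Return the best-scoring category, or "general" when nothing matches."""
    scores = score_categories(text)
    best = max(scores, key=scores.get)
    return best if scores[best] > 0 else "general"


sample = "This agreement sets the budget; revenue and profit targets follow."
print(score_categories(sample))
print(detect_category(sample))
```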
3. Multiple File Format Support
def _load_file(self, file_path: Path) -> tuple[str, int]:
    if suffix == '.pdf':
        loader = PyPDFLoader(str(file_path))
    elif suffix == '.md':
        loader = UnstructuredMarkdownLoader(str(file_path))
    else:  # .txt
        loader = TextLoader(str(file_path))

LangChain provides loaders for many formats. Each returns a list of Document objects whose page_content we join into a single string. PyPDFLoader yields one Document per page, which is how we get the page count for PDFs; other formats are treated as a single page.
Step 5: Vector Store with Metadata Filtering
This is where the multi-document magic happens - filtering by metadata during retrieval.
Create src/vector_store.py:
"""Vector store with metadata filtering support."""
from pathlib import Path
from typing import List, Optional, Dict, Any

from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma
from langchain_core.documents import Document

from src.config import config
from src.models import DocumentFilter, SearchResult


class MultiDocVectorStore:
    """Vector store optimized for multi-document RAG."""

    def __init__(self, collection_name: str = "multi_doc_rag"):
        config.validate()
        # Remember the name so clear() can recreate the same collection
        self.collection_name = collection_name
        self.embeddings = OpenAIEmbeddings(
            model=config.EMBEDDING_MODEL,
            openai_api_key=config.OPENAI_API_KEY
        )
        persist_dir = Path(config.CHROMA_PERSIST_DIR)
        persist_dir.mkdir(parents=True, exist_ok=True)
        self.vectorstore = Chroma(
            collection_name=collection_name,
            persist_directory=str(persist_dir),
            embedding_function=self.embeddings
        )

    def add_documents(self, documents: List[Document]) -> int:
        """Add documents to the vector store."""
        if not documents:
            return 0
        self.vectorstore.add_documents(documents)
        return len(documents)

    def _build_filter(self, doc_filter: DocumentFilter) -> Optional[Dict[str, Any]]:
        """Build ChromaDB filter from DocumentFilter."""
        conditions = []
        if doc_filter.categories:
            categories = [c.value for c in doc_filter.categories]
            if len(categories) == 1:
                conditions.append({"category": categories[0]})
            else:
                conditions.append({"category": {"$in": categories}})
        if doc_filter.document_ids:
            if len(doc_filter.document_ids) == 1:
                conditions.append({"document_id": doc_filter.document_ids[0]})
            else:
                conditions.append({"document_id": {"$in": doc_filter.document_ids}})
        if doc_filter.tags:
            # Tags stored as comma-separated string.
            # NOTE: whether `$contains` is accepted in a metadata filter
            # depends on the Chroma version; verify against the one installed.
            for tag in doc_filter.tags:
                conditions.append({"tags": {"$contains": tag}})
        # date_from / date_to are defined on DocumentFilter but not applied here
        if not conditions:
            return None
        if len(conditions) == 1:
            return conditions[0]
        return {"$and": conditions}

    def search(
        self,
        query: str,
        doc_filter: Optional[DocumentFilter] = None,
        top_k: Optional[int] = None
    ) -> List[SearchResult]:
        """
        Search with optional metadata filtering.

        Args:
            query: Search query
            doc_filter: Optional filters to apply
            top_k: Number of results to return

        Returns:
            List of SearchResult with source attribution
        """
        top_k = top_k or config.TOP_K
        # Build filter
        where_filter = None
        if doc_filter:
            where_filter = self._build_filter(doc_filter)
        # Search
        if where_filter:
            results = self.vectorstore.similarity_search_with_relevance_scores(
                query,
                k=top_k,
                filter=where_filter
            )
        else:
            results = self.vectorstore.similarity_search_with_relevance_scores(
                query,
                k=top_k
            )
        # Convert to SearchResult
        search_results = []
        for doc, score in results:
            meta = doc.metadata
            search_results.append(SearchResult(
                content=doc.page_content,
                score=score,
                document_id=meta.get("document_id", "unknown"),
                filename=meta.get("filename", "unknown"),
                category=meta.get("category", "general"),
                page_number=meta.get("page_number"),
                chunk_index=meta.get("chunk_index", 0)
            ))
        return search_results

    def delete_document(self, document_id: str) -> bool:
        """Delete all chunks for a document."""
        try:
            # Extra keyword arguments are forwarded to the Chroma collection's
            # delete(), which takes `where` (not `filter`) for metadata matches
            self.vectorstore.delete(
                where={"document_id": document_id}
            )
            return True
        except Exception:
            return False

    def get_document_ids(self) -> List[str]:
        """Get all unique document IDs in the store."""
        results = self.vectorstore.get()
        if not results or not results.get('metadatas'):
            return []
        doc_ids = set()
        for meta in results['metadatas']:
            if meta and 'document_id' in meta:
                doc_ids.add(meta['document_id'])
        return list(doc_ids)

    def clear(self) -> None:
        """Clear all documents from the store."""
        self.vectorstore.delete_collection()
        self.vectorstore = Chroma(
            collection_name=self.collection_name,
            persist_directory=str(config.CHROMA_PERSIST_DIR),
            embedding_function=self.embeddings
        )

Understanding Metadata Filtering
The key method is _build_filter(). It translates our DocumentFilter into ChromaDB's filter syntax:
ChromaDB Filter Syntax:
# Single value match
{"category": "financial"}

# Multiple value match (OR)
{"category": {"$in": ["financial", "legal"]}}

# Contains (support in metadata filters varies by Chroma version)
{"tags": {"$contains": "2024"}}

# Combine with AND
{"$and": [
    {"category": "financial"},
    {"tags": {"$contains": "Q1"}}
]}

How filtering improves retrieval:
Without filter:
Query: "What was revenue?"
Results:
1. Financial report 2024 (relevant) ✓
2. Technical docs (irrelevant) ✗
3. Old financial report 2019 (outdated) ✗
4. Legal contract (irrelevant) ✗
With filter (category=financial, tags=2024):
Query: "What was revenue?"
Results:
1. Financial report Q1 2024 ✓
2. Financial report Q2 2024 ✓
3. Financial summary 2024 ✓
4. Annual report 2024 ✓
All results are now relevant to the user's actual need.
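Two gaps are worth noting. `DocumentFilter` defines `date_from` and `date_to`, but `_build_filter()` never reads them, and substring matching on a comma-separated tag string may not be available in every Chroma version. The sketch below shows one possible workaround using only equality, `$in`, `$gte`, `$lte`, and `$and`. It assumes two things that are not part of the tutorial code: each tag is stored at ingestion as its own boolean key (the `tag_` prefix is hypothetical), and the creation date is stored as an integer epoch timestamp under a hypothetical `created_ts` key.

```python
from datetime import datetime, timezone
from typing import Any, Optional


def tag_flags(tags: list[str]) -> dict[str, bool]:
    """Metadata to add at ingestion time: one boolean key per tag."""
    return {f"tag_{tag.lower()}": True for tag in tags}


def build_where(
    categories: Optional[list[str]] = None,
    tags: Optional[list[str]] = None,
    date_from: Optional[datetime] = None,
    date_to: Optional[datetime] = None,
) -> Optional[dict[str, Any]]:
    """Build a filter dict; returns None when no criteria are given."""
    conditions: list[dict[str, Any]] = []
    if categories:
        if len(categories) == 1:
            conditions.append({"category": categories[0]})
        else:
            conditions.append({"category": {"$in": list(categories)}})
    for tag in tags or []:
        conditions.append({f"tag_{tag.lower()}": True})  # every tag must be present
    if date_from:
        conditions.append({"created_ts": {"$gte": int(date_from.timestamp())}})
    if date_to:
        conditions.append({"created_ts": {"$lte": int(date_to.timestamp())}})
    if not conditions:
        return None
    return conditions[0] if len(conditions) == 1 else {"$and": conditions}


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
where = build_where(categories=["financial"], tags=["Q1"], date_from=start)
print(where)
```

The trade-off is one extra metadata key per tag, in exchange for filters that rely only on basic comparison operators.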
Step 6: RAG Engine with Citations
The RAG engine ties everything together and ensures proper source attribution.
Create src/rag_engine.py:
"""Multi-document RAG engine with source attribution."""
from typing import Optional, List

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

from src.config import config
from src.models import (
    DocumentFilter,
    DocumentCategory,
    QueryResponse,
    SearchResult
)
from src.document_manager import DocumentManager
from src.vector_store import MultiDocVectorStore


class MultiDocRAGEngine:
    """RAG engine for multi-document question answering."""

    def __init__(self):
        config.validate()
        self.doc_manager = DocumentManager()
        self.vector_store = MultiDocVectorStore()
        self.llm = ChatOpenAI(
            model=config.LLM_MODEL,
            temperature=0.1,
            openai_api_key=config.OPENAI_API_KEY
        )
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a helpful assistant that answers questions based on multiple documents.
IMPORTANT INSTRUCTIONS:
1. Answer ONLY based on the provided context
2. ALWAYS cite your sources using [Source N] format
3. If information comes from multiple documents, cite all relevant sources
4. If the answer is not in the context, say "I cannot find this information in the provided documents"
5. Be precise and thorough
Each source is labeled with its document name and page number for reference."""),
            ("human", """Context from multiple documents:
{context}
---
Question: {question}
Provide a comprehensive answer with citations:""")
        ])

    def _format_context(self, results: List[SearchResult]) -> str:
        """Format search results as numbered context."""
        formatted = []
        for i, result in enumerate(results, 1):
            source_info = f"[Source {i}] {result.filename}"
            if result.page_number:
                source_info += f", Page {result.page_number}"
            source_info += f" (Category: {result.category})"
            formatted.append(f"{source_info}\n{result.content}")
        return "\n\n---\n\n".join(formatted)

    def ingest_document(
        self,
        file_path: str,
        title: Optional[str] = None,
        category: Optional[DocumentCategory] = None,
        tags: Optional[List[str]] = None,
        author: Optional[str] = None
    ) -> dict:
        """
        Ingest a document into the RAG system.

        Returns:
            Dict with document_id, filename, chunks_created
        """
        doc_id, chunks = self.doc_manager.ingest_document(
            file_path=file_path,
            title=title,
            category=category,
            tags=tags,
            author=author
        )
        # Add to vector store
        chunks_added = self.vector_store.add_documents(chunks)
        doc_meta = self.doc_manager.get_document(doc_id)
        return {
            "document_id": doc_id,
            "filename": doc_meta.filename if doc_meta else "unknown",
            "title": doc_meta.title if doc_meta else None,
            "category": doc_meta.category.value if doc_meta else "general",
            "chunks_created": chunks_added
        }

    def query(
        self,
        question: str,
        categories: Optional[List[DocumentCategory]] = None,
        tags: Optional[List[str]] = None,
        document_ids: Optional[List[str]] = None,
        top_k: Optional[int] = None
    ) -> QueryResponse:
        """
        Query across multiple documents with optional filtering.

        Args:
            question: The question to answer
            categories: Filter by document categories
            tags: Filter by tags
            document_ids: Filter to specific documents
            top_k: Number of chunks to retrieve

        Returns:
            QueryResponse with answer and cited sources
        """
        # Build filter
        doc_filter = None
        if categories or tags or document_ids:
            doc_filter = DocumentFilter(
                categories=categories,
                tags=tags,
                document_ids=document_ids
            )
        # Search
        results = self.vector_store.search(
            query=question,
            doc_filter=doc_filter,
            top_k=top_k
        )
        if not results:
            return QueryResponse(
                answer="No relevant documents found for your query.",
                sources=[],
                query=question,
                filters_applied=doc_filter
            )
        # Format context
        context = self._format_context(results)
        # Generate answer
        chain = self.prompt | self.llm | StrOutputParser()
        answer = chain.invoke({
            "context": context,
            "question": question
        })
        return QueryResponse(
            answer=answer,
            sources=results,
            query=question,
            filters_applied=doc_filter
        )

    def list_documents(self) -> List[dict]:
        """List all documents in the system."""
        return [
            {
                "document_id": doc.document_id,
                "filename": doc.filename,
                "title": doc.title,
                "category": doc.category.value,
                "tags": doc.tags,
                "created_date": doc.created_date.isoformat()
            }
            for doc in self.doc_manager.list_documents()
        ]

    def delete_document(self, document_id: str) -> bool:
        """Delete a document from the system."""
        vs_deleted = self.vector_store.delete_document(document_id)
        dm_deleted = self.doc_manager.delete_document(document_id)
        return vs_deleted or dm_deleted

    def clear_all(self) -> None:
        """Clear all documents from the system."""
        self.vector_store.clear()
        for doc_id in list(self.doc_manager._documents.keys()):
            self.doc_manager.delete_document(doc_id)

How Source Attribution Works
The key to good citations is the context formatting:
def _format_context(self, results: List[SearchResult]) -> str:
    formatted = []
    for i, result in enumerate(results, 1):
        source_info = f"[Source {i}] {result.filename}"
        if result.page_number:
            source_info += f", Page {result.page_number}"
        ...

This produces context like:
[Source 1] financial_report_2024.pdf, Page 12 (Category: financial)
Revenue for Q1 2024 was $50 million, representing a 20% increase...
---
[Source 2] quarterly_summary.pdf, Page 3 (Category: financial)
The Q1 results exceeded expectations with strong performance in...

The LLM then uses these labels in its answer:
"Revenue for Q1 2024 was $50 million [Source 1], which exceeded
expectations [Source 2]."

Citation Quality Tip
The prompt instruction "ALWAYS cite your sources using [Source N] format" is critical. Without explicit instructions, LLMs often forget to cite or use inconsistent formats.
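Even with an explicit instruction, it can be worth checking the answer after generation. The sketch below is a hypothetical post-processing step, not part of the engine above. It extracts every `[Source N]` label from an answer and reports which numbers are valid, which point at sources that were never supplied, and which supplied sources went uncited.

```python
import re

CITATION_RE = re.compile(r"\[Source (\d+)\]")


def check_citations(answer: str, num_sources: int) -> dict[str, list[int]]:
    """Classify the source numbers cited in an answer."""
    cited = sorted({int(number) for number in CITATION_RE.findall(answer)})
    return {
        "cited": [n for n in cited if 1 <= n <= num_sources],
        "invalid": [n for n in cited if not 1 <= n <= num_sources],
        "uncited": [n for n in range(1, num_sources + 1) if n not in cited],
    }


answer = (
    "Revenue for Q1 2024 was $50 million [Source 1], which exceeded "
    "expectations [Source 2]. See also [Source 7]."
)
report = check_citations(answer, num_sources=3)
print(report)
```

An answer with an empty `cited` list or a non-empty `invalid` list is a reasonable candidate for a retry or a warning in the API response.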
Step 7: FastAPI Application
Create src/api.py:
"""FastAPI application for multi-document RAG."""
import tempfile
from pathlib import Path
from typing import List, Optional

from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field

from src.models import DocumentCategory, QueryResponse
from src.rag_engine import MultiDocRAGEngine

app = FastAPI(
    title="Multi-Document RAG API",
    description="RAG system for querying across multiple documents with filtering",
    version="1.0.0"
)

# Permissive CORS for local development; list explicit origins in production
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize engine
engine = MultiDocRAGEngine()


# Request/Response Models
class UploadResponse(BaseModel):
    document_id: str
    filename: str
    title: Optional[str] = None
    category: str
    chunks_created: int


class QueryRequest(BaseModel):
    question: str = Field(..., min_length=1)
    categories: Optional[List[DocumentCategory]] = None
    tags: Optional[List[str]] = None
    document_ids: Optional[List[str]] = None
    top_k: int = Field(default=6, ge=1, le=20)


class DocumentInfo(BaseModel):
    document_id: str
    filename: str
    title: Optional[str] = None
    category: str
    tags: List[str]
    created_date: str


# Endpoints
@app.get("/")
async def root():
    """Health check."""
    return {
        "status": "healthy",
        "service": "Multi-Document RAG API",
        "features": [
            "Multi-document ingestion",
            "Category filtering",
            "Tag-based search",
            "Source attribution"
        ]
    }


@app.post("/documents/upload", response_model=UploadResponse)
async def upload_document(
    file: UploadFile = File(...),
    # Form(...) reads these from the multipart body, matching `curl -F`;
    # plain defaults would be parsed from the query string instead.
    title: Optional[str] = Form(None),
    category: Optional[DocumentCategory] = Form(None),
    tags: str = Form(""),  # Comma-separated
    author: Optional[str] = Form(None)
):
    """Upload and ingest a document."""
    suffix = Path(file.filename or "").suffix.lower()
    if suffix not in {'.pdf', '.txt', '.md'}:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file type: {suffix}. Supported: .pdf, .txt, .md"
        )
    tmp_path = None
    try:
        # Persist the upload to disk so the document loaders can read it
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            content = await file.read()
            tmp.write(content)
            tmp_path = tmp.name
        tag_list = [t.strip() for t in tags.split(",") if t.strip()]
        result = engine.ingest_document(
            file_path=tmp_path,
            title=title,
            category=category,
            tags=tag_list,
            author=author
        )
        return UploadResponse(**result)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Remove the temp file even when ingestion fails
        if tmp_path:
            Path(tmp_path).unlink(missing_ok=True)

@app.post("/query", response_model=QueryResponse)
async def query_documents(request: QueryRequest):
    """Query across documents with optional filters."""
    try:
        response = engine.query(
            question=request.question,
            categories=request.categories,
            tags=request.tags,
            document_ids=request.document_ids,
            top_k=request.top_k
        )
        return response
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/documents", response_model=List[DocumentInfo])
async def list_documents():
    """List all documents in the system."""
    docs = engine.list_documents()
    return [DocumentInfo(**doc) for doc in docs]


@app.get("/documents/{document_id}")
async def get_document(document_id: str):
    """Get document details by ID."""
    doc = engine.doc_manager.get_document(document_id)
    if not doc:
        raise HTTPException(status_code=404, detail="Document not found")
    return {
        "document_id": doc.document_id,
        "filename": doc.filename,
        "title": doc.title,
        "category": doc.category.value,
        "tags": doc.tags,
        "author": doc.author,
        "page_count": doc.page_count,
        "created_date": doc.created_date.isoformat()
    }


@app.delete("/documents/{document_id}")
async def delete_document(document_id: str):
    """Delete a document from the system."""
    success = engine.delete_document(document_id)
    if not success:
        raise HTTPException(status_code=404, detail="Document not found")
    return {"message": f"Document {document_id} deleted successfully"}


@app.delete("/documents")
async def clear_all_documents():
    """Clear all documents from the system."""
    engine.clear_all()
    return {"message": "All documents cleared"}


@app.get("/categories")
async def list_categories():
    """List available document categories."""
    return {
        "categories": [
            {"value": c.value, "name": c.name}
            for c in DocumentCategory
        ]
    }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
Testing
Create tests/test_multi_doc.py:
"""Tests for multi-document RAG system."""
import pytest
from datetime import datetime

from src.models import (
    DocumentMetadata,
    DocumentCategory,
    DocumentFilter,
    ChunkMetadata
)
from src.chunking import SmartChunker


class TestDocumentModels:
    """Tests for data models."""

    def test_document_metadata_defaults(self):
        """Test default values for metadata."""
        meta = DocumentMetadata(
            document_id="test123",
            filename="test.pdf"
        )
        assert meta.category == DocumentCategory.GENERAL
        assert meta.version == "1.0"
        assert meta.tags == []
        assert isinstance(meta.created_date, datetime)

    def test_chunk_metadata_inherits(self):
        """Test chunk metadata includes parent fields."""
        meta = ChunkMetadata(
            document_id="test123",
            filename="test.pdf",
            chunk_index=5,
            total_chunks=10,
            page_number=2
        )
        assert meta.document_id == "test123"
        assert meta.chunk_index == 5
        assert meta.page_number == 2


class TestSmartChunker:
    """Tests for document chunking."""

    def test_basic_chunking(self):
        """Test basic document chunking."""
        chunker = SmartChunker(chunk_size=100, chunk_overlap=20)
        meta = DocumentMetadata(
            document_id="test123",
            filename="test.pdf",
            page_count=5
        )
        text = "A" * 250  # Will create multiple chunks
        chunks = chunker.chunk_document(text, meta)
        assert len(chunks) > 1
        assert all(c.metadata["document_id"] == "test123" for c in chunks)
        assert chunks[0].metadata["chunk_index"] == 0

    def test_metadata_preservation(self):
        """Test that metadata is preserved in chunks."""
        chunker = SmartChunker()
        meta = DocumentMetadata(
            document_id="doc456",
            filename="report.pdf",
            category=DocumentCategory.FINANCIAL,
            tags=["q1", "2024"],
            page_count=10
        )
        text = "Financial report content. " * 100
        chunks = chunker.chunk_document(text, meta)
        for chunk in chunks:
            assert chunk.metadata["category"] == "financial"
            assert chunk.metadata["total_chunks"] == len(chunks)

# Run with: pytest tests/test_multi_doc.py -v
Running the Application
Start the API
python -m uvicorn src.api:app --reload
Upload Documents
# Upload a financial PDF
curl -X POST "http://localhost:8000/documents/upload" \
  -F "file=@financial_report.pdf" \
  -F "title=Q1 Financial Report" \
  -F "category=financial" \
  -F "tags=q1,2024,revenue"
# Upload a technical document
curl -X POST "http://localhost:8000/documents/upload" \
  -F "file=@technical_spec.pdf" \
  -F "category=technical" \
  -F "tags=api,documentation"
Query with Filters
# Query all documents
curl -X POST "http://localhost:8000/query" \
  -H "Content-Type: application/json" \
  -d '{"question": "What was the revenue in Q1?"}'
# Query only financial documents
curl -X POST "http://localhost:8000/query" \
  -H "Content-Type: application/json" \
  -d '{
    "question": "What was the revenue in Q1?",
    "categories": ["financial"]
  }'
# Query with multiple filters
curl -X POST "http://localhost:8000/query" \
  -H "Content-Type: application/json" \
  -d '{
    "question": "Explain the API authentication",
    "categories": ["technical"],
    "tags": ["api"]
  }'
Visit http://localhost:8000/docs for the interactive Swagger UI.
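To make the effect of combined filters concrete, here is a minimal sketch in plain Python of how a chunk's metadata could be checked against the request fields. It rests on assumptions the requests above do not confirm: that different filter types are ANDed, that values within one filter are ORed, and that tags are stored as a comma-separated string. `matches_filters` is a hypothetical helper, not the engine's implementation, which would normally push these conditions into the vector store query.

```python
from typing import Dict, List, Optional


def matches_filters(
    metadata: Dict[str, str],
    categories: Optional[List[str]] = None,
    tags: Optional[List[str]] = None,
    document_ids: Optional[List[str]] = None,
) -> bool:
    """Return True if a chunk's metadata passes every filter that was supplied.

    Assumed semantics: AND across filter types, OR within one filter,
    tags stored as a comma-separated string.
    """
    if categories and metadata.get("category") not in categories:
        return False
    if document_ids and metadata.get("document_id") not in document_ids:
        return False
    if tags:
        chunk_tags = {
            t.strip() for t in metadata.get("tags", "").split(",") if t.strip()
        }
        if not chunk_tags.intersection(tags):
            return False
    return True


# Illustrative metadata for two chunks (values are made up)
chunks = [
    {"document_id": "a1", "category": "financial", "tags": "q1,2024,revenue"},
    {"document_id": "b2", "category": "technical", "tags": "api,documentation"},
]

# Same filters as the last curl request: technical documents tagged "api"
selected = [
    c["document_id"]
    for c in chunks
    if matches_filters(c, categories=["technical"], tags=["api"])
]
print(selected)
```

Omitting a filter leaves that dimension unrestricted, which is why the first curl request searches every document.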
Key Concepts Recap
| Concept | What It Does | Why It Matters |
|---|---|---|
| Metadata Schema | Structures document information | Enables filtering and attribution |
| Category Filtering | Narrows search to document types | Improves relevance dramatically |
| Content-Based IDs | Hash-based unique identifiers | Enables deduplication |
| Source Attribution | Cites specific documents in answers | Builds trust, enables verification |
| Smart Chunking | Preserves structure while splitting | Better context in retrieval |
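As an illustration of the Content-Based IDs row, the sketch below derives an ID from a file's bytes and uses it to skip a repeated upload. The choice of SHA-256, the 16-character truncation, and the name `content_based_id` are assumptions made for this example; the engine's own ID scheme may differ.

```python
import hashlib


def content_based_id(content: bytes, length: int = 16) -> str:
    """Derive a document ID from the file's bytes (hypothetical helper).

    Identical content always yields the same ID, so a re-upload can be
    detected before any embeddings are computed.
    """
    return hashlib.sha256(content).hexdigest()[:length]


seen_ids = set()
uploads = [b"Q1 report text", b"API spec text", b"Q1 report text"]
for data in uploads:
    doc_id = content_based_id(data)
    if doc_id in seen_ids:
        print(f"duplicate skipped: {doc_id}")
        continue
    seen_ids.add(doc_id)
    print(f"ingested: {doc_id}")
```

Truncating the digest keeps IDs short at the cost of a small collision risk; a longer prefix is the safer choice for very large collections.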
Extensions
| Level | Ideas |
|---|---|
| Easy | Add document preview, implement search history, add export |
| Medium | Hierarchical categories, document relationships, batch upload |
| Advanced | Auto-categorization with ML, cross-document entity linking, version diff |
Summary
You've built a production-ready multi-document RAG system that:
- Ingests documents with rich metadata schemas
- Provides precise filtering by category, tags, and document ID
- Attributes answers to specific source documents
- Manages document lifecycle (add, update, delete)
- Scales efficiently with metadata-based retrieval
Key Takeaways:
- Metadata is crucial: good schema design enables powerful filtering
- Source attribution builds trust: users need to verify AI answers
- Filters improve relevance: a narrower search scope gives better results
- Lifecycle management matters: plan for updates and deletions
Next: RAG with Reranking - Improve retrieval quality with cross-encoder reranking