Long Context RAG
Build a RAG system that leverages 128K+ context windows for full document understanding without chunking
| Property | Value |
|---|---|
| Difficulty | Advanced |
| Time | ~4 days |
| Code Size | ~600 LOC |
| Prerequisites | Multi-Document RAG |
TL;DR
Modern LLMs support 128K-200K token context windows. Instead of always chunking, analyze document size and use full documents when possible, preserving cross-references and narrative flow that chunking destroys.
Tech Stack
| Technology | Purpose |
|---|---|
| LangChain | RAG orchestration |
| tiktoken | Token counting |
| ChromaDB | Vector storage for large docs |
| GPT-4-turbo / Claude | 128K+ context models |
| FastAPI | REST API |
Prerequisites
- Python 3.10+
- OpenAI API key (GPT-4-turbo for 128K context)
- Or Anthropic API key (Claude for 200K context)
pip install langchain langchain-openai langchain-anthropic tiktoken chromadb fastapi uvicorn
What You'll Learn
- When to use full documents vs. chunked retrieval
- Token counting and context window management
- Map-reduce strategies for documents exceeding context limits
- Hybrid approaches combining full docs with semantic search
The Problem: Chunking Loses Context
Traditional RAG chunks documents into small pieces, losing:
| Lost Element | Impact |
|---|---|
| Cross-references | "As mentioned in Section 3..." becomes meaningless |
| Document structure | Introduction, methodology, conclusion relationships lost |
| Cumulative context | Building arguments across paragraphs broken |
| Author's narrative | The "story" of the document fragmented |
┌─────────────────────────────────────────────────────────────────────────────┐
│ TRADITIONAL RAG ❌ │
│ │
│ Document ───┬───► Chunk 1 ───┐ │
│ ├───► Chunk 2 ───┼───► Vector Search ───► LLM with fragments │
│ └───► Chunk 3 ───┘ │ │
│ ▼ │
│ Problem: "As mentioned in Section 3..." → meaningless ⚠️ │
│ Cross-references, structure, narrative all lost │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ LONG CONTEXT RAG ✅ │
│ │
│ Document ───► Analyze Size ───┬───► [Fits: <100K] ───► Full Document │
│ │ │ │
│ └───► [Too Large] ───► Map-Reduce │
│ │ │
│ ▼ │
│ LLM with complete context │
│ │ │
│ ▼ │
│ Benefit: Cross-references work! Structure preserved! ✓ │
└─────────────────────────────────────────────────────────────────────────────┘
Solution: Adaptive Context Strategy
Long Context RAG dynamically chooses the best strategy:
| Document Size | Strategy | Benefit |
|---|---|---|
| <100K tokens | Full document | Complete context |
| 100K-500K tokens | Smart selection | Most relevant sections |
| >500K tokens | Map-reduce | Parallel processing |
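The dispatch in the table above reduces to a few lines. Here is a minimal, standalone sketch; the enum values mirror the `ContextStrategy` defined later in `config.py`, but the `choose_strategy` function and its exact thresholds are illustrative, not the pipeline's actual code:

```python
from enum import Enum

class ContextStrategy(str, Enum):
    FULL_DOCUMENT = "full_document"
    SMART_SELECTION = "smart_selection"
    MAP_REDUCE = "map_reduce"

def choose_strategy(token_count: int) -> ContextStrategy:
    # Thresholds from the table: <100K full doc, 100K-500K smart selection,
    # >500K map-reduce
    if token_count < 100_000:
        return ContextStrategy.FULL_DOCUMENT
    if token_count <= 500_000:
        return ContextStrategy.SMART_SELECTION
    return ContextStrategy.MAP_REDUCE

print(choose_strategy(60_000).value)   # full_document
print(choose_strategy(900_000).value)  # map_reduce
```

The real `DocumentAnalyzer` below refines this with complexity signals and a fourth HYBRID strategy; size alone is just the first-order decision.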
Project Structure
long-context-rag/
├── config.py # Configuration
├── token_manager.py # Token counting and limits
├── document_analyzer.py # Document size analysis
├── context_builder.py # Context window management
├── retrieval_strategy.py # Adaptive retrieval
├── map_reduce.py # Large document handling
├── rag_pipeline.py # Main orchestration
├── app.py # FastAPI application
└── requirements.txt
Step 1: Configuration
# config.py
from pydantic_settings import BaseSettings
from functools import lru_cache
from enum import Enum
from typing import Optional
class ModelProvider(str, Enum):
OPENAI = "openai"
ANTHROPIC = "anthropic"
class ContextStrategy(str, Enum):
FULL_DOCUMENT = "full_document"
SMART_SELECTION = "smart_selection"
MAP_REDUCE = "map_reduce"
HYBRID = "hybrid"
class Settings(BaseSettings):
# API Keys
openai_api_key: Optional[str] = None
anthropic_api_key: Optional[str] = None
# Model Configuration
provider: ModelProvider = ModelProvider.OPENAI
embedding_model: str = "text-embedding-3-small"
# OpenAI models
openai_model: str = "gpt-4-turbo" # 128K context
openai_context_limit: int = 128000
# Anthropic models
anthropic_model: str = "claude-3-5-sonnet-20241022" # 200K context
anthropic_context_limit: int = 200000
# Context Management
max_output_tokens: int = 4096
safety_margin: float = 0.9 # Use 90% of context to be safe
chunk_size: int = 2000 # For fallback chunking
# Vector Store
collection_name: str = "long_context_rag"
persist_directory: str = "./chroma_db"
class Config:
env_file = ".env"
@property
def effective_context_limit(self) -> int:
"""Get context limit for current provider."""
if self.provider == ModelProvider.OPENAI:
return int(self.openai_context_limit * self.safety_margin)
return int(self.anthropic_context_limit * self.safety_margin)
@property
def llm_model(self) -> str:
"""Get model name for current provider."""
if self.provider == ModelProvider.OPENAI:
return self.openai_model
return self.anthropic_model
@lru_cache
def get_settings() -> Settings:
return Settings()
Step 2: Token Manager
Accurate token counting is critical for context management:
# token_manager.py
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import tiktoken
from config import get_settings, ModelProvider
@dataclass
class TokenCount:
"""Token count with breakdown."""
total: int
prompt_tokens: int
estimated_completion: int
available_for_context: int
def fits_in_context(self, context_limit: int) -> bool:
return self.total <= context_limit
@property
def utilization(self) -> float:
settings = get_settings()
return self.total / settings.effective_context_limit
class TokenManager:
"""
Manage token counting and context allocation.
Supports both OpenAI (tiktoken) and Anthropic token counting.
"""
def __init__(self):
self.settings = get_settings()
self._encoder = self._get_encoder()
def _get_encoder(self):
"""Get appropriate tokenizer."""
if self.settings.provider == ModelProvider.OPENAI:
try:
return tiktoken.encoding_for_model(self.settings.openai_model)
except KeyError:
return tiktoken.get_encoding("cl100k_base")
else:
# Anthropic uses similar tokenization to cl100k
return tiktoken.get_encoding("cl100k_base")
def count_tokens(self, text: str) -> int:
"""Count tokens in text."""
return len(self._encoder.encode(text))
def count_messages(self, messages: List[Dict[str, str]]) -> int:
"""Count tokens in message list (chat format)."""
total = 0
for msg in messages:
# Approximate message overhead
total += 4 # Role tokens
total += self.count_tokens(msg.get("content", ""))
return total + 2 # Conversation overhead
def estimate_completion_tokens(self) -> int:
"""Estimate tokens for response."""
return self.settings.max_output_tokens
def calculate_available_context(
self,
system_prompt: str,
user_query: str
) -> TokenCount:
"""
Calculate available tokens for document context.
Args:
system_prompt: System message
user_query: User's question
Returns:
TokenCount with breakdown
"""
system_tokens = self.count_tokens(system_prompt)
query_tokens = self.count_tokens(user_query)
completion_tokens = self.estimate_completion_tokens()
prompt_tokens = system_tokens + query_tokens
total_reserved = prompt_tokens + completion_tokens
available = self.settings.effective_context_limit - total_reserved
return TokenCount(
total=total_reserved,
prompt_tokens=prompt_tokens,
estimated_completion=completion_tokens,
available_for_context=max(0, available)
)
def truncate_to_limit(
self,
text: str,
max_tokens: int,
truncation_strategy: str = "end"
) -> str:
"""
Truncate text to fit token limit.
Args:
text: Text to truncate
max_tokens: Maximum tokens allowed
truncation_strategy: "end", "start", or "middle"
Returns:
Truncated text
"""
tokens = self._encoder.encode(text)
if len(tokens) <= max_tokens:
return text
if truncation_strategy == "end":
truncated = tokens[:max_tokens]
elif truncation_strategy == "start":
truncated = tokens[-max_tokens:]
else: # middle - keep start and end
half = max_tokens // 2
truncated = tokens[:half] + tokens[-half:]
return self._encoder.decode(truncated)
def split_by_tokens(
self,
text: str,
chunk_size: int,
overlap: int = 100
) -> List[str]:
"""Split text into chunks by token count."""
tokens = self._encoder.encode(text)
chunks = []
start = 0
while start < len(tokens):
end = start + chunk_size
chunk_tokens = tokens[start:end]
chunks.append(self._encoder.decode(chunk_tokens))
if end >= len(tokens):
break  # stop at the tail; otherwise the final overlap is re-emitted as a duplicate chunk
start = end - overlap
return chunks
What's Happening Here?
The TokenManager is your safety net against context overflow. Let's trace through how it manages token budgets:
┌─────────────────────────────────────────────────────────────────────────────┐
│ Token Budget Calculation Flow │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Total Context Window (e.g., GPT-4-turbo) │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ 128,000 tokens ││
│ └─────────────────────────────────────────────────────────────────────────┘│
│ │ │
│ ▼ │
│ Step 1: Apply Safety Margin (90%) │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ 128,000 × 0.9 = 115,200 effective tokens ││
│ │ Why? Tokenization edge cases, Unicode handling, safety buffer ││
│ └─────────────────────────────────────────────────────────────────────────┘│
│ │ │
│ ▼ │
│ Step 2: Reserve Space for Output (4,096 tokens) │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ 115,200 - 4,096 = 111,104 tokens for input │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Step 3: Subtract System Prompt + Query │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ System prompt: ~500 tokens │ │
│ │ User query: ~100 tokens │ │
│ │ Message overhead: ~50 tokens │ │
│ │ │ │
│ │ Available for document: 111,104 - 650 = 110,454 tokens │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │
│ That's approximately 80,000-90,000 words of document content! │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Token Counting Deep Dive:
| Component | Tokens | Notes |
|---|---|---|
| Message role (system, user) | 4 each | Fixed overhead per message |
| Conversation start/end | 2 | Fixed overhead |
| 1 English word | ~1.3 tokens avg | Varies by word complexity |
| 1 code character | ~0.25 tokens avg | Varies widely by language and formatting |
| 1 Chinese/Japanese character | ~1.5 tokens avg | Non-Latin scripts use more tokens |
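The budget flow above is plain arithmetic. A sketch, using the diagram's example figures; the 650-token prompt overhead is illustrative (the real `calculate_available_context` counts the actual system prompt and query), and `available_for_document` is a hypothetical helper, not part of the project code:

```python
def available_for_document(
    context_window: int,
    safety_margin: float = 0.9,
    max_output: int = 4096,
    prompt_tokens: int = 650,  # example: system prompt + query + message overhead
) -> int:
    # Step 1: apply safety margin; Step 2: reserve output; Step 3: subtract prompt
    effective = int(context_window * safety_margin)
    return max(0, effective - max_output - prompt_tokens)

print(available_for_document(128_000))  # 110454 (GPT-4-turbo example above)
print(available_for_document(200_000))  # 175254 (Claude's 200K window)
```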
Understanding Truncation Strategies:
┌─────────────────────────────────────────────────────────────────────────────┐
│ Truncation Strategy Comparison │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Original Document (20,000 tokens) │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ [Intro] ████████████████████████████████████████████████████ [Conclusion]│
│ └─────────────────────────────────────────────────────────────────────────┘│
│ │
│ Need to fit in 10,000 tokens. Three options: │
│ │
│ truncation="end" (Default) │
│ ┌───────────────────────────────────────────┐ │
│ │ [Intro] █████████████████████████████... │ ◄── Keeps beginning │
│ └───────────────────────────────────────────┘ Good for: reports, papers│
│ │
│ truncation="start" │
│ ┌───────────────────────────────────────────┐ │
│ │ ...█████████████████████████ [Conclusion] │ ◄── Keeps ending │
│ └───────────────────────────────────────────┘ Good for: chat logs │
│ │
│ truncation="middle" │
│ ┌───────────────────────────────────────────┐ │
│ │ [Intro] █████...█████████████ [Conclusion] │ ◄── Keeps both ends │
│ └───────────────────────────────────────────┘ Good for: structured docs│
│ │
└─────────────────────────────────────────────────────────────────────────────┘
★ Insight ─────────────────────────────────────
Token counting must account for message overhead (role tokens, separators) and reserve space for output. A common mistake is using 100% of context for input, leaving no room for the response. We use a 90% safety margin.
─────────────────────────────────────────────────
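To make the three strategies concrete, here is a minimal sketch that operates on a plain list of token IDs (a stand-in for tiktoken's `encode` output), mirroring the `truncate_to_limit` logic above:

```python
from typing import List

def truncate_tokens(tokens: List[int], max_tokens: int, strategy: str = "end") -> List[int]:
    """Drop tokens to fit max_tokens using the end/start/middle strategies."""
    if len(tokens) <= max_tokens:
        return tokens
    if strategy == "end":       # keep the beginning (reports, papers)
        return tokens[:max_tokens]
    if strategy == "start":     # keep the ending (chat logs)
        return tokens[-max_tokens:]
    half = max_tokens // 2      # "middle": keep both ends (structured docs)
    return tokens[:half] + tokens[-half:]

doc = list(range(10))
print(truncate_tokens(doc, 4, "end"))     # [0, 1, 2, 3]
print(truncate_tokens(doc, 4, "start"))   # [6, 7, 8, 9]
print(truncate_tokens(doc, 4, "middle"))  # [0, 1, 8, 9]
```

Note that with an odd `max_tokens` the "middle" strategy keeps `max_tokens - 1` tokens (two equal halves), exactly as `TokenManager.truncate_to_limit` does.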
Step 3: Document Analyzer
Analyze documents to determine the optimal strategy:
# document_analyzer.py
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from pathlib import Path
from enum import Enum
from token_manager import TokenManager
from config import get_settings, ContextStrategy
class DocumentComplexity(str, Enum):
SIMPLE = "simple"
MODERATE = "moderate"
COMPLEX = "complex"
@dataclass
class DocumentAnalysis:
"""Analysis results for a document."""
source: str
token_count: int
char_count: int
recommended_strategy: ContextStrategy
complexity: DocumentComplexity
fits_full_context: bool
sections_detected: int
estimated_chunks: int
def to_dict(self) -> Dict[str, Any]:
return {
"source": self.source,
"token_count": self.token_count,
"recommended_strategy": self.recommended_strategy.value,
"fits_full_context": self.fits_full_context,
"complexity": self.complexity.value
}
class DocumentAnalyzer:
"""Analyze documents to determine optimal RAG strategy."""
def __init__(self):
self.settings = get_settings()
self.token_manager = TokenManager()
def analyze(
self,
content: str,
source: str = "unknown"
) -> DocumentAnalysis:
"""
Analyze document and recommend strategy.
Args:
content: Document text
source: Source identifier
Returns:
DocumentAnalysis with recommendations
"""
token_count = self.token_manager.count_tokens(content)
char_count = len(content)
# Determine if fits in context
available = self.token_manager.calculate_available_context(
system_prompt="You are a helpful assistant.", # Placeholder
user_query="Answer the question." # Placeholder
)
fits_full = token_count <= available.available_for_context
# Detect sections (simple heuristic)
sections = self._count_sections(content)
# Determine complexity
complexity = self._assess_complexity(content, token_count, sections)
# Recommend strategy
strategy = self._recommend_strategy(
token_count,
available.available_for_context,
complexity
)
# Estimate chunks if needed
estimated_chunks = (
1 if fits_full
else (token_count // self.settings.chunk_size) + 1
)
return DocumentAnalysis(
source=source,
token_count=token_count,
char_count=char_count,
recommended_strategy=strategy,
complexity=complexity,
fits_full_context=fits_full,
sections_detected=sections,
estimated_chunks=estimated_chunks
)
def _count_sections(self, content: str) -> int:
"""Count document sections (headers, breaks)."""
# Count each header line once; matching "#", "##", "###" as substrings
# would count a single "###" header five times
count = sum(1 for line in content.splitlines() if line.lstrip().startswith("#"))
count += content.count("\n\n\n")
count += content.count("Chapter") + content.count("Section")
return count
def _assess_complexity(
self,
content: str,
token_count: int,
sections: int
) -> DocumentComplexity:
"""Assess document complexity."""
# Check for complex elements
has_tables = "|" in content and "---" in content
has_code = "```" in content
has_math = "$" in content or "\\(" in content
complexity_score = 0
if has_tables:
complexity_score += 1
if has_code:
complexity_score += 1
if has_math:
complexity_score += 1
if sections > 10:
complexity_score += 1
if token_count > 50000:
complexity_score += 1
if complexity_score >= 3:
return DocumentComplexity.COMPLEX
elif complexity_score >= 1:
return DocumentComplexity.MODERATE
return DocumentComplexity.SIMPLE
def _recommend_strategy(
self,
token_count: int,
available_tokens: int,
complexity: DocumentComplexity
) -> ContextStrategy:
"""Recommend retrieval strategy."""
# Fits entirely
if token_count <= available_tokens * 0.8:
return ContextStrategy.FULL_DOCUMENT
# Slightly over - smart selection might work
if token_count <= available_tokens * 1.5:
return ContextStrategy.SMART_SELECTION
# Much larger - need map-reduce
if token_count > available_tokens * 3:
return ContextStrategy.MAP_REDUCE
# Medium size or complex - use hybrid
return ContextStrategy.HYBRID
def analyze_batch(
self,
documents: List[Dict[str, str]]
) -> List[DocumentAnalysis]:
"""Analyze multiple documents."""
return [
self.analyze(doc["content"], doc.get("source", f"doc_{i}"))
for i, doc in enumerate(documents)
]
What's Happening Here?
The DocumentAnalyzer examines your document and recommends the optimal strategy:
┌─────────────────────────────────────────────────────────────────────────────┐
│ Strategy Selection Decision Tree │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Document Analysis │
│ │ │
│ ▼ │
│ ┌──────────────────────┐ │
│ │ Count tokens │ │
│ │ Detect sections │ │
│ │ Assess complexity │ │
│ └──────────┬───────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────┐ │
│ │ tokens ≤ 80% of available context? │ │
│ └────────────────┬───────────────────┘ │
│ ┌─────────┴─────────┐ │
│ YES NO │
│ │ │ │
│ ▼ ▼ │
│ ┌───────────────┐ ┌────────────────────────┐ │
│ │ FULL_DOCUMENT │ │ tokens ≤ 150% available?│ │
│ │ ✓ Best quality│ └────────────┬───────────┘ │
│ │ ✓ All context │ ┌─────┴─────┐ │
│ └───────────────┘ YES NO │
│ │ │ │
│ ▼ ▼ │
│ ┌───────────────┐ ┌────────────────────────┐ │
│ │SMART_SELECTION│ │ tokens > 300% available?│ │
│ │ Select best │ └────────────┬───────────┘ │
│ │ sections │ ┌─────┴─────┐ │
│ └───────────────┘ YES NO │
│ │ │ │
│ ▼ ▼ │
│ ┌───────────────┐ ┌──────────┐ │
│ │ MAP_REDUCE │ │ HYBRID │ │
│ │ Parallel proc │ │ Mixed │ │
│ └───────────────┘ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Complexity Assessment Heuristics:
| Signal | Complexity Score | Why It Matters |
|---|---|---|
| Tables present (`\|` + `---`) | +1 | Tables often contain dense, structured information |
| Code blocks (```) | +1 | Code requires precise context preservation |
| Math notation (`$`, `\(`) | +1 | Mathematical relationships need full context |
| Many sections (>10) | +1 | Highly structured = more cross-references |
| Very long (>50K tokens) | +1 | Length itself adds complexity |
Complexity → Score → Action:
| Complexity Score | Classification | Implication |
|---|---|---|
| 0 | SIMPLE | Plain text, can chunk safely |
| 1-2 | MODERATE | Some structure, prefer larger chunks |
| 3+ | COMPLEX | Preserve structure, prefer full doc or smart selection |
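The scoring and classification above can be sketched as two small functions; the signal checks mirror `_assess_complexity`, though this standalone version returns strings rather than the `DocumentComplexity` enum (and spells the code-fence literal via `"`" * 3` so it doesn't break this listing):

```python
CODE_FENCE = "`" * 3  # literal triple backtick, spelled out

def complexity_score(content: str, token_count: int, sections: int) -> int:
    """Sum the five +1 signals from the heuristics table above."""
    score = 0
    if "|" in content and "---" in content:   # tables
        score += 1
    if CODE_FENCE in content:                 # code blocks
        score += 1
    if "$" in content or "\\(" in content:    # math notation
        score += 1
    if sections > 10:                         # highly structured
        score += 1
    if token_count > 50_000:                  # very long
        score += 1
    return score

def classify(score: int) -> str:
    if score >= 3:
        return "COMPLEX"
    return "MODERATE" if score >= 1 else "SIMPLE"

print(classify(complexity_score("plain prose only", 1_000, 2)))  # SIMPLE
```

A document with a table, math notation, 12 sections, and 60K tokens scores 4 and classifies as COMPLEX, steering it away from naive chunking.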
Step 4: Context Builder
Build optimal context from documents:
# context_builder.py
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
import re
from token_manager import TokenManager
from document_analyzer import DocumentAnalysis, DocumentComplexity
from config import get_settings, ContextStrategy
@dataclass
class ContextWindow:
"""Built context with metadata."""
content: str
token_count: int
strategy_used: ContextStrategy
sources: List[str]
sections_included: List[str]
truncated: bool
class ContextBuilder:
"""
Build context windows for LLM queries.
Strategies:
1. Full Document: Include entire document
2. Smart Selection: Include most relevant sections
3. Hybrid: Mix of full sections and retrieved chunks
"""
def __init__(self):
self.settings = get_settings()
self.token_manager = TokenManager()
def build_full_document_context(
self,
document: str,
max_tokens: Optional[int] = None
) -> ContextWindow:
"""
Use full document as context.
Args:
document: Full document text
max_tokens: Override token limit
Returns:
ContextWindow with full document
"""
max_tokens = max_tokens or self.settings.effective_context_limit
token_count = self.token_manager.count_tokens(document)
truncated = False
if token_count > max_tokens:
document = self.token_manager.truncate_to_limit(
document,
max_tokens,
truncation_strategy="end"
)
truncated = True
token_count = max_tokens
return ContextWindow(
content=document,
token_count=token_count,
strategy_used=ContextStrategy.FULL_DOCUMENT,
sources=["full_document"],
sections_included=["all"],
truncated=truncated
)
def build_smart_selection_context(
self,
document: str,
query: str,
max_tokens: Optional[int] = None
) -> ContextWindow:
"""
Select most relevant sections based on query.
Uses a scoring system to prioritize sections.
"""
max_tokens = max_tokens or self.settings.effective_context_limit
# Split into sections
sections = self._split_into_sections(document)
# Score sections by relevance to query
scored_sections = self._score_sections(sections, query)
# Build context from highest scored sections
context_parts = []
current_tokens = 0
sections_included = []
for section, score in scored_sections:
section_tokens = self.token_manager.count_tokens(section)
if current_tokens + section_tokens <= max_tokens:
context_parts.append(section)
current_tokens += section_tokens
sections_included.append(f"score_{score:.2f}")
else:
# Try to fit a truncated version
remaining = max_tokens - current_tokens
if remaining > 500: # Worth including partial
truncated_section = self.token_manager.truncate_to_limit(
section, remaining
)
context_parts.append(truncated_section)
current_tokens += remaining
break
content = "\n\n---\n\n".join(context_parts)
return ContextWindow(
content=content,
token_count=current_tokens,
strategy_used=ContextStrategy.SMART_SELECTION,
sources=["selected_sections"],
sections_included=sections_included,
truncated=len(scored_sections) > len(sections_included)
)
def _split_into_sections(self, document: str) -> List[str]:
"""Split document into logical sections."""
# Split by markdown headers or double newlines
section_pattern = r'\n(?=#{1,3}\s|\n\n)'
sections = re.split(section_pattern, document)
# Filter empty sections and merge tiny ones
result = []
current = ""
for section in sections:
section = section.strip()
if not section:
continue
section_tokens = self.token_manager.count_tokens(section)
if section_tokens < 100 and current:
current += "\n\n" + section
elif current and self.token_manager.count_tokens(current) < 200:
current += "\n\n" + section
else:
if current:
result.append(current)
current = section
if current:
result.append(current)
return result if result else [document]
def _score_sections(
self,
sections: List[str],
query: str
) -> List[Tuple[str, float]]:
"""Score sections by relevance to query."""
query_words = set(query.lower().split())
scored = []
for section in sections:
section_lower = section.lower()
# Simple TF-based scoring
score = 0.0
# Word overlap
for word in query_words:
if len(word) > 3: # Skip short words
score += section_lower.count(word) * 0.5
# Boost for headers containing query words
first_line = section.split('\n')[0].lower()
for word in query_words:
if word in first_line:
score += 2.0
# Boost for introduction/conclusion sections
if any(kw in first_line for kw in ['introduction', 'abstract', 'summary', 'conclusion']):
score += 1.0
scored.append((section, score))
# Sort by score descending
scored.sort(key=lambda x: x[1], reverse=True)
return scored
def build_hybrid_context(
self,
document: str,
query: str,
retrieved_chunks: List[str],
max_tokens: Optional[int] = None
) -> ContextWindow:
"""
Combine full sections with retrieved chunks.
Strategy:
1. Include introduction/overview sections fully
2. Add relevant retrieved chunks
3. Fill remaining space with scored sections
"""
max_tokens = max_tokens or self.settings.effective_context_limit
context_parts = []
current_tokens = 0
sources = []
# Reserve space for retrieved chunks (30% of context)
chunk_budget = int(max_tokens * 0.3)
section_budget = max_tokens - chunk_budget
# Add key sections first
sections = self._split_into_sections(document)
key_sections = self._get_key_sections(sections)
for section in key_sections:
section_tokens = self.token_manager.count_tokens(section)
if current_tokens + section_tokens <= section_budget:
context_parts.append(f"[SECTION]\n{section}\n[/SECTION]")
current_tokens += section_tokens
sources.append("key_section")
# Add retrieved chunks
for chunk in retrieved_chunks:
chunk_tokens = self.token_manager.count_tokens(chunk)
if current_tokens + chunk_tokens <= max_tokens:
context_parts.append(f"[RETRIEVED]\n{chunk}\n[/RETRIEVED]")
current_tokens += chunk_tokens
sources.append("retrieved")
content = "\n\n".join(context_parts)
return ContextWindow(
content=content,
token_count=current_tokens,
strategy_used=ContextStrategy.HYBRID,
sources=sources,
sections_included=[],
truncated=False
)
def _get_key_sections(self, sections: List[str]) -> List[str]:
"""Get key sections (intro, abstract, conclusion)."""
key_keywords = ['abstract', 'introduction', 'overview', 'summary',
'conclusion', 'results', 'findings']
key_sections = []
for section in sections:
first_line = section.split('\n')[0].lower()
if any(kw in first_line for kw in key_keywords):
key_sections.append(section)
return key_sections
What's Happening Here?
The ContextBuilder implements three strategies for fitting content into context windows:
┌─────────────────────────────────────────────────────────────────────────────┐
│ Context Building Strategies Compared │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ STRATEGY 1: Full Document │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ Input: Entire document ││
│ │ ││
│ │ ┌─ Document ──────────────────────────────────────────────────────────┐ ││
│ │ │ Introduction │ ││
│ │ │ Chapter 1: Background │ ││
│ │ │ Chapter 2: Methods (references Chapter 1) │ ││
│ │ │ Chapter 3: Results (references Methods) │ ││
│ │ │ Conclusion (synthesizes all chapters) │ ││
│ │ └─────────────────────────────────────────────────────────────────────┘ ││
│ │ ││
│ │ Output: Everything preserved, cross-references work ✓ ││
│ └─────────────────────────────────────────────────────────────────────────┘│
│ │
│ STRATEGY 2: Smart Selection │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ Query: "What methodology was used?" ││
│ │ ││
│ │ Section Scoring: ││
│ │ ┌─────────────────────────────────────────────────────────────────────┐ ││
│ │ │ Introduction score=1.0 (always useful context) │ ││
│ │ │ Chapter 2: Methods score=4.5 ★ (contains "method*" in header!) │ ││
│ │ │ Chapter 3: Results score=1.2 (mentions "method" once) │ ││
│ │ │ Conclusion score=1.0 (always useful context) │ ││
│ │ └─────────────────────────────────────────────────────────────────────┘ ││
│ │ ││
│ │ Output: Methods chapter + Intro + Conclusion (sorted by score) ││
│ └─────────────────────────────────────────────────────────────────────────┘│
│ │
│ STRATEGY 3: Hybrid │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ Combines: ││
│ │ 1. Key sections (intro, abstract, conclusion) - 70% of budget ││
│ │ 2. Retrieved chunks from vector search - 30% of budget ││
│ │ ││
│ │ ┌─────────────────────────────────────────────────────────────────────┐ ││
│ │ │ [SECTION] │ ││
│ │ │ Introduction: This paper presents... │ ││
│ │ │ [/SECTION] │ ││
│ │ │ │ ││
│ │ │ [RETRIEVED] │ ││
│ │ │ ...specific paragraph matching the query... │ ││
│ │ │ [/RETRIEVED] │ ││
│ │ └─────────────────────────────────────────────────────────────────────┘ ││
│ │ ││
│ │ Output: Context + targeted retrieval, best of both worlds ││
│ └─────────────────────────────────────────────────────────────────────────┘│
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Section Scoring Algorithm:
┌─────────────────────────────────────────────────────────────────────────────┐
│ How Section Scoring Works │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Query: "machine learning accuracy improvements" │
│ │
│ For each section, calculate score: │
│ │
│ ┌────────────────────────────────────────────────────────────────────────┐│
│ │ 1. Word Overlap (×0.5 per match) ││
│ │ "machine" appears 5 times → +2.5 ││
│ │ "learning" appears 8 times → +4.0 ││
│ │ "accuracy" appears 3 times → +1.5 ││
│ │ "improvements" appears 2 times → +1.0 ││
│ │ Subtotal: 9.0 ││
│ └────────────────────────────────────────────────────────────────────────┘│
│ │
│ ┌────────────────────────────────────────────────────────────────────────┐│
│ │ 2. Header Boost (×2.0 per match) ││
│ │ Header: "## Improving ML Accuracy" ││
│ │ "accuracy" in header → +2.0 ││
│ │ Subtotal: 2.0 ││
│ └────────────────────────────────────────────────────────────────────────┘│
│ │
│ ┌────────────────────────────────────────────────────────────────────────┐│
│ │ 3. Structural Boost (×1.0 for key sections) ││
│ │ Is "introduction"? No ││
│ │ Is "conclusion"? No ││
│ │ Is "results"? Yes! → +1.0 ││
│ │ Subtotal: 1.0 ││
│ └────────────────────────────────────────────────────────────────────────┘│
│ │
│ Total Score: 9.0 + 2.0 + 1.0 = 12.0 │
│ │
│ Sections sorted by score, included until context budget exhausted. │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Step 5: Map-Reduce for Large Documents
Handle documents exceeding context limits:
# map_reduce.py
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import asyncio
from concurrent.futures import ThreadPoolExecutor
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain.schema import HumanMessage, SystemMessage
from token_manager import TokenManager
from config import get_settings, ModelProvider
@dataclass
class MapReduceResult:
"""Result from map-reduce processing."""
final_answer: str
intermediate_answers: List[str]
chunks_processed: int
total_tokens_used: int
class MapReduceProcessor:
"""
Process large documents using map-reduce pattern.
Map: Process each chunk independently
Reduce: Combine intermediate results
"""
def __init__(self):
self.settings = get_settings()
self.token_manager = TokenManager()
self.llm = self._create_llm()
def _create_llm(self):
"""Create LLM based on provider."""
if self.settings.provider == ModelProvider.OPENAI:
return ChatOpenAI(
model=self.settings.openai_model,
api_key=self.settings.openai_api_key,
temperature=0.1
)
return ChatAnthropic(
model=self.settings.anthropic_model,
api_key=self.settings.anthropic_api_key,
temperature=0.1
)
def process(
self,
document: str,
query: str,
chunk_size: Optional[int] = None
) -> MapReduceResult:
"""
Process large document with map-reduce.
Args:
document: Full document text
query: User query
chunk_size: Tokens per chunk
Returns:
MapReduceResult with final answer
"""
chunk_size = chunk_size or self.settings.chunk_size
# Split into chunks
chunks = self.token_manager.split_by_tokens(
document,
chunk_size,
overlap=200
)
# Map phase: process each chunk
intermediate_results = self._map_phase(chunks, query)
# Reduce phase: combine results
final_answer = self._reduce_phase(intermediate_results, query)
total_tokens = sum(
self.token_manager.count_tokens(r) for r in intermediate_results
)
return MapReduceResult(
final_answer=final_answer,
intermediate_answers=intermediate_results,
chunks_processed=len(chunks),
total_tokens_used=total_tokens
)
def _map_phase(
self,
chunks: List[str],
query: str
) -> List[str]:
"""Process each chunk independently."""
results = []
map_system = """You are analyzing a portion of a larger document.
Extract relevant information that helps answer the user's question.
If this portion doesn't contain relevant information, respond with "NO_RELEVANT_INFO".
Be concise but thorough in extracting key points."""
for i, chunk in enumerate(chunks):
map_prompt = f"""Document Portion {i + 1} of {len(chunks)}:
{chunk}
---
Question: {query}
Extract relevant information from this portion:"""
messages = [
SystemMessage(content=map_system),
HumanMessage(content=map_prompt)
]
response = self.llm.invoke(messages)
result = response.content
if "NO_RELEVANT_INFO" not in result:
results.append(result)
return results
def _reduce_phase(
self,
intermediate_results: List[str],
query: str
) -> str:
"""Combine intermediate results into final answer."""
if not intermediate_results:
return "No relevant information found in the document."
if len(intermediate_results) == 1:
return intermediate_results[0]
# Combine all intermediate results
combined = "\n\n---\n\n".join([
f"Finding {i + 1}:\n{result}"
for i, result in enumerate(intermediate_results)
])
reduce_system = """You are synthesizing information from multiple document portions.
Combine the findings into a coherent, comprehensive answer.
Remove redundancy and organize the information logically.
If findings conflict, note the discrepancy."""
reduce_prompt = f"""Extracted information from document portions:
{combined}
---
Original Question: {query}
Provide a comprehensive answer by synthesizing the above findings:"""
messages = [
SystemMessage(content=reduce_system),
HumanMessage(content=reduce_prompt)
]
response = self.llm.invoke(messages)
return response.content
async def process_async(
self,
document: str,
query: str,
chunk_size: Optional[int] = None,
max_concurrent: int = 5
) -> MapReduceResult:
"""
Process with concurrent map phase for speed.
"""
chunk_size = chunk_size or self.settings.chunk_size
chunks = self.token_manager.split_by_tokens(
document,
chunk_size,
overlap=200
)
# Concurrent map phase
semaphore = asyncio.Semaphore(max_concurrent)
async def process_chunk(chunk: str, index: int) -> Optional[str]:
async with semaphore:
return await asyncio.to_thread(
self._process_single_chunk,
chunk,
query,
index,
len(chunks)
)
tasks = [
process_chunk(chunk, i)
for i, chunk in enumerate(chunks)
]
intermediate_results = await asyncio.gather(*tasks)
intermediate_results = [r for r in intermediate_results if r]
# Reduce phase
final_answer = self._reduce_phase(intermediate_results, query)
return MapReduceResult(
final_answer=final_answer,
intermediate_answers=intermediate_results,
chunks_processed=len(chunks),
            total_tokens_used=sum(
                self.token_manager.count_tokens(r)
                for r in intermediate_results
            )
)
def _process_single_chunk(
self,
chunk: str,
query: str,
index: int,
total: int
) -> Optional[str]:
"""Process a single chunk."""
map_system = """Extract relevant information for the question.
If not relevant, respond with "NO_RELEVANT_INFO"."""
map_prompt = f"""Chunk {index + 1}/{total}:
{chunk}
Question: {query}
Relevant information:"""
messages = [
SystemMessage(content=map_system),
HumanMessage(content=map_prompt)
]
response = self.llm.invoke(messages)
if "NO_RELEVANT_INFO" in response.content:
return None
        return response.content

What's Happening Here?
Map-reduce processes documents that exceed even the largest context windows:
┌─────────────────────────────────────────────────────────────────────────────┐
│ Map-Reduce Processing Flow │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Input: 500,000 token document (too large for any context window) │
│ │
│ STEP 1: Split into Chunks (2,000 tokens each with 200 overlap) │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ││
│ │ │ C1 │ │ C2 │ │ C3 │ │ C4 │ ... │C250 │ (250 chunks) ││
│ │ └─────┘ └─────┘ └─────┘ └─────┘ └─────┘ ││
│ │ └──┬──┘ ││
│ │ overlap ││
│ └─────────────────────────────────────────────────────────────────────────┘│
│ │
│ STEP 2: MAP Phase (Can Run in Parallel!) │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ Question: "What are the main findings?" ││
│ │ ││
│ │ ┌─────┐ ┌─────────────┐ ┌─────────────────────────────────────┐ ││
│ │ │ C1 │ ─► │ LLM Extract │ ─► │ "The study found that X increased" │ ││
│ │ └─────┘ └─────────────┘ └─────────────────────────────────────┘ ││
│ │ ││
│ │ ┌─────┐ ┌─────────────┐ ┌─────────────────────────────────────┐ ││
│ │ │ C2 │ ─► │ LLM Extract │ ─► │ "NO_RELEVANT_INFO" (skipped) │ ││
│ │ └─────┘ └─────────────┘ └─────────────────────────────────────┘ ││
│ │ ││
│ │ ┌─────┐ ┌─────────────┐ ┌─────────────────────────────────────┐ ││
│ │ │ C3 │ ─► │ LLM Extract │ ─► │ "Results showed Y with p<0.01" │ ││
│ │ └─────┘ └─────────────┘ └─────────────────────────────────────┘ ││
│ │ ││
│ │ ... (all chunks processed in parallel) ││
│ └─────────────────────────────────────────────────────────────────────────┘│
│ │
│ STEP 3: REDUCE Phase (Sequential - Combines Results) │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ Intermediate Findings: ││
│ │ ┌─────────────────────────────────────────────────────────────────────┐││
│ │ │ Finding 1: "The study found that X increased..." │││
│ │ │ Finding 2: "Results showed Y with p<0.01..." │││
│ │ │ Finding 3: "The conclusion states Z..." │││
│ │ │ ... │││
│ │ └─────────────────────────────────────────────────────────────────────┘││
│ │ │ ││
│ │ ▼ ││
│ │ ┌─────────────────────────────────────────────────────────────────────┐││
│ │ │ LLM Synthesize: "The main findings are: 1) X increased... 2) Y... │││
│ │ │ 3) Z... These findings together suggest..." │││
│ │ └─────────────────────────────────────────────────────────────────────┘││
│ └─────────────────────────────────────────────────────────────────────────┘│
│ │
└─────────────────────────────────────────────────────────────────────────────┘

Map-Reduce Design Decisions:
| Design Choice | Reasoning |
|---|---|
| 200 token overlap | Prevents losing information at chunk boundaries. Sentences that span chunks are captured in both. |
| "NO_RELEVANT_INFO" response | Saves reduce phase from processing irrelevant chunks. LLM explicitly signals when chunk doesn't help. |
| Async processing with semaphore | Limits concurrent API calls to avoid rate limiting while maximizing parallelism. |
| Numbered findings in reduce | Helps LLM track and synthesize multiple sources, reduces hallucination. |
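The overlap behaviour in the first row is easiest to see in isolation. Below is a minimal stand-alone sketch of overlap-aware splitting; the real `TokenManager.split_by_tokens` uses tiktoken, so the whitespace tokenizer here is only a stand-in for the mechanics:

```python
# Minimal sketch of overlap-aware splitting. The whitespace tokenizer
# stands in for tiktoken encoding so the boundary behaviour is visible.
def split_with_overlap(text: str, chunk_size: int, overlap: int) -> list[str]:
    tokens = text.split()  # stand-in for tiktoken token IDs
    if chunk_size <= overlap:
        raise ValueError("chunk_size must exceed overlap")
    chunks = []
    step = chunk_size - overlap
    for start in range(0, len(tokens), step):
        chunks.append(" ".join(tokens[start:start + chunk_size]))
        if start + chunk_size >= len(tokens):
            break
    return chunks

doc = " ".join(f"w{i}" for i in range(10))
chunks = split_with_overlap(doc, chunk_size=4, overlap=2)
# Each chunk shares its last 2 tokens with the next chunk's first 2,
# so a sentence that straddles a boundary appears in both chunks.
```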
When to Use Map-Reduce vs Other Strategies:
┌─────────────────────────────────────────────────────────────────────────────┐
│ Strategy Selection Guide │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Document Size Best Strategy Why │
│ ────────────────────────────────────────────────────────────────────────── │
│ <80% context FULL_DOCUMENT Complete context, best quality │
│ │
│ 80%-150% context SMART_SELECTION Fit by dropping low-value sections │
│ │
│ 150%-300% context HYBRID Key sections + targeted retrieval │
│ │
│ >300% context MAP_REDUCE Only option for very large docs │
│ │
│ Query Type Considerations: │
│ ────────────────────────────────────────────────────────────────────────── │
│ "Summarize the document" → Needs broad coverage → MAP_REDUCE │
│ "What does section 3 say?" → Targeted → SMART_SELECTION │
│ "How does X relate to Y?" → Cross-references → FULL_DOCUMENT if fits │
│ "Find all mentions of Z" → Exhaustive → MAP_REDUCE │
│ │
└─────────────────────────────────────────────────────────────────────────────┘

★ Insight ─────────────────────────────────────
Map-reduce trades latency for capability. The map phase parallelizes cleanly (each chunk is independent), but the reduce phase must wait for every map result. For very large documents, consider a hierarchical reduce: combine pairs of results iteratively until a single final answer remains.
─────────────────────────────────────────────────
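The hierarchical reduce suggested in the insight can be sketched as follows. `combine` stands in for the LLM call that merges two findings; the toy lambda in the usage example just makes the pairing order visible:

```python
from typing import Callable, List

def hierarchical_reduce(
    findings: List[str],
    combine: Callable[[str, str], str],
) -> str:
    """Combine findings pairwise until one answer remains (log2 depth)."""
    if not findings:
        return "No relevant information found."
    level = findings
    while len(level) > 1:
        next_level = []
        # Merge adjacent pairs; an odd leftover is carried up unchanged.
        for i in range(0, len(level) - 1, 2):
            next_level.append(combine(level[i], level[i + 1]))
        if len(level) % 2 == 1:
            next_level.append(level[-1])
        level = next_level
    return level[0]

# With a toy combiner, four findings take two rounds instead of one
# oversized reduce prompt:
result = hierarchical_reduce(
    ["A", "B", "C", "D"],
    combine=lambda x, y: f"({x}+{y})",
)
# result == "((A+B)+(C+D))"
```

Each round halves the number of findings, so no single reduce prompt ever has to hold all intermediate results at once.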
Step 6: RAG Pipeline
Orchestrate all components:
# rag_pipeline.py
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_anthropic import ChatAnthropic
from langchain_chroma import Chroma
from langchain.schema import HumanMessage, SystemMessage
from config import get_settings, ContextStrategy, ModelProvider
from token_manager import TokenManager
from document_analyzer import DocumentAnalyzer, DocumentAnalysis
from context_builder import ContextBuilder, ContextWindow
from map_reduce import MapReduceProcessor
@dataclass
class LongContextResponse:
"""Response with strategy metadata."""
answer: str
strategy_used: ContextStrategy
tokens_used: int
document_tokens: int
confidence: str
class LongContextRAG:
"""
RAG system that adapts to document size.
Automatically chooses optimal strategy based on:
- Document length
- Context window size
- Query complexity
"""
def __init__(self):
self.settings = get_settings()
self.token_manager = TokenManager()
self.analyzer = DocumentAnalyzer()
self.context_builder = ContextBuilder()
self.map_reduce = MapReduceProcessor()
self.llm = self._create_llm()
self.embeddings = OpenAIEmbeddings(
model=self.settings.embedding_model,
openai_api_key=self.settings.openai_api_key
)
self.vectorstore = self._init_vectorstore()
self.documents: Dict[str, str] = {} # doc_id -> content
def _create_llm(self):
"""Create LLM based on provider."""
if self.settings.provider == ModelProvider.OPENAI:
return ChatOpenAI(
model=self.settings.openai_model,
api_key=self.settings.openai_api_key,
temperature=0.1
)
return ChatAnthropic(
model=self.settings.anthropic_model,
api_key=self.settings.anthropic_api_key,
temperature=0.1
)
def _init_vectorstore(self) -> Chroma:
"""Initialize vector store for fallback retrieval."""
return Chroma(
collection_name=self.settings.collection_name,
embedding_function=self.embeddings,
persist_directory=self.settings.persist_directory
)
def add_document(
self,
content: str,
doc_id: str,
metadata: Optional[Dict[str, Any]] = None
) -> DocumentAnalysis:
"""
Add document and analyze for optimal strategy.
Args:
content: Document text
doc_id: Unique identifier
metadata: Additional metadata
Returns:
DocumentAnalysis with recommendations
"""
# Store full document
self.documents[doc_id] = content
# Analyze document
analysis = self.analyzer.analyze(content, doc_id)
# Also index chunks for hybrid retrieval
chunks = self.token_manager.split_by_tokens(
content,
self.settings.chunk_size,
overlap=200
)
from langchain.schema import Document
docs = [
Document(
page_content=chunk,
metadata={"doc_id": doc_id, "chunk_index": i, **(metadata or {})}
)
for i, chunk in enumerate(chunks)
]
self.vectorstore.add_documents(docs)
return analysis
def query(
self,
question: str,
doc_id: Optional[str] = None,
force_strategy: Optional[ContextStrategy] = None
) -> LongContextResponse:
"""
Query with adaptive strategy.
Args:
question: User question
doc_id: Specific document to query (optional)
force_strategy: Override automatic strategy selection
Returns:
LongContextResponse with answer and metadata
"""
# Get document
if doc_id and doc_id in self.documents:
document = self.documents[doc_id]
elif doc_id:
raise ValueError(f"Document {doc_id} not found")
else:
# Use all documents concatenated
document = "\n\n---\n\n".join(self.documents.values())
# Analyze and select strategy
analysis = self.analyzer.analyze(document)
strategy = force_strategy or analysis.recommended_strategy
# Execute strategy
if strategy == ContextStrategy.FULL_DOCUMENT:
return self._query_full_document(question, document, analysis)
elif strategy == ContextStrategy.SMART_SELECTION:
return self._query_smart_selection(question, document, analysis)
elif strategy == ContextStrategy.MAP_REDUCE:
return self._query_map_reduce(question, document, analysis)
else: # HYBRID
return self._query_hybrid(question, document, analysis)
def _query_full_document(
self,
question: str,
document: str,
analysis: DocumentAnalysis
) -> LongContextResponse:
"""Query using full document context."""
context = self.context_builder.build_full_document_context(document)
system_prompt = """You are a helpful assistant with access to a complete document.
Answer questions thoroughly using the full context available.
Cite specific sections when relevant."""
user_prompt = f"""Document:
{context.content}
---
Question: {question}
Answer:"""
messages = [
SystemMessage(content=system_prompt),
HumanMessage(content=user_prompt)
]
response = self.llm.invoke(messages)
return LongContextResponse(
answer=response.content,
strategy_used=ContextStrategy.FULL_DOCUMENT,
tokens_used=context.token_count,
document_tokens=analysis.token_count,
confidence="high" if not context.truncated else "medium"
)
def _query_smart_selection(
self,
question: str,
document: str,
analysis: DocumentAnalysis
) -> LongContextResponse:
"""Query using smart section selection."""
context = self.context_builder.build_smart_selection_context(
document,
question
)
system_prompt = """You are answering based on selected relevant sections.
These sections were chosen as most relevant to the question.
If information seems incomplete, mention that the full document may contain more details."""
user_prompt = f"""Selected Document Sections:
{context.content}
---
Question: {question}
Answer:"""
messages = [
SystemMessage(content=system_prompt),
HumanMessage(content=user_prompt)
]
response = self.llm.invoke(messages)
return LongContextResponse(
answer=response.content,
strategy_used=ContextStrategy.SMART_SELECTION,
tokens_used=context.token_count,
document_tokens=analysis.token_count,
confidence="medium"
)
def _query_map_reduce(
self,
question: str,
document: str,
analysis: DocumentAnalysis
) -> LongContextResponse:
"""Query using map-reduce for large documents."""
result = self.map_reduce.process(document, question)
return LongContextResponse(
answer=result.final_answer,
strategy_used=ContextStrategy.MAP_REDUCE,
tokens_used=result.total_tokens_used,
document_tokens=analysis.token_count,
            # High confidence only when a single chunk supplied the answer
            confidence="high" if len(result.intermediate_answers) <= 1 else "medium"
)
def _query_hybrid(
self,
question: str,
document: str,
analysis: DocumentAnalysis
) -> LongContextResponse:
"""Query using hybrid approach."""
# Retrieve relevant chunks
results = self.vectorstore.similarity_search(question, k=5)
retrieved_chunks = [doc.page_content for doc in results]
# Build hybrid context
context = self.context_builder.build_hybrid_context(
document,
question,
retrieved_chunks
)
system_prompt = """You are answering using a combination of:
1. Key document sections (marked with [SECTION])
2. Retrieved relevant passages (marked with [RETRIEVED])
Synthesize information from both sources for a complete answer."""
user_prompt = f"""Context:
{context.content}
---
Question: {question}
Answer:"""
messages = [
SystemMessage(content=system_prompt),
HumanMessage(content=user_prompt)
]
response = self.llm.invoke(messages)
return LongContextResponse(
answer=response.content,
strategy_used=ContextStrategy.HYBRID,
tokens_used=context.token_count,
document_tokens=analysis.token_count,
confidence="high"
        )

What's Happening Here?
The LongContextRAG class orchestrates all components into a unified system:
┌─────────────────────────────────────────────────────────────────────────────┐
│ LongContextRAG Orchestration Flow │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ add_document() Flow: │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ ││
│ │ Document ──┬──► Store in self.documents dict (for full doc access) ││
│ │ │ ││
│ │ ├──► Analyze → Get token count, complexity, recommendations ││
│ │ │ ││
│ │ └──► Chunk → Store in ChromaDB (for hybrid retrieval) ││
│ │ ││
│ │ Return: DocumentAnalysis (so caller knows recommended strategy) ││
│ └─────────────────────────────────────────────────────────────────────────┘│
│ │
│ query() Flow: │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ ││
│ │ Question + doc_id ││
│ │ │ ││
│ │ ▼ ││
│ │ ┌───────────────────────────────────────┐ ││
│ │ │ 1. Get document(s) from storage │ ││
│ │ │ 2. Analyze document size + complexity │ ││
│ │ │ 3. Select strategy (or use override) │ ││
│ │ └───────────────────┬───────────────────┘ ││
│ │ │ ││
│ │ ┌────────────┼────────────┬────────────┐ ││
│ │ ▼ ▼ ▼ ▼ ││
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ││
│ │ │ FULL │ │ SMART │ │ MAP │ │ HYBRID │ ││
│ │ │DOCUMENT │ │SELECTION│ │ REDUCE │ │ │ ││
│ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ ││
│ │ │ │ │ │ ││
│ │ └────────────┴────────────┴────────────┘ ││
│ │ │ ││
│ │ ▼ ││
│ │ ┌───────────────────────────────────────────────────────────────────┐ ││
│ │ │ LongContextResponse(answer, strategy_used, tokens, confidence) │ ││
│ │ └───────────────────────────────────────────────────────────────────┘ ││
│ └─────────────────────────────────────────────────────────────────────────┘│
│ │
└─────────────────────────────────────────────────────────────────────────────┘

Confidence Scoring Logic:
| Strategy | Condition | Confidence | Reasoning |
|---|---|---|---|
| FULL_DOCUMENT | Not truncated | HIGH | All context available |
| FULL_DOCUMENT | Truncated | MEDIUM | Some content lost |
| SMART_SELECTION | Any | MEDIUM | May miss relevant sections |
| MAP_REDUCE | Single chunk relevant | HIGH | Clear single source |
| MAP_REDUCE | Multiple chunks | MEDIUM | Info synthesized from parts |
| HYBRID | Any | HIGH | Best of both approaches |
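The table above collapses into a small function. This is a sketch rather than code from the project, and the lowercase strategy strings are an assumption about the `ContextStrategy` enum's values:

```python
# Sketch of the confidence table as one function. The strategy strings
# mirror an assumed lowercase ContextStrategy enum.
def score_confidence(
    strategy: str,
    truncated: bool = False,
    relevant_chunks: int = 0,
) -> str:
    if strategy == "full_document":
        return "medium" if truncated else "high"   # truncation loses content
    if strategy == "smart_selection":
        return "medium"                            # may miss relevant sections
    if strategy == "map_reduce":
        # A single relevant chunk means one clear source; synthesis
        # across many chunks is less certain.
        return "high" if relevant_chunks <= 1 else "medium"
    if strategy == "hybrid":
        return "high"                              # full context + retrieval
    raise ValueError(f"Unknown strategy: {strategy}")
```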
Why Store Both Full Docs AND Chunks?
┌─────────────────────────────────────────────────────────────────────────────┐
│ Dual Storage Architecture │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ self.documents (Dict) ChromaDB (Vector Store) │
│ ┌───────────────────────┐ ┌───────────────────────────────────────┐ │
│ │ "report_2024": │ │ Chunks from report_2024: │ │
│ │ [Full 80K doc] │ │ chunk_0: [embedding] + metadata │ │
│ │ │ │ chunk_1: [embedding] + metadata │ │
│ │ "contract_v2": │ │ chunk_2: [embedding] + metadata │ │
│ │ [Full 45K doc] │ │ ... │ │
│ │ │ │ │ │
│ └───────────────────────┘ │ Chunks from contract_v2: │ │
│ │ │ chunk_0: [embedding] + metadata │ │
│ │ │ ... │ │
│ │ └───────────────────────────────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ Used for: FULL_DOCUMENT, Used for: HYBRID retrieval, │
│ SMART_SELECTION, MAP_REDUCE semantic search, fallback │
│ │
│ Why both? │
│ • Full docs: Preserve structure, cross-references │
│ • Chunks: Enable fast semantic search for hybrid strategy │
│ │
└─────────────────────────────────────────────────────────────────────────────┘

Step 7: FastAPI Application
# app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, List
from contextlib import asynccontextmanager
from rag_pipeline import LongContextRAG, LongContextResponse
from document_analyzer import DocumentAnalysis
from config import ContextStrategy
# Global instance
rag: Optional[LongContextRAG] = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global rag
rag = LongContextRAG()
yield
app = FastAPI(
title="Long Context RAG API",
description="RAG with adaptive context strategies for any document size",
version="1.0.0",
lifespan=lifespan
)
# Request/Response Models
class AddDocumentRequest(BaseModel):
content: str
doc_id: str
metadata: Optional[dict] = None
class QueryRequest(BaseModel):
question: str
doc_id: Optional[str] = None
force_strategy: Optional[str] = None
class AnalysisResponse(BaseModel):
source: str
token_count: int
recommended_strategy: str
fits_full_context: bool
complexity: str
class QueryResponse(BaseModel):
answer: str
strategy_used: str
tokens_used: int
document_tokens: int
confidence: str
# Endpoints
@app.post("/documents", response_model=AnalysisResponse)
async def add_document(request: AddDocumentRequest):
"""Add a document and get analysis."""
if not rag:
raise HTTPException(500, "RAG not initialized")
analysis = rag.add_document(
content=request.content,
doc_id=request.doc_id,
metadata=request.metadata
)
return AnalysisResponse(**analysis.to_dict())
@app.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
"""Query the RAG system."""
if not rag:
raise HTTPException(500, "RAG not initialized")
    strategy = None
    if request.force_strategy:
        try:
            strategy = ContextStrategy(request.force_strategy)
        except ValueError:
            raise HTTPException(400, f"Unknown strategy: {request.force_strategy}")
response = rag.query(
question=request.question,
doc_id=request.doc_id,
force_strategy=strategy
)
return QueryResponse(
answer=response.answer,
strategy_used=response.strategy_used.value,
tokens_used=response.tokens_used,
document_tokens=response.document_tokens,
confidence=response.confidence
)
@app.get("/documents")
async def list_documents():
"""List all indexed documents."""
if not rag:
raise HTTPException(500, "RAG not initialized")
return {
"documents": list(rag.documents.keys()),
"count": len(rag.documents)
}
@app.get("/health")
async def health():
return {"status": "healthy", "rag_initialized": rag is not None}
if __name__ == "__main__":
import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Usage Examples
Basic Usage
from rag_pipeline import LongContextRAG
# Initialize
rag = LongContextRAG()
# Add a document (automatically analyzed)
with open("long_report.txt") as f:
content = f.read()
analysis = rag.add_document(content, "report_2024")
print(f"Recommended strategy: {analysis.recommended_strategy}")
print(f"Document tokens: {analysis.token_count}")
# Query - strategy selected automatically
response = rag.query("What are the main findings?", doc_id="report_2024")
print(f"Answer: {response.answer}")
print(f"Strategy used: {response.strategy_used}")

Force Specific Strategy
from config import ContextStrategy
# Force full document (if you know it fits)
response = rag.query(
"Summarize everything",
doc_id="report_2024",
force_strategy=ContextStrategy.FULL_DOCUMENT
)
# Force map-reduce for thorough analysis
response = rag.query(
"List every person mentioned",
doc_id="report_2024",
force_strategy=ContextStrategy.MAP_REDUCE
)

Architecture Diagram
┌─────────────────────────────────────────────────────────────────────────────┐
│ LONG CONTEXT RAG ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ DOCUMENT INPUT │
│ ┌──────────┐ ┌─────────┐ │
│ │ Document │ │ Query │ │
│ └────┬─────┘ └────┬────┘ │
│ │ │ │
│ ─────┼───────────────┼──────────────────────────────────────────────────────│
│ ▼ │ │
│ DOCUMENT ANALYSIS │ │
│ Token Analyzer ───► Complexity ───► Strategy Selector ◄────────┘ │
│ Assessment │ │
│ │ │
│ ─────────────────────────────────────────────┼────────────────────────────│
│ │ │
│ CONTEXT STRATEGIES │ │
│ ┌───────────────────────────────────┼───────────────────┐ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │
│ │ Full Document │ │Smart Selection│ │ Map-Reduce │ │ Hybrid │ │
│ │ (<100K) │ │ (100K-150K) │ │ (>500K) │ │ (Complex) │ │
│ └───────┬───────┘ └───────┬───────┘ └───────┬───────┘ └───────┬───────┘ │
│ │ │ │ │ │
│ ─────────┼─────────────────┼─────────────────┼─────────────────┼─────────│
│ │ │ │ │ │
│ CONTEXT BUILDING ▼ │ Vector Store │
│ └────────► Context Builder ◄────────┘ │ │
│ │ │ │
│ ▼ │ │
│ Token Manager ◄────────────────────────────┘ │
│ │ │
│ ──────────────────────────┼────────────────────────────────────────────── │
│ │ │
│ LLM GENERATION ▼ │
│ GPT-4-turbo / Claude (128K-200K context) │
│ │ │
│ ▼ │
│ Response │
└─────────────────────────────────────────────────────────────────────────────┘

Key Concepts Recap
| Concept | What It Is | Why It Matters |
|---|---|---|
| Long Context Windows | 128K-200K token capacity in modern LLMs | Can fit entire documents without chunking |
| Token Counting | Accurate measurement with tiktoken | Prevents context overflow, enables planning |
| Full Document Mode | Send entire doc when it fits | Preserves all cross-references and structure |
| Smart Selection | Extract most relevant sections | Fit large docs by removing less relevant parts |
| Map-Reduce | Process in parallel, then combine | Handle docs larger than any context window |
| Hybrid Approach | Full doc + vector search for specific queries | Best of both: full context + targeted retrieval |
| Complexity Assessment | Analyze query needs | Some queries need full doc, some just a section |
| Strategy Selection | Pick approach based on doc + query | Optimize for both quality and cost |
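The size thresholds from the strategy selection guide can likewise be recapped as a single selector (a sketch: the ratio cutoffs 0.8, 1.5, and 3.0 come from the guide earlier in this section, and the lowercase strategy strings are an assumed rendering of the `ContextStrategy` enum):

```python
# The size thresholds from the strategy selection guide as one function.
# Ratios follow the guide: <80% full doc, 80-150% smart selection,
# 150-300% hybrid, >300% map-reduce.
def select_strategy(document_tokens: int, context_limit: int) -> str:
    ratio = document_tokens / context_limit
    if ratio < 0.8:
        return "full_document"     # complete context, best quality
    if ratio <= 1.5:
        return "smart_selection"   # fit by dropping low-value sections
    if ratio <= 3.0:
        return "hybrid"            # key sections + targeted retrieval
    return "map_reduce"            # only option for very large docs

# e.g. a 500K-token document against a 128K window falls past every
# cutoff and lands on map-reduce.
```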
Key Takeaways
- Context size matters - Modern LLMs have 128K-200K context windows; use them
- Chunking is a last resort - Full documents preserve relationships that chunks destroy
- Adaptive strategies win - Different documents need different approaches
- Token counting is critical - Accurate counting prevents context overflow