Corrective RAG
Build a self-correcting RAG system that evaluates retrieval quality and triggers corrective actions
Corrective RAG (CRAG)
TL;DR
Traditional RAG blindly trusts retrieval—if it retrieves junk, it generates junk. Corrective RAG evaluates retrieval quality first, then takes action: use refined knowledge if good, fall back to web search if bad, or combine both if uncertain. This prevents hallucination from irrelevant documents and makes your RAG system robust to retrieval failures.
| Property | Value |
|---|---|
| Difficulty | Intermediate |
| Time | ~5 hours |
| Code Size | ~450 LOC |
| Prerequisites | RAG with Reranking |
Tech Stack
| Technology | Purpose |
|---|---|
| OpenAI | gpt-4o-mini + text-embedding-3-small |
| ChromaDB | Vector database |
| SearXNG | Web search fallback (self-hosted) |
| Pydantic | Structured outputs |
| FastAPI | REST API |
Prerequisites
- Completed RAG with Reranking tutorial
- Python 3.10+
- OpenAI API key
- Docker (for SearXNG) - self-hosted metasearch engine for web search fallback
What You'll Learn
- Build a retrieval evaluator to assess document relevance
- Implement three corrective actions: Correct, Incorrect, Ambiguous
- Design knowledge refinement using decompose-then-recompose
- Integrate web search as a fallback for poor retrievals
- Create a robust RAG pipeline that handles retrieval failures
Research Foundation
This project implements the concepts from Corrective Retrieval Augmented Generation (CRAG, January 2024).
The Problem: When Retrieval Goes Wrong
Standard RAG assumes retrieved documents are relevant. But what happens when they're not?
| Scenario | Traditional RAG | Result |
|---|---|---|
| Good retrieval | Uses documents | ✅ Good answer |
| Irrelevant documents | Still uses them | ❌ Hallucination |
| Partially relevant | Uses all equally | ⚠️ Noisy answer |
┌─────────────────────────────────────────────────────────────────┐
│ TRADITIONAL RAG ❌ │
│ │
│ Query ───► Retrieve ───► Generate ───► May Hallucinate ⚠️ │
│ │
│ (No quality check - bad documents → bad answers) │
└─────────────────────────────────────────────────────────────────┘
CRAG's insight: Evaluate retrieval quality BEFORE generation, then take corrective action.
┌─────────────────────────────────────────────────────────────────┐
│ CORRECTIVE RAG ✅ │
│ │
│ Query ───► Retrieve ───► Evaluator │
│ │ │
│ ┌─────────────┼─────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ [Correct] [Ambiguous] [Incorrect] │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ Refine Both Sources Web Search │
│ Knowledge │ │ │
│ │ │ │ │
│ └─────────────┼─────────────┘ │
│ │ │
│ ▼ │
│ Generate │
│ │ │
│ ▼ │
│ Grounded Answer ✓ │
└─────────────────────────────────────────────────────────────────┘
Project Structure
corrective-rag/
├── config.py # Configuration
├── retriever.py # Document retrieval
├── evaluator.py # Retrieval quality evaluation
├── refiner.py # Knowledge refinement
├── web_search.py # Web search fallback
├── corrective_rag.py # Main orchestration
├── app.py # FastAPI application
└── requirements.txt
Step 1: Configuration
# config.py
from pydantic_settings import BaseSettings
from pydantic import Field
from functools import lru_cache
from enum import Enum
class RetrievalAction(str, Enum):
"""Actions based on retrieval evaluation."""
CORRECT = "correct" # Retrieval is good - refine and use
INCORRECT = "incorrect" # Retrieval is bad - use web search
AMBIGUOUS = "ambiguous" # Uncertain - combine both
class Settings(BaseSettings):
"""Application configuration."""
openai_api_key: str
searxng_url: str = "http://localhost:8080" # SearXNG instance URL
# Model settings
embedding_model: str = "text-embedding-3-small"
llm_model: str = "gpt-4o-mini"
evaluator_model: str = "gpt-4o-mini"
# Retrieval settings
retrieval_k: int = 5
# Evaluation thresholds
correct_threshold: float = 0.7 # Above = CORRECT
incorrect_threshold: float = 0.3 # Below = INCORRECT
# Between = AMBIGUOUS
# Refinement settings
max_sentences_per_doc: int = 5
# Web search settings
web_search_max_results: int = 3
# ChromaDB
chroma_persist_dir: str = "./chroma_db"
collection_name: str = "corrective_rag_docs"
class Config:
env_file = ".env"
@lru_cache
def get_settings() -> Settings:
    return Settings()
Step 2: Retrieval Evaluator
The core of CRAG: assessing whether retrieved documents are relevant.
# evaluator.py
from openai import OpenAI
from pydantic import BaseModel, Field
from config import get_settings, RetrievalAction
class DocumentRelevance(BaseModel):
"""Relevance assessment for a single document."""
doc_index: int
is_relevant: bool
relevance_score: float = Field(ge=0, le=1)
key_information: list[str]
irrelevant_parts: list[str]
class RetrievalEvaluation(BaseModel):
"""Overall evaluation of retrieval quality."""
action: RetrievalAction
confidence: float = Field(ge=0, le=1)
overall_score: float = Field(ge=0, le=1)
document_evaluations: list[DocumentRelevance]
reasoning: str
class RetrievalEvaluator:
"""Evaluates retrieval quality and determines corrective action."""
def __init__(self):
settings = get_settings()
self.client = OpenAI(api_key=settings.openai_api_key)
self.model = settings.evaluator_model
self.correct_threshold = settings.correct_threshold
self.incorrect_threshold = settings.incorrect_threshold
def evaluate(
self,
query: str,
documents: list[dict]
) -> RetrievalEvaluation:
"""
Evaluate retrieval quality and determine action.
Args:
query: User query
documents: List of retrieved documents with 'content' and 'source'
Returns:
Evaluation with recommended action
"""
# Format documents for evaluation
docs_text = ""
for i, doc in enumerate(documents):
docs_text += f"\n[Document {i}]\n{doc['content']}\n"
system_prompt = """You are a retrieval quality evaluator. Assess whether
the retrieved documents are relevant and sufficient to answer the query.
For each document, determine:
1. Is it relevant to the query? (true/false)
2. Relevance score (0.0-1.0)
3. What key information does it contain for answering the query?
4. What parts are irrelevant or noise?
Then provide an overall assessment:
- overall_score: Average relevance (0.0-1.0)
- action: "correct" (good retrieval), "incorrect" (bad retrieval), or "ambiguous" (mixed)
- confidence: How confident you are in this assessment (0.0-1.0)
Return JSON:
{
"document_evaluations": [
{
"doc_index": 0,
"is_relevant": true,
"relevance_score": 0.85,
"key_information": ["fact1", "fact2"],
"irrelevant_parts": ["noise1"]
}
],
"overall_score": 0.75,
"action": "correct",
"confidence": 0.8,
"reasoning": "Why this action was chosen"
}"""
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"Query: {query}\n\nDocuments:{docs_text}"}
],
response_format={"type": "json_object"}
)
import json
result = json.loads(response.choices[0].message.content)
# Determine action based on thresholds
overall_score = result.get("overall_score", 0.5)
if overall_score >= self.correct_threshold:
action = RetrievalAction.CORRECT
elif overall_score <= self.incorrect_threshold:
action = RetrievalAction.INCORRECT
else:
action = RetrievalAction.AMBIGUOUS
return RetrievalEvaluation(
action=action,
confidence=result.get("confidence", 0.5),
overall_score=overall_score,
document_evaluations=[
DocumentRelevance(**doc_eval)
for doc_eval in result.get("document_evaluations", [])
],
reasoning=result.get("reasoning", "")
)
class LightweightEvaluator:
"""Fast rule-based evaluator for latency-sensitive applications."""
def __init__(self):
settings = get_settings()
self.correct_threshold = settings.correct_threshold
self.incorrect_threshold = settings.incorrect_threshold
def evaluate(
self,
query: str,
documents: list[dict],
distances: list[float]
) -> RetrievalEvaluation:
"""
Evaluate using retrieval distances as proxy for relevance.
Lower distance = higher relevance in vector search.
"""
# Convert distances to relevance scores
relevance_scores = [max(0, 1 - dist) for dist in distances]
overall_score = sum(relevance_scores) / len(relevance_scores) if relevance_scores else 0
# Determine action
if overall_score >= self.correct_threshold:
action = RetrievalAction.CORRECT
elif overall_score <= self.incorrect_threshold:
action = RetrievalAction.INCORRECT
else:
action = RetrievalAction.AMBIGUOUS
# Create document evaluations
doc_evals = [
DocumentRelevance(
doc_index=i,
is_relevant=score >= 0.5,
relevance_score=score,
key_information=[],
irrelevant_parts=[]
)
for i, score in enumerate(relevance_scores)
]
return RetrievalEvaluation(
action=action,
confidence=0.6, # Lower confidence for rule-based
overall_score=overall_score,
document_evaluations=doc_evals,
reasoning=f"Distance-based evaluation: avg score {overall_score:.2f}"
        )
Understanding the Evaluation Decision:
┌─────────────────────────────────────────────────────────────┐
│ Query: "What is machine learning?" │
│ │
│ Retrieved Documents: │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Doc 0: "ML enables computers to learn from data..." │ │
│ │ Score: 0.92 ✓ Highly relevant │ │
│ ├─────────────────────────────────────────────────────────┤ │
│ │ Doc 1: "Common algorithms include neural networks..." │ │
│ │ Score: 0.85 ✓ Relevant │ │
│ ├─────────────────────────────────────────────────────────┤ │
│ │ Doc 2: "The weather in Paris is mild..." │ │
│ │ Score: 0.15 ✗ Irrelevant │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ Overall Score: (0.92 + 0.85 + 0.15) / 3 = 0.64 │
│ Action: AMBIGUOUS (between 0.3 and 0.7) │
└─────────────────────────────────────────────────────────────┘
The Three Actions Explained:
| Action | Score Range | What Happens | Why |
|---|---|---|---|
| CORRECT | > 0.7 | Refine documents, use them | High confidence in retrieval |
| INCORRECT | < 0.3 | Web search fallback | Documents are noise |
| AMBIGUOUS | 0.3 - 0.7 | Combine refined docs + web | Hedge our bets |
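The table above is just a pure threshold function. A minimal sketch (the 0.7/0.3 defaults mirror `correct_threshold` and `incorrect_threshold` in `config.py`):

```python
from enum import Enum

class RetrievalAction(str, Enum):
    CORRECT = "correct"
    INCORRECT = "incorrect"
    AMBIGUOUS = "ambiguous"

def route(overall_score: float,
          correct_threshold: float = 0.7,
          incorrect_threshold: float = 0.3) -> RetrievalAction:
    """Map an overall relevance score to a corrective action."""
    if overall_score >= correct_threshold:
        return RetrievalAction.CORRECT
    if overall_score <= incorrect_threshold:
        return RetrievalAction.INCORRECT
    return RetrievalAction.AMBIGUOUS

# Worked example from the diagram: (0.92 + 0.85 + 0.15) / 3 = 0.64
scores = [0.92, 0.85, 0.15]
print(route(sum(scores) / len(scores)).value)  # ambiguous
```

One strongly irrelevant document is enough to drag an otherwise good retrieval into the AMBIGUOUS band, which is exactly the hedge you want.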
LLM vs Lightweight Evaluator:
| Aspect | LLM Evaluator | Lightweight (Distance-Based) |
|---|---|---|
| Accuracy | High (understands semantics) | Medium (proxy metric) |
| Latency | ~500ms | ~1ms |
| Cost | Tokens | Free |
| Use when | Quality matters most | Latency matters most |
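`LightweightEvaluator`'s scoring reduces to a single conversion from vector-search distances to relevance scores. A standalone sketch of that conversion (assuming cosine distances, where 0 means identical):

```python
def distance_to_relevance(distances: list[float]) -> tuple[list[float], float]:
    """Convert vector-search distances to [0, 1] relevance scores.

    Lower distance means higher relevance; scores are clamped at 0 so
    distances above 1.0 don't go negative.
    """
    scores = [max(0.0, 1.0 - d) for d in distances]
    overall = sum(scores) / len(scores) if scores else 0.0
    return scores, overall

scores, overall = distance_to_relevance([0.12, 0.35, 1.4])
print([round(s, 2) for s in scores])  # [0.88, 0.65, 0.0]
print(round(overall, 2))              # 0.51 -> AMBIGUOUS under the default thresholds
```

Because distance is only a proxy for semantic relevance, the class reports a fixed lower confidence (0.6) than the LLM evaluator.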
Step 3: Knowledge Refiner
The decompose-then-recompose algorithm to extract only relevant information.
# refiner.py
from openai import OpenAI
from pydantic import BaseModel
from config import get_settings
from evaluator import DocumentRelevance
class RefinedKnowledge(BaseModel):
"""Refined knowledge extracted from documents."""
key_facts: list[str]
supporting_quotes: list[str]
source_documents: list[int]
refinement_ratio: float # How much was filtered out
class KnowledgeRefiner:
"""
Refines retrieved documents using decompose-then-recompose.
This removes irrelevant information while preserving key knowledge.
"""
def __init__(self):
settings = get_settings()
self.client = OpenAI(api_key=settings.openai_api_key)
self.model = settings.llm_model
self.max_sentences = settings.max_sentences_per_doc
def refine(
self,
query: str,
documents: list[dict],
evaluations: list[DocumentRelevance]
) -> RefinedKnowledge:
"""
Refine documents by extracting only relevant information.
Steps:
1. Decompose each document into atomic facts
2. Filter facts by relevance to query
3. Recompose into coherent knowledge
"""
# Step 1 & 2: Decompose and filter
all_facts = []
all_quotes = []
source_docs = []
for doc, eval_result in zip(documents, evaluations):
if not eval_result.is_relevant:
continue
# Extract facts from this document
facts = self._extract_facts(query, doc['content'])
for fact in facts:
all_facts.append(fact)
source_docs.append(eval_result.doc_index)
# Keep key quotes
if eval_result.key_information:
all_quotes.extend(eval_result.key_information[:2])
# Step 3: Recompose - deduplicate and organize
unique_facts = list(dict.fromkeys(all_facts)) # Preserve order, remove dupes
# Calculate refinement ratio
original_length = sum(len(d['content']) for d in documents)
refined_length = sum(len(f) for f in unique_facts)
ratio = 1 - (refined_length / original_length) if original_length > 0 else 0
return RefinedKnowledge(
key_facts=unique_facts[:10], # Top 10 facts
supporting_quotes=all_quotes[:5],
source_documents=list(set(source_docs)),
refinement_ratio=ratio
)
def _extract_facts(self, query: str, content: str) -> list[str]:
"""Extract atomic facts relevant to the query."""
system_prompt = """Extract atomic facts from the document that are relevant
to answering the query. Each fact should be:
1. Self-contained (understandable without context)
2. Directly relevant to the query
3. Concise (one sentence)
Return JSON: {"facts": ["fact1", "fact2", ...]}
Only include facts that help answer the query. Omit background info, examples,
and tangential information."""
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"Query: {query}\n\nDocument:\n{content}"}
],
response_format={"type": "json_object"}
)
import json
result = json.loads(response.choices[0].message.content)
        return result.get("facts", [])[:self.max_sentences]
Understanding Decompose-Then-Recompose:
ORIGINAL DOCUMENT:
"Machine learning is a subset of AI. It was coined by Arthur Samuel in 1959.
ML algorithms learn from data. Popular libraries include TensorFlow, PyTorch,
and scikit-learn. The weather today is sunny. Training requires GPUs for
large models. Arthur Samuel worked at IBM."
Query: "What is machine learning?"
│
▼
┌─────────────────────────────────────────────────────────────┐
│ DECOMPOSE: Extract atomic facts │
│ │
│ 1. "Machine learning is a subset of AI" ✓ │
│ 2. "It was coined by Arthur Samuel in 1959" ✓ │
│ 3. "ML algorithms learn from data" ✓ │
│ 4. "Popular libraries: TensorFlow, PyTorch, sklearn" ~ │
│ 5. "The weather today is sunny" ✗ │
│ 6. "Training requires GPUs for large models" ~ │
│ 7. "Arthur Samuel worked at IBM" ~ │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ FILTER: Keep only relevant facts │
│ │
│ ✓ "Machine learning is a subset of AI" │
│ ✓ "It was coined by Arthur Samuel in 1959" │
│ ✓ "ML algorithms learn from data" │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ RECOMPOSE: Deduplicate and organize │
│ │
│ Refinement ratio: 70% filtered out │
│ (Removed weather, tangential details) │
└─────────────────────────────────────────────────────────────┘
Why This Works:
| Without Refinement | With Refinement |
|---|---|
| LLM sees "weather is sunny" | Only query-relevant facts |
| May get confused by noise | Focused context |
| Wastes tokens on irrelevant content | Token-efficient |
| Risk of hallucination from noise | Grounded in filtered facts |
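The recompose step in `refiner.py` is just order-preserving deduplication plus a ratio of how much text was filtered out. A standalone sketch of that logic:

```python
def recompose(facts: list[str], original_length: int) -> tuple[list[str], float]:
    """Deduplicate facts (first occurrence wins) and compute the refinement ratio."""
    unique = list(dict.fromkeys(facts))          # preserves order, drops dupes
    refined_length = sum(len(f) for f in unique)
    ratio = 1 - (refined_length / original_length) if original_length > 0 else 0.0
    return unique, ratio

facts = [
    "Machine learning is a subset of AI",
    "ML algorithms learn from data",
    "Machine learning is a subset of AI",  # same fact extracted from two documents
]
unique, ratio = recompose(facts, original_length=300)
print(len(unique))      # 2
print(round(ratio, 2))  # 0.79 -> 79% of the original text was filtered out
```

`dict.fromkeys` is the idiomatic way to deduplicate while keeping order; a plain `set` would scramble the facts.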
Step 4: Web Search Fallback
When retrieval fails, fall back to web search using SearXNG.
Setting Up SearXNG
First, run SearXNG locally with Docker:
docker run -d --name searxng \
-p 8080:8080 \
-e SEARXNG_SECRET="your-secret-key" \
  searxng/searxng:latest
Or use Docker Compose:
# docker-compose.searxng.yml
version: '3.8'
services:
searxng:
image: searxng/searxng:latest
container_name: searxng
ports:
- "8080:8080"
environment:
- SEARXNG_SECRET=your-secret-key
volumes:
- ./searxng:/etc/searxng:rw
    restart: unless-stopped
docker-compose -f docker-compose.searxng.yml up -d
Web Search Implementation
# web_search.py
import httpx
from pydantic import BaseModel
from config import get_settings
class WebSearchResult(BaseModel):
"""Result from web search."""
title: str
content: str
url: str
relevance_score: float
class WebSearchResults(BaseModel):
"""Collection of web search results."""
query: str
results: list[WebSearchResult]
search_successful: bool
class WebSearcher:
"""Web search fallback using SearXNG (self-hosted metasearch)."""
def __init__(self):
settings = get_settings()
self.base_url = settings.searxng_url
self.max_results = settings.web_search_max_results
def search(self, query: str) -> WebSearchResults:
"""
Perform web search using SearXNG.
SearXNG aggregates results from multiple search engines
without tracking, making it ideal for learning projects.
"""
try:
# SearXNG JSON API endpoint
response = httpx.get(
f"{self.base_url}/search",
params={
"q": query,
"format": "json",
"categories": "general",
"language": "en",
},
timeout=10.0
)
response.raise_for_status()
data = response.json()
results = []
for i, item in enumerate(data.get("results", [])[:self.max_results]):
results.append(WebSearchResult(
title=item.get("title", ""),
content=item.get("content", ""),
url=item.get("url", ""),
# SearXNG doesn't provide scores, use position-based ranking
relevance_score=1.0 - (i * 0.1)
))
return WebSearchResults(
query=query,
results=results,
search_successful=True
)
except httpx.ConnectError:
print("SearXNG not available. Start it with: docker run -p 8080:8080 searxng/searxng")
return WebSearchResults(
query=query,
results=[],
search_successful=False
)
except Exception as e:
print(f"Web search failed: {e}")
return WebSearchResults(
query=query,
results=[],
search_successful=False
)
class MockWebSearcher:
"""Mock web searcher for testing without SearXNG running."""
def search(self, query: str) -> WebSearchResults:
"""Return mock results for testing."""
return WebSearchResults(
query=query,
results=[
WebSearchResult(
title=f"Web result for: {query}",
content=f"This is simulated web content about {query}. "
"In production, this would be real SearXNG results.",
url="https://example.com/result",
relevance_score=0.8
)
],
search_successful=True
        )
Step 5: Corrective RAG Orchestration
# corrective_rag.py
import chromadb
from chromadb.utils import embedding_functions
from openai import OpenAI
from pydantic import BaseModel
from config import get_settings, RetrievalAction
from evaluator import RetrievalEvaluator, RetrievalEvaluation
from refiner import KnowledgeRefiner, RefinedKnowledge
from web_search import WebSearcher, WebSearchResults
class CRAGResponse(BaseModel):
"""Response from Corrective RAG."""
answer: str
action_taken: RetrievalAction
evaluation: RetrievalEvaluation
refined_knowledge: RefinedKnowledge | None
web_search_used: bool
web_results: WebSearchResults | None
sources: list[str]
confidence: float
class CorrectiveRAG:
"""Corrective RAG with retrieval evaluation and fallback."""
def __init__(self):
settings = get_settings()
# Initialize components
self.client = OpenAI(api_key=settings.openai_api_key)
self.llm_model = settings.llm_model
# ChromaDB
self.chroma = chromadb.PersistentClient(
path=settings.chroma_persist_dir
)
self.embedding_fn = embedding_functions.OpenAIEmbeddingFunction(
api_key=settings.openai_api_key,
model_name=settings.embedding_model
)
self.collection = self.chroma.get_or_create_collection(
name=settings.collection_name,
embedding_function=self.embedding_fn
)
# CRAG components
self.evaluator = RetrievalEvaluator()
self.refiner = KnowledgeRefiner()
self.web_searcher = WebSearcher()
self.settings = settings
def query(self, question: str) -> CRAGResponse:
"""
Process query with corrective retrieval.
Pipeline:
1. Retrieve documents
2. Evaluate retrieval quality
3. Take corrective action based on evaluation
4. Generate answer from corrected knowledge
"""
# Step 1: Retrieve
results = self.collection.query(
query_texts=[question],
n_results=self.settings.retrieval_k,
include=["documents", "metadatas", "distances"]
)
documents = [
{
"content": results["documents"][0][i],
"source": results["metadatas"][0][i].get("source", f"doc_{i}")
}
for i in range(len(results["documents"][0]))
]
# Step 2: Evaluate
evaluation = self.evaluator.evaluate(question, documents)
# Step 3: Take corrective action
refined_knowledge = None
web_results = None
context = ""
sources = []
if evaluation.action == RetrievalAction.CORRECT:
# Good retrieval - refine and use
refined_knowledge = self.refiner.refine(
question, documents, evaluation.document_evaluations
)
context = self._format_refined_knowledge(refined_knowledge)
sources = [documents[i]["source"] for i in refined_knowledge.source_documents]
elif evaluation.action == RetrievalAction.INCORRECT:
# Bad retrieval - use web search
web_results = self.web_searcher.search(question)
context = self._format_web_results(web_results)
sources = [r.url for r in web_results.results]
else: # AMBIGUOUS
# Mixed - combine both
refined_knowledge = self.refiner.refine(
question, documents, evaluation.document_evaluations
)
web_results = self.web_searcher.search(question)
context = "From knowledge base:\n"
context += self._format_refined_knowledge(refined_knowledge)
context += "\n\nFrom web search:\n"
context += self._format_web_results(web_results)
sources = [documents[i]["source"] for i in refined_knowledge.source_documents]
sources += [r.url for r in web_results.results]
# Step 4: Generate answer
answer = self._generate_answer(question, context)
return CRAGResponse(
answer=answer,
action_taken=evaluation.action,
evaluation=evaluation,
refined_knowledge=refined_knowledge,
web_search_used=web_results is not None,
web_results=web_results,
sources=sources,
confidence=evaluation.confidence
)
def _format_refined_knowledge(self, knowledge: RefinedKnowledge) -> str:
"""Format refined knowledge for generation."""
lines = ["Key facts:"]
for fact in knowledge.key_facts:
lines.append(f"- {fact}")
if knowledge.supporting_quotes:
lines.append("\nSupporting evidence:")
for quote in knowledge.supporting_quotes:
lines.append(f'- "{quote}"')
return "\n".join(lines)
def _format_web_results(self, results: WebSearchResults) -> str:
"""Format web search results for generation."""
if not results.results:
return "No web results found."
lines = []
for r in results.results:
lines.append(f"[{r.title}]")
lines.append(r.content)
lines.append("")
return "\n".join(lines)
def _generate_answer(self, question: str, context: str) -> str:
"""Generate answer from context."""
system_prompt = """Answer the question based on the provided context.
Be accurate and cite sources when possible.
If the context doesn't contain enough information, say so."""
response = self.client.chat.completions.create(
model=self.llm_model,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}
]
)
return response.choices[0].message.content
def add_documents(self, documents: list[str], sources: list[str]):
"""Add documents to the knowledge base."""
        # Offset ids by the current count so repeated calls don't overwrite earlier docs
        start = self.collection.count()
        ids = [f"doc_{start + i}" for i in range(len(documents))]
self.collection.add(
documents=documents,
ids=ids,
metadatas=[{"source": src} for src in sources]
        )
Step 6: FastAPI Application
# app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from contextlib import asynccontextmanager
from corrective_rag import CorrectiveRAG, CRAGResponse
# Global
crag: CorrectiveRAG | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global crag
crag = CorrectiveRAG()
# Add sample documents (some relevant, some not)
sample_docs = [
"Python is a high-level programming language known for readability. It supports multiple paradigms including procedural, object-oriented, and functional programming.",
"Machine learning enables computers to learn from data without explicit programming. Common algorithms include decision trees, neural networks, and support vector machines.",
"The weather in Paris is generally mild. Summers are warm and winters are cool. The city receives moderate rainfall throughout the year.",
"RAG (Retrieval-Augmented Generation) combines retrieval with generation to ground LLM responses in external knowledge, reducing hallucination.",
"Coffee is a popular beverage made from roasted coffee beans. It contains caffeine which acts as a stimulant.",
"Vector databases store embeddings for similarity search. Popular options include Pinecone, Weaviate, Milvus, and ChromaDB.",
"The Eiffel Tower is a famous landmark in Paris, built in 1889. It stands 330 meters tall and attracts millions of visitors annually.",
"Fine-tuning adapts pre-trained language models to specific tasks. Techniques like LoRA make this more efficient by updating only low-rank matrices."
]
sources = [
"python_docs", "ml_intro", "paris_weather", "rag_overview",
"coffee_wiki", "vector_db_guide", "eiffel_tower", "finetuning_guide"
]
crag.add_documents(sample_docs, sources)
yield
crag = None
app = FastAPI(
title="Corrective RAG API",
description="Self-correcting RAG with retrieval evaluation and web search fallback",
lifespan=lifespan
)
class QueryRequest(BaseModel):
query: str
class DocumentsRequest(BaseModel):
documents: list[str]
sources: list[str]
@app.post("/query", response_model=CRAGResponse)
async def query(request: QueryRequest):
"""Query with Corrective RAG."""
if not crag:
raise HTTPException(status_code=503, detail="Service not initialized")
result = crag.query(request.query)
return result
@app.post("/documents")
async def add_documents(request: DocumentsRequest):
"""Add documents to the knowledge base."""
if not crag:
raise HTTPException(status_code=503, detail="Service not initialized")
if len(request.documents) != len(request.sources):
raise HTTPException(
status_code=400,
detail="Documents and sources must have same length"
)
crag.add_documents(request.documents, request.sources)
return {"status": "success", "documents_added": len(request.documents)}
@app.get("/health")
async def health():
    return {"status": "healthy", "service": "corrective-rag"}
Step 7: Requirements
# requirements.txt
openai>=1.12.0
chromadb>=0.4.22
httpx>=0.25.0
pydantic>=2.0.0
pydantic-settings>=2.0.0
fastapi>=0.109.0
uvicorn>=0.27.0
python-dotenv>=1.0.0
Usage Examples
Basic Usage
from corrective_rag import CorrectiveRAG
# Initialize
crag = CorrectiveRAG()
# Add documents
crag.add_documents(
documents=["Your content here..."],
sources=["source_name"]
)
# Query - system automatically evaluates and corrects
result = crag.query("What is RAG?")
print(f"Answer: {result.answer}")
print(f"Action taken: {result.action_taken}")
print(f"Confidence: {result.confidence:.2f}")
print(f"Web search used: {result.web_search_used}")
# Check evaluation details
print(f"\nEvaluation: {result.evaluation.reasoning}")
for doc_eval in result.evaluation.document_evaluations:
    print(f"  Doc {doc_eval.doc_index}: {doc_eval.relevance_score:.2f}")
Handle Different Scenarios
# Good retrieval - typically refines and uses the knowledge base
result = crag.query("Explain Python programming")
print(result.action_taken)       # usually "correct" for in-KB topics
print(result.refined_knowledge)  # populated whenever documents were refined

# Bad retrieval - typically falls back to web search
result = crag.query("Latest news about quantum computing")
print(result.action_taken)       # usually "incorrect" for out-of-KB topics
print(result.web_search_used)    # True when the fallback fired
# Ambiguous - combines both sources
result = crag.query("How do vector databases work in RAG?")
if result.action_taken == "ambiguous":
    print("Used both knowledge base and web search")
API Usage
# Start server
uvicorn app:app --reload
# Query
curl -X POST http://localhost:8000/query \
-H "Content-Type: application/json" \
-d '{"query": "What is machine learning?"}'
# Response shows action taken
# {
# "answer": "...",
# "action_taken": "correct",
# "evaluation": {...},
# "web_search_used": false,
# ...
# }
How CRAG Improves Robustness
┌─────────────────────────────────────────────────────────────────┐
│ RETRIEVAL EVALUATION │
│ │
│ ┌───────────┐ │
│ │ Evaluator │ │
│ └─────┬─────┘ │
│ │ │
│ ┌─────────────────┼─────────────────┐ │
│ │ │ │ │
│ Score > 0.7 0.3 ≤ Score ≤ 0.7 Score < 0.3 │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌───────────┐ ┌───────────┐ │
│ │ CORRECT │ │ AMBIGUOUS │ │ INCORRECT │ │
│ └────┬────┘ └─────┬─────┘ └─────┬─────┘ │
│ │ │ │ │
└───────────┼─────────────────┼─────────────────┼─────────────────┘
│ │ │
┌───────────┼─────────────────┼─────────────────┼─────────────────┐
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Refine │ │ Both │ │ Web │ │
│ │ Knowledge│ │ Sources │ │ Search │ │
│ └────┬─────┘ └─────┬────┘ └────┬─────┘ │
│ │ │ │ │
│ └──────────────────┼────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ Generate │ │
│ └──────────────┘ │
│ │
│ CORRECTIVE ACTIONS │
└─────────────────────────────────────────────────────────────────┘
| Scenario | Traditional RAG | Corrective RAG |
|---|---|---|
| Good retrieval | ✅ Works | ✅ Works (refined) |
| Bad retrieval | ❌ Hallucinates | ✅ Uses web search |
| Mixed quality | ⚠️ Noisy | ✅ Filters + augments |
Key Concepts
Decompose-Then-Recompose
┌─────────────────────────────────────────────────────────────────┐
│ DECOMPOSE-THEN-RECOMPOSE │
│ │
│ Document ───► Decompose ───► Filter ───► Recompose │
│ │ into Facts Relevant Knowledge │
│ │ │ Facts │ │
│ ▼ ▼ ▼ ▼ │
│ "Long text [Fact 1] [Fact 1] ✓ "Relevant │
│ with noise [Fact 2] [Fact 3] ✓ facts only" │
│ and extra [Fact 3] │
│ content..." [Fact 4] │
│ │
│ Removes noise while preserving key information │
└─────────────────────────────────────────────────────────────────┘
Confidence-Based Routing
| Confidence | Action | Rationale |
|---|---|---|
| > 0.7 | CORRECT | Trust retrieval, refine it |
| < 0.3 | INCORRECT | Don't trust, use web |
| 0.3-0.7 | AMBIGUOUS | Hedge with both sources |
Key Concepts Recap
| Concept | What It Is | Why It Matters |
|---|---|---|
| Retrieval Evaluation | Score documents before using them | Catches bad retrieval before it causes hallucination |
| CORRECT Action | High confidence → refine and use | Trust good retrieval, just clean it up |
| INCORRECT Action | Low confidence → web search | Don't use garbage, get fresh data |
| AMBIGUOUS Action | Medium confidence → combine both | Hedge with multiple sources |
| Decompose-Then-Recompose | Extract facts → filter → reassemble | Removes noise, preserves signal |
| Web Search Fallback | SearXNG as backup knowledge source | Always have a plan B |
| Confidence Thresholds | 0.7 for correct, 0.3 for incorrect | Tunable based on your risk tolerance |
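The thresholds are the main tuning knobs. A quick sweep over a batch of hypothetical evaluation scores (the score list below is made up for illustration) shows how raising `correct_threshold` shifts more queries into the hedged AMBIGUOUS path:

```python
from collections import Counter

def route(score: float, correct: float, incorrect: float) -> str:
    """Threshold routing, as in the evaluator."""
    if score >= correct:
        return "correct"
    if score <= incorrect:
        return "incorrect"
    return "ambiguous"

# Hypothetical per-query evaluation scores, e.g. collected from a log
scores = [0.82, 0.75, 0.66, 0.55, 0.40, 0.28, 0.91, 0.35, 0.60, 0.15]

# Stricter correct_threshold -> fewer CORRECT, more AMBIGUOUS (and more web calls)
for correct_t in (0.6, 0.7, 0.8):
    counts = Counter(route(s, correct_t, 0.3) for s in scores)
    print(correct_t, dict(counts))
```

Raising `correct_threshold` trades latency and web-search cost for safety; lowering it trusts the knowledge base more. Tune against a labeled sample of your own queries.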
References
- Corrective RAG Paper (arXiv:2401.15884)
- SearXNG - Privacy-respecting metasearch engine
- Self-RAG for related self-correction techniques
Next Steps
- Add caching for repeated evaluations
- Implement streaming for real-time feedback
- Build evaluation metrics to measure CRAG improvement
- Explore Adaptive RAG for query complexity routing
- Try Speculative RAG for parallel generation