Corrective RAG (CRAG)
Build a self-correcting RAG system that evaluates retrieval quality and triggers corrective actions.
TL;DR
Traditional RAG blindly trusts retrieval—if it retrieves junk, it generates junk. Corrective RAG evaluates retrieval quality first, then takes action: use refined knowledge if good, fall back to web search if bad, or combine both if uncertain. This prevents hallucination from irrelevant documents and makes your RAG system robust to retrieval failures.
| Property | Value |
|---|---|
| Difficulty | Intermediate |
| Time | ~5 hours |
| Code Size | ~450 LOC |
| Prerequisites | RAG with Reranking |
Tech Stack
| Technology | Purpose |
|---|---|
| OpenAI | GPT-4o-mini + Embeddings |
| ChromaDB | Vector database |
| SearXNG | Web search fallback (self-hosted) |
| Pydantic | Structured outputs |
| FastAPI | REST API |
Prerequisites
- Completed RAG with Reranking tutorial
- Python 3.10+
- OpenAI API key
- Docker (for SearXNG) - self-hosted metasearch engine for web search fallback
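Later steps load settings from a `.env` file via pydantic-settings; a minimal example with placeholder values (replace them with your own):

```shell
# .env (placeholder values; pydantic-settings matches these names case-insensitively)
OPENAI_API_KEY=sk-your-key-here
SEARXNG_URL=http://localhost:8080
```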
What You'll Learn
- Build a retrieval evaluator to assess document relevance
- Implement three corrective actions: Correct, Incorrect, Ambiguous
- Design knowledge refinement using decompose-then-recompose
- Integrate web search as a fallback for poor retrievals
- Create a robust RAG pipeline that handles retrieval failures
Why Corrective RAG?
Retrieval quality is unpredictable. Some queries get excellent documents from the vector store; others get noise. Traditional RAG treats all retrieved documents as equally trustworthy, so when retrieval fails, the LLM generates from irrelevant context and confidently presents hallucinated answers.
Corrective RAG adds a quality gate between retrieval and generation. By evaluating document relevance before passing context to the LLM, the system can take corrective action: refine good documents to remove noise, fall back to web search when local retrieval fails, or combine both when results are mixed.
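The control flow described above can be sketched in a few lines. All five callables here are stand-ins for the retriever, evaluator, refiner, and web searcher built later in this tutorial; the thresholds follow the 0.7/0.3 defaults used throughout:

```python
# Minimal sketch of the corrective loop: retrieve, score, branch, generate.
# Every function passed in is a placeholder, not a real component.
def corrective_answer(query, retrieve, evaluate, refine, web_search, generate):
    docs = retrieve(query)
    score = evaluate(query, docs)   # 0.0-1.0 relevance estimate
    if score >= 0.7:                # CORRECT: trust retrieval, clean it up
        context = refine(query, docs)
    elif score <= 0.3:              # INCORRECT: discard, search the web
        context = web_search(query)
    else:                           # AMBIGUOUS: hedge with both sources
        context = refine(query, docs) + web_search(query)
    return generate(query, context)
```

The three branches are the whole idea: generation only ever sees context that survived the quality gate.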
Why a Quality Gate Matters
Without correction, irrelevant retrievals flow straight into generation; with correction, they are caught and handled before the LLM ever sees them. This is especially valuable for production systems where retrieval failures are inevitable -- knowledge bases have gaps, queries drift outside document coverage, and embeddings are imperfect.
Research Foundation
This project implements the concepts from Corrective Retrieval Augmented Generation (CRAG, January 2024).
The Problem: When Retrieval Goes Wrong
Standard RAG assumes retrieved documents are relevant. But what happens when they're not?
| Scenario | Traditional RAG | Result |
|---|---|---|
| Good retrieval | Uses documents | ✅ Good answer |
| Irrelevant documents | Still uses them | ❌ Hallucination |
| Partially relevant | Uses all equally | ⚠️ Noisy answer |
Traditional RAG vs Corrective RAG
Traditional RAG goes straight from retrieval to generation. Corrective RAG inserts an evaluation step between the two. CRAG's insight: evaluate retrieval quality BEFORE generation, then take corrective action.
CRAG Corrective Actions
- Correct (score > 0.7): refine knowledge
- Ambiguous (0.3-0.7): use both sources
- Incorrect (score < 0.3): web search fallback
Project Structure
corrective-rag/
├── config.py # Configuration
├── retriever.py # Document retrieval
├── evaluator.py # Retrieval quality evaluation
├── refiner.py # Knowledge refinement
├── web_search.py # Web search fallback
├── corrective_rag.py # Main orchestration
├── app.py # FastAPI application
└── requirements.txt
Step 1: Configuration
# config.py
from pydantic_settings import BaseSettings
from pydantic import Field
from functools import lru_cache
from enum import Enum
class RetrievalAction(str, Enum):
"""Actions based on retrieval evaluation."""
CORRECT = "correct" # Retrieval is good - refine and use
INCORRECT = "incorrect" # Retrieval is bad - use web search
AMBIGUOUS = "ambiguous" # Uncertain - combine both
class Settings(BaseSettings):
"""Application configuration."""
openai_api_key: str
searxng_url: str = "http://localhost:8080" # SearXNG instance URL
# Model settings
embedding_model: str = "text-embedding-3-small"
llm_model: str = "gpt-4o-mini"
evaluator_model: str = "gpt-4o-mini"
# Retrieval settings
retrieval_k: int = 5
# Evaluation thresholds
correct_threshold: float = 0.7 # Above = CORRECT
incorrect_threshold: float = 0.3 # Below = INCORRECT
# Between = AMBIGUOUS
# Refinement settings
max_sentences_per_doc: int = 5
# Web search settings
web_search_max_results: int = 3
# ChromaDB
chroma_persist_dir: str = "./chroma_db"
collection_name: str = "corrective_rag_docs"
class Config:
env_file = ".env"
@lru_cache
def get_settings() -> Settings:
return Settings()
Step 2: Retrieval Evaluator
The core of CRAG: assessing whether retrieved documents are relevant.
# evaluator.py
from openai import OpenAI
from pydantic import BaseModel, Field
from config import get_settings, RetrievalAction
class DocumentRelevance(BaseModel):
"""Relevance assessment for a single document."""
doc_index: int
is_relevant: bool
relevance_score: float = Field(ge=0, le=1)
key_information: list[str]
irrelevant_parts: list[str]
class RetrievalEvaluation(BaseModel):
"""Overall evaluation of retrieval quality."""
action: RetrievalAction
confidence: float = Field(ge=0, le=1)
overall_score: float = Field(ge=0, le=1)
document_evaluations: list[DocumentRelevance]
reasoning: str
class RetrievalEvaluator:
"""Evaluates retrieval quality and determines corrective action."""
def __init__(self):
settings = get_settings()
self.client = OpenAI(api_key=settings.openai_api_key)
self.model = settings.evaluator_model
self.correct_threshold = settings.correct_threshold
self.incorrect_threshold = settings.incorrect_threshold
def evaluate(
self,
query: str,
documents: list[dict]
) -> RetrievalEvaluation:
"""
Evaluate retrieval quality and determine action.
Args:
query: User query
documents: List of retrieved documents with 'content' and 'source'
Returns:
Evaluation with recommended action
"""
# Format documents for evaluation
docs_text = ""
for i, doc in enumerate(documents):
docs_text += f"\n[Document {i}]\n{doc['content']}\n"
system_prompt = """You are a retrieval quality evaluator. Assess whether
the retrieved documents are relevant and sufficient to answer the query.
For each document, determine:
1. Is it relevant to the query? (true/false)
2. Relevance score (0.0-1.0)
3. What key information does it contain for answering the query?
4. What parts are irrelevant or noise?
Then provide an overall assessment:
- overall_score: Average relevance (0.0-1.0)
- action: "correct" (good retrieval), "incorrect" (bad retrieval), or "ambiguous" (mixed)
- confidence: How confident you are in this assessment (0.0-1.0)
Return JSON:
{
"document_evaluations": [
{
"doc_index": 0,
"is_relevant": true,
"relevance_score": 0.85,
"key_information": ["fact1", "fact2"],
"irrelevant_parts": ["noise1"]
}
],
"overall_score": 0.75,
"action": "correct",
"confidence": 0.8,
"reasoning": "Why this action was chosen"
}"""
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"Query: {query}\n\nDocuments:{docs_text}"}
],
response_format={"type": "json_object"}
)
import json
result = json.loads(response.choices[0].message.content)
# Determine action based on thresholds
overall_score = result.get("overall_score", 0.5)
if overall_score >= self.correct_threshold:
action = RetrievalAction.CORRECT
elif overall_score <= self.incorrect_threshold:
action = RetrievalAction.INCORRECT
else:
action = RetrievalAction.AMBIGUOUS
return RetrievalEvaluation(
action=action,
confidence=result.get("confidence", 0.5),
overall_score=overall_score,
document_evaluations=[
DocumentRelevance(**doc_eval)
for doc_eval in result.get("document_evaluations", [])
],
reasoning=result.get("reasoning", "")
)
class LightweightEvaluator:
"""Fast rule-based evaluator for latency-sensitive applications."""
def __init__(self):
settings = get_settings()
self.correct_threshold = settings.correct_threshold
self.incorrect_threshold = settings.incorrect_threshold
def evaluate(
self,
query: str,
documents: list[dict],
distances: list[float]
) -> RetrievalEvaluation:
"""
Evaluate using retrieval distances as proxy for relevance.
Lower distance = higher relevance in vector search.
"""
# Convert distances to relevance scores
relevance_scores = [max(0, 1 - dist) for dist in distances]
overall_score = sum(relevance_scores) / len(relevance_scores) if relevance_scores else 0
# Determine action
if overall_score >= self.correct_threshold:
action = RetrievalAction.CORRECT
elif overall_score <= self.incorrect_threshold:
action = RetrievalAction.INCORRECT
else:
action = RetrievalAction.AMBIGUOUS
# Create document evaluations
doc_evals = [
DocumentRelevance(
doc_index=i,
is_relevant=score >= 0.5,
relevance_score=score,
key_information=[],
irrelevant_parts=[]
)
for i, score in enumerate(relevance_scores)
]
return RetrievalEvaluation(
action=action,
confidence=0.6, # Lower confidence for rule-based
overall_score=overall_score,
document_evaluations=doc_evals,
reasoning=f"Distance-based evaluation: avg score {overall_score:.2f}"
)
Understanding the Evaluation Decision:
Evaluation Example: "What is machine learning?"
- Doc 0: "ML enables computers to learn from data..." → relevant (high score)
- Doc 1: "Common algorithms include neural networks..." → relevant (high score)
- Doc 2: "The weather in Paris is mild..." → irrelevant (low score)
Overall result: two of three documents are relevant, so the average score is high enough to route to CORRECT, and the weather document is filtered out during refinement.
The Three Actions Explained:
| Action | Score Range | What Happens | Why |
|---|---|---|---|
| CORRECT | > 0.7 | Refine documents, use them | High confidence in retrieval |
| INCORRECT | < 0.3 | Web search fallback | Documents are noise |
| AMBIGUOUS | 0.3 - 0.7 | Combine refined docs + web | Hedge our bets |
LLM vs Lightweight Evaluator:
| Aspect | LLM Evaluator | Lightweight (Distance-Based) |
|---|---|---|
| Accuracy | High (understands semantics) | Medium (proxy metric) |
| Latency | ~500ms | ~1ms |
| Cost | Tokens | Free |
| Use when | Quality matters most | Latency matters most |
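The lightweight evaluator's scoring can be exercised standalone. A sketch of the same `1 - distance` proxy (this assumes distances where 0 means identical, as with cosine distance; whether the proxy is meaningful depends on your collection's distance metric):

```python
# Distance-to-relevance proxy used by the lightweight evaluator: clamp
# 1 - distance into [0, 1] per document, then average.
def distances_to_scores(distances):
    scores = [max(0.0, 1.0 - d) for d in distances]
    overall = sum(scores) / len(scores) if scores else 0.0
    return scores, overall
```

With the default thresholds, an average around 0.47 from distances like `[0.2, 0.4, 1.5]` would route to AMBIGUOUS.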
Step 3: Knowledge Refiner
The decompose-then-recompose algorithm to extract only relevant information.
# refiner.py
from openai import OpenAI
from pydantic import BaseModel
from config import get_settings
from evaluator import DocumentRelevance
class RefinedKnowledge(BaseModel):
"""Refined knowledge extracted from documents."""
key_facts: list[str]
supporting_quotes: list[str]
source_documents: list[int]
refinement_ratio: float # How much was filtered out
class KnowledgeRefiner:
"""
Refines retrieved documents using decompose-then-recompose.
This removes irrelevant information while preserving key knowledge.
"""
def __init__(self):
settings = get_settings()
self.client = OpenAI(api_key=settings.openai_api_key)
self.model = settings.llm_model
self.max_sentences = settings.max_sentences_per_doc
def refine(
self,
query: str,
documents: list[dict],
evaluations: list[DocumentRelevance]
) -> RefinedKnowledge:
"""
Refine documents by extracting only relevant information.
Steps:
1. Decompose each document into atomic facts
2. Filter facts by relevance to query
3. Recompose into coherent knowledge
"""
# Step 1 & 2: Decompose and filter
all_facts = []
all_quotes = []
source_docs = []
for doc, eval_result in zip(documents, evaluations):
if not eval_result.is_relevant:
continue
# Extract facts from this document
facts = self._extract_facts(query, doc['content'])
for fact in facts:
all_facts.append(fact)
source_docs.append(eval_result.doc_index)
# Keep key quotes
if eval_result.key_information:
all_quotes.extend(eval_result.key_information[:2])
# Step 3: Recompose - deduplicate and organize
unique_facts = list(dict.fromkeys(all_facts)) # Preserve order, remove dupes
# Calculate refinement ratio
original_length = sum(len(d['content']) for d in documents)
refined_length = sum(len(f) for f in unique_facts)
ratio = 1 - (refined_length / original_length) if original_length > 0 else 0
return RefinedKnowledge(
key_facts=unique_facts[:10], # Top 10 facts
supporting_quotes=all_quotes[:5],
source_documents=list(set(source_docs)),
refinement_ratio=ratio
)
def _extract_facts(self, query: str, content: str) -> list[str]:
"""Extract atomic facts relevant to the query."""
system_prompt = """Extract atomic facts from the document that are relevant
to answering the query. Each fact should be:
1. Self-contained (understandable without context)
2. Directly relevant to the query
3. Concise (one sentence)
Return JSON: {"facts": ["fact1", "fact2", ...]}
Only include facts that help answer the query. Omit background info, examples,
and tangential information."""
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"Query: {query}\n\nDocument:\n{content}"}
],
response_format={"type": "json_object"}
)
import json
result = json.loads(response.choices[0].message.content)
return result.get("facts", [])[:self.max_sentences]
Understanding Decompose-Then-Recompose:
Knowledge Refinement: What is machine learning?
Why This Works:
| Without Refinement | With Refinement |
|---|---|
| LLM sees "weather is sunny" | Only query-relevant facts |
| May get confused by noise | Focused context |
| Wastes tokens on irrelevant content | Token-efficient |
| Risk of hallucination from noise | Grounded in filtered facts |
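A toy version of decompose-then-recompose, with simple word overlap standing in for the LLM relevance judgment that `_extract_facts` performs in `refiner.py` (purely illustrative, not the tutorial's actual method):

```python
import re

# Toy decompose-then-recompose: split docs into sentences (decompose),
# keep sentences sharing at least `min_overlap` words with the query
# (filter), deduplicate while preserving order (recompose).
def refine_toy(query, docs, min_overlap=2):
    q_words = set(re.findall(r"\w+", query.lower()))
    kept = []
    for doc in docs:
        for sent in re.split(r"(?<=[.!?])\s+", doc.strip()):
            words = set(re.findall(r"\w+", sent.lower()))
            if len(words & q_words) >= min_overlap and sent not in kept:
                kept.append(sent)
    return kept
```

Even this crude filter drops the "weather in Paris" sentence for a machine-learning query; the LLM version does the same with semantic understanding instead of word overlap.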
Step 4: Web Search Fallback
When retrieval fails, fall back to web search using SearXNG.
Setting Up SearXNG
First, run SearXNG locally with Docker:
docker run -d --name searxng \
-p 8080:8080 \
-e SEARXNG_SECRET="your-secret-key" \
searxng/searxng:latest
Or use Docker Compose:
# docker-compose.searxng.yml
version: '3.8'
services:
searxng:
image: searxng/searxng:latest
container_name: searxng
ports:
- "8080:8080"
environment:
- SEARXNG_SECRET=your-secret-key
volumes:
- ./searxng:/etc/searxng:rw
restart: unless-stopped
docker-compose -f docker-compose.searxng.yml up -d
Note: some SearXNG configurations disable JSON output by default; if /search returns 403 for format=json, enable "json" under search.formats in your settings.yml.
Web Search Implementation
# web_search.py
import httpx
from pydantic import BaseModel
from config import get_settings
class WebSearchResult(BaseModel):
"""Result from web search."""
title: str
content: str
url: str
relevance_score: float
class WebSearchResults(BaseModel):
"""Collection of web search results."""
query: str
results: list[WebSearchResult]
search_successful: bool
class WebSearcher:
"""Web search fallback using SearXNG (self-hosted metasearch)."""
def __init__(self):
settings = get_settings()
self.base_url = settings.searxng_url
self.max_results = settings.web_search_max_results
def search(self, query: str) -> WebSearchResults:
"""
Perform web search using SearXNG.
SearXNG aggregates results from multiple search engines
without tracking, making it ideal for learning projects.
"""
try:
# SearXNG JSON API endpoint
response = httpx.get(
f"{self.base_url}/search",
params={
"q": query,
"format": "json",
"categories": "general",
"language": "en",
},
timeout=10.0
)
response.raise_for_status()
data = response.json()
results = []
for i, item in enumerate(data.get("results", [])[:self.max_results]):
results.append(WebSearchResult(
title=item.get("title", ""),
content=item.get("content", ""),
url=item.get("url", ""),
# SearXNG doesn't provide scores, use position-based ranking
relevance_score=1.0 - (i * 0.1)
))
return WebSearchResults(
query=query,
results=results,
search_successful=True
)
except httpx.ConnectError:
print("SearXNG not available. Start it with: docker run -p 8080:8080 searxng/searxng")
return WebSearchResults(
query=query,
results=[],
search_successful=False
)
except Exception as e:
print(f"Web search failed: {e}")
return WebSearchResults(
query=query,
results=[],
search_successful=False
)
class MockWebSearcher:
"""Mock web searcher for testing without SearXNG running."""
def search(self, query: str) -> WebSearchResults:
"""Return mock results for testing."""
return WebSearchResults(
query=query,
results=[
WebSearchResult(
title=f"Web result for: {query}",
content=f"This is simulated web content about {query}. "
"In production, this would be real SearXNG results.",
url="https://example.com/result",
relevance_score=0.8
)
],
search_successful=True
)
Understanding the Web Search Fallback:
SearXNG is a self-hosted metasearch engine that aggregates results from Google, Bing, DuckDuckGo, and others without tracking users. It is used here instead of commercial APIs for two reasons: it is free and self-hosted (no API key needed), and it respects privacy.
| Design Decision | Why |
|---|---|
| `httpx` instead of `requests` | Supports async, timeout control, HTTP/2 |
| Position-based relevance (`1.0 - i * 0.1`) | SearXNG does not provide relevance scores; rank position is a reasonable proxy |
| `MockWebSearcher` for testing | Allows development and testing without Docker/SearXNG running |
| Graceful `ConnectError` handling | Falls back cleanly if SearXNG is not running |
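The parsing and scoring inside `WebSearcher.search` can be exercised without a running SearXNG instance. A standalone sketch over a SearXNG-shaped payload (the `results`, `title`, and `url` keys match the real JSON API; the sample payload itself is made up):

```python
# Parse a SearXNG-style JSON payload, applying the same position-based
# scoring as web_search.py (1.0 - 0.1 * rank position).
def parse_results(payload, max_results=3):
    out = []
    for i, item in enumerate(payload.get("results", [])[:max_results]):
        out.append({
            "title": item.get("title", ""),
            "url": item.get("url", ""),
            "relevance_score": round(1.0 - 0.1 * i, 2),
        })
    return out

sample = {"results": [{"title": "A", "url": "u1"}, {"title": "B", "url": "u2"}]}
```

Keeping the parsing logic this small is what makes the `MockWebSearcher` drop-in replacement trivial.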
Step 5: Corrective RAG Orchestration
# corrective_rag.py
import chromadb
from chromadb.utils import embedding_functions
from openai import OpenAI
from pydantic import BaseModel
from config import get_settings, RetrievalAction
from evaluator import RetrievalEvaluator, RetrievalEvaluation
from refiner import KnowledgeRefiner, RefinedKnowledge
from web_search import WebSearcher, WebSearchResults
class CRAGResponse(BaseModel):
"""Response from Corrective RAG."""
answer: str
action_taken: RetrievalAction
evaluation: RetrievalEvaluation
refined_knowledge: RefinedKnowledge | None
web_search_used: bool
web_results: WebSearchResults | None
sources: list[str]
confidence: float
class CorrectiveRAG:
"""Corrective RAG with retrieval evaluation and fallback."""
def __init__(self):
settings = get_settings()
# Initialize components
self.client = OpenAI(api_key=settings.openai_api_key)
self.llm_model = settings.llm_model
# ChromaDB
self.chroma = chromadb.PersistentClient(
path=settings.chroma_persist_dir
)
self.embedding_fn = embedding_functions.OpenAIEmbeddingFunction(
api_key=settings.openai_api_key,
model_name=settings.embedding_model
)
self.collection = self.chroma.get_or_create_collection(
name=settings.collection_name,
embedding_function=self.embedding_fn
)
# CRAG components
self.evaluator = RetrievalEvaluator()
self.refiner = KnowledgeRefiner()
self.web_searcher = WebSearcher()
self.settings = settings
def query(self, question: str) -> CRAGResponse:
"""
Process query with corrective retrieval.
Pipeline:
1. Retrieve documents
2. Evaluate retrieval quality
3. Take corrective action based on evaluation
4. Generate answer from corrected knowledge
"""
# Step 1: Retrieve
results = self.collection.query(
query_texts=[question],
n_results=self.settings.retrieval_k,
include=["documents", "metadatas", "distances"]
)
documents = [
{
"content": results["documents"][0][i],
"source": results["metadatas"][0][i].get("source", f"doc_{i}")
}
for i in range(len(results["documents"][0]))
]
# Step 2: Evaluate
evaluation = self.evaluator.evaluate(question, documents)
# Step 3: Take corrective action
refined_knowledge = None
web_results = None
context = ""
sources = []
if evaluation.action == RetrievalAction.CORRECT:
# Good retrieval - refine and use
refined_knowledge = self.refiner.refine(
question, documents, evaluation.document_evaluations
)
context = self._format_refined_knowledge(refined_knowledge)
sources = [documents[i]["source"] for i in refined_knowledge.source_documents]
elif evaluation.action == RetrievalAction.INCORRECT:
# Bad retrieval - use web search
web_results = self.web_searcher.search(question)
context = self._format_web_results(web_results)
sources = [r.url for r in web_results.results]
else: # AMBIGUOUS
# Mixed - combine both
refined_knowledge = self.refiner.refine(
question, documents, evaluation.document_evaluations
)
web_results = self.web_searcher.search(question)
context = "From knowledge base:\n"
context += self._format_refined_knowledge(refined_knowledge)
context += "\n\nFrom web search:\n"
context += self._format_web_results(web_results)
sources = [documents[i]["source"] for i in refined_knowledge.source_documents]
sources += [r.url for r in web_results.results]
# Step 4: Generate answer
answer = self._generate_answer(question, context)
return CRAGResponse(
answer=answer,
action_taken=evaluation.action,
evaluation=evaluation,
refined_knowledge=refined_knowledge,
web_search_used=web_results is not None,
web_results=web_results,
sources=sources,
confidence=evaluation.confidence
)
def _format_refined_knowledge(self, knowledge: RefinedKnowledge) -> str:
"""Format refined knowledge for generation."""
lines = ["Key facts:"]
for fact in knowledge.key_facts:
lines.append(f"- {fact}")
if knowledge.supporting_quotes:
lines.append("\nSupporting evidence:")
for quote in knowledge.supporting_quotes:
lines.append(f'- "{quote}"')
return "\n".join(lines)
def _format_web_results(self, results: WebSearchResults) -> str:
"""Format web search results for generation."""
if not results.results:
return "No web results found."
lines = []
for r in results.results:
lines.append(f"[{r.title}]")
lines.append(r.content)
lines.append("")
return "\n".join(lines)
def _generate_answer(self, question: str, context: str) -> str:
"""Generate answer from context."""
system_prompt = """Answer the question based on the provided context.
Be accurate and cite sources when possible.
If the context doesn't contain enough information, say so."""
response = self.client.chat.completions.create(
model=self.llm_model,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}
]
)
return response.choices[0].message.content
def add_documents(self, documents: list[str], sources: list[str]):
"""Add documents to the knowledge base."""
ids = [f"doc_{i}" for i in range(len(documents))]
self.collection.add(
documents=documents,
ids=ids,
metadatas=[{"source": src} for src in sources]
)
Understanding the CRAG Orchestration Flow:
The query() method implements the core CRAG algorithm in four steps. The key insight is the branching logic after evaluation:
| Step | What Happens | Why |
|---|---|---|
| 1. Retrieve | Standard vector search with retrieval_k docs | Same as traditional RAG up to this point |
| 2. Evaluate | LLM scores each document for relevance | The quality gate that traditional RAG lacks |
| 3. Correct | Branch based on score: refine, web search, or both | Right-sized response to retrieval quality |
| 4. Generate | LLM produces answer from corrected context | Generates from clean, validated knowledge |
The AMBIGUOUS path is particularly interesting: it combines refined local documents and web search results, giving the LLM access to both your knowledge base and current web information. This hedging strategy is robust because it does not rely on either source alone.
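For instance, the source lists from both branches can be merged order-preservingly and without duplicates (a sketch; `corrective_rag.py` above simply concatenates the two lists):

```python
# Merge knowledge-base sources and web URLs, dropping duplicates while
# keeping first-seen order.
def merge_sources(kb_sources, web_urls):
    merged = []
    for s in kb_sources + web_urls:
        if s not in merged:
            merged.append(s)
    return merged
```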
Step 6: FastAPI Application
# app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from contextlib import asynccontextmanager
from corrective_rag import CorrectiveRAG, CRAGResponse
# Global
crag: CorrectiveRAG | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global crag
crag = CorrectiveRAG()
# Add sample documents (some relevant, some not)
sample_docs = [
"Python is a high-level programming language known for readability. It supports multiple paradigms including procedural, object-oriented, and functional programming.",
"Machine learning enables computers to learn from data without explicit programming. Common algorithms include decision trees, neural networks, and support vector machines.",
"The weather in Paris is generally mild. Summers are warm and winters are cool. The city receives moderate rainfall throughout the year.",
"RAG (Retrieval-Augmented Generation) combines retrieval with generation to ground LLM responses in external knowledge, reducing hallucination.",
"Coffee is a popular beverage made from roasted coffee beans. It contains caffeine which acts as a stimulant.",
"Vector databases store embeddings for similarity search. Popular options include Pinecone, Weaviate, Milvus, and ChromaDB.",
"The Eiffel Tower is a famous landmark in Paris, built in 1889. It stands 330 meters tall and attracts millions of visitors annually.",
"Fine-tuning adapts pre-trained language models to specific tasks. Techniques like LoRA make this more efficient by updating only low-rank matrices."
]
sources = [
"python_docs", "ml_intro", "paris_weather", "rag_overview",
"coffee_wiki", "vector_db_guide", "eiffel_tower", "finetuning_guide"
]
crag.add_documents(sample_docs, sources)
yield
crag = None
app = FastAPI(
title="Corrective RAG API",
description="Self-correcting RAG with retrieval evaluation and web search fallback",
lifespan=lifespan
)
class QueryRequest(BaseModel):
query: str
class DocumentsRequest(BaseModel):
documents: list[str]
sources: list[str]
@app.post("/query", response_model=CRAGResponse)
async def query(request: QueryRequest):
"""Query with Corrective RAG."""
if not crag:
raise HTTPException(status_code=503, detail="Service not initialized")
result = crag.query(request.query)
return result
@app.post("/documents")
async def add_documents(request: DocumentsRequest):
"""Add documents to the knowledge base."""
if not crag:
raise HTTPException(status_code=503, detail="Service not initialized")
if len(request.documents) != len(request.sources):
raise HTTPException(
status_code=400,
detail="Documents and sources must have same length"
)
crag.add_documents(request.documents, request.sources)
return {"status": "success", "documents_added": len(request.documents)}
@app.get("/health")
async def health():
return {"status": "healthy", "service": "corrective-rag"}Step 7: Requirements
# requirements.txt
openai>=1.12.0
chromadb>=0.4.22
httpx>=0.25.0
pydantic>=2.0.0
pydantic-settings>=2.0.0
fastapi>=0.109.0
uvicorn>=0.27.0
python-dotenv>=1.0.0
Usage Examples
Basic Usage
from corrective_rag import CorrectiveRAG
# Initialize
crag = CorrectiveRAG()
# Add documents
crag.add_documents(
documents=["Your content here..."],
sources=["source_name"]
)
# Query - system automatically evaluates and corrects
result = crag.query("What is RAG?")
print(f"Answer: {result.answer}")
print(f"Action taken: {result.action_taken}")
print(f"Confidence: {result.confidence:.2f}")
print(f"Web search used: {result.web_search_used}")
# Check evaluation details
print(f"\nEvaluation: {result.evaluation.reasoning}")
for doc_eval in result.evaluation.document_evaluations:
print(f" Doc {doc_eval.doc_index}: {doc_eval.relevance_score:.2f}")Handle Different Scenarios
# Good retrieval - uses refined knowledge
result = crag.query("Explain Python programming")
assert result.action_taken == "correct"
assert result.refined_knowledge is not None
# Bad retrieval - falls back to web search
result = crag.query("Latest news about quantum computing")
assert result.action_taken == "incorrect"
assert result.web_search_used == True
# Ambiguous - combines both sources
result = crag.query("How do vector databases work in RAG?")
if result.action_taken == "ambiguous":
print("Used both knowledge base and web search")API Usage
# Start server
uvicorn app:app --reload
# Query
curl -X POST http://localhost:8000/query \
-H "Content-Type: application/json" \
-d '{"query": "What is machine learning?"}'
# Response shows action taken
# {
# "answer": "...",
# "action_taken": "correct",
# "evaluation": {...},
# "web_search_used": false,
# ...
# }
How CRAG Improves Robustness
CRAG Retrieval Evaluation & Corrective Actions
- CORRECT (score > 0.7): Refine Knowledge -- decompose, filter, recompose
- AMBIGUOUS (0.3-0.7): Both Sources -- use refined docs + web search
- INCORRECT (score < 0.3): Web Search -- discard retrieval, search the web
| Scenario | Traditional RAG | Corrective RAG |
|---|---|---|
| Good retrieval | ✅ Works | ✅ Works (refined) |
| Bad retrieval | ❌ Hallucinates | ✅ Uses web search |
| Mixed quality | ⚠️ Noisy | ✅ Filters + augments |
Key Concepts
Decompose-Then-Recompose
Each retrieved document is decomposed into atomic facts, the facts are filtered for relevance to the query, and the survivors are recomposed into a compact context. This removes noise while preserving key information.
Confidence-Based Routing
| Confidence | Action | Rationale |
|---|---|---|
| > 0.7 | CORRECT | Trust retrieval, refine it |
| < 0.3 | INCORRECT | Don't trust, use web |
| 0.3-0.7 | AMBIGUOUS | Hedge with both sources |
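The thresholds are tunable: tightening them shifts more queries into the hedged AMBIGUOUS path. A quick illustrative simulation (the scores are made up):

```python
from collections import Counter

# Route a relevance score to an action under given thresholds.
def route(score, hi=0.7, lo=0.3):
    return "correct" if score >= hi else "incorrect" if score <= lo else "ambiguous"

scores = [0.9, 0.75, 0.6, 0.5, 0.35, 0.2]
default_mix = Counter(route(s) for s in scores)                  # 0.7 / 0.3
strict_mix = Counter(route(s, hi=0.8, lo=0.4) for s in scores)   # tighter band
```

Under the defaults this batch routes 2/3/1 across correct/ambiguous/incorrect; tightening to 0.8/0.4 shifts it to 1/3/2, trading latency and cost for more hedging.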
Key Concepts Recap
| Concept | What It Is | Why It Matters |
|---|---|---|
| Retrieval Evaluation | Score documents before using them | Catches bad retrieval before it causes hallucination |
| CORRECT Action | High confidence → refine and use | Trust good retrieval, just clean it up |
| INCORRECT Action | Low confidence → web search | Don't use garbage, get fresh data |
| AMBIGUOUS Action | Medium confidence → combine both | Hedge with multiple sources |
| Decompose-Then-Recompose | Extract facts → filter → reassemble | Removes noise, preserves signal |
| Web Search Fallback | SearXNG as backup knowledge source | Always have a plan B |
| Confidence Thresholds | 0.7 for correct, 0.3 for incorrect | Tunable based on your risk tolerance |
References
- Corrective RAG Paper (arxiv 2401.15884)
- SearXNG - Privacy-respecting metasearch engine
- Self-RAG for related self-correction techniques
Next Steps
- Add caching for repeated evaluations
- Implement streaming for real-time feedback
- Build evaluation metrics to measure CRAG improvement
- Explore Adaptive RAG for query complexity routing
- Try Speculative RAG for parallel generation