Agentic RAG
Build self-correcting RAG systems with autonomous retrieval, query decomposition, and adaptive strategies
Build an intelligent RAG system that autonomously decides when to retrieve, how to decompose complex questions, and self-corrects when initial answers are insufficient.
| Property | Value |
|---|---|
| Difficulty | 🔴 Advanced |
| Category | RAG |
| Time Estimate | ~5 days |
| Code Size | ~900 lines |
| Prerequisites | Intermediate RAG, AI Agents basics |
TL;DR
Agentic RAG transforms RAG from a fixed pipeline into an autonomous system that analyzes queries, decomposes complex questions, self-evaluates retrieval quality, and adaptively combines knowledge-base results with web search; LangGraph orchestrates the workflow.
Tech Stack
| Component | Technology |
|---|---|
| Agent Framework | LangGraph |
| LLM | OpenAI GPT-4o |
| Vector Store | ChromaDB |
| Web Search | Serper (Google Search API) |
| Orchestration | LangChain |
| Framework | FastAPI |
Prerequisites
Before starting this project, you should:
- Complete the intermediate RAG projects
- Understand basic agent concepts (tools, reasoning, planning)
- Have experience with LangChain
- Know how to work with async Python
What You'll Learn
- Design self-correcting RAG architectures
- Implement query decomposition for complex questions
- Build retrieval quality assessment
- Create adaptive retrieval strategies
- Use LangGraph for agentic workflows
- Handle multi-step reasoning with tool use
Why Agentic RAG?
Traditional RAG is a fixed pipeline - retrieve once, generate once. But what happens when:
- The retrieval misses relevant documents?
- The question requires multiple pieces of information?
- The answer needs verification from external sources?
- The query is ambiguous and needs clarification?
┌──────────────────────────────────────────────────────────────────┐
│                         TRADITIONAL RAG                          │
│                                                                  │
│        Query ──► Retrieve ──► Generate ──► Answer                │
│                                                                  │
│        (Fixed pipeline - no adaptation or self-correction)       │
└──────────────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────────┐
│                           AGENTIC RAG                            │
│                                                                  │
│  Query ──► Analyze ──┬──► [Simple] ─────────────────► Retrieve   │
│                      │                                    ▲      │
│                      └──► [Complex] ──► Decompose ────────┘      │
│                                                                  │
│  Retrieve ──► Evaluate ──┬──► [Good] ──► Generate ──► Verify     │
│                          │                                       │
│                          └──► [Poor] ──► Reformulate ──► (retry) │
│                                                                  │
│  Verify ──┬──► [Correct] ────────────────────► Answer            │
│           │                                      ▲               │
│           └──► [Incomplete] ──► Web Search ──────┘               │
│                                                                  │
│    (Self-correcting loop with multiple retrieval strategies)     │
└──────────────────────────────────────────────────────────────────┘
Agentic RAG capabilities:
- Self-Evaluation: Assess if retrieved content is sufficient
- Query Decomposition: Break complex questions into sub-queries
- Adaptive Retrieval: Choose retrieval strategy based on query type
- Self-Correction: Reformulate queries when results are poor
- Multi-Source: Combine knowledge base with web search
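To make adaptive retrieval concrete: the real system asks an LLM to classify each query (shown later in this guide), but the decision it produces has roughly this shape. The keyword heuristics below are invented purely for illustration:

```python
# A toy router illustrating "adaptive retrieval": pick a strategy from
# shallow query features. The real system uses an LLM classifier; these
# keyword rules are stand-ins for illustration only.

def choose_strategy(query: str) -> str:
    q = query.lower()
    if any(w in q for w in ("latest", "today", "current")):
        return "web"        # freshness needs live search
    if any(w in q for w in (" vs ", "compare", "difference between")):
        return "multi"      # comparisons benefit from multiple sources
    if len(q.split()) > 20 or " and " in q:
        return "hybrid"     # long/compound queries: vector + keyword
    return "vector"         # default: knowledge-base vector search
```

The routing outcome then selects which tools the agent invokes, exactly as the `suggested_strategy` field does in the query analyzer below.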
System Architecture
┌──────────────────────────────────────────────────────────────────┐
│                     AGENTIC RAG ARCHITECTURE                     │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│  QUERY PROCESSING                                                │
│    User Query ──► Query Analyzer ──┬──► [Simple] ───► Planner    │
│                                    │                             │
│                                    └──► [Complex] ──► Decomposer │
│                                                          │       │
│                                                          ▼       │
│                                                       Planner    │
│                               │                                  │
│                               ▼                                  │
│  AGENTIC CORE                                                    │
│    Planner ──► Executor ──► Evaluator ──┬──► [Sufficient]        │
│       ▲                                 │                        │
│       └──── Reformulator ◄──────────────┴──► [Insufficient]      │
│                               │                                  │
│                               ▼                                  │
│  TOOL SUITE                                                      │
│    Vector Search (ChromaDB)   Web Search (Serper)                │
│    Graph Query (Neo4j)        SQL Query (PostgreSQL)             │
│                               │                                  │
│                               ▼                                  │
│  RESPONSE GENERATION                                             │
│    Answer Generator ──► Verifier ──┬──► [Valid] ──► Synthesizer  │
│                            ▲       │                             │
│                            └───────┴──► [Invalid] ──► (retry)    │
│                               │                                  │
│                               ▼                                  │
│                          Final Answer                            │
└──────────────────────────────────────────────────────────────────┘
Project Structure
agentic-rag/
├── app/
│   ├── __init__.py
│   ├── main.py                  # FastAPI application
│   ├── config.py                # Configuration settings
│   ├── models.py                # Pydantic models
│   ├── agents/
│   │   ├── __init__.py
│   │   ├── query_analyzer.py    # Query analysis and routing
│   │   ├── decomposer.py        # Query decomposition
│   │   ├── planner.py           # Action planning
│   │   └── evaluator.py         # Result evaluation
│   ├── tools/
│   │   ├── __init__.py
│   │   ├── retriever.py         # Vector retrieval tool
│   │   ├── web_search.py        # Web search tool
│   │   └── sql_query.py         # Database query tool
│   ├── graph/
│   │   ├── __init__.py
│   │   ├── state.py             # Agent state definition
│   │   ├── nodes.py             # Graph nodes
│   │   └── workflow.py          # LangGraph workflow
│   └── generation/
│       ├── __init__.py
│       ├── generator.py         # Answer generation
│       └── verifier.py          # Answer verification
├── tests/
│   ├── __init__.py
│   ├── test_agents.py
│   ├── test_tools.py
│   └── test_workflow.py
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
└── README.md
Step 1: Configuration and Models
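The `Settings` class in this step loads its values from environment variables or a `.env` file via pydantic-settings; a minimal `.env` for local development might look like this (all values are placeholders):

```
OPENAI_API_KEY=sk-your-key-here
SERPER_API_KEY=your-serper-key   # optional; omit to disable web search
PLANNING_MODEL=gpt-4o
ENABLE_WEB_SEARCH=true
```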
# app/config.py
from pydantic_settings import BaseSettings
from functools import lru_cache
from typing import Optional
class Settings(BaseSettings):
# API Keys
openai_api_key: str
serper_api_key: Optional[str] = None # For Google Search via Serper
# Model Settings
planning_model: str = "gpt-4o"
generation_model: str = "gpt-4o"
evaluation_model: str = "gpt-4o-mini"
# Retrieval Settings
chroma_persist_dir: str = "./chroma_agentic"
collection_name: str = "knowledge_base"
top_k: int = 5
relevance_threshold: float = 0.7
# Agent Settings
max_iterations: int = 5
max_decomposition_depth: int = 3
enable_web_search: bool = True
class Config:
env_file = ".env"
@lru_cache()
def get_settings() -> Settings:
    return Settings()
# app/models.py
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any, Literal
from enum import Enum
from datetime import datetime
class QueryType(str, Enum):
SIMPLE = "simple" # Direct factual question
COMPLEX = "complex" # Multi-part question
COMPARATIVE = "comparative" # Comparing entities
TEMPORAL = "temporal" # Time-based question
AGGREGATION = "aggregation" # Summarization/counting
class RetrievalStrategy(str, Enum):
VECTOR = "vector" # Standard vector search
HYBRID = "hybrid" # Vector + keyword
WEB = "web" # Web search
MULTI = "multi" # Multiple sources
class QueryAnalysis(BaseModel):
"""Analysis of user query"""
original_query: str
query_type: QueryType
complexity_score: float = Field(ge=0, le=1)
requires_decomposition: bool
suggested_strategy: RetrievalStrategy
key_entities: List[str]
time_constraints: Optional[str] = None
class SubQuery(BaseModel):
"""Decomposed sub-query"""
id: str
query: str
parent_id: Optional[str] = None
depends_on: List[str] = Field(default_factory=list)
priority: int = 1
status: Literal["pending", "completed", "failed"] = "pending"
answer: Optional[str] = None
class RetrievalResult(BaseModel):
"""Result from retrieval tool"""
query: str
documents: List[Dict[str, Any]]
source: str # vector, web, sql, etc.
relevance_scores: List[float]
metadata: Dict[str, Any] = Field(default_factory=dict)
class EvaluationResult(BaseModel):
"""Evaluation of retrieval quality"""
is_sufficient: bool
confidence: float = Field(ge=0, le=1)
missing_information: List[str]
suggested_actions: List[str]
reasoning: str
class AgentAction(BaseModel):
"""Action to be executed by agent"""
tool: str
tool_input: Dict[str, Any]
reasoning: str
class AgentState(BaseModel):
"""State maintained by the agent"""
query: str
query_analysis: Optional[QueryAnalysis] = None
sub_queries: List[SubQuery] = Field(default_factory=list)
retrieval_results: List[RetrievalResult] = Field(default_factory=list)
evaluations: List[EvaluationResult] = Field(default_factory=list)
current_answer: Optional[str] = None
is_complete: bool = False
iteration: int = 0
errors: List[str] = Field(default_factory=list)
class QueryRequest(BaseModel):
"""API request model"""
query: str
max_iterations: int = 5
enable_web_search: bool = True
stream: bool = False
class QueryResponse(BaseModel):
"""API response model"""
query: str
answer: str
sources: List[Dict[str, Any]]
sub_queries: List[Dict[str, str]]
iterations: int
reasoning_trace: List[str]
    processing_time_ms: float
Step 2: Query Analysis and Decomposition
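The decomposer in this step emits `depends_on` edges between sub-queries but leaves scheduling to the caller. A dependency-respecting execution order is a small topological sort; the sketch below uses Kahn's algorithm, with plain dicts standing in for the `SubQuery` model:

```python
# Order sub-queries so each runs only after its dependencies complete
# (Kahn's algorithm; plain dicts stand in for the SubQuery model).

from collections import deque

def execution_order(sub_queries):
    """Return sub-query ids in an order that respects depends_on."""
    deps = {sq["id"]: set(sq["depends_on"]) for sq in sub_queries}
    dependents = {sq["id"]: [] for sq in sub_queries}
    for sq in sub_queries:
        for d in sq["depends_on"]:
            dependents[d].append(sq["id"])
    ready = deque(sorted(i for i, d in deps.items() if not d))
    order = []
    while ready:
        nid = ready.popleft()
        order.append(nid)
        for child in dependents[nid]:
            deps[child].discard(nid)
            if not deps[child]:
                ready.append(child)
    if len(order) != len(sub_queries):
        raise ValueError("cycle in sub-query dependencies")
    return order
```

The same idea also detects broken LLM output: if the model emits a dependency cycle, the sort cannot cover every node and the caller can fall back to priority order.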
# app/agents/query_analyzer.py
import openai
from typing import Dict, Any
import json
import logging
from ..models import QueryAnalysis, QueryType, RetrievalStrategy
from ..config import get_settings
logger = logging.getLogger(__name__)
settings = get_settings()
class QueryAnalyzer:
"""Analyze queries to determine processing strategy"""
def __init__(self):
self.client = openai.OpenAI(api_key=settings.openai_api_key)
def analyze(self, query: str) -> QueryAnalysis:
"""Analyze query to determine type and strategy"""
prompt = f"""Analyze this query and determine how to process it:
Query: "{query}"
Respond in JSON format:
{{
"query_type": "simple|complex|comparative|temporal|aggregation",
"complexity_score": 0.0-1.0,
"requires_decomposition": true/false,
"suggested_strategy": "vector|hybrid|web|multi",
"key_entities": ["entity1", "entity2"],
"time_constraints": "any time-related constraints or null",
"reasoning": "Brief explanation of your analysis"
}}
Guidelines:
- simple: Direct factual questions ("What is X?")
- complex: Multi-part or multi-step questions
- comparative: Questions comparing entities
- temporal: Questions about time periods or sequences
- aggregation: Questions requiring summarization
Strategies:
- vector: Standard knowledge base search
- hybrid: Knowledge base + keyword matching
- web: Current/real-time information needed
- multi: Multiple sources needed for complete answer"""
response = self.client.chat.completions.create(
model=settings.evaluation_model,
messages=[{"role": "user", "content": prompt}],
response_format={"type": "json_object"},
max_tokens=500
)
result = json.loads(response.choices[0].message.content)
return QueryAnalysis(
original_query=query,
query_type=QueryType(result["query_type"]),
complexity_score=result["complexity_score"],
requires_decomposition=result["requires_decomposition"],
suggested_strategy=RetrievalStrategy(result["suggested_strategy"]),
key_entities=result.get("key_entities", []),
time_constraints=result.get("time_constraints")
        )
# app/agents/decomposer.py
import openai
from typing import List
import json
import uuid
import logging
from ..models import SubQuery, QueryAnalysis
from ..config import get_settings
logger = logging.getLogger(__name__)
settings = get_settings()
class QueryDecomposer:
"""Decompose complex queries into sub-queries"""
def __init__(self):
self.client = openai.OpenAI(api_key=settings.openai_api_key)
def decompose(
self,
query: str,
analysis: QueryAnalysis,
max_sub_queries: int = 5
) -> List[SubQuery]:
"""Break down complex query into manageable sub-queries"""
if not analysis.requires_decomposition:
return [SubQuery(
id=str(uuid.uuid4()),
query=query,
priority=1
)]
prompt = f"""Decompose this complex query into simpler sub-queries:
Original Query: "{query}"
Analysis:
- Type: {analysis.query_type.value}
- Key Entities: {analysis.key_entities}
- Time Constraints: {analysis.time_constraints or 'None'}
Create {max_sub_queries} or fewer sub-queries that together answer the original.
Respond in JSON format:
{{
"sub_queries": [
{{
"query": "Sub-question 1",
"depends_on": [], // IDs of queries this depends on
"priority": 1 // Lower = higher priority
}},
...
],
"reasoning": "Explanation of decomposition strategy"
}}
Guidelines:
- Make sub-queries independent when possible
- Order dependencies correctly
- Each sub-query should be answerable with single retrieval
- Prioritize foundational questions first"""
response = self.client.chat.completions.create(
model=settings.planning_model,
messages=[{"role": "user", "content": prompt}],
response_format={"type": "json_object"},
max_tokens=800
)
result = json.loads(response.choices[0].message.content)
sub_queries = []
id_map = {} # Map index to generated ID
for i, sq in enumerate(result.get("sub_queries", [])):
sq_id = str(uuid.uuid4())
id_map[i] = sq_id
# Map dependency indices to IDs
depends_on = []
for dep_idx in sq.get("depends_on", []):
if isinstance(dep_idx, int) and dep_idx in id_map:
depends_on.append(id_map[dep_idx])
sub_queries.append(SubQuery(
id=sq_id,
query=sq["query"],
depends_on=depends_on,
priority=sq.get("priority", i + 1)
))
# Sort by priority
sub_queries.sort(key=lambda x: x.priority)
return sub_queries
def merge_answers(
self,
original_query: str,
sub_queries: List[SubQuery]
) -> str:
"""Merge sub-query answers into final answer"""
answered = [sq for sq in sub_queries if sq.answer]
if not answered:
return "Unable to find relevant information."
sub_answers = "\n".join([
f"Q: {sq.query}\nA: {sq.answer}"
for sq in answered
])
prompt = f"""Synthesize these sub-answers into a comprehensive response:
Original Question: "{original_query}"
Sub-Questions and Answers:
{sub_answers}
Create a coherent, well-structured answer that:
1. Addresses the original question directly
2. Integrates all relevant sub-answers
3. Maintains logical flow
4. Avoids redundancy"""
response = self.client.chat.completions.create(
model=settings.generation_model,
messages=[{"role": "user", "content": prompt}],
max_tokens=1000
)
        return response.choices[0].message.content
Step 3: Retrieval Tools
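The two retrieval tools in this step score documents on different scales (cosine similarity for the vector store, result position for web search), so fusing their outputs benefits from per-source normalization. A sketch of that fusion, with plain dicts standing in for `RetrievalResult`; this is an illustrative choice, not the project's canonical logic:

```python
# Fuse documents from several retrieval sources into one ranked list.
# Scores are min-max normalized per source so vector similarities and
# position-based web scores become comparable before sorting.

def merge_results(results, top_k=5):
    merged = []
    for result in results:
        scores = result["relevance_scores"]
        if not scores:
            continue
        lo, hi = min(scores), max(scores)
        for doc, s in zip(result["documents"], scores):
            # degenerate case: a single-score source keeps its best rank
            norm = 1.0 if hi == lo else (s - lo) / (hi - lo)
            merged.append({**doc, "source": result["source"], "score": norm})
    merged.sort(key=lambda d: d["score"], reverse=True)
    return merged[:top_k]
```

Min-max fusion is the simplest option; reciprocal-rank fusion is a common alternative when score distributions are very different.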
# app/tools/retriever.py
import chromadb
from chromadb.config import Settings as ChromaSettings
import openai
from typing import List, Dict, Any, Optional
import logging
from ..models import RetrievalResult
from ..config import get_settings
logger = logging.getLogger(__name__)
settings = get_settings()
class VectorRetriever:
"""Vector-based document retrieval"""
def __init__(self):
self.client = openai.OpenAI(api_key=settings.openai_api_key)
        # PersistentClient is the current Chroma API; the legacy
        # "duckdb+parquet" Settings-based client was removed in Chroma 0.4+
        self.chroma_client = chromadb.PersistentClient(
            path=settings.chroma_persist_dir,
            settings=ChromaSettings(anonymized_telemetry=False)
        )
self.collection = self.chroma_client.get_or_create_collection(
name=settings.collection_name,
metadata={"hnsw:space": "cosine"}
)
def retrieve(
self,
query: str,
        top_k: Optional[int] = None,
filters: Optional[Dict[str, Any]] = None
) -> RetrievalResult:
"""Retrieve relevant documents"""
top_k = top_k or settings.top_k
# Get embedding
embedding_response = self.client.embeddings.create(
model="text-embedding-3-small",
input=query
)
query_embedding = embedding_response.data[0].embedding
# Search
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=top_k,
where=filters,
include=["documents", "metadatas", "distances"]
)
documents = []
scores = []
for i, doc_id in enumerate(results["ids"][0]):
doc = {
"id": doc_id,
"content": results["documents"][0][i],
"metadata": results["metadatas"][0][i]
}
documents.append(doc)
scores.append(1 - results["distances"][0][i])
return RetrievalResult(
query=query,
documents=documents,
source="vector",
relevance_scores=scores,
metadata={"top_k": top_k}
)
def add_documents(
self,
documents: List[str],
metadatas: Optional[List[Dict[str, Any]]] = None,
ids: Optional[List[str]] = None
) -> int:
"""Add documents to the knowledge base"""
        if not ids:
            # NOTE: index-based ids collide across calls and can clobber
            # earlier documents; prefer stable ids (e.g. a content hash)
            ids = [f"doc_{i}" for i in range(len(documents))]
        # Batch all documents into a single embeddings request
        # (the embeddings endpoint accepts a list and preserves order)
        response = self.client.embeddings.create(
            model="text-embedding-3-small",
            input=documents
        )
        embeddings = [item.embedding for item in response.data]
self.collection.add(
ids=ids,
documents=documents,
embeddings=embeddings,
metadatas=metadatas or [{}] * len(documents)
)
        return len(documents)
# app/tools/web_search.py
import httpx
from typing import List, Dict, Any, Optional
import logging
from ..models import RetrievalResult
from ..config import get_settings
logger = logging.getLogger(__name__)
settings = get_settings()
class WebSearchTool:
"""Web search using Serper API (Google Search)"""
def __init__(self):
self.api_key = settings.serper_api_key
self.base_url = "https://google.serper.dev"
async def search(
self,
query: str,
max_results: int = 5,
search_type: str = "search"
) -> RetrievalResult:
"""
Search Google via Serper API.
Serper provides high-quality Google search results,
ideal for production agentic RAG systems.
"""
if not self.api_key:
return RetrievalResult(
query=query,
documents=[],
source="web",
relevance_scores=[],
metadata={"error": "Serper API key not configured"}
)
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.base_url}/{search_type}",
headers={
"X-API-KEY": self.api_key,
"Content-Type": "application/json"
},
json={
"q": query,
"num": max_results
},
timeout=30.0
)
if response.status_code != 200:
logger.error(f"Web search failed: {response.text}")
return RetrievalResult(
query=query,
documents=[],
source="web",
relevance_scores=[],
metadata={"error": response.text}
)
data = response.json()
documents = []
scores = []
# Process organic search results
for i, result in enumerate(data.get("organic", [])[:max_results]):
documents.append({
"id": result.get("link", ""),
"content": result.get("snippet", ""),
"metadata": {
"title": result.get("title", ""),
"url": result.get("link", ""),
"position": result.get("position", i + 1)
}
})
            # Position-based relevance score (earlier results score higher)
            scores.append(1.0 - (i * 0.1))
return RetrievalResult(
query=query,
documents=documents,
source="web",
relevance_scores=scores,
metadata={
"answer_box": data.get("answerBox"),
"knowledge_graph": data.get("knowledgeGraph"),
"related_searches": data.get("relatedSearches", [])
}
        )
Step 4: Evaluation and Self-Correction
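The machinery in this step reduces to one loop: retrieve, judge, reformulate, repeat. A stubbed, dependency-free sketch of that control flow, with plain callables standing in for the evaluator and reformulator classes defined below:

```python
# Minimal sketch of the self-correction loop (stubbed components, not
# the project's real classes): retrieve, self-evaluate, and reformulate
# until the results look sufficient or an iteration cap is hit.

def agentic_retrieve(query, retrieve, evaluate, reformulate, max_iterations=3):
    """Retry retrieval with reformulated queries until evaluation passes."""
    history = [query]
    for _ in range(max_iterations):
        results = retrieve(query)
        if evaluate(query, results):          # self-evaluation gate
            return results, history
        query = reformulate(query, history)   # self-correction step
        history.append(query)
    return retrieve(query), history           # best effort after the cap
```

The `history` list plays the same role as `previous_queries` in `QueryReformulator`: it keeps the model from re-trying phrasings that already failed.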
# app/agents/evaluator.py
import openai
from typing import List
import json
import logging
from ..models import RetrievalResult, EvaluationResult
from ..config import get_settings
logger = logging.getLogger(__name__)
settings = get_settings()
class RetrievalEvaluator:
"""Evaluate retrieval quality and suggest improvements"""
def __init__(self):
self.client = openai.OpenAI(api_key=settings.openai_api_key)
def evaluate(
self,
query: str,
results: List[RetrievalResult]
) -> EvaluationResult:
"""Evaluate if retrieved content is sufficient"""
# Combine all retrieved content
all_content = []
for result in results:
for doc in result.documents:
all_content.append({
"source": result.source,
"content": doc.get("content", "")[:500], # Truncate
"score": doc.get("metadata", {}).get("score", 0)
})
prompt = f"""Evaluate if this retrieved content can answer the query:
Query: "{query}"
Retrieved Content:
{json.dumps(all_content, indent=2)}
Evaluate and respond in JSON format:
{{
"is_sufficient": true/false,
"confidence": 0.0-1.0,
"missing_information": ["What's missing if not sufficient"],
"suggested_actions": ["What to do next"],
"reasoning": "Explanation of evaluation"
}}
Guidelines:
- is_sufficient: Can the query be fully answered?
- confidence: How confident in the answer quality?
- missing_information: Specific gaps in the content
- suggested_actions: Concrete next steps (reformulate, web search, etc.)"""
response = self.client.chat.completions.create(
model=settings.evaluation_model,
messages=[{"role": "user", "content": prompt}],
response_format={"type": "json_object"},
max_tokens=500
)
result = json.loads(response.choices[0].message.content)
return EvaluationResult(
is_sufficient=result["is_sufficient"],
confidence=result["confidence"],
missing_information=result.get("missing_information", []),
suggested_actions=result.get("suggested_actions", []),
reasoning=result["reasoning"]
)
def should_retry(self, evaluation: EvaluationResult) -> bool:
"""Determine if retrieval should be retried"""
return (
not evaluation.is_sufficient and
evaluation.confidence < settings.relevance_threshold
)
class QueryReformulator:
"""Reformulate queries for better retrieval"""
def __init__(self):
self.client = openai.OpenAI(api_key=settings.openai_api_key)
def reformulate(
self,
original_query: str,
evaluation: EvaluationResult,
previous_queries: List[str]
) -> str:
"""Generate improved query based on evaluation feedback"""
prompt = f"""Reformulate this query to get better retrieval results:
Original Query: "{original_query}"
Evaluation:
- Missing Information: {evaluation.missing_information}
- Reasoning: {evaluation.reasoning}
Previous Query Attempts:
{json.dumps(previous_queries)}
Create an improved query that:
1. Addresses the missing information
2. Uses different keywords/phrasing
3. Is more specific or broader as needed
4. Avoids previous unsuccessful approaches
Return ONLY the reformulated query, nothing else."""
response = self.client.chat.completions.create(
model=settings.evaluation_model,
messages=[{"role": "user", "content": prompt}],
max_tokens=200
)
        return response.choices[0].message.content.strip()
Step 5: LangGraph Workflow
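Before the graph code: LangGraph applies each node's returned dict to the shared state, accumulating keys annotated with a reducer (`operator.add` for the list fields in this step's state) and overwriting plain keys. A dependency-free mimic of that merge rule makes the semantics concrete:

```python
# Simplified model of how LangGraph applies node outputs to the state:
# keys with a registered reducer are merged, plain keys are overwritten.
# (Not LangGraph itself - an illustration of its update semantics.)

from operator import add

REDUCERS = {"retrieval_results": add, "query_history": add,
            "reasoning_trace": add, "errors": add}

def apply_update(state, update):
    new_state = dict(state)
    for key, value in update.items():
        reducer = REDUCERS.get(key)
        if reducer and key in state:
            new_state[key] = reducer(state[key], value)  # accumulate
        else:
            new_state[key] = value                       # overwrite
    return new_state
```

This is why nodes below return single-element lists for `reasoning_trace`: each return appends to the trace instead of replacing it.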
# app/graph/state.py
from typing import TypedDict, List, Annotated, Optional
from operator import add
from ..models import (
QueryAnalysis, SubQuery, RetrievalResult,
EvaluationResult
)
class AgentGraphState(TypedDict):
"""State for the agentic RAG workflow"""
# Input
query: str
# Analysis
query_analysis: Optional[QueryAnalysis]
# Decomposition
sub_queries: List[SubQuery]
current_sub_query_idx: int
# Retrieval
retrieval_results: Annotated[List[RetrievalResult], add]
query_history: Annotated[List[str], add]
# Evaluation
evaluation: Optional[EvaluationResult]
# Generation
current_answer: Optional[str]
final_answer: Optional[str]
# Control
iteration: int
max_iterations: int
is_complete: bool
reasoning_trace: Annotated[List[str], add]
    errors: Annotated[List[str], add]
# app/graph/nodes.py
from typing import Dict, Any
import logging
from .state import AgentGraphState
from ..agents.query_analyzer import QueryAnalyzer
from ..agents.decomposer import QueryDecomposer
from ..agents.evaluator import RetrievalEvaluator, QueryReformulator
from ..tools.retriever import VectorRetriever
from ..tools.web_search import WebSearchTool
from ..config import get_settings
logger = logging.getLogger(__name__)
settings = get_settings()
# Initialize components
query_analyzer = QueryAnalyzer()
decomposer = QueryDecomposer()
evaluator = RetrievalEvaluator()
reformulator = QueryReformulator()
retriever = VectorRetriever()
web_search = WebSearchTool()
async def analyze_query(state: AgentGraphState) -> Dict[str, Any]:
"""Analyze the input query"""
logger.info(f"Analyzing query: {state['query']}")
analysis = query_analyzer.analyze(state["query"])
return {
"query_analysis": analysis,
"reasoning_trace": [
f"Query type: {analysis.query_type.value}, "
f"Strategy: {analysis.suggested_strategy.value}"
]
}
async def decompose_query(state: AgentGraphState) -> Dict[str, Any]:
"""Decompose complex query into sub-queries"""
analysis = state["query_analysis"]
if not analysis.requires_decomposition:
return {
"sub_queries": [],
"reasoning_trace": ["Query is simple, no decomposition needed"]
}
sub_queries = decomposer.decompose(state["query"], analysis)
return {
"sub_queries": sub_queries,
"current_sub_query_idx": 0,
"reasoning_trace": [
f"Decomposed into {len(sub_queries)} sub-queries"
]
}
async def retrieve_documents(state: AgentGraphState) -> Dict[str, Any]:
"""Execute retrieval based on current strategy"""
analysis = state["query_analysis"]
sub_queries = state.get("sub_queries", [])
# Determine which query to use
if sub_queries:
idx = state.get("current_sub_query_idx", 0)
if idx < len(sub_queries):
current_query = sub_queries[idx].query
else:
current_query = state["query"]
else:
current_query = state["query"]
results = []
# Vector retrieval
vector_result = retriever.retrieve(current_query)
results.append(vector_result)
# Web search if needed
if (analysis.suggested_strategy.value in ["web", "multi"] and
settings.enable_web_search):
web_result = await web_search.search(current_query)
results.append(web_result)
return {
"retrieval_results": results,
"query_history": [current_query],
"reasoning_trace": [
f"Retrieved from {len(results)} sources for: {current_query}"
]
}
async def evaluate_results(state: AgentGraphState) -> Dict[str, Any]:
"""Evaluate retrieval quality"""
sub_queries = state.get("sub_queries", [])
# Get current query
if sub_queries:
idx = state.get("current_sub_query_idx", 0)
if idx < len(sub_queries):
current_query = sub_queries[idx].query
else:
current_query = state["query"]
else:
current_query = state["query"]
evaluation = evaluator.evaluate(
current_query,
state["retrieval_results"]
)
return {
"evaluation": evaluation,
"reasoning_trace": [
f"Evaluation: sufficient={evaluation.is_sufficient}, "
f"confidence={evaluation.confidence:.2f}"
]
}
async def reformulate_query(state: AgentGraphState) -> Dict[str, Any]:
"""Reformulate query for better retrieval"""
sub_queries = state.get("sub_queries", [])
if sub_queries:
idx = state.get("current_sub_query_idx", 0)
if idx < len(sub_queries):
original = sub_queries[idx].query
else:
original = state["query"]
else:
original = state["query"]
new_query = reformulator.reformulate(
original,
state["evaluation"],
state.get("query_history", [])
)
# Update sub-query if applicable
if sub_queries and idx < len(sub_queries):
sub_queries[idx].query = new_query
return {
"sub_queries": sub_queries,
"reasoning_trace": [f"Reformulated to: {new_query}"],
"iteration": state["iteration"] + 1
}
async def generate_answer(state: AgentGraphState) -> Dict[str, Any]:
"""Generate answer from retrieved content"""
from ..generation.generator import AnswerGenerator
generator = AnswerGenerator()
sub_queries = state.get("sub_queries", [])
    if sub_queries:
        idx = state.get("current_sub_query_idx", 0)
        if idx < len(sub_queries):
            # Answer the current sub-query, then advance the index
            answer = generator.generate(
                sub_queries[idx].query,
                state["retrieval_results"]
            )
            sub_queries[idx].answer = answer
            sub_queries[idx].status = "completed"
            return {
                "sub_queries": sub_queries,
                "current_sub_query_idx": idx + 1,
                "reasoning_trace": [f"Answered sub-query {idx + 1}"]
            }
        # All sub-queries already answered; fall through to synthesis
        return {"reasoning_trace": ["All sub-queries answered"]}
else:
# Generate final answer directly
answer = generator.generate(
state["query"],
state["retrieval_results"]
)
return {
"current_answer": answer,
"reasoning_trace": ["Generated direct answer"]
}
async def synthesize_answer(state: AgentGraphState) -> Dict[str, Any]:
"""Synthesize final answer from sub-answers"""
sub_queries = state.get("sub_queries", [])
if sub_queries:
final = decomposer.merge_answers(state["query"], sub_queries)
else:
final = state.get("current_answer", "")
return {
"final_answer": final,
"is_complete": True,
"reasoning_trace": ["Synthesized final answer"]
}
def should_continue(state: AgentGraphState) -> str:
"""Determine next step in workflow"""
# Check iteration limit
if state["iteration"] >= state["max_iterations"]:
return "synthesize"
    # If evaluation passed, hand off to generation ("synthesize" routes
    # to the generate node; an edge after it can loop back for any
    # remaining sub-queries)
    if state.get("evaluation") and state["evaluation"].is_sufficient:
        return "synthesize"
# Need to reformulate
return "reformulate"
def needs_decomposition(state: AgentGraphState) -> str:
"""Check if query needs decomposition"""
analysis = state.get("query_analysis")
if analysis and analysis.requires_decomposition:
return "decompose"
return "retrieve"# app/graph/workflow.py
from langgraph.graph import StateGraph, END
from .state import AgentGraphState
from .nodes import (
analyze_query, decompose_query, retrieve_documents,
evaluate_results, reformulate_query, generate_answer,
synthesize_answer, should_continue, needs_decomposition
)
def create_workflow():  # returns a compiled LangGraph graph
"""Create the agentic RAG workflow"""
workflow = StateGraph(AgentGraphState)
# Add nodes
workflow.add_node("analyze", analyze_query)
workflow.add_node("decompose", decompose_query)
workflow.add_node("retrieve", retrieve_documents)
workflow.add_node("evaluate", evaluate_results)
workflow.add_node("reformulate", reformulate_query)
workflow.add_node("generate", generate_answer)
workflow.add_node("synthesize", synthesize_answer)
# Set entry point
workflow.set_entry_point("analyze")
# Add edges
workflow.add_conditional_edges(
"analyze",
needs_decomposition,
{
"decompose": "decompose",
"retrieve": "retrieve"
}
)
workflow.add_edge("decompose", "retrieve")
workflow.add_edge("retrieve", "evaluate")
workflow.add_conditional_edges(
"evaluate",
should_continue,
{
"synthesize": "generate",
"reformulate": "reformulate",
"retrieve": "retrieve"
}
)
workflow.add_edge("reformulate", "retrieve")
workflow.add_edge("generate", "synthesize")
workflow.add_edge("synthesize", END)
return workflow.compile()
# Create global workflow instance
agentic_workflow = create_workflow()
Step 6: Answer Generation
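The generator in this step caps context crudely by taking the first ten chunks. A slightly smarter packer fills a character budget in relevance order; this is illustrative only, and character counts stand in for tokens:

```python
# Fill a context budget greedily in relevance order. docs is a list of
# (score, text) pairs; chunks that don't fit the remaining budget are
# skipped rather than truncated (an illustrative policy).

def pack_context(docs, budget_chars=4000):
    """Return concatenated context text within a character budget."""
    parts, used = [], 0
    for score, text in sorted(docs, key=lambda d: d[0], reverse=True):
        if used + len(text) > budget_chars:
            continue
        parts.append(text)
        used += len(text)
    return "\n\n".join(parts)
```

For production use, a real tokenizer (e.g. tiktoken) should replace the character heuristic so the budget matches the model's context window.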
# app/generation/generator.py
import openai
from typing import List
import logging
from ..models import RetrievalResult
from ..config import get_settings
logger = logging.getLogger(__name__)
settings = get_settings()
class AnswerGenerator:
"""Generate answers from retrieved content"""
def __init__(self):
self.client = openai.OpenAI(api_key=settings.openai_api_key)
def generate(
self,
query: str,
results: List[RetrievalResult]
) -> str:
"""Generate answer from retrieval results"""
# Combine relevant content
context_parts = []
for result in results:
for doc in result.documents:
content = doc.get("content", "")
source = result.source
context_parts.append(f"[Source: {source}]\n{content}")
context = "\n\n".join(context_parts[:10]) # Limit context
prompt = f"""Answer this question based on the provided context:
Question: {query}
Context:
{context}
Instructions:
1. Answer directly and concisely
2. Use information only from the provided context
3. If the context doesn't contain enough information, say so
4. Cite sources when relevant using [Source: type] format
5. If there's conflicting information, note the discrepancy"""
response = self.client.chat.completions.create(
model=settings.generation_model,
messages=[{"role": "user", "content": prompt}],
max_tokens=1000
)
        return response.choices[0].message.content
# app/generation/verifier.py
import openai
from typing import Tuple
import json
import logging

from ..config import get_settings

logger = logging.getLogger(__name__)
settings = get_settings()


class AnswerVerifier:
    """Verify answer quality and accuracy"""

    def __init__(self):
        self.client = openai.OpenAI(api_key=settings.openai_api_key)

    def verify(
        self,
        query: str,
        answer: str,
        context: str
    ) -> Tuple[bool, str]:
        """Verify if answer is accurate and complete"""
        prompt = f"""Verify if this answer correctly addresses the question:

Question: {query}

Answer: {answer}

Source Context:
{context[:2000]}

Evaluate and respond in JSON format:
{{
    "is_valid": true/false,
    "completeness": 0.0-1.0,
    "accuracy": 0.0-1.0,
    "issues": ["List any issues found"],
    "suggestion": "How to improve if not valid"
}}"""

        response = self.client.chat.completions.create(
            model=settings.evaluation_model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"},
            max_tokens=500
        )

        result = json.loads(response.choices[0].message.content)
        # Use .get with defaults: the model may omit keys despite the schema prompt
        is_valid = (
            bool(result.get("is_valid")) and
            result.get("completeness", 0.0) > 0.7 and
            result.get("accuracy", 0.0) > 0.8
        )
        return is_valid, result.get("suggestion", "")

Step 7: FastAPI Application
# app/main.py
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from contextlib import asynccontextmanager
import time
import logging
import json
from typing import AsyncGenerator

from .config import get_settings
from .models import QueryRequest, QueryResponse
from .graph.workflow import agentic_workflow
from .graph.state import AgentGraphState
from .tools.retriever import VectorRetriever

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
settings = get_settings()

retriever: VectorRetriever


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize components on startup"""
    global retriever
    logger.info("Initializing Agentic RAG components...")
    retriever = VectorRetriever()
    logger.info("Components initialized successfully")
    yield
    logger.info("Shutting down...")


app = FastAPI(
    title="Agentic RAG API",
    description="Self-correcting RAG with autonomous retrieval",
    version="1.0.0",
    lifespan=lifespan
)


def build_initial_state(request: QueryRequest) -> AgentGraphState:
    """Build the initial workflow state shared by both query endpoints"""
    return {
        "query": request.query,
        "query_analysis": None,
        "sub_queries": [],
        "current_sub_query_idx": 0,
        "retrieval_results": [],
        "query_history": [],
        "evaluation": None,
        "current_answer": None,
        "final_answer": None,
        "iteration": 0,
        "max_iterations": request.max_iterations,
        "is_complete": False,
        "reasoning_trace": [],
        "errors": []
    }


@app.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
    """Process a query using agentic RAG"""
    start_time = time.time()
    initial_state = build_initial_state(request)

    try:
        # Run workflow
        final_state = await agentic_workflow.ainvoke(initial_state)
        processing_time = (time.time() - start_time) * 1000

        # Format sources
        sources = []
        for result in final_state.get("retrieval_results", []):
            for doc in result.documents[:3]:
                sources.append({
                    "source": result.source,
                    "content": doc.get("content", "")[:200],
                    "metadata": doc.get("metadata", {})
                })

        # Format sub-queries
        sub_query_info = []
        for sq in final_state.get("sub_queries", []):
            sub_query_info.append({
                "query": sq.query,
                "answer": sq.answer or "Not answered",
                "status": sq.status
            })

        return QueryResponse(
            query=request.query,
            answer=final_state.get("final_answer", "Unable to generate answer"),
            sources=sources,
            sub_queries=sub_query_info,
            iterations=final_state.get("iteration", 0),
            reasoning_trace=final_state.get("reasoning_trace", []),
            processing_time_ms=round(processing_time, 2)
        )
    except Exception as e:
        logger.error(f"Query processing failed: {e}")
        raise HTTPException(500, str(e))


@app.post("/query/stream")
async def query_stream(request: QueryRequest):
    """Stream query processing with reasoning steps"""
    async def generate() -> AsyncGenerator[str, None]:
        initial_state = build_initial_state(request)
        async for event in agentic_workflow.astream(initial_state):
            for key, value in event.items():
                if "reasoning_trace" in value:
                    for trace in value["reasoning_trace"]:
                        yield f"data: {json.dumps({'step': key, 'trace': trace})}\n\n"
                if "final_answer" in value and value["final_answer"]:
                    yield f"data: {json.dumps({'answer': value['final_answer']})}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )


@app.post("/ingest")
async def ingest_documents(documents: list[str]):
    """Add documents to the knowledge base"""
    count = retriever.add_documents(documents)
    return {"status": "success", "documents_added": count}


@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {"status": "healthy"}

Step 8: Docker Setup
# docker-compose.yml
version: '3.8'

services:
  agentic-rag:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - SERPER_API_KEY=${SERPER_API_KEY:-}
      - CHROMA_PERSIST_DIR=/app/data/chroma
    volumes:
      - ./data:/app/data
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

RUN apt-get update && apt-get install -y \
    curl \
    && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY app/ ./app/
RUN mkdir -p /app/data/chroma

EXPOSE 8000

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

# requirements.txt
fastapi==0.109.0
uvicorn[standard]==0.27.0
pydantic==2.5.3
pydantic-settings==2.1.0
# AI/ML
openai==1.12.0
langchain==0.1.5
langchain-openai==0.0.5
langgraph==0.0.26
# Vector Store
chromadb==0.4.22
# Web Search
httpx==0.26.0
# Utilities
python-dotenv==1.0.0

Running the Application
1. Environment Setup
cat > .env << EOF
OPENAI_API_KEY=your-openai-key
SERPER_API_KEY=your-serper-key # Optional for web search (https://serper.dev)
EOF

2. Start the Service
# With Docker
docker-compose up --build
# Or locally
pip install -r requirements.txt
uvicorn app.main:app --reload

3. Test the System
# Add documents to knowledge base
curl -X POST "http://localhost:8000/ingest" \
-H "Content-Type: application/json" \
-d '["Document 1 content...", "Document 2 content..."]'
# Simple query
curl -X POST "http://localhost:8000/query" \
-H "Content-Type: application/json" \
-d '{"query": "What is machine learning?"}'
# Complex query with decomposition
curl -X POST "http://localhost:8000/query" \
-H "Content-Type: application/json" \
-d '{
"query": "Compare the performance of transformers vs RNNs for NLP tasks and explain when to use each",
"max_iterations": 5,
"enable_web_search": true
}'
# Stream reasoning steps
curl -X POST "http://localhost:8000/query/stream" \
-H "Content-Type: application/json" \
-d '{"query": "What are the latest developments in AI?", "stream": true}'Testing
# tests/test_workflow.py
import pytest

from app.graph.workflow import agentic_workflow
from app.graph.state import AgentGraphState


def make_state(query: str, max_iterations: int) -> AgentGraphState:
    """Build a fresh workflow state for a test query"""
    return {
        "query": query,
        "query_analysis": None,
        "sub_queries": [],
        "current_sub_query_idx": 0,
        "retrieval_results": [],
        "query_history": [],
        "evaluation": None,
        "current_answer": None,
        "final_answer": None,
        "iteration": 0,
        "max_iterations": max_iterations,
        "is_complete": False,
        "reasoning_trace": [],
        "errors": []
    }


@pytest.mark.asyncio
async def test_simple_query():
    """Test simple query processing"""
    state = make_state("What is Python?", max_iterations=3)
    result = await agentic_workflow.ainvoke(state)
    assert result["is_complete"]
    assert result["final_answer"]
    assert len(result["reasoning_trace"]) > 0


@pytest.mark.asyncio
async def test_complex_query_decomposition():
    """Test complex query decomposition"""
    state = make_state(
        "Compare Python and JavaScript for web development, "
        "considering performance, ecosystem, and learning curve",
        max_iterations=5
    )
    result = await agentic_workflow.ainvoke(state)
    assert result["is_complete"]
    assert len(result["sub_queries"]) > 1
    assert result["final_answer"]

Debugging Tips
Common Issues

- Infinite loops: Check `max_iterations` and ensure the evaluation logic terminates
- LangGraph state issues: Ensure all state fields are properly initialized
- Web search failures: Verify the Serper API key and degrade gracefully when it is unavailable
- Slow performance: Consider caching embeddings and reducing LLM calls
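The infinite-loop tip is easiest to enforce inside the routing function itself, so the budget holds no matter what the evaluator says. This is a sketch, not the project's exact router: it assumes the `AgentGraphState` keys used above (`iteration`, `max_iterations`, `evaluation`) plus a hypothetical `sufficient` flag on the evaluation result.

```python
def route_after_evaluation(state: dict) -> str:
    """Decide the next node after self-evaluation (sketch)."""
    # Hard stop: never loop past the configured budget, even if the
    # evaluator keeps asking for another retry.
    if state["iteration"] >= state["max_iterations"]:
        return "generate"  # answer with whatever context we have
    evaluation = state.get("evaluation")
    if evaluation and evaluation.get("sufficient"):
        return "generate"
    return "reformulate"  # one more retrieval attempt
```

Putting the budget check first means a buggy or over-strict evaluator can degrade answer quality but can never hang the workflow.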
Debug Logging
# Enable detailed workflow logging
import logging
logging.getLogger("langgraph").setLevel(logging.DEBUG)

# Trace state changes
for event in workflow.stream(state):
    print(f"Event: {event}")

Extensions
| Extension | Description | Complexity |
|---|---|---|
| Memory | Add conversation history | Medium |
| Tool Expansion | Add SQL, API, calculator tools | Medium |
| Human-in-the-Loop | Add approval steps | Medium |
| Multi-Agent | Specialized agents for domains | High |
| Caching | Cache retrieval and LLM results | Medium |
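The Caching row can be prototyped with the standard library alone. `embed_query` below is a hypothetical stand-in for the real embedding call, and the returned value is fake, just enough to keep the sketch self-contained:

```python
from functools import lru_cache

@lru_cache(maxsize=1024)
def embed_query(text: str) -> tuple:
    # Stand-in for the real embedding API call. Agentic runs often
    # re-embed the same sub-query across retries, so an in-process
    # cache cuts both latency and API cost.
    return tuple(float(ord(c)) for c in text[:8])  # fake embedding

embed_query("what is python")  # first call: computed
embed_query("what is python")  # second call: served from the cache
```

`lru_cache` requires hashable arguments and a return value you never mutate, which is why the sketch returns a tuple rather than a list; a cross-process variant could keep the same interface in front of Redis.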
Resources
- LangGraph Documentation
- Self-RAG Paper
- Corrective RAG Paper
- Adaptive RAG Paper
- Serper API - Google Search API for production
Key Concepts Recap
| Concept | What It Is | Why It Matters |
|---|---|---|
| Query Analysis | Classify queries as simple or complex | Routes to appropriate strategy, saves compute on simple queries |
| Query Decomposition | Break complex questions into sub-queries | Each sub-query retrieves focused context, better coverage |
| Self-Evaluation | LLM judges if retrieval is sufficient | Prevents generating from poor context, triggers retry |
| Query Reformulation | Rewrite failed queries with different terms | Recovers from initial retrieval failures |
| Multi-Source Retrieval | Combine vector DB + web search | Knowledge base for domain facts, web for current info |
| LangGraph Workflows | State machine for agentic loops | Clean separation of nodes/edges, supports complex flows |
| Answer Verification | Check if answer is complete and grounded | Catches hallucinations before returning to user |
| Streaming Traces | Return reasoning steps as they happen | User sees progress, easier debugging |
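The evaluate-and-reformulate loop at the heart of this table condenses into a few dependency-free lines. Everything here is stubbed: `retrieve`, `score`, `generate`, and `reformulate` stand in for the vector search, the LLM sufficiency judge, the answer generator, and the query rewriter built in this project.

```python
def retrieve(q):
    # Stub: only the expanded phrasing "hits" the knowledge base
    return ["ml-overview.md"] if "machine learning" in q else []

def score(docs):
    # Stub for the LLM judge: any hit counts as sufficient
    return 1.0 if docs else 0.0

def generate(docs):
    return "Answer from: " + ", ".join(docs) if docs else "No answer found"

def reformulate(q):
    # Stub rewriter: expand the abbreviation and retry
    return q.replace("ML", "machine learning")

def self_correcting_answer(query, max_iterations=3, threshold=0.7):
    attempts, docs = [query], []
    for _ in range(max_iterations):
        docs = retrieve(attempts[-1])
        if score(docs) >= threshold:  # evaluation passed: stop retrying
            break
        attempts.append(reformulate(attempts[-1]))  # retry, rephrased
    return generate(docs), attempts

answer, tried = self_correcting_answer("What is ML?")
# tried == ["What is ML?", "What is machine learning?"]
```

The first retrieval misses, the evaluator scores it insufficient, and the reformulated query succeeds on the second pass — the same loop LangGraph expresses as nodes and conditional edges.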
Summary
You've built a sophisticated Agentic RAG system that:
- Analyzes queries to determine complexity and strategy
- Decomposes complex questions into manageable sub-queries
- Self-evaluates retrieval quality and decides when to retry
- Reformulates queries when initial retrieval is insufficient
- Combines sources from knowledge base and web search
- Synthesizes answers from multiple sub-answers
This represents the current state of the art in RAG systems, moving from static pipelines to intelligent, adaptive systems that can handle complex real-world queries.
Congratulations!
You've completed the entire RAG learning path:
- Basic: Intelligent Document Q&A
- Intermediate: Multi-Document, Reranking, Hybrid Search, Conversational
- Advanced: Production RAG, Graph RAG, Multi-Modal RAG, Agentic RAG
You now have the skills to build production-grade RAG systems for any use case. Consider exploring the AI Agents section to learn more about autonomous AI systems.