Document RAG with Docling
Build a production RAG system using Docling for advanced document parsing with table extraction, OCR, and multi-format support
| Property | Value |
|---|---|
| Difficulty | Advanced |
| Time | ~4 days |
| Code Size | ~700 LOC |
| Prerequisites | Production RAG |
TL;DR
Docling provides unified parsing for PDFs, Word, PowerPoint, and Excel with table detection, OCR, and structure preservation. Build element-aware RAG that treats tables differently from text and maintains heading context for better retrieval.
Tech Stack
| Technology | Purpose |
|---|---|
| Docling | Document parsing (PDF, Word, PPT, Excel) |
| LangChain | RAG orchestration |
| ChromaDB | Vector storage |
| OpenAI | Embeddings + GPT-4 |
| FastAPI | REST API |
Prerequisites
- Python 3.10+
- OpenAI API key
- 8GB+ RAM (for document processing)
pip install docling langchain langchain-openai chromadb fastapi uvicorn python-multipart
What You'll Learn
- Parse complex documents with tables, images, and formulas
- Preserve document structure during chunking
- Build element-aware retrieval systems
- Handle multiple document formats in one pipeline
The Problem: Traditional Document Processing Fails
Traditional RAG pipelines use simple text extraction that loses critical structure:
| Challenge | Traditional Approach | Impact |
|---|---|---|
| Tables | Flattened to text | Data relationships lost |
| Images | Ignored or OCR'd poorly | Visual information missing |
| Formulas | Garbled text | Scientific docs unusable |
| Layout | Reading order broken | Context fragmented |
| Multi-format | Separate parsers | Inconsistent output |
┌─────────────────────────────────────────────────────────────────────────────┐
│ TRADITIONAL PARSING ❌ │
│ │
│ PDF ────► PyPDF ────────┐ │
│ Word ───► python-docx ──┼────► Lost Structure ⚠️ │
│ Excel ──► openpyxl ─────┘ (Tables flattened, images ignored) │
│ │
│ Problem: Different parsers, inconsistent output, structure destroyed │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ DOCLING PARSING ✅ │
│ │
│ PDF ──────┐ ┌──────────────────────────────────────┐ │
│ Word ─────┤ │ Structured Output │ │
│ PPT ──────┼────► Docling ─────►│ • Tables (rows/columns preserved) │ │
│ Excel ────┤ │ • Text (reading order maintained) │ │
│ HTML ─────┘ │ • Images (extracted + OCR'd) │ │
│ └──────────────────────────────────────┘ │
│ │
│ Solution: One API, all formats, structure preserved │
└─────────────────────────────────────────────────────────────────────────────┘
Solution: Docling for Unified Document Processing
Docling provides:
- Unified parsing for PDF, Word, PowerPoint, Excel, HTML, images
- Table detection with row/column preservation
- OCR integration for scanned documents
- Formula extraction for scientific papers
- Reading order detection for proper context
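To make "structure preserved" concrete: a table that survives as rows and columns can be re-serialized losslessly, while flattened text cannot. A minimal, Docling-free sketch of the markdown serialization this pipeline later gets from `DataFrame.to_markdown()` (the helper name `table_to_markdown` is illustrative, not part of Docling):

```python
def table_to_markdown(columns, rows):
    """Serialize a table as a markdown grid, keeping the
    row/column relationships an embedding model can still read."""
    header = "| " + " | ".join(columns) + " |"
    sep = "|" + "|".join(["---"] * len(columns)) + "|"
    body = ["| " + " | ".join(str(c) for c in r) + " |" for r in rows]
    return "\n".join([header, sep] + body)

md = table_to_markdown(["Quarter", "Revenue"], [["Q1", 100], ["Q2", 120]])
print(md)
```

Flattening the same table to prose ("Q1 100 Q2 120") would lose which number belongs to which quarter; the grid keeps that binding explicit.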
Project Structure
document-rag-docling/
├── config.py # Configuration settings
├── document_processor.py # Docling document parsing
├── chunker.py # Element-aware chunking
├── retriever.py # Structured retrieval
├── rag_pipeline.py # RAG orchestration
├── app.py # FastAPI application
└── requirements.txt
Step 1: Configuration
# config.py
from pydantic_settings import BaseSettings
from functools import lru_cache
from enum import Enum
from typing import Optional
class DocumentFormat(str, Enum):
PDF = "pdf"
DOCX = "docx"
PPTX = "pptx"
XLSX = "xlsx"
HTML = "html"
IMAGE = "image"
class Settings(BaseSettings):
# OpenAI
openai_api_key: str
embedding_model: str = "text-embedding-3-small"
llm_model: str = "gpt-4o"
# Docling
enable_ocr: bool = True
enable_table_detection: bool = True
enable_formula_detection: bool = True
# Chunking
chunk_size: int = 1000
chunk_overlap: int = 200
preserve_tables: bool = True
# Vector Store
collection_name: str = "docling_rag"
persist_directory: str = "./chroma_db"
class Config:
env_file = ".env"
@lru_cache
def get_settings() -> Settings:
return Settings()
Step 2: Document Processor with Docling
Docling handles all document formats with a unified API:
# document_processor.py
from pathlib import Path
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
import hashlib
from docling.document_converter import DocumentConverter
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions
from docling.backend.pypdfium2_backend import PyPdfiumDocumentBackend
from config import get_settings, DocumentFormat
class ElementType(str, Enum):
TEXT = "text"
TABLE = "table"
IMAGE = "image"
FORMULA = "formula"
HEADING = "heading"
LIST = "list"
@dataclass
class DocumentElement:
"""Represents a structural element from a document."""
element_type: ElementType
content: str
metadata: Dict[str, Any]
page_number: Optional[int] = None
position: Optional[Dict[str, float]] = None
def to_dict(self) -> Dict[str, Any]:
return {
"element_type": self.element_type.value,
"content": self.content,
"metadata": self.metadata,
"page_number": self.page_number,
"position": self.position
}
@dataclass
class ProcessedDocument:
"""Contains all extracted elements from a document."""
source: str
elements: List[DocumentElement]
markdown: str
metadata: Dict[str, Any]
@property
def tables(self) -> List[DocumentElement]:
return [e for e in self.elements if e.element_type == ElementType.TABLE]
@property
def text_elements(self) -> List[DocumentElement]:
return [e for e in self.elements if e.element_type == ElementType.TEXT]
class DoclingProcessor:
"""Process documents using Docling for advanced extraction."""
def __init__(self):
self.settings = get_settings()
self.converter = self._create_converter()
def _create_converter(self) -> DocumentConverter:
    """Create Docling converter with configured options (docling v2 API)."""
    # PdfFormatOption carries per-format settings in docling v2
    from docling.document_converter import PdfFormatOption
    pipeline_options = PdfPipelineOptions()
    pipeline_options.do_ocr = self.settings.enable_ocr
    pipeline_options.do_table_structure = self.settings.enable_table_detection
    return DocumentConverter(
        allowed_formats=[
            InputFormat.PDF,
            InputFormat.DOCX,
            InputFormat.PPTX,
            InputFormat.XLSX,
            InputFormat.HTML,
            InputFormat.IMAGE,
        ],
        # PDF-specific options (OCR, table structure) are wired in here
        format_options={
            InputFormat.PDF: PdfFormatOption(
                pipeline_options=pipeline_options,
                backend=PyPdfiumDocumentBackend,
            )
        },
    )
def process(self, source: str | Path) -> ProcessedDocument:
"""
Process a document and extract structured elements.
Args:
source: File path or URL to document
Returns:
ProcessedDocument with all extracted elements
"""
source_str = str(source)
# Convert document
result = self.converter.convert(source_str)
doc = result.document
# Extract elements
elements = self._extract_elements(doc)
# Generate markdown representation
markdown = doc.export_to_markdown()
# Build metadata
metadata = {
"source": source_str,
"doc_id": self._generate_doc_id(source_str),
"num_pages": getattr(doc, 'num_pages', None),
"num_tables": len([e for e in elements if e.element_type == ElementType.TABLE]),
"num_images": len([e for e in elements if e.element_type == ElementType.IMAGE]),
}
return ProcessedDocument(
source=source_str,
elements=elements,
markdown=markdown,
metadata=metadata
)
def _extract_elements(self, doc) -> List[DocumentElement]:
"""Extract structural elements from Docling document."""
elements = []
# Extract tables
for idx, table in enumerate(doc.tables):
try:
df = table.export_to_dataframe()
content = df.to_markdown(index=False)
elements.append(DocumentElement(
element_type=ElementType.TABLE,
content=content,
metadata={
"table_index": idx,
"rows": len(df),
"columns": len(df.columns),
"column_names": list(df.columns)
},
page_number=getattr(table, 'page_no', None)
))
except Exception as e:
print(f"Warning: Could not extract table {idx}: {e}")
# Extract text blocks
for idx, text_item in enumerate(doc.texts):
content = text_item.text if hasattr(text_item, 'text') else str(text_item)
# Determine element type based on style
element_type = ElementType.TEXT
if hasattr(text_item, 'label'):
label = str(text_item.label).lower()
if 'heading' in label or 'title' in label:
element_type = ElementType.HEADING
elif 'list' in label:
element_type = ElementType.LIST
elements.append(DocumentElement(
element_type=element_type,
content=content,
metadata={"text_index": idx},
page_number=getattr(text_item, 'page_no', None)
))
# Extract images (as descriptions if available)
for idx, picture in enumerate(doc.pictures):
caption = getattr(picture, 'caption', f"Image {idx + 1}")
elements.append(DocumentElement(
element_type=ElementType.IMAGE,
content=f"[Image: {caption}]",
metadata={"image_index": idx},
page_number=getattr(picture, 'page_no', None)
))
return elements
def _generate_doc_id(self, source: str) -> str:
"""Generate unique document ID."""
return hashlib.md5(source.encode()).hexdigest()[:12]
def process_batch(self, sources: List[str | Path]) -> List[ProcessedDocument]:
"""Process multiple documents."""
return [self.process(source) for source in sources]
★ Insight ─────────────────────────────────────
Docling's DocumentConverter provides a unified interface for all document types. The key insight is that it preserves structural relationships - tables remain as grids, reading order is detected, and elements maintain their semantic roles (heading, list, paragraph).
─────────────────────────────────────────────────
Step 3: Element-Aware Chunking
Traditional chunking breaks tables and loses context. We chunk intelligently:
# chunker.py
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from langchain.text_splitter import RecursiveCharacterTextSplitter
from document_processor import (
ProcessedDocument,
DocumentElement,
ElementType
)
from config import get_settings
@dataclass
class DocumentChunk:
"""A chunk with preserved structure and metadata."""
content: str
element_type: ElementType
metadata: Dict[str, Any]
chunk_index: int
def to_langchain_doc(self):
"""Convert to LangChain Document format."""
from langchain.schema import Document
return Document(
page_content=self.content,
metadata={
**self.metadata,
"element_type": self.element_type.value,
"chunk_index": self.chunk_index
}
)
class ElementAwareChunker:
"""
Chunk documents while preserving structural elements.
Strategy:
- Tables: Keep as single chunks (never split)
- Headings: Prepend to following content
- Text: Use recursive splitting with overlap
- Images: Keep with captions
"""
def __init__(self):
self.settings = get_settings()
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=self.settings.chunk_size,
chunk_overlap=self.settings.chunk_overlap,
separators=["\n\n", "\n", ". ", " ", ""]
)
def chunk(self, document: ProcessedDocument) -> List[DocumentChunk]:
"""
Chunk a processed document with element awareness.
Args:
document: ProcessedDocument from Docling
Returns:
List of DocumentChunk with preserved structure
"""
chunks = []
chunk_index = 0
current_heading = ""
for element in document.elements:
element_chunks = self._chunk_element(
element,
document.metadata,
current_heading
)
# Track current heading for context
if element.element_type == ElementType.HEADING:
current_heading = element.content
for content in element_chunks:
chunks.append(DocumentChunk(
content=content,
element_type=element.element_type,
metadata={
**document.metadata,
**element.metadata,
"page_number": element.page_number,
"heading_context": current_heading
},
chunk_index=chunk_index
))
chunk_index += 1
return chunks
def _chunk_element(
self,
element: DocumentElement,
doc_metadata: Dict[str, Any],
current_heading: str
) -> List[str]:
"""Chunk a single element based on its type."""
if element.element_type == ElementType.TABLE:
# Never split tables - keep as single chunk
if self.settings.preserve_tables:
return [self._format_table_chunk(element, current_heading)]
else:
return self._split_table_content(element)
elif element.element_type == ElementType.HEADING:
# Headings are typically short, keep whole
return [element.content]
elif element.element_type == ElementType.IMAGE:
# Keep image descriptions as-is
return [element.content]
elif element.element_type == ElementType.FORMULA:
# Keep formulas intact
return [element.content]
else:
# Text elements: use recursive splitting
content = element.content
# Add heading context if available
if current_heading and len(content) > 100:
content = f"## {current_heading}\n\n{content}"
if len(content) <= self.settings.chunk_size:
return [content]
return self.text_splitter.split_text(content)
def _format_table_chunk(
self,
element: DocumentElement,
heading: str
) -> str:
"""Format table with context."""
table_header = f"## Table: {heading}\n\n" if heading else "## Table\n\n"
# Add column info
if "column_names" in element.metadata:
cols = element.metadata["column_names"]
table_header += f"Columns: {', '.join(cols)}\n\n"
return table_header + element.content
def _split_table_content(self, element: DocumentElement) -> List[str]:
"""Split large tables by rows (fallback)."""
lines = element.content.split("\n")
if len(lines) <= 10:
return [element.content]
# Keep header with each chunk
header = lines[:2] # Markdown table header
data_lines = lines[2:]
chunks = []
for i in range(0, len(data_lines), 8):
chunk_lines = header + data_lines[i:i+8]
chunks.append("\n".join(chunk_lines))
return chunks
class HybridChunker:
"""
Combine element-aware chunking with semantic boundaries.
Uses Docling's built-in hierarchical chunking when available.
"""
def __init__(self):
self.element_chunker = ElementAwareChunker()
self.settings = get_settings()
def chunk_with_hierarchy(
self,
document: ProcessedDocument,
use_docling_chunker: bool = True
) -> List[DocumentChunk]:
"""
Chunk using Docling's hierarchical chunker for better semantic boundaries.
"""
if use_docling_chunker:
try:
return self._docling_hierarchical_chunk(document)
except Exception as e:
print(f"Docling chunking failed, falling back: {e}")
return self.element_chunker.chunk(document)
def _docling_hierarchical_chunk(
self,
document: ProcessedDocument
) -> List[DocumentChunk]:
"""Use Docling's native hierarchical chunker."""
from docling.chunking import HierarchicalChunker
from docling.document_converter import DocumentConverter
# Re-convert to get native document object
converter = DocumentConverter()
result = converter.convert(document.source)
chunker = HierarchicalChunker()
doc_chunks = list(chunker.chunk(result.document))
chunks = []
for idx, chunk in enumerate(doc_chunks):
chunks.append(DocumentChunk(
content=chunk.text,
element_type=ElementType.TEXT,
metadata={
**document.metadata,
"chunk_method": "hierarchical"
},
chunk_index=idx
))
return chunks
★ Insight ─────────────────────────────────────
The key insight for element-aware chunking: tables should never be split. A table split across chunks loses its relational meaning. Instead, we keep tables as single units even if they exceed the chunk size, and use the heading context to maintain semantic coherence.
─────────────────────────────────────────────────
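The heading-context half of this idea can be shown in isolation. A pure-Python sketch mirroring what `ElementAwareChunker` does with `current_heading` (the helper `attach_heading` is illustrative, not part of the code above):

```python
def attach_heading(chunks, heading):
    """Prepend the active section heading so each chunk stays
    self-describing after it is separated from its neighbors."""
    return [f"## {heading}\n\n{c}" if heading else c for c in chunks]

chunks = attach_heading(
    ["Revenue grew 12% year over year.", "Margins held at 41%."],
    "Q3 Results",
)
```

Without the prefix, a retrieved chunk saying only "Revenue grew 12%" gives the LLM no way to tell which quarter or section it describes.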
Step 4: Structured Retrieval
Retrieve by element type and combine results intelligently:
# retriever.py
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma
from langchain.schema import Document
from chunker import DocumentChunk, ElementType
from config import get_settings
class RetrievalMode(str, Enum):
STANDARD = "standard"
TABLE_FIRST = "table_first"
ELEMENT_FILTERED = "element_filtered"
HYBRID = "hybrid"
@dataclass
class RetrievalResult:
"""Structured retrieval result with element awareness."""
chunks: List[DocumentChunk]
tables: List[DocumentChunk]
text: List[DocumentChunk]
scores: Dict[int, float]
def get_context(self, max_tokens: int = 4000) -> str:
    """Build context string prioritizing tables, within a rough token budget."""
    max_chars = max_tokens * 4  # ~4 chars per token heuristic
    context_parts = []
    used = 0
    # Add tables first (they contain structured data)
    for table in self.tables[:3]:
        part = f"[TABLE]\n{table.content}\n[/TABLE]"
        if used + len(part) > max_chars:
            break
        context_parts.append(part)
        used += len(part)
    # Then text chunks until the budget is exhausted
    for text in self.text:
        if used + len(text.content) > max_chars:
            break
        context_parts.append(text.content)
        used += len(text.content)
    return "\n\n---\n\n".join(context_parts)
class StructuredRetriever:
"""
Retriever with element-type awareness.
Features:
- Filter by element type
- Boost table results for data queries
- Combine element types intelligently
"""
def __init__(self):
self.settings = get_settings()
self.embeddings = OpenAIEmbeddings(
model=self.settings.embedding_model,
openai_api_key=self.settings.openai_api_key
)
self.vectorstore = self._init_vectorstore()
def _init_vectorstore(self) -> Chroma:
"""Initialize or load ChromaDB."""
return Chroma(
collection_name=self.settings.collection_name,
embedding_function=self.embeddings,
persist_directory=self.settings.persist_directory
)
def add_chunks(self, chunks: List[DocumentChunk]) -> None:
"""Add chunks to vector store."""
documents = [chunk.to_langchain_doc() for chunk in chunks]
self.vectorstore.add_documents(documents)
def retrieve(
self,
query: str,
mode: RetrievalMode = RetrievalMode.HYBRID,
k: int = 10,
element_filter: Optional[List[ElementType]] = None
) -> RetrievalResult:
"""
Retrieve chunks with element awareness.
Args:
query: Search query
mode: Retrieval strategy
k: Number of results
element_filter: Only return specific element types
Returns:
RetrievalResult with categorized chunks
"""
# Build filter
where_filter = None
if element_filter:
where_filter = {
"element_type": {"$in": [e.value for e in element_filter]}
}
# Retrieve with scores
results = self.vectorstore.similarity_search_with_score(
query,
k=k,
filter=where_filter
)
# Categorize results
chunks = []
tables = []
text = []
scores = {}
for idx, (doc, score) in enumerate(results):
chunk = DocumentChunk(
content=doc.page_content,
element_type=ElementType(doc.metadata.get("element_type", "text")),
metadata=doc.metadata,
chunk_index=idx
)
chunks.append(chunk)
scores[idx] = score
if chunk.element_type == ElementType.TABLE:
tables.append(chunk)
else:
text.append(chunk)
# Apply mode-specific ordering
if mode == RetrievalMode.TABLE_FIRST:
chunks = tables + text
return RetrievalResult(
chunks=chunks,
tables=tables,
text=text,
scores=scores
)
def retrieve_for_data_query(self, query: str, k: int = 10) -> RetrievalResult:
"""Specialized retrieval for data/number queries."""
# Detect if query is about data
data_keywords = ["how many", "total", "average", "sum", "count",
"percentage", "number of", "statistics", "data"]
is_data_query = any(kw in query.lower() for kw in data_keywords)
if is_data_query:
# Prioritize tables
return self.retrieve(
query,
mode=RetrievalMode.TABLE_FIRST,
k=k
)
return self.retrieve(query, mode=RetrievalMode.HYBRID, k=k)
class MultiSourceRetriever:
"""Retrieve across multiple document sources."""
def __init__(self):
self.retriever = StructuredRetriever()
self.source_stats: Dict[str, int] = {}
def add_document(self, chunks: List[DocumentChunk]) -> None:
"""Add document chunks and track source."""
if chunks:
source = chunks[0].metadata.get("source", "unknown")
self.source_stats[source] = len(chunks)
self.retriever.add_chunks(chunks)
def retrieve_with_source_diversity(
self,
query: str,
k: int = 10,
max_per_source: int = 5
) -> RetrievalResult:
"""Retrieve with diversity across sources."""
# Get more results to ensure diversity
result = self.retriever.retrieve(query, k=k * 2)
# Apply source diversity
seen_sources: Dict[str, int] = {}
diverse_chunks = []
for chunk in result.chunks:
source = chunk.metadata.get("source", "unknown")
if seen_sources.get(source, 0) < max_per_source:
diverse_chunks.append(chunk)
seen_sources[source] = seen_sources.get(source, 0) + 1
if len(diverse_chunks) >= k:
break
# Recategorize
tables = [c for c in diverse_chunks if c.element_type == ElementType.TABLE]
text = [c for c in diverse_chunks if c.element_type != ElementType.TABLE]
return RetrievalResult(
chunks=diverse_chunks,
tables=tables,
text=text,
scores=result.scores
)
Step 5: RAG Pipeline
Orchestrate the complete pipeline with LangChain:
# rag_pipeline.py
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import json
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema import HumanMessage, SystemMessage
from document_processor import DoclingProcessor, ProcessedDocument
from chunker import ElementAwareChunker, HybridChunker, DocumentChunk
from retriever import StructuredRetriever, RetrievalResult, RetrievalMode
from config import get_settings
@dataclass
class RAGResponse:
"""Complete RAG response with sources."""
answer: str
sources: List[Dict[str, Any]]
tables_used: int
confidence: float
class DoclingRAGPipeline:
"""
Complete RAG pipeline using Docling for document processing.
Flow:
1. Process documents with Docling
2. Chunk with element awareness
3. Index in ChromaDB
4. Retrieve with structure awareness
5. Generate with source attribution
"""
def __init__(self):
self.settings = get_settings()
self.processor = DoclingProcessor()
self.chunker = HybridChunker()
self.retriever = StructuredRetriever()
self.llm = ChatOpenAI(
model=self.settings.llm_model,
api_key=self.settings.openai_api_key,
temperature=0.1
)
def ingest_document(self, source: str) -> Dict[str, Any]:
"""
Ingest a document into the RAG system.
Args:
source: File path or URL
Returns:
Ingestion statistics
"""
# Process with Docling
doc = self.processor.process(source)
# Chunk with element awareness
chunks = self.chunker.chunk_with_hierarchy(doc)
# Add to vector store
self.retriever.add_chunks(chunks)
return {
"source": source,
"doc_id": doc.metadata["doc_id"],
"num_chunks": len(chunks),
"num_tables": doc.metadata["num_tables"],
"num_images": doc.metadata["num_images"]
}
def ingest_batch(self, sources: List[str]) -> List[Dict[str, Any]]:
"""Ingest multiple documents."""
return [self.ingest_document(source) for source in sources]
def query(
self,
question: str,
k: int = 10,
include_tables: bool = True
) -> RAGResponse:
"""
Query the RAG system.
Args:
question: User question
k: Number of chunks to retrieve
include_tables: Whether to prioritize table content
Returns:
RAGResponse with answer and sources
"""
# Retrieve relevant chunks
mode = RetrievalMode.TABLE_FIRST if include_tables else RetrievalMode.STANDARD
retrieval = self.retriever.retrieve(question, mode=mode, k=k)
# Build context
context = retrieval.get_context()
# Generate answer
answer = self._generate_answer(question, context, retrieval)
# Build sources
sources = self._extract_sources(retrieval)
return RAGResponse(
answer=answer,
sources=sources,
tables_used=len(retrieval.tables),
confidence=self._estimate_confidence(retrieval)
)
def _generate_answer(
self,
question: str,
context: str,
retrieval: RetrievalResult
) -> str:
"""Generate answer using LLM."""
# Build system prompt based on content type
has_tables = len(retrieval.tables) > 0
system_prompt = """You are a helpful assistant that answers questions based on the provided context.
Guidelines:
- Answer based ONLY on the provided context
- If the context doesn't contain enough information, say so
- When citing data from tables, be precise with numbers
- Reference the source document when possible
- If multiple sources provide different information, note the discrepancy"""
if has_tables:
system_prompt += """
Special instructions for tables:
- Tables are marked with [TABLE] and [/TABLE] tags
- Extract exact values from tables when answering numerical questions
- Describe the table structure if asked about data format"""
user_prompt = f"""Context:
{context}
Question: {question}
Provide a clear, accurate answer based on the context above."""
messages = [
SystemMessage(content=system_prompt),
HumanMessage(content=user_prompt)
]
response = self.llm.invoke(messages)
return response.content
def _extract_sources(self, retrieval: RetrievalResult) -> List[Dict[str, Any]]:
"""Extract source information for attribution."""
sources = []
seen_sources = set()
for chunk in retrieval.chunks[:5]:
source = chunk.metadata.get("source", "unknown")
if source not in seen_sources:
sources.append({
"source": source,
"page": chunk.metadata.get("page_number"),
"element_type": chunk.element_type.value,
"preview": chunk.content[:200] + "..."
})
seen_sources.add(source)
return sources
def _estimate_confidence(self, retrieval: RetrievalResult) -> float:
"""Estimate answer confidence based on retrieval quality."""
if not retrieval.scores:
return 0.5
# Use average similarity score
avg_score = sum(retrieval.scores.values()) / len(retrieval.scores)
# Normalize to 0-1 (scores are distances, lower is better)
confidence = max(0, min(1, 1 - avg_score))
# Boost if tables found for data queries
if retrieval.tables:
confidence = min(1, confidence + 0.1)
return round(confidence, 2)
class ConversationalDoclingRAG:
"""RAG with conversation memory."""
def __init__(self):
self.pipeline = DoclingRAGPipeline()
self.conversation_history: List[Dict[str, str]] = []
self.max_history = 10
def chat(self, message: str) -> RAGResponse:
"""Chat with conversation context."""
# Add context from history
history_context = self._format_history()
# Reformulate query with history
enhanced_query = self._enhance_query(message, history_context)
# Get response
response = self.pipeline.query(enhanced_query)
# Update history
self.conversation_history.append({
"role": "user",
"content": message
})
self.conversation_history.append({
"role": "assistant",
"content": response.answer
})
# Trim history
if len(self.conversation_history) > self.max_history * 2:
self.conversation_history = self.conversation_history[-self.max_history * 2:]
return response
def _format_history(self) -> str:
"""Format conversation history."""
if not self.conversation_history:
return ""
lines = []
for msg in self.conversation_history[-6:]: # Last 3 exchanges
role = "User" if msg["role"] == "user" else "Assistant"
lines.append(f"{role}: {msg['content']}")
return "\n".join(lines)
def _enhance_query(self, query: str, history: str) -> str:
"""Enhance query with conversation context."""
if not history:
return query
return f"Previous conversation:\n{history}\n\nCurrent question: {query}"
def clear_history(self) -> None:
"""Clear conversation history."""
self.conversation_history = []
Step 6: FastAPI Application
# app.py
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import List, Optional
import tempfile
import os
from contextlib import asynccontextmanager
from rag_pipeline import DoclingRAGPipeline, ConversationalDoclingRAG, RAGResponse
# Global instances
pipeline: Optional[DoclingRAGPipeline] = None
chat_sessions: dict = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Initialize RAG pipeline on startup."""
global pipeline
pipeline = DoclingRAGPipeline()
yield
# Cleanup if needed
app = FastAPI(
title="Docling RAG API",
description="RAG system with advanced document processing",
version="1.0.0",
lifespan=lifespan
)
# Request/Response models
class QueryRequest(BaseModel):
question: str
k: int = 10
include_tables: bool = True
class QueryResponse(BaseModel):
answer: str
sources: List[dict]
tables_used: int
confidence: float
class ChatRequest(BaseModel):
session_id: str
message: str
class IngestResponse(BaseModel):
source: str
doc_id: str
num_chunks: int
num_tables: int
num_images: int
# Endpoints
@app.post("/ingest/file", response_model=IngestResponse)
async def ingest_file(file: UploadFile = File(...)):
"""Upload and ingest a document."""
if not pipeline:
raise HTTPException(status_code=500, detail="Pipeline not initialized")
# Save uploaded file temporarily
suffix = os.path.splitext(file.filename)[1]
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
content = await file.read()
tmp.write(content)
tmp_path = tmp.name
try:
result = pipeline.ingest_document(tmp_path)
return IngestResponse(**result)
finally:
os.unlink(tmp_path)
@app.post("/ingest/url", response_model=IngestResponse)
async def ingest_url(url: str):
"""Ingest a document from URL."""
if not pipeline:
raise HTTPException(status_code=500, detail="Pipeline not initialized")
result = pipeline.ingest_document(url)
return IngestResponse(**result)
@app.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
"""Query the RAG system."""
if not pipeline:
raise HTTPException(status_code=500, detail="Pipeline not initialized")
response = pipeline.query(
question=request.question,
k=request.k,
include_tables=request.include_tables
)
return QueryResponse(
answer=response.answer,
sources=response.sources,
tables_used=response.tables_used,
confidence=response.confidence
)
@app.post("/chat", response_model=QueryResponse)
async def chat(request: ChatRequest):
"""Conversational chat endpoint."""
if request.session_id not in chat_sessions:
chat_sessions[request.session_id] = ConversationalDoclingRAG()
session = chat_sessions[request.session_id]
response = session.chat(request.message)
return QueryResponse(
answer=response.answer,
sources=response.sources,
tables_used=response.tables_used,
confidence=response.confidence
)
@app.delete("/chat/{session_id}")
async def clear_chat(session_id: str):
"""Clear chat session history."""
if session_id in chat_sessions:
del chat_sessions[session_id]
return {"status": "cleared"}
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {
"status": "healthy",
"pipeline_initialized": pipeline is not None
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Step 7: Requirements
# requirements.txt
docling>=2.0.0
langchain>=0.3.0
langchain-openai>=0.2.0
langchain-chroma>=0.1.0
chromadb>=0.5.0
openai>=1.50.0
fastapi>=0.115.0
uvicorn>=0.32.0
python-multipart>=0.0.12
pydantic>=2.9.0
pydantic-settings>=2.6.0
pandas>=2.2.0
tabulate>=0.9.0
Usage Examples
Basic Document Ingestion
from rag_pipeline import DoclingRAGPipeline
# Initialize pipeline
pipeline = DoclingRAGPipeline()
# Ingest a PDF with tables
result = pipeline.ingest_document("financial_report.pdf")
print(f"Ingested {result['num_chunks']} chunks, {result['num_tables']} tables")
# Query about table data
response = pipeline.query("What was the Q3 revenue?")
print(f"Answer: {response.answer}")
print(f"Tables used: {response.tables_used}")
print(f"Confidence: {response.confidence}")
Multi-Format Ingestion
# Ingest various formats
pipeline.ingest_document("presentation.pptx")
pipeline.ingest_document("spreadsheet.xlsx")
pipeline.ingest_document("https://arxiv.org/pdf/2408.09869")
# Query across all sources
response = pipeline.query("Summarize the key findings across all documents")
API Usage
# Ingest a document
curl -X POST "http://localhost:8000/ingest/file" \
-F "file=@report.pdf"
# Query
curl -X POST "http://localhost:8000/query" \
-H "Content-Type: application/json" \
-d '{"question": "What are the main conclusions?", "include_tables": true}'
Architecture Diagram
┌─────────────────────────────────────────────────────────────────────────────┐
│ DOCUMENT RAG ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ DOCUMENT INPUT │
│ ┌─────┐ ┌──────┐ ┌─────┐ ┌───────┐ ┌─────┐ │
│ │ PDF │ │ Word │ │ PPT │ │ Excel │ │ URL │ │
│ └──┬──┘ └──┬───┘ └──┬──┘ └───┬───┘ └──┬──┘ │
│ └───────┴────────┼───────┴─────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ DOCLING PROCESSING │ │
│ │ DocumentConverter ───► OCR Engine ───► Table Detection │ │
│ │ │ │ │
│ │ Reading Order ◄─┘ │ │
│ └─────────────────────────────────┬───────────────────────────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ ELEMENT-AWARE CHUNKING │ │
│ │ ┌──────────────┐ ┌─────────────┐ ┌─────────────────┐ │ │
│ │ │ Table Chunks │ │ Text Chunks │ │ Heading Context │ │ │
│ │ │ (preserved) │ │ (ordered) │ │ (attached) │ │ │
│ │ └──────────────┘ └─────────────┘ └─────────────────┘ │ │
│ └─────────────────────────────────┬───────────────────────────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ VECTOR STORAGE │ │
│ │ OpenAI Embeddings ───► ChromaDB (with element metadata) │ │
│ └─────────────────────────────────┬───────────────────────────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ STRUCTURED RETRIEVAL │ │
│ │ Query Analysis ───┬──► Table-First Mode (numeric queries) │ │
│ │ └──► Hybrid Mode (mixed content) │ │
│ └─────────────────────────────────┬───────────────────────────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ LLM GENERATION │ │
│ │ Context Assembly ───► GPT-4o ───► Source Attribution │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Extensions
| Extension | Description | Complexity |
|---|---|---|
| Image Understanding | Add vision model for image content | High |
| Formula Parsing | Extract LaTeX from scientific papers | Medium |
| Streaming Response | Stream LLM output for better UX | Low |
| Metadata Filtering | Filter by date, author, document type | Medium |
| Hybrid Search | Combine with BM25 keyword search | Medium |
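For the Hybrid Search extension, Reciprocal Rank Fusion (RRF) is a common way to merge BM25 and vector rankings without having to reconcile their score scales. A sketch under assumptions (chunk ids and `k=60` are illustrative; the BM25 side would come from a library such as `rank_bm25`):

```python
def rrf_fuse(vector_ranked, bm25_ranked, k=60):
    """Reciprocal Rank Fusion: each list contributes 1/(k + rank + 1)
    per chunk id; ids ranked well by either retriever rise to the top."""
    scores = {}
    for ranked in (vector_ranked, bm25_ranked):
        for rank, doc_id in enumerate(ranked):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
    return sorted(scores, key=scores.get, reverse=True)

# "b" appears near the top of both lists, so it wins the fused ranking
fused = rrf_fuse(["a", "b", "c"], ["b", "d", "a"])
```

Because RRF uses only ranks, it needs no normalization of cosine distances against BM25 scores, which is why it is a popular default for this extension.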
Key Concepts Recap
| Concept | What It Is | Why It Matters |
|---|---|---|
| Unified Parsing | One API for PDF, Word, PPT, Excel, HTML | No more format-specific code, consistent output |
| Table Detection | Identify and preserve table structure | Financial reports, specs, comparisons stay usable |
| OCR Integration | Extract text from scanned documents | Handle legacy PDFs and images |
| Reading Order | Maintain logical document flow | Multi-column layouts don't get jumbled |
| Element-Aware Chunking | Different strategies for tables vs text | Tables stay intact, text gets semantic splits |
| Heading Context | Attach section headers to chunks | "Revenue" chunk knows it's under "Q3 Results" |
| Table-First Retrieval | Prioritize tables for numeric queries | "What was revenue?" finds the data table |
| Hybrid Mode | Combine table and text retrieval | Complex questions get both data and explanation |
Key Takeaways
- Docling unifies document processing - One API for PDF, Word, PowerPoint, Excel, and more
- Structure preservation matters - Tables and headings carry semantic meaning that chunking can destroy
- Element-aware retrieval - Knowing content types enables smarter retrieval strategies
- Context is king - Keeping heading context with chunks improves relevance