Legal Contract Analysis Platform
Build a RAG system for contract review, clause extraction, and risk analysis used by legal teams
Legal Contract Analysis Platform
TL;DR
Build a system that parses contracts (PDF/OCR), extracts clauses by type (termination, liability, IP), scores risks (missing clauses, unlimited liability, one-sided terms), and answers questions via RAG. The secret sauce: section-aware segmentation (regex + LLM fallback), clause classification, and a multi-layered risk analysis pipeline.
Build a sophisticated legal document analysis platform that helps legal teams review contracts 10x faster with AI-powered clause extraction, risk identification, and comparison analysis.
| Industry | Legal Tech |
| Difficulty | Advanced |
| Time | 2 weeks |
| Code | ~2000 lines |
What You'll Build
A comprehensive contract analysis system that:
- Parses complex documents - PDFs, Word docs, scanned contracts with OCR
- Extracts key clauses - Obligations, termination, liability, IP rights
- Identifies risks - Non-standard terms, missing clauses, unfavorable conditions
- Compares contracts - Against templates, previous versions, industry standards
- Answers questions - Natural language queries about contract terms
Architecture
┌─────────────────────────────────────────────────────────────────────────────┐
│ LEGAL CONTRACT ANALYSIS ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ DOCUMENT INPUT │ │
│ │ PDF Contracts ──────► Word Documents ──────► Scanned Documents │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ DOCUMENT PROCESSING │ │
│ │ OCR Processing ──────► Structure Parsing ──────► Segmentation │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ LEGAL ANALYSIS │ │
│ │ Clause Extraction ────┬────► Entity Recognition │ │
│ │ └────► Clause Classification │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ AI INTELLIGENCE │ │
│ │ Legal Embeddings ────────► RAG Pipeline ────────► Risk Scoring │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ ANALYSIS OUTPUT │ │
│ │ ┌─────────────┬──────────────┬─────────────┐ │ │
│ │ ▼ ▼ ▼ ▼ │ │
│ │ Summary Comparison Risk Alerts Q&A Interface │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘Project Structure
legal-contracts/
├── src/
│ ├── __init__.py
│ ├── config.py # Configuration
│ ├── ingestion/
│ │ ├── __init__.py
│ │ ├── pdf_parser.py # PDF extraction
│ │ ├── docx_parser.py # Word document parsing
│ │ ├── ocr_processor.py # OCR for scanned docs
│ │ └── section_segmenter.py # Contract section detection
│ ├── extraction/
│ │ ├── __init__.py
│ │ ├── clause_extractor.py # Clause identification
│ │ ├── entity_extractor.py # Legal entity NER
│ │ └── obligation_parser.py # Obligation extraction
│ ├── analysis/
│ │ ├── __init__.py
│ │ ├── risk_analyzer.py # Risk identification
│ │ ├── clause_classifier.py # Clause type classification
│ │ └── comparator.py # Contract comparison
│ ├── retrieval/
│ │ ├── __init__.py
│ │ ├── vector_store.py # Weaviate integration
│ │ └── hybrid_search.py # Hybrid retrieval
│ ├── generation/
│ │ ├── __init__.py
│ │ ├── rag_pipeline.py # RAG for Q&A
│ │ └── summary_generator.py # Executive summaries
│ └── api/
│ ├── __init__.py
│ ├── main.py # FastAPI application
│ └── models.py # Pydantic models
├── models/
│ └── clause_classifier/ # Fine-tuned classifier
├── templates/
│ └── standard_clauses.json # Standard clause templates
├── tests/
├── docker-compose.yml
└── requirements.txtStep 1: Configuration
# src/config.py
from pydantic_settings import BaseSettings
from typing import List
from pathlib import Path
class Settings(BaseSettings):
# API Keys
openai_api_key: str
anthropic_api_key: str # Claude for analysis
# Models
embedding_model: str = "text-embedding-3-large"
analysis_model: str = "claude-3-5-sonnet-20241022"
# Weaviate
weaviate_url: str = "http://localhost:8080"
weaviate_class: str = "ContractClause"
# OCR
tesseract_path: str = "/usr/bin/tesseract"
# Clause types to extract
clause_types: List[str] = [
"termination",
"liability",
"indemnification",
"confidentiality",
"intellectual_property",
"payment",
"warranty",
"force_majeure",
"dispute_resolution",
"governing_law",
"assignment",
"notice",
"amendment",
"severability",
"entire_agreement"
]
# Risk thresholds
high_risk_threshold: float = 0.7
medium_risk_threshold: float = 0.4
class Config:
env_file = ".env"
settings = Settings()Understanding the Risk Thresholds:
| Setting | Value | Purpose |
|---|---|---|
high_risk_threshold | 0.7 | Issues requiring immediate legal attention |
medium_risk_threshold | 0.4 | Issues to flag for review, not blockers |
clause_types | 15 types | Standard clauses every commercial contract should have |
Why Claude (Anthropic) for analysis? Legal language requires nuanced understanding - Claude excels at structured extraction and maintaining context across long documents.
Step 2: Document Processing
PDF Parser
# src/ingestion/pdf_parser.py
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import fitz # PyMuPDF
from PIL import Image
import io
@dataclass
class ParsedPage:
page_number: int
text: str
tables: List[Dict[str, Any]]
images: List[bytes]
has_signature: bool
@dataclass
class ParsedDocument:
filename: str
total_pages: int
pages: List[ParsedPage]
metadata: Dict[str, Any]
class PDFParser:
"""Extract text, tables, and images from PDF contracts."""
def parse(self, pdf_path: str) -> ParsedDocument:
"""Parse a PDF document."""
doc = fitz.open(pdf_path)
pages = []
for page_num in range(len(doc)):
page = doc[page_num]
parsed_page = self._parse_page(page, page_num + 1)
pages.append(parsed_page)
# Extract metadata
metadata = {
"title": doc.metadata.get("title", ""),
"author": doc.metadata.get("author", ""),
"creation_date": doc.metadata.get("creationDate", ""),
"modification_date": doc.metadata.get("modDate", "")
}
doc.close()
return ParsedDocument(
filename=pdf_path,
total_pages=len(pages),
pages=pages,
metadata=metadata
)
def _parse_page(self, page: fitz.Page, page_number: int) -> ParsedPage:
"""Parse a single page."""
# Extract text with layout preservation
text = page.get_text("text")
# Extract tables
tables = self._extract_tables(page)
# Extract images
images = self._extract_images(page)
# Check for signature blocks
has_signature = self._detect_signature(page)
return ParsedPage(
page_number=page_number,
text=text,
tables=tables,
images=images,
has_signature=has_signature
)
def _extract_tables(self, page: fitz.Page) -> List[Dict[str, Any]]:
"""Extract tables from page."""
tables = []
# Use PyMuPDF's table detection
tabs = page.find_tables()
for tab in tabs:
table_data = tab.extract()
if table_data:
tables.append({
"headers": table_data[0] if table_data else [],
"rows": table_data[1:] if len(table_data) > 1 else [],
"bbox": tab.bbox
})
return tables
def _extract_images(self, page: fitz.Page) -> List[bytes]:
"""Extract images from page."""
images = []
image_list = page.get_images()
for img_index, img in enumerate(image_list):
xref = img[0]
base_image = page.parent.extract_image(xref)
if base_image:
images.append(base_image["image"])
return images
def _detect_signature(self, page: fitz.Page) -> bool:
"""Detect if page contains signature blocks."""
text_lower = page.get_text("text").lower()
signature_indicators = [
"signature:", "signed:", "by:", "authorized signature",
"witness:", "notary", "________________________"
]
return any(indicator in text_lower for indicator in signature_indicators)Why Detect Signatures and Tables Separately?
Legal documents have specific structures that matter:
┌─────────────────────────────────────────────────────────────┐
│ CONTRACT DOCUMENT STRUCTURE │
├─────────────────────────────────────────────────────────────┤
│ │
│ Text ───────► Main content for clause extraction │
│ │
│ Tables ─────► Payment schedules, fee structures, │
│ milestone deliverables (structured data) │
│ │
│ Signatures ─► Indicates executed vs. draft contract │
│ (affects risk analysis) │
│ │
│ Images ─────► Logos, diagrams (usually skip for RAG) │
│ │
└─────────────────────────────────────────────────────────────┘Tables often contain the actual numbers (prices, dates, quantities) while clauses contain the rules about those numbers.
Section Segmenter
# src/ingestion/section_segmenter.py
from typing import List, Dict, Any, Tuple
from dataclasses import dataclass
import re
from anthropic import Anthropic
from ..config import settings
@dataclass
class ContractSection:
section_number: str
title: str
content: str
subsections: List['ContractSection']
page_numbers: List[int]
clause_type: str = None
class SectionSegmenter:
"""Segment contracts into logical sections."""
def __init__(self):
self.client = Anthropic(api_key=settings.anthropic_api_key)
# Common section patterns
self.section_patterns = [
r'^(\d+\.)\s+([A-Z][A-Z\s]+)$', # 1. DEFINITIONS
r'^(\d+\.\d+)\s+(.+)$', # 1.1 Term
r'^(Article\s+\d+)[:\.]?\s*(.+)?$', # Article 1: Definitions
r'^(Section\s+\d+)[:\.]?\s*(.+)?$', # Section 1. Definitions
r'^([A-Z]\.)\s+(.+)$', # A. Definitions
r'^(ARTICLE\s+[IVXLC]+)[:\.]?\s*(.+)?$', # ARTICLE I: DEFINITIONS
]
def segment(self, full_text: str, page_breaks: List[int] = None) -> List[ContractSection]:
"""Segment contract text into sections."""
# First pass: regex-based section detection
sections = self._regex_segment(full_text)
# If regex fails, use LLM-based segmentation
if len(sections) < 3:
sections = self._llm_segment(full_text)
# Assign page numbers if available
if page_breaks:
sections = self._assign_page_numbers(sections, full_text, page_breaks)
return sections
def _regex_segment(self, text: str) -> List[ContractSection]:
"""Segment using regex patterns."""
sections = []
lines = text.split('\n')
current_section = None
current_content = []
for line in lines:
matched = False
for pattern in self.section_patterns:
match = re.match(pattern, line.strip(), re.IGNORECASE)
if match:
# Save previous section
if current_section:
current_section.content = '\n'.join(current_content).strip()
sections.append(current_section)
# Start new section
section_num = match.group(1)
title = match.group(2) if match.lastindex > 1 else ""
current_section = ContractSection(
section_number=section_num,
title=title.strip() if title else "",
content="",
subsections=[],
page_numbers=[]
)
current_content = []
matched = True
break
if not matched and current_section:
current_content.append(line)
# Don't forget the last section
if current_section:
current_section.content = '\n'.join(current_content).strip()
sections.append(current_section)
return sections
def _llm_segment(self, text: str) -> List[ContractSection]:
"""Use LLM to segment complex contracts."""
prompt = f"""Analyze this contract text and identify all distinct sections.
For each section, extract:
1. Section number/identifier
2. Section title
3. The full content of that section
Contract text:
{text[:15000]} # Limit for context window
Return as JSON array:
[
{{
"section_number": "1",
"title": "DEFINITIONS",
"content": "full section text..."
}}
]"""
response = self.client.messages.create(
model=settings.analysis_model,
max_tokens=4096,
messages=[{"role": "user", "content": prompt}]
)
# Parse response and create sections
import json
try:
sections_data = json.loads(response.content[0].text)
return [
ContractSection(
section_number=s["section_number"],
title=s["title"],
content=s["content"],
subsections=[],
page_numbers=[]
)
for s in sections_data
]
except json.JSONDecodeError:
return []
def _assign_page_numbers(
self,
sections: List[ContractSection],
full_text: str,
page_breaks: List[int]
) -> List[ContractSection]:
"""Assign page numbers to sections based on character positions."""
for section in sections:
start_pos = full_text.find(section.content[:100])
if start_pos != -1:
# Find which page(s) this section spans
for i, break_pos in enumerate(page_breaks):
if start_pos < break_pos:
section.page_numbers.append(i + 1)
break
return sectionsWhy Regex First, LLM Fallback?
┌─────────────────────────────────────────────────────────────┐
│ SECTION DETECTION STRATEGY │
├─────────────────────────────────────────────────────────────┤
│ │
│ Document ───► Regex Patterns ───┬───► Success (fewer than 3 │
│ │ found? Likely wrong) │
│ │ │
│ └───► LLM Fallback │
│ (structured prompt) │
│ │
│ PATTERNS RECOGNIZED: │
│ • "1. DEFINITIONS" (numbered all-caps) │
│ • "1.1 Term" (subsection) │
│ • "Article 1:" (article style) │
│ • "Section 1." (section style) │
│ • "ARTICLE IV:" (roman numerals) │
│ │
└─────────────────────────────────────────────────────────────┘| Approach | Cost | Latency | Accuracy |
|---|---|---|---|
| Regex only | $0 | ~10ms | 60-80% (fails on unusual formats) |
| LLM only | $0.02 | ~2s | 90-95% (expensive for every doc) |
| Hybrid | ~$0.005 avg | ~200ms avg | 95%+ (best of both) |
The "fewer than 3 sections" threshold catches cases where regex patterns don't match the document's formatting style.
Step 3: Clause Extraction
# src/extraction/clause_extractor.py
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
from anthropic import Anthropic
import json
from ..config import settings
from ..ingestion.section_segmenter import ContractSection
class ClauseType(Enum):
TERMINATION = "termination"
LIABILITY = "liability"
INDEMNIFICATION = "indemnification"
CONFIDENTIALITY = "confidentiality"
IP_RIGHTS = "intellectual_property"
PAYMENT = "payment"
WARRANTY = "warranty"
FORCE_MAJEURE = "force_majeure"
DISPUTE_RESOLUTION = "dispute_resolution"
GOVERNING_LAW = "governing_law"
ASSIGNMENT = "assignment"
NOTICE = "notice"
AMENDMENT = "amendment"
DATA_PRIVACY = "data_privacy"
NON_COMPETE = "non_compete"
NON_SOLICITATION = "non_solicitation"
@dataclass
class ExtractedClause:
clause_type: ClauseType
title: str
full_text: str
key_terms: List[str]
obligations: List[Dict[str, str]]
dates: List[str]
monetary_values: List[str]
parties_mentioned: List[str]
section_reference: str
confidence: float
class ClauseExtractor:
"""Extract and classify clauses from contract sections."""
def __init__(self):
self.client = Anthropic(api_key=settings.anthropic_api_key)
def extract_clauses(
self,
sections: List[ContractSection]
) -> List[ExtractedClause]:
"""Extract clauses from all sections."""
all_clauses = []
for section in sections:
clauses = self._extract_from_section(section)
all_clauses.extend(clauses)
return all_clauses
def _extract_from_section(
self,
section: ContractSection
) -> List[ExtractedClause]:
"""Extract clauses from a single section."""
prompt = f"""Analyze this contract section and extract all legal clauses.
Section: {section.section_number} - {section.title}
Content:
{section.content}
For each clause found, extract:
1. clause_type: One of {[t.value for t in ClauseType]}
2. title: The clause title or heading
3. key_terms: Important defined terms used
4. obligations: List of obligations with {{party, action, condition}}
5. dates: Any dates or time periods mentioned
6. monetary_values: Any monetary amounts
7. parties_mentioned: Parties referenced in this clause
8. confidence: Your confidence in the classification (0-1)
Return as JSON array:
[
{{
"clause_type": "termination",
"title": "Termination for Convenience",
"key_terms": ["Notice Period", "Termination Fee"],
"obligations": [
{{"party": "Either Party", "action": "provide written notice", "condition": "30 days prior"}}
],
"dates": ["30 days"],
"monetary_values": [],
"parties_mentioned": ["Company", "Client"],
"confidence": 0.95
}}
]
If no standard clauses are found, return an empty array."""
response = self.client.messages.create(
model=settings.analysis_model,
max_tokens=4096,
messages=[{"role": "user", "content": prompt}]
)
try:
clauses_data = json.loads(response.content[0].text)
return [
ExtractedClause(
clause_type=ClauseType(c["clause_type"]),
title=c["title"],
full_text=section.content,
key_terms=c["key_terms"],
obligations=c["obligations"],
dates=c["dates"],
monetary_values=c["monetary_values"],
parties_mentioned=c["parties_mentioned"],
section_reference=f"{section.section_number} - {section.title}",
confidence=c["confidence"]
)
for c in clauses_data
if c.get("clause_type") in [t.value for t in ClauseType]
]
except (json.JSONDecodeError, KeyError, ValueError):
return []
def extract_key_dates(self, clauses: List[ExtractedClause]) -> List[Dict[str, Any]]:
"""Extract and consolidate all important dates."""
dates = []
for clause in clauses:
for date in clause.dates:
dates.append({
"date_text": date,
"clause_type": clause.clause_type.value,
"context": clause.title
})
return dates
def extract_obligations_summary(
self,
clauses: List[ExtractedClause],
party_name: str
) -> List[Dict[str, Any]]:
"""Get all obligations for a specific party."""
obligations = []
for clause in clauses:
for obligation in clause.obligations:
if party_name.lower() in obligation.get("party", "").lower():
obligations.append({
"action": obligation["action"],
"condition": obligation.get("condition", ""),
"clause_type": clause.clause_type.value,
"section": clause.section_reference
})
return obligationsUnderstanding Clause Extraction Output:
Each clause extracts multiple structured fields:
┌─────────────────────────────────────────────────────────────┐
│ EXTRACTED CLAUSE STRUCTURE │
├─────────────────────────────────────────────────────────────┤
│ │
│ clause_type: "termination" │
│ ├─ title: "Termination for Convenience" │
│ ├─ key_terms: ["Notice Period", "Termination Fee"] │
│ ├─ obligations: │
│ │ └─ {party: "Either Party", │
│ │ action: "provide written notice", │
│ │ condition: "30 days prior"} │
│ ├─ dates: ["30 days", "end of term"] │
│ ├─ monetary_values: ["$10,000 termination fee"] │
│ └─ parties_mentioned: ["Company", "Client"] │
│ │
└─────────────────────────────────────────────────────────────┘This structured extraction enables:
- Obligation tracking: Who owes what to whom
- Date aggregation: All deadlines in one view
- Party-specific queries: "What are Client's obligations?"
Step 4: Risk Analysis
# src/analysis/risk_analyzer.py
from typing import List, Dict, Any
from dataclasses import dataclass
from enum import Enum
from anthropic import Anthropic
import json
from ..config import settings
from ..extraction.clause_extractor import ExtractedClause, ClauseType
class RiskLevel(Enum):
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
INFO = "info"
class RiskCategory(Enum):
MISSING_CLAUSE = "missing_clause"
NON_STANDARD_TERM = "non_standard_term"
UNLIMITED_LIABILITY = "unlimited_liability"
ONE_SIDED_OBLIGATION = "one_sided_obligation"
AMBIGUOUS_LANGUAGE = "ambiguous_language"
COMPLIANCE_CONCERN = "compliance_concern"
UNFAVORABLE_TERM = "unfavorable_term"
@dataclass
class RiskItem:
risk_level: RiskLevel
category: RiskCategory
title: str
description: str
clause_reference: str
recommendation: str
affected_text: str
class RiskAnalyzer:
"""Analyze contracts for legal risks."""
def __init__(self):
self.client = Anthropic(api_key=settings.anthropic_api_key)
# Standard clauses that should be present
self.required_clauses = {
ClauseType.TERMINATION,
ClauseType.LIABILITY,
ClauseType.CONFIDENTIALITY,
ClauseType.GOVERNING_LAW,
ClauseType.DISPUTE_RESOLUTION
}
# Risk patterns to check
self.risk_patterns = {
"unlimited_liability": [
"unlimited liability",
"no limit on liability",
"shall be liable for all"
],
"unilateral_termination": [
"may terminate at any time",
"sole discretion to terminate",
"without cause"
],
"auto_renewal": [
"automatically renew",
"auto-renewal",
"shall renew unless"
]
}
def analyze(
self,
clauses: List[ExtractedClause],
contract_type: str = "service_agreement"
) -> List[RiskItem]:
"""Perform comprehensive risk analysis."""
risks = []
# Check for missing clauses
risks.extend(self._check_missing_clauses(clauses))
# Analyze each clause for risks
for clause in clauses:
risks.extend(self._analyze_clause(clause))
# LLM-based deep analysis
risks.extend(self._llm_risk_analysis(clauses, contract_type))
# Sort by risk level
risk_order = {RiskLevel.HIGH: 0, RiskLevel.MEDIUM: 1, RiskLevel.LOW: 2, RiskLevel.INFO: 3}
risks.sort(key=lambda r: risk_order[r.risk_level])
return risks
def _check_missing_clauses(
self,
clauses: List[ExtractedClause]
) -> List[RiskItem]:
"""Check for missing standard clauses."""
risks = []
found_types = {c.clause_type for c in clauses}
for required in self.required_clauses:
if required not in found_types:
risks.append(RiskItem(
risk_level=RiskLevel.HIGH if required in {
ClauseType.LIABILITY,
ClauseType.TERMINATION
} else RiskLevel.MEDIUM,
category=RiskCategory.MISSING_CLAUSE,
title=f"Missing {required.value.replace('_', ' ').title()} Clause",
description=f"The contract does not contain a standard {required.value} clause.",
clause_reference="N/A",
recommendation=f"Add a {required.value} clause to protect your interests.",
affected_text=""
))
return risks
def _analyze_clause(self, clause: ExtractedClause) -> List[RiskItem]:
"""Analyze a single clause for risks."""
risks = []
text_lower = clause.full_text.lower()
# Check for unlimited liability
if clause.clause_type == ClauseType.LIABILITY:
for pattern in self.risk_patterns["unlimited_liability"]:
if pattern in text_lower:
risks.append(RiskItem(
risk_level=RiskLevel.HIGH,
category=RiskCategory.UNLIMITED_LIABILITY,
title="Unlimited Liability Exposure",
description="The liability clause does not contain caps or limitations.",
clause_reference=clause.section_reference,
recommendation="Negotiate liability caps based on contract value or insurance limits.",
affected_text=clause.full_text[:500]
))
break
# Check for unilateral termination
if clause.clause_type == ClauseType.TERMINATION:
for pattern in self.risk_patterns["unilateral_termination"]:
if pattern in text_lower:
risks.append(RiskItem(
risk_level=RiskLevel.MEDIUM,
category=RiskCategory.ONE_SIDED_OBLIGATION,
title="Unilateral Termination Rights",
description="One party has disproportionate termination rights.",
clause_reference=clause.section_reference,
recommendation="Ensure both parties have equivalent termination rights.",
affected_text=clause.full_text[:500]
))
break
return risks
def _llm_risk_analysis(
self,
clauses: List[ExtractedClause],
contract_type: str
) -> List[RiskItem]:
"""Deep risk analysis using LLM."""
clauses_text = "\n\n".join([
f"[{c.clause_type.value}] {c.section_reference}\n{c.full_text[:1000]}"
for c in clauses
])
prompt = f"""As a legal expert, analyze these contract clauses for risks.
Contract type: {contract_type}
Clauses:
{clauses_text}
Identify risks in these categories:
- non_standard_term: Terms that deviate from market standards
- one_sided_obligation: Obligations heavily favoring one party
- ambiguous_language: Unclear terms that could cause disputes
- compliance_concern: Potential regulatory issues
- unfavorable_term: Terms that are commercially unfavorable
For each risk found, provide:
1. risk_level: high, medium, or low
2. category: one of the categories above
3. title: Brief risk title
4. description: Detailed explanation
5. clause_reference: Which clause this affects
6. recommendation: How to mitigate this risk
7. affected_text: The problematic text (first 200 chars)
Return as JSON array. Only include genuine risks, not informational notes."""
response = self.client.messages.create(
model=settings.analysis_model,
max_tokens=4096,
messages=[{"role": "user", "content": prompt}]
)
try:
risks_data = json.loads(response.content[0].text)
return [
RiskItem(
risk_level=RiskLevel(r["risk_level"]),
category=RiskCategory(r["category"]),
title=r["title"],
description=r["description"],
clause_reference=r["clause_reference"],
recommendation=r["recommendation"],
affected_text=r.get("affected_text", "")
)
for r in risks_data
]
except (json.JSONDecodeError, KeyError, ValueError):
return []
def calculate_risk_score(self, risks: List[RiskItem]) -> Dict[str, Any]:
"""Calculate overall risk score."""
weights = {
RiskLevel.HIGH: 10,
RiskLevel.MEDIUM: 5,
RiskLevel.LOW: 2,
RiskLevel.INFO: 0
}
total_score = sum(weights[r.risk_level] for r in risks)
max_possible = len(risks) * 10 if risks else 1
normalized_score = min(100, (total_score / max_possible) * 100)
return {
"score": round(normalized_score, 1),
"rating": self._get_rating(normalized_score),
"high_risks": len([r for r in risks if r.risk_level == RiskLevel.HIGH]),
"medium_risks": len([r for r in risks if r.risk_level == RiskLevel.MEDIUM]),
"low_risks": len([r for r in risks if r.risk_level == RiskLevel.LOW]),
"total_risks": len(risks)
}
def _get_rating(self, score: float) -> str:
"""Convert score to rating."""
if score >= 70:
return "High Risk"
elif score >= 40:
return "Medium Risk"
elif score >= 20:
return "Low Risk"
return "Minimal Risk"Understanding the Three-Layer Risk Analysis:
┌─────────────────────────────────────────────────────────────┐
│ RISK DETECTION LAYERS │
├─────────────────────────────────────────────────────────────┤
│ │
│ Layer 1: MISSING CLAUSE CHECK │
│ ───────────────────────────── │
│ • Compare against required_clauses set │
│ • Missing termination/liability = HIGH RISK │
│ • Missing confidentiality = MEDIUM RISK │
│ │
│ Layer 2: PATTERN MATCHING │
│ ───────────────────────── │
│ • "unlimited liability" → HIGH RISK │
│ • "sole discretion to terminate" → MEDIUM RISK │
│ • "automatically renew" → INFO (flag for review) │
│ │
│ Layer 3: LLM DEEP ANALYSIS │
│ ───────────────────────────── │
│ • Detects nuanced issues pattern matching misses │
│ • Identifies one-sided obligations │
│ • Spots ambiguous language that could cause disputes │
│ │
└─────────────────────────────────────────────────────────────┘Risk Score Calculation:
| Risk Level | Weight | Example |
|---|---|---|
| HIGH | 10 | Missing liability clause |
| MEDIUM | 5 | Unilateral termination rights |
| LOW | 2 | Non-standard notice period |
| INFO | 0 | Auto-renewal clause (informational) |
Score = (Sum of weights) / (Count × 10) × 100, capped at 100
Step 5: Contract Comparison
# src/analysis/comparator.py
from typing import List, Dict, Any, Tuple
from dataclasses import dataclass
from enum import Enum
from anthropic import Anthropic
import json
from difflib import SequenceMatcher
from ..config import settings
from ..extraction.clause_extractor import ExtractedClause, ClauseType
class DifferenceType(Enum):
ADDED = "added"
REMOVED = "removed"
MODIFIED = "modified"
UNCHANGED = "unchanged"
@dataclass
class ClauseDifference:
clause_type: ClauseType
difference_type: DifferenceType
original_text: str
new_text: str
significance: str # high, medium, low
summary: str
class ContractComparator:
"""Compare contracts against templates or previous versions."""
def __init__(self):
self.client = Anthropic(api_key=settings.anthropic_api_key)
def compare_to_template(
self,
contract_clauses: List[ExtractedClause],
template_clauses: List[ExtractedClause]
) -> List[ClauseDifference]:
"""Compare contract against standard template."""
differences = []
# Index clauses by type
contract_by_type = {c.clause_type: c for c in contract_clauses}
template_by_type = {c.clause_type: c for c in template_clauses}
# Check all clause types
all_types = set(contract_by_type.keys()) | set(template_by_type.keys())
for clause_type in all_types:
contract_clause = contract_by_type.get(clause_type)
template_clause = template_by_type.get(clause_type)
if contract_clause and not template_clause:
# Added clause (not in template)
differences.append(ClauseDifference(
clause_type=clause_type,
difference_type=DifferenceType.ADDED,
original_text="",
new_text=contract_clause.full_text,
significance="medium",
summary=f"Non-standard {clause_type.value} clause added"
))
elif template_clause and not contract_clause:
# Missing clause
differences.append(ClauseDifference(
clause_type=clause_type,
difference_type=DifferenceType.REMOVED,
original_text=template_clause.full_text,
new_text="",
significance="high",
summary=f"Standard {clause_type.value} clause is missing"
))
elif contract_clause and template_clause:
# Compare content
similarity = self._calculate_similarity(
contract_clause.full_text,
template_clause.full_text
)
if similarity < 0.9: # Less than 90% similar
diff = self._analyze_difference(
clause_type,
template_clause.full_text,
contract_clause.full_text
)
differences.append(diff)
return differences
def compare_versions(
self,
old_clauses: List[ExtractedClause],
new_clauses: List[ExtractedClause]
) -> List[ClauseDifference]:
"""Compare two versions of the same contract."""
return self.compare_to_template(new_clauses, old_clauses)
def _calculate_similarity(self, text1: str, text2: str) -> float:
"""Calculate text similarity ratio."""
return SequenceMatcher(None, text1.lower(), text2.lower()).ratio()
def _analyze_difference(
self,
clause_type: ClauseType,
original: str,
new: str
) -> ClauseDifference:
"""Analyze the significance of a difference."""
prompt = f"""Compare these two versions of a {clause_type.value} clause.
Original (Template):
{original[:2000]}
New (Contract):
{new[:2000]}
Analyze:
1. What are the key differences?
2. Is this modification significant? (high/medium/low)
3. Does this favor one party over another?
Return JSON:
{{
"significance": "high|medium|low",
"summary": "Brief description of the key differences and their impact",
"favors": "party_a|party_b|neutral"
}}"""
response = self.client.messages.create(
model=settings.analysis_model,
max_tokens=1024,
messages=[{"role": "user", "content": prompt}]
)
try:
analysis = json.loads(response.content[0].text)
return ClauseDifference(
clause_type=clause_type,
difference_type=DifferenceType.MODIFIED,
original_text=original,
new_text=new,
significance=analysis["significance"],
summary=analysis["summary"]
)
except (json.JSONDecodeError, KeyError):
return ClauseDifference(
clause_type=clause_type,
difference_type=DifferenceType.MODIFIED,
original_text=original,
new_text=new,
significance="medium",
summary="Clause has been modified from template"
)
def generate_redline(
self,
old_text: str,
new_text: str
) -> str:
"""Generate redline markup showing changes."""
# Simple word-level diff
old_words = old_text.split()
new_words = new_text.split()
matcher = SequenceMatcher(None, old_words, new_words)
result = []
for tag, i1, i2, j1, j2 in matcher.get_opcodes():
if tag == 'equal':
result.extend(old_words[i1:i2])
elif tag == 'delete':
result.append(f"~~{' '.join(old_words[i1:i2])}~~")
elif tag == 'insert':
result.append(f"**{' '.join(new_words[j1:j2])}**")
elif tag == 'replace':
result.append(f"~~{' '.join(old_words[i1:i2])}~~")
result.append(f"**{' '.join(new_words[j1:j2])}**")
return ' '.join(result)Why Contract Comparison Matters:
Legal teams review contracts against templates to catch non-standard terms:
┌─────────────────────────────────────────────────────────────┐
│ COMPARISON WORKFLOW │
├─────────────────────────────────────────────────────────────┤
│ │
│ Standard Template ◄────────────────► Incoming Contract │
│ Compare │
│ │ │
│ ▼ │
│ ┌────────────────────────────────┐ │
│ │ Differences Detected: │ │
│ │ • ADDED: Non-compete clause │ ← Not in │
│ │ (significance: high) │ template │
│ │ │ │
│ │ • REMOVED: Liability cap │ ← Missing │
│ │ (significance: high) │ standard │
│ │ │ protection │
│ │ • MODIFIED: Payment terms │ ← Changed │
│ │ (significance: med) │ from 30→60 │
│ └────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘The 90% similarity threshold catches meaningful changes while ignoring minor rewording.
Step 6: RAG Pipeline for Q&A
# src/generation/rag_pipeline.py
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import weaviate
from anthropic import Anthropic
from openai import OpenAI
from ..config import settings
from ..extraction.clause_extractor import ExtractedClause
@dataclass
class ContractAnswer:
answer: str
confidence: float
sources: List[Dict[str, Any]]
relevant_clauses: List[str]
class ContractRAG:
"""RAG pipeline for contract Q&A."""
def __init__(self):
self.anthropic = Anthropic(api_key=settings.anthropic_api_key)
self.openai = OpenAI(api_key=settings.openai_api_key)
self.weaviate_client = weaviate.Client(settings.weaviate_url)
self._ensure_schema()
def _ensure_schema(self):
"""Ensure Weaviate schema exists."""
schema = {
"class": settings.weaviate_class,
"vectorizer": "none", # We provide our own vectors
"properties": [
{"name": "content", "dataType": ["text"]},
{"name": "clause_type", "dataType": ["string"]},
{"name": "section_reference", "dataType": ["string"]},
{"name": "contract_id", "dataType": ["string"]},
{"name": "key_terms", "dataType": ["string[]"]}
]
}
try:
self.weaviate_client.schema.create_class(schema)
except weaviate.exceptions.UnexpectedStatusCodeException:
pass # Schema already exists
def index_clauses(
self,
clauses: List[ExtractedClause],
contract_id: str
):
"""Index extracted clauses for retrieval."""
for clause in clauses:
# Generate embedding
embedding = self._embed(clause.full_text)
# Store in Weaviate
self.weaviate_client.data_object.create(
class_name=settings.weaviate_class,
data_object={
"content": clause.full_text,
"clause_type": clause.clause_type.value,
"section_reference": clause.section_reference,
"contract_id": contract_id,
"key_terms": clause.key_terms
},
vector=embedding
)
def query(
self,
question: str,
contract_id: str,
top_k: int = 5
) -> ContractAnswer:
"""Answer a question about the contract."""
# Retrieve relevant clauses
query_embedding = self._embed(question)
results = (
self.weaviate_client.query
.get(settings.weaviate_class, ["content", "clause_type", "section_reference"])
.with_near_vector({"vector": query_embedding})
.with_where({
"path": ["contract_id"],
"operator": "Equal",
"valueString": contract_id
})
.with_limit(top_k)
.do()
)
clauses = results.get("data", {}).get("Get", {}).get(settings.weaviate_class, [])
if not clauses:
return ContractAnswer(
answer="I couldn't find relevant information in the contract to answer this question.",
confidence=0.0,
sources=[],
relevant_clauses=[]
)
# Generate answer
context = "\n\n".join([
f"[{c['clause_type']}] {c['section_reference']}:\n{c['content']}"
for c in clauses
])
prompt = f"""Based on the following contract clauses, answer the question.
Contract Clauses:
{context}
Question: {question}
Instructions:
1. Answer based ONLY on the provided contract text
2. Quote relevant sections when applicable
3. If the answer is not in the contract, say so
4. Be precise and legally accurate
Provide your answer in JSON format:
{{
"answer": "Your detailed answer",
"confidence": 0.0-1.0,
"relevant_sections": ["section references used"]
}}"""
response = self.anthropic.messages.create(
model=settings.analysis_model,
max_tokens=2048,
messages=[{"role": "user", "content": prompt}]
)
try:
import json
result = json.loads(response.content[0].text)
return ContractAnswer(
answer=result["answer"],
confidence=result["confidence"],
sources=[
{"clause_type": c["clause_type"], "reference": c["section_reference"]}
for c in clauses
],
relevant_clauses=result.get("relevant_sections", [])
)
except (json.JSONDecodeError, KeyError):
return ContractAnswer(
answer=response.content[0].text,
confidence=0.7,
sources=[
{"clause_type": c["clause_type"], "reference": c["section_reference"]}
for c in clauses
],
relevant_clauses=[]
)
def _embed(self, text: str) -> List[float]:
"""Generate embedding for text."""
response = self.openai.embeddings.create(
model=settings.embedding_model,
input=text
)
return response.data[0].embeddingWhy Store Clauses in Weaviate (Not Full Documents)?
┌─────────────────────────────────────────────────────────────┐
│ CLAUSE-LEVEL vs DOCUMENT-LEVEL INDEXING │
├─────────────────────────────────────────────────────────────┤
│ │
│ DOCUMENT-LEVEL (naive approach): │
│ Query: "What is the termination notice period?" │
│ Retrieved: Whole document → LLM searches through 50 pages │
│ Problem: Context window limits, slow, expensive │
│ │
│ CLAUSE-LEVEL (this approach): │
│ Query: "What is the termination notice period?" │
│ Retrieved: Just termination clause (~500 tokens) │
│ Benefit: Precise, fast, cheap │
│ │
└─────────────────────────────────────────────────────────────┘Each clause is indexed with rich metadata for filtered retrieval:
clause_type: Filter to specific clause categoriescontract_id: Scope queries to one contractkey_terms: Boost retrieval for defined terms
Step 7: FastAPI Application
# src/api/main.py
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
import tempfile
import uuid
from pathlib import Path
from ..config import settings
from ..ingestion.pdf_parser import PDFParser
from ..ingestion.section_segmenter import SectionSegmenter
from ..extraction.clause_extractor import ClauseExtractor
from ..analysis.risk_analyzer import RiskAnalyzer
from ..analysis.comparator import ContractComparator
from ..generation.rag_pipeline import ContractRAG
app = FastAPI(
title="Legal Contract Analysis Platform",
description="AI-powered contract review and risk analysis",
version="1.0.0"
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"]
)
# Initialize components
pdf_parser = PDFParser()
segmenter = SectionSegmenter()
clause_extractor = ClauseExtractor()
risk_analyzer = RiskAnalyzer()
comparator = ContractComparator()
rag = ContractRAG()
class AnalysisResponse(BaseModel):
contract_id: str
filename: str
total_pages: int
clauses_found: int
risk_score: Dict[str, Any]
risks: List[Dict[str, Any]]
clauses: List[Dict[str, Any]]
class QuestionRequest(BaseModel):
contract_id: str
question: str
class CompareRequest(BaseModel):
contract_id: str
template_id: str
@app.post("/api/contracts/analyze", response_model=AnalysisResponse)
async def analyze_contract(
file: UploadFile = File(...),
contract_type: str = "service_agreement"
):
"""Analyze a contract document."""
contract_id = str(uuid.uuid4())
# Save uploaded file
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
content = await file.read()
tmp.write(content)
tmp_path = tmp.name
try:
# Parse PDF
parsed = pdf_parser.parse(tmp_path)
# Combine all page text
full_text = "\n\n".join([p.text for p in parsed.pages])
# Segment into sections
sections = segmenter.segment(full_text)
# Extract clauses
clauses = clause_extractor.extract_clauses(sections)
# Index for RAG
rag.index_clauses(clauses, contract_id)
# Analyze risks
risks = risk_analyzer.analyze(clauses, contract_type)
risk_score = risk_analyzer.calculate_risk_score(risks)
return AnalysisResponse(
contract_id=contract_id,
filename=file.filename,
total_pages=parsed.total_pages,
clauses_found=len(clauses),
risk_score=risk_score,
risks=[
{
"level": r.risk_level.value,
"category": r.category.value,
"title": r.title,
"description": r.description,
"recommendation": r.recommendation,
"clause_reference": r.clause_reference
}
for r in risks
],
clauses=[
{
"type": c.clause_type.value,
"title": c.title,
"section": c.section_reference,
"key_terms": c.key_terms,
"confidence": c.confidence
}
for c in clauses
]
)
finally:
# Cleanup temp file
Path(tmp_path).unlink(missing_ok=True)
@app.post("/api/contracts/question")
async def ask_question(request: QuestionRequest):
"""Ask a question about a contract."""
answer = rag.query(
question=request.question,
contract_id=request.contract_id
)
return {
"answer": answer.answer,
"confidence": answer.confidence,
"sources": answer.sources,
"relevant_clauses": answer.relevant_clauses
}
@app.get("/api/contracts/{contract_id}/summary")
async def get_summary(contract_id: str):
"""Get executive summary of contract."""
# Query for key information
key_questions = [
"What are the main parties to this agreement?",
"What is the term of this agreement?",
"What are the key payment terms?",
"What are the termination conditions?",
"What are the liability limitations?"
]
summary = {}
for question in key_questions:
answer = rag.query(question, contract_id)
key = question.split("?")[0].replace("What are the ", "").replace("What is the ", "")
summary[key.lower().replace(" ", "_")] = answer.answer
return {"contract_id": contract_id, "summary": summary}
@app.get("/api/health")
async def health_check():
return {"status": "healthy"}Step 8: Docker Deployment
# docker-compose.yml
version: '3.8'
services:
api:
build: .
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- WEAVIATE_URL=http://weaviate:8080
depends_on:
- weaviate
volumes:
- ./templates:/app/templates
weaviate:
image: semitechnologies/weaviate:1.23.0
ports:
- "8080:8080"
environment:
QUERY_DEFAULTS_LIMIT: 25
AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
PERSISTENCE_DATA_PATH: '/var/lib/weaviate'
DEFAULT_VECTORIZER_MODULE: 'none'
volumes:
- weaviate_data:/var/lib/weaviate
volumes:
weaviate_data:# requirements.txt
fastapi==0.109.0
uvicorn==0.27.0
pydantic==2.5.3
pydantic-settings==2.1.0
anthropic==0.18.0
openai==1.10.0
weaviate-client==4.4.0
PyMuPDF==1.23.8
python-multipart==0.0.6
Pillow==10.2.0Usage Example
import requests
# Upload and analyze a contract
with open("contract.pdf", "rb") as f:
response = requests.post(
"http://localhost:8000/api/contracts/analyze",
files={"file": f},
params={"contract_type": "service_agreement"}
)
result = response.json()
print(f"Contract ID: {result['contract_id']}")
print(f"Risk Score: {result['risk_score']['score']} ({result['risk_score']['rating']})")
print(f"High Risks: {result['risk_score']['high_risks']}")
# Ask a question
response = requests.post(
"http://localhost:8000/api/contracts/question",
json={
"contract_id": result["contract_id"],
"question": "What is the notice period for termination?"
}
)
answer = response.json()
print(f"Answer: {answer['answer']}")Clause Types Covered
| Category | Clauses |
|---|---|
| Core Terms | Parties, Term, Renewal, Termination |
| Financial | Payment, Pricing, Penalties, Expenses |
| Liability | Indemnification, Limitation, Insurance |
| IP Rights | Ownership, License, Confidentiality |
| Compliance | Governing Law, Dispute Resolution, Force Majeure |
| Data | Privacy, Security, Data Processing |
Business Impact
| Metric | Improvement |
|---|---|
| Contract Review Time | 80% reduction |
| Risk Detection | 95% accuracy |
| Clause Extraction | 98% recall |
| Legal Team Capacity | 5x increase |
| Deal Velocity | 3x faster |
Key Concepts Recap
| Concept | What It Is | Why It Matters |
|---|---|---|
| Section Segmentation | Regex + LLM fallback to split contracts | Preserves clause boundaries for accurate extraction |
| Clause Classification | Categorize by type (termination, liability, etc.) | Enables filtered retrieval and risk analysis |
| Three-Layer Risk Analysis | Missing → Pattern → LLM deep analysis | Catches obvious issues fast, nuanced issues with LLM |
| Template Comparison | Diff against standard templates | Identifies non-standard and missing protections |
| Clause-Level Indexing | Store clauses, not documents, in vector DB | Precise retrieval, lower cost, faster responses |
| Structured Extraction | Obligations, dates, monetary values | Powers party-specific queries and deadline tracking |
| Signature Detection | Identify executed vs. draft contracts | Affects risk severity (executed = less negotiable) |
| Redline Generation | Word-level diff with strikethrough/bold | Visual comparison for legal review |
Prerequisites
Before starting this case study, complete: