Content Moderation System
Build an AI-powered content moderation platform for user-generated content with multi-tier classification and human review workflows
Build a production-grade content moderation system that automatically detects and filters harmful content across text, images, and user behavior patterns.
Properties
| Property | Value |
|---|---|
| Difficulty | Advanced |
| Time | ~5 days |
| Industry | Social Media / UGC Platforms |
| LOC | ~1400 |
TL;DR
Build a multi-tier moderation system that uses fast heuristics (regex, blocklists) for obvious violations, ML classifiers (toxicity, spam detection) for nuanced content, and LLM analysis (GPT-4o) for borderline cases requiring contextual understanding. A priority queue routes flagged content to human review with SLA tracking.
The Challenge
User-generated content platforms face critical moderation challenges:
| Challenge | Impact |
|---|---|
| Scale | Millions of posts per day requiring review |
| Speed | Harmful content must be caught within seconds |
| Accuracy | False positives alienate users; false negatives cause harm |
| Context | Sarcasm, cultural references, coded language |
| Evolution | New harmful patterns emerge constantly |
System Architecture
┌─────────────────────────────────────────────────────────────────────────────┐
│ CONTENT MODERATION ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ CONTENT INPUT │ │
│ │ User Content ───┬─── Reported Content ───┬─── Bulk Historical │ │
│ └───────────────────┴────────────────────────┴────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ TIER 1: FAST CLASSIFICATION │ │
│ │ │ │
│ │ ┌─────────────────┐ ┌─────────────────┐ │ │
│ │ │Heuristic Matcher│──► │ Toxicity Model │ │ │
│ │ │ (Blocklist/ │ │ (ML-based) │ │ │
│ │ │ Patterns) │ └────────┬────────┘ │ │
│ │ └────────┬────────┘ │ │ │
│ │ │ ▼ │ │
│ │ [Block] ┌─────────────────┐ │ │
│ │ │ │ Spam Classifier │ │ │
│ │ ▼ │ (Behavioral) │ │ │
│ │ Auto-Block └────────┬────────┘ │ │
│ │ │ │ │
│ │ ┌─────────────────┼─────────────────┐ │ │
│ │ ▼ ▼ ▼ │ │
│ │ [Not Spam] [Uncertain] [Spam] │ │
│ │ │ │ │ │ │
│ │ ▼ │ ▼ │ │
│ │ Auto-Approve │ Auto-Block │ │
│ └──────────────────────────────────────┼──────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ TIER 2: LLM ANALYSIS │ │
│ │ Context Analyzer ──► Policy Analyzer ──► Severity Scorer │ │
│ │ │ │ │
│ │ ┌──────────────────────────┼──────────────┐ │ │
│ │ ▼ ▼ ▼ │ │
│ │ [Safe] [Borderline] [Violating]│ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ ACTION LAYER │ │
│ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │
│ │ │Auto-Approve│ │Human Review│ │ Auto-Block │ │ Appeal │ │ │
│ │ │ │ │ Queue │ │ │ │ Handler │ │ │
│ │ └────────────┘ └──────┬─────┘ └──────┬─────┘ └──────┬─────┘ │ │
│ └──────────────────────────┴───────────────┴───────────────┴──────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ LEARNING LOOP │ │
│ │ Moderator Labels ──► Retrain Pipeline ──► Policy Updates │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Technology Stack
| Component | Technology | Purpose |
|---|---|---|
| Framework | LangChain | LLM orchestration |
| Fast Models | Detoxify, transformers | Real-time classification |
| LLM | GPT-4o | Context-aware analysis |
| Queue | Redis + Celery | Async processing |
| Database | PostgreSQL | Content & decisions |
| Cache | Redis | Rate limiting, dedup |
| Monitoring | Prometheus + Grafana | Metrics & alerts |
Implementation
Project Structure
content_moderation/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI application
│ ├── config.py # Configuration
│ ├── models/
│ │ ├── __init__.py
│ │ ├── content.py # Content models
│ │ └── decisions.py # Moderation decisions
│ ├── classifiers/
│ │ ├── __init__.py
│ │ ├── heuristic.py # Pattern matching
│ │ ├── toxicity.py # Toxicity detection
│ │ ├── spam.py # Spam classification
│ │ └── llm_analyzer.py # LLM-based analysis
│ ├── services/
│ │ ├── __init__.py
│ │ ├── moderation.py # Main orchestrator
│ │ ├── queue.py # Review queue
│ │ └── appeals.py # Appeal handling
│ ├── policies/
│ │ ├── __init__.py
│ │ └── community.py # Policy definitions
│ └── api/
│ ├── __init__.py
│ └── routes.py # API endpoints
├── workers/
│ └── moderation_worker.py # Celery workers
├── tests/
└── docker-compose.yml
Content Models
# app/models/content.py
from enum import Enum
from datetime import datetime
from pydantic import BaseModel, Field
from typing import Optional
class ContentType(str, Enum):
TEXT = "text"
IMAGE = "image"
VIDEO = "video"
COMMENT = "comment"
PROFILE = "profile"
class ModerationAction(str, Enum):
APPROVE = "approve"
BLOCK = "block"
HUMAN_REVIEW = "human_review"
SHADOW_BAN = "shadow_ban"
WARNING = "warning"
class ViolationType(str, Enum):
NONE = "none"
HATE_SPEECH = "hate_speech"
HARASSMENT = "harassment"
VIOLENCE = "violence"
SEXUAL = "sexual_content"
SPAM = "spam"
MISINFORMATION = "misinformation"
SELF_HARM = "self_harm"
ILLEGAL = "illegal_activity"
PII = "personal_information"
class ContentItem(BaseModel):
"""Content submitted for moderation."""
id: str
user_id: str
content_type: ContentType
text: Optional[str] = None
media_url: Optional[str] = None
metadata: dict = Field(default_factory=dict)
context: dict = Field(default_factory=dict) # Thread context, reply-to, etc.
submitted_at: datetime = Field(default_factory=datetime.utcnow)
class ClassificationResult(BaseModel):
"""Result from a single classifier."""
classifier: str
violation_type: ViolationType
confidence: float
details: dict = Field(default_factory=dict)
processing_time_ms: float
class ModerationDecision(BaseModel):
"""Final moderation decision."""
content_id: str
action: ModerationAction
violation_types: list[ViolationType]
confidence: float
reasoning: str
classifications: list[ClassificationResult]
requires_appeal: bool = False
reviewed_by: Optional[str] = None # "auto" or moderator_id
    decided_at: datetime = Field(default_factory=datetime.utcnow)
Understanding the Data Model:
┌─────────────────────────────────────────────────────────────┐
│ MODERATION DATA FLOW │
├─────────────────────────────────────────────────────────────┤
│ │
│ ContentItem │
│ (user post) │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ ClassificationResult[] (one per classifier) │ │
│ │ • heuristic: SPAM, confidence=0.95 │ │
│ │ • toxicity: NONE, confidence=0.12 │ │
│ │ • spam: SPAM, confidence=0.87 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ModerationDecision │
│ (final verdict) │
│ • action: BLOCK │
│ • violations: [SPAM] │
│ • confidence: 0.95 │
│ • reviewed_by: "auto" or "moderator_123" │
│ │
└─────────────────────────────────────────────────────────────┘
| Field | Purpose |
|---|---|
| violation_type | Enum of 10 categories - enables routing to specialized reviewers |
| confidence | 0-1 score - drives auto-approve/block vs human review decision |
| processing_time_ms | Track latency per classifier for optimization |
| requires_appeal | Auto-set when blocked - enables appeal workflow |
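The confidence-driven routing in the table above can be made concrete with a minimal, self-contained sketch; the `route` helper and its threshold defaults are illustrative stand-ins, not the system's actual API:

```python
from enum import Enum

class Action(str, Enum):
    APPROVE = "approve"
    BLOCK = "block"
    HUMAN_REVIEW = "human_review"

def route(confidence: float, is_violation: bool,
          approve_below: float = 0.2, block_above: float = 0.95) -> Action:
    """Map a single classifier's violation confidence to an action."""
    if not is_violation or confidence < approve_below:
        return Action.APPROVE          # weak or no signal: let it through
    if confidence >= block_above:
        return Action.BLOCK            # near-certain violation: auto-block
    return Action.HUMAN_REVIEW         # borderline: escalate to a person

print(route(0.97, True).value)   # block
print(route(0.60, True).value)   # human_review
print(route(0.10, True).value)   # approve
```

The middle band is the expensive one: everything between the two thresholds ends up in front of a human, so tuning those two numbers directly controls review-queue volume.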
Heuristic Classifier (Tier 1)
# app/classifiers/heuristic.py
import re
from typing import Optional
from ..models.content import (
ContentItem,
ClassificationResult,
ViolationType
)
class HeuristicClassifier:
"""
Fast pattern-based classifier for obvious violations.
Catches ~30% of violations with >99% precision.
"""
def __init__(self):
# Load from external config in production
self._load_patterns()
def _load_patterns(self):
"""Load blocklist patterns."""
# Exact match blocklist (hashed in production)
self.blocklist = set([
# Known slurs and variations (simplified for example)
# In production: load from encrypted database
])
# Regex patterns for common violations
self.patterns = {
ViolationType.SPAM: [
r"(?i)buy\s+now.*\d+%\s+off",
r"(?i)click\s+here.*free",
r"(?i)(telegram|whatsapp).*\+\d{10,}",
r"(?i)earn\s+\$\d+.*per\s+(day|hour|week)",
],
ViolationType.PII: [
r"\b\d{3}-\d{2}-\d{4}\b", # SSN
r"\b\d{16}\b", # Credit card
                r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",  # Email
],
ViolationType.VIOLENCE: [
r"(?i)(kill|murder|attack)\s+(all|every)\s+\w+",
r"(?i)bomb\s+threat",
],
ViolationType.SELF_HARM: [
r"(?i)(suicide|kill myself).*method",
r"(?i)how\s+to\s+(cut|harm)\s+myself",
],
}
# Compile patterns
self.compiled_patterns = {
vtype: [re.compile(p) for p in patterns]
for vtype, patterns in self.patterns.items()
}
def classify(self, content: ContentItem) -> Optional[ClassificationResult]:
"""
Fast heuristic classification.
Returns result only if confident match found.
"""
import time
start = time.time()
if not content.text:
return None
text = content.text.lower()
# Check blocklist
words = set(text.split())
if words & self.blocklist:
return ClassificationResult(
classifier="heuristic",
violation_type=ViolationType.HATE_SPEECH,
confidence=0.99,
details={"match_type": "blocklist"},
processing_time_ms=(time.time() - start) * 1000
)
# Check patterns
for vtype, patterns in self.compiled_patterns.items():
for pattern in patterns:
if pattern.search(content.text):
return ClassificationResult(
classifier="heuristic",
violation_type=vtype,
confidence=0.95,
details={
"match_type": "pattern",
"pattern": pattern.pattern
},
processing_time_ms=(time.time() - start) * 1000
)
        return None  # No confident match
Why Heuristics First:
┌─────────────────────────────────────────────────────────────┐
│ HEURISTIC CLASSIFIER ROLE │
├─────────────────────────────────────────────────────────────┤
│ │
│ Catches ~30% of violations with >99% precision │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ BLOCKLIST (exact match) │ │
│ │ • Known slurs and variations │ │
│ │ • Processing: O(1) hash lookup │ │
│ │ • Confidence: 0.99 (near-certain) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ No match │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ PATTERN MATCHING (regex) │ │
│ │ • SPAM: "buy now.*\d+% off" │ │
│ │ • PII: SSN, credit card, email patterns │ │
│ │ • VIOLENCE: "bomb threat", "kill all" │ │
│ │ • Processing: Compiled regex, ~1ms │ │
│ │ • Confidence: 0.95 (high certainty) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ No match │
│ ▼ │
│ Pass to ML classifiers (more expensive) │
│ │
└─────────────────────────────────────────────────────────────┘
| Approach | Latency | Accuracy | Cost |
|---|---|---|---|
| Heuristics | ~1ms | 99% precision | Free |
| ML Classifier | ~50ms | 95% precision | Compute |
| LLM | ~1-2s | 90% precision | $$$ API |
Why this order matters: Stop early for obvious violations → save ML/LLM costs.
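The escalation math can be sketched as follows; all catch rates and per-item costs here are illustrative assumptions, chosen so that roughly 5% of traffic reaches the LLM tier:

```python
# Illustrative expected-cost funnel for the tiered pipeline.
# Rates and prices are assumptions, not measurements.
posts = 1_000_000               # daily volume (assumed)
resolved_by_heuristics = 0.30   # auto-decided by tier-1 patterns
resolved_by_ml = 0.93           # of the remainder, decided by ML thresholds

reach_ml = posts * (1 - resolved_by_heuristics)   # items needing ML
reach_llm = reach_ml * (1 - resolved_by_ml)       # items needing LLM

ml_cost_per_item = 0.00001      # assumed amortized GPU cost
llm_cost_per_item = 0.002       # assumed API cost per call

daily_cost = reach_ml * ml_cost_per_item + reach_llm * llm_cost_per_item
print(f"{reach_llm / posts:.1%} of posts reach the LLM")
print(f"${daily_cost:,.0f}/day")
```

Run the tiers in the opposite order and every post pays the LLM price; run them in this order and the LLM bill scales with only the ambiguous tail.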
Toxicity Classifier (Tier 1)
# app/classifiers/toxicity.py
import time
from typing import Optional
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from ..models.content import (
ContentItem,
ClassificationResult,
ViolationType
)
class ToxicityClassifier:
"""
ML-based toxicity detection using Detoxify/similar models.
Handles nuanced toxicity that heuristics miss.
"""
TOXICITY_THRESHOLD = 0.7
SEVERE_THRESHOLD = 0.85
# Map model labels to violation types
LABEL_MAPPING = {
"toxic": ViolationType.HARASSMENT,
"severe_toxic": ViolationType.HARASSMENT,
"obscene": ViolationType.SEXUAL,
"threat": ViolationType.VIOLENCE,
"insult": ViolationType.HARASSMENT,
"identity_attack": ViolationType.HATE_SPEECH,
"sexual_explicit": ViolationType.SEXUAL,
}
def __init__(self, model_name: str = "unitary/toxic-bert"):
self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModelForSequenceClassification.from_pretrained(
model_name
).to(self.device)
self.model.eval()
# Get label names from model config
self.labels = list(self.model.config.id2label.values())
def classify(self, content: ContentItem) -> Optional[ClassificationResult]:
"""Classify content for toxicity."""
start = time.time()
if not content.text:
return None
# Tokenize
inputs = self.tokenizer(
content.text,
return_tensors="pt",
truncation=True,
max_length=512
).to(self.device)
# Predict
with torch.no_grad():
outputs = self.model(**inputs)
probs = torch.sigmoid(outputs.logits)[0].cpu().numpy()
# Map to label scores
scores = {
label: float(prob)
for label, prob in zip(self.labels, probs)
}
# Find highest violation
max_label = max(scores, key=scores.get)
max_score = scores[max_label]
processing_time = (time.time() - start) * 1000
if max_score < self.TOXICITY_THRESHOLD:
return ClassificationResult(
classifier="toxicity",
violation_type=ViolationType.NONE,
confidence=1 - max_score,
details={"scores": scores},
processing_time_ms=processing_time
)
# Map to violation type
violation_type = self.LABEL_MAPPING.get(
max_label,
ViolationType.HARASSMENT
)
return ClassificationResult(
classifier="toxicity",
violation_type=violation_type,
confidence=max_score,
details={
"scores": scores,
"primary_label": max_label,
"is_severe": max_score >= self.SEVERE_THRESHOLD
},
processing_time_ms=processing_time
        )
Why ML for Toxicity:
┌─────────────────────────────────────────────────────────────┐
│ TOXICITY DETECTION │
├─────────────────────────────────────────────────────────────┤
│ │
│ "You're an absolute genius" ← sarcasm? sincere? │
│ "This policy is killing our community" ← metaphor │
│ │
│ Heuristics can't handle nuance. ML models trained on │
│ millions of labeled examples understand context better. │
│ │
│ Model: unitary/toxic-bert (or Detoxify) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Multi-label classification │ │
│ │ │ │
│ │ Input: "You're worthless and should quit" │ │
│ │ │ │
│ │ Output (sigmoid probabilities): │ │
│ │ • toxic: 0.89 │ │
│ │ • severe_toxic: 0.23 │ │
│ │ • insult: 0.91 ← highest │ │
│ │ • threat: 0.02 │ │
│ │ • identity_attack: 0.05 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ Decision: │
│ • max_score = 0.91 (insult) → HARASSMENT │
│ • 0.91 > 0.85 SEVERE_THRESHOLD → is_severe = true │
│ │
└─────────────────────────────────────────────────────────────┘
| Threshold | Value | Effect |
|---|---|---|
| TOXICITY_THRESHOLD | 0.7 | Below this = NONE (safe) |
| SEVERE_THRESHOLD | 0.85 | Above this = severe (auto-block candidate) |
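To make the threshold logic concrete, here is a standalone sketch that replays the decision from the diagram above with hard-coded scores; the label names follow toxic-bert-style multi-label output, and the simplified string mapping is illustrative:

```python
# Replay of the diagram's example decision with fixed scores.
TOXICITY_THRESHOLD = 0.7
SEVERE_THRESHOLD = 0.85
LABEL_MAPPING = {"toxic": "harassment", "insult": "harassment",
                 "threat": "violence", "identity_attack": "hate_speech"}

scores = {"toxic": 0.89, "severe_toxic": 0.23, "insult": 0.91,
          "threat": 0.02, "identity_attack": 0.05}

label = max(scores, key=scores.get)   # highest-probability label: "insult"
score = scores[label]                 # 0.91
verdict = (LABEL_MAPPING.get(label, "harassment")
           if score >= TOXICITY_THRESHOLD else "none")
print(verdict, score >= SEVERE_THRESHOLD)   # harassment True
```

Note that only the single highest label drives the verdict; a post can score 0.6 on several labels and still pass, which is exactly the kind of case the LLM tier exists to catch.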
Spam Classifier (Tier 1)
# app/classifiers/spam.py
import time
import hashlib
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Optional
import redis
from ..models.content import (
ContentItem,
ClassificationResult,
ViolationType
)
class SpamClassifier:
"""
Spam detection using behavioral signals and content analysis.
"""
DUPLICATE_THRESHOLD = 0.8
RATE_LIMIT_WINDOW = 60 # seconds
RATE_LIMIT_MAX = 10 # posts per window
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
# Spam indicators
self.spam_signals = {
"excessive_caps": lambda t: sum(1 for c in t if c.isupper()) / max(len(t), 1) > 0.5,
            "excessive_emoji": lambda t: sum(1 for c in t if ord(c) > 0x1F000) > 10,  # rough emoji-range check
"link_heavy": lambda t: t.count("http") > 3,
"repetitive_chars": lambda t: any(c * 5 in t for c in set(t)),
}
def _content_hash(self, text: str) -> str:
"""Create hash for deduplication."""
normalized = " ".join(text.lower().split())
return hashlib.sha256(normalized.encode()).hexdigest()[:16]
def _check_rate_limit(self, user_id: str) -> tuple[bool, int]:
"""Check if user is posting too frequently."""
key = f"rate:{user_id}"
pipe = self.redis.pipeline()
now = datetime.utcnow().timestamp()
window_start = now - self.RATE_LIMIT_WINDOW
# Remove old entries and count
pipe.zremrangebyscore(key, 0, window_start)
pipe.zcard(key)
pipe.zadd(key, {str(now): now})
pipe.expire(key, self.RATE_LIMIT_WINDOW * 2)
_, count, _, _ = pipe.execute()
return count >= self.RATE_LIMIT_MAX, count
def _check_duplicate(self, content: ContentItem) -> tuple[bool, int]:
"""Check for duplicate/near-duplicate content."""
if not content.text:
return False, 0
content_hash = self._content_hash(content.text)
key = f"content_hash:{content_hash}"
# Increment and get count
count = self.redis.incr(key)
self.redis.expire(key, 3600) # 1 hour window
return count > 1, count
def classify(self, content: ContentItem) -> Optional[ClassificationResult]:
"""Classify content for spam."""
start = time.time()
signals_detected = []
spam_score = 0.0
# Check rate limiting
rate_limited, post_count = self._check_rate_limit(content.user_id)
if rate_limited:
signals_detected.append("rate_limit_exceeded")
spam_score += 0.4
# Check duplicates
is_duplicate, dup_count = self._check_duplicate(content)
if is_duplicate:
signals_detected.append(f"duplicate_content_{dup_count}")
spam_score += 0.3 * min(dup_count, 3)
# Check content signals
if content.text:
for signal_name, check_fn in self.spam_signals.items():
try:
if check_fn(content.text):
signals_detected.append(signal_name)
spam_score += 0.15
except Exception:
pass
processing_time = (time.time() - start) * 1000
# Cap at 1.0
spam_score = min(spam_score, 1.0)
if spam_score < 0.3:
return ClassificationResult(
classifier="spam",
violation_type=ViolationType.NONE,
confidence=1 - spam_score,
details={"signals": signals_detected},
processing_time_ms=processing_time
)
return ClassificationResult(
classifier="spam",
violation_type=ViolationType.SPAM,
confidence=spam_score,
details={
"signals": signals_detected,
"post_count": post_count,
"duplicate_count": dup_count
},
processing_time_ms=processing_time
        )
Behavioral Spam Detection:
┌─────────────────────────────────────────────────────────────┐
│ SPAM DETECTION SIGNALS │
├─────────────────────────────────────────────────────────────┤
│ │
│ USER BEHAVIOR (Redis tracking) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Rate Limiting │ │
│ │ • Track posts per minute per user (sorted set) │ │
│ │ • 10+ posts/minute → likely bot │ │
│ │ • Score: +0.4 │ │
│ └─────────────────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Duplicate Detection │ │
│ │ • Hash content, track occurrences │ │
│ │ • Same content posted 3x → spam │ │
│ │ • Score: +0.3 per duplicate │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ CONTENT SIGNALS │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ • excessive_caps: "BUY NOW!!!" │ │
│ │ • excessive_emoji: 10+ emojis │ │
│ │ • link_heavy: 3+ HTTP links │ │
│ │ • repetitive_chars: "helloooooo" │ │
│ │ • Score: +0.15 each │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ Total Score (capped at 1.0): │
│ • Below 0.3 → NOT SPAM │
│ • 0.3+ → SPAM (confidence = score) │
│ │
└─────────────────────────────────────────────────────────────┘
Why Redis for spam detection:
- Sorted sets for rate limiting with automatic time-window expiry
- Increment + expire for duplicate counting
- Sub-millisecond lookups for real-time decisions
LLM Policy Analyzer (Tier 2)
# app/classifiers/llm_analyzer.py
import time
from typing import Optional
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
from ..models.content import (
ContentItem,
ClassificationResult,
ViolationType,
ModerationAction
)
from ..policies.community import CommunityGuidelines
class LLMAnalysis(BaseModel):
"""Structured output from LLM analysis."""
violates_policy: bool = Field(
description="Whether content violates community guidelines"
)
violation_types: list[str] = Field(
default_factory=list,
description="List of violation categories"
)
severity: str = Field(
description="Severity level: low, medium, high, critical"
)
confidence: float = Field(
ge=0, le=1,
description="Confidence in this assessment"
)
reasoning: str = Field(
description="Detailed reasoning for the decision"
)
context_matters: bool = Field(
description="Whether context significantly affects interpretation"
)
recommended_action: str = Field(
description="Recommended action: approve, warn, remove, ban"
)
class LLMPolicyAnalyzer:
"""
LLM-based content analysis for nuanced policy violations.
Used for borderline cases that fast classifiers can't handle.
"""
SEVERITY_TO_ACTION = {
"low": ModerationAction.WARNING,
"medium": ModerationAction.HUMAN_REVIEW,
"high": ModerationAction.BLOCK,
"critical": ModerationAction.BLOCK,
}
def __init__(
self,
model: str = "gpt-4o",
guidelines: Optional[CommunityGuidelines] = None
):
self.llm = ChatOpenAI(model=model, temperature=0)
self.guidelines = guidelines or CommunityGuidelines()
self.parser = PydanticOutputParser(pydantic_object=LLMAnalysis)
self.prompt = ChatPromptTemplate.from_messages([
("system", """You are a content moderation expert. Analyze user-generated
content against community guidelines and determine if it violates policies.
COMMUNITY GUIDELINES:
{guidelines}
IMPORTANT CONSIDERATIONS:
1. Context matters - consider replies, threads, cultural context
2. Distinguish between harmful content and legitimate discussion OF harmful topics
3. Consider intent - education, news, satire vs. genuine harm
4. Err on the side of caution for content targeting individuals
5. Consider the platform context (this is a {platform_type})
VIOLATION CATEGORIES:
- hate_speech: Attacks on protected groups
- harassment: Targeted attacks on individuals
- violence: Threats, glorification, or instructions for violence
- sexual_content: Explicit sexual content (context-dependent)
- spam: Commercial spam, scams, manipulation
- misinformation: Demonstrably false claims causing harm
- self_harm: Promotion or instructions for self-harm
- illegal_activity: Illegal products, services, or activities
- personal_information: Doxxing, sharing private info
{format_instructions}"""),
("human", """Analyze this content:
CONTENT TYPE: {content_type}
USER ID: {user_id}
CONTENT: {text}
CONTEXT (if available):
{context}
Previous classifications from automated systems:
{prior_classifications}
Provide your analysis.""")
])
def analyze(
self,
content: ContentItem,
prior_classifications: list[ClassificationResult],
platform_type: str = "social media platform"
) -> ClassificationResult:
"""Perform deep LLM analysis on content."""
start = time.time()
# Format prior classifications
prior_str = "\n".join([
f"- {c.classifier}: {c.violation_type.value} (confidence: {c.confidence:.2f})"
for c in prior_classifications
]) or "None"
# Format context
context_str = "\n".join([
f"- {k}: {v}" for k, v in content.context.items()
]) or "No additional context"
# Build prompt
chain = self.prompt | self.llm | self.parser
try:
result: LLMAnalysis = chain.invoke({
"guidelines": self.guidelines.to_text(),
"platform_type": platform_type,
"format_instructions": self.parser.get_format_instructions(),
"content_type": content.content_type.value,
"user_id": content.user_id,
"text": content.text or "[No text - media only]",
"context": context_str,
"prior_classifications": prior_str,
})
# Map to violation type
violation_type = ViolationType.NONE
if result.violates_policy and result.violation_types:
                # Take the first category that maps to a known ViolationType
for vt in result.violation_types:
try:
violation_type = ViolationType(vt)
break
except ValueError:
continue
return ClassificationResult(
classifier="llm_analyzer",
violation_type=violation_type,
confidence=result.confidence,
details={
"severity": result.severity,
"reasoning": result.reasoning,
"context_matters": result.context_matters,
"recommended_action": result.recommended_action,
"all_violations": result.violation_types,
},
processing_time_ms=(time.time() - start) * 1000
)
except Exception as e:
# Fallback for LLM errors
return ClassificationResult(
classifier="llm_analyzer",
violation_type=ViolationType.NONE,
confidence=0.0,
details={"error": str(e)},
processing_time_ms=(time.time() - start) * 1000
            )
Why LLM for Borderline Cases:
┌─────────────────────────────────────────────────────────────┐
│ LLM ANALYSIS - WHEN CONTEXT MATTERS │
├─────────────────────────────────────────────────────────────┤
│ │
│ Example: "I want to kill this game's final boss" │
│ │
│ Heuristic: ⚠️ "kill" detected → flags violence │
│ ML Model: ⚠️ 0.65 confidence → borderline │
│ LLM: ✓ Gaming context → NOT a violation │
│ │
│ LLM receives: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 1. Community Guidelines (full policy document) │ │
│ │ 2. Platform type ("gaming forum") │ │
│ │ 3. Content + context (thread, replies, user type) │ │
│ │ 4. Prior classifier results (for calibration) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ LLM outputs structured analysis: │
│ • violates_policy: false │
│ • severity: "none" │
│ • reasoning: "Gaming context, 'kill' refers to game..." │
│ • context_matters: true │
│ • recommended_action: "approve" │
│ │
└─────────────────────────────────────────────────────────────┘
| LLM Input | Purpose |
|---|---|
| Guidelines | Teaches policy rules with examples |
| Platform type | Context calibration (gaming vs. news vs. social) |
| Prior classifications | Helps LLM understand why content is borderline |
| User context | Account type, thread context, reply-to info |
Cost Control: Only ~5% of content reaches LLM tier (after heuristics + ML filter).
Community Guidelines
# app/policies/community.py
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class PolicyRule:
"""A single policy rule."""
id: str
category: str
description: str
examples_violating: list[str] = field(default_factory=list)
examples_allowed: list[str] = field(default_factory=list)
severity: str = "medium" # low, medium, high, critical
requires_context: bool = False
class CommunityGuidelines:
"""
Community guidelines definition.
In production, load from database/CMS.
"""
def __init__(self):
self.rules = self._load_default_rules()
def _load_default_rules(self) -> list[PolicyRule]:
"""Load default community guidelines."""
return [
PolicyRule(
id="hate-1",
category="hate_speech",
description="Content that attacks or demeans groups based on protected characteristics (race, ethnicity, religion, gender, sexual orientation, disability, nationality)",
examples_violating=[
"All [group] are criminals",
"Slurs targeting protected groups",
"Calls to exclude groups from society",
],
examples_allowed=[
"Discussion of discrimination issues",
"News reporting on hate crimes",
"Academic analysis of prejudice",
],
severity="high",
requires_context=True
),
PolicyRule(
id="harass-1",
category="harassment",
description="Targeted attacks, bullying, or intimidation of specific individuals",
examples_violating=[
"Repeated negative comments targeting a user",
"Sharing someone's private information",
"Coordinated attacks on individuals",
],
examples_allowed=[
"Criticism of public figures' actions",
"Disagreement in debates",
"Negative reviews of businesses",
],
severity="high",
requires_context=True
),
PolicyRule(
id="violence-1",
category="violence",
description="Threats of violence, glorification of violence, or instructions for violent acts",
examples_violating=[
"Specific threats against individuals or groups",
"Instructions for creating weapons",
"Celebrating violent attacks",
],
examples_allowed=[
"News reporting on violence",
"Historical education",
"Fictional violence in clearly marked fiction",
],
severity="critical",
requires_context=True
),
PolicyRule(
id="spam-1",
category="spam",
description="Commercial spam, scams, or manipulation campaigns",
examples_violating=[
"Unsolicited commercial promotion",
"Get-rich-quick schemes",
"Fake engagement requests",
],
examples_allowed=[
"Genuine product recommendations",
"Business accounts promoting services",
],
severity="medium",
requires_context=False
),
PolicyRule(
id="misinfo-1",
category="misinformation",
description="Demonstrably false information that could cause real-world harm",
examples_violating=[
"False medical advice (e.g., dangerous 'cures')",
"Election interference claims",
"Crisis event misinformation",
],
examples_allowed=[
"Opinion and political commentary",
"Satire (when clearly marked)",
"Honest mistakes with corrections",
],
severity="high",
requires_context=True
),
]
def to_text(self) -> str:
"""Convert guidelines to text for LLM."""
sections = []
for rule in self.rules:
section = f"""
## {rule.category.upper()} ({rule.id})
{rule.description}
Severity: {rule.severity}
Requires Context: {'Yes' if rule.requires_context else 'No'}
Examples of VIOLATIONS:
{chr(10).join('- ' + ex for ex in rule.examples_violating)}
Examples of ALLOWED content:
{chr(10).join('- ' + ex for ex in rule.examples_allowed)}
"""
sections.append(section)
return "\n---\n".join(sections)
def get_rule(self, rule_id: str) -> Optional[PolicyRule]:
"""Get a specific rule by ID."""
for rule in self.rules:
if rule.id == rule_id:
return rule
        return None
Why Structured Guidelines:
┌─────────────────────────────────────────────────────────────┐
│ POLICY RULE STRUCTURE │
├─────────────────────────────────────────────────────────────┤
│ │
│ Each rule has: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ id: "hate-1" (unique identifier for tracking) │ │
│ │ category: "hate_speech" (maps to ViolationType) │ │
│ │ description: "Attacks on protected groups..." │ │
│ │ │ │
│ │ examples_violating: ["All [group] are criminals"] │ │
│ │ examples_allowed: ["Discussion of discrimination"] │ │
│ │ │ │
│ │ severity: "high" (drives action selection) │ │
│ │ requires_context: true (needs LLM review) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ Why examples matter: │
│ • LLM learns the BOUNDARY between allowed and forbidden │
│ • "Discussion of hate crimes" vs "Committing hate speech" │
│ • Reduces false positives on legitimate discourse │
│ │
└─────────────────────────────────────────────────────────────┘
| Severity | Auto-Action | Human Review |
|---|---|---|
| critical | Immediate block | Priority 1 (1hr SLA) |
| high | Block if confidence > 0.9 | Priority 2 (4hr SLA) |
| medium | Warning/review | Priority 3 (8hr SLA) |
| low | Warning only | Priority 4 (24hr SLA) |
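The review-queue service itself is not shown in this section, but the SLA table above maps naturally onto a heap ordered by (priority, deadline). A minimal sketch, with illustrative names and IDs:

```python
import heapq

# Severity sets both the queue priority and the SLA deadline,
# mirroring the table above. Values are illustrative.
SLA_HOURS = {"critical": 1, "high": 4, "medium": 8, "low": 24}
PRIORITY = {"critical": 1, "high": 2, "medium": 3, "low": 4}

queue: list[tuple[int, float, str]] = []

def enqueue(content_id: str, severity: str, now: float) -> None:
    """Push an item keyed by (priority, SLA deadline)."""
    deadline = now + SLA_HOURS[severity] * 3600
    heapq.heappush(queue, (PRIORITY[severity], deadline, content_id))

now = 0.0
enqueue("post-42", "medium", now)
enqueue("post-7", "critical", now)
enqueue("post-19", "high", now)

# Moderators pop the most urgent item first
order = [heapq.heappop(queue)[2] for _ in range(3)]
print(order)  # ['post-7', 'post-19', 'post-42']
```

A production queue would live in Redis or PostgreSQL rather than process memory, but the ordering key is the same: severity first, then the oldest SLA deadline within each severity.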
Moderation Orchestrator
# app/services/moderation.py
import time
from typing import Optional
import redis
from dataclasses import dataclass
from ..models.content import (
ContentItem,
ClassificationResult,
ModerationDecision,
ModerationAction,
ViolationType,
)
from ..classifiers.heuristic import HeuristicClassifier
from ..classifiers.toxicity import ToxicityClassifier
from ..classifiers.spam import SpamClassifier
from ..classifiers.llm_analyzer import LLMPolicyAnalyzer
@dataclass
class ModerationConfig:
"""Configuration for moderation thresholds."""
# Confidence thresholds
auto_block_threshold: float = 0.95
auto_approve_threshold: float = 0.2
llm_review_threshold: float = 0.6
# Rate limits
enable_rate_limiting: bool = True
# LLM usage
always_use_llm: bool = False
llm_for_borderline: bool = True
class ModerationOrchestrator:
"""
Orchestrates multi-tier content moderation.
Flow:
1. Heuristic check (fast blocklist/pattern matching)
2. ML toxicity check (if passes heuristic)
3. Spam detection (behavioral + content)
4. LLM analysis (for borderline cases)
5. Final decision
"""
def __init__(
self,
redis_client: redis.Redis,
config: Optional[ModerationConfig] = None
):
self.config = config or ModerationConfig()
self.redis = redis_client
# Initialize classifiers
self.heuristic = HeuristicClassifier()
self.toxicity = ToxicityClassifier()
self.spam = SpamClassifier(redis_client)
self.llm = LLMPolicyAnalyzer()
def moderate(self, content: ContentItem) -> ModerationDecision:
"""
Run full moderation pipeline on content.
"""
start_time = time.time()
classifications: list[ClassificationResult] = []
# Tier 1: Heuristic check
heuristic_result = self.heuristic.classify(content)
if heuristic_result:
classifications.append(heuristic_result)
# Immediate block for high-confidence heuristic match
if heuristic_result.confidence >= self.config.auto_block_threshold:
return self._create_decision(
content=content,
action=ModerationAction.BLOCK,
classifications=classifications,
reasoning="Blocked by heuristic filter"
)
# Tier 1: Toxicity check
toxicity_result = self.toxicity.classify(content)
if toxicity_result:
classifications.append(toxicity_result)
# Tier 1: Spam check
spam_result = self.spam.classify(content)
if spam_result:
classifications.append(spam_result)
# Determine if LLM analysis needed
needs_llm = self._needs_llm_review(classifications)
if needs_llm or self.config.always_use_llm:
# Tier 2: LLM analysis
llm_result = self.llm.analyze(content, classifications)
classifications.append(llm_result)
# Make final decision
return self._make_final_decision(content, classifications)
def _needs_llm_review(
self,
classifications: list[ClassificationResult]
) -> bool:
"""Determine if content needs LLM review."""
if not self.config.llm_for_borderline:
return False
for result in classifications:
# Borderline confidence - needs LLM
if (self.config.auto_approve_threshold < result.confidence
< self.config.auto_block_threshold):
return True
# Context-sensitive violations need LLM
if result.violation_type in [
ViolationType.HATE_SPEECH,
ViolationType.HARASSMENT,
ViolationType.MISINFORMATION,
] and result.confidence > 0.5:
return True
return False
def _make_final_decision(
self,
content: ContentItem,
classifications: list[ClassificationResult]
) -> ModerationDecision:
"""Make final moderation decision based on all classifications."""
# Collect all violations
violations = []
max_confidence = 0.0
primary_reasoning = ""
for result in classifications:
if result.violation_type != ViolationType.NONE:
violations.append(result.violation_type)
if result.confidence > max_confidence:
max_confidence = result.confidence
primary_reasoning = result.details.get(
"reasoning",
f"Detected by {result.classifier}"
)
# No violations
if not violations:
return self._create_decision(
content=content,
action=ModerationAction.APPROVE,
classifications=classifications,
reasoning="No policy violations detected"
)
# Determine action based on confidence
if max_confidence >= self.config.auto_block_threshold:
action = ModerationAction.BLOCK
elif max_confidence >= self.config.llm_review_threshold:
action = ModerationAction.HUMAN_REVIEW
else:
action = ModerationAction.WARNING
return self._create_decision(
content=content,
action=action,
classifications=classifications,
reasoning=primary_reasoning,
violations=violations
)
def _create_decision(
self,
content: ContentItem,
action: ModerationAction,
classifications: list[ClassificationResult],
reasoning: str,
violations: Optional[list[ViolationType]] = None
) -> ModerationDecision:
"""Create a moderation decision."""
# Calculate overall confidence
if classifications:
confidence = max(c.confidence for c in classifications)
else:
confidence = 1.0
return ModerationDecision(
content_id=content.id,
action=action,
violation_types=violations or [ViolationType.NONE],
confidence=confidence,
reasoning=reasoning,
classifications=classifications,
requires_appeal=action == ModerationAction.BLOCK,
reviewed_by="auto"
        )

Understanding the Orchestration Logic:
┌─────────────────────────────────────────────────────────────┐
│ MODERATION PIPELINE FLOW │
├─────────────────────────────────────────────────────────────┤
│ │
│ Content arrives │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ TIER 1: HEURISTIC │ │
│ │ confidence ≥ 0.95? ──► AUTO-BLOCK (immediate) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ continue │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ TIER 1: TOXICITY + SPAM (parallel) │ │
│ │ Collect all ClassificationResults │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ NEEDS LLM REVIEW? │ │
│ │ • 0.2 < confidence < 0.95 (borderline) │ │
│ │ • hate_speech, harassment, misinfo + conf > 0.5 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ yes │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ TIER 2: LLM ANALYSIS │ │
│ │ Add LLM result to classifications │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ FINAL DECISION based on max_confidence: │
│ • ≥ 0.95 → BLOCK │
│ • ≥ 0.60 → HUMAN_REVIEW │
│ • < 0.60 → WARNING or APPROVE │
│ │
└─────────────────────────────────────────────────────────────┘

| Config Setting | Default | Effect |
|---|---|---|
| auto_block_threshold | 0.95 | Confidence needed for auto-block |
| auto_approve_threshold | 0.2 | Below this = definitely safe |
| llm_review_threshold | 0.6 | Above this = queue for human |
| llm_for_borderline | true | Use LLM for uncertain cases |
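To see how these thresholds interact, here is a self-contained copy of the `ModerationConfig` dataclass with a minimal `route` helper (the helper is illustrative; the real routing lives in `_make_final_decision`):

```python
from dataclasses import dataclass

# Minimal copy of ModerationConfig, reproduced so this example is self-contained
@dataclass
class ModerationConfig:
    auto_block_threshold: float = 0.95
    auto_approve_threshold: float = 0.2
    llm_review_threshold: float = 0.6
    llm_for_borderline: bool = True

def route(confidence: float, cfg: ModerationConfig) -> str:
    """Where a single confidence score lands under the configured thresholds."""
    if confidence >= cfg.auto_block_threshold:
        return "block"
    if confidence >= cfg.llm_review_threshold:
        return "human_review"
    if confidence <= cfg.auto_approve_threshold:
        return "approve"
    return "warning"

# A stricter deployment might lower the auto-block bar:
strict = ModerationConfig(auto_block_threshold=0.85)
```

With defaults, a 0.9-confidence violation goes to human review; under the stricter config it is blocked outright.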
Human Review Queue
# app/services/queue.py
from datetime import datetime, timedelta
from typing import Optional
from dataclasses import dataclass
import json
import redis
from ..models.content import (
ContentItem,
ModerationDecision,
ModerationAction,
ViolationType,
)
@dataclass
class ReviewItem:
"""Item in the human review queue."""
content: ContentItem
decision: ModerationDecision
priority: int # 1 = highest
assigned_to: Optional[str] = None
assigned_at: Optional[datetime] = None
sla_deadline: Optional[datetime] = None
class HumanReviewQueue:
"""
Manages the human review queue for borderline cases.
"""
PRIORITY_MAP = {
ViolationType.SELF_HARM: 1,
ViolationType.VIOLENCE: 1,
ViolationType.ILLEGAL: 1,
ViolationType.HATE_SPEECH: 2,
ViolationType.HARASSMENT: 2,
ViolationType.SEXUAL: 3,
ViolationType.MISINFORMATION: 3,
ViolationType.SPAM: 4,
ViolationType.PII: 3,
ViolationType.NONE: 5,
}
SLA_HOURS = {
1: 1, # Priority 1: 1 hour
2: 4, # Priority 2: 4 hours
3: 8, # Priority 3: 8 hours
4: 24, # Priority 4: 24 hours
5: 48, # Priority 5: 48 hours
}
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
def enqueue(
self,
content: ContentItem,
decision: ModerationDecision
) -> ReviewItem:
"""Add item to review queue."""
# Determine priority
priority = min(
self.PRIORITY_MAP.get(vt, 5)
for vt in decision.violation_types
)
# Calculate SLA deadline
sla_hours = self.SLA_HOURS.get(priority, 24)
sla_deadline = datetime.utcnow() + timedelta(hours=sla_hours)
item = ReviewItem(
content=content,
decision=decision,
priority=priority,
sla_deadline=sla_deadline
)
# Add to sorted set (priority + timestamp for ordering)
score = priority * 1e12 + datetime.utcnow().timestamp()
self.redis.zadd(
"review_queue",
{content.id: score}
)
# Store full item data
self.redis.hset(
"review_items",
content.id,
json.dumps({
"content": content.model_dump(),
"decision": decision.model_dump(),
"priority": priority,
"sla_deadline": sla_deadline.isoformat(),
}, default=str)
)
return item
def dequeue(self, moderator_id: str) -> Optional[ReviewItem]:
"""Get next item for moderator to review."""
        # Atomically pop the highest-priority (lowest-score) item.
        # ZPOPMIN avoids the race a separate ZRANGE + ZREM would create
        # when several moderators dequeue concurrently.
        popped = self.redis.zpopmin("review_queue")
        if not popped:
            return None
        member, _score = popped[0]
        content_id = member.decode() if isinstance(member, bytes) else member
# Get item data
data = self.redis.hget("review_items", content_id)
if not data:
return None
item_data = json.loads(data)
# Update assignment
item_data["assigned_to"] = moderator_id
item_data["assigned_at"] = datetime.utcnow().isoformat()
self.redis.hset(
"review_items",
content_id,
json.dumps(item_data, default=str)
)
return ReviewItem(
content=ContentItem(**item_data["content"]),
decision=ModerationDecision(**item_data["decision"]),
priority=item_data["priority"],
assigned_to=moderator_id,
assigned_at=datetime.utcnow(),
sla_deadline=datetime.fromisoformat(item_data["sla_deadline"])
)
def submit_review(
self,
content_id: str,
moderator_id: str,
final_action: ModerationAction,
notes: str
) -> ModerationDecision:
"""Submit moderator's final decision."""
# Get item data
data = self.redis.hget("review_items", content_id)
if not data:
raise ValueError(f"Item {content_id} not found")
item_data = json.loads(data)
original_decision = ModerationDecision(**item_data["decision"])
# Create final decision
final_decision = ModerationDecision(
content_id=content_id,
action=final_action,
violation_types=original_decision.violation_types,
confidence=1.0, # Human review = full confidence
reasoning=f"{original_decision.reasoning}\n\nModerator notes: {notes}",
classifications=original_decision.classifications,
requires_appeal=final_action == ModerationAction.BLOCK,
reviewed_by=moderator_id
)
# Clean up
self.redis.hdel("review_items", content_id)
# Store for analytics
self._log_decision(original_decision, final_decision, moderator_id)
return final_decision
def _log_decision(
self,
original: ModerationDecision,
final: ModerationDecision,
moderator_id: str
):
"""Log decision for model improvement."""
log_entry = {
"content_id": original.content_id,
"auto_decision": original.action.value,
"human_decision": final.action.value,
"was_overturned": original.action != final.action,
"moderator_id": moderator_id,
"timestamp": datetime.utcnow().isoformat(),
}
self.redis.lpush("moderation_log", json.dumps(log_entry))
self.redis.ltrim("moderation_log", 0, 99999) # Keep last 100k
def get_queue_stats(self) -> dict:
"""Get queue statistics."""
queue_length = self.redis.zcard("review_queue")
# Count by priority
by_priority = {}
for p in range(1, 6):
score_min = p * 1e12
score_max = (p + 1) * 1e12
count = self.redis.zcount("review_queue", score_min, score_max)
by_priority[p] = count
return {
"total": queue_length,
"by_priority": by_priority,
        }

Priority Queue Design:
┌─────────────────────────────────────────────────────────────┐
│ HUMAN REVIEW QUEUE │
├─────────────────────────────────────────────────────────────┤
│ │
│ Redis Sorted Set: score = priority * 1e12 + timestamp │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Priority 1 (SLA: 1 hour) │ │
│ │ • SELF_HARM, VIOLENCE, ILLEGAL │ │
│ │ • Immediate danger → fastest review │ │
│ ├─────────────────────────────────────────────────────┤ │
│ │ Priority 2 (SLA: 4 hours) │ │
│ │ • HATE_SPEECH, HARASSMENT │ │
│ │ • Serious harm but not immediate danger │ │
│ ├─────────────────────────────────────────────────────┤ │
│ │ Priority 3 (SLA: 8 hours) │ │
│ │ • SEXUAL, MISINFORMATION, PII │ │
│ │ • Important but less urgent │ │
│ ├─────────────────────────────────────────────────────┤ │
│ │ Priority 4 (SLA: 24 hours) │ │
│ │ • SPAM │ │
│ │ • Nuisance but not harmful │ │
│ ├─────────────────────────────────────────────────────┤ │
│ │ Priority 5 (SLA: 48 hours) │ │
│ │ • Edge cases, appeals │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ dequeue(moderator_id): │
│ • ZRANGE gets lowest score (highest priority + oldest) │
│ • Assigns to moderator, tracks SLA │
│ • Logs decision for model improvement │
│ │
└─────────────────────────────────────────────────────────────┘

| Queue Operation | Redis Command | Purpose |
|---|---|---|
| Enqueue | ZADD | Add with priority score |
| Dequeue | ZRANGE + ZREM | Get highest priority |
| Stats | ZCOUNT | Count by priority range |
| SLA tracking | Stored in hash | Calculate deadline |
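The sorted-set score encoding is worth a closer look: multiplying priority by 1e12 dwarfs any Unix timestamp (~1.7e9), so priority always dominates ordering, while the timestamp breaks ties FIFO within a band. A quick standalone check:

```python
import time

def queue_score(priority: int, ts: float) -> float:
    # priority dominates: 1e12 is far larger than any Unix timestamp (~1.7e9)
    return priority * 1e12 + ts

now = time.time()
p1_new = queue_score(1, now)          # priority 1, just arrived
p2_old = queue_score(2, now - 86400)  # priority 2, a day old

# ZRANGE returns the lowest score first, so a fresh priority-1 item
# still outranks a day-old priority-2 item...
assert p1_new < p2_old

# ...while within one priority band, older items come out first (FIFO)
assert queue_score(1, now - 60) < queue_score(1, now)
```

Float64 has ~15-16 significant digits, so at a magnitude of 1e12 the score still resolves sub-millisecond timestamp differences, which is plenty for queue ordering.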
Decision Logging: Every moderator decision is logged for:
- Model retraining (was AI wrong?)
- Quality assurance (is moderator consistent?)
- Analytics (overturn rate, category patterns)
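The overturn rate in particular falls straight out of the `_log_decision` payloads. In production you would `LRANGE` the `moderation_log` list from Redis; here we compute over sample entries shaped like that payload:

```python
import json

# Sample entries shaped like _log_decision's log_entry payload
raw_log = [
    json.dumps({"auto_decision": "block", "human_decision": "block", "was_overturned": False}),
    json.dumps({"auto_decision": "block", "human_decision": "approve", "was_overturned": True}),
    json.dumps({"auto_decision": "human_review", "human_decision": "block", "was_overturned": True}),
]

entries = [json.loads(e) for e in raw_log]
# Fraction of auto decisions the human reviewer reversed
overturn_rate = sum(e["was_overturned"] for e in entries) / len(entries)
```

A rising overturn rate on a particular violation category is the signal to retrain that classifier or adjust its thresholds.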
API Endpoints
# app/api/routes.py
from fastapi import APIRouter, HTTPException, BackgroundTasks
from typing import Optional
import redis
from ..models.content import (
ContentItem,
ModerationDecision,
ModerationAction,
)
from ..services.moderation import ModerationOrchestrator, ModerationConfig
from ..services.queue import HumanReviewQueue
router = APIRouter(prefix="/api/v1", tags=["moderation"])
# Initialize services; read REDIS_HOST from the environment so the same
# code works locally and under docker-compose
import os
redis_client = redis.Redis(host=os.getenv("REDIS_HOST", "localhost"), port=6379, db=0)
moderator = ModerationOrchestrator(redis_client)
review_queue = HumanReviewQueue(redis_client)
@router.post("/moderate", response_model=ModerationDecision)
async def moderate_content(
content: ContentItem,
background_tasks: BackgroundTasks
):
"""
Submit content for moderation.
Returns immediate decision or queues for human review.
"""
decision = moderator.moderate(content)
# Queue for human review if needed
if decision.action == ModerationAction.HUMAN_REVIEW:
review_queue.enqueue(content, decision)
return decision
@router.post("/moderate/batch")
async def moderate_batch(contents: list[ContentItem]):
"""Batch moderation for bulk processing."""
results = []
for content in contents:
decision = moderator.moderate(content)
results.append(decision)
if decision.action == ModerationAction.HUMAN_REVIEW:
review_queue.enqueue(content, decision)
return {"decisions": results}
@router.get("/queue/stats")
async def get_queue_stats():
"""Get human review queue statistics."""
return review_queue.get_queue_stats()
@router.post("/queue/next")
async def get_next_review(moderator_id: str):
"""Get next item for moderator to review."""
item = review_queue.dequeue(moderator_id)
if not item:
return {"message": "Queue is empty"}
return {
"content": item.content,
"auto_decision": item.decision,
"priority": item.priority,
"sla_deadline": item.sla_deadline,
}
@router.post("/queue/submit")
async def submit_review(
content_id: str,
moderator_id: str,
action: ModerationAction,
notes: str = ""
):
"""Submit moderator's final decision."""
try:
decision = review_queue.submit_review(
content_id=content_id,
moderator_id=moderator_id,
final_action=action,
notes=notes
)
return decision
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
@router.post("/appeal")
async def submit_appeal(
content_id: str,
user_id: str,
reason: str
):
"""Submit an appeal for blocked content."""
# Re-queue with high priority
# In production, implement full appeal workflow
return {
"appeal_id": f"appeal_{content_id}",
"status": "pending",
"message": "Your appeal has been submitted for review"
    }

Main Application
# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from prometheus_client import make_asgi_app
from .api.routes import router
app = FastAPI(
title="Content Moderation API",
description="AI-powered content moderation system",
version="1.0.0"
)
# CORS (wildcard origins suit local development; restrict in production)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Mount routes
app.include_router(router)
# Prometheus metrics
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)
@app.get("/health")
async def health():
return {"status": "healthy"}
if __name__ == "__main__":
import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Deployment
Docker Configuration
# docker-compose.yml
version: '3.8'
services:
api:
build: .
ports:
- "8000:8000"
environment:
- REDIS_HOST=redis
- OPENAI_API_KEY=${OPENAI_API_KEY}
depends_on:
- redis
- postgres
deploy:
replicas: 3
resources:
limits:
memory: 4G
reservations:
memory: 2G
worker:
build: .
command: celery -A workers.moderation_worker worker -l info
environment:
- REDIS_HOST=redis
- OPENAI_API_KEY=${OPENAI_API_KEY}
depends_on:
- redis
deploy:
replicas: 5
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
postgres:
image: postgres:15
environment:
- POSTGRES_DB=moderation
- POSTGRES_USER=moderation
- POSTGRES_PASSWORD=${DB_PASSWORD}
volumes:
- postgres_data:/var/lib/postgresql/data
prometheus:
image: prom/prometheus
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
grafana:
image: grafana/grafana
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
volumes:
redis_data:
  postgres_data:

Key Metrics
| Metric | Target | Description |
|---|---|---|
| Latency (P99) | Under 200ms | Tier 1 classification |
| Latency (P99) | Under 2s | With LLM analysis |
| Precision | Over 95% | Minimize false positives |
| Recall | Over 99% | Catch harmful content |
| Human Review Rate | Under 5% | Minimize queue burden |
| Appeal Overturn Rate | Under 10% | Decision quality |
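The precision and recall targets above trade off against each other, and both are computable from labeled review outcomes. A small sketch with hypothetical counts:

```python
def precision_recall(tp: int, fp: int, fn: int) -> tuple[float, float]:
    """Precision = of everything we blocked, how much was truly harmful.
    Recall = of everything truly harmful, how much we caught."""
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return precision, recall

# Hypothetical day: 990 correct blocks, 30 wrongly blocked, 10 missed violations
p, r = precision_recall(tp=990, fp=30, fn=10)
```

With these counts, precision is ~0.97 (above the 95% target) and recall is 0.99, right at the target; pushing recall higher usually costs precision, which is exactly why borderline cases route to humans instead of being auto-blocked.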
Testing
# tests/test_moderation.py
import pytest
from app.models.content import ContentItem, ContentType
from app.services.moderation import ModerationOrchestrator, ModerationConfig
@pytest.fixture
def moderator(redis_client):
return ModerationOrchestrator(redis_client)
class TestModeration:
def test_approve_safe_content(self, moderator):
content = ContentItem(
id="test1",
user_id="user1",
content_type=ContentType.TEXT,
text="I love sunny days and walking in the park!"
)
decision = moderator.moderate(content)
assert decision.action.value == "approve"
def test_block_obvious_violation(self, moderator):
content = ContentItem(
id="test2",
user_id="user1",
content_type=ContentType.TEXT,
text="Buy now! 90% off! Click here for free money!"
)
decision = moderator.moderate(content)
assert decision.action.value in ["block", "human_review"]
assert "spam" in [v.value for v in decision.violation_types]
def test_context_aware_classification(self, moderator):
# News article about violence should be allowed
content = ContentItem(
id="test3",
user_id="news_outlet",
content_type=ContentType.TEXT,
text="Breaking: Police respond to incident downtown. Officials confirm no injuries.",
context={"account_type": "verified_news"}
)
decision = moderator.moderate(content)
        assert decision.action.value == "approve"

Business Impact
| Metric | Before | After |
|---|---|---|
| Human Review Load | 100% manual | 5% queue rate |
| Response Time | 24-48 hours | Under 2 seconds |
| Harmful Content Reach | Hours of exposure | Under 1 minute |
| False Positive Rate | 15% | Under 3% |
| Moderator Burnout | High | Reduced |
Key Concepts Recap
| Concept | What It Is | Why It Matters |
|---|---|---|
| Multi-Tier Classification | Heuristics → ML → LLM pipeline | Optimizes cost: cheap checks first, expensive LLM only when needed |
| Heuristic Classifier | Regex patterns + blocklists for obvious violations | ~1ms latency, catches 30% of violations with 99% precision |
| Toxicity Classifier | ML model (toxic-bert) trained on labeled content | Catches nuanced toxicity that heuristics miss |
| Spam Classifier | Behavioral signals (rate limiting, duplicates) + content signals | Detects bot patterns and promotional spam |
| LLM Policy Analyzer | GPT-4 with full guidelines for borderline cases | Understands context, sarcasm, legitimate discussion of sensitive topics |
| Community Guidelines | Structured policy rules with examples of allowed and forbidden content | Teaches the LLM the boundary between violation and legitimate content |
| Priority Queue | Redis sorted set ordering by severity + timestamp | Self-harm reviewed in 1 hour, spam in 24 hours |
| SLA Tracking | Deadline per priority level | Ensures harmful content is addressed quickly |
| Decision Logging | Track auto vs human decisions, overturn rate | Enables model retraining and quality monitoring |
| Confidence Thresholds | 0.2 (safe) / 0.6 (review) / 0.95 (block) | Balances automation with human oversight |
Further Improvements
- Multi-modal Analysis - Extend to images and videos using vision models
- Real-time Learning - Update models based on moderator decisions
- User Reputation - Factor in user history and trust scores
- Coordinated Behavior - Detect bot networks and brigading
- Localization - Language and culture-specific policies
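The User Reputation idea above could hook into the existing thresholds with very little code. A hedged sketch, where the function name and the ±0.05 band are hypothetical choices, not part of the system:

```python
def reputation_adjusted_threshold(base: float, trust: float) -> float:
    """
    Hypothetical sketch: shift the auto-block threshold by user trust,
    where trust runs from 0.0 (brand-new account) to 1.0 (long-standing
    good actor). Trusted users need higher confidence before auto-block;
    new accounts are blocked more readily.
    """
    # Cap the shift at ±0.05 so reputation can never disable moderation
    return min(0.99, base + (trust - 0.5) * 0.1)
```

A neutral user (trust 0.5) keeps the default 0.95 bar; a fully trusted account raises it toward 0.99, and a fresh account drops it toward 0.90.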