Content Moderation System
Build an AI-powered content moderation platform for user-generated content with multi-tier classification and human review workflows
Build a production-grade content moderation system that automatically detects and filters harmful content across text, images, and user behavior patterns.
Properties
| Property | Value |
|---|---|
| Difficulty | Advanced |
| Time | ~5 days |
| Industry | Social Media / UGC Platforms |
| LOC | ~1400 |
TL;DR
Build a multi-tier moderation system using fast heuristics (regex, blocklists) for obvious violations, ML classifiers (toxicity, spam detection) for nuanced content, and LLM analysis (GPT-4) for borderline cases requiring context understanding. Uses priority queuing for human review with SLA tracking.
Why This Case Study?
Content moderation is an unsolved problem at scale. Human moderators cannot review millions of posts per day, and the psychological cost of manual review is severe. AI moderation is not optional -- it is a regulatory and safety requirement.
| Metric | Human-Only Moderation | AI-Assisted Moderation |
|---|---|---|
| Review capacity | ~500 posts/day per moderator | 10,000+ posts/second (Tier 1) |
| Detection latency | Minutes to hours | <500ms for fast classification |
| Moderator burnout | High (PTSD risk) | Reduced -- humans only see borderline cases |
| False positive rate | 2-5% | 1-3% (multi-tier reduces errors) |
| Cost at 1M posts/day | $200K+/month (400 moderators) | $15K/month (AI + 20 reviewers) |
This case study teaches the multi-tier pattern used by production platforms: fast heuristics catch obvious violations, ML classifiers handle nuanced content, and LLMs analyze borderline cases that require context understanding. Human reviewers only see the cases that AI cannot confidently decide.
The Challenge
User-generated content platforms face critical moderation challenges:
| Challenge | Impact |
|---|---|
| Scale | Millions of posts per day requiring review |
| Speed | Harmful content must be caught within seconds |
| Accuracy | False positives alienate users; false negatives cause harm |
| Context | Sarcasm, cultural references, coded language |
| Evolution | New harmful patterns emerge constantly |
System Architecture
Content Moderation Architecture (diagram): Content Input → Tier 1: Fast Classification → Tier 2: LLM Analysis → Action Layer → Learning Loop
Technology Stack
| Component | Technology | Purpose |
|---|---|---|
| Framework | LangChain | LLM orchestration |
| Fast Models | Detoxify, transformers | Real-time classification |
| LLM | GPT-4o | Context-aware analysis |
| Queue | Redis + Celery | Async processing |
| Database | PostgreSQL | Content & decisions |
| Cache | Redis | Rate limiting, dedup |
| Monitoring | Prometheus + Grafana | Metrics & alerts |
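The stack lists Redis + Celery for asynchronous processing, but the worker module itself (workers/moderation_worker.py in the project layout below, run by the worker service in the docker-compose file) is not shown in this case study. A minimal sketch of what such a worker might look like, assuming the ModerationOrchestrator and HumanReviewQueue services defined later in this section:
# workers/moderation_worker.py -- illustrative sketch, not part of the original code
import os
import redis
from celery import Celery
from app.models.content import ContentItem, ModerationAction
from app.services.moderation import ModerationOrchestrator
from app.services.queue import HumanReviewQueue

celery_app = Celery(
    "moderation",
    broker=f"redis://{os.getenv('REDIS_HOST', 'localhost')}:6379/1",
)
redis_client = redis.Redis(host=os.getenv("REDIS_HOST", "localhost"), port=6379, db=0)
orchestrator = ModerationOrchestrator(redis_client)
review_queue = HumanReviewQueue(redis_client)

@celery_app.task(name="moderate_content")
def moderate_content(content_payload: dict) -> dict:
    """Run the full moderation pipeline off the request path."""
    content = ContentItem(**content_payload)
    decision = orchestrator.moderate(content)
    # Borderline cases go to the human review queue, same as the synchronous API path
    if decision.action == ModerationAction.HUMAN_REVIEW:
        review_queue.enqueue(content, decision)
    return decision.model_dump(mode="json")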
Implementation
Project Structure
content_moderation/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI application
│ ├── config.py # Configuration
│ ├── models/
│ │ ├── __init__.py
│ │ ├── content.py # Content models
│ │ └── decisions.py # Moderation decisions
│ ├── classifiers/
│ │ ├── __init__.py
│ │ ├── heuristic.py # Pattern matching
│ │ ├── toxicity.py # Toxicity detection
│ │ ├── spam.py # Spam classification
│ │ └── llm_analyzer.py # LLM-based analysis
│ ├── services/
│ │ ├── __init__.py
│ │ ├── moderation.py # Main orchestrator
│ │ ├── queue.py # Review queue
│ │ └── appeals.py # Appeal handling
│ ├── policies/
│ │ ├── __init__.py
│ │ └── community.py # Policy definitions
│ └── api/
│ ├── __init__.py
│ └── routes.py # API endpoints
├── workers/
│ └── moderation_worker.py # Celery workers
├── tests/
└── docker-compose.yml
Content Models
# app/models/content.py
from enum import Enum
from datetime import datetime
from pydantic import BaseModel, Field
from typing import Optional
class ContentType(str, Enum):
TEXT = "text"
IMAGE = "image"
VIDEO = "video"
COMMENT = "comment"
PROFILE = "profile"
class ModerationAction(str, Enum):
APPROVE = "approve"
BLOCK = "block"
HUMAN_REVIEW = "human_review"
SHADOW_BAN = "shadow_ban"
WARNING = "warning"
class ViolationType(str, Enum):
NONE = "none"
HATE_SPEECH = "hate_speech"
HARASSMENT = "harassment"
VIOLENCE = "violence"
SEXUAL = "sexual_content"
SPAM = "spam"
MISINFORMATION = "misinformation"
SELF_HARM = "self_harm"
ILLEGAL = "illegal_activity"
PII = "personal_information"
class ContentItem(BaseModel):
"""Content submitted for moderation."""
id: str
user_id: str
content_type: ContentType
text: Optional[str] = None
media_url: Optional[str] = None
metadata: dict = Field(default_factory=dict)
context: dict = Field(default_factory=dict) # Thread context, reply-to, etc.
submitted_at: datetime = Field(default_factory=datetime.utcnow)
class ClassificationResult(BaseModel):
"""Result from a single classifier."""
classifier: str
violation_type: ViolationType
confidence: float
details: dict = Field(default_factory=dict)
processing_time_ms: float
class ModerationDecision(BaseModel):
"""Final moderation decision."""
content_id: str
action: ModerationAction
violation_types: list[ViolationType]
confidence: float
reasoning: str
classifications: list[ClassificationResult]
requires_appeal: bool = False
reviewed_by: Optional[str] = None # "auto" or moderator_id
decided_at: datetime = Field(default_factory=datetime.utcnow)
Understanding the Data Model:
Moderation Data Flow
| Field | Purpose |
|---|---|
| violation_type | Enum of 10 categories - enables routing to specialized reviewers |
| confidence | 0-1 score - drives auto-approve/block vs human review decision |
| processing_time_ms | Track latency per classifier for optimization |
| requires_appeal | Auto-set when blocked - enables appeal workflow |
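As a quick illustration of how these models fit together (a hypothetical example with made-up IDs), a submission might be constructed like this before it enters the pipeline:
# Hypothetical usage of the data models (illustrative only)
from app.models.content import ContentItem, ContentType

item = ContentItem(
    id="post_123",
    user_id="user_42",
    content_type=ContentType.COMMENT,
    text="Check out my profile for a great deal!",
    context={"reply_to": "post_99", "thread_id": "thread_7"},
)
# The orchestrator (defined later) returns a ModerationDecision whose `action`
# is driven by the highest classifier confidence: >= 0.95 -> block,
# 0.6-0.95 -> human review, lower -> warning or approve.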
Heuristic Classifier (Tier 1)
# app/classifiers/heuristic.py
import re
from typing import Optional
from ..models.content import (
ContentItem,
ClassificationResult,
ViolationType
)
class HeuristicClassifier:
"""
Fast pattern-based classifier for obvious violations.
Catches ~30% of violations with >99% precision.
"""
def __init__(self):
# Load from external config in production
self._load_patterns()
def _load_patterns(self):
"""Load blocklist patterns."""
# Exact match blocklist (hashed in production)
self.blocklist = set([
# Known slurs and variations (simplified for example)
# In production: load from encrypted database
])
# Regex patterns for common violations
self.patterns = {
ViolationType.SPAM: [
r"(?i)buy\s+now.*\d+%\s+off",
r"(?i)click\s+here.*free",
r"(?i)(telegram|whatsapp).*\+\d{10,}",
r"(?i)earn\s+\$\d+.*per\s+(day|hour|week)",
],
ViolationType.PII: [
r"\b\d{3}-\d{2}-\d{4}\b", # SSN
r"\b\d{16}\b", # Credit card
r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", # Email
],
ViolationType.VIOLENCE: [
r"(?i)(kill|murder|attack)\s+(all|every)\s+\w+",
r"(?i)bomb\s+threat",
],
ViolationType.SELF_HARM: [
r"(?i)(suicide|kill myself).*method",
r"(?i)how\s+to\s+(cut|harm)\s+myself",
],
}
# Compile patterns
self.compiled_patterns = {
vtype: [re.compile(p) for p in patterns]
for vtype, patterns in self.patterns.items()
}
def classify(self, content: ContentItem) -> Optional[ClassificationResult]:
"""
Fast heuristic classification.
Returns result only if confident match found.
"""
import time
start = time.time()
if not content.text:
return None
text = content.text.lower()
# Check blocklist
words = set(text.split())
if words & self.blocklist:
return ClassificationResult(
classifier="heuristic",
violation_type=ViolationType.HATE_SPEECH,
confidence=0.99,
details={"match_type": "blocklist"},
processing_time_ms=(time.time() - start) * 1000
)
# Check patterns
for vtype, patterns in self.compiled_patterns.items():
for pattern in patterns:
if pattern.search(content.text):
return ClassificationResult(
classifier="heuristic",
violation_type=vtype,
confidence=0.95,
details={
"match_type": "pattern",
"pattern": pattern.pattern
},
processing_time_ms=(time.time() - start) * 1000
)
return None # No confident match
Why Heuristics First:
Heuristic Classifier Role — Catches ~30% of violations with >99% precision
| Approach | Latency | Accuracy | Cost |
|---|---|---|---|
| Heuristics | ~1ms | 99% precision | Free |
| ML Classifier | ~50ms | 95% precision | Compute |
| LLM | ~1-2s | 90% precision | $$$ API |
Why this order matters: Stop early for obvious violations → save ML/LLM costs.
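As a back-of-envelope check of that ordering, combining the latencies above with the routing fractions quoted in this case study (~30% of violations stopped by heuristics, ~5% of content reaching the LLM tier) gives a mean per-item latency on the order of 100 ms. The per-tier figures below are rough assumptions taken from the table, not measurements:
# Rough blended-latency estimate using the figures from the table above
heuristic_ms, ml_ms, llm_ms = 1, 50, 1500   # approximate per-tier latencies (assumed)
p_ml, p_llm = 0.70, 0.05                    # share of items reaching each tier (assumed)

expected_ms = heuristic_ms + p_ml * ml_ms + p_llm * llm_ms
print(f"Expected latency per item: ~{expected_ms:.0f} ms")  # ~111 ms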
Toxicity Classifier (Tier 1)
# app/classifiers/toxicity.py
import time
from typing import Optional
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from ..models.content import (
ContentItem,
ClassificationResult,
ViolationType
)
class ToxicityClassifier:
"""
ML-based toxicity detection using Detoxify/similar models.
Handles nuanced toxicity that heuristics miss.
"""
TOXICITY_THRESHOLD = 0.7
SEVERE_THRESHOLD = 0.85
# Map model labels to violation types
LABEL_MAPPING = {
"toxic": ViolationType.HARASSMENT,
"severe_toxic": ViolationType.HARASSMENT,
"obscene": ViolationType.SEXUAL,
"threat": ViolationType.VIOLENCE,
"insult": ViolationType.HARASSMENT,
"identity_attack": ViolationType.HATE_SPEECH,
"sexual_explicit": ViolationType.SEXUAL,
}
def __init__(self, model_name: str = "unitary/toxic-bert"):
self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModelForSequenceClassification.from_pretrained(
model_name
).to(self.device)
self.model.eval()
# Get label names from model config
self.labels = list(self.model.config.id2label.values())
def classify(self, content: ContentItem) -> Optional[ClassificationResult]:
"""Classify content for toxicity."""
start = time.time()
if not content.text:
return None
# Tokenize
inputs = self.tokenizer(
content.text,
return_tensors="pt",
truncation=True,
max_length=512
).to(self.device)
# Predict
with torch.no_grad():
outputs = self.model(**inputs)
probs = torch.sigmoid(outputs.logits)[0].cpu().numpy()
# Map to label scores
scores = {
label: float(prob)
for label, prob in zip(self.labels, probs)
}
# Find highest violation
max_label = max(scores, key=scores.get)
max_score = scores[max_label]
processing_time = (time.time() - start) * 1000
if max_score < self.TOXICITY_THRESHOLD:
return ClassificationResult(
classifier="toxicity",
violation_type=ViolationType.NONE,
confidence=1 - max_score,
details={"scores": scores},
processing_time_ms=processing_time
)
# Map to violation type
violation_type = self.LABEL_MAPPING.get(
max_label,
ViolationType.HARASSMENT
)
return ClassificationResult(
classifier="toxicity",
violation_type=violation_type,
confidence=max_score,
details={
"scores": scores,
"primary_label": max_label,
"is_severe": max_score >= self.SEVERE_THRESHOLD
},
processing_time_ms=processing_time
)
Why ML for Toxicity:
Toxicity Detection — ML handles nuance heuristics miss
| Threshold | Value | Effect |
|---|---|---|
| TOXICITY_THRESHOLD | 0.7 | Below this = NONE (safe) |
| SEVERE_THRESHOLD | 0.85 | Above this = severe (auto-block candidate) |
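A hypothetical usage example showing how the thresholds play out (the scores described in the comments are illustrative, not actual model outputs):
# Illustrative use of ToxicityClassifier
from app.classifiers.toxicity import ToxicityClassifier
from app.models.content import ContentItem, ContentType

clf = ToxicityClassifier()
item = ContentItem(
    id="c1", user_id="u1", content_type=ContentType.COMMENT,
    text="You are such an idiot, nobody wants you here.",
)
result = clf.classify(item)
# If the highest label score is below 0.7 the result is ViolationType.NONE;
# between 0.7 and 0.85 it is flagged with that score as confidence;
# at 0.85+ details["is_severe"] is True, making it an auto-block candidate.
print(result.violation_type, result.confidence, result.details.get("is_severe"))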
Spam Classifier (Tier 1)
# app/classifiers/spam.py
import time
import hashlib
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Optional
import redis
from ..models.content import (
ContentItem,
ClassificationResult,
ViolationType
)
class SpamClassifier:
"""
Spam detection using behavioral signals and content analysis.
"""
DUPLICATE_THRESHOLD = 0.8
RATE_LIMIT_WINDOW = 60 # seconds
RATE_LIMIT_MAX = 10 # posts per window
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
# Spam indicators
self.spam_signals = {
"excessive_caps": lambda t: sum(1 for c in t if c.isupper()) / max(len(t), 1) > 0.5,
"excessive_emoji": lambda t: t.count("") > 10, # placeholder
"link_heavy": lambda t: t.count("http") > 3,
"repetitive_chars": lambda t: any(c * 5 in t for c in set(t)),
}
def _content_hash(self, text: str) -> str:
"""Create hash for deduplication."""
normalized = " ".join(text.lower().split())
return hashlib.sha256(normalized.encode()).hexdigest()[:16]
def _check_rate_limit(self, user_id: str) -> tuple[bool, int]:
"""Check if user is posting too frequently."""
key = f"rate:{user_id}"
pipe = self.redis.pipeline()
now = datetime.utcnow().timestamp()
window_start = now - self.RATE_LIMIT_WINDOW
# Remove old entries and count
pipe.zremrangebyscore(key, 0, window_start)
pipe.zcard(key)
pipe.zadd(key, {str(now): now})
pipe.expire(key, self.RATE_LIMIT_WINDOW * 2)
_, count, _, _ = pipe.execute()
return count >= self.RATE_LIMIT_MAX, count
def _check_duplicate(self, content: ContentItem) -> tuple[bool, int]:
"""Check for duplicate/near-duplicate content."""
if not content.text:
return False, 0
content_hash = self._content_hash(content.text)
key = f"content_hash:{content_hash}"
# Increment and get count
count = self.redis.incr(key)
self.redis.expire(key, 3600) # 1 hour window
return count > 1, count
def classify(self, content: ContentItem) -> Optional[ClassificationResult]:
"""Classify content for spam."""
start = time.time()
signals_detected = []
spam_score = 0.0
# Check rate limiting
rate_limited, post_count = self._check_rate_limit(content.user_id)
if rate_limited:
signals_detected.append("rate_limit_exceeded")
spam_score += 0.4
# Check duplicates
is_duplicate, dup_count = self._check_duplicate(content)
if is_duplicate:
signals_detected.append(f"duplicate_content_{dup_count}")
spam_score += 0.3 * min(dup_count, 3)
# Check content signals
if content.text:
for signal_name, check_fn in self.spam_signals.items():
try:
if check_fn(content.text):
signals_detected.append(signal_name)
spam_score += 0.15
except Exception:
pass
processing_time = (time.time() - start) * 1000
# Cap at 1.0
spam_score = min(spam_score, 1.0)
if spam_score < 0.3:
return ClassificationResult(
classifier="spam",
violation_type=ViolationType.NONE,
confidence=1 - spam_score,
details={"signals": signals_detected},
processing_time_ms=processing_time
)
return ClassificationResult(
classifier="spam",
violation_type=ViolationType.SPAM,
confidence=spam_score,
details={
"signals": signals_detected,
"post_count": post_count,
"duplicate_count": dup_count
},
processing_time_ms=processing_time
)
Behavioral Spam Detection:
Spam detection signals (diagram): rate limiting (Redis) + duplicate detection + content signals → total score (capped at 1.0)
Why Redis for spam detection:
- Sorted sets for rate limiting with automatic time-window expiry
- Increment + expire for duplicate counting
- Sub-millisecond lookups for real-time decisions
LLM Policy Analyzer (Tier 2)
# app/classifiers/llm_analyzer.py
import time
from typing import Optional
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
from ..models.content import (
ContentItem,
ClassificationResult,
ViolationType,
ModerationAction
)
from ..policies.community import CommunityGuidelines
class LLMAnalysis(BaseModel):
"""Structured output from LLM analysis."""
violates_policy: bool = Field(
description="Whether content violates community guidelines"
)
violation_types: list[str] = Field(
default_factory=list,
description="List of violation categories"
)
severity: str = Field(
description="Severity level: low, medium, high, critical"
)
confidence: float = Field(
ge=0, le=1,
description="Confidence in this assessment"
)
reasoning: str = Field(
description="Detailed reasoning for the decision"
)
context_matters: bool = Field(
description="Whether context significantly affects interpretation"
)
recommended_action: str = Field(
description="Recommended action: approve, warn, remove, ban"
)
class LLMPolicyAnalyzer:
"""
LLM-based content analysis for nuanced policy violations.
Used for borderline cases that fast classifiers can't handle.
"""
SEVERITY_TO_ACTION = {
"low": ModerationAction.WARNING,
"medium": ModerationAction.HUMAN_REVIEW,
"high": ModerationAction.BLOCK,
"critical": ModerationAction.BLOCK,
}
def __init__(
self,
model: str = "gpt-4o",
guidelines: Optional[CommunityGuidelines] = None
):
self.llm = ChatOpenAI(model=model, temperature=0)
self.guidelines = guidelines or CommunityGuidelines()
self.parser = PydanticOutputParser(pydantic_object=LLMAnalysis)
self.prompt = ChatPromptTemplate.from_messages([
("system", """You are a content moderation expert. Analyze user-generated
content against community guidelines and determine if it violates policies.
COMMUNITY GUIDELINES:
{guidelines}
IMPORTANT CONSIDERATIONS:
1. Context matters - consider replies, threads, cultural context
2. Distinguish between harmful content and legitimate discussion OF harmful topics
3. Consider intent - education, news, satire vs. genuine harm
4. Err on the side of caution for content targeting individuals
5. Consider the platform context (this is a {platform_type})
VIOLATION CATEGORIES:
- hate_speech: Attacks on protected groups
- harassment: Targeted attacks on individuals
- violence: Threats, glorification, or instructions for violence
- sexual_content: Explicit sexual content (context-dependent)
- spam: Commercial spam, scams, manipulation
- misinformation: Demonstrably false claims causing harm
- self_harm: Promotion or instructions for self-harm
- illegal_activity: Illegal products, services, or activities
- personal_information: Doxxing, sharing private info
{format_instructions}"""),
("human", """Analyze this content:
CONTENT TYPE: {content_type}
USER ID: {user_id}
CONTENT: {text}
CONTEXT (if available):
{context}
Previous classifications from automated systems:
{prior_classifications}
Provide your analysis.""")
])
def analyze(
self,
content: ContentItem,
prior_classifications: list[ClassificationResult],
platform_type: str = "social media platform"
) -> ClassificationResult:
"""Perform deep LLM analysis on content."""
start = time.time()
# Format prior classifications
prior_str = "\n".join([
f"- {c.classifier}: {c.violation_type.value} (confidence: {c.confidence:.2f})"
for c in prior_classifications
]) or "None"
# Format context
context_str = "\n".join([
f"- {k}: {v}" for k, v in content.context.items()
]) or "No additional context"
# Build prompt
chain = self.prompt | self.llm | self.parser
try:
result: LLMAnalysis = chain.invoke({
"guidelines": self.guidelines.to_text(),
"platform_type": platform_type,
"format_instructions": self.parser.get_format_instructions(),
"content_type": content.content_type.value,
"user_id": content.user_id,
"text": content.text or "[No text - media only]",
"context": context_str,
"prior_classifications": prior_str,
})
# Map to violation type
violation_type = ViolationType.NONE
if result.violates_policy and result.violation_types:
# Take the first recognized violation category
for vt in result.violation_types:
try:
violation_type = ViolationType(vt)
break
except ValueError:
continue
return ClassificationResult(
classifier="llm_analyzer",
violation_type=violation_type,
confidence=result.confidence,
details={
"severity": result.severity,
"reasoning": result.reasoning,
"context_matters": result.context_matters,
"recommended_action": result.recommended_action,
"all_violations": result.violation_types,
},
processing_time_ms=(time.time() - start) * 1000
)
except Exception as e:
# Fallback for LLM errors
return ClassificationResult(
classifier="llm_analyzer",
violation_type=ViolationType.NONE,
confidence=0.0,
details={"error": str(e)},
processing_time_ms=(time.time() - start) * 1000
)
Why LLM for Borderline Cases:
LLM Analysis — When Context Matters
| LLM Input | Purpose |
|---|---|
| Guidelines | Teaches policy rules with examples |
| Platform type | Context calibration (gaming vs. news vs. social) |
| Prior classifications | Helps LLM understand why content is borderline |
| User context | Account type, thread context, reply-to info |
Cost Control: Only ~5% of content reaches LLM tier (after heuristics + ML filter).
Community Guidelines
# app/policies/community.py
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class PolicyRule:
"""A single policy rule."""
id: str
category: str
description: str
examples_violating: list[str] = field(default_factory=list)
examples_allowed: list[str] = field(default_factory=list)
severity: str = "medium" # low, medium, high, critical
requires_context: bool = False
class CommunityGuidelines:
"""
Community guidelines definition.
In production, load from database/CMS.
"""
def __init__(self):
self.rules = self._load_default_rules()
def _load_default_rules(self) -> list[PolicyRule]:
"""Load default community guidelines."""
return [
PolicyRule(
id="hate-1",
category="hate_speech",
description="Content that attacks or demeans groups based on protected characteristics (race, ethnicity, religion, gender, sexual orientation, disability, nationality)",
examples_violating=[
"All [group] are criminals",
"Slurs targeting protected groups",
"Calls to exclude groups from society",
],
examples_allowed=[
"Discussion of discrimination issues",
"News reporting on hate crimes",
"Academic analysis of prejudice",
],
severity="high",
requires_context=True
),
PolicyRule(
id="harass-1",
category="harassment",
description="Targeted attacks, bullying, or intimidation of specific individuals",
examples_violating=[
"Repeated negative comments targeting a user",
"Sharing someone's private information",
"Coordinated attacks on individuals",
],
examples_allowed=[
"Criticism of public figures' actions",
"Disagreement in debates",
"Negative reviews of businesses",
],
severity="high",
requires_context=True
),
PolicyRule(
id="violence-1",
category="violence",
description="Threats of violence, glorification of violence, or instructions for violent acts",
examples_violating=[
"Specific threats against individuals or groups",
"Instructions for creating weapons",
"Celebrating violent attacks",
],
examples_allowed=[
"News reporting on violence",
"Historical education",
"Fictional violence in clearly marked fiction",
],
severity="critical",
requires_context=True
),
PolicyRule(
id="spam-1",
category="spam",
description="Commercial spam, scams, or manipulation campaigns",
examples_violating=[
"Unsolicited commercial promotion",
"Get-rich-quick schemes",
"Fake engagement requests",
],
examples_allowed=[
"Genuine product recommendations",
"Business accounts promoting services",
],
severity="medium",
requires_context=False
),
PolicyRule(
id="misinfo-1",
category="misinformation",
description="Demonstrably false information that could cause real-world harm",
examples_violating=[
"False medical advice (e.g., dangerous 'cures')",
"Election interference claims",
"Crisis event misinformation",
],
examples_allowed=[
"Opinion and political commentary",
"Satire (when clearly marked)",
"Honest mistakes with corrections",
],
severity="high",
requires_context=True
),
]
def to_text(self) -> str:
"""Convert guidelines to text for LLM."""
sections = []
for rule in self.rules:
section = f"""
## {rule.category.upper()} ({rule.id})
{rule.description}
Severity: {rule.severity}
Requires Context: {'Yes' if rule.requires_context else 'No'}
Examples of VIOLATIONS:
{chr(10).join('- ' + ex for ex in rule.examples_violating)}
Examples of ALLOWED content:
{chr(10).join('- ' + ex for ex in rule.examples_allowed)}
"""
sections.append(section)
return "\n---\n".join(sections)
def get_rule(self, rule_id: str) -> Optional[PolicyRule]:
"""Get a specific rule by ID."""
for rule in self.rules:
if rule.id == rule_id:
return rule
return None
Why Structured Guidelines:
Policy Rule Structure
| Severity | Auto-Action | Human Review |
|---|---|---|
| critical | Immediate block | Priority 1 (1hr SLA) |
| high | Block if confidence > 0.9 | Priority 2 (4hr SLA) |
| medium | Warning/review | Priority 3 (8hr SLA) |
| low | Warning only | Priority 4 (24hr SLA) |
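The table implies a policy that combines LLM-reported severity with classifier confidence. That mapping is not implemented as a single function in the code in this case study, so here is a small sketch of what it could look like under the thresholds from the table (an assumption, not part of the original implementation):
# Hypothetical severity + confidence -> action mapping implied by the table above
from app.models.content import ModerationAction

def action_for(severity: str, confidence: float) -> ModerationAction:
    """Map LLM severity and classifier confidence to a moderation action."""
    if severity == "critical":
        return ModerationAction.BLOCK  # immediate block
    if severity == "high":
        # Block only when very confident; otherwise escalate to a human
        return ModerationAction.BLOCK if confidence > 0.9 else ModerationAction.HUMAN_REVIEW
    if severity == "medium":
        return ModerationAction.HUMAN_REVIEW  # warning/review tier
    return ModerationAction.WARNING  # low severity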
Moderation Orchestrator
# app/services/moderation.py
import time
from typing import Optional
import redis
from dataclasses import dataclass
from ..models.content import (
ContentItem,
ClassificationResult,
ModerationDecision,
ModerationAction,
ViolationType,
)
from ..classifiers.heuristic import HeuristicClassifier
from ..classifiers.toxicity import ToxicityClassifier
from ..classifiers.spam import SpamClassifier
from ..classifiers.llm_analyzer import LLMPolicyAnalyzer
@dataclass
class ModerationConfig:
"""Configuration for moderation thresholds."""
# Confidence thresholds
auto_block_threshold: float = 0.95
auto_approve_threshold: float = 0.2
llm_review_threshold: float = 0.6
# Rate limits
enable_rate_limiting: bool = True
# LLM usage
always_use_llm: bool = False
llm_for_borderline: bool = True
class ModerationOrchestrator:
"""
Orchestrates multi-tier content moderation.
Flow:
1. Heuristic check (fast blocklist/pattern matching)
2. ML toxicity check (if passes heuristic)
3. Spam detection (behavioral + content)
4. LLM analysis (for borderline cases)
5. Final decision
"""
def __init__(
self,
redis_client: redis.Redis,
config: Optional[ModerationConfig] = None
):
self.config = config or ModerationConfig()
self.redis = redis_client
# Initialize classifiers
self.heuristic = HeuristicClassifier()
self.toxicity = ToxicityClassifier()
self.spam = SpamClassifier(redis_client)
self.llm = LLMPolicyAnalyzer()
def moderate(self, content: ContentItem) -> ModerationDecision:
"""
Run full moderation pipeline on content.
"""
start_time = time.time()
classifications: list[ClassificationResult] = []
# Tier 1: Heuristic check
heuristic_result = self.heuristic.classify(content)
if heuristic_result:
classifications.append(heuristic_result)
# Immediate block for high-confidence heuristic match
if heuristic_result.confidence >= self.config.auto_block_threshold:
return self._create_decision(
content=content,
action=ModerationAction.BLOCK,
classifications=classifications,
reasoning="Blocked by heuristic filter"
)
# Tier 1: Toxicity check
toxicity_result = self.toxicity.classify(content)
if toxicity_result:
classifications.append(toxicity_result)
# Tier 1: Spam check
spam_result = self.spam.classify(content)
if spam_result:
classifications.append(spam_result)
# Determine if LLM analysis needed
needs_llm = self._needs_llm_review(classifications)
if needs_llm or self.config.always_use_llm:
# Tier 2: LLM analysis
llm_result = self.llm.analyze(content, classifications)
classifications.append(llm_result)
# Make final decision
return self._make_final_decision(content, classifications)
def _needs_llm_review(
self,
classifications: list[ClassificationResult]
) -> bool:
"""Determine if content needs LLM review."""
if not self.config.llm_for_borderline:
return False
for result in classifications:
# Borderline confidence - needs LLM
if (self.config.auto_approve_threshold < result.confidence
< self.config.auto_block_threshold):
return True
# Context-sensitive violations need LLM
if result.violation_type in [
ViolationType.HATE_SPEECH,
ViolationType.HARASSMENT,
ViolationType.MISINFORMATION,
] and result.confidence > 0.5:
return True
return False
def _make_final_decision(
self,
content: ContentItem,
classifications: list[ClassificationResult]
) -> ModerationDecision:
"""Make final moderation decision based on all classifications."""
# Collect all violations
violations = []
max_confidence = 0.0
primary_reasoning = ""
for result in classifications:
if result.violation_type != ViolationType.NONE:
violations.append(result.violation_type)
if result.confidence > max_confidence:
max_confidence = result.confidence
primary_reasoning = result.details.get(
"reasoning",
f"Detected by {result.classifier}"
)
# No violations
if not violations:
return self._create_decision(
content=content,
action=ModerationAction.APPROVE,
classifications=classifications,
reasoning="No policy violations detected"
)
# Determine action based on confidence
if max_confidence >= self.config.auto_block_threshold:
action = ModerationAction.BLOCK
elif max_confidence >= self.config.llm_review_threshold:
action = ModerationAction.HUMAN_REVIEW
else:
action = ModerationAction.WARNING
return self._create_decision(
content=content,
action=action,
classifications=classifications,
reasoning=primary_reasoning,
violations=violations
)
def _create_decision(
self,
content: ContentItem,
action: ModerationAction,
classifications: list[ClassificationResult],
reasoning: str,
violations: Optional[list[ViolationType]] = None
) -> ModerationDecision:
"""Create a moderation decision."""
# Calculate overall confidence
if classifications:
confidence = max(c.confidence for c in classifications)
else:
confidence = 1.0
return ModerationDecision(
content_id=content.id,
action=action,
violation_types=violations or [ViolationType.NONE],
confidence=confidence,
reasoning=reasoning,
classifications=classifications,
requires_appeal=action == ModerationAction.BLOCK,
reviewed_by="auto"
)
Understanding the Orchestration Logic:
Moderation Pipeline Flow
| Config Setting | Default | Effect |
|---|---|---|
| auto_block_threshold | 0.95 | Confidence needed for auto-block |
| auto_approve_threshold | 0.2 | Below this = definitely safe |
| llm_review_threshold | 0.6 | Above this = queue for human |
| llm_for_borderline | true | Use LLM for uncertain cases |
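Putting the thresholds to work, a short usage sketch of the orchestrator with an explicit configuration (the values shown are just the defaults made explicit):
# Example: running the orchestrator with explicit thresholds
import redis
from app.models.content import ContentItem, ContentType
from app.services.moderation import ModerationOrchestrator, ModerationConfig

config = ModerationConfig(
    auto_block_threshold=0.95,   # confidence needed for auto-block
    auto_approve_threshold=0.2,  # below this the content is considered safe
    llm_review_threshold=0.6,    # above this, route to human review
    llm_for_borderline=True,     # send uncertain cases to the LLM tier
)
orchestrator = ModerationOrchestrator(
    redis.Redis(host="localhost", port=6379), config=config
)
decision = orchestrator.moderate(
    ContentItem(id="p1", user_id="u1", content_type=ContentType.TEXT, text="Totally normal post")
)
print(decision.action, decision.confidence, decision.reasoning)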
Human Review Queue
# app/services/queue.py
from datetime import datetime, timedelta
from typing import Optional
from dataclasses import dataclass
import json
import redis
from ..models.content import (
ContentItem,
ModerationDecision,
ModerationAction,
ViolationType,
)
@dataclass
class ReviewItem:
"""Item in the human review queue."""
content: ContentItem
decision: ModerationDecision
priority: int # 1 = highest
assigned_to: Optional[str] = None
assigned_at: Optional[datetime] = None
sla_deadline: Optional[datetime] = None
class HumanReviewQueue:
"""
Manages the human review queue for borderline cases.
"""
PRIORITY_MAP = {
ViolationType.SELF_HARM: 1,
ViolationType.VIOLENCE: 1,
ViolationType.ILLEGAL: 1,
ViolationType.HATE_SPEECH: 2,
ViolationType.HARASSMENT: 2,
ViolationType.SEXUAL: 3,
ViolationType.MISINFORMATION: 3,
ViolationType.SPAM: 4,
ViolationType.PII: 3,
ViolationType.NONE: 5,
}
SLA_HOURS = {
1: 1, # Priority 1: 1 hour
2: 4, # Priority 2: 4 hours
3: 8, # Priority 3: 8 hours
4: 24, # Priority 4: 24 hours
5: 48, # Priority 5: 48 hours
}
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
def enqueue(
self,
content: ContentItem,
decision: ModerationDecision
) -> ReviewItem:
"""Add item to review queue."""
# Determine priority
priority = min(
self.PRIORITY_MAP.get(vt, 5)
for vt in decision.violation_types
)
# Calculate SLA deadline
sla_hours = self.SLA_HOURS.get(priority, 24)
sla_deadline = datetime.utcnow() + timedelta(hours=sla_hours)
item = ReviewItem(
content=content,
decision=decision,
priority=priority,
sla_deadline=sla_deadline
)
# Add to sorted set (priority + timestamp for ordering)
score = priority * 1e12 + datetime.utcnow().timestamp()
self.redis.zadd(
"review_queue",
{content.id: score}
)
# Store full item data
self.redis.hset(
"review_items",
content.id,
json.dumps({
"content": content.model_dump(),
"decision": decision.model_dump(),
"priority": priority,
"sla_deadline": sla_deadline.isoformat(),
}, default=str)
)
return item
def dequeue(self, moderator_id: str) -> Optional[ReviewItem]:
"""Get next item for moderator to review."""
# Get highest priority unassigned item
items = self.redis.zrange("review_queue", 0, 0)
if not items:
return None
content_id = items[0].decode() if isinstance(items[0], bytes) else items[0]
# Remove from queue
self.redis.zrem("review_queue", content_id)
# Get item data
data = self.redis.hget("review_items", content_id)
if not data:
return None
item_data = json.loads(data)
# Update assignment
item_data["assigned_to"] = moderator_id
item_data["assigned_at"] = datetime.utcnow().isoformat()
self.redis.hset(
"review_items",
content_id,
json.dumps(item_data, default=str)
)
return ReviewItem(
content=ContentItem(**item_data["content"]),
decision=ModerationDecision(**item_data["decision"]),
priority=item_data["priority"],
assigned_to=moderator_id,
assigned_at=datetime.utcnow(),
sla_deadline=datetime.fromisoformat(item_data["sla_deadline"])
)
def submit_review(
self,
content_id: str,
moderator_id: str,
final_action: ModerationAction,
notes: str
) -> ModerationDecision:
"""Submit moderator's final decision."""
# Get item data
data = self.redis.hget("review_items", content_id)
if not data:
raise ValueError(f"Item {content_id} not found")
item_data = json.loads(data)
original_decision = ModerationDecision(**item_data["decision"])
# Create final decision
final_decision = ModerationDecision(
content_id=content_id,
action=final_action,
violation_types=original_decision.violation_types,
confidence=1.0, # Human review = full confidence
reasoning=f"{original_decision.reasoning}\n\nModerator notes: {notes}",
classifications=original_decision.classifications,
requires_appeal=final_action == ModerationAction.BLOCK,
reviewed_by=moderator_id
)
# Clean up
self.redis.hdel("review_items", content_id)
# Store for analytics
self._log_decision(original_decision, final_decision, moderator_id)
return final_decision
def _log_decision(
self,
original: ModerationDecision,
final: ModerationDecision,
moderator_id: str
):
"""Log decision for model improvement."""
log_entry = {
"content_id": original.content_id,
"auto_decision": original.action.value,
"human_decision": final.action.value,
"was_overturned": original.action != final.action,
"moderator_id": moderator_id,
"timestamp": datetime.utcnow().isoformat(),
}
self.redis.lpush("moderation_log", json.dumps(log_entry))
self.redis.ltrim("moderation_log", 0, 99999) # Keep last 100k
def get_queue_stats(self) -> dict:
"""Get queue statistics."""
queue_length = self.redis.zcard("review_queue")
# Count by priority
by_priority = {}
for p in range(1, 6):
score_min = p * 1e12
score_max = (p + 1) * 1e12
count = self.redis.zcount("review_queue", score_min, score_max)
by_priority[p] = count
return {
"total": queue_length,
"by_priority": by_priority,
}
Priority Queue Design:
Human Review Queue (diagram): Redis sorted set with score = priority * 1e12 + timestamp; Priority 1 (1 hour SLA) through Priority 5 (48 hour SLA)
| Queue Operation | Redis Command | Purpose |
|---|---|---|
| Enqueue | ZADD | Add with priority score |
| Dequeue | ZRANGE + ZREM | Get highest priority |
| Stats | ZCOUNT | Count by priority range |
| SLA tracking | Stored in hash | Calculate deadline |
Decision Logging: Every moderator decision is logged for:
- Model retraining (was AI wrong?)
- Quality assurance (is moderator consistent?)
- Analytics (overturn rate, category patterns)
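For example, the overturn rate can be computed directly from the moderation_log list that _log_decision writes; a minimal sketch (the field names match the log entries above):
# Sketch: compute the auto-decision overturn rate from the Redis moderation log
import json
import redis

def overturn_rate(redis_client: redis.Redis, sample_size: int = 10_000) -> float:
    """Fraction of recent auto decisions that a human moderator reversed."""
    entries = redis_client.lrange("moderation_log", 0, sample_size - 1)
    if not entries:
        return 0.0
    overturned = sum(1 for raw in entries if json.loads(raw)["was_overturned"])
    return overturned / len(entries)

# e.g. overturn_rate(redis.Redis()) -> 0.08 would mean 8% of auto decisions were reversed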
API Endpoints
# app/api/routes.py
from fastapi import APIRouter, HTTPException, BackgroundTasks
from typing import Optional
import redis
from ..models.content import (
ContentItem,
ModerationDecision,
ModerationAction,
)
from ..services.moderation import ModerationOrchestrator, ModerationConfig
from ..services.queue import HumanReviewQueue
router = APIRouter(prefix="/api/v1", tags=["moderation"])
# Initialize services
redis_client = redis.Redis(host="localhost", port=6379, db=0)
moderator = ModerationOrchestrator(redis_client)
review_queue = HumanReviewQueue(redis_client)
@router.post("/moderate", response_model=ModerationDecision)
async def moderate_content(
content: ContentItem,
background_tasks: BackgroundTasks
):
"""
Submit content for moderation.
Returns immediate decision or queues for human review.
"""
decision = moderator.moderate(content)
# Queue for human review if needed
if decision.action == ModerationAction.HUMAN_REVIEW:
review_queue.enqueue(content, decision)
return decision
@router.post("/moderate/batch")
async def moderate_batch(contents: list[ContentItem]):
"""Batch moderation for bulk processing."""
results = []
for content in contents:
decision = moderator.moderate(content)
results.append(decision)
if decision.action == ModerationAction.HUMAN_REVIEW:
review_queue.enqueue(content, decision)
return {"decisions": results}
@router.get("/queue/stats")
async def get_queue_stats():
"""Get human review queue statistics."""
return review_queue.get_queue_stats()
@router.post("/queue/next")
async def get_next_review(moderator_id: str):
"""Get next item for moderator to review."""
item = review_queue.dequeue(moderator_id)
if not item:
return {"message": "Queue is empty"}
return {
"content": item.content,
"auto_decision": item.decision,
"priority": item.priority,
"sla_deadline": item.sla_deadline,
}
@router.post("/queue/submit")
async def submit_review(
content_id: str,
moderator_id: str,
action: ModerationAction,
notes: str = ""
):
"""Submit moderator's final decision."""
try:
decision = review_queue.submit_review(
content_id=content_id,
moderator_id=moderator_id,
final_action=action,
notes=notes
)
return decision
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
@router.post("/appeal")
async def submit_appeal(
content_id: str,
user_id: str,
reason: str
):
"""Submit an appeal for blocked content."""
# Re-queue with high priority
# In production, implement full appeal workflow
return {
"appeal_id": f"appeal_{content_id}",
"status": "pending",
"message": "Your appeal has been submitted for review"
}
Main Application
# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from prometheus_client import make_asgi_app
from .api.routes import router
app = FastAPI(
title="Content Moderation API",
description="AI-powered content moderation system",
version="1.0.0"
)
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Mount routes
app.include_router(router)
# Prometheus metrics
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)
@app.get("/health")
async def health():
return {"status": "healthy"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Deployment
Docker Configuration
# docker-compose.yml
version: '3.8'
services:
api:
build: .
ports:
- "8000:8000"
environment:
- REDIS_HOST=redis
- OPENAI_API_KEY=${OPENAI_API_KEY}
depends_on:
- redis
- postgres
deploy:
replicas: 3
resources:
limits:
memory: 4G
reservations:
memory: 2G
worker:
build: .
command: celery -A workers.moderation_worker worker -l info
environment:
- REDIS_HOST=redis
- OPENAI_API_KEY=${OPENAI_API_KEY}
depends_on:
- redis
deploy:
replicas: 5
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
postgres:
image: postgres:15
environment:
- POSTGRES_DB=moderation
- POSTGRES_USER=moderation
- POSTGRES_PASSWORD=${DB_PASSWORD}
volumes:
- postgres_data:/var/lib/postgresql/data
prometheus:
image: prom/prometheus
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
grafana:
image: grafana/grafana
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
volumes:
redis_data:
postgres_data:
Key Metrics
| Metric | Target | Description |
|---|---|---|
| Latency (P99) | Under 200ms | Tier 1 classification |
| Latency (P99) | Under 2s | With LLM analysis |
| Precision | Over 95% | Minimize false positives |
| Recall | Over 99% | Catch harmful content |
| Human Review Rate | Under 5% | Minimize queue burden |
| Appeal Overturn Rate | Under 10% | Decision quality |
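main.py mounts a Prometheus endpoint, but no metrics are defined in the code shown. A minimal sketch of counters and histograms that could back the targets above (module path and metric names are illustrative assumptions):
# app/metrics.py -- illustrative Prometheus metrics, not part of the original code
from prometheus_client import Counter, Histogram

MODERATION_DECISIONS = Counter(
    "moderation_decisions_total",
    "Moderation decisions by action and primary violation type",
    ["action", "violation_type"],
)

CLASSIFIER_LATENCY = Histogram(
    "classifier_latency_seconds",
    "Per-classifier processing time",
    ["classifier"],
    buckets=(0.001, 0.01, 0.05, 0.2, 0.5, 1.0, 2.0, 5.0),
)

# Hypothetical usage inside the orchestrator:
#   MODERATION_DECISIONS.labels(decision.action.value, decision.violation_types[0].value).inc()
#   CLASSIFIER_LATENCY.labels(result.classifier).observe(result.processing_time_ms / 1000)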
Testing
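The tests below rely on a redis_client fixture that is not shown. One way to provide it is with fakeredis (an assumption -- any Redis test double works), for example in tests/conftest.py:
# tests/conftest.py -- illustrative fixture, assuming fakeredis is installed
import pytest
import fakeredis

@pytest.fixture
def redis_client():
    """In-memory Redis stand-in so tests don't need a running server."""
    return fakeredis.FakeRedis()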
# tests/test_moderation.py
import pytest
from app.models.content import ContentItem, ContentType
from app.services.moderation import ModerationOrchestrator, ModerationConfig
@pytest.fixture
def moderator(redis_client):
return ModerationOrchestrator(redis_client)
class TestModeration:
def test_approve_safe_content(self, moderator):
content = ContentItem(
id="test1",
user_id="user1",
content_type=ContentType.TEXT,
text="I love sunny days and walking in the park!"
)
decision = moderator.moderate(content)
assert decision.action.value == "approve"
def test_block_obvious_violation(self, moderator):
content = ContentItem(
id="test2",
user_id="user1",
content_type=ContentType.TEXT,
text="Buy now! 90% off! Click here for free money!"
)
decision = moderator.moderate(content)
assert decision.action.value in ["block", "human_review"]
assert "spam" in [v.value for v in decision.violation_types]
def test_context_aware_classification(self, moderator):
# News article about violence should be allowed
content = ContentItem(
id="test3",
user_id="news_outlet",
content_type=ContentType.TEXT,
text="Breaking: Police respond to incident downtown. Officials confirm no injuries.",
context={"account_type": "verified_news"}
)
decision = moderator.moderate(content)
assert decision.action.value == "approve"Business Impact
| Metric | Before | After |
|---|---|---|
| Human Review Load | 100% manual | 5% queue rate |
| Response Time | 24-48 hours | Under 2 seconds |
| Harmful Content Reach | Hours of exposure | Under 1 minute |
| False Positive Rate | 15% | Under 3% |
| Moderator Burnout | High | Reduced |
Key Concepts Recap
| Concept | What It Is | Why It Matters |
|---|---|---|
| Multi-Tier Classification | Heuristics → ML → LLM pipeline | Optimizes cost: cheap checks first, expensive LLM only when needed |
| Heuristic Classifier | Regex patterns + blocklists for obvious violations | ~1ms latency, catches 30% of violations with 99% precision |
| Toxicity Classifier | ML model (toxic-bert) trained on labeled content | Catches nuanced toxicity that heuristics miss |
| Spam Classifier | Behavioral signals (rate limiting, duplicates) + content signals | Detects bot patterns and promotional spam |
| LLM Policy Analyzer | GPT-4 with full guidelines for borderline cases | Understands context, sarcasm, legitimate discussion of sensitive topics |
| Community Guidelines | Structured policy rules with examples of allowed and forbidden content | Teaches LLM the boundary between violation and legitimate content |
| Priority Queue | Redis sorted set ordering by severity + timestamp | Self-harm reviewed in 1 hour, spam in 24 hours |
| SLA Tracking | Deadline per priority level | Ensures harmful content is addressed quickly |
| Decision Logging | Track auto vs human decisions, overturn rate | Enables model retraining and quality monitoring |
| Confidence Thresholds | 0.2 (safe) / 0.6 (review) / 0.95 (block) | Balances automation with human oversight |
Further Improvements
- Multi-modal Analysis - Extend to images and videos using vision models
- Real-time Learning - Update models based on moderator decisions
- User Reputation - Factor in user history and trust scores
- Coordinated Behavior - Detect bot networks and brigading
- Localization - Language and culture-specific policies