Adverse Event Surveillance Agent
Build a multi-agent system that monitors patient data streams for potential adverse drug reactions using supervisor-worker orchestration, temporal correlation, and escalation tiers
Adverse Event Surveillance Agent
Build a production-grade surveillance system that continuously monitors patient data for potential adverse drug events (ADEs) using a supervisor-worker multi-agent architecture with real-time correlation and intelligent escalation.
| Industry | Healthcare / Pharmacovigilance |
| Difficulty | Advanced |
| Time | 2 weeks |
| Code | ~1500 lines |
TL;DR
Build an adverse event surveillance system using LangGraph multi-agent orchestration (supervisor coordinates specialist detectors), event-driven architecture (processes streaming patient data), OpenFDA FAERS (known adverse event patterns), and temporal correlation (detects medication-to-event timing). Monitors labs, vitals, and medication orders to catch ADEs early with tiered escalation.
Medical Disclaimer
This system is designed as a clinical surveillance tool to assist healthcare professionals in detecting potential adverse drug events. It does not replace clinical judgment, pharmacovigilance expertise, or established reporting protocols. All alerts must be reviewed by qualified personnel before any clinical action.
What You'll Build
An adverse event surveillance agent that:
- Monitors patient data streams - Processes labs, vitals, and medication orders in real-time
- Detects lab anomalies - Identifies abnormal lab values that may indicate drug toxicity
- Tracks vital sign trends - Detects concerning patterns like sudden BP drops or tachycardia
- Correlates temporal patterns - Links medication administration to subsequent adverse events
- Matches known ADE patterns - Queries OpenFDA FAERS for recognized drug-reaction associations
- Escalates intelligently - Routes alerts through LOW → MEDIUM → HIGH → CRITICAL tiers
Architecture
┌─────────────────────────────────────────────────────────────────────────────┐
│ ADVERSE EVENT SURVEILLANCE ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ EVENT STREAM (Patient Data) │ │
│ │ Lab Results ──── Vital Signs ──── Medication Orders ──── Notes │ │
│ └──────────────────────────────┬──────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ SLIDING WINDOW STATE │ │
│ │ Last 24-72 hours of patient events, indexed by patient ID │ │
│ └──────────────────────────────┬──────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ SUPERVISOR AGENT │ │
│ │ Coordinates detection ──► Aggregates signals ──► Triggers alerts │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Lab Anomaly │ │ Vital Trend │ │ Temporal │ │ Known │ │ │
│ │ │ Detector │ │ Detector │ │ Correlation │ │ Pattern │ │ │
│ │ │ │ │ │ │ Detector │ │ Matcher │ │ │
│ │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │
│ │ │ │ │ │ │ │
│ │ └───────────────┴───────────────┴───────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌─────────────────────┐ │ │
│ │ │ Signal Correlator │ │ │
│ │ │ (Fuses detections) │ │ │
│ │ └──────────┬──────────┘ │ │
│ └───────────────────────────────┼─────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ ESCALATION ENGINE │ │
│ │ │ │
│ │ LOW ────► MEDIUM ────► HIGH ────► CRITICAL │ │
│ │ (Log) (Queue) (Page) (Interrupt) │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘Project Structure
adverse-event-surveillance/
├── src/
│ ├── __init__.py
│ ├── config.py
│ ├── events/
│ │ ├── __init__.py
│ │ ├── models.py # Event data models
│ │ ├── stream.py # Event stream processor
│ │ └── window.py # Sliding time window state
│ ├── detectors/
│ │ ├── __init__.py
│ │ ├── base.py # Base detector interface
│ │ ├── lab_anomaly.py # Lab value anomaly detector
│ │ ├── vital_trend.py # Vital sign trend detector
│ │ ├── temporal.py # Temporal correlation detector
│ │ └── known_pattern.py # Known ADE pattern matcher (OpenFDA)
│ ├── agents/
│ │ ├── __init__.py
│ │ ├── supervisor.py # Supervisor agent
│ │ ├── correlator.py # Signal correlation agent
│ │ └── workflow.py # LangGraph multi-agent workflow
│ ├── alerts/
│ │ ├── __init__.py
│ │ ├── models.py # Alert models
│ │ ├── escalation.py # Escalation logic
│ │ └── storage.py # Alert persistence
│ └── api/
│ ├── __init__.py
│ └── main.py # FastAPI endpoints
├── data/
│ └── reference/ # Reference ranges, known patterns
├── tests/
├── docker-compose.yml
└── requirements.txtTech Stack
| Technology | Purpose |
|---|---|
| LangGraph | Multi-agent supervisor-worker orchestration |
| OpenAI GPT-4o | Clinical reasoning and signal correlation |
| OpenFDA FAERS API | Known adverse event patterns (free, no key required) |
| Redis Streams | Event ingestion and processing |
| SQLite | Alert storage and audit trail |
| Pydantic | Event and alert data validation |
| FastAPI | REST API and webhook endpoints |
Implementation
Configuration
# src/config.py
from pydantic_settings import BaseSettings
from typing import Optional, Dict, List
from datetime import timedelta
class Settings(BaseSettings):
# LLM Settings
openai_api_key: str
openai_model: str = "gpt-4o"
temperature: float = 0.1 # Low for clinical precision
# OpenFDA API (free, no key required)
openfda_base_url: str = "https://api.fda.gov"
openfda_api_key: Optional[str] = None # Optional: increases rate limit
# Redis Settings (for event streaming)
redis_url: str = "redis://localhost:6379"
event_stream_name: str = "patient_events"
# Sliding Window Settings
window_duration_hours: int = 72 # Look back 72 hours
window_cleanup_interval_minutes: int = 30
# Detection Thresholds
lab_anomaly_threshold: float = 0.7 # Confidence threshold
vital_trend_threshold: float = 0.6
temporal_correlation_threshold: float = 0.65
known_pattern_threshold: float = 0.8
# Escalation Settings
escalation_rules: Dict[str, List[str]] = {
"CRITICAL": ["anaphylaxis", "cardiac_arrest", "respiratory_failure",
"severe_bleeding", "seizure"],
"HIGH": ["acute_kidney_injury", "hepatotoxicity", "severe_hypotension",
"severe_bradycardia", "severe_tachycardia"],
"MEDIUM": ["moderate_rash", "nausea_vomiting", "diarrhea",
"mild_hypotension", "electrolyte_imbalance"],
"LOW": ["mild_rash", "headache", "dizziness", "fatigue"]
}
# Alert Settings
alert_db_path: str = "./data/alerts.db"
enable_audit_logging: bool = True
class Config:
env_file = ".env"
settings = Settings()Understanding the Configuration:
┌─────────────────────────────────────────────────────────────────────┐
│ KEY CONFIGURATION DECISIONS │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ window_duration_hours: 72 │
│ └── Most ADEs manifest within 72 hours of drug administration │
│ • Immediate (minutes): anaphylaxis, infusion reactions │
│ • Acute (hours): hypotension, arrhythmia │
│ • Delayed (days): rash, hepatotoxicity, nephrotoxicity │
│ │
│ Threshold Strategy (different for each detector): │
│ ┌──────────────────────┬─────────┬────────────────────────────┐ │
│ │ Detector │ Thresh │ Why │ │
│ ├──────────────────────┼─────────┼────────────────────────────┤ │
│ │ known_pattern │ 0.80 │ High - FDA data is reliable │ │
│ │ lab_anomaly │ 0.70 │ Medium - labs can vary │ │
│ │ temporal_correlation │ 0.65 │ Medium - timing uncertain │ │
│ │ vital_trend │ 0.60 │ Lower - trends are noisy │ │
│ └──────────────────────┴─────────┴────────────────────────────┘ │
│ │
│ Escalation tiers determine response urgency: │
│ • CRITICAL → Immediate page to on-call physician │
│ • HIGH → Alert queue for pharmacist review within 1 hour │
│ • MEDIUM → Dashboard flag for routine review │
│ • LOW → Logged for trending/analytics only │
│ │
└─────────────────────────────────────────────────────────────────────┘Event Models
# src/events/models.py
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any, Literal
from datetime import datetime
from enum import Enum
class EventType(str, Enum):
LAB_RESULT = "lab_result"
VITAL_SIGN = "vital_sign"
MEDICATION_ORDER = "medication_order"
MEDICATION_ADMIN = "medication_admin"
CLINICAL_NOTE = "clinical_note"
class LabResult(BaseModel):
"""Laboratory test result."""
event_type: Literal["lab_result"] = "lab_result"
patient_id: str
timestamp: datetime
test_code: str = Field(description="LOINC code")
test_name: str
value: float
unit: str
reference_low: Optional[float] = None
reference_high: Optional[float] = None
is_critical: bool = False
ordering_provider: Optional[str] = None
@property
def is_abnormal(self) -> bool:
if self.reference_low and self.value < self.reference_low:
return True
if self.reference_high and self.value > self.reference_high:
return True
return False
@property
def abnormality_direction(self) -> Optional[str]:
if self.reference_low and self.value < self.reference_low:
return "low"
if self.reference_high and self.value > self.reference_high:
return "high"
return None
class VitalSign(BaseModel):
"""Vital sign measurement."""
event_type: Literal["vital_sign"] = "vital_sign"
patient_id: str
timestamp: datetime
vital_type: str # BP, HR, RR, SpO2, Temp
value: str # "120/80" for BP, numeric string for others
numeric_value: Optional[float] = None # Parsed numeric (systolic for BP)
unit: str
location: Optional[str] = None # ICU, ED, Floor, etc.
class MedicationOrder(BaseModel):
"""Medication order event."""
event_type: Literal["medication_order"] = "medication_order"
patient_id: str
timestamp: datetime
medication_name: str
rxnorm_code: Optional[str] = None
ndc_code: Optional[str] = None
dose: str
route: str # PO, IV, IM, SC, etc.
frequency: str
ordering_provider: str
indication: Optional[str] = None
class MedicationAdmin(BaseModel):
"""Medication administration event."""
event_type: Literal["medication_admin"] = "medication_admin"
patient_id: str
timestamp: datetime
medication_name: str
rxnorm_code: Optional[str] = None
dose_given: str
route: str
administered_by: str
order_id: Optional[str] = None
class PatientEvent(BaseModel):
"""Union type for all patient events."""
event: LabResult | VitalSign | MedicationOrder | MedicationAdmin
@property
def patient_id(self) -> str:
return self.event.patient_id
@property
def timestamp(self) -> datetime:
return self.event.timestamp
@property
def event_type(self) -> str:
return self.event.event_typeEvent Data Flow:
┌─────────────────────────────────────────────────────────────────────┐
│ PATIENT EVENT TYPES AND THEIR SURVEILLANCE VALUE │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ LAB_RESULT │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ • Creatinine ↑ → Nephrotoxicity (aminoglycosides, NSAIDs) │ │
│ │ • ALT/AST ↑ → Hepatotoxicity (acetaminophen, statins) │ │
│ │ • Platelets ↓ → Thrombocytopenia (heparin, chemo) │ │
│ │ • Potassium ↓/↑ → Electrolyte imbalance (diuretics, ACEi) │ │
│ │ • WBC ↓ → Neutropenia (chemo, clozapine) │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ VITAL_SIGN │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ • BP drop → Hypotension (antihypertensives, opioids) │ │
│ │ • HR spike → Tachycardia (stimulants, anticholinergics) │ │
│ │ • SpO2 drop → Respiratory depression (opioids, sedatives) │ │
│ │ • Temp spike → Drug fever, serotonin syndrome │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ MEDICATION_ADMIN (Key for temporal correlation) │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ timestamp = T₀ (drug given) │ │
│ │ │ │
│ │ If LAB_RESULT or VITAL_SIGN abnormality at T₀ + Δt │ │
│ │ where Δt is within expected onset window... │ │
│ │ → Temporal correlation detected │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘Sliding Window State
# src/events/window.py
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from collections import defaultdict
import asyncio
from .models import PatientEvent, LabResult, VitalSign, MedicationAdmin
from ..config import settings
class SlidingWindowState:
"""Maintains a sliding window of recent patient events.
Events older than window_duration are automatically pruned.
Provides efficient lookups by patient_id and event_type.
"""
def __init__(self, window_hours: int = None):
self.window_hours = window_hours or settings.window_duration_hours
self._events: Dict[str, List[PatientEvent]] = defaultdict(list)
self._lock = asyncio.Lock()
async def add_event(self, event: PatientEvent) -> None:
"""Add an event to the window."""
async with self._lock:
self._events[event.patient_id].append(event)
async def get_patient_events(
self,
patient_id: str,
event_types: Optional[List[str]] = None,
since: Optional[datetime] = None
) -> List[PatientEvent]:
"""Get events for a patient, optionally filtered."""
async with self._lock:
events = self._events.get(patient_id, [])
if since:
events = [e for e in events if e.timestamp >= since]
if event_types:
events = [e for e in events if e.event_type in event_types]
return sorted(events, key=lambda e: e.timestamp)
async def get_recent_medications(
self,
patient_id: str,
hours_back: int = 72
) -> List[MedicationAdmin]:
"""Get recent medication administrations for a patient."""
since = datetime.utcnow() - timedelta(hours=hours_back)
events = await self.get_patient_events(
patient_id,
event_types=["medication_admin"],
since=since
)
return [e.event for e in events if isinstance(e.event, MedicationAdmin)]
async def get_recent_labs(
self,
patient_id: str,
hours_back: int = 24
) -> List[LabResult]:
"""Get recent lab results for a patient."""
since = datetime.utcnow() - timedelta(hours=hours_back)
events = await self.get_patient_events(
patient_id,
event_types=["lab_result"],
since=since
)
return [e.event for e in events if isinstance(e.event, LabResult)]
async def get_recent_vitals(
self,
patient_id: str,
hours_back: int = 24
) -> List[VitalSign]:
"""Get recent vital signs for a patient."""
since = datetime.utcnow() - timedelta(hours=hours_back)
events = await self.get_patient_events(
patient_id,
event_types=["vital_sign"],
since=since
)
return [e.event for e in events if isinstance(e.event, VitalSign)]
async def cleanup_old_events(self) -> int:
"""Remove events older than the window duration."""
cutoff = datetime.utcnow() - timedelta(hours=self.window_hours)
removed_count = 0
async with self._lock:
for patient_id in list(self._events.keys()):
original_count = len(self._events[patient_id])
self._events[patient_id] = [
e for e in self._events[patient_id]
if e.timestamp >= cutoff
]
removed_count += original_count - len(self._events[patient_id])
# Remove empty patient entries
if not self._events[patient_id]:
del self._events[patient_id]
return removed_count
async def get_stats(self) -> Dict:
"""Get window statistics."""
async with self._lock:
total_events = sum(len(events) for events in self._events.values())
return {
"patient_count": len(self._events),
"total_events": total_events,
"window_hours": self.window_hours
}
# Global window instance
event_window = SlidingWindowState()Base Detector Interface
# src/detectors/base.py
from abc import ABC, abstractmethod
from typing import List, Optional
from pydantic import BaseModel, Field
from datetime import datetime
from ..events.models import PatientEvent
from ..events.window import SlidingWindowState
class DetectionSignal(BaseModel):
"""A signal from a detector indicating potential ADE."""
detector_name: str
patient_id: str
timestamp: datetime = Field(default_factory=datetime.utcnow)
confidence: float = Field(ge=0.0, le=1.0)
signal_type: str # e.g., "lab_anomaly", "vital_trend", "temporal_correlation"
description: str
evidence: List[str] = Field(default_factory=list)
suspected_medication: Optional[str] = None
suspected_reaction: Optional[str] = None
severity_hint: Optional[str] = None # LOW, MEDIUM, HIGH, CRITICAL
class BaseDetector(ABC):
"""Abstract base class for ADE detectors."""
def __init__(self, window: SlidingWindowState):
self.window = window
self.name = self.__class__.__name__
@abstractmethod
async def detect(self, event: PatientEvent) -> List[DetectionSignal]:
"""Process an event and return any detection signals.
Args:
event: The incoming patient event to analyze
Returns:
List of detection signals (empty if no ADE suspected)
"""
pass
@property
@abstractmethod
def signal_type(self) -> str:
"""Return the type of signal this detector produces."""
passLab Anomaly Detector
# src/detectors/lab_anomaly.py
from typing import List, Dict, Optional
from datetime import datetime, timedelta
from .base import BaseDetector, DetectionSignal
from ..events.models import PatientEvent, LabResult, MedicationAdmin
from ..events.window import SlidingWindowState
from ..config import settings
# Known drug-lab associations (medication → affected lab → expected change)
DRUG_LAB_ASSOCIATIONS: Dict[str, Dict[str, str]] = {
"metformin": {"creatinine": "high", "lactate": "high"},
"lisinopril": {"potassium": "high", "creatinine": "high"},
"furosemide": {"potassium": "low", "sodium": "low", "creatinine": "high"},
"warfarin": {"inr": "high", "pt": "high"},
"heparin": {"platelets": "low", "ptt": "high"},
"vancomycin": {"creatinine": "high"},
"gentamicin": {"creatinine": "high"},
"methotrexate": {"wbc": "low", "platelets": "low", "alt": "high", "ast": "high"},
"acetaminophen": {"alt": "high", "ast": "high"},
"atorvastatin": {"alt": "high", "ast": "high", "ck": "high"},
"amiodarone": {"tsh": "high", "alt": "high"},
"lithium": {"creatinine": "high", "tsh": "high"},
"carbamazepine": {"sodium": "low", "wbc": "low"},
"clozapine": {"wbc": "low", "anc": "low"},
}
# Critical lab thresholds that always warrant attention
CRITICAL_THRESHOLDS = {
"potassium": {"critical_low": 2.5, "critical_high": 6.5},
"sodium": {"critical_low": 120, "critical_high": 160},
"glucose": {"critical_low": 40, "critical_high": 500},
"creatinine": {"critical_high": 10.0},
"inr": {"critical_high": 5.0},
"platelets": {"critical_low": 20},
"wbc": {"critical_low": 1.0},
"hemoglobin": {"critical_low": 6.0},
}
class LabAnomalyDetector(BaseDetector):
"""Detects lab abnormalities that may indicate adverse drug events."""
@property
def signal_type(self) -> str:
return "lab_anomaly"
async def detect(self, event: PatientEvent) -> List[DetectionSignal]:
"""Analyze lab result for potential drug-induced abnormality."""
if event.event_type != "lab_result":
return []
lab: LabResult = event.event
signals = []
# Skip if lab is within normal limits
if not lab.is_abnormal and not self._is_critical(lab):
return []
# Get recent medications for this patient
recent_meds = await self.window.get_recent_medications(
lab.patient_id,
hours_back=72
)
if not recent_meds:
# No recent meds, but still flag critical values
if self._is_critical(lab):
signals.append(self._create_critical_signal(lab))
return signals
# Check for drug-lab associations
for med in recent_meds:
med_name_lower = med.medication_name.lower()
# Check known associations
for drug_pattern, lab_effects in DRUG_LAB_ASSOCIATIONS.items():
if drug_pattern in med_name_lower:
lab_name_lower = lab.test_name.lower()
for affected_lab, expected_direction in lab_effects.items():
if affected_lab in lab_name_lower:
if lab.abnormality_direction == expected_direction:
confidence = self._calculate_confidence(
lab, med, expected_direction
)
if confidence >= settings.lab_anomaly_threshold:
signals.append(DetectionSignal(
detector_name=self.name,
patient_id=lab.patient_id,
confidence=confidence,
signal_type=self.signal_type,
description=(
f"Abnormal {lab.test_name} ({lab.value} {lab.unit}) "
f"may be associated with {med.medication_name}"
),
evidence=[
f"Lab: {lab.test_name} = {lab.value} {lab.unit}",
f"Reference: {lab.reference_low}-{lab.reference_high}",
f"Direction: {expected_direction}",
f"Medication: {med.medication_name} "
f"administered {self._time_since(med.timestamp)}"
],
suspected_medication=med.medication_name,
suspected_reaction=f"{lab.test_name}_{expected_direction}",
severity_hint=self._assess_severity(lab)
))
# Always flag critical values even without medication association
if self._is_critical(lab) and not signals:
signals.append(self._create_critical_signal(lab))
return signals
def _is_critical(self, lab: LabResult) -> bool:
"""Check if lab value is in critical range."""
lab_name = lab.test_name.lower()
for critical_lab, thresholds in CRITICAL_THRESHOLDS.items():
if critical_lab in lab_name:
if "critical_low" in thresholds and lab.value < thresholds["critical_low"]:
return True
if "critical_high" in thresholds and lab.value > thresholds["critical_high"]:
return True
return lab.is_critical
def _calculate_confidence(
self,
lab: LabResult,
med: MedicationAdmin,
expected_direction: str
) -> float:
"""Calculate confidence score for the association."""
confidence = 0.5 # Base confidence
# Increase confidence if direction matches expectation
if lab.abnormality_direction == expected_direction:
confidence += 0.2
# Increase confidence for critical values
if self._is_critical(lab):
confidence += 0.2
# Adjust based on timing (closer to med admin = higher confidence)
hours_since_med = (lab.timestamp - med.timestamp).total_seconds() / 3600
if hours_since_med < 24:
confidence += 0.1
elif hours_since_med > 72:
confidence -= 0.1
return min(confidence, 1.0)
def _assess_severity(self, lab: LabResult) -> str:
"""Assess severity based on lab value."""
if self._is_critical(lab):
return "HIGH"
if lab.is_abnormal:
return "MEDIUM"
return "LOW"
def _create_critical_signal(self, lab: LabResult) -> DetectionSignal:
"""Create signal for critical lab without medication association."""
return DetectionSignal(
detector_name=self.name,
patient_id=lab.patient_id,
confidence=0.9,
signal_type=self.signal_type,
description=f"CRITICAL: {lab.test_name} = {lab.value} {lab.unit}",
evidence=[
f"Lab: {lab.test_name} = {lab.value} {lab.unit}",
f"Reference: {lab.reference_low}-{lab.reference_high}",
"Value is in critical range"
],
suspected_reaction=f"critical_{lab.test_name.lower()}",
severity_hint="CRITICAL"
)
def _time_since(self, timestamp: datetime) -> str:
"""Format time since timestamp."""
delta = datetime.utcnow() - timestamp
hours = delta.total_seconds() / 3600
if hours < 1:
return f"{int(delta.total_seconds() / 60)} minutes ago"
elif hours < 24:
return f"{int(hours)} hours ago"
else:
return f"{int(hours / 24)} days ago"Lab Anomaly Detection Logic:
┌─────────────────────────────────────────────────────────────────────┐
│ LAB ANOMALY DETECTION FLOW │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Incoming Lab Result │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ Is value abnormal OR critical? │ │
│ └───────────────────────┬──────────────────────────────────────┘ │
│ NO │ YES │
│ │ │ │
│ ▼ ▼ │
│ SKIP │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ Get medications administered in last 72 hours │ │
│ └───────────────────────┬──────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ For each medication, check DRUG_LAB_ASSOCIATIONS table: │ │
│ │ │ │
│ │ furosemide → {potassium: low, sodium: low, creatinine: high}│ │
│ │ │ │
│ │ If patient's lab matches expected direction... │ │
│ │ → Generate signal with confidence score │ │
│ └───────────────────────┬──────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ Confidence calculation: │ │
│ │ Base: 0.5 │ │
│ │ + 0.2 if direction matches │ │
│ │ + 0.2 if critical value │ │
│ │ + 0.1 if within 24h of medication │ │
│ │ - 0.1 if beyond 72h │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘Vital Trend Detector
# src/detectors/vital_trend.py
from typing import List, Dict, Optional
from datetime import datetime, timedelta
from .base import BaseDetector, DetectionSignal
from ..events.models import PatientEvent, VitalSign, MedicationAdmin
from ..events.window import SlidingWindowState
from ..config import settings
# Drug classes known to affect vitals
DRUG_VITAL_EFFECTS: Dict[str, Dict[str, str]] = {
# Antihypertensives
"lisinopril": {"bp": "low"},
"amlodipine": {"bp": "low"},
"metoprolol": {"bp": "low", "hr": "low"},
"carvedilol": {"bp": "low", "hr": "low"},
"hydralazine": {"bp": "low", "hr": "high"},
# Opioids
"morphine": {"bp": "low", "rr": "low", "spo2": "low"},
"fentanyl": {"bp": "low", "rr": "low", "spo2": "low"},
"hydromorphone": {"bp": "low", "rr": "low", "spo2": "low"},
"oxycodone": {"rr": "low", "spo2": "low"},
# Sedatives
"lorazepam": {"bp": "low", "rr": "low"},
"midazolam": {"bp": "low", "rr": "low"},
"propofol": {"bp": "low", "rr": "low"},
# Stimulants / anticholinergics
"albuterol": {"hr": "high"},
"diphenhydramine": {"hr": "high"},
"atropine": {"hr": "high"},
# Others
"insulin": {"hr": "high"}, # Secondary to hypoglycemia
"levothyroxine": {"hr": "high"},
}
# Critical vital thresholds
CRITICAL_VITALS = {
"bp_systolic": {"critical_low": 80, "critical_high": 200},
"hr": {"critical_low": 40, "critical_high": 150},
"rr": {"critical_low": 8, "critical_high": 30},
"spo2": {"critical_low": 88},
"temp": {"critical_low": 35.0, "critical_high": 40.0},
}
class VitalTrendDetector(BaseDetector):
"""Detects vital sign trends that may indicate adverse drug events."""
@property
def signal_type(self) -> str:
return "vital_trend"
async def detect(self, event: PatientEvent) -> List[DetectionSignal]:
"""Analyze vital sign for drug-induced changes."""
if event.event_type != "vital_sign":
return []
vital: VitalSign = event.event
signals = []
# Parse vital value
vital_type, numeric_value = self._parse_vital(vital)
if numeric_value is None:
return []
# Check for critical value first
is_critical, direction = self._check_critical(vital_type, numeric_value)
# Get recent vitals to detect trend
recent_vitals = await self.window.get_recent_vitals(
vital.patient_id,
hours_back=24
)
# Calculate trend
trend = self._calculate_trend(vital_type, numeric_value, recent_vitals)
# Get recent medications
recent_meds = await self.window.get_recent_medications(
vital.patient_id,
hours_back=48
)
# Check for drug-vital associations
for med in recent_meds:
med_name_lower = med.medication_name.lower()
for drug_pattern, vital_effects in DRUG_VITAL_EFFECTS.items():
if drug_pattern in med_name_lower:
if vital_type in vital_effects:
expected_direction = vital_effects[vital_type]
# Check if actual direction matches expected
actual_direction = "low" if direction == "low" or trend == "decreasing" else "high"
if trend == "stable":
actual_direction = direction # Use critical direction if available
if actual_direction == expected_direction:
confidence = self._calculate_confidence(
is_critical, trend, med, vital.timestamp
)
if confidence >= settings.vital_trend_threshold:
signals.append(DetectionSignal(
detector_name=self.name,
patient_id=vital.patient_id,
confidence=confidence,
signal_type=self.signal_type,
description=(
f"{vital_type.upper()} {trend} ({numeric_value}) "
f"may be associated with {med.medication_name}"
),
evidence=[
f"Vital: {vital_type} = {numeric_value}",
f"Trend: {trend} over last 24h",
f"Medication: {med.medication_name}",
f"Expected effect: {expected_direction} {vital_type}"
],
suspected_medication=med.medication_name,
suspected_reaction=f"{vital_type}_{actual_direction}",
severity_hint="CRITICAL" if is_critical else "MEDIUM"
))
# Flag critical vitals without medication association
if is_critical and not signals:
signals.append(DetectionSignal(
detector_name=self.name,
patient_id=vital.patient_id,
confidence=0.85,
signal_type=self.signal_type,
description=f"CRITICAL: {vital_type} = {numeric_value}",
evidence=[
f"Vital: {vital_type} = {numeric_value}",
f"Direction: {direction}",
"Value in critical range"
],
suspected_reaction=f"critical_{vital_type}_{direction}",
severity_hint="CRITICAL"
))
return signals
def _parse_vital(self, vital: VitalSign) -> tuple[str, Optional[float]]:
"""Parse vital type and numeric value."""
vital_type = vital.vital_type.lower()
# Handle blood pressure specially (use systolic)
if vital_type in ["bp", "blood_pressure"]:
try:
systolic = float(vital.value.split("/")[0])
return "bp_systolic", systolic
except (ValueError, IndexError):
return vital_type, None
# Use pre-parsed numeric value or parse from string
if vital.numeric_value is not None:
return vital_type, vital.numeric_value
try:
return vital_type, float(vital.value.replace("%", ""))
except ValueError:
return vital_type, None
def _check_critical(
self,
vital_type: str,
value: float
) -> tuple[bool, Optional[str]]:
"""Check if vital is in critical range."""
thresholds = CRITICAL_VITALS.get(vital_type, {})
if "critical_low" in thresholds and value < thresholds["critical_low"]:
return True, "low"
if "critical_high" in thresholds and value > thresholds["critical_high"]:
return True, "high"
return False, None
def _calculate_trend(
self,
vital_type: str,
current_value: float,
recent_vitals: List[VitalSign]
) -> str:
"""Calculate trend from recent vitals."""
# Filter to same vital type
same_type_vitals = []
for v in recent_vitals:
v_type, v_value = self._parse_vital(v)
if v_type == vital_type and v_value is not None:
same_type_vitals.append((v.timestamp, v_value))
if len(same_type_vitals) < 2:
return "stable"
# Sort by timestamp
same_type_vitals.sort(key=lambda x: x[0])
# Calculate average of first half vs second half
mid = len(same_type_vitals) // 2
first_half_avg = sum(v[1] for v in same_type_vitals[:mid]) / mid
second_half_avg = sum(v[1] for v in same_type_vitals[mid:]) / (len(same_type_vitals) - mid)
# Determine trend with 10% threshold
change_pct = (second_half_avg - first_half_avg) / first_half_avg if first_half_avg != 0 else 0
if change_pct > 0.1:
return "increasing"
elif change_pct < -0.1:
return "decreasing"
return "stable"
def _calculate_confidence(
self,
is_critical: bool,
trend: str,
med: MedicationAdmin,
vital_timestamp: datetime
) -> float:
"""Calculate confidence score."""
confidence = 0.4
if is_critical:
confidence += 0.3
if trend in ["increasing", "decreasing"]:
confidence += 0.15
# Timing factor
hours_since_med = (vital_timestamp - med.timestamp).total_seconds() / 3600
if hours_since_med < 6:
confidence += 0.15
elif hours_since_med < 24:
confidence += 0.1
return min(confidence, 1.0)Temporal Correlation Detector
# src/detectors/temporal.py
from typing import List, Dict, Optional
from datetime import datetime, timedelta
from .base import BaseDetector, DetectionSignal
from ..events.models import PatientEvent, LabResult, VitalSign, MedicationAdmin
from ..events.window import SlidingWindowState
from ..config import settings
# Expected onset windows for different drug-event types (hours)
ONSET_WINDOWS: Dict[str, Dict[str, tuple]] = {
"anaphylaxis": {"min_hours": 0, "max_hours": 2},
"infusion_reaction": {"min_hours": 0, "max_hours": 4},
"hypotension": {"min_hours": 0.5, "max_hours": 6},
"bradycardia": {"min_hours": 0.5, "max_hours": 6},
"respiratory_depression": {"min_hours": 0.5, "max_hours": 8},
"nephrotoxicity": {"min_hours": 24, "max_hours": 72},
"hepatotoxicity": {"min_hours": 48, "max_hours": 168},
"thrombocytopenia": {"min_hours": 72, "max_hours": 168},
"neutropenia": {"min_hours": 72, "max_hours": 336},
}
# Map lab/vital abnormalities to reaction types
ABNORMALITY_TO_REACTION = {
"creatinine_high": "nephrotoxicity",
"alt_high": "hepatotoxicity",
"ast_high": "hepatotoxicity",
"platelets_low": "thrombocytopenia",
"wbc_low": "neutropenia",
"bp_systolic_low": "hypotension",
"hr_low": "bradycardia",
"hr_high": "tachycardia",
"rr_low": "respiratory_depression",
"spo2_low": "respiratory_depression",
}
class TemporalCorrelationDetector(BaseDetector):
"""Detects temporal correlations between medication administration and adverse events.
This detector analyzes the timing between when a medication was given and
when an abnormality occurred. If the timing falls within the expected
onset window for a known adverse event type, a signal is generated.
"""
@property
def signal_type(self) -> str:
return "temporal_correlation"
async def detect(self, event: PatientEvent) -> List[DetectionSignal]:
"""Detect temporal correlations with medication timing."""
signals = []
# Determine the reaction type from this event
reaction_type = self._event_to_reaction(event)
if not reaction_type:
return []
# Get expected onset window
onset_window = ONSET_WINDOWS.get(reaction_type)
if not onset_window:
# Use default window for unknown reaction types
onset_window = {"min_hours": 0, "max_hours": 72}
# Get medications administered before this event
event_time = event.timestamp
lookback_hours = onset_window["max_hours"] + 6 # Add buffer
recent_meds = await self.window.get_recent_medications(
event.patient_id,
hours_back=int(lookback_hours)
)
for med in recent_meds:
hours_since_med = (event_time - med.timestamp).total_seconds() / 3600
# Check if timing falls within expected onset window
if onset_window["min_hours"] <= hours_since_med <= onset_window["max_hours"]:
confidence = self._calculate_confidence(
hours_since_med,
onset_window,
reaction_type
)
if confidence >= settings.temporal_correlation_threshold:
signals.append(DetectionSignal(
detector_name=self.name,
patient_id=event.patient_id,
confidence=confidence,
signal_type=self.signal_type,
description=(
f"Temporal correlation: {reaction_type} occurred "
f"{hours_since_med:.1f}h after {med.medication_name}"
),
evidence=[
f"Event: {reaction_type}",
f"Medication: {med.medication_name}",
f"Time since administration: {hours_since_med:.1f} hours",
f"Expected onset window: {onset_window['min_hours']}-"
f"{onset_window['max_hours']} hours",
f"Route: {med.route}"
],
suspected_medication=med.medication_name,
suspected_reaction=reaction_type,
severity_hint=self._reaction_severity(reaction_type)
))
return signals
def _event_to_reaction(self, event: PatientEvent) -> Optional[str]:
"""Map an event to a reaction type."""
if event.event_type == "lab_result":
lab: LabResult = event.event
if lab.is_abnormal:
key = f"{lab.test_name.lower()}_{lab.abnormality_direction}"
return ABNORMALITY_TO_REACTION.get(key)
elif event.event_type == "vital_sign":
vital: VitalSign = event.event
vital_type = vital.vital_type.lower()
# Parse and check against thresholds
try:
if vital_type in ["bp", "blood_pressure"]:
systolic = float(vital.value.split("/")[0])
if systolic < 90:
return "hypotension"
elif vital_type == "hr":
hr = float(vital.value)
if hr < 50:
return "bradycardia"
elif hr > 120:
return "tachycardia"
elif vital_type == "rr":
rr = float(vital.value)
if rr < 10:
return "respiratory_depression"
elif vital_type == "spo2":
spo2 = float(vital.value.replace("%", ""))
if spo2 < 92:
return "respiratory_depression"
except ValueError:
pass
return None
def _calculate_confidence(
self,
hours_since_med: float,
onset_window: Dict,
reaction_type: str
) -> float:
"""Calculate confidence based on how well timing matches expected window."""
min_h = onset_window["min_hours"]
max_h = onset_window["max_hours"]
optimal_h = (min_h + max_h) / 2
# Higher confidence when timing is closer to typical onset
distance_from_optimal = abs(hours_since_med - optimal_h)
window_size = max_h - min_h
normalized_distance = distance_from_optimal / (window_size / 2) if window_size > 0 else 0
# Base confidence decreases with distance from optimal
confidence = 0.8 - (normalized_distance * 0.3)
# Boost confidence for well-documented reactions
if reaction_type in ["nephrotoxicity", "hepatotoxicity", "hypotension"]:
confidence += 0.1
return max(0.3, min(confidence, 1.0))
def _reaction_severity(self, reaction_type: str) -> str:
"""Assess severity of reaction type."""
critical = ["anaphylaxis", "respiratory_depression"]
high = ["hypotension", "bradycardia", "nephrotoxicity", "hepatotoxicity"]
medium = ["thrombocytopenia", "neutropenia", "tachycardia"]
if reaction_type in critical:
return "CRITICAL"
elif reaction_type in high:
return "HIGH"
elif reaction_type in medium:
return "MEDIUM"
return "LOW"Temporal Correlation Concept:
┌─────────────────────────────────────────────────────────────────────┐
│ TEMPORAL CORRELATION: LINKING MEDICATION TO OUTCOME │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ The key insight: ADEs have characteristic onset windows │
│ │
│ Timeline: │
│ ───────────────────────────────────────────────────────────────── │
│ T₀ T₁ │
│ │ │ │
│ ▼ ▼ │
│ Medication Abnormal │
│ Administered Lab/Vital │
│ │
│ If (T₁ - T₀) falls within expected onset window → SIGNAL │
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Reaction Type │ Onset Window │ Example │ │
│ ├─────────────────────────────────────────────────────────────┤ │
│ │ Anaphylaxis │ 0-2 hours │ Penicillin → hives │ │
│ │ Hypotension │ 0.5-6 hours │ Beta-blocker → low BP│ │
│ │ Nephrotoxicity │ 24-72 hours │ Vancomycin → ↑Cr │ │
│ │ Hepatotoxicity │ 48-168 hours │ Acetaminophen → ↑ALT │ │
│ │ Thrombocytopenia │ 72-168 hours │ Heparin → ↓platelets │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ Confidence is HIGHEST when timing matches the "sweet spot" of │
│ the expected window, and decreases toward the edges. │
│ │
└─────────────────────────────────────────────────────────────────────┘Known Pattern Matcher (OpenFDA FAERS)
# src/detectors/known_pattern.py
from typing import List, Dict, Optional
from datetime import datetime
import httpx
from .base import BaseDetector, DetectionSignal
from ..events.models import PatientEvent, LabResult, VitalSign, MedicationAdmin
from ..events.window import SlidingWindowState
from ..config import settings
# Cache for OpenFDA results (simple in-memory cache)
_openfda_cache: Dict[str, Dict] = {}
class KnownPatternMatcher(BaseDetector):
"""Matches events against known adverse event patterns from OpenFDA FAERS.
FAERS (FDA Adverse Event Reporting System) contains millions of reports
of suspected adverse drug reactions. This detector queries FAERS to
check if the observed drug-reaction combination is a known pattern.
"""
def __init__(self, window: SlidingWindowState):
super().__init__(window)
self.base_url = settings.openfda_base_url
self.api_key = settings.openfda_api_key
@property
def signal_type(self) -> str:
return "known_pattern"
async def detect(self, event: PatientEvent) -> List[DetectionSignal]:
"""Check if event matches known adverse event patterns."""
signals = []
# Get the reaction term from this event
reaction_term = self._extract_reaction_term(event)
if not reaction_term:
return []
# Get recent medications
recent_meds = await self.window.get_recent_medications(
event.patient_id,
hours_back=72
)
for med in recent_meds:
# Query OpenFDA for this drug-reaction combination
report_count = await self._query_faers(
med.medication_name,
reaction_term
)
if report_count > 0:
confidence = self._calculate_confidence(report_count)
if confidence >= settings.known_pattern_threshold:
signals.append(DetectionSignal(
detector_name=self.name,
patient_id=event.patient_id,
confidence=confidence,
signal_type=self.signal_type,
description=(
f"Known ADE pattern: {med.medication_name} → {reaction_term} "
f"({report_count} FAERS reports)"
),
evidence=[
f"Medication: {med.medication_name}",
f"Reaction: {reaction_term}",
f"FAERS report count: {report_count}",
"Source: FDA Adverse Event Reporting System"
],
suspected_medication=med.medication_name,
suspected_reaction=reaction_term,
severity_hint=self._assess_faers_severity(report_count)
))
return signals
def _extract_reaction_term(self, event: PatientEvent) -> Optional[str]:
"""Extract MedDRA-style reaction term from event."""
# Map common lab/vital findings to MedDRA preferred terms
if event.event_type == "lab_result":
lab: LabResult = event.event
if lab.is_abnormal:
lab_name = lab.test_name.lower()
direction = lab.abnormality_direction
# Map to MedDRA terms
mappings = {
("creatinine", "high"): "Blood creatinine increased",
("alt", "high"): "Alanine aminotransferase increased",
("ast", "high"): "Aspartate aminotransferase increased",
("potassium", "high"): "Hyperkalaemia",
("potassium", "low"): "Hypokalaemia",
("sodium", "low"): "Hyponatraemia",
("platelets", "low"): "Thrombocytopenia",
("wbc", "low"): "Leukopenia",
("hemoglobin", "low"): "Anaemia",
("glucose", "high"): "Hyperglycaemia",
("glucose", "low"): "Hypoglycaemia",
}
for (lab_key, dir_key), term in mappings.items():
if lab_key in lab_name and dir_key == direction:
return term
elif event.event_type == "vital_sign":
vital: VitalSign = event.event
vital_type = vital.vital_type.lower()
try:
if vital_type in ["bp", "blood_pressure"]:
systolic = float(vital.value.split("/")[0])
if systolic < 90:
return "Hypotension"
elif systolic > 180:
return "Hypertension"
elif vital_type == "hr":
hr = float(vital.value)
if hr < 50:
return "Bradycardia"
elif hr > 100:
return "Tachycardia"
elif vital_type == "spo2":
spo2 = float(vital.value.replace("%", ""))
if spo2 < 92:
return "Oxygen saturation decreased"
except ValueError:
pass
return None
async def _query_faers(
self,
drug_name: str,
reaction_term: str
) -> int:
"""Query OpenFDA FAERS for drug-reaction co-reports."""
cache_key = f"{drug_name.lower()}|{reaction_term.lower()}"
# Check cache first
if cache_key in _openfda_cache:
return _openfda_cache[cache_key].get("count", 0)
try:
# Build search query
# Search for reports where both drug and reaction are present
search = (
f'patient.drug.openfda.generic_name:"{drug_name}"+'
f'patient.reaction.reactionmeddrapt:"{reaction_term}"'
)
params = {"search": search, "limit": 1}
if self.api_key:
params["api_key"] = self.api_key
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.base_url}/drug/event.json",
params=params,
timeout=10.0
)
if response.status_code == 200:
data = response.json()
count = data.get("meta", {}).get("results", {}).get("total", 0)
_openfda_cache[cache_key] = {"count": count}
return count
elif response.status_code == 404:
# No reports found
_openfda_cache[cache_key] = {"count": 0}
return 0
except Exception:
pass
return 0
def _calculate_confidence(self, report_count: int) -> float:
"""Calculate confidence based on FAERS report count."""
# More reports = higher confidence this is a real signal
if report_count >= 1000:
return 0.95
elif report_count >= 500:
return 0.90
elif report_count >= 100:
return 0.85
elif report_count >= 50:
return 0.80
elif report_count >= 10:
return 0.70
elif report_count >= 1:
return 0.60
return 0.0
def _assess_faers_severity(self, report_count: int) -> str:
"""Assess severity hint based on report count."""
if report_count >= 500:
return "HIGH" # Well-documented adverse event
elif report_count >= 50:
return "MEDIUM"
return "LOW"Understanding OpenFDA FAERS:
┌─────────────────────────────────────────────────────────────────────┐
│ OPENFDA FAERS: WHY REPORT COUNT MATTERS │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ FAERS contains ~20 million adverse event reports │
│ Each report links: Drug(s) → Reaction(s) → Outcome │
│ │
│ Query: "How many reports have [drug] AND [reaction]?" │
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Report Count │ Interpretation │ │
│ ├─────────────────────────────────────────────────────────────┤ │
│ │ 1000+ │ Well-established ADE, likely in drug label │ │
│ │ 100-999 │ Known signal, may be in warnings │ │
│ │ 10-99 │ Emerging signal, warrants attention │ │
│ │ 1-9 │ Rare reports, correlation uncertain │ │
│ │ 0 │ Not reported (but doesn't mean safe!) │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ Example queries: │
│ • "metformin" + "Lactic acidosis" → ~2,500 reports (known!) │
│ • "vancomycin" + "Blood creatinine increased" → ~1,200 reports │
│ • "lisinopril" + "Hyperkalaemia" → ~800 reports │
│ │
│ API: https://api.fda.gov/drug/event.json │
│ Rate limit: 40/min without key, 240/min with key │
│ │
└─────────────────────────────────────────────────────────────────────┘Supervisor Agent
# src/agents/supervisor.py
from typing import List, Dict, Any
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field
from ..detectors.base import DetectionSignal
from ..config import settings
class SupervisorDecision(BaseModel):
"""Supervisor's decision after reviewing detection signals."""
should_alert: bool = Field(description="Whether to generate an alert")
final_severity: str = Field(description="Final severity: LOW, MEDIUM, HIGH, CRITICAL")
primary_suspicion: str = Field(description="Most likely adverse event")
reasoning: str = Field(description="Brief explanation of decision")
recommended_actions: List[str] = Field(
default_factory=list,
description="Recommended clinical actions"
)
class SupervisorAgent:
"""Supervisor agent that reviews and adjudicates detection signals.
The supervisor uses clinical reasoning to:
1. Evaluate concordance across multiple detectors
2. Assess overall signal strength
3. Determine appropriate escalation level
4. Recommend clinical actions
"""
def __init__(self):
self.llm = ChatOpenAI(
model=settings.openai_model,
api_key=settings.openai_api_key,
temperature=settings.temperature
).with_structured_output(SupervisorDecision)
self.prompt = ChatPromptTemplate.from_messages([
("system", """You are a clinical pharmacovigilance supervisor AI reviewing
detection signals for potential adverse drug events (ADEs).
Your role is to:
1. Evaluate the strength and concordance of signals from multiple detectors
2. Apply clinical judgment to determine if an alert is warranted
3. Assess appropriate severity level
4. Recommend clinical actions
Severity levels:
- CRITICAL: Immediate threat to life, requires urgent intervention
- HIGH: Serious ADE requiring prompt attention (within 1 hour)
- MEDIUM: Clinically significant, routine review needed
- LOW: Minor finding, log for trending
Consider:
- Multiple concordant signals (same drug, same reaction) increase confidence
- Known patterns (high FAERS counts) are more reliable
- Temporal correlation within expected windows is significant
- Critical lab/vital values always warrant attention regardless of medication link
IMPORTANT: You are a decision support tool. Final clinical decisions must be made
by qualified healthcare professionals."""),
("human", """Patient ID: {patient_id}
Detection Signals:
{signals}
Recent Medications:
{medications}
Review these signals and make a decision.""")
])
async def review_signals(
self,
patient_id: str,
signals: List[DetectionSignal],
recent_medications: List[str]
) -> SupervisorDecision:
"""Review detection signals and make a decision."""
if not signals:
return SupervisorDecision(
should_alert=False,
final_severity="LOW",
primary_suspicion="None",
reasoning="No detection signals to review",
recommended_actions=[]
)
# Format signals for prompt
signals_text = "\n".join([
f"- [{s.detector_name}] {s.description}\n"
f" Confidence: {s.confidence:.2f}\n"
f" Suspected: {s.suspected_medication} → {s.suspected_reaction}\n"
f" Severity hint: {s.severity_hint}\n"
f" Evidence: {', '.join(s.evidence[:3])}"
for s in signals
])
chain = self.prompt | self.llm
result = await chain.ainvoke({
"patient_id": patient_id,
"signals": signals_text,
"medications": ", ".join(recent_medications) if recent_medications else "None recorded"
})
return result
def quick_triage(self, signals: List[DetectionSignal]) -> str:
"""Quick rule-based triage without LLM call.
Used for obvious cases to reduce latency.
"""
if not signals:
return "SKIP"
# Any CRITICAL signal → escalate immediately
if any(s.severity_hint == "CRITICAL" for s in signals):
return "CRITICAL"
# Multiple HIGH signals → escalate
high_count = sum(1 for s in signals if s.severity_hint == "HIGH")
if high_count >= 2:
return "HIGH"
# High confidence known pattern → escalate
known_pattern_signals = [
s for s in signals
if s.signal_type == "known_pattern" and s.confidence >= 0.9
]
if known_pattern_signals:
return "HIGH"
# Multiple concordant signals (same drug, same reaction)
drug_reaction_pairs = [
(s.suspected_medication, s.suspected_reaction)
for s in signals
if s.suspected_medication and s.suspected_reaction
]
if len(drug_reaction_pairs) != len(set(drug_reaction_pairs)):
return "MEDIUM" # Duplicates mean concordance
# Single signal with high confidence
if any(s.confidence >= 0.85 for s in signals):
return "MEDIUM"
return "LOW"Signal Correlator Agent
# src/agents/correlator.py
from typing import List, Dict, Optional
from collections import defaultdict
from ..detectors.base import DetectionSignal
class SignalCorrelator:
"""Correlates and fuses signals from multiple detectors.
Looks for concordant signals (multiple detectors pointing to the same
drug-reaction pair) which increases confidence in the finding.
"""
def correlate(
self,
signals: List[DetectionSignal]
) -> List[DetectionSignal]:
"""Correlate signals and boost confidence for concordant findings."""
if len(signals) <= 1:
return signals
# Group signals by (suspected_medication, suspected_reaction)
grouped: Dict[tuple, List[DetectionSignal]] = defaultdict(list)
ungrouped: List[DetectionSignal] = []
for signal in signals:
if signal.suspected_medication and signal.suspected_reaction:
key = (
signal.suspected_medication.lower(),
signal.suspected_reaction.lower()
)
grouped[key].append(signal)
else:
ungrouped.append(signal)
correlated_signals = []
# Process grouped signals
for (drug, reaction), group in grouped.items():
if len(group) == 1:
correlated_signals.append(group[0])
else:
# Multiple detectors found the same drug-reaction pair
# Create a merged signal with boosted confidence
merged = self._merge_signals(group, drug, reaction)
correlated_signals.append(merged)
# Add ungrouped signals
correlated_signals.extend(ungrouped)
return correlated_signals
def _merge_signals(
self,
signals: List[DetectionSignal],
drug: str,
reaction: str
) -> DetectionSignal:
"""Merge concordant signals into a single boosted signal."""
# Combine evidence from all signals
all_evidence = []
detector_names = []
for s in signals:
detector_names.append(s.detector_name)
all_evidence.extend(s.evidence)
# Boost confidence based on concordance
max_confidence = max(s.confidence for s in signals)
concordance_boost = 0.1 * (len(signals) - 1) # +0.1 per additional signal
boosted_confidence = min(max_confidence + concordance_boost, 0.99)
# Use highest severity hint
severity_order = {"CRITICAL": 4, "HIGH": 3, "MEDIUM": 2, "LOW": 1}
highest_severity = max(
signals,
key=lambda s: severity_order.get(s.severity_hint or "LOW", 0)
).severity_hint
return DetectionSignal(
detector_name=f"Correlated({', '.join(detector_names)})",
patient_id=signals[0].patient_id,
confidence=boosted_confidence,
signal_type="correlated",
description=(
f"CONCORDANT SIGNAL: {len(signals)} detectors identified "
f"{drug} → {reaction}"
),
evidence=list(dict.fromkeys(all_evidence)), # Deduplicate
suspected_medication=drug,
suspected_reaction=reaction,
severity_hint=highest_severity
)Signal Correlation Logic:
┌─────────────────────────────────────────────────────────────────────┐
│ SIGNAL CORRELATION: WHY CONCORDANCE MATTERS │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Single detector = isolated finding │
│ Multiple detectors, same finding = CONCORDANT SIGNAL │
│ │
│ Example: Patient on vancomycin │
│ │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │ Detector │ Finding │ │
│ ├───────────────────────────────────────────────────────────────┤ │
│ │ LabAnomalyDetector │ Creatinine ↑, vancomycin suspected │ │
│ │ TemporalCorrelation │ ↑Cr occurred 48h after vancomycin │ │
│ │ KnownPatternMatcher │ vancomycin + nephrotoxicity: 1200 rpts│ │
│ └───────────────────────────────────────────────────────────────┘ │
│ │
│ 3 detectors → same drug (vancomycin) → same reaction (nephrotox) │
│ = HIGH CONCORDANCE │
│ │
│ Confidence boosting: │
│ Base confidence: 0.75 (max of individual signals) │
│ + 0.10 (2nd concordant signal) │
│ + 0.10 (3rd concordant signal) │
│ = 0.95 final confidence │
│ │
│ Merged signal carries all evidence + boosted confidence │
│ │
└─────────────────────────────────────────────────────────────────────┘LangGraph Multi-Agent Workflow
# src/agents/workflow.py
from typing import TypedDict, List, Annotated, Literal
from langgraph.graph import StateGraph, END
import operator
from ..events.models import PatientEvent
from ..events.window import event_window, SlidingWindowState
from ..detectors.base import DetectionSignal
from ..detectors.lab_anomaly import LabAnomalyDetector
from ..detectors.vital_trend import VitalTrendDetector
from ..detectors.temporal import TemporalCorrelationDetector
from ..detectors.known_pattern import KnownPatternMatcher
from .supervisor import SupervisorAgent, SupervisorDecision
from .correlator import SignalCorrelator
from ..alerts.models import Alert, AlertSeverity
from ..alerts.storage import alert_storage
class SurveillanceState(TypedDict):
"""State for the adverse event surveillance workflow."""
# Input
event: PatientEvent
# Detection phase
signals: Annotated[List[DetectionSignal], operator.add]
# Correlation phase
correlated_signals: List[DetectionSignal]
# Supervisor phase
supervisor_decision: SupervisorDecision | None
# Output
alert: Alert | None
quick_triage_result: str
# Initialize components
lab_detector = LabAnomalyDetector(event_window)
vital_detector = VitalTrendDetector(event_window)
temporal_detector = TemporalCorrelationDetector(event_window)
pattern_matcher = KnownPatternMatcher(event_window)
correlator = SignalCorrelator()
supervisor = SupervisorAgent()
async def add_event_node(state: SurveillanceState) -> SurveillanceState:
"""Add incoming event to the sliding window."""
await event_window.add_event(state["event"])
return state
async def lab_detection_node(state: SurveillanceState) -> SurveillanceState:
"""Run lab anomaly detection."""
signals = await lab_detector.detect(state["event"])
return {"signals": signals}
async def vital_detection_node(state: SurveillanceState) -> SurveillanceState:
"""Run vital trend detection."""
signals = await vital_detector.detect(state["event"])
return {"signals": signals}
async def temporal_detection_node(state: SurveillanceState) -> SurveillanceState:
"""Run temporal correlation detection."""
signals = await temporal_detector.detect(state["event"])
return {"signals": signals}
async def pattern_detection_node(state: SurveillanceState) -> SurveillanceState:
"""Run known pattern matching."""
signals = await pattern_matcher.detect(state["event"])
return {"signals": signals}
async def correlation_node(state: SurveillanceState) -> SurveillanceState:
"""Correlate signals from all detectors."""
correlated = correlator.correlate(state["signals"])
return {"correlated_signals": correlated}
async def quick_triage_node(state: SurveillanceState) -> SurveillanceState:
"""Quick rule-based triage to decide if supervisor review is needed."""
result = supervisor.quick_triage(state["correlated_signals"])
return {"quick_triage_result": result}
def route_after_triage(
state: SurveillanceState
) -> Literal["supervisor", "skip", "alert_critical"]:
"""Route based on quick triage result."""
result = state["quick_triage_result"]
if result == "SKIP":
return "skip"
elif result == "CRITICAL":
# Skip supervisor for critical - alert immediately
return "alert_critical"
else:
# Send to supervisor for review
return "supervisor"
async def supervisor_node(state: SurveillanceState) -> SurveillanceState:
"""Supervisor agent reviews signals."""
recent_meds = await event_window.get_recent_medications(
state["event"].patient_id,
hours_back=72
)
med_names = [m.medication_name for m in recent_meds]
decision = await supervisor.review_signals(
state["event"].patient_id,
state["correlated_signals"],
med_names
)
return {"supervisor_decision": decision}
async def alert_node(state: SurveillanceState) -> SurveillanceState:
"""Generate and store alert based on supervisor decision."""
decision = state["supervisor_decision"]
if not decision or not decision.should_alert:
return {"alert": None}
alert = Alert(
patient_id=state["event"].patient_id,
severity=AlertSeverity(decision.final_severity),
title=f"Potential ADE: {decision.primary_suspicion}",
description=decision.reasoning,
suspected_medication=state["correlated_signals"][0].suspected_medication
if state["correlated_signals"] else None,
suspected_reaction=decision.primary_suspicion,
recommended_actions=decision.recommended_actions,
signals=[s.model_dump() for s in state["correlated_signals"]]
)
await alert_storage.store_alert(alert)
return {"alert": alert}
async def critical_alert_node(state: SurveillanceState) -> SurveillanceState:
"""Generate critical alert without supervisor review (time-sensitive)."""
signals = state["correlated_signals"]
critical_signal = next(
(s for s in signals if s.severity_hint == "CRITICAL"),
signals[0] if signals else None
)
if not critical_signal:
return {"alert": None}
alert = Alert(
patient_id=state["event"].patient_id,
severity=AlertSeverity.CRITICAL,
title=f"CRITICAL: {critical_signal.suspected_reaction or 'Adverse Event'}",
description=critical_signal.description,
suspected_medication=critical_signal.suspected_medication,
suspected_reaction=critical_signal.suspected_reaction,
recommended_actions=[
"Immediate clinical assessment required",
"Consider holding suspected medication",
"Notify attending physician STAT"
],
signals=[s.model_dump() for s in signals]
)
await alert_storage.store_alert(alert)
return {"alert": alert}
async def skip_node(state: SurveillanceState) -> SurveillanceState:
"""No alert needed."""
return {"alert": None, "supervisor_decision": None}
def create_surveillance_workflow() -> StateGraph:
"""Create the adverse event surveillance workflow."""
workflow = StateGraph(SurveillanceState)
# Add nodes
workflow.add_node("add_event", add_event_node)
workflow.add_node("lab_detection", lab_detection_node)
workflow.add_node("vital_detection", vital_detection_node)
workflow.add_node("temporal_detection", temporal_detection_node)
workflow.add_node("pattern_detection", pattern_detection_node)
workflow.add_node("correlation", correlation_node)
workflow.add_node("quick_triage", quick_triage_node)
workflow.add_node("supervisor", supervisor_node)
workflow.add_node("alert", alert_node)
workflow.add_node("critical_alert", critical_alert_node)
workflow.add_node("skip", skip_node)
# Set entry point
workflow.set_entry_point("add_event")
# Add edges - detection runs in parallel conceptually
# (In practice, LangGraph executes sequentially, but we aggregate with operator.add)
workflow.add_edge("add_event", "lab_detection")
workflow.add_edge("lab_detection", "vital_detection")
workflow.add_edge("vital_detection", "temporal_detection")
workflow.add_edge("temporal_detection", "pattern_detection")
workflow.add_edge("pattern_detection", "correlation")
workflow.add_edge("correlation", "quick_triage")
# Conditional routing after triage
workflow.add_conditional_edges(
"quick_triage",
route_after_triage,
{
"supervisor": "supervisor",
"skip": "skip",
"alert_critical": "critical_alert"
}
)
workflow.add_edge("supervisor", "alert")
workflow.add_edge("alert", END)
workflow.add_edge("critical_alert", END)
workflow.add_edge("skip", END)
return workflow.compile()
# Create the surveillance agent
surveillance_agent = create_surveillance_workflow()Multi-Agent Workflow Visualization:
┌─────────────────────────────────────────────────────────────────────┐
│ ADVERSE EVENT SURVEILLANCE STATE MACHINE │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Entry: Patient Event │
│ │ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ ADD_EVENT │ Store in sliding window │
│ └────────┬─────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ PARALLEL DETECTION (signals accumulate via operator.add) │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌──────────┐ ┌───────────┐ │ │
│ │ │ Lab Anomaly │→│ Vital Trend │→│ Temporal │→│ Known │ │ │
│ │ │ Detector │ │ Detector │ │ Detector │ │ Pattern │ │ │
│ │ └─────────────┘ └─────────────┘ └──────────┘ └───────────┘ │ │
│ │ │ │
│ └───────────────────────────┬──────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ CORRELATION │ Merge concordant signals, boost confidence │
│ └────────┬─────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ QUICK_TRIAGE │ Rule-based severity assessment │
│ └────────┬─────────┘ │
│ │ │
│ ┌─────┼─────────────────┐ │
│ │ │ │ │
│ SKIP MEDIUM/HIGH CRITICAL │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────┐ ┌──────────────┐ ┌─────────────────┐ │
│ │ SKIP │ │ SUPERVISOR │ │ CRITICAL_ALERT │ │
│ │ │ │ (LLM review)│ │ (bypass review) │ │
│ └──┬───┘ └───────┬──────┘ └────────┬────────┘ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌──────────────┐ │ │
│ │ │ ALERT │ │ │
│ │ │ (if warranted)│ │ │
│ │ └───────┬──────┘ │ │
│ │ │ │ │
│ └──────────────┴────────────────┘ │
│ │ │
│ ▼ │
│ END │
│ │
└─────────────────────────────────────────────────────────────────────┘| Workflow Feature | Purpose |
|---|---|
| Parallel detection | Four specialist detectors run on every event |
| Signal accumulation | Annotated[List, operator.add] aggregates signals |
| Correlation | Identifies concordant signals for confidence boost |
| Quick triage | Rule-based fast path to avoid LLM latency for clear cases |
| Supervisor bypass | CRITICAL signals skip LLM review for immediate alerting |
Alert Models and Storage
# src/alerts/models.py
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from datetime import datetime
from enum import Enum
class AlertSeverity(str, Enum):
LOW = "LOW"
MEDIUM = "MEDIUM"
HIGH = "HIGH"
CRITICAL = "CRITICAL"
class AlertStatus(str, Enum):
PENDING = "pending"
ACKNOWLEDGED = "acknowledged"
RESOLVED = "resolved"
DISMISSED = "dismissed"
class Alert(BaseModel):
"""An adverse event alert."""
id: Optional[str] = None
patient_id: str
severity: AlertSeverity
status: AlertStatus = AlertStatus.PENDING
title: str
description: str
suspected_medication: Optional[str] = None
suspected_reaction: Optional[str] = None
recommended_actions: List[str] = Field(default_factory=list)
signals: List[Dict[str, Any]] = Field(default_factory=list)
created_at: datetime = Field(default_factory=datetime.utcnow)
acknowledged_at: Optional[datetime] = None
acknowledged_by: Optional[str] = None
resolution_notes: Optional[str] = None# src/alerts/storage.py
import sqlite3
import json
from typing import List, Optional
from datetime import datetime
import uuid
from .models import Alert, AlertSeverity, AlertStatus
from ..config import settings
class AlertStorage:
"""SQLite-based alert storage."""
def __init__(self, db_path: str = None):
self.db_path = db_path or settings.alert_db_path
self._init_db()
def _init_db(self):
"""Initialize database schema."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS alerts (
id TEXT PRIMARY KEY,
patient_id TEXT NOT NULL,
severity TEXT NOT NULL,
status TEXT NOT NULL,
title TEXT NOT NULL,
description TEXT,
suspected_medication TEXT,
suspected_reaction TEXT,
recommended_actions TEXT,
signals TEXT,
created_at TEXT NOT NULL,
acknowledged_at TEXT,
acknowledged_by TEXT,
resolution_notes TEXT
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_patient_id ON alerts(patient_id)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_severity ON alerts(severity)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_status ON alerts(status)
""")
conn.commit()
conn.close()
async def store_alert(self, alert: Alert) -> Alert:
"""Store a new alert."""
alert.id = str(uuid.uuid4())
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
INSERT INTO alerts (
id, patient_id, severity, status, title, description,
suspected_medication, suspected_reaction, recommended_actions,
signals, created_at, acknowledged_at, acknowledged_by,
resolution_notes
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
alert.id,
alert.patient_id,
alert.severity.value,
alert.status.value,
alert.title,
alert.description,
alert.suspected_medication,
alert.suspected_reaction,
json.dumps(alert.recommended_actions),
json.dumps(alert.signals),
alert.created_at.isoformat(),
alert.acknowledged_at.isoformat() if alert.acknowledged_at else None,
alert.acknowledged_by,
alert.resolution_notes
))
conn.commit()
conn.close()
return alert
async def get_alerts(
self,
patient_id: Optional[str] = None,
severity: Optional[AlertSeverity] = None,
status: Optional[AlertStatus] = None,
limit: int = 100
) -> List[Alert]:
"""Query alerts with optional filters."""
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
query = "SELECT * FROM alerts WHERE 1=1"
params = []
if patient_id:
query += " AND patient_id = ?"
params.append(patient_id)
if severity:
query += " AND severity = ?"
params.append(severity.value)
if status:
query += " AND status = ?"
params.append(status.value)
query += " ORDER BY created_at DESC LIMIT ?"
params.append(limit)
cursor.execute(query, params)
rows = cursor.fetchall()
conn.close()
alerts = []
for row in rows:
alerts.append(Alert(
id=row["id"],
patient_id=row["patient_id"],
severity=AlertSeverity(row["severity"]),
status=AlertStatus(row["status"]),
title=row["title"],
description=row["description"],
suspected_medication=row["suspected_medication"],
suspected_reaction=row["suspected_reaction"],
recommended_actions=json.loads(row["recommended_actions"]),
signals=json.loads(row["signals"]),
created_at=datetime.fromisoformat(row["created_at"]),
acknowledged_at=datetime.fromisoformat(row["acknowledged_at"])
if row["acknowledged_at"] else None,
acknowledged_by=row["acknowledged_by"],
resolution_notes=row["resolution_notes"]
))
return alerts
async def acknowledge_alert(
self,
alert_id: str,
acknowledged_by: str
) -> Optional[Alert]:
"""Mark an alert as acknowledged."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
UPDATE alerts
SET status = ?, acknowledged_at = ?, acknowledged_by = ?
WHERE id = ?
""", (
AlertStatus.ACKNOWLEDGED.value,
datetime.utcnow().isoformat(),
acknowledged_by,
alert_id
))
conn.commit()
conn.close()
alerts = await self.get_alerts()
return next((a for a in alerts if a.id == alert_id), None)
# Global storage instance
alert_storage = AlertStorage()FastAPI Application
# src/api/main.py
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import List, Optional
from datetime import datetime
from ..agents.workflow import surveillance_agent, SurveillanceState
from ..events.models import (
PatientEvent, LabResult, VitalSign,
MedicationOrder, MedicationAdmin
)
from ..events.window import event_window
from ..alerts.models import Alert, AlertSeverity, AlertStatus
from ..alerts.storage import alert_storage
app = FastAPI(
title="Adverse Event Surveillance API",
description="Real-time monitoring for potential adverse drug events",
version="1.0.0"
)
class LabResultRequest(BaseModel):
patient_id: str
test_code: str
test_name: str
value: float
unit: str
reference_low: Optional[float] = None
reference_high: Optional[float] = None
is_critical: bool = False
class VitalSignRequest(BaseModel):
patient_id: str
vital_type: str
value: str
unit: str
class MedicationAdminRequest(BaseModel):
patient_id: str
medication_name: str
rxnorm_code: Optional[str] = None
dose_given: str
route: str
administered_by: str
class ProcessEventResponse(BaseModel):
event_processed: bool
alert_generated: bool
alert: Optional[Alert] = None
@app.post("/events/lab", response_model=ProcessEventResponse)
async def process_lab_result(request: LabResultRequest):
"""Process a new lab result for adverse event detection."""
lab = LabResult(
patient_id=request.patient_id,
timestamp=datetime.utcnow(),
test_code=request.test_code,
test_name=request.test_name,
value=request.value,
unit=request.unit,
reference_low=request.reference_low,
reference_high=request.reference_high,
is_critical=request.is_critical
)
event = PatientEvent(event=lab)
return await _process_event(event)
@app.post("/events/vital", response_model=ProcessEventResponse)
async def process_vital_sign(request: VitalSignRequest):
"""Process a new vital sign for adverse event detection."""
vital = VitalSign(
patient_id=request.patient_id,
timestamp=datetime.utcnow(),
vital_type=request.vital_type,
value=request.value,
unit=request.unit
)
event = PatientEvent(event=vital)
return await _process_event(event)
@app.post("/events/medication", response_model=ProcessEventResponse)
async def process_medication_admin(request: MedicationAdminRequest):
"""Process a medication administration event."""
med = MedicationAdmin(
patient_id=request.patient_id,
timestamp=datetime.utcnow(),
medication_name=request.medication_name,
rxnorm_code=request.rxnorm_code,
dose_given=request.dose_given,
route=request.route,
administered_by=request.administered_by
)
event = PatientEvent(event=med)
return await _process_event(event)
async def _process_event(event: PatientEvent) -> ProcessEventResponse:
"""Process an event through the surveillance workflow."""
initial_state: SurveillanceState = {
"event": event,
"signals": [],
"correlated_signals": [],
"supervisor_decision": None,
"alert": None,
"quick_triage_result": ""
}
try:
result = await surveillance_agent.ainvoke(initial_state)
alert = result.get("alert")
return ProcessEventResponse(
event_processed=True,
alert_generated=alert is not None,
alert=alert
)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Event processing failed: {str(e)}"
)
@app.get("/alerts", response_model=List[Alert])
async def get_alerts(
patient_id: Optional[str] = None,
severity: Optional[AlertSeverity] = None,
status: Optional[AlertStatus] = None,
limit: int = 100
):
"""Get alerts with optional filters."""
return await alert_storage.get_alerts(
patient_id=patient_id,
severity=severity,
status=status,
limit=limit
)
@app.post("/alerts/{alert_id}/acknowledge")
async def acknowledge_alert(alert_id: str, acknowledged_by: str):
"""Acknowledge an alert."""
alert = await alert_storage.acknowledge_alert(alert_id, acknowledged_by)
if not alert:
raise HTTPException(status_code=404, detail="Alert not found")
return alert
@app.get("/window/stats")
async def get_window_stats():
"""Get sliding window statistics."""
return await event_window.get_stats()
@app.post("/window/cleanup")
async def cleanup_window():
"""Manually trigger window cleanup."""
removed = await event_window.cleanup_old_events()
return {"removed_events": removed}
@app.get("/health")
async def health():
return {"status": "healthy", "service": "adverse-event-surveillance"}Deployment
Docker Configuration
# docker-compose.yml
version: '3.8'
services:
surveillance-api:
build: .
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- OPENFDA_API_KEY=${OPENFDA_API_KEY:-}
- REDIS_URL=redis://redis:6379
volumes:
- ./data:/app/data
depends_on:
- redis
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
volumes:
redis_data:Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY src/ ./src/
COPY data/ ./data/
RUN mkdir -p /app/data
EXPOSE 8000
CMD ["uvicorn", "src.api.main:app", "--host", "0.0.0.0", "--port", "8000"]Requirements
# requirements.txt
langgraph>=0.2.0
langchain>=0.3.0
langchain-openai>=0.2.0
openai>=1.40.0
httpx>=0.27.0
fastapi>=0.115.0
uvicorn>=0.30.0
pydantic>=2.9.0
pydantic-settings>=2.5.0
redis>=5.0.0Business Impact
| Metric | Before | After | Improvement |
|---|---|---|---|
| ADE detection time | 24-48 hours | Minutes | 95% faster |
| Missed ADEs | ~10% | <2% | 80% reduction |
| Manual chart review time | 30 min/patient | 5 min for flagged | 83% reduction |
| Alert fatigue (false positives) | 60% | 20% | 67% reduction |
| FAERS report submission | Inconsistent | Automated data capture | Standardized |
Key Learnings
-
Multi-agent architecture enables specialization - Each detector focuses on a specific signal type (labs, vitals, timing, known patterns). The supervisor synthesizes without needing expertise in each area.
-
Concordance boosts confidence - When multiple independent detectors identify the same drug-reaction pair, the signal is much more reliable than any single finding.
-
Quick triage reduces latency - Rule-based fast paths for obvious cases (CRITICAL, SKIP) avoid LLM latency. Only ambiguous cases go to the supervisor.
-
Temporal windows matter - Different adverse events have characteristic onset windows. Nephrotoxicity takes days, anaphylaxis takes minutes. Matching observed timing to expected windows increases signal quality.
Key Concepts Recap
| Concept | What It Is | Why It Matters |
|---|---|---|
| Supervisor-Worker Pattern | Coordinator agent orchestrates specialist workers | Specialization + synthesis without overloading single agent |
| Signal Accumulation | Annotated[List, operator.add] in LangGraph | Multiple detectors can add signals to shared state |
| Sliding Window | Time-bounded event store (72 hours) | Enables temporal analysis without unbounded memory |
| Detection Signal | Structured finding from a detector | Consistent format for aggregation and correlation |
| Concordance | Multiple detectors → same finding | Dramatically increases confidence in true positive |
| Quick Triage | Rule-based severity assessment | Bypasses LLM for clear cases, reduces latency |
| OpenFDA FAERS | FDA adverse event database | Known drug-reaction patterns from 20M+ reports |
| Temporal Correlation | Medication timing vs event timing | Distinguishes causation vs coincidence |
| Escalation Tiers | LOW → MEDIUM → HIGH → CRITICAL | Right response for right severity |
| Alert Storage | SQLite with status tracking | Audit trail, acknowledgment workflow |
Next Steps
- Add streaming ingestion from hospital HL7/FHIR feeds
- Implement feedback loop for pharmacist corrections to improve detection
- Add patient history weighting (prior ADEs increase suspicion)
- Build Grafana dashboard for alert metrics and trends
- Integrate FDA MedWatch for automated adverse event reporting