MLOps · Intermediate
Monitoring Dashboard
Build a comprehensive monitoring dashboard for ML model performance
Create a real-time monitoring system to track model performance, latency, and data drift.
TL;DR
Expose Prometheus metrics from FastAPI via a /metrics endpoint: latency histograms, prediction counts, confidence scores, and drift gauges. Prometheus scrapes the service every 10s (15s global default), Grafana visualizes the data, and Alertmanager fires on thresholds (p95 latency > 500ms, error rate > 5%, drift score > 0.15).
What You'll Learn
- Prometheus metrics collection
- Grafana dashboard creation
- Model performance monitoring
- Data drift detection
- Alerting configuration
Tech Stack
| Component | Technology |
|---|---|
| Metrics | Prometheus |
| Visualization | Grafana |
| Alerting | Alertmanager |
| Backend | FastAPI |
Architecture
┌──────────────────────────────────────────────────────────────────────────────┐
│ MONITORING ARCHITECTURE │
├──────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ ML SERVICE │ │
│ │ │ │
│ │ ┌─────────────────┐ ┌───────────────────────────────────┐ │ │
│ │ │ FastAPI │─────▶│ Metrics Collector │ │ │
│ │ │ /predict │ │ • latency histograms │ │ │
│ │ │ /metrics │ │ • prediction counts │ │ │
│ │ └────────┬────────┘ │ • confidence scores │ │ │
│ │ │ │ • drift gauges │ │ │
│ │ ▼ └───────────────┬───────────────────┘ │ │
│ │ ┌─────────────────┐ │ │ │
│ │ │ Inference Engine│ │ │ │
│ │ └─────────────────┘ │ │ │
│ └────────────────────────────────────────────┼───────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ ┌─────────────────────────────────────────────┐ │
│ │ Prometheus │◀─────│ Scrape /metrics every 15s │ │
│ │ (TSDB) │ └─────────────────────────────────────────────┘ │
│ └────────┬────────┘ │
│ │ │
│ ┌─────┴─────┐ │
│ ▼ ▼ │
│ ┌──────────┐ ┌──────────────┐ ┌────────────────────────────────┐ │
│ │ Grafana │ │ Alertmanager │─────▶│ Notifications (Slack, Email) │ │
│ └────┬─────┘ └──────────────┘ └────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ DASHBOARDS │ │
│ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │
│ │ │Performance │ │ Latency │ │ Drift │ │ │
│ │ │ (accuracy) │ │ (p50/p95) │ │ (KS stat) │ │ │
│ │ └────────────┘ └────────────┘ └────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────────────────┘

Project Structure
monitoring-dashboard/
├── src/
│   ├── __init__.py
│   ├── metrics.py        # Custom metrics
│   ├── drift.py          # Drift detection
│   ├── collectors.py     # Metric collectors
│   └── api.py            # FastAPI with metrics
├── monitoring/
│   ├── prometheus/
│   │   ├── prometheus.yml
│   │   └── alerts.yml
│   ├── alertmanager/
│   │   └── alertmanager.yml
│   └── grafana/
│       ├── provisioning/
│       └── dashboards/
├── docker-compose.yml
└── requirements.txt

Implementation
Step 1: Dependencies
fastapi>=0.100.0
uvicorn>=0.23.0
prometheus-client>=0.17.0
numpy>=1.24.0
scipy>=1.11.0
scikit-learn>=1.3.0

Step 2: Custom Metrics
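Before diving into the module below: Prometheus histograms are cumulative bucket counters, which is why the `buckets=` lists in the metric definitions matter. A plain-Python sketch of the idea (illustration only; the real bookkeeping lives in `prometheus-client`):

```python
# Minimal sketch of how a Prometheus histogram buckets observations:
# every bucket whose upper bound covers the value is incremented.
BUCKETS = [0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]

def observe(counts: dict, value: float) -> None:
    """Record one observation into cumulative bucket counters."""
    for le in BUCKETS:
        if value <= le:
            counts[le] = counts.get(le, 0) + 1
    counts["+Inf"] = counts.get("+Inf", 0) + 1   # +Inf bucket counts everything
    counts["sum"] = counts.get("sum", 0.0) + value

counts: dict = {}
for latency in [0.02, 0.03, 0.2, 0.7]:
    observe(counts, latency)
# counts[0.25] == 3: the 0.02, 0.03 and 0.2 observations fall at or below 0.25
```

Because buckets are cumulative, PromQL can later recover percentiles from them without the service ever storing raw samples.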
"""ML-specific Prometheus metrics."""
from prometheus_client import (
Counter,
Histogram,
Gauge,
Summary,
Info,
REGISTRY
)
# Request metrics
PREDICTION_REQUESTS = Counter(
"ml_prediction_requests_total",
"Total prediction requests",
["model_name", "model_version", "status"]
)
PREDICTION_LATENCY = Histogram(
"ml_prediction_latency_seconds",
"Prediction latency in seconds",
["model_name"],
buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]
)
BATCH_SIZE = Histogram(
"ml_batch_size",
"Batch sizes for predictions",
["model_name"],
buckets=[1, 5, 10, 25, 50, 100, 250, 500]
)
# Model metrics
MODEL_CONFIDENCE = Histogram(
"ml_model_confidence",
"Model prediction confidence scores",
["model_name", "predicted_class"],
buckets=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99]
)
PREDICTION_DISTRIBUTION = Counter(
"ml_prediction_distribution_total",
"Distribution of predictions by class",
["model_name", "predicted_class"]
)
# Performance metrics
MODEL_ACCURACY = Gauge(
"ml_model_accuracy",
"Current model accuracy (from evaluation)",
["model_name", "model_version"]
)
MODEL_F1_SCORE = Gauge(
"ml_model_f1_score",
"Current model F1 score",
["model_name", "model_version"]
)
# Resource metrics
MODEL_MEMORY_BYTES = Gauge(
"ml_model_memory_bytes",
"Memory used by model",
["model_name"]
)
GPU_UTILIZATION = Gauge(
"ml_gpu_utilization_percent",
"GPU utilization percentage",
["gpu_id"]
)
GPU_MEMORY_USED = Gauge(
"ml_gpu_memory_used_bytes",
"GPU memory used",
["gpu_id"]
)
# Data drift metrics
FEATURE_DRIFT_SCORE = Gauge(
"ml_feature_drift_score",
"Feature drift score (KS statistic)",
["model_name", "feature_name"]
)
PREDICTION_DRIFT_SCORE = Gauge(
"ml_prediction_drift_score",
"Prediction distribution drift",
["model_name"]
)
# Model info
MODEL_INFO = Info(
"ml_model",
"Model metadata"
)
class MetricsRecorder:
"""Helper class to record ML metrics."""
def __init__(self, model_name: str, model_version: str = "1.0.0"):
self.model_name = model_name
self.model_version = model_version
# Set model info
MODEL_INFO.info({
"model_name": model_name,
"model_version": model_version,
"framework": "pytorch"
})
def record_prediction(
self,
latency: float,
predicted_class: int,
confidence: float,
batch_size: int = 1,
success: bool = True
) -> None:
"""Record a prediction."""
status = "success" if success else "error"
PREDICTION_REQUESTS.labels(
model_name=self.model_name,
model_version=self.model_version,
status=status
).inc()
PREDICTION_LATENCY.labels(
model_name=self.model_name
).observe(latency)
BATCH_SIZE.labels(
model_name=self.model_name
).observe(batch_size)
MODEL_CONFIDENCE.labels(
model_name=self.model_name,
predicted_class=str(predicted_class)
).observe(confidence)
PREDICTION_DISTRIBUTION.labels(
model_name=self.model_name,
predicted_class=str(predicted_class)
).inc()
def record_accuracy(self, accuracy: float, f1: float) -> None:
"""Record model performance metrics."""
MODEL_ACCURACY.labels(
model_name=self.model_name,
model_version=self.model_version
).set(accuracy)
MODEL_F1_SCORE.labels(
model_name=self.model_name,
model_version=self.model_version
).set(f1)
def record_drift(
self,
feature_drifts: dict[str, float],
prediction_drift: float
) -> None:
"""Record drift metrics."""
for feature, score in feature_drifts.items():
FEATURE_DRIFT_SCORE.labels(
model_name=self.model_name,
feature_name=feature
).set(score)
PREDICTION_DRIFT_SCORE.labels(
model_name=self.model_name
).set(prediction_drift)Step 3: Drift Detection
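The detector below leans on the two-sample Kolmogorov-Smirnov test. A quick, seeded demonstration of why the KS statistic works as a drift score: it stays near zero when the serving window matches the reference distribution and jumps under a mean shift (exact numbers depend on the seed; 0.15 mirrors the alert threshold used later):

```python
# Seeded demo of the KS statistic the DriftDetector uses as its drift score.
import numpy as np
from scipy import stats

rng = np.random.default_rng(42)
reference = rng.normal(0.0, 1.0, size=1000)   # training-time feature values
no_drift = rng.normal(0.0, 1.0, size=1000)    # serving window, same distribution
drifted = rng.normal(0.8, 1.0, size=1000)     # serving window, mean shifted

ks_same, _ = stats.ks_2samp(reference, no_drift)
ks_shift, _ = stats.ks_2samp(reference, drifted)
print(f"no drift: {ks_same:.3f}  drifted: {ks_shift:.3f}")
```

With a 0.8 standard-deviation mean shift and 1000 samples per side, the drifted statistic comfortably clears a 0.15 threshold while the matched one does not.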
"""Data drift detection utilities."""
from dataclasses import dataclass
from typing import Optional
import numpy as np
from scipy import stats
from collections import deque
@dataclass
class DriftResult:
"""Drift detection result."""
feature_name: str
drift_score: float
is_drifted: bool
p_value: float
reference_mean: float
current_mean: float
class DriftDetector:
"""
Detect data drift using statistical tests.
Uses Kolmogorov-Smirnov test for continuous features
and Chi-squared test for categorical features.
"""
def __init__(
self,
window_size: int = 1000,
drift_threshold: float = 0.1
):
self.window_size = window_size
self.threshold = drift_threshold
# Reference distributions
self.reference_data: dict[str, np.ndarray] = {}
self.current_windows: dict[str, deque] = {}
def set_reference(self, feature_name: str, data: np.ndarray) -> None:
"""Set reference distribution for a feature."""
self.reference_data[feature_name] = data
self.current_windows[feature_name] = deque(maxlen=self.window_size)
def add_observation(self, feature_name: str, value: float) -> None:
"""Add an observation to the current window."""
if feature_name not in self.current_windows:
self.current_windows[feature_name] = deque(maxlen=self.window_size)
self.current_windows[feature_name].append(value)
def detect_drift(self, feature_name: str) -> Optional[DriftResult]:
"""
Detect drift for a feature.
Uses KS test to compare current window with reference.
"""
if feature_name not in self.reference_data:
return None
if len(self.current_windows.get(feature_name, [])) < 100:
return None
reference = self.reference_data[feature_name]
current = np.array(self.current_windows[feature_name])
# Kolmogorov-Smirnov test
ks_stat, p_value = stats.ks_2samp(reference, current)
return DriftResult(
feature_name=feature_name,
drift_score=ks_stat,
is_drifted=ks_stat > self.threshold,
p_value=p_value,
reference_mean=float(np.mean(reference)),
current_mean=float(np.mean(current))
)
def detect_all_drift(self) -> dict[str, DriftResult]:
"""Detect drift for all features."""
results = {}
for feature_name in self.reference_data:
result = self.detect_drift(feature_name)
if result:
results[feature_name] = result
return results
class PredictionDriftDetector:
"""Detect drift in prediction distributions."""
def __init__(self, num_classes: int, window_size: int = 1000):
self.num_classes = num_classes
self.window_size = window_size
self.reference_dist: Optional[np.ndarray] = None
self.current_predictions: deque = deque(maxlen=window_size)
def set_reference_distribution(self, distribution: np.ndarray) -> None:
"""Set expected prediction distribution."""
self.reference_dist = distribution / distribution.sum()
def add_prediction(self, predicted_class: int) -> None:
"""Add a prediction to the current window."""
self.current_predictions.append(predicted_class)
def detect_drift(self) -> Optional[dict]:
"""
Detect prediction drift using Chi-squared test.
"""
if self.reference_dist is None:
return None
if len(self.current_predictions) < 100:
return None
# Calculate current distribution
predictions = np.array(self.current_predictions)
current_counts = np.bincount(predictions, minlength=self.num_classes)
current_dist = current_counts / current_counts.sum()
# Chi-squared test
expected = self.reference_dist * len(predictions)
chi2, p_value = stats.chisquare(current_counts, expected)
# Jensen-Shannon divergence
js_div = self._js_divergence(self.reference_dist, current_dist)
return {
"chi2_statistic": chi2,
"p_value": p_value,
"js_divergence": js_div,
"is_drifted": p_value < 0.05,
"reference_dist": self.reference_dist.tolist(),
"current_dist": current_dist.tolist()
}
def _js_divergence(self, p: np.ndarray, q: np.ndarray) -> float:
"""Calculate Jensen-Shannon divergence."""
m = 0.5 * (p + q)
return float(0.5 * (stats.entropy(p, m) + stats.entropy(q, m)))Step 4: FastAPI with Metrics
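The API in this step reports prediction drift as a Jensen-Shannon divergence. A standalone copy of that calculation, showing the two properties that make it a convenient gauge value: it is symmetric in its arguments and exactly zero for identical distributions:

```python
# Standalone copy of the JS-divergence calculation used for prediction drift.
import numpy as np
from scipy import stats

def js_divergence(p, q) -> float:
    """Jensen-Shannon divergence: average KL to the midpoint distribution."""
    p, q = np.asarray(p, dtype=float), np.asarray(q, dtype=float)
    m = 0.5 * (p + q)
    return float(0.5 * (stats.entropy(p, m) + stats.entropy(q, m)))

uniform = [1/3, 1/3, 1/3]   # reference prediction distribution
skewed = [0.7, 0.2, 0.1]    # drifted prediction distribution
```

Unlike raw KL divergence, this stays finite even when one distribution has zero-probability classes, which makes it safer to export as a metric.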
"""FastAPI application with Prometheus metrics."""
import time
from contextlib import asynccontextmanager
from typing import Annotated
import numpy as np
from fastapi import FastAPI, Depends, Request
from fastapi.responses import Response
from pydantic import BaseModel
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
from .metrics import MetricsRecorder, PREDICTION_REQUESTS
from .drift import DriftDetector, PredictionDriftDetector
# Global instances
metrics_recorder: MetricsRecorder = None
drift_detector: DriftDetector = None
pred_drift_detector: PredictionDriftDetector = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Initialize monitoring components."""
global metrics_recorder, drift_detector, pred_drift_detector
metrics_recorder = MetricsRecorder(
model_name="classifier",
model_version="1.0.0"
)
drift_detector = DriftDetector(window_size=1000)
pred_drift_detector = PredictionDriftDetector(num_classes=3)
# Set reference distributions (normally from training data)
for i in range(4):
drift_detector.set_reference(
f"feature_{i}",
np.random.randn(1000)
)
pred_drift_detector.set_reference_distribution(
np.array([0.33, 0.33, 0.34])
)
yield
app = FastAPI(
title="ML Monitoring API",
description="Model serving with monitoring",
lifespan=lifespan
)
class PredictionRequest(BaseModel):
"""Prediction request."""
features: list[float]
class PredictionResponse(BaseModel):
"""Prediction response."""
prediction: int
confidence: float
latency_ms: float
@app.get("/metrics")
async def metrics():
"""Prometheus metrics endpoint."""
return Response(
content=generate_latest(),
media_type=CONTENT_TYPE_LATEST
)
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
"""Prediction endpoint with monitoring."""
start_time = time.time()
# Simulate prediction
features = np.array(request.features)
prediction = int(np.argmax(features[:3])) if len(features) >= 3 else 0
confidence = float(np.random.uniform(0.7, 0.99))
latency = time.time() - start_time
# Record metrics
metrics_recorder.record_prediction(
latency=latency,
predicted_class=prediction,
confidence=confidence
)
# Track drift
for i, value in enumerate(features[:4]):
drift_detector.add_observation(f"feature_{i}", value)
pred_drift_detector.add_prediction(prediction)
return PredictionResponse(
prediction=prediction,
confidence=confidence,
latency_ms=latency * 1000
)
@app.get("/drift")
async def get_drift():
"""Get current drift metrics."""
feature_drift = drift_detector.detect_all_drift()
pred_drift = pred_drift_detector.detect_drift()
# Update Prometheus metrics
feature_scores = {
name: result.drift_score
for name, result in feature_drift.items()
}
pred_score = pred_drift["js_divergence"] if pred_drift else 0
metrics_recorder.record_drift(feature_scores, pred_score)
return {
"feature_drift": {
name: {
"score": r.drift_score,
"is_drifted": r.is_drifted,
"reference_mean": r.reference_mean,
"current_mean": r.current_mean
}
for name, r in feature_drift.items()
},
"prediction_drift": pred_drift
}
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
"""Add processing time to response headers."""
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
return responseStep 5: Prometheus Configuration
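The configuration below is pull-based: on every scrape Prometheus issues an HTTP GET against /metrics and parses the text exposition format, one sample per line. A minimal sketch of what a scrape sees (the payload here is hand-written for illustration):

```python
# Tiny sketch of Prometheus' text exposition format: a metric name,
# optional {labels}, then the sample value. Payload is illustrative.
payload = """\
# HELP ml_prediction_requests_total Total prediction requests
# TYPE ml_prediction_requests_total counter
ml_prediction_requests_total{model_name="classifier",status="success"} 42.0
ml_prediction_latency_seconds_bucket{model_name="classifier",le="0.1"} 40.0
"""

def parse_samples(text: str) -> dict:
    """Parse exposition-format lines into {series: value}."""
    samples = {}
    for line in text.splitlines():
        if not line or line.startswith("#"):
            continue  # skip HELP/TYPE metadata lines
        name_and_labels, value = line.rsplit(" ", 1)
        samples[name_and_labels] = float(value)
    return samples
```

Every label combination is a separate time series, which is why the tutorial keeps label cardinality low (model name, version, status, class).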
global:
  scrape_interval: 15s
  evaluation_interval: 15s

alerting:
  alertmanagers:
    - static_configs:
        - targets:
            - alertmanager:9093

rule_files:
  - "alerts.yml"

scrape_configs:
  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']

  - job_name: 'ml-service'
    static_configs:
      - targets: ['api:8000']
    metrics_path: '/metrics'
    scrape_interval: 10s

Step 6: Alert Rules
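These rules encode the TL;DR thresholds as PromQL. The HighErrorRate expression, for example, is just a ratio of rates; the same arithmetic in plain Python (a hypothetical helper, for intuition only):

```python
# The HighErrorRate rule boiled down: errors-per-second divided by
# requests-per-second, alerting when the ratio exceeds 5%.
ERROR_RATE_THRESHOLD = 0.05

def should_alert(error_rate_per_sec: float, total_rate_per_sec: float) -> bool:
    """Mirror of the PromQL error-rate check."""
    if total_rate_per_sec == 0:
        return False  # no traffic means nothing to alert on
    return error_rate_per_sec / total_rate_per_sec > ERROR_RATE_THRESHOLD

print(should_alert(0.3, 10.0))  # 3% errors: stays quiet
print(should_alert(0.8, 10.0))  # 8% errors: fires
```

The `for: 5m` clause in the rule adds what this sketch omits: the condition must hold continuously for five minutes before the alert actually fires, which filters out transient spikes.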
groups:
  - name: ml_alerts
    rules:
      # High latency alert
      - alert: HighPredictionLatency
        expr: histogram_quantile(0.95, rate(ml_prediction_latency_seconds_bucket[5m])) > 0.5
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High prediction latency detected"
          description: "95th percentile latency is above 500ms"

      # Error rate alert
      - alert: HighErrorRate
        expr: |
          sum(rate(ml_prediction_requests_total{status="error"}[5m]))
          /
          sum(rate(ml_prediction_requests_total[5m])) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate in predictions"
          description: "Error rate is above 5%"

      # Data drift alert
      - alert: DataDriftDetected
        expr: ml_feature_drift_score > 0.15
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Data drift detected"
          description: "Feature drift score is elevated"

      # Prediction drift alert
      - alert: PredictionDriftDetected
        expr: ml_prediction_drift_score > 0.1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Prediction distribution drift detected"
          description: "Prediction drift score is elevated"

      # Low confidence predictions
      - alert: LowConfidencePredictions
        expr: |
          histogram_quantile(0.5, rate(ml_model_confidence_bucket[5m])) < 0.6
        for: 15m
        labels:
          severity: warning
        annotations:
          summary: "Low confidence predictions"
          description: "Median confidence is below 60%"

Step 7: Docker Compose
version: '3.8'

services:
  api:
    build: .
    ports:
      - "8000:8000"
    networks:
      - monitoring

  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./monitoring/prometheus:/etc/prometheus
      - prometheus-data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
    networks:
      - monitoring

  alertmanager:
    image: prom/alertmanager:latest
    ports:
      - "9093:9093"
    volumes:
      - ./monitoring/alertmanager:/etc/alertmanager
    networks:
      - monitoring

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - ./monitoring/grafana/provisioning:/etc/grafana/provisioning
      - ./monitoring/grafana/dashboards:/var/lib/grafana/dashboards
      - grafana-data:/var/lib/grafana
    networks:
      - monitoring
    depends_on:
      - prometheus

volumes:
  prometheus-data:
  grafana-data:

networks:
  monitoring:
    driver: bridge

Key Metrics to Monitor
| Metric | Type | Purpose |
|---|---|---|
| Request rate | Counter | Traffic volume |
| Latency | Histogram | Performance |
| Error rate | Counter | Reliability |
| Confidence | Histogram | Model certainty |
| Drift score | Gauge | Data quality |
Key Concepts Recap
| Concept | What It Is | Why It Matters |
|---|---|---|
| Counter | Monotonically increasing value (requests, errors) | Track totals, calculate rates with rate() |
| Histogram | Bucketed observations (latency, confidence) | Get percentiles (p50, p95, p99) |
| Gauge | Point-in-time value (drift score, memory) | Track current state, alerting |
| Scrape Interval | How often Prometheus pulls metrics | Balance granularity vs storage |
| KS Test | Compare two distributions statistically | Detect feature drift objectively |
| Chi-Squared Test | Compare categorical distributions | Detect prediction drift |
| Alert Rules | Prometheus expressions that fire alerts | Proactive issue detection |
| Sliding Window | Recent N observations for drift detection | Compare current vs reference |
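To make the Histogram row concrete: PromQL's histogram_quantile picks the bucket the target rank falls in and interpolates linearly inside it. A simplified sketch of that estimate (the real function also handles +Inf buckets, NaNs, and rate()-ed counts):

```python
# Simplified percentile estimate from cumulative histogram buckets,
# mimicking the core of PromQL's histogram_quantile.
def histogram_quantile(q: float, buckets: list[tuple[float, float]]) -> float:
    """buckets: (upper_bound, cumulative_count) pairs, sorted by bound."""
    total = buckets[-1][1]
    rank = q * total
    prev_bound, prev_count = 0.0, 0.0
    for bound, count in buckets:
        if count >= rank and count > prev_count:
            # linear interpolation inside the bucket containing the rank
            frac = (rank - prev_count) / (count - prev_count)
            return prev_bound + frac * (bound - prev_bound)
        prev_bound, prev_count = bound, count
    return buckets[-1][0]

# 100 requests: 50 finished within 0.1s, 90 within 0.25s, all within 0.5s
buckets = [(0.1, 50.0), (0.25, 90.0), (0.5, 100.0)]
print(histogram_quantile(0.95, buckets))  # p95 lands in the 0.25-0.5s bucket
```

This is also why bucket boundaries matter: a p95 estimate can never be more precise than the bucket it lands in.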
Usage
# Start monitoring stack
docker-compose up -d
# Access dashboards
# Grafana: http://localhost:3000 (admin/admin)
# Prometheus: http://localhost:9090
# Generate test traffic
for i in {1..100}; do
  curl -X POST http://localhost:8000/predict \
    -H "Content-Type: application/json" \
    -d '{"features": [0.1, 0.5, 0.3, 0.8]}'
done

# Check drift
curl http://localhost:8000/drift

Next Steps
- A/B Testing - Compare model versions
- Complete Pipeline - End-to-end MLOps