Auto-Scaling Model Serving
Build an auto-scaling ML service that handles traffic spikes efficiently
Create a Kubernetes-based auto-scaling system that dynamically adjusts capacity based on traffic and latency.
TL;DR
Batch incoming requests (wait up to 50ms for 32 requests) for GPU efficiency. Expose queue size and latency as Prometheus metrics. Configure HPA or KEDA to scale pods when CPU > 70%, queue > 20, or p95 latency > 500ms. Scale up fast (100%/15s), scale down slowly (10%/60s).
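Under the hood, HPA turns a threshold like "CPU > 70%" into a replica count with one formula: desiredReplicas = ceil(currentReplicas × currentMetricValue / targetValue). A quick sketch of that arithmetic:

```python
import math

def desired_replicas(current_replicas: int, current_value: float, target_value: float) -> int:
    """HPA's core scaling rule: ceil(current * currentMetric / targetMetric)."""
    return math.ceil(current_replicas * current_value / target_value)

# 4 pods averaging 90% CPU against a 70% target -> scale to 6
print(desired_replicas(4, 90, 70))  # 6
```

This is why load over target scales up proportionally: pods running at double their CPU target roughly double in count on the next evaluation.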
What You'll Learn
- Horizontal Pod Autoscaler (HPA)
- Custom metrics scaling
- Request queuing and batching
- Load testing and capacity planning
- Cost optimization strategies
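Capacity planning (fourth bullet above) largely reduces to arithmetic over a measured per-pod throughput; a minimal sketch, where the RPS and headroom figures are illustrative rather than measured:

```python
import math

def pods_needed(peak_rps: float, per_pod_rps: float, headroom: float = 0.3) -> int:
    """Pods required to serve peak traffic with spare headroom for spikes."""
    return math.ceil(peak_rps * (1 + headroom) / per_pod_rps)

# 1,200 RPS peak, ~100 RPS per pod from load tests, 30% headroom -> 16 pods
print(pods_needed(1200, 100))  # 16
```

The per-pod RPS figure should come from the load tests in Step 7, not from guesswork; headroom covers the lag between a traffic spike and new pods becoming ready.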
Tech Stack
| Component | Technology |
|---|---|
| Orchestration | Kubernetes |
| Metrics | Prometheus |
| Load Balancing | NGINX / Envoy |
| Autoscaling | KEDA / HPA |
| Load Testing | Locust |
Architecture
┌──────────────────────────────────────────────────────────────────────────────┐
│ AUTO-SCALING ARCHITECTURE │
├──────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │
│ │ Load Balancer │─────▶│ Ingress │─────▶│ Service │ │
│ └───────────────┘ └───────────────┘ └───────┬───────┘ │
│ │ │
│ ┌─────────────────────────────────────────────────────┼────────────────┐ │
│ │ KUBERNETES CLUSTER │ │ │
│ │ ▼ │ │
│ │ ┌───────────────────────────────────────────────────────────────┐ │ │
│ │ │ PODS │ │ │
│ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │
│ │ │ │ Pod 1 │ │ Pod 2 │ │ Pod 3 │ │ Pod N │ │ │ │
│ │ │ │ /metrics│ │ /metrics│ │ /metrics│ │ ... │ │ │ │
│ │ │ └────┬────┘ └────┬────┘ └────┬────┘ └─────────┘ │ │ │
│ │ └────────┼──────────────┼──────────────┼────────────────────────┘ │ │
│ │ │ │ │ │ │
│ │ └──────────────┴──────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌─────────────────────────────────────────────────────────────┐ │ │
│ │ │ Prometheus (scrapes /metrics from all pods) │ │ │
│ │ └────────────────────────────┬────────────────────────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌─────────────────────────────────────────────────────────────┐ │ │
│ │ │ HPA / KEDA Controller │ │ │
│ │ │ Scales pods based on metrics │ │ │
│ │ └─────────────────────────────────────────────────────────────┘ │ │
│ │ ▲ │ │
│ └────────────────────────────────┼─────────────────────────────────────┘ │
│ │ │
│ ┌────────────────────────────────┴─────────────────────────────────────┐ │
│ │ SCALING TRIGGERS │ │
│ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │
│ │ │ CPU > 70% │ │ Memory > │ │ RPS > 100 │ │ p95 lat > │ │ │
│ │ │ │ │ 80% │ │ per pod │ │ 500ms │ │ │
│ │ └────────────┘ └────────────┘ └────────────┘ └────────────┘ │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────────────────┘

Project Structure
auto-scaling/
├── src/
│   ├── __init__.py
│   ├── server.py               # FastAPI with batching
│   ├── batcher.py              # Request batching
│   ├── metrics.py              # Custom metrics
│   └── model.py                # Model inference
├── kubernetes/
│   ├── deployment.yaml
│   ├── hpa.yaml
│   ├── service.yaml
│   └── keda-scaled-object.yaml
├── load-tests/
│   └── locustfile.py
├── monitoring/
│   └── prometheus-rules.yaml
├── Dockerfile
└── requirements.txt

Implementation
Step 1: FastAPI with Batching
"""FastAPI server with request batching for efficient scaling."""
import asyncio
import time
from contextlib import asynccontextmanager, suppress
from typing import Optional

from fastapi import FastAPI, HTTPException
from fastapi.responses import Response
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from pydantic import BaseModel

from .batcher import RequestBatcher, BatchConfig
from .model import ModelWrapper

# Metrics
REQUEST_COUNT = Counter(
    "ml_requests_total",
    "Total requests",
    ["status"]
)
REQUEST_LATENCY = Histogram(
    "ml_request_latency_seconds",
    "Request latency",
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]
)
BATCH_SIZE = Histogram(
    "ml_batch_size",
    "Batch sizes processed",
    buckets=[1, 2, 4, 8, 16, 32, 64]
)
QUEUE_SIZE = Gauge(
    "ml_queue_size",
    "Current queue size"
)
ACTIVE_REQUESTS = Gauge(
    "ml_active_requests",
    "Currently processing requests"
)

# Global instances
model: Optional[ModelWrapper] = None
batcher: Optional[RequestBatcher] = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: load the model, run the batcher, clean up on exit."""
    global model, batcher

    # Initialize model
    model = ModelWrapper()
    model.load()

    # Initialize batcher
    batch_config = BatchConfig(
        max_batch_size=32,
        max_wait_ms=50,
        timeout_ms=5000
    )
    batcher = RequestBatcher(model, batch_config)

    # Start batcher background task
    batcher_task = asyncio.create_task(batcher.run())

    yield

    # Cleanup: the batcher may be blocked waiting on an empty queue, so cancel it
    batcher.stop()
    batcher_task.cancel()
    with suppress(asyncio.CancelledError):
        await batcher_task


app = FastAPI(
    title="Auto-Scaling ML Service",
    lifespan=lifespan
)


class PredictRequest(BaseModel):
    """Prediction request."""
    features: list[float]
    request_id: str = ""


class PredictResponse(BaseModel):
    """Prediction response."""
    prediction: int
    confidence: float
    latency_ms: float
    batch_size: int


@app.get("/health")
async def health():
    """Health check for Kubernetes liveness probes."""
    if model and model.is_loaded:
        return {"status": "healthy"}
    raise HTTPException(status_code=503, detail="Model not loaded")


@app.get("/ready")
async def ready():
    """Readiness check: shed traffic when the queue backs up."""
    if model and model.is_loaded and batcher:
        queue_size = batcher.queue_size
        QUEUE_SIZE.set(queue_size)
        # Not ready if queue is too large
        if queue_size > 100:
            raise HTTPException(
                status_code=503,
                detail=f"Queue size too large: {queue_size}"
            )
        return {"ready": True, "queue_size": queue_size}
    raise HTTPException(status_code=503, detail="Not ready")


@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint."""
    return Response(
        content=generate_latest(),
        media_type="text/plain"
    )


@app.post("/predict", response_model=PredictResponse)
async def predict(request: PredictRequest):
    """
    Prediction endpoint with automatic batching.
    Requests are queued and processed in batches for efficiency.
    """
    start_time = time.time()
    ACTIVE_REQUESTS.inc()
    try:
        # Submit to batcher and wait for result
        result = await batcher.submit(request.features, request.request_id)
        latency = (time.time() - start_time) * 1000
        REQUEST_LATENCY.observe(latency / 1000)
        REQUEST_COUNT.labels(status="success").inc()
        return PredictResponse(
            prediction=result["prediction"],
            confidence=result["confidence"],
            latency_ms=latency,
            batch_size=result["batch_size"]
        )
    except asyncio.TimeoutError:
        REQUEST_COUNT.labels(status="timeout").inc()
        raise HTTPException(status_code=504, detail="Request timeout")
    except Exception as e:
        REQUEST_COUNT.labels(status="error").inc()
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        ACTIVE_REQUESTS.dec()

Step 2: Request Batcher
"""Request batching for efficient GPU/CPU utilization."""
import asyncio
import logging
from dataclasses import dataclass

import numpy as np

logger = logging.getLogger(__name__)


@dataclass
class BatchConfig:
    """Batching configuration."""
    max_batch_size: int = 32
    max_wait_ms: int = 50
    timeout_ms: int = 5000


@dataclass
class PendingRequest:
    """A pending request in the queue."""
    features: list[float]
    request_id: str
    future: asyncio.Future
    created_at: float


class RequestBatcher:
    """
    Batches incoming requests for efficient processing.

    Collects requests until either:
    - max_batch_size is reached
    - max_wait_ms has elapsed since the first request
    """

    def __init__(self, model, config: BatchConfig):
        self.model = model
        self.config = config
        self.queue: asyncio.Queue = asyncio.Queue()
        self.running = False

    @property
    def queue_size(self) -> int:
        """Current queue size."""
        return self.queue.qsize()

    async def submit(
        self,
        features: list[float],
        request_id: str = ""
    ) -> dict:
        """
        Submit a request for batched processing.

        Args:
            features: Input features
            request_id: Optional request ID

        Returns:
            Prediction result
        """
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        request = PendingRequest(
            features=features,
            request_id=request_id,
            future=future,
            created_at=loop.time()
        )
        await self.queue.put(request)
        # Wait for result with timeout
        try:
            return await asyncio.wait_for(
                future,
                timeout=self.config.timeout_ms / 1000
            )
        except asyncio.TimeoutError:
            # wait_for cancels the future; re-raise so the caller can map it to a 504
            raise

    async def run(self):
        """Main batching loop."""
        self.running = True
        logger.info("Batcher started")
        while self.running:
            try:
                batch = await self._collect_batch()
                if batch:
                    await self._process_batch(batch)
            except Exception as e:
                logger.error(f"Batch processing error: {e}")

    def stop(self):
        """Signal the batching loop to exit after the current batch."""
        self.running = False

    async def _collect_batch(self) -> list[PendingRequest]:
        """Collect requests into a batch."""
        batch: list[PendingRequest] = []
        deadline = None
        loop = asyncio.get_running_loop()
        while len(batch) < self.config.max_batch_size:
            # Calculate wait time
            if deadline is None:
                timeout = None  # Wait indefinitely for the first request
            else:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break
                timeout = remaining
            try:
                request = await asyncio.wait_for(
                    self.queue.get(),
                    timeout=timeout
                )
                batch.append(request)
                # Set deadline after first request
                if deadline is None:
                    deadline = loop.time() + (self.config.max_wait_ms / 1000)
            except asyncio.TimeoutError:
                # Timeout reached, process current batch
                break
        return batch

    async def _process_batch(self, batch: list[PendingRequest]):
        """Process a batch of requests."""
        if not batch:
            return
        batch_size = len(batch)
        logger.debug(f"Processing batch of {batch_size} requests")

        # Record batch size metric (imported lazily to avoid a circular import)
        from .server import BATCH_SIZE
        BATCH_SIZE.observe(batch_size)

        try:
            # Prepare batch input
            features = np.array([req.features for req in batch])
            # Run inference
            results = self.model.predict_batch(features)
            # Distribute results
            for i, request in enumerate(batch):
                if not request.future.cancelled():
                    request.future.set_result({
                        "prediction": results["predictions"][i],
                        "confidence": results["confidences"][i],
                        "batch_size": batch_size
                    })
        except Exception as e:
            # Set exception for all requests
            for request in batch:
                if not request.future.cancelled():
                    request.future.set_exception(e)

Step 3: Model Wrapper
"""Model wrapper with batched inference."""
import logging
from typing import Optional

import numpy as np
import torch
import torch.nn as nn

logger = logging.getLogger(__name__)


class SimpleModel(nn.Module):
    """Example model."""

    def __init__(self, input_size: int = 4, num_classes: int = 3):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_size, 64),
            nn.ReLU(),
            nn.Linear(64, num_classes)
        )

    def forward(self, x):
        return self.net(x)


class ModelWrapper:
    """Wrapper for model inference."""

    def __init__(self, model_path: Optional[str] = None):
        self.model_path = model_path
        self.model: Optional[nn.Module] = None
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self._is_loaded = False

    @property
    def is_loaded(self) -> bool:
        return self._is_loaded

    def load(self) -> bool:
        """Load the model."""
        try:
            if self.model_path:
                self.model = torch.load(self.model_path, map_location=self.device)
            else:
                self.model = SimpleModel()
            self.model.to(self.device)
            self.model.eval()
            self._is_loaded = True
            # Warmup
            self._warmup()
            logger.info(f"Model loaded on {self.device}")
            return True
        except Exception as e:
            logger.error(f"Failed to load model: {e}")
            return False

    def _warmup(self):
        """Warm up the model with a dummy forward pass."""
        dummy = torch.randn(1, 4).to(self.device)
        with torch.no_grad():
            self.model(dummy)

    def predict_batch(self, features: np.ndarray) -> dict:
        """
        Run batched inference.

        Args:
            features: Array of shape (batch_size, num_features)

        Returns:
            Dict with predictions and confidences
        """
        with torch.no_grad():
            inputs = torch.tensor(features, dtype=torch.float32).to(self.device)
            logits = self.model(inputs)
            probs = torch.softmax(logits, dim=-1)
            confidences, predictions = torch.max(probs, dim=-1)
        return {
            "predictions": predictions.cpu().numpy().tolist(),
            "confidences": confidences.cpu().numpy().tolist()
        }

Step 4: Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-service
  labels:
    app: ml-service
spec:
  replicas: 2
  selector:
    matchLabels:
      app: ml-service
  template:
    metadata:
      labels:
        app: ml-service
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "8000"
        prometheus.io/path: "/metrics"
    spec:
      containers:
        - name: ml-service
          image: ml-service:latest
          ports:
            - containerPort: 8000
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "2000m"
          env:
            - name: MAX_BATCH_SIZE
              value: "32"
            - name: MAX_WAIT_MS
              value: "50"
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 5
            failureThreshold: 3
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10

Step 5: Horizontal Pod Autoscaler
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ml-service-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ml-service
  minReplicas: 2
  maxReplicas: 20
  metrics:
    # CPU-based scaling
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    # Memory-based scaling
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
    # Custom metric: requests per second
    # (Pods-type metrics require a metrics adapter such as prometheus-adapter)
    - type: Pods
      pods:
        metric:
          name: ml_requests_per_second
        target:
          type: AverageValue
          averageValue: "100"
    # Custom metric: queue size
    - type: Pods
      pods:
        metric:
          name: ml_queue_size
        target:
          type: AverageValue
          averageValue: "20"
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Percent
          value: 10
          periodSeconds: 60
    scaleUp:
      stabilizationWindowSeconds: 0
      policies:
        - type: Percent
          value: 100
          periodSeconds: 15
        - type: Pods
          value: 4
          periodSeconds: 15
      selectPolicy: Max

Step 6: KEDA ScaledObject (Alternative)
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: ml-service-scaler
spec:
  scaleTargetRef:
    name: ml-service
  pollingInterval: 15
  cooldownPeriod: 300
  minReplicaCount: 2
  maxReplicaCount: 50
  triggers:
    # Prometheus-based scaling
    - type: prometheus
      metadata:
        serverAddress: http://prometheus:9090
        metricName: ml_request_rate
        query: |
          sum(rate(ml_requests_total[1m])) by (deployment)
        threshold: "100"
    # Queue-based scaling
    - type: prometheus
      metadata:
        serverAddress: http://prometheus:9090
        metricName: ml_queue_depth
        query: |
          avg(ml_queue_size) by (deployment)
        threshold: "20"
    # Latency-based scaling
    - type: prometheus
      metadata:
        serverAddress: http://prometheus:9090
        metricName: ml_p95_latency
        query: |
          histogram_quantile(0.95,
            sum(rate(ml_request_latency_seconds_bucket[5m])) by (le)
          )
        threshold: "0.5"

Step 7: Load Testing
"""Load testing with Locust."""
import random

from locust import HttpUser, task, between


class MLServiceUser(HttpUser):
    """Simulated user for load testing."""
    wait_time = between(0.1, 0.5)

    @task(10)
    def predict(self):
        """Make prediction request."""
        features = [random.random() for _ in range(4)]
        self.client.post(
            "/predict",
            json={"features": features}
        )

    @task(1)
    def health_check(self):
        """Check health endpoint."""
        self.client.get("/health")


class BurstUser(HttpUser):
    """User that generates burst traffic."""
    wait_time = between(0, 0.01)  # Minimal wait

    @task
    def predict_burst(self):
        """Burst prediction requests."""
        features = [random.random() for _ in range(4)]
        self.client.post(
            "/predict",
            json={"features": features}
        )

Step 8: Prometheus Rules
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: ml-service-rules
spec:
  groups:
    - name: ml-service-scaling
      interval: 15s
      rules:
        # Request rate per pod
        - record: ml_requests_per_second
          expr: |
            sum(rate(ml_requests_total[1m])) by (pod)
        # Average queue size
        - record: ml_avg_queue_size
          expr: |
            avg(ml_queue_size) by (deployment)
        # P95 latency
        - record: ml_p95_latency
          expr: |
            histogram_quantile(0.95,
              sum(rate(ml_request_latency_seconds_bucket[5m])) by (le)
            )
    - name: ml-service-alerts
      rules:
        - alert: HighLatency
          expr: ml_p95_latency > 1
          for: 5m
          labels:
            severity: warning
          annotations:
            summary: "High ML service latency"
        - alert: HighQueueDepth
          expr: ml_avg_queue_size > 50
          for: 2m
          labels:
            severity: warning
          annotations:
            summary: "ML service queue depth high"
        - alert: ScalingLimit
          expr: |
            kube_horizontalpodautoscaler_status_current_replicas
            == kube_horizontalpodautoscaler_spec_max_replicas
          for: 10m
          labels:
            severity: critical
          annotations:
            summary: "ML service at max replicas"

Scaling Strategies
| Strategy | Use Case | Pros | Cons |
|---|---|---|---|
| CPU-based | General workloads | Simple, reliable | Slow response |
| Request-based | API services | Direct correlation | Needs custom metrics |
| Queue-based | Batch processing | Handles bursts | Requires queue monitoring |
| Predictive | Scheduled traffic | Proactive | Requires historical data |
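The predictive row is the only strategy not shown elsewhere in this project. With KEDA it can be approximated declaratively via the cron scaler, which pre-scales for known traffic windows; the schedule and replica count below are illustrative, not measured:

```yaml
# Additional trigger for the ScaledObject in Step 6 (illustrative values)
- type: cron
  metadata:
    timezone: UTC
    start: "0 8 * * 1-5"      # pre-scale at 08:00, Mon-Fri
    end: "0 18 * * 1-5"       # release capacity at 18:00
    desiredReplicas: "10"
```

Outside the window, the Prometheus triggers still apply, so reactive scaling covers unscheduled spikes.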
Key Concepts Recap
| Concept | What It Is | Why It Matters |
|---|---|---|
| Request Batching | Queue requests, process in groups | Better GPU utilization, higher throughput |
| max_wait_ms | Max time to wait before processing batch | Balance latency vs batch efficiency |
| HPA | Kubernetes Horizontal Pod Autoscaler | Built-in, simple CPU/memory scaling |
| KEDA | Event-driven autoscaler | Prometheus metrics, more triggers |
| Custom Metrics | Queue size, RPS, latency gauges | Scale on application-specific signals |
| Stabilization Window | Wait before scaling down | Prevent thrashing on traffic spikes |
| Scale-Up Policy | How fast to add pods | Fast (100%/15s) for burst handling |
| Scale-Down Policy | How fast to remove pods | Slow (10%/60s) to maintain stability |
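The max_wait_ms trade-off above can be made concrete: waiting longer grows batches (better GPU throughput) but adds queueing delay to every request, and once traffic is high enough to fill a batch before the window expires, the wait stops mattering. A back-of-the-envelope sketch assuming a steady, uniform arrival rate:

```python
def batching_tradeoff(arrival_rps: float, max_wait_ms: float, max_batch: int = 32):
    """Approximate batch size and mean queueing delay for a size-or-timeout batcher."""
    fill_time_ms = max_batch / arrival_rps * 1000   # time to gather a full batch
    window_ms = min(max_wait_ms, fill_time_ms)      # flush on size or timeout, whichever first
    batch = max(1.0, arrival_rps * window_ms / 1000)
    return batch, window_ms / 2                     # mean wait ~ half the window

for rps in (20, 200, 2000):
    batch, wait = batching_tradeoff(rps, max_wait_ms=50)
    print(f"{rps:>5} RPS -> batch ~{batch:.0f}, ~{wait:.0f} ms added latency")
# At 2000 RPS, 32 requests arrive in 16 ms, so the added wait shrinks to ~8 ms
```

This is a rough model, not a queueing-theory result, but it shows why a 50 ms window is cheap at high load and relatively expensive at low load.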
Load Testing Commands
# Start Locust
locust -f load-tests/locustfile.py --host http://localhost:8000
# Run headless load test
locust -f load-tests/locustfile.py \
--host http://localhost:8000 \
--users 100 \
--spawn-rate 10 \
--run-time 5m \
--headless
# Watch HPA scaling
kubectl get hpa ml-service-hpa -w
# Check pod scaling
kubectl get pods -l app=ml-service -w

Next Steps
- Model Registry - Version and deploy models
- Complete Pipeline - Full CI/CD integration