Model Registry System
Build a model registry for versioning, tracking, and deploying ML models
TL;DR
Build a model registry that manages version control, stage transitions (None → Staging → Production → Archived), and artifact storage. It follows MLflow-style patterns, using S3/MinIO for artifacts, Redis for metadata, and FastAPI for API access.
Create a production model registry with versioning, metadata tracking, and deployment automation.
What You'll Learn
- Model versioning and lineage
- Metadata and artifact management
- Stage transitions (staging → production)
- Model comparison and selection
- Deployment automation
Tech Stack
| Component | Technology |
|---|---|
| Registry | Custom (MLflow-style patterns) |
| Storage | S3 / MinIO |
| Database | Redis |
| API | FastAPI |
Architecture
Training Pipeline
      │
      ▼
Register Model ──────▶ MODEL REGISTRY
      │                 ├─ Metadata DB (Redis)
      │                 ├─ Artifact Store (S3/MinIO)
      │                 └─ Version Control: v1 → v2 → v3 → ... → vN
      ▼
STAGE MANAGEMENT
  None ──▶ Staging ──▶ Production ──▶ Archived
              │
              ▼
          Approval?
            Yes ──▶ promote to Production
            No  ──▶ back to Staging
      │
      ▼
Deployment (auto-triggered)
Project Structure
model-registry/
├── src/
│ ├── __init__.py
│ ├── registry.py # Registry core
│ ├── models.py # Data models
│ ├── storage.py # Artifact storage
│ ├── metadata.py # Metadata management
│ └── api.py # FastAPI application
├── tests/
├── migrations/
├── docker-compose.yml
└── requirements.txt
Implementation
Step 1: Data Models
"""Data models for model registry."""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional
import uuid


class ModelStage(str, Enum):
    """Model deployment stages."""
    NONE = "None"
    STAGING = "Staging"
    PRODUCTION = "Production"
    ARCHIVED = "Archived"


class ModelStatus(str, Enum):
    """Model status."""
    PENDING = "pending"
    READY = "ready"
    FAILED = "failed"
    DEPLOYING = "deploying"
    DEPLOYED = "deployed"


@dataclass
class ModelMetrics:
    """Model performance metrics."""
    accuracy: float = 0.0
    f1_score: float = 0.0
    precision: float = 0.0
    recall: float = 0.0
    latency_ms: float = 0.0
    custom_metrics: dict = field(default_factory=dict)


@dataclass
class ModelArtifact:
    """Model artifact information."""
    artifact_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    path: str = ""
    size_bytes: int = 0
    checksum: str = ""
    format: str = "pickle"  # pickle, onnx, torchscript, etc.


@dataclass
class ModelVersion:
    """A specific version of a model."""
    version_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    model_name: str = ""
    version: int = 1
    stage: ModelStage = ModelStage.NONE
    status: ModelStatus = ModelStatus.PENDING

    # Metadata
    description: str = ""
    tags: dict = field(default_factory=dict)
    parameters: dict = field(default_factory=dict)
    metrics: ModelMetrics = field(default_factory=ModelMetrics)

    # Artifacts
    artifact: Optional[ModelArtifact] = None

    # Lineage
    run_id: str = ""
    experiment_id: str = ""
    parent_version: Optional[str] = None

    # Timestamps
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)
    deployed_at: Optional[datetime] = None

    # Creator
    created_by: str = ""


@dataclass
class RegisteredModel:
    """A registered model with all versions."""
    model_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    description: str = ""
    tags: dict = field(default_factory=dict)
    versions: list[ModelVersion] = field(default_factory=list)
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)

    @property
    def latest_version(self) -> Optional[ModelVersion]:
        """Get the latest version."""
        if not self.versions:
            return None
        return max(self.versions, key=lambda v: v.version)

    @property
    def production_version(self) -> Optional[ModelVersion]:
        """Get the production version."""
        for version in self.versions:
            if version.stage == ModelStage.PRODUCTION:
                return version
        return None

    @property
    def staging_version(self) -> Optional[ModelVersion]:
        """Get the staging version."""
        for version in self.versions:
            if version.stage == ModelStage.STAGING:
                return version
        return None
Step 2: Artifact Storage
"""Artifact storage for model files."""
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Optional
import hashlib
import logging
import shutil

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class ArtifactStorage(ABC):
    """Abstract artifact storage."""

    @abstractmethod
    def upload(self, local_path: str, artifact_path: str) -> str:
        """Upload artifact and return storage path."""
        pass

    @abstractmethod
    def download(self, artifact_path: str, local_path: str) -> bool:
        """Download artifact to local path."""
        pass

    @abstractmethod
    def delete(self, artifact_path: str) -> bool:
        """Delete artifact."""
        pass

    @abstractmethod
    def exists(self, artifact_path: str) -> bool:
        """Check if artifact exists."""
        pass


class LocalStorage(ArtifactStorage):
    """Local filesystem storage."""

    def __init__(self, base_path: str = "./artifacts"):
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)

    def upload(self, local_path: str, artifact_path: str) -> str:
        """Copy file to storage."""
        source = Path(local_path)
        dest = self.base_path / artifact_path
        dest.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(source, dest)
        return str(dest)

    def download(self, artifact_path: str, local_path: str) -> bool:
        """Copy file from storage."""
        source = self.base_path / artifact_path
        if not source.exists():
            return False
        dest = Path(local_path)
        dest.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(source, dest)
        return True

    def delete(self, artifact_path: str) -> bool:
        """Delete file from storage."""
        path = self.base_path / artifact_path
        if path.exists():
            path.unlink()
            return True
        return False

    def exists(self, artifact_path: str) -> bool:
        """Check if file exists."""
        return (self.base_path / artifact_path).exists()


class S3Storage(ArtifactStorage):
    """AWS S3 storage (also works with S3-compatible endpoints such as MinIO)."""

    def __init__(
        self,
        bucket: str,
        prefix: str = "models/",
        endpoint_url: Optional[str] = None
    ):
        self.bucket = bucket
        self.prefix = prefix
        self.s3 = boto3.client("s3", endpoint_url=endpoint_url)

    def _full_path(self, artifact_path: str) -> str:
        """Get full S3 path."""
        return f"{self.prefix}{artifact_path}"

    def upload(self, local_path: str, artifact_path: str) -> str:
        """Upload to S3."""
        s3_path = self._full_path(artifact_path)
        try:
            self.s3.upload_file(local_path, self.bucket, s3_path)
            return f"s3://{self.bucket}/{s3_path}"
        except ClientError as e:
            logger.error(f"S3 upload failed: {e}")
            raise

    def download(self, artifact_path: str, local_path: str) -> bool:
        """Download from S3."""
        s3_path = self._full_path(artifact_path)
        try:
            Path(local_path).parent.mkdir(parents=True, exist_ok=True)
            self.s3.download_file(self.bucket, s3_path, local_path)
            return True
        except ClientError:
            return False

    def delete(self, artifact_path: str) -> bool:
        """Delete from S3."""
        s3_path = self._full_path(artifact_path)
        try:
            self.s3.delete_object(Bucket=self.bucket, Key=s3_path)
            return True
        except ClientError:
            return False

    def exists(self, artifact_path: str) -> bool:
        """Check if exists in S3."""
        s3_path = self._full_path(artifact_path)
        try:
            self.s3.head_object(Bucket=self.bucket, Key=s3_path)
            return True
        except ClientError:
            return False


def compute_checksum(file_path: str) -> str:
    """Compute SHA256 checksum of file."""
    sha256 = hashlib.sha256()
    with open(file_path, "rb") as f:
        # Read in 8 KiB chunks so large model files are never fully in memory
        for chunk in iter(lambda: f.read(8192), b""):
            sha256.update(chunk)
    return sha256.hexdigest()
Step 3: Registry Core
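The `compute_checksum` helper from Step 2 is only called when a version is registered; nothing in this tutorial re-checks the hash later. The following is a minimal sketch of how a consumer might verify a downloaded artifact against the recorded checksum. `verify_artifact` is a hypothetical helper, not part of the tutorial's code, and `compute_checksum` is repeated here only so the block runs on its own.

```python
import hashlib
import tempfile
from pathlib import Path


def compute_checksum(file_path: str) -> str:
    """Compute the SHA-256 checksum of a file, reading it in chunks."""
    sha256 = hashlib.sha256()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            sha256.update(chunk)
    return sha256.hexdigest()


def verify_artifact(file_path: str, expected_checksum: str) -> bool:
    """Hypothetical helper: True if the file matches the recorded checksum."""
    return compute_checksum(file_path) == expected_checksum


# Demo: record a checksum, then detect a modified file
with tempfile.TemporaryDirectory() as tmp:
    model_file = Path(tmp) / "model.pkl"
    model_file.write_bytes(b"fake model bytes")
    recorded = compute_checksum(str(model_file))
    print(verify_artifact(str(model_file), recorded))  # True

    model_file.write_bytes(b"corrupted")
    print(verify_artifact(str(model_file), recorded))  # False
```

In the registry, the recorded value would come from `ModelVersion.artifact.checksum`, so a check like this could run right after `download_model` returns.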
"""Model registry core functionality."""
from dataclasses import asdict
from datetime import datetime
from typing import Optional, List
import json
import logging

from .models import (
    RegisteredModel,
    ModelVersion,
    ModelStage,
    ModelStatus,
    ModelArtifact,
    ModelMetrics
)
from .storage import ArtifactStorage, compute_checksum

logger = logging.getLogger(__name__)


class ModelRegistry:
    """
    Central model registry for versioning and deployment.

    Manages model registration, versioning, stage transitions,
    and artifact storage.
    """

    def __init__(self, storage: ArtifactStorage, db_client):
        self.storage = storage
        self.db = db_client

    def register_model(
        self,
        name: str,
        description: str = "",
        tags: Optional[dict] = None
    ) -> RegisteredModel:
        """
        Register a new model.

        Args:
            name: Unique model name
            description: Model description
            tags: Optional tags

        Returns:
            RegisteredModel instance
        """
        # Check if model exists
        existing = self.get_model(name)
        if existing:
            raise ValueError(f"Model '{name}' already exists")

        model = RegisteredModel(
            name=name,
            description=description,
            tags=tags or {}
        )
        self._save_model(model)
        logger.info(f"Registered model: {name}")
        return model

    def create_version(
        self,
        model_name: str,
        model_path: str,
        run_id: str = "",
        parameters: Optional[dict] = None,
        metrics: Optional[ModelMetrics] = None,
        description: str = "",
        tags: Optional[dict] = None
    ) -> ModelVersion:
        """
        Create a new model version.

        Args:
            model_name: Name of registered model
            model_path: Local path to model file
            run_id: Training run ID
            parameters: Model parameters
            metrics: Performance metrics
            description: Version description
            tags: Optional tags

        Returns:
            ModelVersion instance
        """
        model = self.get_model(model_name)
        if not model:
            # Auto-register model
            model = self.register_model(model_name)

        # Calculate next version number
        next_version = 1
        if model.versions:
            next_version = max(v.version for v in model.versions) + 1

        # Upload artifact
        artifact_path = f"{model_name}/v{next_version}/model"
        storage_path = self.storage.upload(model_path, artifact_path)

        # Create artifact metadata
        import os
        artifact = ModelArtifact(
            path=storage_path,
            size_bytes=os.path.getsize(model_path),
            checksum=compute_checksum(model_path)
        )

        # Create version
        version = ModelVersion(
            model_name=model_name,
            version=next_version,
            description=description,
            tags=tags or {},
            parameters=parameters or {},
            metrics=metrics or ModelMetrics(),
            artifact=artifact,
            run_id=run_id,
            status=ModelStatus.READY
        )
        model.versions.append(version)
        model.updated_at = datetime.now()
        self._save_model(model)
        logger.info(f"Created version {next_version} for model {model_name}")
        return version

    def transition_stage(
        self,
        model_name: str,
        version: int,
        stage: ModelStage,
        archive_existing: bool = True
    ) -> ModelVersion:
        """
        Transition model version to a new stage.

        Args:
            model_name: Model name
            version: Version number
            stage: Target stage
            archive_existing: Archive existing version in target stage

        Returns:
            Updated ModelVersion
        """
        model = self.get_model(model_name)
        if not model:
            raise ValueError(f"Model '{model_name}' not found")

        target_version = None
        for v in model.versions:
            if v.version == version:
                target_version = v
                break
        if not target_version:
            raise ValueError(f"Version {version} not found")

        # Archive existing version in target stage
        if archive_existing and stage in (ModelStage.STAGING, ModelStage.PRODUCTION):
            for v in model.versions:
                if v.stage == stage and v.version != version:
                    v.stage = ModelStage.ARCHIVED
                    v.updated_at = datetime.now()

        # Update target version
        target_version.stage = stage
        target_version.updated_at = datetime.now()
        if stage == ModelStage.PRODUCTION:
            target_version.deployed_at = datetime.now()

        self._save_model(model)
        logger.info(
            f"Transitioned {model_name} v{version} to {stage.value}"
        )
        return target_version

    def get_model(self, name: str) -> Optional[RegisteredModel]:
        """Get model by name."""
        data = self.db.get(f"model:{name}")
        if data:
            return self._deserialize_model(data)
        return None

    def get_version(
        self,
        model_name: str,
        version: Optional[int] = None,
        stage: Optional[ModelStage] = None
    ) -> Optional[ModelVersion]:
        """
        Get specific model version.

        Args:
            model_name: Model name
            version: Specific version number
            stage: Get version by stage

        Returns:
            The matching ModelVersion, or None if the requested version
            or stage has no match. With neither filter, the latest version.
        """
        model = self.get_model(model_name)
        if not model:
            return None

        # An explicit filter must match exactly. Falling back to the latest
        # version would, for example, report an unpromoted model as production.
        if version is not None:
            return next((v for v in model.versions if v.version == version), None)
        if stage is not None:
            return next((v for v in model.versions if v.stage == stage), None)
        return model.latest_version

    def list_models(self) -> List[RegisteredModel]:
        """List all registered models."""
        models = []
        for key in self.db.scan_iter("model:*"):
            data = self.db.get(key)
            if data:
                models.append(self._deserialize_model(data))
        return models

    def compare_versions(
        self,
        model_name: str,
        version_a: int,
        version_b: int
    ) -> dict:
        """Compare two model versions (metric_diff is version_b minus version_a)."""
        model = self.get_model(model_name)
        if not model:
            raise ValueError(f"Model '{model_name}' not found")

        va = vb = None
        for v in model.versions:
            # Two separate checks so that version_a == version_b also resolves
            if v.version == version_a:
                va = v
            if v.version == version_b:
                vb = v
        if not va or not vb:
            raise ValueError("Version not found")

        return {
            "version_a": {
                "version": va.version,
                "metrics": asdict(va.metrics),
                "parameters": va.parameters
            },
            "version_b": {
                "version": vb.version,
                "metrics": asdict(vb.metrics),
                "parameters": vb.parameters
            },
            "metric_diff": {
                "accuracy": vb.metrics.accuracy - va.metrics.accuracy,
                "f1_score": vb.metrics.f1_score - va.metrics.f1_score,
            }
        }

    def download_model(
        self,
        model_name: str,
        version: Optional[int] = None,
        stage: Optional[ModelStage] = None,
        local_path: str = "./downloaded_model"
    ) -> str:
        """Download model artifact."""
        model_version = self.get_version(model_name, version, stage)
        if not model_version:
            raise ValueError("Model version not found")
        if not model_version.artifact:
            raise ValueError("No artifact found")

        # Rebuild the storage-relative path used when the version was created
        artifact_path = f"{model_name}/v{model_version.version}/model"
        if not self.storage.download(artifact_path, local_path):
            raise FileNotFoundError(
                f"Artifact '{artifact_path}' could not be downloaded"
            )
        return local_path

    def _save_model(self, model: RegisteredModel) -> None:
        """Save model to database."""
        data = self._serialize_model(model)
        self.db.set(f"model:{model.name}", data)

    def _serialize_model(self, model: RegisteredModel) -> str:
        """Serialize model to JSON."""
        data = {
            "model_id": model.model_id,
            "name": model.name,
            "description": model.description,
            "tags": model.tags,
            "created_at": model.created_at.isoformat(),
            "updated_at": model.updated_at.isoformat(),
            "versions": []
        }
        for v in model.versions:
            version_data = {
                "version_id": v.version_id,
                "model_name": v.model_name,
                "version": v.version,
                "stage": v.stage.value,
                "status": v.status.value,
                "description": v.description,
                "tags": v.tags,
                "parameters": v.parameters,
                "metrics": asdict(v.metrics),
                # Lineage and creator fields, so they survive a save/load cycle
                "run_id": v.run_id,
                "experiment_id": v.experiment_id,
                "parent_version": v.parent_version,
                "created_by": v.created_by,
                "created_at": v.created_at.isoformat(),
                "updated_at": v.updated_at.isoformat(),
            }
            if v.artifact:
                version_data["artifact"] = asdict(v.artifact)
            if v.deployed_at:
                version_data["deployed_at"] = v.deployed_at.isoformat()
            data["versions"].append(version_data)
        return json.dumps(data)

    def _deserialize_model(self, data: str) -> RegisteredModel:
        """Deserialize model from JSON."""
        d = json.loads(data)
        model = RegisteredModel(
            model_id=d["model_id"],
            name=d["name"],
            description=d["description"],
            tags=d["tags"],
            created_at=datetime.fromisoformat(d["created_at"]),
            updated_at=datetime.fromisoformat(d["updated_at"])
        )
        for vd in d["versions"]:
            artifact = None
            if "artifact" in vd:
                artifact = ModelArtifact(**vd["artifact"])
            version = ModelVersion(
                version_id=vd["version_id"],
                model_name=vd["model_name"],
                version=vd["version"],
                stage=ModelStage(vd["stage"]),
                status=ModelStatus(vd["status"]),
                description=vd["description"],
                tags=vd["tags"],
                parameters=vd["parameters"],
                metrics=ModelMetrics(**vd["metrics"]),
                artifact=artifact,
                run_id=vd["run_id"],
                # .get() keeps records written without these fields readable
                experiment_id=vd.get("experiment_id", ""),
                parent_version=vd.get("parent_version"),
                created_by=vd.get("created_by", ""),
                created_at=datetime.fromisoformat(vd["created_at"]),
                updated_at=datetime.fromisoformat(vd["updated_at"]),
                deployed_at=datetime.fromisoformat(vd["deployed_at"]) if vd.get("deployed_at") else None
            )
            model.versions.append(version)
        return model
Step 4: FastAPI Application
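Before the HTTP layer is wired up, note that the registry from Step 3 calls only three methods on its database client: `get`, `set`, and `scan_iter`. A dict-backed stand-in can therefore replace Redis in unit tests. This is a rough sketch under that assumption. `InMemoryDB` is a hypothetical name, it returns `str` where a default Redis client returns `bytes`, and `fnmatch` only approximates Redis glob matching.

```python
import fnmatch
from typing import Dict, Iterator, Optional


class InMemoryDB:
    """Hypothetical test double exposing get/set/scan_iter like a Redis client."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}

    def get(self, key: str) -> Optional[str]:
        """Return the stored value, or None if the key is missing."""
        return self._data.get(key)

    def set(self, key: str, value: str) -> bool:
        """Store a value under the key, overwriting any previous value."""
        self._data[key] = value
        return True

    def scan_iter(self, match: str = "*") -> Iterator[str]:
        """Yield keys matching a glob pattern such as 'model:*'."""
        # Iterate over a snapshot so callers may write while iterating
        for key in list(self._data):
            if fnmatch.fnmatchcase(key, match):
                yield key


db = InMemoryDB()
db.set("model:classifier", '{"name": "classifier"}')
db.set("other:key", "x")
print(list(db.scan_iter("model:*")))  # ['model:classifier']
```

Paired with `LocalStorage` pointed at a temporary directory, an object like this could be passed as `db_client` to `ModelRegistry` so tests need neither Redis nor S3.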
"""FastAPI application for model registry."""
from contextlib import asynccontextmanager
from typing import Optional, List
import os
import tempfile

from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from pydantic import BaseModel
import redis

from .registry import ModelRegistry
from .storage import LocalStorage, S3Storage
from .models import ModelStage, ModelMetrics

# Global registry, initialized in the lifespan handler
registry: Optional[ModelRegistry] = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan."""
    global registry

    # Initialize storage
    storage_type = os.getenv("STORAGE_TYPE", "local")
    if storage_type == "s3":
        storage = S3Storage(
            bucket=os.getenv("S3_BUCKET", "models"),
            endpoint_url=os.getenv("S3_ENDPOINT")
        )
    else:
        storage = LocalStorage("./artifacts")

    # Initialize database
    db = redis.Redis(
        host=os.getenv("REDIS_HOST", "localhost"),
        port=int(os.getenv("REDIS_PORT", "6379"))
    )
    registry = ModelRegistry(storage, db)
    yield
    db.close()


app = FastAPI(
    title="Model Registry API",
    description="ML Model Registry Service",
    lifespan=lifespan
)


# Request/Response Models
class RegisterModelRequest(BaseModel):
    name: str
    description: str = ""
    tags: dict = {}


class CreateVersionRequest(BaseModel):
    run_id: str = ""
    description: str = ""
    tags: dict = {}
    parameters: dict = {}
    accuracy: float = 0.0
    f1_score: float = 0.0


class TransitionRequest(BaseModel):
    stage: str
    archive_existing: bool = True


class ModelResponse(BaseModel):
    model_id: str
    name: str
    description: str
    tags: dict
    version_count: int
    latest_version: Optional[int]
    production_version: Optional[int]


class VersionResponse(BaseModel):
    version_id: str
    version: int
    stage: str
    status: str
    accuracy: float
    f1_score: float
    created_at: str


@app.post("/models", response_model=ModelResponse)
async def register_model(request: RegisterModelRequest):
    """Register a new model."""
    try:
        model = registry.register_model(
            name=request.name,
            description=request.description,
            tags=request.tags
        )
        return ModelResponse(
            model_id=model.model_id,
            name=model.name,
            description=model.description,
            tags=model.tags,
            version_count=0,
            latest_version=None,
            production_version=None
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))


@app.get("/models", response_model=List[ModelResponse])
async def list_models():
    """List all registered models."""
    models = registry.list_models()
    return [
        ModelResponse(
            model_id=m.model_id,
            name=m.name,
            description=m.description,
            tags=m.tags,
            version_count=len(m.versions),
            latest_version=m.latest_version.version if m.latest_version else None,
            production_version=m.production_version.version if m.production_version else None
        )
        for m in models
    ]


@app.get("/models/{model_name}")
async def get_model(model_name: str):
    """Get model details."""
    model = registry.get_model(model_name)
    if not model:
        raise HTTPException(status_code=404, detail="Model not found")
    return {
        "model_id": model.model_id,
        "name": model.name,
        "description": model.description,
        "tags": model.tags,
        "versions": [
            {
                "version": v.version,
                "stage": v.stage.value,
                "status": v.status.value,
                "metrics": {
                    "accuracy": v.metrics.accuracy,
                    "f1_score": v.metrics.f1_score
                }
            }
            for v in model.versions
        ]
    }


@app.post("/models/{model_name}/versions")
async def create_version(
    model_name: str,
    model_file: UploadFile = File(...),
    # Form(...) reads these from the multipart body alongside the file;
    # plain defaults would make them query parameters and ignore -F fields.
    run_id: str = Form(""),
    description: str = Form(""),
    accuracy: float = Form(0.0),
    f1_score: float = Form(0.0)
):
    """Create a new model version by uploading model file."""
    # Save uploaded file temporarily
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        content = await model_file.read()
        tmp.write(content)
        tmp_path = tmp.name

    try:
        metrics = ModelMetrics(accuracy=accuracy, f1_score=f1_score)
        version = registry.create_version(
            model_name=model_name,
            model_path=tmp_path,
            run_id=run_id,
            description=description,
            metrics=metrics
        )
        return {
            "version_id": version.version_id,
            "version": version.version,
            "stage": version.stage.value,
            "status": version.status.value
        }
    finally:
        os.unlink(tmp_path)


@app.post("/models/{model_name}/versions/{version}/transition")
async def transition_stage(
    model_name: str,
    version: int,
    request: TransitionRequest
):
    """Transition model version to a new stage."""
    try:
        stage = ModelStage(request.stage)
    except ValueError:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid stage: {request.stage}"
        )

    try:
        model_version = registry.transition_stage(
            model_name=model_name,
            version=version,
            stage=stage,
            archive_existing=request.archive_existing
        )
        return {
            "version": model_version.version,
            "stage": model_version.stage.value,
            "status": "success"
        }
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))


@app.get("/models/{model_name}/versions/{version}")
async def get_version(model_name: str, version: int):
    """Get specific model version."""
    model_version = registry.get_version(model_name, version)
    if not model_version:
        raise HTTPException(status_code=404, detail="Version not found")
    return {
        "version_id": model_version.version_id,
        "version": model_version.version,
        "stage": model_version.stage.value,
        "status": model_version.status.value,
        "description": model_version.description,
        "metrics": {
            "accuracy": model_version.metrics.accuracy,
            "f1_score": model_version.metrics.f1_score
        },
        "parameters": model_version.parameters,
        "run_id": model_version.run_id,
        "created_at": model_version.created_at.isoformat()
    }


@app.get("/models/{model_name}/production")
async def get_production_model(model_name: str):
    """Get production model version."""
    version = registry.get_version(
        model_name,
        stage=ModelStage.PRODUCTION
    )
    if not version:
        raise HTTPException(
            status_code=404,
            detail="No production version found"
        )
    return {
        "version": version.version,
        "artifact_path": version.artifact.path if version.artifact else None,
        "metrics": {
            "accuracy": version.metrics.accuracy,
            "f1_score": version.metrics.f1_score
        }
    }


@app.get("/models/{model_name}/compare")
async def compare_versions(
    model_name: str,
    version_a: int,
    version_b: int
):
    """Compare two model versions."""
    try:
        comparison = registry.compare_versions(
            model_name, version_a, version_b
        )
        return comparison
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
Step 5: Docker Compose
version: '3.8'

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - STORAGE_TYPE=local
      - REDIS_HOST=redis
    volumes:
      - ./artifacts:/app/artifacts
    depends_on:
      - redis

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data

  # Optional: MinIO for S3-compatible storage
  minio:
    image: minio/minio
    ports:
      - "9000:9000"
      - "9001:9001"
    environment:
      - MINIO_ROOT_USER=minioadmin
      - MINIO_ROOT_PASSWORD=minioadmin
    command: server /data --console-address ":9001"
    volumes:
      - minio-data:/data

volumes:
  redis-data:
  minio-data:
Usage Example
# Register a model
curl -X POST http://localhost:8000/models \
  -H "Content-Type: application/json" \
  -d '{"name": "classifier", "description": "Binary classifier"}'

# Upload model version
curl -X POST http://localhost:8000/models/classifier/versions \
  -F "model_file=@model.pkl" \
  -F "accuracy=0.95" \
  -F "f1_score=0.93"

# Promote to staging
curl -X POST http://localhost:8000/models/classifier/versions/1/transition \
  -H "Content-Type: application/json" \
  -d '{"stage": "Staging"}'

# Promote to production
curl -X POST http://localhost:8000/models/classifier/versions/1/transition \
  -H "Content-Type: application/json" \
  -d '{"stage": "Production"}'

# Get production model
curl http://localhost:8000/models/classifier/production
Model Stages
| Stage | Purpose | Typical Duration |
|---|---|---|
| None | Just registered | Until validation |
| Staging | Testing/validation | Days to weeks |
| Production | Live serving | Until replaced |
| Archived | Previous versions | Indefinite |
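The registry in this tutorial accepts any stage change, including a jump straight from None to Production. Teams that want the stages in the table above enforced in order could add a guard before calling `transition_stage`. The sketch below shows one way to do that. The `ALLOWED_TRANSITIONS` policy and the `validate_transition` helper are assumptions for illustration, not part of the registry code, and `ModelStage` is redefined so the block runs on its own.

```python
from enum import Enum


class ModelStage(str, Enum):
    """Same stage values as the tutorial's ModelStage enum."""
    NONE = "None"
    STAGING = "Staging"
    PRODUCTION = "Production"
    ARCHIVED = "Archived"


# Assumed policy: follow the diagram's order, and let archived versions
# re-enter Staging so a rollback is validated before going live again.
ALLOWED_TRANSITIONS = {
    ModelStage.NONE: {ModelStage.STAGING},
    ModelStage.STAGING: {ModelStage.PRODUCTION, ModelStage.ARCHIVED},
    ModelStage.PRODUCTION: {ModelStage.ARCHIVED},
    ModelStage.ARCHIVED: {ModelStage.STAGING},
}


def is_allowed(current: ModelStage, target: ModelStage) -> bool:
    """Return True if the assumed policy permits current -> target."""
    return target in ALLOWED_TRANSITIONS[current]


def validate_transition(current: ModelStage, target: ModelStage) -> None:
    """Raise ValueError for transitions the assumed policy forbids."""
    if not is_allowed(current, target):
        raise ValueError(
            f"Transition {current.value} -> {target.value} is not allowed"
        )


validate_transition(ModelStage.NONE, ModelStage.STAGING)  # passes silently
print(is_allowed(ModelStage.NONE, ModelStage.PRODUCTION))  # False
```

Because the API already maps `ValueError` to an HTTP 400 in the transition endpoint, a check like this would surface to clients as a normal validation error.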
Key Concepts Recap
| Concept | What It Is | Why It Matters |
|---|---|---|
| ModelStage | Enum tracking deployment state (None/Staging/Production/Archived) | Enables controlled promotion workflow with clear rollback paths |
| ModelVersion | Snapshot of model with metadata, metrics, and artifact reference | Allows comparison between versions and reproducible deployments |
| Artifact Storage | S3/MinIO storage for serialized model files | Separates large binary files from metadata for scalable storage |
| Checksum Validation | SHA256 hash of model file | Ensures model integrity during transfers and detects corruption |
| Stage Transition | API to move versions between stages with auto-archiving | Prevents multiple production versions and maintains clean state |
| Version Comparison | Side-by-side metrics diff between versions | Data-driven decisions about which version to promote |
| Abstract Storage | Interface pattern for LocalStorage and S3Storage | Swap backends without changing registry code |
| Lineage Tracking | Links model to run_id and experiment_id | Trace production models back to training experiments |
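To make the "data-driven decisions" row concrete, here is a small sketch of a promotion gate that consumes the `metric_diff` entry returned by `compare_versions` (candidate minus baseline, when the candidate is passed as `version_b`). The `should_promote` function and its `min_gain` threshold are hypothetical; real promotion criteria depend on the model and the business context.

```python
from typing import Mapping


def should_promote(metric_diff: Mapping[str, float], min_gain: float = 0.0) -> bool:
    """Hypothetical gate: promote only if every tracked metric gains at least min_gain.

    `metric_diff` is expected to hold candidate-minus-baseline deltas,
    matching the "metric_diff" entry returned by compare_versions.
    """
    if not metric_diff:
        return False  # nothing to compare, so do not promote
    return all(delta >= min_gain for delta in metric_diff.values())


# Example payloads in the shape returned by compare_versions
better = {"metric_diff": {"accuracy": 0.02, "f1_score": 0.01}}
mixed = {"metric_diff": {"accuracy": 0.02, "f1_score": -0.01}}

print(should_promote(better["metric_diff"]))  # True
print(should_promote(mixed["metric_diff"]))   # False
```

With the default `min_gain` of 0.0 the gate only blocks regressions; raising it demands a minimum improvement on every metric before a version moves to Production.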
Next Steps
- Complete Pipeline - CI/CD integration
- Auto-Scaling - Scale model serving