Complete MLOps Pipeline
Build a production-grade ML pipeline with automated training, testing, and deployment using GitHub Actions.
TL;DR
On PR: run tests + lint. On main merge: train model with MLflow tracking, validate against thresholds (accuracy > 0.85), register to MLflow. CD pipeline: build Docker image, deploy to staging, run integration tests, blue-green deploy to production. Prefect orchestrates training; GitHub Actions handles CI/CD.
What You'll Learn
- CI/CD for ML projects
- Automated model training
- Model validation and testing
- Blue-green deployments
- Pipeline orchestration
Tech Stack
| Component | Technology |
|---|---|
| CI/CD | GitHub Actions |
| Orchestration | Prefect / Airflow |
| Registry | MLflow |
| Deployment | Kubernetes |
| Testing | pytest, Great Expectations |
Architecture
```
DEVELOPMENT:  Code Change ─▶ Pull Request ─▶ Unit Tests ─▶ Linting
                   │
                   ▼
CI PIPELINE:  Train Model (MLflow) ─▶ Validation (thresholds) ─▶ Register (Staging)
                   │
                   ▼
CD PIPELINE:  Staging Deploy ─▶ Integration Tests ─▶ Production (Blue-Green)
                   │
                   ▼
FEEDBACK:     Monitoring ─▶ Drift Detected? ─▶ Retrain Trigger ─▶ CI Pipeline
```

Project Structure
```
mlops-pipeline/
├── src/
│   ├── __init__.py
│   ├── train.py          # Training script
│   ├── evaluate.py       # Model evaluation
│   ├── serve.py          # Model serving
│   └── utils.py          # Utilities
├── pipelines/
│   ├── training.py       # Training pipeline
│   └── deployment.py     # Deployment pipeline
├── tests/
│   ├── unit/
│   ├── integration/
│   └── data_validation/
├── .github/
│   └── workflows/
│       ├── ci.yml
│       └── cd.yml
├── kubernetes/
│   ├── deployment.yaml
│   └── service.yaml
├── Dockerfile
└── requirements.txt
```

Implementation
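Before the steps, a minimal requirements file for the stack above might look like this (version pins are illustrative, not tested constraints):

```
mlflow>=2.9
scikit-learn>=1.3
pandas>=2.0
numpy>=1.24
joblib>=1.3
prefect>=2.14
pytest>=7.4
```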
Step 1: Training Script
"""Model training with MLflow tracking."""
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Tuple
import mlflow
import numpy as np
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import accuracy_score, f1_score, classification_report
import joblib
@dataclass
class TrainingConfig:
"""Training configuration."""
model_name: str = "iris-classifier"
n_estimators: int = 100
max_depth: Optional[int] = None
test_size: float = 0.2
random_state: int = 42
output_dir: str = "./models"
@dataclass
class TrainingResult:
"""Training result."""
model_path: str
accuracy: float
f1_score: float
cv_score: float
run_id: str
def load_data() -> Tuple[np.ndarray, np.ndarray]:
"""Load training data."""
iris = load_iris()
return iris.data, iris.target
def train_model(config: TrainingConfig) -> TrainingResult:
"""
Train model with MLflow tracking.
Args:
config: Training configuration
Returns:
TrainingResult with metrics and paths
"""
# Set up MLflow
mlflow.set_experiment(config.model_name)
with mlflow.start_run() as run:
# Log parameters
mlflow.log_params({
"n_estimators": config.n_estimators,
"max_depth": config.max_depth,
"test_size": config.test_size,
})
# Load and split data
X, y = load_data()
X_train, X_test, y_train, y_test = train_test_split(
X, y,
test_size=config.test_size,
random_state=config.random_state
)
# Train model
model = RandomForestClassifier(
n_estimators=config.n_estimators,
max_depth=config.max_depth,
random_state=config.random_state
)
model.fit(X_train, y_train)
# Evaluate
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred, average="weighted")
# Cross-validation
cv_scores = cross_val_score(model, X, y, cv=5)
cv_mean = cv_scores.mean()
# Log metrics
mlflow.log_metrics({
"accuracy": accuracy,
"f1_score": f1,
"cv_score_mean": cv_mean,
"cv_score_std": cv_scores.std(),
})
# Save model
output_dir = Path(config.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
model_path = output_dir / f"{config.model_name}.joblib"
joblib.dump(model, model_path)
# Log model to MLflow
mlflow.sklearn.log_model(model, "model")
# Log classification report
report = classification_report(y_test, y_pred)
mlflow.log_text(report, "classification_report.txt")
return TrainingResult(
model_path=str(model_path),
accuracy=accuracy,
f1_score=f1,
cv_score=cv_mean,
run_id=run.info.run_id
)
if __name__ == "__main__":
config = TrainingConfig()
result = train_model(config)
print(f"Training complete. Accuracy: {result.accuracy:.4f}")
print(f"Model saved to: {result.model_path}")
print(f"MLflow run ID: {result.run_id}")Step 2: Model Evaluation
"""Model evaluation and validation."""
from dataclasses import dataclass
from typing import Dict, List, Optional
import numpy as np
from sklearn.metrics import (
accuracy_score,
precision_score,
recall_score,
f1_score,
confusion_matrix
)
import joblib
@dataclass
class EvaluationResult:
"""Evaluation result."""
accuracy: float
precision: float
recall: float
f1: float
confusion_matrix: np.ndarray
passed: bool
failures: List[str]
@dataclass
class ValidationThresholds:
"""Validation thresholds."""
min_accuracy: float = 0.85
min_f1: float = 0.80
max_latency_ms: float = 100.0
class ModelEvaluator:
"""Evaluate model against thresholds."""
def __init__(self, thresholds: ValidationThresholds):
self.thresholds = thresholds
def evaluate(
self,
model_path: str,
X_test: np.ndarray,
y_test: np.ndarray
) -> EvaluationResult:
"""
Evaluate model on test data.
Args:
model_path: Path to saved model
X_test: Test features
y_test: Test labels
Returns:
EvaluationResult with metrics and validation status
"""
# Load model
model = joblib.load(model_path)
# Predictions
y_pred = model.predict(X_test)
# Calculate metrics
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred, average="weighted")
recall = recall_score(y_test, y_pred, average="weighted")
f1 = f1_score(y_test, y_pred, average="weighted")
cm = confusion_matrix(y_test, y_pred)
# Validate against thresholds
failures = []
if accuracy < self.thresholds.min_accuracy:
failures.append(
f"Accuracy {accuracy:.4f} < {self.thresholds.min_accuracy}"
)
if f1 < self.thresholds.min_f1:
failures.append(
f"F1 score {f1:.4f} < {self.thresholds.min_f1}"
)
return EvaluationResult(
accuracy=accuracy,
precision=precision,
recall=recall,
f1=f1,
confusion_matrix=cm,
passed=len(failures) == 0,
failures=failures
)
def compare_models(
self,
current_model: str,
new_model: str,
X_test: np.ndarray,
y_test: np.ndarray,
min_improvement: float = 0.01
) -> Dict:
"""Compare new model against current production model."""
current_result = self.evaluate(current_model, X_test, y_test)
new_result = self.evaluate(new_model, X_test, y_test)
improvement = new_result.accuracy - current_result.accuracy
should_deploy = improvement >= min_improvement
return {
"current_accuracy": current_result.accuracy,
"new_accuracy": new_result.accuracy,
"improvement": improvement,
"should_deploy": should_deploy,
"new_model_passed": new_result.passed
}Step 3: Data Validation
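The `DataValidator` below mimics Great Expectations' expectation API on plain pandas. The underlying checks are one-liners; a standalone sketch on synthetic data:

```python
# Standalone sketch of the pandas logic behind Great Expectations-style checks.
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "feature_0": np.linspace(0.1, 7.9, 150),   # iris-like value range
    "label": np.repeat([0, 1, 2], 50),
})

failures = []

# Null check
null_count = int(df["feature_0"].isnull().sum())
if null_count > 0:
    failures.append(f"feature_0 has {null_count} null values")

# Range check
out_of_range = int(((df["feature_0"] < 0.0) | (df["feature_0"] > 8.0)).sum())
if out_of_range > 0:
    failures.append(f"{out_of_range} values outside [0.0, 8.0]")

# Row-count check
if not (100 <= len(df) <= 10_000):
    failures.append(f"row count {len(df)} outside [100, 10000]")

print("OK" if not failures else failures)
```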
"""Data validation tests using Great Expectations style."""
from dataclasses import dataclass
from typing import List, Optional
import numpy as np
import pandas as pd
@dataclass
class ValidationResult:
"""Data validation result."""
passed: bool
checks_passed: int
checks_failed: int
failures: List[str]
class DataValidator:
"""Validate data quality."""
def __init__(self):
self.failures: List[str] = []
def expect_column_values_to_not_be_null(
self,
df: pd.DataFrame,
column: str
) -> bool:
"""Check for null values."""
null_count = df[column].isnull().sum()
if null_count > 0:
self.failures.append(
f"Column '{column}' has {null_count} null values"
)
return False
return True
def expect_column_values_to_be_in_range(
self,
df: pd.DataFrame,
column: str,
min_value: float,
max_value: float
) -> bool:
"""Check value range."""
values = df[column]
out_of_range = ((values < min_value) | (values > max_value)).sum()
if out_of_range > 0:
self.failures.append(
f"Column '{column}' has {out_of_range} values outside "
f"range [{min_value}, {max_value}]"
)
return False
return True
def expect_column_unique_value_count_to_be_between(
self,
df: pd.DataFrame,
column: str,
min_count: int,
max_count: int
) -> bool:
"""Check unique value count."""
unique_count = df[column].nunique()
if not (min_count <= unique_count <= max_count):
self.failures.append(
f"Column '{column}' has {unique_count} unique values, "
f"expected between {min_count} and {max_count}"
)
return False
return True
def expect_table_row_count_to_be_between(
self,
df: pd.DataFrame,
min_count: int,
max_count: int
) -> bool:
"""Check row count."""
row_count = len(df)
if not (min_count <= row_count <= max_count):
self.failures.append(
f"Table has {row_count} rows, "
f"expected between {min_count} and {max_count}"
)
return False
return True
def validate(self, df: pd.DataFrame) -> ValidationResult:
"""Run all validations."""
self.failures = []
checks = [
self.expect_table_row_count_to_be_between(df, 100, 10000),
self.expect_column_values_to_not_be_null(df, "feature_0"),
# Add more checks as needed
]
passed = sum(checks)
failed = len(checks) - passed
return ValidationResult(
passed=failed == 0,
checks_passed=passed,
checks_failed=failed,
failures=self.failures
)Step 4: Training Pipeline (Prefect)
"""Training pipeline with Prefect."""
from prefect import flow, task
from prefect.artifacts import create_markdown_artifact
from datetime import datetime
import mlflow
from src.train import train_model, TrainingConfig, TrainingResult
from src.evaluate import ModelEvaluator, ValidationThresholds
@task(name="load-data")
def load_training_data():
"""Load and prepare training data."""
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
iris.data, iris.target,
test_size=0.2,
random_state=42
)
return X_train, X_test, y_train, y_test
@task(name="train-model")
def train(config: TrainingConfig) -> TrainingResult:
"""Train the model."""
return train_model(config)
@task(name="evaluate-model")
def evaluate(model_path: str, X_test, y_test) -> dict:
"""Evaluate the trained model."""
evaluator = ModelEvaluator(ValidationThresholds())
result = evaluator.evaluate(model_path, X_test, y_test)
return {
"accuracy": result.accuracy,
"f1_score": result.f1,
"passed": result.passed,
"failures": result.failures
}
@task(name="register-model")
def register_model(
run_id: str,
model_name: str,
stage: str = "Staging"
) -> str:
"""Register model in MLflow registry."""
client = mlflow.tracking.MlflowClient()
# Register model
model_uri = f"runs:/{run_id}/model"
result = mlflow.register_model(model_uri, model_name)
# Transition to stage
client.transition_model_version_stage(
name=model_name,
version=result.version,
stage=stage
)
return result.version
@task(name="create-report")
def create_report(training_result: TrainingResult, eval_result: dict):
"""Create training report artifact."""
report = f"""
# Training Report
**Date**: {datetime.now().isoformat()}
**Run ID**: {training_result.run_id}
## Metrics
| Metric | Value |
|--------|-------|
| Accuracy | {training_result.accuracy:.4f} |
| F1 Score | {training_result.f1_score:.4f} |
| CV Score | {training_result.cv_score:.4f} |
## Validation
- **Status**: {"PASSED" if eval_result["passed"] else "FAILED"}
- **Failures**: {eval_result["failures"] or "None"}
## Model
- **Path**: {training_result.model_path}
"""
create_markdown_artifact(
key="training-report",
markdown=report,
description="Training pipeline report"
)
@flow(name="training-pipeline")
def training_pipeline(
model_name: str = "iris-classifier",
n_estimators: int = 100,
register: bool = True
):
"""
Complete training pipeline.
Args:
model_name: Name for the model
n_estimators: Number of trees
register: Whether to register in MLflow
"""
# Load data
X_train, X_test, y_train, y_test = load_training_data()
# Configure and train
config = TrainingConfig(
model_name=model_name,
n_estimators=n_estimators
)
training_result = train(config)
# Evaluate
eval_result = evaluate(training_result.model_path, X_test, y_test)
# Register if validation passed
if eval_result["passed"] and register:
version = register_model(
training_result.run_id,
model_name,
"Staging"
)
print(f"Model registered as version {version}")
# Create report
create_report(training_result, eval_result)
return {
"training": training_result,
"evaluation": eval_result
}
if __name__ == "__main__":
result = training_pipeline()
print(f"Pipeline complete: {result}")Step 5: GitHub Actions CI
```yaml
name: CI Pipeline

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

env:
  PYTHON_VERSION: "3.11"
  MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ env.PYTHON_VERSION }}

      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov

      - name: Run unit tests
        run: pytest tests/unit -v --cov=src --cov-report=xml

      - name: Upload coverage
        uses: codecov/codecov-action@v4
        with:
          file: ./coverage.xml

  lint:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ env.PYTHON_VERSION }}

      - name: Install linters
        run: pip install ruff mypy

      - name: Run ruff
        run: ruff check src/

      - name: Run mypy
        run: mypy src/ --ignore-missing-imports

  train:
    needs: [test, lint]
    runs-on: ubuntu-latest
    if: github.event_name == 'push' && github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ env.PYTHON_VERSION }}

      - name: Install dependencies
        run: pip install -r requirements.txt

      - name: Run training pipeline
        run: python pipelines/training.py

      - name: Upload model artifact
        uses: actions/upload-artifact@v4
        with:
          name: trained-model
          path: models/
          retention-days: 7
```

Step 6: GitHub Actions CD
```yaml
name: CD Pipeline

on:
  workflow_run:
    workflows: ["CI Pipeline"]
    types: [completed]
    branches: [main]

env:
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}

jobs:
  build-and-push:
    runs-on: ubuntu-latest
    if: ${{ github.event.workflow_run.conclusion == 'success' }}
    permissions:
      contents: read
      packages: write
    outputs:
      image-tag: ${{ steps.meta.outputs.tags }}
    steps:
      - uses: actions/checkout@v4

      - name: Log in to Container Registry
        uses: docker/login-action@v3
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Extract metadata
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
          tags: |
            type=sha,prefix=
            type=raw,value=latest

      - name: Build and push
        uses: docker/build-push-action@v5
        with:
          context: .
          push: true
          tags: ${{ steps.meta.outputs.tags }}

  deploy-staging:
    needs: build-and-push
    runs-on: ubuntu-latest
    environment: staging
    steps:
      - uses: actions/checkout@v4

      - name: Set up kubectl
        uses: azure/setup-kubectl@v4

      - name: Configure kubectl
        run: |
          echo "${{ secrets.KUBE_CONFIG }}" | base64 -d > kubeconfig
          # Persist for later steps; a plain `export` would not survive the step boundary
          echo "KUBECONFIG=$PWD/kubeconfig" >> "$GITHUB_ENV"

      - name: Deploy to staging
        run: |
          kubectl set image deployment/ml-service \
            ml-service=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }} \
            -n staging

      - name: Wait for rollout
        run: kubectl rollout status deployment/ml-service -n staging --timeout=300s

  integration-tests:
    needs: deploy-staging
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Run integration tests
        run: |
          pip install pytest requests
          pytest tests/integration -v \
            --base-url=${{ secrets.STAGING_URL }}

  deploy-production:
    needs: integration-tests
    runs-on: ubuntu-latest
    environment: production
    steps:
      - uses: actions/checkout@v4

      - name: Set up kubectl
        uses: azure/setup-kubectl@v4

      - name: Configure kubectl
        run: |
          echo "${{ secrets.KUBE_CONFIG }}" | base64 -d > kubeconfig
          echo "KUBECONFIG=$PWD/kubeconfig" >> "$GITHUB_ENV"

      - name: Blue-green deployment
        run: |
          # Deploy to green
          kubectl apply -f kubernetes/deployment-green.yaml -n production

          # Wait for green to be ready
          kubectl rollout status deployment/ml-service-green -n production

          # Switch traffic
          kubectl patch service ml-service -n production \
            -p '{"spec":{"selector":{"version":"green"}}}'

          # Scale down blue
          kubectl scale deployment ml-service-blue --replicas=0 -n production

      - name: Notify deployment
        uses: slackapi/slack-github-action@v1
        with:
          payload: |
            {
              "text": "ML Service deployed to production",
              "blocks": [
                {
                  "type": "section",
                  "text": {
                    "type": "mrkdwn",
                    "text": "Deployment complete: ${{ github.sha }}"
                  }
                }
              ]
            }
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}
```

Step 7: Kubernetes Manifests
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-service
  labels:
    app: ml-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ml-service
  template:
    metadata:
      labels:
        app: ml-service
    spec:
      containers:
        - name: ml-service
          image: ghcr.io/org/ml-service:latest
          ports:
            - containerPort: 8000
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "2Gi"
              cpu: "1000m"
          env:
            - name: MODEL_PATH
              value: /models/model.joblib
            - name: MLFLOW_TRACKING_URI
              valueFrom:
                secretKeyRef:
                  name: mlflow-secrets
                  key: tracking-uri
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 5
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          volumeMounts:
            - name: model-volume
              mountPath: /models
      volumes:
        - name: model-volume
          persistentVolumeClaim:
            claimName: model-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: ml-service
spec:
  selector:
    app: ml-service
  ports:
    - port: 80
      targetPort: 8000
  type: ClusterIP
```

Pipeline Stages
| Stage | Purpose | Trigger |
|---|---|---|
| Test | Unit tests, coverage | Every PR |
| Lint | Code quality | Every PR |
| Train | Model training | Main branch |
| Deploy Staging | Integration testing | Training success |
| Deploy Production | Live deployment | Staging tests pass |
Key Concepts Recap
| Concept | What It Is | Why It Matters |
|---|---|---|
| MLflow Tracking | Log params, metrics, artifacts per run | Reproducibility, experiment comparison |
| Model Registry | Version models with stages (Staging/Prod) | Controlled promotion, rollback |
| Validation Thresholds | Min accuracy, F1 before deployment | Gate bad models before production |
| Prefect Flow | DAG of tasks with retries, caching | Orchestrate complex training pipelines |
| Blue-Green Deploy | Run two versions, switch traffic instantly | Zero-downtime deployments |
| Integration Tests | Test full API against staging | Catch issues before production |
| Data Validation | Check nulls, ranges, row counts | Prevent garbage-in, garbage-out |
| Drift-Triggered Retrain | Monitoring detects drift → auto retrain | Maintain model quality over time |
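The drift-triggered retraining loop from the architecture can be sketched with a two-sample Kolmogorov–Smirnov test comparing training-time feature values against live traffic. The GitHub `repository_dispatch` trigger is shown as a commented-out, hypothetical call (the repo URL and token name are assumptions):

```python
# Sketch: detect feature drift with a KS test, then (hypothetically) kick off
# the CI training workflow via a GitHub repository_dispatch event.
import numpy as np
from scipy.stats import ks_2samp

rng = np.random.default_rng(0)
reference = rng.normal(loc=0.0, scale=1.0, size=2000)   # training-time feature
production = rng.normal(loc=0.6, scale=1.0, size=2000)  # shifted live traffic

stat, p_value = ks_2samp(reference, production)
drift_detected = p_value < 0.01

print(f"KS statistic={stat:.3f}, p={p_value:.2e}, drift={drift_detected}")

if drift_detected:
    # Hypothetical trigger -- in practice, POST to the GitHub API, e.g.:
    # requests.post(
    #     "https://api.github.com/repos/ORG/REPO/dispatches",
    #     headers={"Authorization": f"Bearer {GITHUB_TOKEN}"},
    #     json={"event_type": "retrain"},
    # )
    pass
```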
Next Steps
- Auto-Scaling - Scale based on load
- Model Registry - Version control models
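For the auto-scaling follow-up, a HorizontalPodAutoscaler targeting the Deployment above is the usual starting point; a sketch (replica bounds and the CPU target are illustrative):

```yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ml-service
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ml-service
  minReplicas: 3
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
```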