Перейти до вмісту

ML Pipelines

Цей контент ще не доступний вашою мовою.

AI/ML Engineering Track | Complexity: [COMPLEX] | Time: 6-8


Prerequisites: Module 49 (Data Versioning & Feature Stores)

San Francisco. October 3, 2014. 3:17 AM. Maxime Beauchemin’s phone buzzed with yet another alert. Airbnb’s data pipeline had failed again. This time it was a cascade: the pricing model hadn’t retrained because the feature pipeline died, which happened because the data validation job timed out, which happened because… nobody could tell anymore.

Beauchemin dragged himself to his laptop and started digging through logs. Four hours later, he’d traced the failure to a single upstream task that had silently failed two days ago. The cron job hadn’t reported the error. No one knew until everything downstream collapsed.

“This is insane,” he muttered. “We’re running a billion-dollar company on bash scripts and hope.”

Over the next few months, Beauchemin built something different: a system where tasks declared their dependencies, where failures triggered immediate alerts, where you could see the entire pipeline at a glance. He called it “Airflow.”

“I wrote Airflow out of frustration. Every data team was solving the same problem badly—orchestrating complex workflows with cron and prayer. I thought: what if we could make pipelines first-class citizens, with proper scheduling, monitoring, and visualization?” — Maxime Beauchemin, Creator of Apache Airflow

Airbnb open-sourced Airflow in 2015. Today, it orchestrates ML pipelines at Uber, Spotify, and thousands of other companies. That 3 AM wake-up call spawned an entire industry.


By the end of this module, you will:

  • Master Apache Airflow for ML pipeline orchestration
  • Build Kubeflow Pipelines for Kubernetes-native ML
  • Create visual AI workflows with n8n
  • Compare modern orchestration tools (Prefect, Dagster, Temporal)
  • Design production-ready ML pipelines
  • Implement retry, monitoring, and alerting patterns

The History of ML Orchestration: From Cron to Cloud-Native

Section titled “The History of ML Orchestration: From Cron to Cloud-Native”

Before dedicated orchestration tools, teams ran ML pipelines with cron jobs and bash scripts. This worked—barely—for simple pipelines. But as companies grew, the limitations became painful.

Did You Know? In 2013, LinkedIn’s data team maintained over 10,000 cron jobs across dozens of servers. Engineers spent 30% of their time just debugging scheduling conflicts and mysterious failures. When two jobs tried to write to the same table simultaneously, the only clue was corrupted data discovered days later.

The problems were universal:

  • No dependency management: Cron doesn’t know job A must finish before job B starts
  • No visibility: You couldn’t see what was running, what failed, or why
  • No retries: Failures meant manual intervention or data loss
  • No auditing: Who ran what? When? With what parameters? Nobody knew

The Birth of Modern Orchestration (2014-2016)

Section titled “The Birth of Modern Orchestration (2014-2016)”

Airflow emerges at Airbnb (2014). Maxime Beauchemin’s frustration became the industry’s solution. Key innovations: DAGs as code, dependency management, UI visualization, extensible operators. Airbnb open-sourced it in 2015.

Luigi at Spotify (2012-2015). Erik Bernhardsson built Luigi to orchestrate Spotify’s recommendation system. While simpler than Airflow, Luigi proved that Python-native orchestration could work at scale.

Did You Know? Airflow was named after the HVAC concept because Beauchemin saw pipelines like air ducts—data flowing through channels with valves (operators) controlling the flow. He almost named it “Dataflow” but Google already had that trademark.

Oozie at Yahoo (2010-2014). The Hadoop ecosystem’s answer to orchestration. XML-based, verbose, but reliable. Teams that survived Oozie appreciated Airflow’s Python elegance.

As ML moved to containers, orchestration tools adapted:

Kubeflow (2017): Google open-sourced their internal ML toolkit for Kubernetes. Finally, data scientists could request GPUs without understanding node affinity.

Argo Workflows (2017): YAML-native Kubernetes workflows. No Python required—define pipelines in the same language as your deployments.

MLflow (2018): Databricks introduced experiment tracking and model registry, solving a parallel problem to orchestration.

Did You Know? The first Kubeflow release required 40 YAML files to deploy a single model training job. By version 1.0 (2020), a single @component decorator could do the same thing. This 100x simplification drove adoption from 0 to 10,000+ companies in three years.

Prefect 2.0 (2022): Rewrote from scratch with “Python-native” philosophy. Flows are just decorated Python functions—no DAG boilerplate required.

Dagster (2019-2023): Nick Schrock (GraphQL creator) introduced “Software-Defined Assets”—think about what data you produce, not what tasks you run.

Temporal (2020): Uber engineers who built Cadence created something radically different—durable execution that survives infrastructure failures. Your workflow remembers its state even if every server dies.

n8n (2019): Visual workflow automation for the AI era. Suddenly, non-programmers could build RAG pipelines by dragging and connecting nodes visually. This democratization of ML operations opened the door for business analysts and domain experts to create sophisticated AI workflows without writing any code at all.


Every ML system in production needs orchestration. Training a model once is easy. Training it daily, with data validation, feature engineering, model evaluation, and deployment - that’s where orchestration becomes essential.

Think of ML orchestration like an airport control tower. Individual planes (ML tasks) know how to fly, but without coordination, you’d have chaos—planes taking off into each other, landing on occupied runways, fuel trucks colliding with baggage carts. The control tower (orchestrator) ensures everything happens in the right order, at the right time, with the right resources, and knows immediately when something goes wrong.

The Reality of ML in Production:

WITHOUT ORCHESTRATION WITH ORCHESTRATION
==================== ==================
Manual cron jobs Declarative DAGs
"It worked on my machine" Reproducible pipelines
No visibility Full observability
Failures go unnoticed Automatic retries + alerts
No dependency management Clear task dependencies
Ad-hoc scheduling Intelligent scheduling

Did You Know? Airbnb created Apache Airflow in 2014 when Maxime Beauchemin needed to orchestrate their complex data pipelines. They open-sourced it in 2015, and it became an Apache top-level project in 2019. Today, Airflow powers ML pipelines at Uber, Spotify, Twitter, and thousands of other companies.


┌─────────────────────────────────────────────────────────────────────────┐
│ ML ORCHESTRATION TOOLS │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ CODE-FIRST VISUAL/LOW-CODE │
│ ────────── ────────────── │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ AIRFLOW │ │ PREFECT │ │ n8n │ │ LANGFLOW │ │
│ │ (Apache) │ │ (Modern) │ │ (Visual) │ │ (LangChain) │ │
│ │ │ │ │ │ │ │ │ │
│ │ Python DAGs │ │ Python- │ │ Drag-drop │ │ AI chains │ │
│ │ Scheduling │ │ native │ │ 400+ nodes │ │ Visual │ │
│ │ Battle- │ │ Dynamic │ │ AI nodes │ │ builder │ │
│ │ tested │ │ Hybrid │ │ Self-host │ │ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ DAGSTER │ │ KUBEFLOW │ │ WINDMILL │ │ FLOWISE │ │
│ │ (Asset) │ │ (K8s ML) │ │ (Scripts) │ │ (LLM) │ │
│ │ │ │ │ │ │ │ │ │
│ │ Data-aware │ │ K8s-native │ │ Any lang │ │ Drag-drop │ │
│ │ Typed │ │ ML-focused │ │ Visual + │ │ LLM flows │ │
│ │ Software- │ │ Pipelines │ │ code │ │ │ │
│ │ defined │ │ │ │ │ │ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ ┌─────────────┐ │
│ │ TEMPORAL │ WHEN TO USE WHAT: │
│ │ (Durable) │ ─────────────────── │
│ │ │ Complex ML Pipelines → Airflow, Kubeflow │
│ │ Long- │ Data Engineering → Dagster, Airflow │
│ │ running │ Quick AI Prototypes → n8n, LangFlow │
│ │ Reliable │ Production Agents → n8n, Temporal │
│ │ workflows │ Long-running Jobs → Temporal │
│ └─────────────┘ K8s-native ML → Kubeflow │
│ │
└─────────────────────────────────────────────────────────────────────────┘

Airflow is the industry standard for workflow orchestration. It lets you define workflows as code (DAGs - Directed Acyclic Graphs), schedule them, and monitor their execution.

AIRFLOW ARCHITECTURE
====================
┌─────────────────────────────────────────────────────────────────┐
│ AIRFLOW │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ SCHEDULER │───▶│ EXECUTOR │───▶│ WORKERS │ │
│ │ │ │ │ │ │ │
│ │ Triggers │ │ Celery/K8s/ │ │ Run tasks │ │
│ │ DAG runs │ │ Local │ │ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ METADATA │ │ WEB UI │ │
│ │ DATABASE │◀────────────────────▶│ │ │
│ │ (Postgres) │ │ Monitoring │ │
│ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘

A DAG (Directed Acyclic Graph) defines the workflow structure. Think of a DAG like a recipe with dependencies: you can chop vegetables and boil water in parallel (no dependencies), but you can’t add the vegetables until the water is boiling (dependency). The “acyclic” part means you can’t create circular dependencies—the vegetables can’t require the soup to be done before being chopped.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
# DAG definition
default_args = {
'owner': 'ml_team',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'retries': 3,
'retry_delay': timedelta(minutes=5),
'email_on_failure': True,
'email': ['ml-team@company.com'],
}
with DAG(
'ml_training_pipeline',
default_args=default_args,
description='Daily ML model training pipeline',
schedule_interval='@daily', # or '0 6 * * *' for 6 AM
catchup=False,
tags=['ml', 'training'],
) as dag:
# Task 1: Extract data
extract_data = PythonOperator(
task_id='extract_data',
python_callable=extract_from_database,
)
# Task 2: Validate data
validate_data = PythonOperator(
task_id='validate_data',
python_callable=run_data_validation,
)
# Task 3: Feature engineering
feature_engineering = PythonOperator(
task_id='feature_engineering',
python_callable=engineer_features,
)
# Task 4: Train model
train_model = PythonOperator(
task_id='train_model',
python_callable=train_ml_model,
)
# Task 5: Evaluate model
evaluate_model = PythonOperator(
task_id='evaluate_model',
python_callable=evaluate_model_performance,
)
# Task 6: Deploy if good
deploy_model = PythonOperator(
task_id='deploy_model',
python_callable=deploy_to_production,
)
# Define dependencies (the DAG structure)
extract_data >> validate_data >> feature_engineering
feature_engineering >> train_model >> evaluate_model >> deploy_model
from airflow.decorators import dag, task
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule
@dag(
schedule_interval='@daily',
start_date=datetime(2024, 1, 1),
catchup=False,
)
def ml_pipeline_with_branching():
"""
ML pipeline with conditional deployment based on metrics.
"""
@task
def extract_data():
"""Extract training data from source."""
# Extract logic
return {'rows': 10000, 'features': 50}
@task
def validate_data(data_info: dict):
"""Validate data quality."""
if data_info['rows'] < 1000:
raise ValueError("Insufficient data!")
return {'valid': True, 'rows': data_info['rows']}
@task
def train_model(data_info: dict):
"""Train the ML model."""
# Training logic
return {
'accuracy': 0.92,
'f1_score': 0.89,
'model_path': '/models/v1.0'
}
@task
def evaluate_model(metrics: dict):
"""Evaluate model and decide deployment."""
return {
'deploy': metrics['accuracy'] > 0.90,
'metrics': metrics
}
def choose_deployment_path(**context):
"""Branch based on model quality."""
ti = context['ti']
evaluation = ti.xcom_pull(task_ids='evaluate_model')
if evaluation['deploy']:
return 'deploy_to_production'
else:
return 'notify_failure'
branch = BranchPythonOperator(
task_id='branch_on_quality',
python_callable=choose_deployment_path,
)
@task
def deploy_to_production(evaluation: dict):
"""Deploy model to production."""
print(f"Deploying model with accuracy: {evaluation['metrics']['accuracy']}")
return {'deployed': True}
@task
def notify_failure(evaluation: dict):
"""Send notification about failed quality check."""
print(f"Model did not meet threshold: {evaluation['metrics']['accuracy']}")
return {'notified': True}
@task(trigger_rule=TriggerRule.ONE_SUCCESS)
def cleanup():
"""Cleanup temporary files."""
print("Cleaning up...")
# Build the DAG
data = extract_data()
validated = validate_data(data)
model = train_model(validated)
evaluation = evaluate_model(model)
branch >> [deploy_to_production(evaluation), notify_failure(evaluation)]
[deploy_to_production(evaluation), notify_failure(evaluation)] >> cleanup()
# Instantiate the DAG
ml_pipeline = ml_pipeline_with_branching()

Did You Know? The TaskFlow API (using @task decorators) was introduced in Airflow 2.0 (December 2020). It was created by Kaxil Naik and the community to make DAGs more Pythonic and reduce boilerplate. Before TaskFlow, passing data between tasks required explicit XCom pulls, which was verbose and error-prone.


Kubeflow Pipelines is designed specifically for ML on Kubernetes. Think of Kubeflow like a factory assembly line where each station (container) has specialized equipment. The data processing station has different tools than the GPU training station, but they all connect smoothly. And because each station is independent, you can upgrade or replace one without disrupting the others.

Kubeflow is perfect for:

  • GPU-intensive training jobs
  • Reproducible experiments
  • Multi-team ML platforms
KUBEFLOW PIPELINES ARCHITECTURE
===============================
┌─────────────────────────────────────────────────────────────────┐
│ KUBEFLOW PIPELINES │
├─────────────────────────────────────────────────────────────────┤
│ │
│ PIPELINE DEFINITION (Python SDK) │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ @component → @pipeline → compile() → upload() │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ KUBERNETES CLUSTER │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Step 1 │─▶│ Step 2 │─▶│ Step 3 │─▶│ Step 4 │ │ │
│ │ │ (Pod) │ │ (Pod) │ │ (Pod) │ │ (Pod) │ │ │
│ │ │ │ │ +GPU │ │ │ │ │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ METADATA STORE │ ARTIFACT STORE │ UI DASHBOARD │ │
│ │ (MySQL) │ (MinIO/GCS) │ (Runs, Metrics) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
from kfp import dsl
from kfp import compiler
from kfp.dsl import Dataset, Model, Metrics, Input, Output
# Define a component
@dsl.component(
base_image='python:3.10',
packages_to_install=['pandas', 'scikit-learn']
)
def preprocess_data(
input_data: Input[Dataset],
output_data: Output[Dataset],
test_size: float = 0.2
):
"""Preprocess and split data."""
import pandas as pd
from sklearn.model_selection import train_test_split
df = pd.read_csv(input_data.path)
# Preprocessing
df = df.dropna()
df = df[df['value'] > 0]
# Split
train, test = train_test_split(df, test_size=test_size)
# Save
train.to_csv(output_data.path, index=False)
@dsl.component(
base_image='python:3.10',
packages_to_install=['pandas', 'scikit-learn', 'xgboost']
)
def train_model(
training_data: Input[Dataset],
model_output: Output[Model],
metrics_output: Output[Metrics],
n_estimators: int = 100,
max_depth: int = 6
):
"""Train XGBoost model."""
import pandas as pd
import xgboost as xgb
from sklearn.metrics import accuracy_score, f1_score
import json
df = pd.read_csv(training_data.path)
X = df.drop('target', axis=1)
y = df['target']
# Train
model = xgb.XGBClassifier(
n_estimators=n_estimators,
max_depth=max_depth
)
model.fit(X, y)
# Evaluate
predictions = model.predict(X)
accuracy = accuracy_score(y, predictions)
f1 = f1_score(y, predictions, average='weighted')
# Save model
model.save_model(model_output.path)
# Log metrics
metrics_output.log_metric('accuracy', accuracy)
metrics_output.log_metric('f1_score', f1)
@dsl.component(base_image='python:3.10')
def deploy_model(
model: Input[Model],
metrics: Input[Metrics],
endpoint: str
) -> str:
"""Deploy model if metrics pass threshold."""
# Check metrics
if metrics.metadata.get('accuracy', 0) < 0.85:
return "Model did not meet accuracy threshold"
# Deploy logic
print(f"Deploying to {endpoint}")
return f"Deployed to {endpoint}"
# Define the pipeline
@dsl.pipeline(
name='ML Training Pipeline',
description='End-to-end ML training with Kubeflow'
)
def ml_training_pipeline(
input_data_path: str,
test_size: float = 0.2,
n_estimators: int = 100,
max_depth: int = 6,
deploy_endpoint: str = 'production'
):
# Step 1: Preprocess
preprocess_task = preprocess_data(
input_data=dsl.importer(
artifact_uri=input_data_path,
artifact_class=Dataset
),
test_size=test_size
)
# Step 2: Train (request GPU)
train_task = train_model(
training_data=preprocess_task.outputs['output_data'],
n_estimators=n_estimators,
max_depth=max_depth
)
train_task.set_gpu_limit(1)
train_task.set_memory_limit('8G')
# Step 3: Deploy
deploy_task = deploy_model(
model=train_task.outputs['model_output'],
metrics=train_task.outputs['metrics_output'],
endpoint=deploy_endpoint
)
# Compile the pipeline
compiler.Compiler().compile(
ml_training_pipeline,
'ml_pipeline.yaml'
)

Did You Know? Kubeflow started as an internal Google project in 2017 to simplify ML on Kubernetes. The name comes from “Kubernetes” + “TensorFlow” (though it now supports all frameworks). Google’s David Aronchick led the project, which became one of the fastest-growing Kubernetes projects, reaching 10,000+ GitHub stars within two years.


n8n is a visual workflow automation tool that’s particularly powerful for AI applications. Think of n8n like building with LEGO blocks—each block (node) does one thing, and you snap them together visually to create complex AI workflows. Non-programmers can build RAG pipelines, chatbot backends, and document processors by dragging and connecting nodes, while programmers can add custom code blocks when needed.

n8n FOR AI WORKFLOWS
====================
┌─────────────────────────────────────────────────────────────────┐
│ n8n WORKFLOW │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ TRIGGER │───▶│ FETCH │───▶│ LLM │───▶│ STORE │ │
│ │ │ │ DATA │ │ PROCESS │ │ RESULT │ │
│ │ Webhook │ │ HTTP │ │ OpenAI │ │ Postgres│ │
│ │ Cron │ │ DB │ │ Claude │ │ Vector │ │
│ │ Email │ │ S3 │ │ Ollama │ │ Notion │ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ AI-SPECIFIC NODES: │
│ ───────────────── │
│ • OpenAI (gpt-5, embeddings) │
│ • Anthropic (Claude) │
│ • Ollama (local models) │
│ • Vector Stores (Pinecone, Qdrant, Supabase) │
│ • Document Loaders (PDF, web, etc.) │
│ • Text Splitters (chunk documents) │
│ • LangChain nodes │
│ │
│ EXAMPLE RAG WORKFLOW: │
│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │Webhook│─▶│Embed │─▶│Vector│─▶│ LLM │─▶│Return│ │
│ │Query │ │Query │ │Search│ │Answer│ │Response│ │
│ └──────┘ └──────┘ └──────┘ └──────┘ └──────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
{
"name": "AI Document Q&A",
"nodes": [
{
"name": "Webhook",
"type": "n8n-nodes-base.webhook",
"parameters": {
"path": "ask",
"method": "POST"
}
},
{
"name": "Embed Question",
"type": "@n8n/n8n-nodes-langchain.embeddingsOpenAi",
"parameters": {
"model": "text-embedding-3-small"
}
},
{
"name": "Vector Store Search",
"type": "@n8n/n8n-nodes-langchain.vectorStoreQdrant",
"parameters": {
"operation": "search",
"topK": 5
}
},
{
"name": "Generate Answer",
"type": "@n8n/n8n-nodes-langchain.openAi",
"parameters": {
"model": "gpt-5",
"prompt": "Based on the following context, answer the question.\n\nContext: {{$node['Vector Store Search'].json.results}}\n\nQuestion: {{$node['Webhook'].json.question}}"
}
},
{
"name": "Return Response",
"type": "n8n-nodes-base.respondToWebhook",
"parameters": {
"responseBody": "={{$node['Generate Answer'].json.text}}"
}
}
],
"connections": {
"Webhook": { "main": [[{ "node": "Embed Question" }]] },
"Embed Question": { "main": [[{ "node": "Vector Store Search" }]] },
"Vector Store Search": { "main": [[{ "node": "Generate Answer" }]] },
"Generate Answer": { "main": [[{ "node": "Return Response" }]] }
}
}

Did You Know? n8n was created by Jan Oberhauser in Berlin in 2019. The name “n8n” is a numeronym for “nodemation” (n-8 letters-n). It raised $12M in Series A funding in 2022 and has become the go-to tool for self-hosted workflow automation, with over 400 integrations and a thriving community of AI workflow builders.


Prefect is a modern alternative to Airflow, designed to be more Pythonic and developer-friendly.

from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
@task(
retries=3,
retry_delay_seconds=60,
cache_key_fn=task_input_hash,
cache_expiration=timedelta(hours=1)
)
def extract_data(source: str) -> dict:
"""Extract data from source."""
print(f"Extracting from {source}")
return {"rows": 10000, "source": source}
@task
def transform_data(data: dict) -> dict:
"""Transform the data."""
return {"rows": data["rows"], "transformed": True}
@task
def train_model(data: dict) -> dict:
"""Train ML model."""
return {"accuracy": 0.95, "rows_used": data["rows"]}
@task
def deploy_if_good(metrics: dict) -> str:
"""Deploy model if metrics are good."""
if metrics["accuracy"] > 0.90:
return "Deployed to production!"
return "Model not good enough"
@flow(name="ML Training Pipeline")
def ml_pipeline(source: str = "s3://data/training"):
"""Complete ML training pipeline."""
# Extract
raw_data = extract_data(source)
# Transform
clean_data = transform_data(raw_data)
# Train
metrics = train_model(clean_data)
# Deploy
result = deploy_if_good(metrics)
return result
# Run the flow
if __name__ == "__main__":
ml_pipeline()

Prefect vs Airflow:

PREFECT AIRFLOW
─────── ───────
Python-native DAG files in specific folder
Dynamic workflows Static DAG structure
Local-first Server-first
Hybrid execution Centralized execution
Built-in caching Manual caching
Modern UI Classic UI

Dagster takes a different approach: instead of defining tasks, you define assets (the data you produce). Think of the difference like cooking: Airflow is like a recipe that says “chop vegetables, then sauté, then serve”—it focuses on the steps. Dagster is like describing the meal itself—“we need chopped vegetables, we need sautéed vegetables, we need a served dish”—and the system figures out the steps to make each thing.

from dagster import asset, AssetExecutionContext, Definitions
from dagster import MaterializeResult, MetadataValue
import pandas as pd
@asset(
description="Raw user data from database",
group_name="bronze"
)
def raw_users(context: AssetExecutionContext) -> pd.DataFrame:
"""Extract raw user data."""
context.log.info("Extracting raw users...")
return pd.DataFrame({
'user_id': range(1000),
'name': [f'User {i}' for i in range(1000)],
'signup_date': pd.date_range('2024-01-01', periods=1000)
})
@asset(
description="Cleaned and validated user data",
group_name="silver",
deps=[raw_users]
)
def clean_users(context: AssetExecutionContext, raw_users: pd.DataFrame) -> pd.DataFrame:
"""Clean and validate user data."""
# Remove duplicates
df = raw_users.drop_duplicates()
# Add derived columns
df['days_since_signup'] = (pd.Timestamp.now() - df['signup_date']).dt.days
context.log.info(f"Cleaned {len(df)} users")
return df
@asset(
description="User features for ML",
group_name="gold",
deps=[clean_users]
)
def user_features(context: AssetExecutionContext, clean_users: pd.DataFrame) -> MaterializeResult:
"""Engineer features for ML."""
df = clean_users.copy()
# Feature engineering
df['is_new_user'] = df['days_since_signup'] < 30
df['user_segment'] = pd.cut(
df['days_since_signup'],
bins=[0, 30, 90, 365, float('inf')],
labels=['new', 'active', 'mature', 'veteran']
)
# Save
df.to_parquet('/data/features/user_features.parquet')
return MaterializeResult(
metadata={
'num_rows': MetadataValue.int(len(df)),
'num_features': MetadataValue.int(len(df.columns)),
'schema': MetadataValue.md(df.dtypes.to_markdown())
}
)
@asset(
description="Trained churn prediction model",
group_name="ml",
deps=[user_features]
)
def churn_model(context: AssetExecutionContext) -> MaterializeResult:
"""Train churn prediction model."""
# Load features
features = pd.read_parquet('/data/features/user_features.parquet')
# Train model (simplified)
accuracy = 0.92
return MaterializeResult(
metadata={
'accuracy': MetadataValue.float(accuracy),
'training_rows': MetadataValue.int(len(features))
}
)
# Define the Dagster job
defs = Definitions(
assets=[raw_users, clean_users, user_features, churn_model]
)

Dagster’s Asset Graph:

┌─────────────┐
│ raw_users │ (Bronze)
└──────┬──────┘
┌─────────────┐
│ clean_users │ (Silver)
└──────┬──────┘
┌─────────────┐
│user_features│ (Gold)
└──────┬──────┘
┌─────────────┐
│ churn_model │ (ML)
└─────────────┘

Did You Know? Dagster was created by Nick Schrock, who previously created GraphQL at Facebook. He founded Elementl in 2018 with the insight that data pipelines should be treated like software: with types, tests, and clear interfaces. The asset-based approach mirrors how data teams actually think about their work.


Temporal is different from other tools - it’s designed for long-running, reliable workflows that need to survive failures. Think of Temporal like a notary that records every step of a complex business process. If the power goes out mid-signature, when it comes back, the notary knows exactly which documents were signed and which weren’t—nothing is lost, and you resume from exactly where you stopped. This “durable execution” is why Temporal is used for critical ML workflows that can’t afford to restart from scratch.

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker
from dataclasses import dataclass
from datetime import timedelta
@dataclass
class TrainingConfig:
dataset_path: str
model_type: str
hyperparameters: dict
@dataclass
class TrainingResult:
model_path: str
metrics: dict
# Activities (the actual work)
@activity.defn
async def download_dataset(path: str) -> str:
"""Download dataset (can take hours for large datasets)."""
# Temporal handles retries, timeouts, heartbeats
print(f"Downloading from {path}...")
return "/local/data/training.parquet"
@activity.defn
async def train_model(local_path: str, config: TrainingConfig) -> dict:
"""Train model (can take hours/days)."""
# Long-running training
print(f"Training {config.model_type}...")
return {"accuracy": 0.95, "path": "/models/v1"}
@activity.defn
async def deploy_model(model_path: str) -> str:
"""Deploy to production."""
print(f"Deploying {model_path}...")
return "https://api.company.com/model/v1"
# Workflow (the orchestration)
@workflow.defn
class MLTrainingWorkflow:
"""
Durable ML training workflow.
If this crashes mid-training, Temporal will resume
from exactly where it left off!
"""
@workflow.run
async def run(self, config: TrainingConfig) -> TrainingResult:
# Step 1: Download (with timeout and retries)
local_path = await workflow.execute_activity(
download_dataset,
config.dataset_path,
start_to_close_timeout=timedelta(hours=2),
retry_policy=RetryPolicy(maximum_attempts=3)
)
# Step 2: Train (long-running, with heartbeat)
training_result = await workflow.execute_activity(
train_model,
local_path,
config,
start_to_close_timeout=timedelta(days=7),
heartbeat_timeout=timedelta(minutes=10),
)
# Step 3: Human approval (workflow waits!)
approved = await workflow.execute_activity(
wait_for_human_approval,
training_result,
start_to_close_timeout=timedelta(days=30),
)
if approved:
# Step 4: Deploy
endpoint = await workflow.execute_activity(
deploy_model,
training_result["path"],
start_to_close_timeout=timedelta(minutes=30),
)
return TrainingResult(
model_path=training_result["path"],
metrics=training_result
)
else:
raise Exception("Human rejected the model")
# Run the workflow
async def main():
client = await Client.connect("localhost:7233")
result = await client.execute_workflow(
MLTrainingWorkflow.run,
TrainingConfig(
dataset_path="s3://data/training",
model_type="xgboost",
hyperparameters={"n_estimators": 100}
),
id="ml-training-2024-01-15",
task_queue="ml-training",
)
print(f"Training complete: {result}")

When to Use Temporal:

USE TEMPORAL WHEN:
──────────────────
• Workflows can take hours/days/weeks
• Human approval steps are needed
• Workflows must survive infrastructure failures
• You need exactly-once execution guarantees
• Complex compensation/rollback logic
DON'T USE TEMPORAL WHEN:
────────────────────────
• Simple scheduled jobs (use Airflow)
• Quick data transformations (use Dagster)
• Real-time streaming (use Kafka/Flink)

┌────────────────┬─────────────┬─────────────┬─────────────┬─────────────┬─────────────┐
│ Feature │ AIRFLOW │ PREFECT │ DAGSTER │ KUBEFLOW │ TEMPORAL │
├────────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤
│ Paradigm │ Task-based │ Task-based │ Asset-based │ Container │ Durable │
│ │ DAGs │ Flows │ Assets │ Pipelines │ Workflows │
├────────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤
│ Learning Curve │ Medium │ Low │ Medium │ High │ High │
├────────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤
│ Best For │ Data/ML │ Modern │ Data │ K8s ML │ Long- │
│ │ Pipelines │ Pipelines │ Products │ Training │ running │
├────────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤
│ Scheduling │ Built-in │ Built-in │ Built-in │ Built-in │ ️ External │
├────────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤
│ Dynamic DAGs │ ️ Limited │ Native │ Native │ ️ Limited │ Native │
├────────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤
│ GPU Support │ ️ Manual │ ️ Manual │ ️ Manual │ Native │ ️ Manual │
├────────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤
│ Self-hosted │ Yes │ Yes │ Yes │ Yes │ Yes │
├────────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤
│ Managed Cloud │ MWAA │ Prefect │ Dagster │ Vertex │ Temporal │
│ │ GCP │ Cloud │ Cloud │ AI │ Cloud │
├────────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤
│ Community │ ⭐⭐⭐⭐⭐ │ ⭐⭐⭐⭐ │ ⭐⭐⭐⭐ │ ⭐⭐⭐⭐ │ ⭐⭐⭐ │
└────────────────┴─────────────┴─────────────┴─────────────┴─────────────┴─────────────┘

# Airflow
from airflow.decorators import task
@task(
retries=5,
retry_delay=timedelta(minutes=1),
retry_exponential_backoff=True,
max_retry_delay=timedelta(hours=1)
)
def flaky_api_call():
"""Call external API with retries."""
pass
# Prefect
from prefect import task
@task(
retries=5,
retry_delay_seconds=[60, 120, 240, 480, 960] # Custom backoff
)
def flaky_api_call():
pass
# Airflow
from airflow.operators.python import BranchPythonOperator
def choose_path(**context):
metrics = context['ti'].xcom_pull(task_ids='evaluate')
if metrics['accuracy'] > 0.9:
return 'deploy_production'
elif metrics['accuracy'] > 0.8:
return 'deploy_staging'
else:
return 'retrain'
branch = BranchPythonOperator(
task_id='branch',
python_callable=choose_path
)
# Airflow - Dynamic Task Mapping
@task
def process_partition(partition_id: int):
return f"Processed {partition_id}"
@dag
def parallel_processing():
partitions = list(range(10))
# This creates 10 parallel tasks!
results = process_partition.expand(partition_id=partitions)
aggregate_results(results)
# Prefect - Concurrent Execution
from prefect import flow, task
from prefect.futures import wait
@task
def process_item(item):
return item * 2
@flow
def parallel_flow():
items = range(100)
futures = [process_item.submit(item) for item in items]
results = [f.result() for f in futures]
return results
# Airflow - Wait for file
from airflow.sensors.filesystem import FileSensor
wait_for_data = FileSensor(
task_id='wait_for_data',
filepath='/data/daily/{{ ds }}/data.parquet',
poke_interval=300, # Check every 5 minutes
timeout=3600 * 6, # Timeout after 6 hours
mode='reschedule' # Don't block worker
)
# Airflow - Wait for external DAG
from airflow.sensors.external_task import ExternalTaskSensor
wait_for_upstream = ExternalTaskSensor(
task_id='wait_for_upstream',
external_dag_id='data_ingestion',
external_task_id='load_complete',
timeout=3600
)

CHOOSING AN ORCHESTRATION TOOL
==============================
START HERE
┌─────────────────────────────────────┐
│ Do you need Kubernetes-native ML? │
└─────────────────────────────────────┘
│ │
YES NO
│ │
▼ ▼
┌─────────┐ ┌─────────────────────────────┐
│KUBEFLOW │ │ Do workflows run for │
│PIPELINES│ │ hours/days with human │
└─────────┘ │ approval steps? │
└─────────────────────────────┘
│ │
YES NO
│ │
▼ ▼
┌─────────┐ ┌─────────────────────────────┐
│TEMPORAL │ │ Is your team data-centric │
└─────────┘ │ (thinking in assets)? │
└─────────────────────────────┘
│ │
YES NO
│ │
▼ ▼
┌─────────┐ ┌─────────────────────────────┐
│ DAGSTER │ │ Need battle-tested, │
└─────────┘ │ enterprise-grade? │
└─────────────────────────────┘
│ │
YES NO
│ │
▼ ▼
┌─────────┐ ┌─────────┐
│ AIRFLOW │ │ PREFECT │
└─────────┘ └─────────┘
VISUAL/LOW-CODE NEEDS:
──────────────────────
• Quick AI prototypes → n8n
• LangChain flows → LangFlow
• LLM chat flows → Flowise
• Multi-language scripts → Windmill

Goal: Create a complete ML training pipeline in Airflow that demonstrates dependencies, branching, and best practices.

Requirements:

  1. Extract data from a CSV file (simulated database)
  2. Validate data quality (row counts, null checks, schema validation)
  3. Train a model (XGBoost classifier)
  4. Evaluate and branch based on metrics
  5. Deploy to “production” or send failure notification

Success Criteria:

  • DAG runs without errors
  • Branching works correctly based on accuracy threshold
  • XCom passes data between tasks properly
  • Retries are configured for flaky tasks

Starter Code:

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'ml_team',
'retries': 3,
'retry_delay': timedelta(minutes=5)
}
with DAG(
'ml_training_exercise',
default_args=default_args,
schedule_interval='@daily',
start_date=datetime(2024, 1, 1),
catchup=False
) as dag:
# Your tasks here
pass

Extension Challenges:

  • Add data versioning with DVC integration
  • Implement model registry integration
  • Add Slack notifications on success/failure

Goal: Build a modern ML pipeline using Prefect’s Python-native approach, demonstrating caching, parallelism, and dynamic workflows.

Requirements:

  1. Fetch data from an API (use a public API like JSONPlaceholder)
  2. Process multiple datasets in parallel
  3. Aggregate results with proper error handling
  4. Implement caching for expensive operations

Success Criteria:

  • Flow executes successfully
  • Parallel tasks run concurrently (verify with logs)
  • Caching prevents redundant computation on re-runs
  • Dynamic task creation based on input data

Starter Code:

from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import httpx
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def fetch_data(user_id: int) -> dict:
"""Fetch user data from API."""
response = httpx.get(f"https://jsonplaceholder.typicode.com/users/{user_id}")
return response.json()
@flow(name="Parallel Data Processing")
def process_users(user_ids: list[int]):
# Your implementation here
pass

Extension Challenges:

  • Add Prefect deployments for scheduled runs
  • Implement custom retry logic for API failures
  • Add observability with Prefect’s built-in metrics

Goal: Build a data platform using Dagster’s asset-based paradigm, demonstrating the medallion architecture (bronze/silver/gold) for ML feature engineering.

Requirements:

  1. Raw data asset (bronze layer) - simulated data ingestion
  2. Clean data asset (silver layer) - validation and cleaning
  3. Feature data asset (gold layer) - ML feature engineering
  4. ML model asset - trained model ready for serving

Success Criteria:

  • Asset graph visualizes correctly in Dagster UI
  • Metadata is logged for each asset
  • Incremental updates work correctly
  • Asset dependencies are properly defined

Starter Code:

from dagster import asset, AssetExecutionContext, Definitions
from dagster import MaterializeResult, MetadataValue
import pandas as pd
@asset(
description="Raw event data from source system",
group_name="bronze",
metadata={"owner": "data_engineering"}
)
def raw_events(context: AssetExecutionContext) -> pd.DataFrame:
"""Simulate raw event data extraction."""
context.log.info("Extracting raw events...")
# Your implementation here
pass
# Define silver, gold, and ml_model assets

Extension Challenges:

  • Add partitioned assets for daily data
  • Implement asset checks for data quality
  • Add IO managers for different storage backends

Goal: Run the same ML pipeline in Airflow, Prefect, and Dagster to understand the trade-offs between tools.

Requirements:

  1. Implement identical logic in all three frameworks
  2. Measure: lines of code, setup time, debugging experience
  3. Document: what was easy/hard in each tool
  4. Recommend: which tool for which use case

Deliverable: A written comparison document with code samples and quantitative metrics.

This exercise develops judgment about tool selection—a critical skill for MLOps roles.

Did You Know? At Stripe, teams are allowed to choose between Airflow and Temporal based on use case. This “toolbox” approach—rather than mandating one tool—increased engineer satisfaction by 40% and reduced time-to-production by 25%. The key is understanding when each tool shines.


  • “Why We Moved from Airflow to Prefect” (Netflix)
  • “Building ML Pipelines at Scale with Kubeflow” (Google)
  • “Asset-Based Data Pipelines with Dagster” (Elementl)

A major fintech company ran their fraud detection model retraining on Airflow. One day, the data extraction task failed silently—it returned an empty DataFrame instead of raising an exception. The pipeline continued merrily:

  • Feature engineering processed zero rows (no errors)
  • Model training fit on zero samples (technically valid)
  • Deployment pushed the “model” to production

For three days, the fraud model predicted 0% fraud probability for everyone. $2 million in fraudulent transactions sailed through before a human noticed the anomaly.

Lesson learned: Add data validation tasks with row count checks and raise exceptions on empty data.

@task
def validate_data(df: pd.DataFrame) -> pd.DataFrame:
"""Fail fast on bad data."""
if len(df) == 0:
raise ValueError("Empty DataFrame - data extraction likely failed!")
if len(df) < 1000:
raise ValueError(f"Suspiciously small dataset: {len(df)} rows")
return df

A startup used Kubeflow to run hyperparameter tuning. The pipeline requested GPU nodes for each experiment—200 parallel runs, each on a $3/hour GPU instance. The engineer forgot to set a timeout.

Some experiments converged quickly (20 minutes). Others got stuck in local minima and ran forever. Three weeks later, the finance team noticed a $50,000 cloud bill for a single hyperparameter search.

Lesson learned: Always set timeouts and cost alerts. Use preemptible/spot instances for non-critical jobs.

An ML team at a logistics company built an elaborate Airflow deployment with 50+ DAGs. Over time, different teams added cross-DAG dependencies using sensors. Nobody documented them.

One day, a seemingly unrelated change broke everything: DAG A waited for DAG B, which waited for DAG C, which waited for DAG A. Classic deadlock. But because dependencies spanned multiple DAGs, the Airflow UI showed everything as “running normally.”

It took four days and a full pipeline audit to untangle. The company lost $400,000 in delayed shipment optimizations.

Lesson learned: Document all cross-DAG dependencies. Use airflow dags list-import-errors and external dependency tracking.

A global e-commerce company scheduled their daily retraining at “00:00” thinking it meant midnight local time. It meant midnight UTC. For their West Coast users, the model retrained at 4 PM—peak shopping hours—causing a 40% latency spike.

Worse, their data extraction job ran at “23:00 UTC” (3 PM Pacific). The model trained on incomplete daily data for months before anyone noticed accuracy degradation.

Lesson learned: Always use explicit timezone specifications. Store everything in UTC and convert at display time.


# WRONG - Hardcoded values
@task
def train_model():
model = XGBClassifier(
n_estimators=100, # Can't change without code deploy
max_depth=6, # A/B testing requires code changes
learning_rate=0.1
)
data = pd.read_csv('/data/training.csv') # Fixed path
return model.fit(data)
# RIGHT - Parameterized configuration
@task
def train_model(config: dict, data_path: str):
"""Train with configurable hyperparameters."""
model = XGBClassifier(**config['hyperparameters'])
data = pd.read_csv(data_path)
return model.fit(data)
# Usage: Pass config via Airflow Variables or Prefect Blocks

Why it matters: Hardcoded values require code deployments to change. Parameterized configs enable A/B testing, emergency fixes, and environment-specific settings without touching code.

# WRONG - Non-idempotent task
@task
def append_predictions():
"""Appends predictions to table - running twice doubles data!"""
predictions = generate_predictions()
db.execute("INSERT INTO predictions VALUES (?)", predictions)
# RIGHT - Idempotent task
@task
def upsert_predictions(run_date: str):
"""Upserts predictions - safe to re-run."""
predictions = generate_predictions()
# Delete existing predictions for this run, then insert
db.execute("DELETE FROM predictions WHERE run_date = ?", run_date)
db.execute("INSERT INTO predictions VALUES (?)", predictions)

Why it matters: Orchestrators retry failed tasks. Non-idempotent tasks corrupt data on retry. Design every task to be safely re-runnable.

Mistake 3: Passing Large Data Through XCom/Artifacts

Section titled “Mistake 3: Passing Large Data Through XCom/Artifacts”
# WRONG - Passing DataFrames through XCom
@task
def extract():
return pd.read_csv('huge_file.csv') # 10GB DataFrame in XCom!
@task
def transform(data):
return data.transform(...) # Serialization nightmare
# RIGHT - Pass references, not data
@task
def extract() -> str:
"""Extract data and return path reference."""
df = pd.read_csv('huge_file.csv')
output_path = '/tmp/extracted_data.parquet'
df.to_parquet(output_path)
return output_path # Just pass the path
@task
def transform(data_path: str) -> str:
"""Load from path, transform, save."""
df = pd.read_parquet(data_path)
df_transformed = df.transform(...)
output_path = '/tmp/transformed_data.parquet'
df_transformed.to_parquet(output_path)
return output_path

Why it matters: XCom (Airflow) and artifacts are for metadata, not data. Passing gigabytes through them causes database bloat, serialization errors, and memory exhaustion.


Question 1: “How would you design an ML pipeline that needs to train on data arriving at different times?”

Section titled “Question 1: “How would you design an ML pipeline that needs to train on data arriving at different times?””

Strong Answer:

“I’d use a sensor-based approach with watermarking. Here’s my design:

  1. Data arrival sensors: Use FileSensor or custom sensors that poll for data readiness. Each source has its own sensor with configurable timeout.

  2. Watermark tracking: Track the latest complete data timestamp for each source. Don’t start training until all sources pass the watermark.

  3. Graceful degradation: If one source is late but others are ready, decide based on business needs—wait (data quality priority) or proceed with stale data (freshness priority).

  4. Alerting: If any sensor exceeds SLA, alert the on-call engineer before the pipeline auto-fails.

from airflow.sensors.filesystem import FileSensor
from airflow.utils.dates import days_ago
# Define sensors for each data source
wait_for_transactions = FileSensor(
task_id='wait_for_transactions',
filepath='/data/transactions/{{ ds }}/*.parquet',
timeout=3600 * 4, # 4 hour SLA
mode='reschedule'
)
wait_for_user_data = FileSensor(
task_id='wait_for_user_data',
filepath='/data/users/{{ ds }}/*.parquet',
timeout=3600 * 2, # 2 hour SLA
mode='reschedule'
)
# Both must complete before training starts
[wait_for_transactions, wait_for_user_data] >> start_training

This approach handles real-world data arrival patterns while maintaining pipeline reliability.”

Question 2: “Compare Airflow and Prefect. When would you choose each?”

Section titled “Question 2: “Compare Airflow and Prefect. When would you choose each?””

Strong Answer:

“Both are excellent—the choice depends on team and use case:

Choose Airflow when:

  • You need battle-tested reliability (10+ years in production)
  • Your team already knows it (most data engineers do)
  • You need extensive operator ecosystem (500+ community operators)
  • Enterprise requirements: RBAC, audit logging, compliance

Choose Prefect when:

  • You want truly Python-native development (no DAG boilerplate at all)
  • Dynamic workflows (task count determined at runtime)
  • Hybrid execution (run locally during development, cloud in production)
  • Modern developer experience (better debugging, native async)

Example use cases:

  • Daily batch ETL with stable structure → Airflow
  • ML experiments with varying hyperparameter combinations → Prefect
  • Complex data platform with multiple teams → Airflow
  • Startup moving fast with small team → Prefect

The best choice is often ‘what your team knows.’ A well-run Airflow pipeline beats a poorly-run anything else.”

Question 3: “How do you handle ML pipeline failures at 3 AM?”

Section titled “Question 3: “How do you handle ML pipeline failures at 3 AM?””

Strong Answer:

“Production ML requires defense in depth:

Prevention (before failure):

  • Data validation tasks that fail fast on bad input
  • Resource limits (memory, timeout) to prevent runaway jobs
  • Idempotent tasks that handle retries safely

Detection (catch failures quickly):

  • Automatic retries with exponential backoff (3 attempts, 5-10-20 minute delays)
  • SLA alerts when tasks exceed expected duration
  • Data quality monitoring (row counts, null rates, value distributions)

Response (minimize impact):

  • Automated rollback to last-known-good model
  • Detailed error context in alerts (not just ‘Task failed’)
  • Runbooks for common failure modes

Example monitoring setup:

default_args = {
'retries': 3,
'retry_delay': timedelta(minutes=5),
'retry_exponential_backoff': True,
'email_on_failure': True,
'email': ['ml-oncall@company.com'],
'sla': timedelta(hours=2) # Alert if task runs too long
}
@task(on_failure_callback=notify_pagerduty)
def train_model():
# Training logic
pass

The goal isn’t zero failures—it’s fast detection, automatic recovery, and minimal manual intervention at 3 AM.”


ApproachMonthly CostHidden CostsBest For
Managed Airflow (MWAA)$500-2,000Low ops overheadAWS shops, medium scale
Self-hosted Airflow$200-500 (compute)High ops overheadLarge teams, customization needs
Prefect Cloud$0-1,200MinimalFast-moving teams
Dagster Cloud$0-2,000MinimalData-centric teams
Kubeflow (self-hosted)$300-1,000K8s expertise requiredGPU-heavy ML teams
Temporal Cloud$200-5,000Workflow complexityLong-running workflows
Total Cost = Infrastructure + Engineering Time + Failure Cost
Infrastructure:
- Managed service fees OR self-hosted compute
- Database (metadata store)
- Storage (logs, artifacts)
Engineering Time:
- Initial setup: 40-200 hours
- Ongoing maintenance: 4-20 hours/month
- Debugging failures: 10-40 hours/month (without orchestration: 3x more)
Failure Cost:
- Revenue loss per hour of downtime
- Data quality issues reaching customers
- Engineer time for manual recovery

Scenario: E-commerce company running daily price optimization

Without OrchestrationWith Airflow
4 hours/week debugging cron1 hour/week pipeline maintenance
$50K/month in pricing errors$5K/month (90% reduction)
3 engineers on-call rotation1 engineer with auto-alerting
Manual retraining triggersAutomatic daily retraining

Annual savings: ~$500K in error reduction + ~$200K in engineering time = $700K

Annual cost: ~$30K (managed service + compute)

ROI: 23x in first year


Analogies for Understanding ML Orchestration

Section titled “Analogies for Understanding ML Orchestration”

An ML pipeline orchestrator is like a symphony conductor. Individual musicians (tasks) are experts at their instruments—the violinist doesn’t need the conductor to play notes. But without coordination:

  • The brass might start before the strings are ready
  • The percussion could overpower the woodwinds
  • Nobody knows when the finale arrives

The conductor (orchestrator) doesn’t play any instrument—they coordinate timing, manage dependencies, and ensure the whole performance comes together. When a musician misses a cue, the conductor signals adjustments. When the venue changes, the conductor adapts the performance.

Just as a conductor uses a score (DAG definition) to coordinate musicians (tasks), an orchestrator uses workflow definitions to ensure data flows correctly through each processing stage.

Think of ML orchestration like managing an airport:

  • Planes = ML tasks (training, inference, data processing)
  • Runways = Compute resources (GPUs, CPU clusters)
  • Flight schedules = Pipeline schedules
  • Control tower = Orchestrator (Airflow, Prefect)
  • Radar = Monitoring and observability
  • Fuel trucks = Data dependencies

Without air traffic control, you’d have planes circling forever waiting for runways, crashes on the tarmac, and no one knowing where any flight is. With proper orchestration, every plane knows its slot, runways are efficiently utilized, and problems are spotted before they become crashes.

A production ML pipeline is like a busy restaurant kitchen:

Without orchestration (chaos):

  • The chef starts the steak before the salad prep is done
  • The dessert is ready before the appetizer
  • Nobody knows if the special is available
  • Customers wait two hours or get cold food

With orchestration (Michelin star kitchen):

  • Tickets (DAG runs) define what needs to be cooked and in what order
  • Each station (task) knows its dependencies
  • The expeditor (scheduler) times everything to arrive hot
  • If the grill breaks (task failure), the kitchen manager (alerting) is notified immediately
  • Historical data shows which dishes take longest (observability)

Henry Ford revolutionized manufacturing with assembly lines—and ML orchestration applies the same principle to data:

Manual ML development is like hand-building cars:

  • One expert does everything
  • Inconsistent quality
  • Can’t scale
  • Knowledge lives in one person’s head

Orchestrated ML is like Ford’s assembly line:

  • Specialized stations (tasks) for each step
  • Standardized interfaces (data contracts)
  • Continuous flow (scheduled runs)
  • Quality checks at each stage (validation tasks)
  • Easy to scale (add more workers)
  • Easy to improve (optimize one station without rebuilding everything)

The assembly line analogy also explains why orchestration enables team scaling. Just as Ford could hire specialists for each station, teams can have data engineers build ingestion, ML engineers build training, and platform engineers build deployment—all coordinated by the orchestrator.


The next generation of orchestrators will use AI to optimize themselves:

Self-tuning pipelines that automatically adjust:

  • Resource allocation based on historical usage patterns
  • Retry strategies based on failure mode classification
  • Scheduling based on cost optimization (spot instance availability)

Did You Know? Google’s internal ML platform (Vertex AI) already uses reinforcement learning to optimize pipeline scheduling. Their AI scheduler reduced training costs by 23% by learning when GPU prices drop and queueing non-urgent jobs accordingly.

Intelligent alerting that distinguishes:

  • “This failure always resolves with a retry” (auto-retry, no alert)
  • “This failure pattern preceded a major outage last month” (immediate escalation)
  • “This task is slow but within normal variance” (no alert)

The line between ML orchestration and platform engineering is blurring:

GitOps for ML: Pipelines defined entirely in Git, deployed through pull requests, with preview environments for testing DAG changes before production.

Service mesh integration: Orchestrators connecting directly with Istio/Linkerd for intelligent traffic routing, A/B testing, and canary deployments of models.

Developer platforms: Backstage and similar platforms integrating orchestrator UIs, making ML pipelines first-class citizens alongside microservices.

Batch orchestration is mature; real-time is the frontier:

Stream-batch unification: Tools like Apache Flink and Bytewax are bridging batch and streaming. Future orchestrators will treat “run every minute” and “run on every event” as the same abstraction.

Feature platforms: Feature stores (Feast, Tecton) are becoming orchestration-aware. Your pipeline doesn’t just compute features—it registers them for real-time serving.

Online learning pipelines: Models that update continuously require different orchestration patterns—smaller, faster, more frequent runs with continuous validation.

Trend 4: Multi-Cloud and Edge Orchestration

Section titled “Trend 4: Multi-Cloud and Edge Orchestration”

The future isn’t “cloud vs. on-prem”—it’s everywhere simultaneously:

Hybrid execution: Train on-prem (data residency requirements), deploy to cloud (global scale), serve at edge (low latency).

Orchestrator federation: One DAG that spans AWS, GCP, and an on-prem cluster—with intelligent task placement based on cost, latency, and data locality.

Edge ML pipelines: Kubeflow and similar tools are extending to Kubernetes clusters running on edge devices. Your pipeline might preprocess on the factory floor, train in the cloud, and deploy back to the floor.

If you’re learning ML orchestration today, focus on:

  1. Fundamentals first: Airflow isn’t going away. Master it before chasing shiny objects.

  2. Kubernetes literacy: Even if you use managed services, understanding K8s concepts (pods, services, volumes) makes you more effective.

  3. Platform thinking: The best MLOps engineers think beyond individual pipelines to the platform that runs them all.

  4. Cost awareness: Cloud bills matter. Engineers who can optimize costs while maintaining reliability are invaluable.

  5. Observability skills: Knowing how to instrument, monitor, and debug distributed systems is more valuable than knowing any specific tool.


When pipelines fail at 3 AM, you need systematic debugging approaches. Here’s the methodology used by experienced MLOps engineers.

First, determine what failed and how badly:

FAILURE SCOPE ASSESSMENT
========================
SINGLE TASK FAILURE:
- Check task logs first
- Usually: bad data, resource exhaustion, external dependency
- Fix: Patch and re-run from failed task
CASCADING FAILURE:
- Multiple downstream tasks failed
- Usually: upstream data issue or resource contention
- Fix: Fix root cause, clear downstream, re-run
SCHEDULER FAILURE:
- No tasks running at all
- Usually: orchestrator health issue
- Fix: Check scheduler logs, restart if needed
SILENT FAILURE:
- Pipeline "succeeded" but results are wrong
- Usually: data validation missing
- Fix: Add validation tasks, investigate data

Every orchestrator provides task logs. Know how to access them quickly:

Airflow: airflow tasks logs <dag_id> <task_id> <execution_date>

Prefect: Flow runs page → Click run → Task logs panel

Dagster: Asset details → Materialization history → Log viewer

The fastest debugging happens locally. Extract the failing task’s logic and run it standalone:

# Instead of running the whole pipeline
# Extract the function and test directly
def debug_failing_task():
"""Run the failing task logic locally."""
# Copy the task's inputs from the failed run
input_data = pd.read_parquet('/tmp/debug/input.parquet')
# Run the transformation
result = transform_data(input_data)
# Inspect intermediate state
print(f"Input shape: {input_data.shape}")
print(f"Output shape: {result.shape}")
print(f"Null counts: {result.isnull().sum()}")
if __name__ == "__main__":
debug_failing_task()

Most ML pipeline failures are data issues. Validate assumptions:

def data_health_check(df: pd.DataFrame, context: str) -> None:
"""Standard data health check for debugging."""
print(f"\n=== Data Health Check: {context} ===")
print(f"Shape: {df.shape}")
print(f"Memory: {df.memory_usage(deep=True).sum() / 1e6:.1f} MB")
print(f"\nNull counts:")
print(df.isnull().sum())
print(f"\nData types:")
print(df.dtypes)
print(f"\nNumeric statistics:")
print(df.describe())
SymptomLikely CauseFix
OOM (Out of Memory)Data larger than expectedAdd chunking, increase memory, or sample
TimeoutSlow external dependencyAdd timeout handling, use async, cache
Schema mismatchUpstream data changedAdd schema validation, alert on drift
Empty resultsFilter too aggressiveAdd row count assertions, log filter stats
Duplicate dataIdempotency missingImplement upsert pattern, add dedup
Stale dataSensor misconfiguredCheck sensor timeout and poke interval

Did You Know? Netflix’s ML platform team found that 67% of production pipeline failures were caused by data issues (schema changes, null values, volume spikes), not code bugs. They built an automated data validation framework that reduced debugging time by 80% and implemented “data contracts” between teams to catch breaking changes before production.

“The Log Line That Saved Us Hours”

Always log context at task start:

@task
def process_batch(batch_id: str):
logger.info(f"Starting batch {batch_id} at {datetime.now()}")
logger.info(f"Memory available: {psutil.virtual_memory().available / 1e9:.2f} GB")
logger.info(f"CPU count: {os.cpu_count()}")
# ... your logic

This simple pattern has saved countless hours of debugging. When something fails, you immediately know the resource state when it started. Senior engineers at companies like Uber and Spotify swear by this approach—they estimate it cuts initial debugging time by at least fifty percent because you never have to guess what environment the task ran in or what resources were available at the time of failure.

“The Checkpoint Pattern”

For long-running tasks, checkpoint progress so retries don’t start from zero:

@task
def train_model_with_checkpoints(config: dict):
"""Training with checkpoint support."""
checkpoint_path = f"/checkpoints/{config['run_id']}.pt"
# Resume if checkpoint exists
if os.path.exists(checkpoint_path):
model.load_state_dict(torch.load(checkpoint_path))
logger.info(f"Resumed from checkpoint")
for epoch in range(config['epochs']):
# Training logic...
# Save checkpoint every N epochs
if epoch % 10 == 0:
torch.save(model.state_dict(), checkpoint_path)

“The Canary Test Pattern”

Before running on all data, test on a sample:

@task
def safe_transform(data_path: str):
"""Transform with canary testing."""
df = pd.read_parquet(data_path)
# Canary: run on 1% sample first
sample = df.sample(frac=0.01)
try:
result_sample = transform(sample)
assert len(result_sample) > 0, "Transform produced empty result"
assert result_sample.isnull().sum().sum() == 0, "Transform produced nulls"
except Exception as e:
raise ValueError(f"Canary test failed: {e}")
# Full run only if canary passes
return transform(df)

PRODUCTION ML PIPELINE ARCHITECTURE
===================================
┌─────────────────────────────────────────────────────────────────────────────┐
│ ORCHESTRATION LAYER │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ TRIGGER │───▶│ VALIDATE │───▶│ TRAIN │───▶│ DEPLOY │ │
│ │ (Sensor) │ │ (Data) │ │ (Model) │ │ (Conditional)│ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │ │
│ │ │ │ │ │
└─────────┼──────────────────┼──────────────────┼──────────────────┼──────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ DATA LAKE │ │ FEATURE │ │ EXPERIMENT │ │ MODEL │
│ (S3/GCS) │ │ STORE │ │ TRACKER │ │ REGISTRY │
│ │ │ (Feast) │ │ (MLflow) │ │ (MLflow) │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘

Before launching your first production pipeline:

Infrastructure

  • Orchestrator deployed (Airflow/Prefect/Dagster)
  • Metadata database provisioned (PostgreSQL for Airflow)
  • Artifact storage configured (S3/GCS bucket)
  • Worker nodes scaled appropriately
  • Network policies allowing task communication

Pipeline Code

  • DAG defined with clear dependencies
  • All tasks are idempotent
  • Data validation at pipeline start
  • Model evaluation before deployment
  • Conditional deployment (only if metrics pass)
  • Cleanup tasks for temporary files

Observability

  • SLA alerts configured
  • Failure notifications (email/Slack/PagerDuty)
  • Logging to centralized system
  • Metrics exported (task duration, success rate)
  • Dashboard for pipeline health

Operations

  • Runbook for common failures
  • On-call rotation defined
  • Escalation path documented
  • Recovery procedures tested
  • Cost monitoring in place

  1. Orchestration is not optional for production ML. The question isn’t whether to use it, but which tool fits your needs.

  2. Start with Airflow if you don’t know what you need. It’s the industry standard with the largest community and job market.

  3. Design for failure from day one. Idempotent tasks, data validation, retries with backoff, and meaningful alerts.

  4. Don’t pass data through orchestration. Pass references (paths, URLs, IDs). Let storage systems handle the bytes.

  5. DAGs should be version-controlled like any other code. Include tests for critical paths.

  6. Timezone bugs are real and painful. Standardize on UTC everywhere, convert at display time only.

  7. Cross-DAG dependencies are dangerous. Document them explicitly and audit regularly for circular dependencies.

  8. Visual tools (n8n) enable non-engineers to build AI workflows. Consider hybrid approaches where data engineers build core pipelines and business users build integrations.

  9. Kubeflow shines for GPU workloads but adds Kubernetes complexity. Don’t adopt it just because it’s “cloud-native.”

  10. Temporal is overkill for most use cases but essential for multi-day workflows with human approval steps.


ORCHESTRATION TOOLS SUMMARY
===========================
AIRFLOW: Industry standard, battle-tested, large community
PREFECT: Modern, Python-native, better developer experience
DAGSTER: Asset-based, data-centric, great for analytics
KUBEFLOW: Kubernetes-native, ML-focused, GPU support
TEMPORAL: Durable execution, long-running, reliable
n8n: Visual, low-code, AI-native nodes
WHEN TO USE:
────────────
Data Engineering → Airflow, Dagster
ML Training → Kubeflow, Airflow
Quick Prototypes → n8n, Prefect
Long-running Jobs → Temporal
Modern Teams → Prefect, Dagster

Module 51 will cover Model Deployment & Serving Patterns, including FastAPI, gRPC, canary deployments, and A/B testing.


Module 50 Complete! “The best pipeline is one that runs without you thinking about it.”