ML Pipelines
AI/ML Engineering Track | Complexity:
[COMPLEX]| Time: 6-8
Prerequisites: Module 49 (Data Versioning & Feature Stores)
San Francisco. October 3, 2014. 3:17 AM. Maxime Beauchemin’s phone buzzed with yet another alert. Airbnb’s data pipeline had failed again. This time it was a cascade: the pricing model hadn’t retrained because the feature pipeline died, which happened because the data validation job timed out, which happened because… nobody could tell anymore.
Beauchemin dragged himself to his laptop and started digging through logs. Four hours later, he’d traced the failure to a single upstream task that had silently failed two days ago. The cron job hadn’t reported the error. No one knew until everything downstream collapsed.
“This is insane,” he muttered. “We’re running a billion-dollar company on bash scripts and hope.”
Over the next few months, Beauchemin built something different: a system where tasks declared their dependencies, where failures triggered immediate alerts, where you could see the entire pipeline at a glance. He called it “Airflow.”
“I wrote Airflow out of frustration. Every data team was solving the same problem badly—orchestrating complex workflows with cron and prayer. I thought: what if we could make pipelines first-class citizens, with proper scheduling, monitoring, and visualization?” — Maxime Beauchemin, Creator of Apache Airflow
Airbnb open-sourced Airflow in 2015. Today, it orchestrates ML pipelines at Uber, Spotify, and thousands of other companies. That 3 AM wake-up call spawned an entire industry.
What You’ll Be Able to Do
Section titled “What You’ll Be Able to Do”By the end of this module, you will:
- Master Apache Airflow for ML pipeline orchestration
- Build Kubeflow Pipelines for Kubernetes-native ML
- Create visual AI workflows with n8n
- Compare modern orchestration tools (Prefect, Dagster, Temporal)
- Design production-ready ML pipelines
- Implement retry, monitoring, and alerting patterns
The History of ML Orchestration: From Cron to Cloud-Native
Section titled “The History of ML Orchestration: From Cron to Cloud-Native”The Cron Era (Pre-2014)
Section titled “The Cron Era (Pre-2014)”Before dedicated orchestration tools, teams ran ML pipelines with cron jobs and bash scripts. This worked—barely—for simple pipelines. But as companies grew, the limitations became painful.
Did You Know? In 2013, LinkedIn’s data team maintained over 10,000 cron jobs across dozens of servers. Engineers spent 30% of their time just debugging scheduling conflicts and mysterious failures. When two jobs tried to write to the same table simultaneously, the only clue was corrupted data discovered days later.
The problems were universal:
- No dependency management: Cron doesn’t know job A must finish before job B starts
- No visibility: You couldn’t see what was running, what failed, or why
- No retries: Failures meant manual intervention or data loss
- No auditing: Who ran what? When? With what parameters? Nobody knew
The Birth of Modern Orchestration (2014-2016)
Section titled “The Birth of Modern Orchestration (2014-2016)”Airflow emerges at Airbnb (2014). Maxime Beauchemin’s frustration became the industry’s solution. Key innovations: DAGs as code, dependency management, UI visualization, extensible operators. Airbnb open-sourced it in 2015.
Luigi at Spotify (2012-2015). Erik Bernhardsson built Luigi to orchestrate Spotify’s recommendation system. While simpler than Airflow, Luigi proved that Python-native orchestration could work at scale.
Did You Know? Airflow was named after the HVAC concept because Beauchemin saw pipelines like air ducts—data flowing through channels with valves (operators) controlling the flow. He almost named it “Dataflow” but Google already had that trademark.
Oozie at Yahoo (2010-2014). The Hadoop ecosystem’s answer to orchestration. XML-based, verbose, but reliable. Teams that survived Oozie appreciated Airflow’s Python elegance.
The Kubernetes Revolution (2017-2020)
Section titled “The Kubernetes Revolution (2017-2020)”As ML moved to containers, orchestration tools adapted:
Kubeflow (2017): Google open-sourced their internal ML toolkit for Kubernetes. Finally, data scientists could request GPUs without understanding node affinity.
Argo Workflows (2017): YAML-native Kubernetes workflows. No Python required—define pipelines in the same language as your deployments.
MLflow (2018): Databricks introduced experiment tracking and model registry, solving a parallel problem to orchestration.
Did You Know? The first Kubeflow release required 40 YAML files to deploy a single model training job. By version 1.0 (2020), a single
@componentdecorator could do the same thing. This 100x simplification drove adoption from 0 to 10,000+ companies in three years.
The Modern Era (2020-Present)
Section titled “The Modern Era (2020-Present)”Prefect 2.0 (2022): Rewrote from scratch with “Python-native” philosophy. Flows are just decorated Python functions—no DAG boilerplate required.
Dagster (2019-2023): Nick Schrock (GraphQL creator) introduced “Software-Defined Assets”—think about what data you produce, not what tasks you run.
Temporal (2020): Uber engineers who built Cadence created something radically different—durable execution that survives infrastructure failures. Your workflow remembers its state even if every server dies.
n8n (2019): Visual workflow automation for the AI era. Suddenly, non-programmers could build RAG pipelines by dragging and connecting nodes visually. This democratization of ML operations opened the door for business analysts and domain experts to create sophisticated AI workflows without writing any code at all.
Why Pipeline Orchestration Matters
Section titled “Why Pipeline Orchestration Matters”Every ML system in production needs orchestration. Training a model once is easy. Training it daily, with data validation, feature engineering, model evaluation, and deployment - that’s where orchestration becomes essential.
Think of ML orchestration like an airport control tower. Individual planes (ML tasks) know how to fly, but without coordination, you’d have chaos—planes taking off into each other, landing on occupied runways, fuel trucks colliding with baggage carts. The control tower (orchestrator) ensures everything happens in the right order, at the right time, with the right resources, and knows immediately when something goes wrong.
The Reality of ML in Production:
WITHOUT ORCHESTRATION WITH ORCHESTRATION==================== ==================
Manual cron jobs Declarative DAGs"It worked on my machine" Reproducible pipelinesNo visibility Full observabilityFailures go unnoticed Automatic retries + alertsNo dependency management Clear task dependenciesAd-hoc scheduling Intelligent schedulingDid You Know? Airbnb created Apache Airflow in 2014 when Maxime Beauchemin needed to orchestrate their complex data pipelines. They open-sourced it in 2015, and it became an Apache top-level project in 2019. Today, Airflow powers ML pipelines at Uber, Spotify, Twitter, and thousands of other companies.
The Orchestration Landscape
Section titled “The Orchestration Landscape”┌─────────────────────────────────────────────────────────────────────────┐│ ML ORCHESTRATION TOOLS │├─────────────────────────────────────────────────────────────────────────┤│ ││ CODE-FIRST VISUAL/LOW-CODE ││ ────────── ────────────── ││ ││ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ││ │ AIRFLOW │ │ PREFECT │ │ n8n │ │ LANGFLOW │ ││ │ (Apache) │ │ (Modern) │ │ (Visual) │ │ (LangChain) │ ││ │ │ │ │ │ │ │ │ ││ │ Python DAGs │ │ Python- │ │ Drag-drop │ │ AI chains │ ││ │ Scheduling │ │ native │ │ 400+ nodes │ │ Visual │ ││ │ Battle- │ │ Dynamic │ │ AI nodes │ │ builder │ ││ │ tested │ │ Hybrid │ │ Self-host │ │ │ ││ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ ││ ││ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ││ │ DAGSTER │ │ KUBEFLOW │ │ WINDMILL │ │ FLOWISE │ ││ │ (Asset) │ │ (K8s ML) │ │ (Scripts) │ │ (LLM) │ ││ │ │ │ │ │ │ │ │ ││ │ Data-aware │ │ K8s-native │ │ Any lang │ │ Drag-drop │ ││ │ Typed │ │ ML-focused │ │ Visual + │ │ LLM flows │ ││ │ Software- │ │ Pipelines │ │ code │ │ │ ││ │ defined │ │ │ │ │ │ │ ││ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ ││ ││ ┌─────────────┐ ││ │ TEMPORAL │ WHEN TO USE WHAT: ││ │ (Durable) │ ─────────────────── ││ │ │ Complex ML Pipelines → Airflow, Kubeflow ││ │ Long- │ Data Engineering → Dagster, Airflow ││ │ running │ Quick AI Prototypes → n8n, LangFlow ││ │ Reliable │ Production Agents → n8n, Temporal ││ │ workflows │ Long-running Jobs → Temporal ││ └─────────────┘ K8s-native ML → Kubeflow ││ │└─────────────────────────────────────────────────────────────────────────┘Apache Airflow Deep Dive
Section titled “Apache Airflow Deep Dive”What is Airflow?
Section titled “What is Airflow?”Airflow is the industry standard for workflow orchestration. It lets you define workflows as code (DAGs - Directed Acyclic Graphs), schedule them, and monitor their execution.
AIRFLOW ARCHITECTURE====================
┌─────────────────────────────────────────────────────────────────┐│ AIRFLOW │├─────────────────────────────────────────────────────────────────┤│ ││ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ││ │ SCHEDULER │───▶│ EXECUTOR │───▶│ WORKERS │ ││ │ │ │ │ │ │ ││ │ Triggers │ │ Celery/K8s/ │ │ Run tasks │ ││ │ DAG runs │ │ Local │ │ │ ││ └─────────────┘ └─────────────┘ └─────────────┘ ││ │ │ ││ ▼ ▼ ││ ┌─────────────┐ ┌─────────────┐ ││ │ METADATA │ │ WEB UI │ ││ │ DATABASE │◀────────────────────▶│ │ ││ │ (Postgres) │ │ Monitoring │ ││ └─────────────┘ └─────────────┘ ││ │└─────────────────────────────────────────────────────────────────┘DAG Basics
Section titled “DAG Basics”A DAG (Directed Acyclic Graph) defines the workflow structure. Think of a DAG like a recipe with dependencies: you can chop vegetables and boil water in parallel (no dependencies), but you can’t add the vegetables until the water is boiling (dependency). The “acyclic” part means you can’t create circular dependencies—the vegetables can’t require the soup to be done before being chopped.
from airflow import DAGfrom airflow.operators.python import PythonOperatorfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta
# DAG definitiondefault_args = { 'owner': 'ml_team', 'depends_on_past': False, 'start_date': datetime(2024, 1, 1), 'retries': 3, 'retry_delay': timedelta(minutes=5), 'email_on_failure': True, 'email': ['ml-team@company.com'],}
with DAG( 'ml_training_pipeline', default_args=default_args, description='Daily ML model training pipeline', schedule_interval='@daily', # or '0 6 * * *' for 6 AM catchup=False, tags=['ml', 'training'],) as dag:
# Task 1: Extract data extract_data = PythonOperator( task_id='extract_data', python_callable=extract_from_database, )
# Task 2: Validate data validate_data = PythonOperator( task_id='validate_data', python_callable=run_data_validation, )
# Task 3: Feature engineering feature_engineering = PythonOperator( task_id='feature_engineering', python_callable=engineer_features, )
# Task 4: Train model train_model = PythonOperator( task_id='train_model', python_callable=train_ml_model, )
# Task 5: Evaluate model evaluate_model = PythonOperator( task_id='evaluate_model', python_callable=evaluate_model_performance, )
# Task 6: Deploy if good deploy_model = PythonOperator( task_id='deploy_model', python_callable=deploy_to_production, )
# Define dependencies (the DAG structure) extract_data >> validate_data >> feature_engineering feature_engineering >> train_model >> evaluate_model >> deploy_modelML-Specific Patterns in Airflow
Section titled “ML-Specific Patterns in Airflow”from airflow.decorators import dag, taskfrom airflow.operators.python import BranchPythonOperatorfrom airflow.utils.trigger_rule import TriggerRule
@dag( schedule_interval='@daily', start_date=datetime(2024, 1, 1), catchup=False,)def ml_pipeline_with_branching(): """ ML pipeline with conditional deployment based on metrics. """
@task def extract_data(): """Extract training data from source.""" # Extract logic return {'rows': 10000, 'features': 50}
@task def validate_data(data_info: dict): """Validate data quality.""" if data_info['rows'] < 1000: raise ValueError("Insufficient data!") return {'valid': True, 'rows': data_info['rows']}
@task def train_model(data_info: dict): """Train the ML model.""" # Training logic return { 'accuracy': 0.92, 'f1_score': 0.89, 'model_path': '/models/v1.0' }
@task def evaluate_model(metrics: dict): """Evaluate model and decide deployment.""" return { 'deploy': metrics['accuracy'] > 0.90, 'metrics': metrics }
def choose_deployment_path(**context): """Branch based on model quality.""" ti = context['ti'] evaluation = ti.xcom_pull(task_ids='evaluate_model')
if evaluation['deploy']: return 'deploy_to_production' else: return 'notify_failure'
branch = BranchPythonOperator( task_id='branch_on_quality', python_callable=choose_deployment_path, )
@task def deploy_to_production(evaluation: dict): """Deploy model to production.""" print(f"Deploying model with accuracy: {evaluation['metrics']['accuracy']}") return {'deployed': True}
@task def notify_failure(evaluation: dict): """Send notification about failed quality check.""" print(f"Model did not meet threshold: {evaluation['metrics']['accuracy']}") return {'notified': True}
@task(trigger_rule=TriggerRule.ONE_SUCCESS) def cleanup(): """Cleanup temporary files.""" print("Cleaning up...")
# Build the DAG data = extract_data() validated = validate_data(data) model = train_model(validated) evaluation = evaluate_model(model)
branch >> [deploy_to_production(evaluation), notify_failure(evaluation)] [deploy_to_production(evaluation), notify_failure(evaluation)] >> cleanup()
# Instantiate the DAGml_pipeline = ml_pipeline_with_branching()Did You Know? The TaskFlow API (using @task decorators) was introduced in Airflow 2.0 (December 2020). It was created by Kaxil Naik and the community to make DAGs more Pythonic and reduce boilerplate. Before TaskFlow, passing data between tasks required explicit XCom pulls, which was verbose and error-prone.
Kubeflow Pipelines
Section titled “Kubeflow Pipelines”Why This Module Matters
Section titled “Why This Module Matters”Kubeflow Pipelines is designed specifically for ML on Kubernetes. Think of Kubeflow like a factory assembly line where each station (container) has specialized equipment. The data processing station has different tools than the GPU training station, but they all connect smoothly. And because each station is independent, you can upgrade or replace one without disrupting the others.
Kubeflow is perfect for:
- GPU-intensive training jobs
- Reproducible experiments
- Multi-team ML platforms
KUBEFLOW PIPELINES ARCHITECTURE===============================
┌─────────────────────────────────────────────────────────────────┐│ KUBEFLOW PIPELINES │├─────────────────────────────────────────────────────────────────┤│ ││ PIPELINE DEFINITION (Python SDK) ││ ┌─────────────────────────────────────────────────────────┐ ││ │ @component → @pipeline → compile() → upload() │ ││ └─────────────────────────────────────────────────────────┘ ││ │ ││ ▼ ││ ┌─────────────────────────────────────────────────────────┐ ││ │ KUBERNETES CLUSTER │ ││ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ ││ │ │ Step 1 │─▶│ Step 2 │─▶│ Step 3 │─▶│ Step 4 │ │ ││ │ │ (Pod) │ │ (Pod) │ │ (Pod) │ │ (Pod) │ │ ││ │ │ │ │ +GPU │ │ │ │ │ │ ││ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ ││ └─────────────────────────────────────────────────────────┘ ││ │ ││ ▼ ││ ┌─────────────────────────────────────────────────────────┐ ││ │ METADATA STORE │ ARTIFACT STORE │ UI DASHBOARD │ ││ │ (MySQL) │ (MinIO/GCS) │ (Runs, Metrics) │ ││ └─────────────────────────────────────────────────────────┘ ││ │└─────────────────────────────────────────────────────────────────┘Kubeflow Pipeline Definition
Section titled “Kubeflow Pipeline Definition”from kfp import dslfrom kfp import compilerfrom kfp.dsl import Dataset, Model, Metrics, Input, Output
# Define a component@dsl.component( base_image='python:3.10', packages_to_install=['pandas', 'scikit-learn'])def preprocess_data( input_data: Input[Dataset], output_data: Output[Dataset], test_size: float = 0.2): """Preprocess and split data.""" import pandas as pd from sklearn.model_selection import train_test_split
df = pd.read_csv(input_data.path)
# Preprocessing df = df.dropna() df = df[df['value'] > 0]
# Split train, test = train_test_split(df, test_size=test_size)
# Save train.to_csv(output_data.path, index=False)
@dsl.component( base_image='python:3.10', packages_to_install=['pandas', 'scikit-learn', 'xgboost'])def train_model( training_data: Input[Dataset], model_output: Output[Model], metrics_output: Output[Metrics], n_estimators: int = 100, max_depth: int = 6): """Train XGBoost model.""" import pandas as pd import xgboost as xgb from sklearn.metrics import accuracy_score, f1_score import json
df = pd.read_csv(training_data.path) X = df.drop('target', axis=1) y = df['target']
# Train model = xgb.XGBClassifier( n_estimators=n_estimators, max_depth=max_depth ) model.fit(X, y)
# Evaluate predictions = model.predict(X) accuracy = accuracy_score(y, predictions) f1 = f1_score(y, predictions, average='weighted')
# Save model model.save_model(model_output.path)
# Log metrics metrics_output.log_metric('accuracy', accuracy) metrics_output.log_metric('f1_score', f1)
@dsl.component(base_image='python:3.10')def deploy_model( model: Input[Model], metrics: Input[Metrics], endpoint: str) -> str: """Deploy model if metrics pass threshold.""" # Check metrics if metrics.metadata.get('accuracy', 0) < 0.85: return "Model did not meet accuracy threshold"
# Deploy logic print(f"Deploying to {endpoint}") return f"Deployed to {endpoint}"
# Define the pipeline@dsl.pipeline( name='ML Training Pipeline', description='End-to-end ML training with Kubeflow')def ml_training_pipeline( input_data_path: str, test_size: float = 0.2, n_estimators: int = 100, max_depth: int = 6, deploy_endpoint: str = 'production'): # Step 1: Preprocess preprocess_task = preprocess_data( input_data=dsl.importer( artifact_uri=input_data_path, artifact_class=Dataset ), test_size=test_size )
# Step 2: Train (request GPU) train_task = train_model( training_data=preprocess_task.outputs['output_data'], n_estimators=n_estimators, max_depth=max_depth ) train_task.set_gpu_limit(1) train_task.set_memory_limit('8G')
# Step 3: Deploy deploy_task = deploy_model( model=train_task.outputs['model_output'], metrics=train_task.outputs['metrics_output'], endpoint=deploy_endpoint )
# Compile the pipelinecompiler.Compiler().compile( ml_training_pipeline, 'ml_pipeline.yaml')Did You Know? Kubeflow started as an internal Google project in 2017 to simplify ML on Kubernetes. The name comes from “Kubernetes” + “TensorFlow” (though it now supports all frameworks). Google’s David Aronchick led the project, which became one of the fastest-growing Kubernetes projects, reaching 10,000+ GitHub stars within two years.
n8n: Visual AI Workflows
Section titled “n8n: Visual AI Workflows”Why n8n?
Section titled “Why n8n?”n8n is a visual workflow automation tool that’s particularly powerful for AI applications. Think of n8n like building with LEGO blocks—each block (node) does one thing, and you snap them together visually to create complex AI workflows. Non-programmers can build RAG pipelines, chatbot backends, and document processors by dragging and connecting nodes, while programmers can add custom code blocks when needed.
n8n FOR AI WORKFLOWS====================
┌─────────────────────────────────────────────────────────────────┐│ n8n WORKFLOW │├─────────────────────────────────────────────────────────────────┤│ ││ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ││ │ TRIGGER │───▶│ FETCH │───▶│ LLM │───▶│ STORE │ ││ │ │ │ DATA │ │ PROCESS │ │ RESULT │ ││ │ Webhook │ │ HTTP │ │ OpenAI │ │ Postgres│ ││ │ Cron │ │ DB │ │ Claude │ │ Vector │ ││ │ Email │ │ S3 │ │ Ollama │ │ Notion │ ││ └─────────┘ └─────────┘ └─────────┘ └─────────┘ ││ ││ AI-SPECIFIC NODES: ││ ───────────────── ││ • OpenAI (gpt-5, embeddings) ││ • Anthropic (Claude) ││ • Ollama (local models) ││ • Vector Stores (Pinecone, Qdrant, Supabase) ││ • Document Loaders (PDF, web, etc.) ││ • Text Splitters (chunk documents) ││ • LangChain nodes ││ ││ EXAMPLE RAG WORKFLOW: ││ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ││ │Webhook│─▶│Embed │─▶│Vector│─▶│ LLM │─▶│Return│ ││ │Query │ │Query │ │Search│ │Answer│ │Response│ ││ └──────┘ └──────┘ └──────┘ └──────┘ └──────┘ ││ │└─────────────────────────────────────────────────────────────────┘n8n Workflow Example (JSON)
Section titled “n8n Workflow Example (JSON)”{ "name": "AI Document Q&A", "nodes": [ { "name": "Webhook", "type": "n8n-nodes-base.webhook", "parameters": { "path": "ask", "method": "POST" } }, { "name": "Embed Question", "type": "@n8n/n8n-nodes-langchain.embeddingsOpenAi", "parameters": { "model": "text-embedding-3-small" } }, { "name": "Vector Store Search", "type": "@n8n/n8n-nodes-langchain.vectorStoreQdrant", "parameters": { "operation": "search", "topK": 5 } }, { "name": "Generate Answer", "type": "@n8n/n8n-nodes-langchain.openAi", "parameters": { "model": "gpt-5", "prompt": "Based on the following context, answer the question.\n\nContext: {{$node['Vector Store Search'].json.results}}\n\nQuestion: {{$node['Webhook'].json.question}}" } }, { "name": "Return Response", "type": "n8n-nodes-base.respondToWebhook", "parameters": { "responseBody": "={{$node['Generate Answer'].json.text}}" } } ], "connections": { "Webhook": { "main": [[{ "node": "Embed Question" }]] }, "Embed Question": { "main": [[{ "node": "Vector Store Search" }]] }, "Vector Store Search": { "main": [[{ "node": "Generate Answer" }]] }, "Generate Answer": { "main": [[{ "node": "Return Response" }]] } }}Did You Know? n8n was created by Jan Oberhauser in Berlin in 2019. The name “n8n” is a numeronym for “nodemation” (n-8 letters-n). It raised $12M in Series A funding in 2022 and has become the go-to tool for self-hosted workflow automation, with over 400 integrations and a thriving community of AI workflow builders.
Modern Alternatives: Prefect & Dagster
Section titled “Modern Alternatives: Prefect & Dagster”Prefect: Python-Native Workflows
Section titled “Prefect: Python-Native Workflows”Prefect is a modern alternative to Airflow, designed to be more Pythonic and developer-friendly.
from prefect import flow, taskfrom prefect.tasks import task_input_hashfrom datetime import timedelta
@task( retries=3, retry_delay_seconds=60, cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))def extract_data(source: str) -> dict: """Extract data from source.""" print(f"Extracting from {source}") return {"rows": 10000, "source": source}
@taskdef transform_data(data: dict) -> dict: """Transform the data.""" return {"rows": data["rows"], "transformed": True}
@taskdef train_model(data: dict) -> dict: """Train ML model.""" return {"accuracy": 0.95, "rows_used": data["rows"]}
@taskdef deploy_if_good(metrics: dict) -> str: """Deploy model if metrics are good.""" if metrics["accuracy"] > 0.90: return "Deployed to production!" return "Model not good enough"
@flow(name="ML Training Pipeline")def ml_pipeline(source: str = "s3://data/training"): """Complete ML training pipeline.""" # Extract raw_data = extract_data(source)
# Transform clean_data = transform_data(raw_data)
# Train metrics = train_model(clean_data)
# Deploy result = deploy_if_good(metrics)
return result
# Run the flowif __name__ == "__main__": ml_pipeline()Prefect vs Airflow:
PREFECT AIRFLOW─────── ───────Python-native DAG files in specific folderDynamic workflows Static DAG structureLocal-first Server-firstHybrid execution Centralized executionBuilt-in caching Manual cachingModern UI Classic UIDagster: Asset-Based Pipelines
Section titled “Dagster: Asset-Based Pipelines”Dagster takes a different approach: instead of defining tasks, you define assets (the data you produce). Think of the difference like cooking: Airflow is like a recipe that says “chop vegetables, then sauté, then serve”—it focuses on the steps. Dagster is like describing the meal itself—“we need chopped vegetables, we need sautéed vegetables, we need a served dish”—and the system figures out the steps to make each thing.
from dagster import asset, AssetExecutionContext, Definitionsfrom dagster import MaterializeResult, MetadataValueimport pandas as pd
@asset( description="Raw user data from database", group_name="bronze")def raw_users(context: AssetExecutionContext) -> pd.DataFrame: """Extract raw user data.""" context.log.info("Extracting raw users...") return pd.DataFrame({ 'user_id': range(1000), 'name': [f'User {i}' for i in range(1000)], 'signup_date': pd.date_range('2024-01-01', periods=1000) })
@asset( description="Cleaned and validated user data", group_name="silver", deps=[raw_users])def clean_users(context: AssetExecutionContext, raw_users: pd.DataFrame) -> pd.DataFrame: """Clean and validate user data.""" # Remove duplicates df = raw_users.drop_duplicates()
# Add derived columns df['days_since_signup'] = (pd.Timestamp.now() - df['signup_date']).dt.days
context.log.info(f"Cleaned {len(df)} users") return df
@asset( description="User features for ML", group_name="gold", deps=[clean_users])def user_features(context: AssetExecutionContext, clean_users: pd.DataFrame) -> MaterializeResult: """Engineer features for ML.""" df = clean_users.copy()
# Feature engineering df['is_new_user'] = df['days_since_signup'] < 30 df['user_segment'] = pd.cut( df['days_since_signup'], bins=[0, 30, 90, 365, float('inf')], labels=['new', 'active', 'mature', 'veteran'] )
# Save df.to_parquet('/data/features/user_features.parquet')
return MaterializeResult( metadata={ 'num_rows': MetadataValue.int(len(df)), 'num_features': MetadataValue.int(len(df.columns)), 'schema': MetadataValue.md(df.dtypes.to_markdown()) } )
@asset( description="Trained churn prediction model", group_name="ml", deps=[user_features])def churn_model(context: AssetExecutionContext) -> MaterializeResult: """Train churn prediction model.""" # Load features features = pd.read_parquet('/data/features/user_features.parquet')
# Train model (simplified) accuracy = 0.92
return MaterializeResult( metadata={ 'accuracy': MetadataValue.float(accuracy), 'training_rows': MetadataValue.int(len(features)) } )
# Define the Dagster jobdefs = Definitions( assets=[raw_users, clean_users, user_features, churn_model])Dagster’s Asset Graph:
┌─────────────┐│ raw_users │ (Bronze)└──────┬──────┘ │ ▼┌─────────────┐│ clean_users │ (Silver)└──────┬──────┘ │ ▼┌─────────────┐│user_features│ (Gold)└──────┬──────┘ │ ▼┌─────────────┐│ churn_model │ (ML)└─────────────┘Did You Know? Dagster was created by Nick Schrock, who previously created GraphQL at Facebook. He founded Elementl in 2018 with the insight that data pipelines should be treated like software: with types, tests, and clear interfaces. The asset-based approach mirrors how data teams actually think about their work.
Temporal: Durable Execution
Section titled “Temporal: Durable Execution”Temporal is different from other tools - it’s designed for long-running, reliable workflows that need to survive failures. Think of Temporal like a notary that records every step of a complex business process. If the power goes out mid-signature, when it comes back, the notary knows exactly which documents were signed and which weren’t—nothing is lost, and you resume from exactly where you stopped. This “durable execution” is why Temporal is used for critical ML workflows that can’t afford to restart from scratch.
from temporalio import activity, workflowfrom temporalio.client import Clientfrom temporalio.worker import Workerfrom dataclasses import dataclassfrom datetime import timedelta
@dataclassclass TrainingConfig: dataset_path: str model_type: str hyperparameters: dict
@dataclassclass TrainingResult: model_path: str metrics: dict
# Activities (the actual work)@activity.defnasync def download_dataset(path: str) -> str: """Download dataset (can take hours for large datasets).""" # Temporal handles retries, timeouts, heartbeats print(f"Downloading from {path}...") return "/local/data/training.parquet"
@activity.defnasync def train_model(local_path: str, config: TrainingConfig) -> dict: """Train model (can take hours/days).""" # Long-running training print(f"Training {config.model_type}...") return {"accuracy": 0.95, "path": "/models/v1"}
@activity.defnasync def deploy_model(model_path: str) -> str: """Deploy to production.""" print(f"Deploying {model_path}...") return "https://api.company.com/model/v1"
# Workflow (the orchestration)@workflow.defnclass MLTrainingWorkflow: """ Durable ML training workflow.
If this crashes mid-training, Temporal will resume from exactly where it left off! """
@workflow.run async def run(self, config: TrainingConfig) -> TrainingResult: # Step 1: Download (with timeout and retries) local_path = await workflow.execute_activity( download_dataset, config.dataset_path, start_to_close_timeout=timedelta(hours=2), retry_policy=RetryPolicy(maximum_attempts=3) )
# Step 2: Train (long-running, with heartbeat) training_result = await workflow.execute_activity( train_model, local_path, config, start_to_close_timeout=timedelta(days=7), heartbeat_timeout=timedelta(minutes=10), )
# Step 3: Human approval (workflow waits!) approved = await workflow.execute_activity( wait_for_human_approval, training_result, start_to_close_timeout=timedelta(days=30), )
if approved: # Step 4: Deploy endpoint = await workflow.execute_activity( deploy_model, training_result["path"], start_to_close_timeout=timedelta(minutes=30), )
return TrainingResult( model_path=training_result["path"], metrics=training_result ) else: raise Exception("Human rejected the model")
# Run the workflowasync def main(): client = await Client.connect("localhost:7233")
result = await client.execute_workflow( MLTrainingWorkflow.run, TrainingConfig( dataset_path="s3://data/training", model_type="xgboost", hyperparameters={"n_estimators": 100} ), id="ml-training-2024-01-15", task_queue="ml-training", )
print(f"Training complete: {result}")When to Use Temporal:
USE TEMPORAL WHEN:──────────────────• Workflows can take hours/days/weeks• Human approval steps are needed• Workflows must survive infrastructure failures• You need exactly-once execution guarantees• Complex compensation/rollback logic
DON'T USE TEMPORAL WHEN:────────────────────────• Simple scheduled jobs (use Airflow)• Quick data transformations (use Dagster)• Real-time streaming (use Kafka/Flink)Comparison Matrix
Section titled “Comparison Matrix”┌────────────────┬─────────────┬─────────────┬─────────────┬─────────────┬─────────────┐│ Feature │ AIRFLOW │ PREFECT │ DAGSTER │ KUBEFLOW │ TEMPORAL │├────────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤│ Paradigm │ Task-based │ Task-based │ Asset-based │ Container │ Durable ││ │ DAGs │ Flows │ Assets │ Pipelines │ Workflows │├────────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤│ Learning Curve │ Medium │ Low │ Medium │ High │ High │├────────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤│ Best For │ Data/ML │ Modern │ Data │ K8s ML │ Long- ││ │ Pipelines │ Pipelines │ Products │ Training │ running │├────────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤│ Scheduling │ Built-in │ Built-in │ Built-in │ Built-in │ ️ External │├────────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤│ Dynamic DAGs │ ️ Limited │ Native │ Native │ ️ Limited │ Native │├────────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤│ GPU Support │ ️ Manual │ ️ Manual │ ️ Manual │ Native │ ️ Manual │├────────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤│ Self-hosted │ Yes │ Yes │ Yes │ Yes │ Yes │├────────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤│ Managed Cloud │ MWAA │ Prefect │ Dagster │ Vertex │ Temporal ││ │ GCP │ Cloud │ Cloud │ AI │ Cloud │├────────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤│ Community │ ⭐⭐⭐⭐⭐ │ ⭐⭐⭐⭐ │ ⭐⭐⭐⭐ │ ⭐⭐⭐⭐ │ ⭐⭐⭐ │└────────────────┴─────────────┴─────────────┴─────────────┴─────────────┴─────────────┘Production Patterns
Section titled “Production Patterns”Pattern 1: Retry with Exponential Backoff
Section titled “Pattern 1: Retry with Exponential Backoff”# Airflowfrom airflow.decorators import task
@task( retries=5, retry_delay=timedelta(minutes=1), retry_exponential_backoff=True, max_retry_delay=timedelta(hours=1))def flaky_api_call(): """Call external API with retries.""" pass
# Prefectfrom prefect import task
@task( retries=5, retry_delay_seconds=[60, 120, 240, 480, 960] # Custom backoff)def flaky_api_call(): passPattern 2: Conditional Branching
Section titled “Pattern 2: Conditional Branching”# Airflowfrom airflow.operators.python import BranchPythonOperator
def choose_path(**context): metrics = context['ti'].xcom_pull(task_ids='evaluate') if metrics['accuracy'] > 0.9: return 'deploy_production' elif metrics['accuracy'] > 0.8: return 'deploy_staging' else: return 'retrain'
branch = BranchPythonOperator( task_id='branch', python_callable=choose_path)Pattern 3: Parallel Execution
Section titled “Pattern 3: Parallel Execution”# Airflow - Dynamic Task Mapping@taskdef process_partition(partition_id: int): return f"Processed {partition_id}"
@dagdef parallel_processing(): partitions = list(range(10))
# This creates 10 parallel tasks! results = process_partition.expand(partition_id=partitions)
aggregate_results(results)
# Prefect - Concurrent Executionfrom prefect import flow, taskfrom prefect.futures import wait
@taskdef process_item(item): return item * 2
@flowdef parallel_flow(): items = range(100) futures = [process_item.submit(item) for item in items] results = [f.result() for f in futures] return resultsPattern 4: Sensors and Triggers
Section titled “Pattern 4: Sensors and Triggers”# Airflow - Wait for filefrom airflow.sensors.filesystem import FileSensor
wait_for_data = FileSensor( task_id='wait_for_data', filepath='/data/daily/{{ ds }}/data.parquet', poke_interval=300, # Check every 5 minutes timeout=3600 * 6, # Timeout after 6 hours mode='reschedule' # Don't block worker)
# Airflow - Wait for external DAGfrom airflow.sensors.external_task import ExternalTaskSensor
wait_for_upstream = ExternalTaskSensor( task_id='wait_for_upstream', external_dag_id='data_ingestion', external_task_id='load_complete', timeout=3600)Decision Framework
Section titled “Decision Framework”CHOOSING AN ORCHESTRATION TOOL==============================
START HERE │ ▼┌─────────────────────────────────────┐│ Do you need Kubernetes-native ML? │└─────────────────────────────────────┘ │ │ YES NO │ │ ▼ ▼┌─────────┐ ┌─────────────────────────────┐│KUBEFLOW │ │ Do workflows run for ││PIPELINES│ │ hours/days with human │└─────────┘ │ approval steps? │ └─────────────────────────────┘ │ │ YES NO │ │ ▼ ▼ ┌─────────┐ ┌─────────────────────────────┐ │TEMPORAL │ │ Is your team data-centric │ └─────────┘ │ (thinking in assets)? │ └─────────────────────────────┘ │ │ YES NO │ │ ▼ ▼ ┌─────────┐ ┌─────────────────────────────┐ │ DAGSTER │ │ Need battle-tested, │ └─────────┘ │ enterprise-grade? │ └─────────────────────────────┘ │ │ YES NO │ │ ▼ ▼ ┌─────────┐ ┌─────────┐ │ AIRFLOW │ │ PREFECT │ └─────────┘ └─────────┘
VISUAL/LOW-CODE NEEDS:──────────────────────• Quick AI prototypes → n8n• LangChain flows → LangFlow• LLM chat flows → Flowise• Multi-language scripts → WindmillHands-On Exercises
Section titled “Hands-On Exercises”Exercise 1: Build an Airflow ML DAG
Section titled “Exercise 1: Build an Airflow ML DAG”Goal: Create a complete ML training pipeline in Airflow that demonstrates dependencies, branching, and best practices.
Requirements:
- Extract data from a CSV file (simulated database)
- Validate data quality (row counts, null checks, schema validation)
- Train a model (XGBoost classifier)
- Evaluate and branch based on metrics
- Deploy to “production” or send failure notification
Success Criteria:
- DAG runs without errors
- Branching works correctly based on accuracy threshold
- XCom passes data between tasks properly
- Retries are configured for flaky tasks
Starter Code:
from airflow import DAGfrom airflow.operators.python import PythonOperator, BranchPythonOperatorfrom datetime import datetime, timedelta
default_args = { 'owner': 'ml_team', 'retries': 3, 'retry_delay': timedelta(minutes=5)}
with DAG( 'ml_training_exercise', default_args=default_args, schedule_interval='@daily', start_date=datetime(2024, 1, 1), catchup=False) as dag: # Your tasks here passExtension Challenges:
- Add data versioning with DVC integration
- Implement model registry integration
- Add Slack notifications on success/failure
Exercise 2: Create a Prefect Flow
Section titled “Exercise 2: Create a Prefect Flow”Goal: Build a modern ML pipeline using Prefect’s Python-native approach, demonstrating caching, parallelism, and dynamic workflows.
Requirements:
- Fetch data from an API (use a public API like JSONPlaceholder)
- Process multiple datasets in parallel
- Aggregate results with proper error handling
- Implement caching for expensive operations
Success Criteria:
- Flow executes successfully
- Parallel tasks run concurrently (verify with logs)
- Caching prevents redundant computation on re-runs
- Dynamic task creation based on input data
Starter Code:
from prefect import flow, taskfrom prefect.tasks import task_input_hashfrom datetime import timedeltaimport httpx
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))def fetch_data(user_id: int) -> dict: """Fetch user data from API.""" response = httpx.get(f"https://jsonplaceholder.typicode.com/users/{user_id}") return response.json()
@flow(name="Parallel Data Processing")def process_users(user_ids: list[int]): # Your implementation here passExtension Challenges:
- Add Prefect deployments for scheduled runs
- Implement custom retry logic for API failures
- Add observability with Prefect’s built-in metrics
Exercise 3: Design a Dagster Asset Graph
Section titled “Exercise 3: Design a Dagster Asset Graph”Goal: Build a data platform using Dagster’s asset-based paradigm, demonstrating the medallion architecture (bronze/silver/gold) for ML feature engineering.
Requirements:
- Raw data asset (bronze layer) - simulated data ingestion
- Clean data asset (silver layer) - validation and cleaning
- Feature data asset (gold layer) - ML feature engineering
- ML model asset - trained model ready for serving
Success Criteria:
- Asset graph visualizes correctly in Dagster UI
- Metadata is logged for each asset
- Incremental updates work correctly
- Asset dependencies are properly defined
Starter Code:
from dagster import asset, AssetExecutionContext, Definitionsfrom dagster import MaterializeResult, MetadataValueimport pandas as pd
@asset( description="Raw event data from source system", group_name="bronze", metadata={"owner": "data_engineering"})def raw_events(context: AssetExecutionContext) -> pd.DataFrame: """Simulate raw event data extraction.""" context.log.info("Extracting raw events...") # Your implementation here pass
# Define silver, gold, and ml_model assetsExtension Challenges:
- Add partitioned assets for daily data
- Implement asset checks for data quality
- Add IO managers for different storage backends
Exercise 4: Compare Tool Performance
Section titled “Exercise 4: Compare Tool Performance”Goal: Run the same ML pipeline in Airflow, Prefect, and Dagster to understand the trade-offs between tools.
Requirements:
- Implement identical logic in all three frameworks
- Measure: lines of code, setup time, debugging experience
- Document: what was easy/hard in each tool
- Recommend: which tool for which use case
Deliverable: A written comparison document with code samples and quantitative metrics.
This exercise develops judgment about tool selection—a critical skill for MLOps roles.
Did You Know? At Stripe, teams are allowed to choose between Airflow and Temporal based on use case. This “toolbox” approach—rather than mandating one tool—increased engineer satisfaction by 40% and reduced time-to-production by 25%. The key is understanding when each tool shines.
Further Reading
Section titled “Further Reading”Documentation
Section titled “Documentation”Articles
Section titled “Articles”- “Why We Moved from Airflow to Prefect” (Netflix)
- “Building ML Pipelines at Scale with Kubeflow” (Google)
- “Asset-Based Data Pipelines with Dagster” (Elementl)
Production War Stories
Section titled “Production War Stories”The $2 Million Silent Failure (2019)
Section titled “The $2 Million Silent Failure (2019)”A major fintech company ran their fraud detection model retraining on Airflow. One day, the data extraction task failed silently—it returned an empty DataFrame instead of raising an exception. The pipeline continued merrily:
- Feature engineering processed zero rows (no errors)
- Model training fit on zero samples (technically valid)
- Deployment pushed the “model” to production
For three days, the fraud model predicted 0% fraud probability for everyone. $2 million in fraudulent transactions sailed through before a human noticed the anomaly.
Lesson learned: Add data validation tasks with row count checks and raise exceptions on empty data.
@taskdef validate_data(df: pd.DataFrame) -> pd.DataFrame: """Fail fast on bad data.""" if len(df) == 0: raise ValueError("Empty DataFrame - data extraction likely failed!") if len(df) < 1000: raise ValueError(f"Suspiciously small dataset: {len(df)} rows") return dfThe Runaway GPU Cost (2021)
Section titled “The Runaway GPU Cost (2021)”A startup used Kubeflow to run hyperparameter tuning. The pipeline requested GPU nodes for each experiment—200 parallel runs, each on a $3/hour GPU instance. The engineer forgot to set a timeout.
Some experiments converged quickly (20 minutes). Others got stuck in local minima and ran forever. Three weeks later, the finance team noticed a $50,000 cloud bill for a single hyperparameter search.
Lesson learned: Always set timeouts and cost alerts. Use preemptible/spot instances for non-critical jobs.
The Circular Dependency Nightmare (2020)
Section titled “The Circular Dependency Nightmare (2020)”An ML team at a logistics company built an elaborate Airflow deployment with 50+ DAGs. Over time, different teams added cross-DAG dependencies using sensors. Nobody documented them.
One day, a seemingly unrelated change broke everything: DAG A waited for DAG B, which waited for DAG C, which waited for DAG A. Classic deadlock. But because dependencies spanned multiple DAGs, the Airflow UI showed everything as “running normally.”
It took four days and a full pipeline audit to untangle. The company lost $400,000 in delayed shipment optimizations.
Lesson learned: Document all cross-DAG dependencies. Use airflow dags list-import-errors and external dependency tracking.
The Timezone Bug (2022)
Section titled “The Timezone Bug (2022)”A global e-commerce company scheduled their daily retraining at “00:00” thinking it meant midnight local time. It meant midnight UTC. For their West Coast users, the model retrained at 4 PM—peak shopping hours—causing a 40% latency spike.
Worse, their data extraction job ran at “23:00 UTC” (3 PM Pacific). The model trained on incomplete daily data for months before anyone noticed accuracy degradation.
Lesson learned: Always use explicit timezone specifications. Store everything in UTC and convert at display time.
Common Mistakes and How to Avoid Them
Section titled “Common Mistakes and How to Avoid Them”Mistake 1: Hardcoding Configuration
Section titled “Mistake 1: Hardcoding Configuration”# WRONG - Hardcoded values@taskdef train_model(): model = XGBClassifier( n_estimators=100, # Can't change without code deploy max_depth=6, # A/B testing requires code changes learning_rate=0.1 ) data = pd.read_csv('/data/training.csv') # Fixed path return model.fit(data)
# RIGHT - Parameterized configuration@taskdef train_model(config: dict, data_path: str): """Train with configurable hyperparameters.""" model = XGBClassifier(**config['hyperparameters']) data = pd.read_csv(data_path) return model.fit(data)
# Usage: Pass config via Airflow Variables or Prefect BlocksWhy it matters: Hardcoded values require code deployments to change. Parameterized configs enable A/B testing, emergency fixes, and environment-specific settings without touching code.
Mistake 2: Not Implementing Idempotency
Section titled “Mistake 2: Not Implementing Idempotency”# WRONG - Non-idempotent task@taskdef append_predictions(): """Appends predictions to table - running twice doubles data!""" predictions = generate_predictions() db.execute("INSERT INTO predictions VALUES (?)", predictions)
# RIGHT - Idempotent task@taskdef upsert_predictions(run_date: str): """Upserts predictions - safe to re-run.""" predictions = generate_predictions() # Delete existing predictions for this run, then insert db.execute("DELETE FROM predictions WHERE run_date = ?", run_date) db.execute("INSERT INTO predictions VALUES (?)", predictions)Why it matters: Orchestrators retry failed tasks. Non-idempotent tasks corrupt data on retry. Design every task to be safely re-runnable.
Mistake 3: Passing Large Data Through XCom/Artifacts
Section titled “Mistake 3: Passing Large Data Through XCom/Artifacts”# WRONG - Passing DataFrames through XCom@taskdef extract(): return pd.read_csv('huge_file.csv') # 10GB DataFrame in XCom!
@taskdef transform(data): return data.transform(...) # Serialization nightmare
# RIGHT - Pass references, not data@taskdef extract() -> str: """Extract data and return path reference.""" df = pd.read_csv('huge_file.csv') output_path = '/tmp/extracted_data.parquet' df.to_parquet(output_path) return output_path # Just pass the path
@taskdef transform(data_path: str) -> str: """Load from path, transform, save.""" df = pd.read_parquet(data_path) df_transformed = df.transform(...) output_path = '/tmp/transformed_data.parquet' df_transformed.to_parquet(output_path) return output_pathWhy it matters: XCom (Airflow) and artifacts are for metadata, not data. Passing gigabytes through them causes database bloat, serialization errors, and memory exhaustion.
Interview Preparation
Section titled “Interview Preparation”Question 1: “How would you design an ML pipeline that needs to train on data arriving at different times?”
Section titled “Question 1: “How would you design an ML pipeline that needs to train on data arriving at different times?””Strong Answer:
“I’d use a sensor-based approach with watermarking. Here’s my design:
-
Data arrival sensors: Use FileSensor or custom sensors that poll for data readiness. Each source has its own sensor with configurable timeout.
-
Watermark tracking: Track the latest complete data timestamp for each source. Don’t start training until all sources pass the watermark.
-
Graceful degradation: If one source is late but others are ready, decide based on business needs—wait (data quality priority) or proceed with stale data (freshness priority).
-
Alerting: If any sensor exceeds SLA, alert the on-call engineer before the pipeline auto-fails.
from airflow.sensors.filesystem import FileSensorfrom airflow.utils.dates import days_ago
# Define sensors for each data sourcewait_for_transactions = FileSensor( task_id='wait_for_transactions', filepath='/data/transactions/{{ ds }}/*.parquet', timeout=3600 * 4, # 4 hour SLA mode='reschedule')
wait_for_user_data = FileSensor( task_id='wait_for_user_data', filepath='/data/users/{{ ds }}/*.parquet', timeout=3600 * 2, # 2 hour SLA mode='reschedule')
# Both must complete before training starts[wait_for_transactions, wait_for_user_data] >> start_trainingThis approach handles real-world data arrival patterns while maintaining pipeline reliability.”
Question 2: “Compare Airflow and Prefect. When would you choose each?”
Section titled “Question 2: “Compare Airflow and Prefect. When would you choose each?””Strong Answer:
“Both are excellent—the choice depends on team and use case:
Choose Airflow when:
- You need battle-tested reliability (10+ years in production)
- Your team already knows it (most data engineers do)
- You need extensive operator ecosystem (500+ community operators)
- Enterprise requirements: RBAC, audit logging, compliance
Choose Prefect when:
- You want truly Python-native development (no DAG boilerplate at all)
- Dynamic workflows (task count determined at runtime)
- Hybrid execution (run locally during development, cloud in production)
- Modern developer experience (better debugging, native async)
Example use cases:
- Daily batch ETL with stable structure → Airflow
- ML experiments with varying hyperparameter combinations → Prefect
- Complex data platform with multiple teams → Airflow
- Startup moving fast with small team → Prefect
The best choice is often ‘what your team knows.’ A well-run Airflow pipeline beats a poorly-run anything else.”
Question 3: “How do you handle ML pipeline failures at 3 AM?”
Section titled “Question 3: “How do you handle ML pipeline failures at 3 AM?””Strong Answer:
“Production ML requires defense in depth:
Prevention (before failure):
- Data validation tasks that fail fast on bad input
- Resource limits (memory, timeout) to prevent runaway jobs
- Idempotent tasks that handle retries safely
Detection (catch failures quickly):
- Automatic retries with exponential backoff (3 attempts, 5-10-20 minute delays)
- SLA alerts when tasks exceed expected duration
- Data quality monitoring (row counts, null rates, value distributions)
Response (minimize impact):
- Automated rollback to last-known-good model
- Detailed error context in alerts (not just ‘Task failed’)
- Runbooks for common failure modes
Example monitoring setup:
default_args = { 'retries': 3, 'retry_delay': timedelta(minutes=5), 'retry_exponential_backoff': True, 'email_on_failure': True, 'email': ['ml-oncall@company.com'], 'sla': timedelta(hours=2) # Alert if task runs too long}
@task(on_failure_callback=notify_pagerduty)def train_model(): # Training logic passThe goal isn’t zero failures—it’s fast detection, automatic recovery, and minimal manual intervention at 3 AM.”
The Economics of ML Orchestration
Section titled “The Economics of ML Orchestration”Build vs. Buy Analysis
Section titled “Build vs. Buy Analysis”| Approach | Monthly Cost | Hidden Costs | Best For |
|---|---|---|---|
| Managed Airflow (MWAA) | $500-2,000 | Low ops overhead | AWS shops, medium scale |
| Self-hosted Airflow | $200-500 (compute) | High ops overhead | Large teams, customization needs |
| Prefect Cloud | $0-1,200 | Minimal | Fast-moving teams |
| Dagster Cloud | $0-2,000 | Minimal | Data-centric teams |
| Kubeflow (self-hosted) | $300-1,000 | K8s expertise required | GPU-heavy ML teams |
| Temporal Cloud | $200-5,000 | Workflow complexity | Long-running workflows |
The Real Cost Formula
Section titled “The Real Cost Formula”Total Cost = Infrastructure + Engineering Time + Failure Cost
Infrastructure:- Managed service fees OR self-hosted compute- Database (metadata store)- Storage (logs, artifacts)
Engineering Time:- Initial setup: 40-200 hours- Ongoing maintenance: 4-20 hours/month- Debugging failures: 10-40 hours/month (without orchestration: 3x more)
Failure Cost:- Revenue loss per hour of downtime- Data quality issues reaching customers- Engineer time for manual recoveryROI Calculation Example
Section titled “ROI Calculation Example”Scenario: E-commerce company running daily price optimization
| Without Orchestration | With Airflow |
|---|---|
| 4 hours/week debugging cron | 1 hour/week pipeline maintenance |
| $50K/month in pricing errors | $5K/month (90% reduction) |
| 3 engineers on-call rotation | 1 engineer with auto-alerting |
| Manual retraining triggers | Automatic daily retraining |
Annual savings: ~$500K in error reduction + ~$200K in engineering time = $700K
Annual cost: ~$30K (managed service + compute)
ROI: 23x in first year
Analogies for Understanding ML Orchestration
Section titled “Analogies for Understanding ML Orchestration”The Orchestra Conductor Analogy
Section titled “The Orchestra Conductor Analogy”An ML pipeline orchestrator is like a symphony conductor. Individual musicians (tasks) are experts at their instruments—the violinist doesn’t need the conductor to play notes. But without coordination:
- The brass might start before the strings are ready
- The percussion could overpower the woodwinds
- Nobody knows when the finale arrives
The conductor (orchestrator) doesn’t play any instrument—they coordinate timing, manage dependencies, and ensure the whole performance comes together. When a musician misses a cue, the conductor signals adjustments. When the venue changes, the conductor adapts the performance.
Just as a conductor uses a score (DAG definition) to coordinate musicians (tasks), an orchestrator uses workflow definitions to ensure data flows correctly through each processing stage.
The Air Traffic Control Analogy
Section titled “The Air Traffic Control Analogy”Think of ML orchestration like managing an airport:
- Planes = ML tasks (training, inference, data processing)
- Runways = Compute resources (GPUs, CPU clusters)
- Flight schedules = Pipeline schedules
- Control tower = Orchestrator (Airflow, Prefect)
- Radar = Monitoring and observability
- Fuel trucks = Data dependencies
Without air traffic control, you’d have planes circling forever waiting for runways, crashes on the tarmac, and no one knowing where any flight is. With proper orchestration, every plane knows its slot, runways are efficiently utilized, and problems are spotted before they become crashes.
The Restaurant Kitchen Analogy
Section titled “The Restaurant Kitchen Analogy”A production ML pipeline is like a busy restaurant kitchen:
Without orchestration (chaos):
- The chef starts the steak before the salad prep is done
- The dessert is ready before the appetizer
- Nobody knows if the special is available
- Customers wait two hours or get cold food
With orchestration (Michelin star kitchen):
- Tickets (DAG runs) define what needs to be cooked and in what order
- Each station (task) knows its dependencies
- The expeditor (scheduler) times everything to arrive hot
- If the grill breaks (task failure), the kitchen manager (alerting) is notified immediately
- Historical data shows which dishes take longest (observability)
The Assembly Line Analogy
Section titled “The Assembly Line Analogy”Henry Ford revolutionized manufacturing with assembly lines—and ML orchestration applies the same principle to data:
Manual ML development is like hand-building cars:
- One expert does everything
- Inconsistent quality
- Can’t scale
- Knowledge lives in one person’s head
Orchestrated ML is like Ford’s assembly line:
- Specialized stations (tasks) for each step
- Standardized interfaces (data contracts)
- Continuous flow (scheduled runs)
- Quality checks at each stage (validation tasks)
- Easy to scale (add more workers)
- Easy to improve (optimize one station without rebuilding everything)
The assembly line analogy also explains why orchestration enables team scaling. Just as Ford could hire specialists for each station, teams can have data engineers build ingestion, ML engineers build training, and platform engineers build deployment—all coordinated by the orchestrator.
The Future of ML Orchestration
Section titled “The Future of ML Orchestration”Trend 1: AI-Powered Pipeline Management
Section titled “Trend 1: AI-Powered Pipeline Management”The next generation of orchestrators will use AI to optimize themselves:
Self-tuning pipelines that automatically adjust:
- Resource allocation based on historical usage patterns
- Retry strategies based on failure mode classification
- Scheduling based on cost optimization (spot instance availability)
Did You Know? Google’s internal ML platform (Vertex AI) already uses reinforcement learning to optimize pipeline scheduling. Their AI scheduler reduced training costs by 23% by learning when GPU prices drop and queueing non-urgent jobs accordingly.
Intelligent alerting that distinguishes:
- “This failure always resolves with a retry” (auto-retry, no alert)
- “This failure pattern preceded a major outage last month” (immediate escalation)
- “This task is slow but within normal variance” (no alert)
Trend 2: Platform Engineering Integration
Section titled “Trend 2: Platform Engineering Integration”The line between ML orchestration and platform engineering is blurring:
GitOps for ML: Pipelines defined entirely in Git, deployed through pull requests, with preview environments for testing DAG changes before production.
Service mesh integration: Orchestrators connecting directly with Istio/Linkerd for intelligent traffic routing, A/B testing, and canary deployments of models.
Developer platforms: Backstage and similar platforms integrating orchestrator UIs, making ML pipelines first-class citizens alongside microservices.
Trend 3: Real-Time ML Operations
Section titled “Trend 3: Real-Time ML Operations”Batch orchestration is mature; real-time is the frontier:
Stream-batch unification: Tools like Apache Flink and Bytewax are bridging batch and streaming. Future orchestrators will treat “run every minute” and “run on every event” as the same abstraction.
Feature platforms: Feature stores (Feast, Tecton) are becoming orchestration-aware. Your pipeline doesn’t just compute features—it registers them for real-time serving.
Online learning pipelines: Models that update continuously require different orchestration patterns—smaller, faster, more frequent runs with continuous validation.
Trend 4: Multi-Cloud and Edge Orchestration
Section titled “Trend 4: Multi-Cloud and Edge Orchestration”The future isn’t “cloud vs. on-prem”—it’s everywhere simultaneously:
Hybrid execution: Train on-prem (data residency requirements), deploy to cloud (global scale), serve at edge (low latency).
Orchestrator federation: One DAG that spans AWS, GCP, and an on-prem cluster—with intelligent task placement based on cost, latency, and data locality.
Edge ML pipelines: Kubeflow and similar tools are extending to Kubernetes clusters running on edge devices. Your pipeline might preprocess on the factory floor, train in the cloud, and deploy back to the floor.
What This Means for Your Career
Section titled “What This Means for Your Career”If you’re learning ML orchestration today, focus on:
-
Fundamentals first: Airflow isn’t going away. Master it before chasing shiny objects.
-
Kubernetes literacy: Even if you use managed services, understanding K8s concepts (pods, services, volumes) makes you more effective.
-
Platform thinking: The best MLOps engineers think beyond individual pipelines to the platform that runs them all.
-
Cost awareness: Cloud bills matter. Engineers who can optimize costs while maintaining reliability are invaluable.
-
Observability skills: Knowing how to instrument, monitor, and debug distributed systems is more valuable than knowing any specific tool.
Debugging ML Pipelines: A Practical Guide
Section titled “Debugging ML Pipelines: A Practical Guide”When pipelines fail at 3 AM, you need systematic debugging approaches. Here’s the methodology used by experienced MLOps engineers.
Step 1: Identify the Failure Scope
Section titled “Step 1: Identify the Failure Scope”First, determine what failed and how badly:
FAILURE SCOPE ASSESSMENT========================
SINGLE TASK FAILURE:- Check task logs first- Usually: bad data, resource exhaustion, external dependency- Fix: Patch and re-run from failed task
CASCADING FAILURE:- Multiple downstream tasks failed- Usually: upstream data issue or resource contention- Fix: Fix root cause, clear downstream, re-run
SCHEDULER FAILURE:- No tasks running at all- Usually: orchestrator health issue- Fix: Check scheduler logs, restart if needed
SILENT FAILURE:- Pipeline "succeeded" but results are wrong- Usually: data validation missing- Fix: Add validation tasks, investigate dataStep 2: Analyze Task Logs
Section titled “Step 2: Analyze Task Logs”Every orchestrator provides task logs. Know how to access them quickly:
Airflow: airflow tasks logs <dag_id> <task_id> <execution_date>
Prefect: Flow runs page → Click run → Task logs panel
Dagster: Asset details → Materialization history → Log viewer
Step 3: Reproduce Locally
Section titled “Step 3: Reproduce Locally”The fastest debugging happens locally. Extract the failing task’s logic and run it standalone:
# Instead of running the whole pipeline# Extract the function and test directly
def debug_failing_task(): """Run the failing task logic locally.""" # Copy the task's inputs from the failed run input_data = pd.read_parquet('/tmp/debug/input.parquet')
# Run the transformation result = transform_data(input_data)
# Inspect intermediate state print(f"Input shape: {input_data.shape}") print(f"Output shape: {result.shape}") print(f"Null counts: {result.isnull().sum()}")
if __name__ == "__main__": debug_failing_task()Step 4: Check Data Assumptions
Section titled “Step 4: Check Data Assumptions”Most ML pipeline failures are data issues. Validate assumptions:
def data_health_check(df: pd.DataFrame, context: str) -> None: """Standard data health check for debugging.""" print(f"\n=== Data Health Check: {context} ===") print(f"Shape: {df.shape}") print(f"Memory: {df.memory_usage(deep=True).sum() / 1e6:.1f} MB") print(f"\nNull counts:") print(df.isnull().sum()) print(f"\nData types:") print(df.dtypes) print(f"\nNumeric statistics:") print(df.describe())Common Failure Patterns and Fixes
Section titled “Common Failure Patterns and Fixes”| Symptom | Likely Cause | Fix |
|---|---|---|
| OOM (Out of Memory) | Data larger than expected | Add chunking, increase memory, or sample |
| Timeout | Slow external dependency | Add timeout handling, use async, cache |
| Schema mismatch | Upstream data changed | Add schema validation, alert on drift |
| Empty results | Filter too aggressive | Add row count assertions, log filter stats |
| Duplicate data | Idempotency missing | Implement upsert pattern, add dedup |
| Stale data | Sensor misconfigured | Check sensor timeout and poke interval |
Did You Know? Netflix’s ML platform team found that 67% of production pipeline failures were caused by data issues (schema changes, null values, volume spikes), not code bugs. They built an automated data validation framework that reduced debugging time by 80% and implemented “data contracts” between teams to catch breaking changes before production.
Pro Tips from Production Engineers
Section titled “Pro Tips from Production Engineers”“The Log Line That Saved Us Hours”
Always log context at task start:
@taskdef process_batch(batch_id: str): logger.info(f"Starting batch {batch_id} at {datetime.now()}") logger.info(f"Memory available: {psutil.virtual_memory().available / 1e9:.2f} GB") logger.info(f"CPU count: {os.cpu_count()}") # ... your logicThis simple pattern has saved countless hours of debugging. When something fails, you immediately know the resource state when it started. Senior engineers at companies like Uber and Spotify swear by this approach—they estimate it cuts initial debugging time by at least fifty percent because you never have to guess what environment the task ran in or what resources were available at the time of failure.
“The Checkpoint Pattern”
For long-running tasks, checkpoint progress so retries don’t start from zero:
@taskdef train_model_with_checkpoints(config: dict): """Training with checkpoint support.""" checkpoint_path = f"/checkpoints/{config['run_id']}.pt"
# Resume if checkpoint exists if os.path.exists(checkpoint_path): model.load_state_dict(torch.load(checkpoint_path)) logger.info(f"Resumed from checkpoint")
for epoch in range(config['epochs']): # Training logic...
# Save checkpoint every N epochs if epoch % 10 == 0: torch.save(model.state_dict(), checkpoint_path)“The Canary Test Pattern”
Before running on all data, test on a sample:
@taskdef safe_transform(data_path: str): """Transform with canary testing.""" df = pd.read_parquet(data_path)
# Canary: run on 1% sample first sample = df.sample(frac=0.01) try: result_sample = transform(sample) assert len(result_sample) > 0, "Transform produced empty result" assert result_sample.isnull().sum().sum() == 0, "Transform produced nulls" except Exception as e: raise ValueError(f"Canary test failed: {e}")
# Full run only if canary passes return transform(df)Building Your First Production Pipeline
Section titled “Building Your First Production Pipeline”Architecture Reference
Section titled “Architecture Reference”PRODUCTION ML PIPELINE ARCHITECTURE===================================
┌─────────────────────────────────────────────────────────────────────────────┐│ ORCHESTRATION LAYER ││ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ││ │ TRIGGER │───▶│ VALIDATE │───▶│ TRAIN │───▶│ DEPLOY │ ││ │ (Sensor) │ │ (Data) │ │ (Model) │ │ (Conditional)│ ││ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ ││ │ │ │ │ ││ │ │ │ │ │└─────────┼──────────────────┼──────────────────┼──────────────────┼──────────┘ │ │ │ │ ▼ ▼ ▼ ▼┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐│ DATA LAKE │ │ FEATURE │ │ EXPERIMENT │ │ MODEL ││ (S3/GCS) │ │ STORE │ │ TRACKER │ │ REGISTRY ││ │ │ (Feast) │ │ (MLflow) │ │ (MLflow) │└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘Implementation Checklist
Section titled “Implementation Checklist”Before launching your first production pipeline:
Infrastructure
- Orchestrator deployed (Airflow/Prefect/Dagster)
- Metadata database provisioned (PostgreSQL for Airflow)
- Artifact storage configured (S3/GCS bucket)
- Worker nodes scaled appropriately
- Network policies allowing task communication
Pipeline Code
- DAG defined with clear dependencies
- All tasks are idempotent
- Data validation at pipeline start
- Model evaluation before deployment
- Conditional deployment (only if metrics pass)
- Cleanup tasks for temporary files
Observability
- SLA alerts configured
- Failure notifications (email/Slack/PagerDuty)
- Logging to centralized system
- Metrics exported (task duration, success rate)
- Dashboard for pipeline health
Operations
- Runbook for common failures
- On-call rotation defined
- Escalation path documented
- Recovery procedures tested
- Cost monitoring in place
Key Takeaways
Section titled “Key Takeaways”-
Orchestration is not optional for production ML. The question isn’t whether to use it, but which tool fits your needs.
-
Start with Airflow if you don’t know what you need. It’s the industry standard with the largest community and job market.
-
Design for failure from day one. Idempotent tasks, data validation, retries with backoff, and meaningful alerts.
-
Don’t pass data through orchestration. Pass references (paths, URLs, IDs). Let storage systems handle the bytes.
-
DAGs should be version-controlled like any other code. Include tests for critical paths.
-
Timezone bugs are real and painful. Standardize on UTC everywhere, convert at display time only.
-
Cross-DAG dependencies are dangerous. Document them explicitly and audit regularly for circular dependencies.
-
Visual tools (n8n) enable non-engineers to build AI workflows. Consider hybrid approaches where data engineers build core pipelines and business users build integrations.
-
Kubeflow shines for GPU workloads but adds Kubernetes complexity. Don’t adopt it just because it’s “cloud-native.”
-
Temporal is overkill for most use cases but essential for multi-day workflows with human approval steps.
Summary
Section titled “Summary”ORCHESTRATION TOOLS SUMMARY===========================
AIRFLOW: Industry standard, battle-tested, large communityPREFECT: Modern, Python-native, better developer experienceDAGSTER: Asset-based, data-centric, great for analyticsKUBEFLOW: Kubernetes-native, ML-focused, GPU supportTEMPORAL: Durable execution, long-running, reliablen8n: Visual, low-code, AI-native nodes
WHEN TO USE:────────────Data Engineering → Airflow, DagsterML Training → Kubeflow, AirflowQuick Prototypes → n8n, PrefectLong-running Jobs → TemporalModern Teams → Prefect, DagsterNext Steps
Section titled “Next Steps”Module 51 will cover Model Deployment & Serving Patterns, including FastAPI, gRPC, canary deployments, and A/B testing.
Module 50 Complete! “The best pipeline is one that runs without you thinking about it.”