Module 10.4: Building Custom AIOps
Toolkit Track | Complexity: Complex | Time: 50-60 minutes
Prerequisites
Before starting this module:
- AIOps Discipline — Complete conceptual foundation
- Module 10.1: Anomaly Detection Tools — Detection libraries
- Python proficiency (pandas, scikit-learn basics)
- Kubernetes basics (Deployments, Services, ConfigMaps)
- Basic understanding of Kafka or similar streaming platforms
What You’ll Be Able to Do
After completing this module, you will be able to:
- Implement custom AIOps pipelines using Python and ML libraries for Kubernetes operational intelligence
- Configure feature engineering pipelines for Prometheus metrics and Kubernetes event data
- Deploy custom anomaly detection models with feedback loops for continuous accuracy improvement
- Evaluate build-versus-buy trade-offs for AIOps capabilities based on team skills and data maturity
Why This Module Matters
Platform AI (Datadog Watchdog, Dynatrace Davis) covers 80% of use cases. But sometimes you need custom AIOps because:
- Domain-specific detection — Your anomalies are unique to your business
- Data sovereignty — Data cannot leave your infrastructure
- Cost optimization — Platform AI pricing doesn’t scale for you
- Integration requirements — Need to integrate with proprietary systems
- Competitive advantage — AIOps as a differentiator
Building custom AIOps is a significant investment. This module shows you how to do it right.
Did You Know?
- Netflix’s anomaly detection system processes billions of time series with custom algorithms. They open-sourced many components (Surus, Argus) but the full system remains proprietary because it’s tightly integrated with their streaming architecture.
- Uber built Argos for real-time anomaly detection across their microservices. At peak, it processes 500+ million metrics per minute with sub-second detection latency—impossible with off-the-shelf tools at their scale.
- LinkedIn’s ThirdEye is open-sourced from their internal AIOps platform. It was built after they realized commercial tools couldn’t handle their 10,000+ services generating 30+ million metrics.
- Pinterest’s Anomaly Detection system uses isolation forest ensembles trained on 2 years of historical data. They found that domain-specific training data improved accuracy by 40% compared to generic models—the key insight that drove them to build custom.
War Story: The $8M Custom AIOps That Saved $50M
A major e-commerce company was losing $2M per hour during peak shopping periods when their systems had issues. Their off-the-shelf monitoring tools detected problems, but the correlation was too slow—by the time they figured out the root cause, an hour had passed.
The challenge:
- 4,000+ microservices
- 2 million metrics per minute
- Complex dependencies (payment → inventory → shipping → notifications)
- Platform AI tools took 5-15 minutes to correlate events
The decision to build custom:
After a $12M incident during a flash sale (6 hours of degraded checkout), they decided commercial tools weren’t cutting it. They spent $8M over 18 months building a custom AIOps platform.
Architecture decisions:
- Kafka backbone — Every metric, log, and trace flowed through Kafka topics
- Pre-computed topology — Service dependencies updated every 5 minutes, cached in Redis
- Domain-specific detectors — Different ML models for checkout flow vs. browse flow
- Human feedback loop — Engineers rated every alert, continuously improving models
The breakthrough:
Their custom system achieved 45-second detection-to-root-cause time. The key innovation wasn’t fancier ML—it was pre-computed correlation. Instead of correlating events in real-time, they maintained a constantly-updated dependency graph that instantly showed blast radius.
Results after 2 years:
- MTTR dropped from 45 minutes to 6 minutes
- Incident frequency dropped 60% (predictive alerting caught issues before impact)
- Engineering hours on incidents dropped from 400/month to 80/month
- ROI: $50M saved annually vs. $8M investment
The lesson: Custom AIOps makes sense at scale. The break-even point for this company was around 1,000 services. Below that, commercial tools win on cost. Above that, custom wins on accuracy and speed.
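The pre-computed correlation idea from the war story can be illustrated with a few lines of Python. This is a hypothetical sketch, not the company's actual system: the `DEPENDENTS` map stands in for the dependency graph they refreshed every 5 minutes and cached in Redis, and the service names are invented.

```python
from collections import deque

# Hypothetical pre-computed dependency graph: service -> services that depend on it.
# In production this would be rebuilt periodically and cached, not hard-coded.
DEPENDENTS = {
    "payment":   ["checkout"],
    "inventory": ["checkout"],
    "checkout":  ["notifications"],
    "shipping":  ["notifications"],
}

def blast_radius(service: str) -> set[str]:
    """Return every downstream service affected if `service` degrades (BFS)."""
    seen: set[str] = set()
    queue = deque([service])
    while queue:
        current = queue.popleft()
        for dependent in DEPENDENTS.get(current, []):
            if dependent not in seen:
                seen.add(dependent)
                queue.append(dependent)
    return seen

print(sorted(blast_radius("payment")))  # ['checkout', 'notifications']
```

Because the graph is already built, answering "what does this anomaly impact?" is a constant-time lookup plus a small traversal, which is why detection-to-root-cause dropped to seconds rather than the minutes a real-time correlation pass would take.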
Custom AIOps Architecture
```
DATA SOURCES              INGESTION
┌────────────┐        ┌──────────────────┐
│ Prometheus │───────▶│                  │
│ Loki       │───────▶│  Kafka (Ingest)  │
│ Tempo      │───────▶│  Topics:         │
│ Events     │───────▶│   - metrics      │
└────────────┘        │   - logs         │
                      │   - traces       │
                      │   - events       │
                      └────────┬─────────┘
PROCESSING                     ▼
┌─────────────────────────────────────────────────────┐
│                   AIOPS PIPELINE                    │
│ ┌────────────┐  ┌─────────────┐  ┌──────────────┐   │
│ │  Anomaly   │  │   Event     │  │  Root Cause  │   │
│ │ Detection  │──│ Correlation │──│   Analysis   │   │
│ │  (Python)  │  │  (Python)   │  │              │   │
│ └────────────┘  └─────────────┘  └──────────────┘   │
│        │               │                │           │
│        └───────────────┴────────────────┘           │
│                        ▼                            │
│                ┌──────────────┐                     │
│                │   Incident   │                     │
│                │   Manager    │                     │
│                └──────────────┘                     │
└────────────────────────┬────────────────────────────┘
ACTIONS                  ▼
               ┌───────────────────┐
               │  - PagerDuty      │
               │  - Slack          │
               │  - Auto-Remediate │
               │  - Runbooks       │
               └───────────────────┘
```
Project Structure
```
custom-aiops/
├── docker-compose.yml              # Local development
├── k8s/                            # Kubernetes manifests
│   ├── namespace.yaml
│   ├── kafka/
│   │   ├── kafka-deployment.yaml
│   │   └── zookeeper-deployment.yaml
│   ├── aiops/
│   │   ├── detector-deployment.yaml
│   │   ├── correlator-deployment.yaml
│   │   └── configmap.yaml
│   └── monitoring/
│       └── prometheus-servicemonitor.yaml
├── src/
│   ├── detector/                   # Anomaly detection service
│   │   ├── main.py
│   │   ├── detectors/
│   │   │   ├── prophet_detector.py
│   │   │   ├── isolation_forest.py
│   │   │   └── luminaire_detector.py
│   │   └── config.py
│   ├── correlator/                 # Event correlation service
│   │   ├── main.py
│   │   ├── correlators/
│   │   │   ├── time_based.py
│   │   │   ├── topology_based.py
│   │   │   └── text_based.py
│   │   └── config.py
│   ├── incident_manager/           # Incident management
│   │   ├── main.py
│   │   ├── actions/
│   │   │   ├── pagerduty.py
│   │   │   ├── slack.py
│   │   │   └── remediation.py
│   │   └── config.py
│   └── common/
│       ├── kafka_utils.py
│       ├── prometheus_client.py
│       └── models.py
├── tests/
├── requirements.txt
└── README.md
```
Step 1: Data Ingestion Layer
Kafka Setup on Kubernetes
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka
  namespace: aiops
spec:
  replicas: 1  # Use 3 for production
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
        - name: kafka
          image: confluentinc/cp-kafka:7.5.0
          ports:
            - containerPort: 9092
          env:
            - name: KAFKA_BROKER_ID
              value: "1"
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: "zookeeper:2181"
            - name: KAFKA_ADVERTISED_LISTENERS
              value: "PLAINTEXT://kafka:9092"
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: "1"
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "1Gi"
              cpu: "500m"
---
apiVersion: v1
kind: Service
metadata:
  name: kafka
  namespace: aiops
spec:
  ports:
    - port: 9092
  selector:
    app: kafka
```
Prometheus Metrics Exporter
```python
"""Export Prometheus metrics to Kafka for AIOps processing."""
import json
import time
from datetime import datetime

from kafka import KafkaProducer
from prometheus_api_client import PrometheusConnect


class PrometheusExporter:
    """Export Prometheus metrics to Kafka."""

    def __init__(
        self,
        prometheus_url: str,
        kafka_bootstrap: str,
        topic: str = "metrics"
    ):
        self.prom = PrometheusConnect(url=prometheus_url)
        self.producer = KafkaProducer(
            bootstrap_servers=kafka_bootstrap,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        self.topic = topic

    def export_metrics(self, queries: list[dict]):
        """
        Export metrics to Kafka.

        Args:
            queries: List of {"name": str, "query": str} dicts
        """
        timestamp = datetime.utcnow().isoformat()

        for query_def in queries:
            try:
                result = self.prom.custom_query(query_def["query"])

                for metric in result:
                    message = {
                        "timestamp": timestamp,
                        "name": query_def["name"],
                        "labels": metric["metric"],
                        "value": float(metric["value"][1])
                    }
                    self.producer.send(self.topic, message)

            except Exception as e:
                print(f"Error exporting {query_def['name']}: {e}")

        self.producer.flush()

    def run_continuous(self, queries: list[dict], interval: int = 60):
        """Run continuous export."""
        while True:
            self.export_metrics(queries)
            time.sleep(interval)


# Configuration
METRICS_TO_EXPORT = [
    {
        "name": "http_request_duration_p99",
        "query": "histogram_quantile(0.99, rate(http_request_duration_seconds_bucket[5m]))"
    },
    {
        "name": "http_request_rate",
        "query": "sum(rate(http_requests_total[5m])) by (service)"
    },
    {
        "name": "error_rate",
        "query": "sum(rate(http_requests_total{status=~'5..'}[5m])) by (service) / sum(rate(http_requests_total[5m])) by (service)"
    },
    {
        "name": "cpu_usage",
        "query": "avg(rate(container_cpu_usage_seconds_total[5m])) by (pod)"
    },
    {
        "name": "memory_usage",
        "query": "avg(container_memory_usage_bytes) by (pod)"
    }
]


if __name__ == "__main__":
    exporter = PrometheusExporter(
        prometheus_url="http://prometheus:9090",
        kafka_bootstrap="kafka:9092"
    )
    exporter.run_continuous(METRICS_TO_EXPORT, interval=60)
```
Step 2: Anomaly Detection Service
Main Detection Service
```python
"""Anomaly Detection Service

Consumes metrics from Kafka, detects anomalies, publishes to anomalies topic.
"""
import json
from collections import defaultdict
from datetime import datetime, timedelta

from kafka import KafkaConsumer, KafkaProducer

from .detectors.prophet_detector import ProphetDetector
from .detectors.isolation_forest import IsolationForestDetector
from .detectors.luminaire_detector import LuminaireDetector
from .config import Config


class AnomalyDetectionService:
    """Main anomaly detection service."""

    def __init__(self, config: Config):
        self.config = config

        # Kafka setup
        self.consumer = KafkaConsumer(
            config.metrics_topic,
            bootstrap_servers=config.kafka_bootstrap,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            group_id="anomaly-detector",
            auto_offset_reset="latest"
        )

        self.producer = KafkaProducer(
            bootstrap_servers=config.kafka_bootstrap,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

        # Detectors
        self.detectors = {
            "prophet": ProphetDetector(),
            "isolation_forest": IsolationForestDetector(),
            "luminaire": LuminaireDetector()
        }

        # Metric history for batch detection
        self.metric_history: dict[str, list] = defaultdict(list)
        self.history_window = timedelta(hours=24)

    def get_metric_key(self, metric: dict) -> str:
        """Generate unique key for metric series."""
        labels = metric.get("labels", {})
        sorted_labels = sorted(labels.items())
        return f"{metric['name']}:{sorted_labels}"

    def add_to_history(self, metric: dict):
        """Add metric to history buffer."""
        key = self.get_metric_key(metric)
        self.metric_history[key].append({
            "timestamp": metric["timestamp"],
            "value": metric["value"]
        })

        # Trim old data
        cutoff = datetime.utcnow() - self.history_window
        self.metric_history[key] = [
            m for m in self.metric_history[key]
            if datetime.fromisoformat(m["timestamp"]) > cutoff
        ]

    def detect_anomaly(self, metric: dict) -> dict | None:
        """
        Run anomaly detection on metric.

        Returns anomaly dict if detected, None otherwise.
        """
        key = self.get_metric_key(metric)
        history = self.metric_history[key]

        if len(history) < self.config.min_history_points:
            return None

        # Choose detector based on metric type
        detector_name = self.config.metric_detector_map.get(
            metric["name"],
            "isolation_forest"  # default
        )
        detector = self.detectors[detector_name]

        # Run detection
        is_anomaly, score, details = detector.detect(
            history, metric["value"]
        )

        if is_anomaly:
            return {
                "timestamp": metric["timestamp"],
                "metric_name": metric["name"],
                "metric_labels": metric.get("labels", {}),
                "value": metric["value"],
                "anomaly_score": score,
                "detector": detector_name,
                "details": details
            }

        return None

    def run(self):
        """Main processing loop."""
        print("Starting Anomaly Detection Service...")

        for message in self.consumer:
            metric = message.value

            # Add to history
            self.add_to_history(metric)

            # Detect anomalies
            anomaly = self.detect_anomaly(metric)

            if anomaly:
                print(f"Anomaly detected: {anomaly['metric_name']}")
                self.producer.send(
                    self.config.anomalies_topic,
                    anomaly
                )
                self.producer.flush()


if __name__ == "__main__":
    config = Config.from_env()
    service = AnomalyDetectionService(config)
    service.run()
```
Prophet Detector Implementation
```python
"""Prophet-based anomaly detector for metrics with seasonality."""
import pandas as pd
from prophet import Prophet


class ProphetDetector:
    """Anomaly detection using Facebook Prophet."""

    def __init__(
        self,
        seasonality_mode: str = "multiplicative",
        interval_width: float = 0.95
    ):
        self.seasonality_mode = seasonality_mode
        self.interval_width = interval_width
        self.models: dict = {}

    def detect(
        self,
        history: list[dict],
        current_value: float
    ) -> tuple[bool, float, dict]:
        """
        Detect if current value is anomalous.

        Returns:
            (is_anomaly, score, details)
        """
        if len(history) < 100:
            return False, 0.0, {"reason": "insufficient_data"}

        # Prepare data for Prophet
        df = pd.DataFrame(history)
        df.columns = ["ds", "y"]
        df["ds"] = pd.to_datetime(df["ds"])

        # Train Prophet model
        model = Prophet(
            seasonality_mode=self.seasonality_mode,
            interval_width=self.interval_width,
            daily_seasonality=True,
            weekly_seasonality=True
        )
        model.fit(df)

        # Get forecast for current point
        future = model.make_future_dataframe(periods=1, freq="min")
        forecast = model.predict(future)
        last_row = forecast.iloc[-1]

        # Check if current value is outside confidence interval
        lower = last_row["yhat_lower"]
        upper = last_row["yhat_upper"]
        expected = last_row["yhat"]

        is_anomaly = current_value < lower or current_value > upper

        # Calculate anomaly score (how far outside the interval)
        if is_anomaly:
            if current_value < lower:
                score = (lower - current_value) / (upper - lower)
            else:
                score = (current_value - upper) / (upper - lower)
        else:
            score = 0.0

        details = {
            "expected": expected,
            "lower_bound": lower,
            "upper_bound": upper,
            "deviation": current_value - expected
        }

        return is_anomaly, min(score, 1.0), details
```
Isolation Forest Detector
```python
"""Isolation Forest anomaly detector for high-dimensional data."""
import numpy as np
from sklearn.ensemble import IsolationForest


class IsolationForestDetector:
    """Anomaly detection using Isolation Forest."""

    def __init__(
        self,
        contamination: float = 0.01,
        n_estimators: int = 100
    ):
        self.contamination = contamination
        self.n_estimators = n_estimators

    def detect(
        self,
        history: list[dict],
        current_value: float
    ) -> tuple[bool, float, dict]:
        """
        Detect if current value is anomalous.

        Returns:
            (is_anomaly, score, details)
        """
        if len(history) < 50:
            return False, 0.0, {"reason": "insufficient_data"}

        # Extract values
        values = np.array([h["value"] for h in history]).reshape(-1, 1)
        current = np.array([[current_value]])

        # Train Isolation Forest
        model = IsolationForest(
            contamination=self.contamination,
            n_estimators=self.n_estimators,
            random_state=42
        )
        model.fit(values)

        # Predict
        prediction = model.predict(current)[0]
        score = -model.decision_function(current)[0]  # Higher = more anomalous

        is_anomaly = prediction == -1

        # Calculate statistics for context
        mean = np.mean(values)
        std = np.std(values)
        z_score = (current_value - mean) / std if std > 0 else 0

        details = {
            "mean": float(mean),
            "std": float(std),
            "z_score": float(z_score),
            "isolation_score": float(score)
        }

        return is_anomaly, min(abs(score), 1.0), details
```
Step 3: Event Correlation Service
```python
"""Event Correlation Service

Consumes anomalies from Kafka, correlates related events, creates incidents.
"""
import json
from datetime import datetime, timedelta

from kafka import KafkaConsumer, KafkaProducer

from .correlators.time_based import TimeBasedCorrelator
from .correlators.topology_based import TopologyBasedCorrelator
from .correlators.text_based import TextBasedCorrelator
from .config import Config


class EventCorrelationService:
    """Main event correlation service."""

    def __init__(self, config: Config):
        self.config = config

        # Kafka setup
        self.consumer = KafkaConsumer(
            config.anomalies_topic,
            bootstrap_servers=config.kafka_bootstrap,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            group_id="event-correlator",
            auto_offset_reset="latest"
        )

        self.producer = KafkaProducer(
            bootstrap_servers=config.kafka_bootstrap,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

        # Correlators
        self.time_correlator = TimeBasedCorrelator(
            window_seconds=config.correlation_window
        )
        self.topology_correlator = TopologyBasedCorrelator(
            topology_file=config.topology_file
        )
        self.text_correlator = TextBasedCorrelator()

        # Active incidents
        self.active_incidents: dict = {}
        self.incident_counter = 0

    def correlate_event(self, anomaly: dict) -> str | None:
        """
        Try to correlate anomaly with existing incident.

        Returns incident_id if correlated, None otherwise.
        """
        for incident_id, incident in self.active_incidents.items():
            # Time correlation
            if self.time_correlator.correlates(anomaly, incident["events"]):
                return incident_id

            # Topology correlation
            if self.topology_correlator.correlates(anomaly, incident["events"]):
                return incident_id

            # Text similarity
            if self.text_correlator.correlates(anomaly, incident["events"]):
                return incident_id

        return None

    def create_incident(self, anomaly: dict) -> str:
        """Create new incident from anomaly."""
        self.incident_counter += 1
        incident_id = f"INC-{self.incident_counter:06d}"

        self.active_incidents[incident_id] = {
            "id": incident_id,
            "created_at": anomaly["timestamp"],
            "updated_at": anomaly["timestamp"],
            "status": "open",
            "severity": self.calculate_severity(anomaly),
            "events": [anomaly],
            "root_cause": None
        }

        return incident_id

    def add_to_incident(self, incident_id: str, anomaly: dict):
        """Add anomaly to existing incident."""
        incident = self.active_incidents[incident_id]
        incident["events"].append(anomaly)
        incident["updated_at"] = anomaly["timestamp"]

        # Recalculate severity
        incident["severity"] = max(
            incident["severity"],
            self.calculate_severity(anomaly)
        )

        # Try to identify root cause
        if len(incident["events"]) >= 3:
            incident["root_cause"] = self.identify_root_cause(
                incident["events"]
            )

    def calculate_severity(self, anomaly: dict) -> int:
        """Calculate severity (1-5) based on anomaly."""
        score = anomaly.get("anomaly_score", 0)

        if score > 0.9:
            return 5  # Critical
        elif score > 0.7:
            return 4  # High
        elif score > 0.5:
            return 3  # Medium
        elif score > 0.3:
            return 2  # Low
        else:
            return 1  # Info

    def identify_root_cause(self, events: list[dict]) -> dict | None:
        """Identify probable root cause from correlated events."""
        if not events:
            return None

        # Sort by timestamp
        sorted_events = sorted(events, key=lambda e: e["timestamp"])

        # First event is often the root cause
        first_event = sorted_events[0]

        # Look for infrastructure-level events
        infra_events = [
            e for e in sorted_events
            if any(k in e.get("metric_name", "").lower()
                   for k in ["cpu", "memory", "disk", "network"])
        ]

        if infra_events:
            root_event = infra_events[0]
        else:
            root_event = first_event

        return {
            "event": root_event,
            "confidence": 0.7,  # Would use ML in production
            "reasoning": "First occurring event in correlation window"
        }

    def publish_incident(self, incident_id: str):
        """Publish incident to Kafka."""
        incident = self.active_incidents[incident_id]
        self.producer.send(self.config.incidents_topic, incident)
        self.producer.flush()

    def cleanup_old_incidents(self):
        """Close incidents that haven't had events recently."""
        cutoff = datetime.utcnow() - timedelta(
            minutes=self.config.incident_timeout_minutes
        )

        for incident_id, incident in list(self.active_incidents.items()):
            last_update = datetime.fromisoformat(incident["updated_at"])
            if last_update < cutoff:
                incident["status"] = "resolved"
                self.publish_incident(incident_id)
                del self.active_incidents[incident_id]

    def run(self):
        """Main processing loop."""
        print("Starting Event Correlation Service...")

        for message in self.consumer:
            anomaly = message.value

            # Try to correlate with existing incident
            incident_id = self.correlate_event(anomaly)

            if incident_id:
                self.add_to_incident(incident_id, anomaly)
                print(f"Added to incident: {incident_id}")
            else:
                incident_id = self.create_incident(anomaly)
                print(f"Created incident: {incident_id}")

            # Publish updated incident
            self.publish_incident(incident_id)

            # Periodic cleanup
            self.cleanup_old_incidents()


if __name__ == "__main__":
    config = Config.from_env()
    service = EventCorrelationService(config)
    service.run()
```
Step 4: Incident Manager Service
```python
"""Incident Manager Service

Consumes incidents from Kafka, takes actions (alert, remediate).
"""
import json

from kafka import KafkaConsumer

from .actions.pagerduty import PagerDutyAction
from .actions.slack import SlackAction
from .actions.remediation import RemediationAction
from .config import Config


class IncidentManagerService:
    """Main incident management service."""

    def __init__(self, config: Config):
        self.config = config

        # Kafka setup
        self.consumer = KafkaConsumer(
            config.incidents_topic,
            bootstrap_servers=config.kafka_bootstrap,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            group_id="incident-manager",
            auto_offset_reset="latest"
        )

        # Actions
        self.pagerduty = PagerDutyAction(config.pagerduty_key)
        self.slack = SlackAction(config.slack_webhook)
        self.remediation = RemediationAction(config.runbook_dir)

        # Track notified incidents
        self.notified_incidents: set = set()

    def should_alert(self, incident: dict) -> bool:
        """Determine if incident should trigger alert."""
        # Only alert on new or escalated incidents
        if incident["id"] in self.notified_incidents:
            # Check for escalation
            return incident["severity"] >= 4

        return incident["severity"] >= 3

    def should_remediate(self, incident: dict) -> bool:
        """Determine if auto-remediation should run."""
        if not self.config.auto_remediation_enabled:
            return False

        # Only auto-remediate known issues
        if not incident.get("root_cause"):
            return False

        # Check if runbook exists
        root_cause_type = self.get_root_cause_type(incident)
        return self.remediation.has_runbook(root_cause_type)

    def get_root_cause_type(self, incident: dict) -> str:
        """Extract root cause type for runbook matching."""
        root_cause = incident.get("root_cause", {})
        event = root_cause.get("event", {})
        return event.get("metric_name", "unknown")

    def handle_incident(self, incident: dict):
        """Process incident and take appropriate actions."""
        # Always notify Slack for visibility
        self.slack.send(incident)

        # Alert on-call if severity warrants
        if self.should_alert(incident):
            self.pagerduty.create_incident(incident)
            self.notified_incidents.add(incident["id"])

        # Auto-remediate if possible
        if self.should_remediate(incident):
            root_cause_type = self.get_root_cause_type(incident)
            result = self.remediation.execute(root_cause_type, incident)

            # Notify about remediation
            self.slack.send_remediation_result(incident, result)

    def run(self):
        """Main processing loop."""
        print("Starting Incident Manager Service...")

        for message in self.consumer:
            incident = message.value

            print(f"Processing incident: {incident['id']}")
            self.handle_incident(incident)


if __name__ == "__main__":
    config = Config.from_env()
    service = IncidentManagerService(config)
    service.run()
```
Slack Action
```python
"""Slack notification action."""
import requests


class SlackAction:
    """Send notifications to Slack."""

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    def send(self, incident: dict):
        """Send incident notification to Slack."""
        severity_emoji = {1: "ℹ️", 2: "⚠️", 3: "🟠", 4: "🔴", 5: "🚨"}

        emoji = severity_emoji.get(incident["severity"], "❓")
        event_count = len(incident["events"])

        # Build message
        message = {
            "blocks": [
                {
                    "type": "header",
                    "text": {
                        "type": "plain_text",
                        "text": f"{emoji} Incident: {incident['id']}"
                    }
                },
                {
                    "type": "section",
                    "fields": [
                        {"type": "mrkdwn", "text": f"*Severity:* {incident['severity']}/5"},
                        {"type": "mrkdwn", "text": f"*Events:* {event_count}"},
                        {"type": "mrkdwn", "text": f"*Status:* {incident['status']}"},
                        {"type": "mrkdwn", "text": f"*Created:* {incident['created_at']}"}
                    ]
                }
            ]
        }

        # Add root cause if identified
        if incident.get("root_cause"):
            rc = incident["root_cause"]
            message["blocks"].append({
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*Probable Root Cause:* {rc['event']['metric_name']}\n"
                            f"Confidence: {rc['confidence']*100:.0f}%"
                }
            })

        requests.post(self.webhook_url, json=message)

    def send_remediation_result(self, incident: dict, result: dict):
        """Send remediation result to Slack."""
        status = "✅ Success" if result["success"] else "❌ Failed"

        message = {
            "blocks": [
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": f"*Auto-Remediation {status}*\n"
                                f"Incident: {incident['id']}\n"
                                f"Action: {result['action']}\n"
                                f"Output: ```{result.get('output', 'N/A')}```"
                    }
                }
            ]
        }

        requests.post(self.webhook_url, json=message)
```
Step 5: Kubernetes Deployment
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: anomaly-detector
  namespace: aiops
  labels:
    app: anomaly-detector
spec:
  replicas: 2
  selector:
    matchLabels:
      app: anomaly-detector
  template:
    metadata:
      labels:
        app: anomaly-detector
    spec:
      containers:
        - name: detector
          image: your-registry/anomaly-detector:latest
          env:
            - name: KAFKA_BOOTSTRAP
              value: "kafka:9092"
            - name: METRICS_TOPIC
              value: "metrics"
            - name: ANOMALIES_TOPIC
              value: "anomalies"
            - name: MIN_HISTORY_POINTS
              value: "100"
          envFrom:
            - configMapRef:
                name: aiops-config
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "1Gi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: event-correlator
  namespace: aiops
  labels:
    app: event-correlator
spec:
  replicas: 1  # Single instance for correlation consistency
  selector:
    matchLabels:
      app: event-correlator
  template:
    metadata:
      labels:
        app: event-correlator
    spec:
      containers:
        - name: correlator
          image: your-registry/event-correlator:latest
          env:
            - name: KAFKA_BOOTSTRAP
              value: "kafka:9092"
            - name: ANOMALIES_TOPIC
              value: "anomalies"
            - name: INCIDENTS_TOPIC
              value: "incidents"
            - name: CORRELATION_WINDOW
              value: "300"
          envFrom:
            - configMapRef:
                name: aiops-config
          resources:
            requests:
              memory: "256Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "250m"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: incident-manager
  namespace: aiops
  labels:
    app: incident-manager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: incident-manager
  template:
    metadata:
      labels:
        app: incident-manager
    spec:
      containers:
        - name: manager
          image: your-registry/incident-manager:latest
          env:
            - name: KAFKA_BOOTSTRAP
              value: "kafka:9092"
            - name: INCIDENTS_TOPIC
              value: "incidents"
          envFrom:
            - secretRef:
                name: aiops-secrets
            - configMapRef:
                name: aiops-config
          resources:
            requests:
              memory: "256Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "250m"
```
ConfigMap and Secrets
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: aiops-config
  namespace: aiops
data:
  AUTO_REMEDIATION_ENABLED: "true"
  INCIDENT_TIMEOUT_MINUTES: "30"
  RUNBOOK_DIR: "/etc/runbooks"
---
# k8s/aiops/secrets.yaml (use sealed-secrets or external-secrets in production)
apiVersion: v1
kind: Secret
metadata:
  name: aiops-secrets
  namespace: aiops
type: Opaque
stringData:
  PAGERDUTY_KEY: "your-pagerduty-integration-key"
  SLACK_WEBHOOK: "https://hooks.slack.com/services/xxx/xxx/xxx"
```
Common Mistakes
| Mistake | Impact | Solution |
|---|---|---|
| Training on too little data | High false positive rate | Wait for 2+ weeks of data before alerting |
| Not handling concept drift | Degraded accuracy over time | Retrain models regularly or use online learning |
| Single point of failure | Complete AIOps outage | Deploy with replicas, handle Kafka partitions |
| Ignoring seasonality | Weekend/holiday false positives | Use Prophet or similar for seasonal metrics |
| Alert on every anomaly | Alert fatigue | Require correlation before alerting |
| No observability for AIOps | Blind to AIOps issues | Monitor your monitoring |
Test your understanding of building custom AIOps:
Question 1
Why use Kafka between pipeline stages?
Kafka provides:
- Decoupling — Services can scale independently
- Durability — Messages persist if consumers are down
- Replay — Can reprocess historical data
- Backpressure — Handles traffic spikes gracefully
- Multiple consumers — Same data feeds multiple services
Question 2
Why does the correlator run as a single replica?
Event correlation requires global state to group related events. With multiple replicas, different anomalies might go to different instances, breaking correlation.
Solutions for scaling:
- Partition by service/region to isolate correlation domains
- Use distributed state (Redis) for shared correlation state
- Accept eventual consistency with cross-instance sync
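The first scaling option above relies on Kafka's key-based partitioning: if every anomaly for a given service carries the same message key, all of that service's anomalies land on the same partition and are consumed by the same correlator replica. A minimal sketch of the idea, with a hypothetical `correlation_key` helper and a stand-in for Kafka's partitioner (the real client hashes keys with murmur2, not MD5):

```python
import hashlib

def correlation_key(anomaly: dict) -> bytes:
    """Stable partition key: all anomalies for one service share a key,
    so they land on the same partition and the same correlator replica."""
    service = anomaly.get("metric_labels", {}).get("service", "unknown")
    return service.encode("utf-8")

def partition_for(key: bytes, num_partitions: int) -> int:
    """Deterministic partition choice, standing in for Kafka's partitioner."""
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

a = {"metric_labels": {"service": "checkout"}, "metric_name": "error_rate"}
b = {"metric_labels": {"service": "checkout"}, "metric_name": "http_request_duration_p99"}

# Same service -> same partition -> seen by the same correlator instance
assert partition_for(correlation_key(a), 6) == partition_for(correlation_key(b), 6)
```

With kafka-python, the same effect comes from passing `key=correlation_key(anomaly)` to `producer.send(...)` (with a `key_serializer` or pre-encoded bytes); cross-service correlation then needs the shared-state or eventual-consistency approaches listed above.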
Question 3
What’s the minimum data needed before anomaly detection works?
It depends on the algorithm:
- Prophet: 100+ data points covering at least one full seasonal cycle (1 week minimum for weekly patterns)
- Isolation Forest: 50+ data points for reasonable baseline
- Luminaire: 50+ points for structural break detection
Best practice: Wait 2+ weeks before enabling alerting to avoid false positives during the learning period.
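The "one full seasonal cycle" requirement translates directly into a sample count once you know your scrape interval. A small helper (hypothetical, not part of the module's codebase) makes the arithmetic explicit:

```python
def required_points(scrape_interval_s: int, seasonal_period_s: int, cycles: int = 2) -> int:
    """Samples needed to cover `cycles` full seasonal cycles at a given scrape interval."""
    return (seasonal_period_s * cycles) // scrape_interval_s

WEEK = 7 * 24 * 3600

# At a 60s scrape interval, two weekly cycles (the 2-week best practice):
print(required_points(60, WEEK))  # 20160
```

So the 2-week warm-up is not arbitrary: at one sample per minute it gives Prophet roughly 20,000 points spanning two weekly cycles, comfortably above its 100-point floor and enough to fit both daily and weekly seasonality.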
Hands-On Exercise
Objective
Build a minimal custom AIOps pipeline using Python and Docker.
Exercise: Local AIOps Pipeline
Step 1: Create Project Structure
```shell
mkdir custom-aiops && cd custom-aiops
mkdir -p src/detector src/correlator
touch docker-compose.yml
touch src/detector/main.py src/correlator/main.py
touch requirements.txt
```
Step 2: Create docker-compose.yml
```yaml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  detector:
    build:
      context: .
      dockerfile: Dockerfile.detector
    depends_on:
      - kafka
    environment:
      KAFKA_BOOTSTRAP: kafka:29092

  correlator:
    build:
      context: .
      dockerfile: Dockerfile.correlator
    depends_on:
      - kafka
    environment:
      KAFKA_BOOTSTRAP: kafka:29092
```
Step 3: Create Minimal Detector
```python
import json
import random
import time

import numpy as np
from kafka import KafkaProducer


def create_detector():
    producer = KafkaProducer(
        bootstrap_servers='kafka:29092',
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )

    # Simulate metrics and detect anomalies
    baseline = 100
    history = []

    while True:
        # Generate metric (occasionally anomalous)
        if random.random() < 0.05:  # 5% anomaly rate
            value = baseline * random.uniform(2, 5)
        else:
            value = baseline + random.gauss(0, 10)

        history.append(value)
        if len(history) > 100:
            history.pop(0)

        # Simple z-score detection
        if len(history) >= 20:
            mean = np.mean(history)
            std = np.std(history)
            z_score = abs(value - mean) / std if std > 0 else 0

            if z_score > 3:
                anomaly = {
                    "timestamp": time.time(),
                    "metric": "cpu_usage",
                    "value": value,
                    "z_score": z_score
                }
                producer.send('anomalies', anomaly)
                print(f"Anomaly detected: {anomaly}")

        time.sleep(1)


if __name__ == "__main__":
    time.sleep(10)  # Wait for Kafka
    create_detector()
```
Step 4: Run the Pipeline
```shell
docker-compose up --build
```
Success Criteria
- Kafka cluster running
- Detector producing anomalies to Kafka
- Can consume anomalies from Kafka topic
- Understand the data flow
Key Takeaways
- Start with platforms — Build custom only when platform AI doesn’t fit
- Kafka is the backbone — Decouples services, provides durability
- Correlation is hard — Single-instance or distributed state required
- Models need data — Wait 2+ weeks before alerting
- Monitor your monitoring — AIOps needs observability too
Further Reading
Open Source AIOps Projects
- LinkedIn ThirdEye — Open-source anomaly detection
- Netflix Surus — Collection of anomaly detection tools
- Apache Flink — Stream processing for real-time ML
Building ML Systems
- Designing Machine Learning Systems — Chip Huyen
- Machine Learning Engineering — Andriy Burkov
- Reliable Machine Learning — O’Reilly
Kafka and Streaming
- Kafka: The Definitive Guide — Confluent
- Streaming Systems — O’Reilly
Summary
Building custom AIOps is a significant investment requiring Kafka for data flow, Python ML libraries for detection, and Kubernetes for deployment. The architecture follows a pipeline pattern: ingest → detect → correlate → act. Start with platform AI and build custom only when you have unique requirements that justify the engineering effort. When you do build custom, ensure you have sufficient data, handle concept drift, and monitor your monitoring.
Toolkit Complete
Congratulations! You’ve completed the AIOps Tools Toolkit. You now understand:
- Anomaly Detection Tools — Prophet, Luminaire, PyOD
- Event Correlation Platforms — BigPanda, Moogsoft, PagerDuty
- Observability AI Features — Datadog Watchdog, Dynatrace Davis
- Building Custom AIOps — Python + Kafka + Kubernetes pipelines
Continue your learning:
- AIOps Discipline — Deepen conceptual understanding
- Observability Toolkit — The data collection layer
- SRE Discipline — Apply AIOps to reliability