
Module 1.3: Stream Processing with Apache Flink


Discipline Module | Complexity: [COMPLEX] | Time: 3.5 hours

Before starting this module:

  • Required: Module 1.2 — Apache Kafka on Kubernetes — Kafka fundamentals, topics, partitions, consumer groups
  • Required: Basic Java or Python programming knowledge
  • Recommended: Understanding of SQL (SELECT, GROUP BY, JOIN, window functions)
  • Recommended: Familiarity with event-driven architecture concepts

After completing this module, you will be able to:

  • Implement Apache Flink on Kubernetes using the Flink Operator for stream processing workloads
  • Design Flink job architectures with proper checkpointing, savepoints, and state backend configuration
  • Configure Flink cluster scaling policies that adjust parallelism based on event throughput
  • Build monitoring dashboards that track Flink job health, backpressure, and processing latency

Kafka gives you a firehose of data. But raw data flowing through topics is not insight — it is noise. You need something that can read millions of events per second, transform them, aggregate them, join them with other streams, and output results in real time.

That something is Apache Flink.

Flink is not the only stream processor (Spark Structured Streaming, Kafka Streams, and Apache Beam all exist), but it has won the stream processing war for a simple reason: it was built for streaming from day one. While other frameworks bolted streaming onto batch engines, Flink treats bounded data (batch) as a special case of unbounded data (streaming). This seemingly philosophical difference gives Flink unique capabilities: exactly-once processing, event-time semantics, and millisecond-latency state access that other engines struggle to match.

LinkedIn, Alibaba, Uber, Netflix, and Stripe all run Flink at massive scale. Alibaba processes over 40 billion events per day through Flink during Singles’ Day. When you need to detect fraud in real time, compute live dashboards, or react to events as they happen, Flink is the tool.

This module teaches you to deploy Flink on Kubernetes, understand its execution model, and build real streaming pipelines that consume from Kafka and produce results in real time.


  • Flink started as a research project at TU Berlin called Stratosphere. It was donated to Apache in 2014 and graduated to a top-level project in just 8 months — one of the fastest graduations in Apache history.
  • Flink can maintain terabytes of application state while processing millions of events per second. It uses RocksDB as an embedded state backend, storing state on local disk with in-memory caching, and takes asynchronous snapshots without stopping processing.
  • Alibaba modified Flink so heavily that they open-sourced their version as “Blink.” Many of Blink’s optimizations (including the unified batch/streaming SQL engine) were later merged back into mainline Flink.

Every data processing system must answer one question: does the data have an end?

flowchart TD
subgraph Bounded["Bounded Data (Batch)"]
direction LR
B1[record] --> B2[record] --> B3[record] --> B4[END]
end
B_Desc["Process all records, then output.<br/>Example: Last month's sales CSV."]
subgraph Unbounded["Unbounded Data (Streaming)"]
direction LR
U1[event] --> U2[event] --> U3[event] --> U4[...]
end
U_Desc["Process each event as it arrives.<br/>Example: Live clickstream from website."]
Bounded --- B_Desc
Unbounded --- U_Desc
style Bounded fill:#f9f9f9,stroke:#333,stroke-width:2px
style Unbounded fill:#f9f9f9,stroke:#333,stroke-width:2px
style B_Desc fill:none,stroke:none
style U_Desc fill:none,stroke:none

Traditional batch systems (MapReduce, Spark) were designed for bounded data. They read all input, process it, and write output. Clean, simple, and completely useless for real-time applications.

Flink’s insight: batch is just streaming with an end. Build your engine for unbounded data, and bounded data becomes trivial. The reverse is not true — bolting streaming onto a batch engine produces awkward compromises.

Stop and think: If Flink processes unbounded data continuously, how does it know when to output a result for an aggregation like ‘average purchase amount’?

Consider computing the average purchase amount per customer:

  • Batch approach: Wait for all data, compute average, output. Easy but delayed by hours.
  • Micro-batch approach (Spark Streaming): Collect events for 1-30 seconds, compute average over that window. Lower latency but introduces artificial boundaries.
  • True streaming approach (Flink): Maintain a running average for each customer, update it with every event, output continuously. Lowest latency, most accurate, but requires sophisticated state management.

Flink handles the hard case natively, which is why it excels at streaming.
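The true streaming approach can be sketched in a few lines of plain Python. This is only an illustration of the idea (one small piece of state per customer, updated on every event), not Flink code; the `RunningAverage` class and the sample purchases are made up for the example.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class RunningAverage:
    """Per-key state: enough to update the average without storing every event."""
    count: int = 0
    total: float = 0.0

    def update(self, amount: float) -> float:
        self.count += 1
        self.total += amount
        return self.total / self.count


def process(events):
    """Yield (customer_id, running_average) after every single event."""
    state = defaultdict(RunningAverage)  # keyed state, one entry per customer
    for customer_id, amount in events:
        yield customer_id, state[customer_id].update(amount)


# Hypothetical purchase stream: (customer, amount)
purchases = [("alice", 10.0), ("bob", 40.0), ("alice", 30.0), ("alice", 20.0)]
results = list(process(purchases))
for customer, avg in results:
    print(f"{customer}: {avg:.2f}")
```

The state here lives in an ordinary dictionary and disappears if the process dies. Keeping that state fault tolerant across crashes is the part that requires sophisticated state management.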


flowchart TD
subgraph FlinkCluster["FLINK CLUSTER"]
direction LR
subgraph JobManager["JobManager"]
JS[Job Scheduling]
CC[Checkpoint Coord.]
RM[Resource Management]
FR[Failure Recovery]
end
subgraph TaskManagers["TaskManagers"]
direction TB
subgraph TM1["TaskManager 1"]
direction LR
T1[Slot 1]
T2[Slot 2]
T3[Slot 3]
end
subgraph TM2["TaskManager 2"]
direction LR
T4[Slot 4]
T5[Slot 5]
T6[Slot 6]
end
end
JobManager -->|Coordinates| TaskManagers
end

JobManager (the brain):

  • Accepts job submissions and creates execution graphs
  • Coordinates checkpoints across all TaskManagers
  • Manages resource allocation and failover
  • Runs as a Deployment (1 replica, or 3 for HA)

TaskManager (the muscle):

  • Executes the actual data processing tasks
  • Manages local state (in-memory or RocksDB)
  • Exchanges data with other TaskManagers via network buffers
  • Runs as Pods managed by the Flink Kubernetes Operator

Each TaskManager has a fixed number of task slots. A slot is a unit of resource isolation: it gets a fixed share of the TaskManager’s managed memory (CPU is not isolated between slots) and can run one parallel pipeline.

flowchart TD
subgraph TM["TaskManager (8 GB memory, 4 slots)"]
direction LR
subgraph S1["Slot 1 (2GB)"]
Src1[Source] --> M1[Map]
end
subgraph S2["Slot 2 (2GB)"]
Src2[Source] --> M2[Map]
end
subgraph S3["Slot 3 (2GB)"]
Src3[Source] --> M3[Map]
end
subgraph S4["Slot 4 (2GB)"]
Src4[Source] --> M4[Map]
end
end

Pause and predict: If you have 24 Kafka partitions but set your Flink source operator’s parallelism to 36, what will the remaining 12 slots do?

Parallelism determines how many slots a job uses. A job with parallelism 8 running on 2 TaskManagers with 4 slots each uses all 8 slots.
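The slot arithmetic is simple enough to check with a small calculation. The sketch below assumes Flink's default slot sharing, where a job needs as many slots as its highest operator parallelism; the function names are invented for this example.

```python
import math


def task_managers_needed(parallelism: int, slots_per_tm: int) -> int:
    """Smallest number of TaskManagers that offers at least `parallelism` slots."""
    return math.ceil(parallelism / slots_per_tm)


def idle_source_subtasks(source_parallelism: int, kafka_partitions: int) -> int:
    """Source subtasks left without a partition (each partition has one reader)."""
    return max(0, source_parallelism - kafka_partitions)


print(task_managers_needed(parallelism=8, slots_per_tm=4))   # the example above
print(task_managers_needed(parallelism=12, slots_per_tm=4))
print(task_managers_needed(parallelism=9, slots_per_tm=4))   # rounds up
print(idle_source_subtasks(source_parallelism=8, kafka_partitions=6))
```

Note that parallelism 9 with 4 slots per TaskManager needs a third TaskManager, of which three slots stay unused.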


The Flink Kubernetes Operator is the Apache Flink project’s official way to run Flink on Kubernetes. It manages:

| Feature | What It Does |
|---|---|
| Job lifecycle | Submit, cancel, suspend, and resume Flink jobs |
| Savepoints | Trigger savepoints before upgrades, restore after |
| Autoscaling | Scale TaskManagers based on backpressure or lag |
| Rolling upgrades | Update job code with automatic savepoint/restore |
| Health monitoring | Restart failed jobs automatically |
| Resource management | Dynamic resource allocation per job |

# Add the Flink Helm repository
helm repo add flink-operator https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/
helm repo update
# Install the operator
kubectl create namespace flink
helm install flink-kubernetes-operator flink-operator/flink-kubernetes-operator \
--namespace flink \
--set webhook.create=true \
--set metrics.port=9999
# Verify installation
kubectl -n flink wait --for=condition=Available \
deployment/flink-kubernetes-operator --timeout=120s

The operator supports two deployment modes:

Application Mode (recommended for production): Each Flink application runs in its own dedicated cluster. The JobManager starts the application’s main() method directly.

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: fraud-detector
  namespace: flink
spec:
  image: my-registry.io/fraud-detector:v2.1.0
  flinkVersion: v1_20
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "4"
    state.backend.type: rocksdb
    state.checkpoints.dir: s3://flink-state/fraud-detector/checkpoints
    state.savepoints.dir: s3://flink-state/fraud-detector/savepoints
    execution.checkpointing.interval: "60000"
    execution.checkpointing.min-pause: "30000"
    restart-strategy.type: exponential-delay
    restart-strategy.exponential-delay.initial-backoff: 1s
    restart-strategy.exponential-delay.max-backoff: 60s
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    replicas: 1
  taskManager:
    resource:
      memory: "4096m"
      cpu: 2
    replicas: 3
  job:
    jarURI: local:///opt/flink/usrlib/fraud-detector.jar
    entryClass: com.example.FraudDetector
    parallelism: 12
    upgradeMode: savepoint
    state: running
    savepointTriggerNonce: 0
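
To get a feel for the exponential-delay restart strategy configured above (initial backoff 1s, max backoff 60s), here is a rough Python sketch of the delay sequence. The multiplier of 2.0 is an assumption made for illustration: Flink's actual multiplier is configurable and its default varies by version, and the real strategy also applies jitter and resets the backoff after a stable period, which this sketch ignores.

```python
def backoff_delays(initial_s: float, max_s: float, multiplier: float, attempts: int):
    """Delay before each consecutive restart attempt, capped at max_s."""
    delays = []
    delay = initial_s
    for _ in range(attempts):
        delays.append(min(delay, max_s))
        delay *= multiplier
    return delays


# initial-backoff: 1s, max-backoff: 60s, multiplier assumed to be 2.0
delays = backoff_delays(initial_s=1.0, max_s=60.0, multiplier=2.0, attempts=8)
print(delays)
```

The point of the cap is that a job stuck in a crash loop retries roughly once a minute instead of hammering its dependencies.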

Session Mode (for development and ad-hoc queries): A long-running Flink cluster accepts multiple job submissions.

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-session
  namespace: flink
spec:
  image: flink:1.20-java17
  flinkVersion: v1_20
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "4"
    state.backend.type: hashmap
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "4096m"
      cpu: 2
    replicas: 2

Stateless transformations (filter, map) are easy. The hard problems — deduplication, windowed aggregation, pattern detection, joins — all require state.

Consider counting page views per URL in the last 5 minutes. For every incoming event, you need to:

  1. Look up the current count for that URL
  2. Increment it
  3. Remove expired entries older than 5 minutes

This requires maintaining a continuously-updated data structure — that is state.
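
The three steps above can be sketched in plain Python. This is a toy, single-process illustration, not how Flink stores state internally; it assumes timestamps arrive in order per URL, and the `PageViewCounter` class is invented for the example.

```python
from collections import defaultdict, deque

WINDOW_SECONDS = 5 * 60


class PageViewCounter:
    def __init__(self, window_seconds: int = WINDOW_SECONDS):
        self.window_seconds = window_seconds
        self.views = defaultdict(deque)  # url -> view timestamps, oldest first

    def record(self, url: str, ts: int) -> int:
        """Add one view at time `ts` (seconds) and return the current count."""
        timestamps = self.views[url]      # 1. look up the state for this URL
        timestamps.append(ts)             # 2. increment
        cutoff = ts - self.window_seconds
        while timestamps and timestamps[0] <= cutoff:
            timestamps.popleft()          # 3. expire entries older than 5 minutes
        return len(timestamps)


counter = PageViewCounter()
counts = [
    counter.record("/home", 0),
    counter.record("/home", 100),
    counter.record("/pricing", 120),
    counter.record("/home", 299),
    counter.record("/home", 301),   # the view at t=0 has now expired
]
print(counts)
```

Every URL carries its own deque, so the memory footprint grows with both the number of keys and the event rate. That growth is what makes the choice of state backend matter.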

Flink offers two state backends:

| Backend | Storage | Best For |
|---|---|---|
| HashMapStateBackend | JVM heap | Small state (< 1 GB), development, low latency |
| EmbeddedRocksDBStateBackend | Local disk + memory cache | Large state (TB+), production |

# In FlinkDeployment spec.flinkConfiguration
flinkConfiguration:
  state.backend.type: rocksdb
  state.backend.rocksdb.memory.managed: "true"
  state.backend.rocksdb.block.cache-size: 256mb
  state.backend.rocksdb.writebuffer.size: 128mb
  state.backend.rocksdb.writebuffer.count: "4"

RocksDB is the production choice because it can handle state far larger than available memory. It stores data on local SSD with an in-memory cache, and Flink manages its memory usage through the managed memory framework.


The Problem: What Happens When Things Crash?


Flink processes millions of events per second while maintaining state. If a TaskManager crashes, all the state on that process is lost. Without checkpointing, you would have to reprocess everything from the beginning.

Checkpoints are periodic, consistent snapshots of the entire job’s state. They are taken automatically and stored on durable storage (S3, HDFS, GCS).

flowchart LR
subgraph Stream["Time"]
direction LR
E1[e1] --> E2[e2] --> E3[e3] --> E4[e4] --> E5[e5] --> E6[e6] --> E7[e7] --> E8[e8] --> E9[e9] --> E10[e10]
end
C1((CP 1)) -.-> E2
C2((CP 2)) -.-> E5
C3((CP 3)) -.-> E9
Crash[Crash after e7] -.-> E7
Restore[1. Restore from CP 2] --> C2
Replay[2. Replay e6, e7] --> E6
Continue[3. Continue processing e8+] --> E8
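
The recovery sequence in the diagram can be simulated with a toy job that sums event values. This is a minimal sketch, not Flink's implementation: the "checkpoint" is just a tuple of source offset and operator state kept in memory, and the event values and checkpoint positions are chosen to mirror the diagram.

```python
EVENTS = list(range(1, 11))          # e1..e10; the value of event n is n
CHECKPOINT_AFTER = {2, 5, 9}         # CP 1, CP 2, CP 3 as in the diagram


def run(events, crash_after=None):
    """Sum events; optionally crash once after `crash_after`, then recover."""
    total, offset = 0, 0             # operator state and source position
    checkpoint = (0, 0)              # last durable (offset, total)
    replayed = []
    crashed = False
    while offset < len(events):
        event = events[offset]
        total += event
        offset += 1
        if event in CHECKPOINT_AFTER:
            checkpoint = (offset, total)          # snapshot state and offset together
        if event == crash_after and not crashed:
            crashed = True
            replayed = events[checkpoint[0]:offset]
            offset, total = checkpoint            # restore state, rewind the source
    return total, replayed


clean_total, _ = run(EVENTS)
recovered_total, replayed = run(EVENTS, crash_after=7)
print(clean_total, recovered_total, replayed)
```

Because the offset and the running total are restored together, replaying e6 and e7 does not double count them: the recovered run ends with the same total as the run that never crashed.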

The barrier mechanism:

Flink uses asynchronous barrier snapshotting, an algorithm inspired by the Chandy-Lamport algorithm. In its default mode, aligned checkpointing, it works like this:

flowchart LR
Source --> E1[e1] --> E2[e2] --> Barrier[BARRIER] --> E3[e3] --> E4[e4] --> Operator
SnapshotNote["Snapshot your state now,<br/>then forward the barrier"] -.-> Barrier

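The barrier flows through the dataflow graph like a regular event. Once an operator has received the barrier on all of its input channels, it snapshots its state and forwards the barrier downstream; events that arrive behind a barrier on one input are held back until the other inputs catch up. This makes the checkpoint a consistent cut across all operators without stopping the job.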
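
For an operator with more than one input, alignment is the interesting part. The sketch below is a simplified, hypothetical model of it in plain Python (the `AligningSum` class is invented for illustration and handles one checkpoint at a time): once a barrier arrives on one channel, later events from that channel are buffered until the barrier has arrived on every channel.

```python
from collections import deque


class AligningSum:
    """Summing operator with several input channels that aligns barriers."""

    def __init__(self, channels):
        self.channels = set(channels)
        self.total = 0
        self.blocked = set()      # channels whose barrier already arrived
        self.buffered = deque()   # events held back from blocked channels
        self.snapshots = []       # state captured at each completed alignment

    def on_event(self, channel, value):
        if channel in self.blocked:
            self.buffered.append(value)    # belongs to the next checkpoint
        else:
            self.total += value

    def on_barrier(self, channel):
        self.blocked.add(channel)
        if self.blocked == self.channels:        # barrier seen on every input
            self.snapshots.append(self.total)    # snapshot, then forward barrier
            self.blocked.clear()
            while self.buffered:                 # release the held-back events
                self.total += self.buffered.popleft()


op = AligningSum(["a", "b"])
op.on_event("a", 1)
op.on_barrier("a")      # channel a is now blocked
op.on_event("a", 10)    # post-barrier event: buffered, not in the snapshot
op.on_event("b", 2)     # pre-barrier event on b: still counted
op.on_barrier("b")      # alignment complete
print(op.snapshots, op.total)
```

The snapshot contains only the pre-barrier events from both channels, while the buffered event is applied afterwards. That separation is what makes the snapshot a consistent cut.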

Configuration:

flinkConfiguration:
  # Take a checkpoint every 60 seconds
  execution.checkpointing.interval: "60000"
  # Minimum 30 seconds between checkpoints
  execution.checkpointing.min-pause: "30000"
  # Exactly-once semantics
  execution.checkpointing.mode: EXACTLY_ONCE
  # Tolerate 3 consecutive checkpoint failures
  execution.checkpointing.tolerable-failed-checkpoints: "3"
  # Checkpoint timeout
  execution.checkpointing.timeout: "600000"
  # Store on S3
  state.checkpoints.dir: s3://flink-state/checkpoints
  # Keep last 3 checkpoints
  state.checkpoints.num-retained: "3"
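
The interplay between the interval and the minimum pause is easy to misread, so here is a small model of it. It is a simplified sketch that assumes only one checkpoint runs at a time: the next checkpoint starts at the scheduled interval, unless that would leave less than the minimum pause after the previous checkpoint finished.

```python
def next_checkpoint_start(prev_start_ms: int, prev_duration_ms: int,
                          interval_ms: int = 60_000,
                          min_pause_ms: int = 30_000) -> int:
    """Start time of the next checkpoint given when the previous one ran."""
    scheduled = prev_start_ms + interval_ms
    earliest_allowed = prev_start_ms + prev_duration_ms + min_pause_ms
    return max(scheduled, earliest_allowed)


fast = next_checkpoint_start(prev_start_ms=0, prev_duration_ms=5_000)
slow = next_checkpoint_start(prev_start_ms=0, prev_duration_ms=45_000)
print(fast, slow)
```

With a 5-second checkpoint the interval dominates. With a 45-second checkpoint the minimum pause pushes the next start to 75 seconds, so slow checkpoints do not crowd out regular processing.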

Savepoints are like checkpoints but manually triggered. Use them for:

  • Job upgrades: Savepoint, deploy new code, restore from savepoint
  • A/B testing: Fork a job into two versions from the same state
  • Migration: Move a job between clusters
# Trigger a savepoint via the Flink Kubernetes Operator
# Update the FlinkDeployment CR:
kubectl -n flink patch flinkdeployment fraud-detector --type merge \
-p '{"spec":{"job":{"savepointTriggerNonce": 1}}}'
# The operator triggers a savepoint and stores it at state.savepoints.dir
# Check status:
kubectl -n flink get flinkdeployment fraud-detector -o yaml | grep -A5 savepointInfo

flowchart LR
subgraph Time["Time in Stream Processing"]
direction TB
ET["Event Time<br/>(Embedded in event)<br/>When it actually happened<br/>e.g., 14:05:03"]
IT["Ingestion Time<br/>When Flink received it<br/>e.g., 14:05:07"]
PT["Processing Time<br/>When Flink processes it<br/>e.g., 14:05:09"]
ET -->|Network Delay| IT -->|Queuing/Buffering| PT
end

Why event time matters: Events arrive out of order. A mobile app might batch events and send them minutes later. A network partition might delay events. If you use processing time, your windowed aggregations will include events in the wrong window.
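
A tiny Python example makes the difference concrete. The timestamps below are invented: each event carries the time it happened and the time it arrived, and one event is delayed by 70 seconds. Bucketing into one-minute windows by arrival time puts that event in the wrong window.

```python
from collections import Counter

MINUTE = 60


def window_start(ts: int, size: int = MINUTE) -> int:
    """Start of the fixed-size window that contains timestamp `ts`."""
    return ts - ts % size


# (event_time, arrival_time) in seconds; the second event arrives 70 s late
events = [(5, 6), (50, 120), (65, 66), (70, 71)]

by_event_time = Counter(window_start(event) for event, _ in events)
by_processing_time = Counter(window_start(arrival) for _, arrival in events)
print(dict(by_event_time))
print(dict(by_processing_time))
```

By event time, the first minute contains two events. By processing time, it contains only one, and the delayed event shows up in a window two minutes later.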

A watermark is Flink’s way of saying: “I believe all events with timestamps up to time T have arrived.”

flowchart LR
subgraph Arriving Events
direction LR
E1["14:05:01"] --> E2["14:05:03"] --> E3["14:05:02"] --> E4["14:05:05"] --> E5["14:05:04"] --> E6["14:05:07"]
end
subgraph Watermarks
W1["W(14:05:00)<br/>All events before 14:05:00<br/>have arrived"]
W2["W(14:05:01)<br/>All events before 14:05:01<br/>have arrived"]
W3["W(14:05:05)<br/>All events before 14:05:05<br/>have arrived"]
end
E1 -.-> W1
E3 -.-> W2
E5 -.-> W3

Watermark strategies:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Bounded out-of-orderness: tolerate events arriving up to 5 seconds out of order
WatermarkStrategy<Event> bounded = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

// Monotonically increasing timestamps (no out-of-order events expected)
WatermarkStrategy<Event> ascending = WatermarkStrategy
    .<Event>forMonotonousTimestamps()
    .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

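When the watermark passes the end of a window, Flink fires that window and emits results. Late events (events whose timestamps are already behind the watermark when they arrive) are dropped by default; you can instead keep windows open longer with allowed lateness or route late events to a side output.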
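
To see how a bounded out-of-orderness watermark advances, here is a plain-Python sketch of the rule Flink applies: the watermark trails the highest timestamp seen so far by the configured bound, minus one millisecond. The class below is an illustration, not the Flink API; it recomputes the watermark after every event, whereas Flink emits watermarks periodically. The 2-second bound and the straggler event are assumptions for the example.

```python
class BoundedOutOfOrdernessWatermarks:
    """Tracks the highest timestamp seen and derives the watermark from it."""

    def __init__(self, max_out_of_orderness_ms: int):
        self.bound = max_out_of_orderness_ms
        self.max_ts = None

    def watermark(self) -> float:
        if self.max_ts is None:
            return float("-inf")          # nothing seen yet
        return self.max_ts - self.bound - 1

    def on_event(self, ts_ms: int) -> bool:
        """Record an event; return True if it was already behind the watermark."""
        late = ts_ms <= self.watermark()
        if self.max_ts is None or ts_ms > self.max_ts:
            self.max_ts = ts_ms
        return late


# Seconds past 14:05:00 from the diagram above, converted to milliseconds
arrivals_ms = [1000, 3000, 2000, 5000, 4000, 7000]
gen = BoundedOutOfOrdernessWatermarks(max_out_of_orderness_ms=2000)

watermarks = []
for ts in arrivals_ms:
    gen.on_event(ts)
    watermarks.append(gen.watermark())

window_end_ms = 5000                          # window [0 s, 5 s)
fired = watermarks[-1] >= window_end_ms - 1   # fires once watermark reaches end - 1 ms
straggler_is_late = gen.on_event(4500)        # arrives after the window fired
print(watermarks, fired, straggler_is_late)
```

Out-of-order events such as 14:05:02 and 14:05:04 never move the watermark backwards. The window only fires once the event at 14:05:07 pushes the watermark to the end of the window, and anything for that window arriving afterwards counts as late.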


Windows group events into finite chunks for aggregation.

gantt
title Tumbling vs Sliding Windows
dateFormat X
axisFormat %s
section Tumbling
0-5 min :0, 5
5-10 min :5, 10
10-15 min :10, 15
section Sliding
0-10 min :0, 10
5-15 min :5, 15
10-20 min :10, 20
  • Session Windows: Dynamic, gap-based windows. A new window starts when an event arrives, and closes when a specified period of inactivity (the gap timeout) passes.
  • Global Windows: A single window per key that encompasses all events. Requires a custom trigger to determine when to compute and emit results.
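
Window assignment boils down to a little arithmetic on the event timestamp. The functions below are a minimal sketch in plain Python, not Flink's window assigners; timestamps are plain integers (read them as minutes to match the chart above), and the exact handling of two events that are precisely one gap apart in `session_windows` is a simplification.

```python
def tumbling_window(ts: int, size: int):
    """The single [start, end) window of length `size` that contains ts."""
    start = ts - ts % size
    return (start, start + size)


def sliding_windows(ts: int, size: int, slide: int):
    """All [start, end) windows of length `size`, spaced `slide` apart, containing ts."""
    windows = []
    start = ts - ts % slide          # latest window start at or before ts
    while start > ts - size:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


def session_windows(timestamps, gap: int):
    """Group events into sessions that close after `gap` of inactivity."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][1] < gap:
            sessions[-1][1] = ts             # extend the open session
        else:
            sessions.append([ts, ts])        # start a new session
    return [(first, last + gap) for first, last in sessions]


print(tumbling_window(7, size=5))
print(sliding_windows(7, size=10, slide=5))
print(len(sliding_windows(125, size=60, slide=1)))
print(session_windows([1, 3, 20, 22, 40], gap=10))
```

An event in a sliding window belongs to size / slide windows at once, which is why a 1-hour window sliding every minute does 60 times the bookkeeping of a 1-hour tumbling window.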

Flink SQL makes windowed aggregations accessible without Java/Scala code:

-- Tumbling window: count events per URL every 5 minutes
SELECT
  url,
  TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
  TUMBLE_END(event_time, INTERVAL '5' MINUTE) AS window_end,
  COUNT(*) AS page_views,
  COUNT(DISTINCT user_id) AS unique_visitors
FROM page_events
GROUP BY
  url,
  TUMBLE(event_time, INTERVAL '5' MINUTE);

-- Sliding window: moving average over 1 hour, updated every 5 minutes
SELECT
  sensor_id,
  HOP_START(event_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR) AS window_start,
  AVG(temperature) AS avg_temp,
  MAX(temperature) AS max_temp,
  MIN(temperature) AS min_temp
FROM sensor_readings
GROUP BY
  sensor_id,
  HOP(event_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR);

| Mistake | Why It Happens | What To Do Instead |
|---|---|---|
| Using processing time for business logic | Simpler to implement | Use event time with watermarks. Processing time gives inconsistent results on replays |
| Setting parallelism higher than Kafka partitions | “More parallelism = faster” | Each Kafka partition is read by exactly one source subtask, so source subtasks beyond the partition count sit idle. Extra source parallelism is wasted |
| Not configuring checkpoint storage durably | Works on local disk in dev | Use S3/GCS/HDFS for checkpoints. Local disk means state is lost on Pod restart |
| Putting too much state on heap (HashMapStateBackend) | Default backend, works fine initially | Switch to RocksDB for any state larger than 500 MB. JVM GC pauses will destroy latency |
| Skipping savepoints during upgrades | “Checkpoints will handle it” | Checkpoints are tied to a specific job version. Savepoints are portable across versions |
| Ignoring backpressure signals | “Everything seems to be running” | Monitor backpressure metrics. Sustained backpressure means your sink or an operator is the bottleneck |
| Using a single global parallelism for all operators | Simplicity | Set parallelism per operator. Sources might need 12 but sinks only 4 |
| Not handling late data | “Events always arrive on time” | Define a late data side output. Even 0.1% late events corrupt aggregation results over time |
| Deploying in Session Mode for production jobs | Easy to submit jobs interactively | Use Application Mode. Session clusters share resources and a bad job can crash the whole cluster |

Question 1: You are planning to roll out a new version of your Flink job that changes the business logic of an operator. You need to stop the current job and start the new one without losing state or reprocessing events. Would you rely on a checkpoint or a savepoint for this operation, and why?


You must use a savepoint for this planned upgrade. Savepoints are manually triggered, full snapshots designed specifically for operational tasks like job upgrades, A/B testing, and migrations. They are portable across job versions, provided the state schema remains compatible. While checkpoints also capture state, they are automatic, lightweight snapshots strictly intended for crash recovery and are intimately tied to the specific job graph, meaning they often cannot be used to restore a modified job version.

Question 2: Your e-commerce system uses Flink to compute daily active users. Due to a major cloud outage, mobile client events generated on Tuesday were buffered on devices and didn’t reach Kafka until Wednesday. If your Flink job was configured to use processing time, what would happen to these events?


If the job uses processing time, Tuesday’s events would be incorrectly counted towards Wednesday’s daily active users metric. Processing time evaluates events based on when the Flink TaskManager executes them, completely ignoring when the event actually occurred. This leads to wildly inaccurate windowed aggregations during outages, network delays, or whenever data is replayed from historical storage. To fix this, you must use event time with watermarks, ensuring events are bucketed into Tuesday’s window regardless of when they arrive.

Question 3: You have a Kafka topic with 12 partitions feeding into a Flink job. A junior engineer notices the job is falling behind and increases the parallelism of the Flink source operator to 24, expecting it to process data twice as fast. What will actually happen when the job restarts?


The processing speed will not double, and 12 of the parallel source instances will sit completely idle. A single Kafka partition can only be consumed by exactly one Flink source instance at a time to maintain ordering guarantees. Because the source parallelism (24) exceeds the available Kafka partitions (12), the extra instances will have no data to read, wasting cluster resources. To actually increase throughput in this scenario, the engineer would need to first increase the number of Kafka partitions to 24, and then scale the Flink job.

Question 4: Your team deployed a new real-time fraud detection Flink job that maintains a massive historical profile for every user, accumulating over 2 TB of state. The job is currently crashing repeatedly with OutOfMemoryError in the TaskManagers. What state backend misconfiguration is likely causing this, and how do you fix it?


The job is almost certainly using the default HashMapStateBackend, which stores all in-flight state directly on the JVM heap. When state grows to terabytes, it inevitably exhausts the available heap memory, triggering massive Garbage Collection pauses and eventual out-of-memory crashes. To fix this, you must switch to the EmbeddedRocksDBStateBackend. RocksDB stores state on the local disk (SSD) and only uses memory for caching, allowing Flink to reliably manage state sizes far larger than the TaskManager’s available RAM.

Question 5: You are building a live dashboard for a ride-sharing app. You need to show the total number of rides requested in the last hour, and the dashboard must update every minute to feel “live.” Which windowing strategy should you implement, and why?


You should implement a sliding window configured with a 1-hour size and a 1-minute slide. A sliding window is designed for fixed-size intervals that overlap, meaning a single event will participate in multiple consecutive windows. In this scenario, every minute Flink will emit a new result covering the previous 60 minutes, perfectly matching the requirement for a continuously updating 1-hour metric. A tumbling window would not work here, as it would only emit a single update once per hour.

Question 6: A TaskManager running a critical Flink job suddenly loses power and dies. The job uses EXACTLY_ONCE checkpointing every 60 seconds. Walk through exactly what happens to the events that were being processed during the crash.


When the JobManager detects the lost TaskManager, it immediately cancels all running tasks and initiates a recovery process from the most recent successful checkpoint. The entire job state is restored from durable storage (like S3) to the exact moment that checkpoint was taken. Crucially, the Kafka source operators rewind their read offsets to the positions recorded in that checkpoint. Flink then replays all events from Kafka that occurred after the checkpoint, ensuring no events are skipped and no internal state is double-counted.


Hands-On Exercise: Flink Consuming from Kafka with Windowed Aggregations

Deploy a Flink job that reads events from a Kafka topic, performs windowed aggregations using event time, and writes results to an output topic. You will observe checkpointing, watermark progression, and the effect of late data.

# Create cluster
kind create cluster --name flink-lab
# Install Strimzi and create a Kafka cluster
kubectl create namespace kafka
kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
kubectl -n kafka wait --for=condition=Available \
deployment/strimzi-cluster-operator --timeout=180s
kafka-for-flink.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: combined
  namespace: kafka
  labels:
    strimzi.io/cluster: flink-lab
spec:
  replicas: 1
  roles:
    - controller
    - broker
  storage:
    type: ephemeral
  resources:
    requests:
      cpu: 250m
      memory: 1Gi
    limits:
      memory: 1Gi
---
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: flink-lab
  namespace: kafka
  annotations:
    strimzi.io/kraft: enabled
    strimzi.io/node-pools: enabled
spec:
  kafka:
    version: 3.9.0
    metadataVersion: "3.9"
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
    config:
      auto.create.topics.enable: false
      num.partitions: 3
      default.replication.factor: 1
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
  entityOperator:
    topicOperator: {}
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: sensor-readings
  namespace: kafka
  labels:
    strimzi.io/cluster: flink-lab
spec:
  partitions: 3
  replicas: 1
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: sensor-aggregates
  namespace: kafka
  labels:
    strimzi.io/cluster: flink-lab
spec:
  partitions: 3
  replicas: 1
kubectl apply -f kafka-for-flink.yaml
kubectl -n kafka wait kafka/flink-lab --for=condition=Ready --timeout=300s
Step 1: Install the Flink Kubernetes Operator
# Install cert-manager (required by Flink Operator webhooks)
kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.16.3/cert-manager.yaml
kubectl -n cert-manager wait --for=condition=Available deployment --all --timeout=120s
# Install Flink Operator
kubectl create namespace flink
helm repo add flink-operator https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/
helm repo update
helm install flink-kubernetes-operator flink-operator/flink-kubernetes-operator \
--namespace flink \
--set webhook.create=true
kubectl -n flink wait --for=condition=Available \
deployment/flink-kubernetes-operator --timeout=120s
Step 2: Create the Flink Session Cluster and Submit a SQL Job

Since the Flink Kubernetes Operator manages job lifecycle, we deploy a session cluster and then use the Flink SQL Client to submit our streaming query.

flink-session.yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: sensor-aggregator
  namespace: flink
spec:
  image: flink:1.20-java17
  flinkVersion: v1_20
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.backend.type: hashmap
    execution.checkpointing.interval: "30000"
    execution.checkpointing.mode: EXACTLY_ONCE
    state.checkpoints.num-retained: "3"
    rest.flamegraph.enabled: "true"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
    replicas: 2
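# Create RBAC for Flink
# (The operator's Helm chart may already have created the "flink" service account in this
# namespace; if so, the next command reports AlreadyExists and can be ignored.)
kubectl -n flink create serviceaccount flink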
kubectl create clusterrolebinding flink-role-binding \
--clusterrole=edit --serviceaccount=flink:flink
kubectl apply -f flink-session.yaml
# Wait for the session cluster to be ready
kubectl -n flink get flinkdeployment sensor-aggregator -w
# Wait until the LIFECYCLE STATE column shows STABLE, then press Ctrl+C to stop watching

Next, download the Kafka SQL connector JAR into the Flink cluster and submit the SQL job:

# Find the JobManager pod of the session cluster
FLINK_JM=$(kubectl -n flink get pod -l component=jobmanager,app=sensor-aggregator -o jsonpath='{.items[0].metadata.name}')
# Download the Flink SQL Kafka connector into the JobManager
kubectl -n flink exec $FLINK_JM -- bash -c '
wget -q -P /opt/flink/lib/ \
https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.3.0-1.20/flink-sql-connector-kafka-3.3.0-1.20.jar &&
echo "Kafka connector downloaded"
'
# Submit the SQL job via the SQL Client
kubectl -n flink exec -it $FLINK_JM -- /opt/flink/bin/sql-client.sh embedded -e "
CREATE TABLE sensor_readings (
sensor_id STRING,
temperature DOUBLE,
humidity DOUBLE,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'sensor-readings',
'properties.bootstrap.servers' = 'flink-lab-kafka-bootstrap.kafka.svc.cluster.local:9092',
'properties.group.id' = 'flink-sensor-aggregator',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
);
CREATE TABLE sensor_aggregates (
sensor_id STRING,
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
avg_temperature DOUBLE,
max_temperature DOUBLE,
min_temperature DOUBLE,
avg_humidity DOUBLE,
reading_count BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'sensor-aggregates',
'properties.bootstrap.servers' = 'flink-lab-kafka-bootstrap.kafka.svc.cluster.local:9092',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
);
SET 'parallelism.default' = '3';
SET 'pipeline.name' = 'sensor-aggregator';
INSERT INTO sensor_aggregates
SELECT
sensor_id,
window_start,
window_end,
AVG(temperature) AS avg_temperature,
MAX(temperature) AS max_temperature,
MIN(temperature) AS min_temperature,
AVG(humidity) AS avg_humidity,
COUNT(*) AS reading_count
FROM TABLE(
TUMBLE(TABLE sensor_readings, DESCRIPTOR(event_time), INTERVAL '1' MINUTE)
)
GROUP BY
sensor_id, window_start, window_end;
"
# Generate sensor readings with event-time timestamps
kubectl -n kafka run producer --rm -it --restart=Never \
--image=quay.io/strimzi/kafka:latest-kafka-3.9.0 -- bash -c '
NOW=$(date +%s)
for i in $(seq 1 200); do
SENSOR="sensor-$((RANDOM % 5 + 1))"
TEMP=$((20 + RANDOM % 15))
HUMID=$((40 + RANDOM % 40))
# Vary event times within a 5-minute window
EVENT_TS=$((NOW - RANDOM % 300))
ISO_TS=$(date -u -d @$EVENT_TS +"%Y-%m-%dT%H:%M:%S.000" 2>/dev/null || date -u -r $EVENT_TS +"%Y-%m-%dT%H:%M:%S.000")
echo "{\"sensor_id\":\"$SENSOR\",\"temperature\":$TEMP,\"humidity\":$HUMID,\"event_time\":\"$ISO_TS\"}"
done | bin/kafka-console-producer.sh \
--bootstrap-server flink-lab-kafka-bootstrap:9092 \
--topic sensor-readings
echo "Produced 200 events"
'
# Read the aggregated output
kubectl -n kafka run consumer --rm -it --restart=Never \
--image=quay.io/strimzi/kafka:latest-kafka-3.9.0 -- \
bin/kafka-console-consumer.sh \
--bootstrap-server flink-lab-kafka-bootstrap:9092 \
--topic sensor-aggregates \
--from-beginning \
--max-messages 20
# You should see JSON objects with per-sensor, per-minute aggregations
# Port-forward to the Flink Web UI
kubectl -n flink port-forward svc/sensor-aggregator-rest 8081:8081 &
# Open http://localhost:8081 in your browser
# Explore:
# - Running Jobs → click on your job → see the execution graph
# - Checkpoints tab → verify checkpoints are completing
# - Backpressure tab → check for bottlenecks
kubectl -n flink delete flinkdeployment sensor-aggregator
helm -n flink uninstall flink-kubernetes-operator
kubectl delete -f https://github.com/cert-manager/cert-manager/releases/download/v1.16.3/cert-manager.yaml
kubectl -n kafka delete kafka flink-lab
kubectl -n kafka delete kafkanodepool combined
kubectl delete -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
kubectl delete namespace kafka flink
kind delete cluster --name flink-lab

You have completed this exercise when you:

  • Deployed Kafka with input and output topics
  • Installed the Flink Kubernetes Operator
  • Deployed a Flink SQL job that reads from Kafka
  • Produced 200+ test events with event-time timestamps
  • Consumed and verified windowed aggregation results
  • Observed the Flink Web UI (execution graph, checkpoints)

  1. Flink is streaming-first: batch is treated as a special case of streaming, not the other way around. This gives Flink natural advantages for unbounded data processing.
  2. Event time and watermarks are essential: without them, out-of-order events produce incorrect results. Always use event time for business-critical aggregations.
  3. Checkpoints provide exactly-once state guarantees: combined with Kafka’s replayable offsets, Flink can guarantee that every event affects its state exactly once, even across failures. End-to-end exactly-once output additionally requires a transactional or idempotent sink.
  4. The Flink Kubernetes Operator handles lifecycle: savepoints, upgrades, scaling, and recovery are automated through Custom Resources, enabling GitOps workflows.
  5. State backend choice matters: RocksDB for production (handles TB of state), HashMapStateBackend for development (lower latency, limited by heap).
  6. Parallelism must match your sources: setting source parallelism higher than the number of Kafka partitions wastes resources.

Books:

  • “Stream Processing with Apache Flink” — Fabian Hueske, Vasiliki Kalavri (O’Reilly)
  • “Streaming Systems” — Tyler Akidau, Slava Chernyak, Reuven Lax (O’Reilly) — The theoretical foundation

Articles:

  • “Flink Kubernetes Operator Documentation” — Apache Flink (nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/)
  • “A Practical Guide to Broadcast State in Flink” — Flink blog (flink.apache.org/posts)

Talks:

  • “Flink Forward” — Annual conference with deep-dive talks (youtube.com/c/FlinkForward)
  • “Stateful Functions: Building General-Purpose Applications with Flink” — Stephan Ewen, Flink Forward 2023

Apache Flink is the gold standard for stream processing because it was designed for streaming from the ground up. Its event-time processing, watermark-based progress tracking, and checkpoint-based exactly-once guarantees make it the right choice for any application where timeliness and correctness both matter.

On Kubernetes, the Flink Operator transforms Flink from a complex distributed system into a declarative workload. You describe what you want — a streaming job with specific parallelism, state backend, and checkpointing configuration — and the operator handles the rest: deployment, scaling, upgrades, and failure recovery.

The combination of Kafka (for durable event transport) and Flink (for stateful stream processing) forms the backbone of modern real-time data platforms.


Continue to Module 1.4: Batch Processing & Apache Spark on Kubernetes to learn how to handle large-scale batch processing — the other half of the data processing story.


“Streaming is not the future of data processing. It is the present. Batch is just streaming that waits.” — Tyler Akidau