Apache Flink: Advanced Stream Processing at Scale
Apache Flink provides advanced stream processing with sophisticated windowing and event-time handling. Learn its architecture, programming model, and use cases.
Kafka Streams handles filter, map, join, and aggregate. For simple pipelines, it is sufficient. But when you need sophisticated windowing, event-time processing with out-of-order data, or complex stateful computations across streams, you need something more powerful.
Apache Flink is a distributed processing engine for stateful computations over unbounded and bounded data streams. It runs on a cluster (YARN, Kubernetes, or standalone) and provides sophisticated windowing, event-time processing, and exactly-once guarantees across arbitrary data sources.
Flink’s Architecture
Flink follows the master-worker pattern. The JobManager is the master. It schedules tasks, coordinates checkpoints, and manages the distributed coordination. TaskManagers are the workers. They execute tasks and maintain state.
flowchart LR
subgraph JobManager[JobManager]
JM[JobManager]
CheckpointCoordinator[Checkpoint Coordinator]
end
subgraph TaskManagers[TaskManagers]
TM1[TaskManager 1]
TM2[TaskManager 2]
TM3[TaskManager 3]
end
CheckpointCoordinator -->|checkpoint barrier| TM1
CheckpointCoordinator -->|checkpoint barrier| TM2
CheckpointCoordinator -->|checkpoint barrier| TM3
Flink programs run as parallel stream processing jobs. Each operator runs as multiple parallel instances (subtasks), each processing a subset of the stream partitions.
DataStream API: The Foundation
Flink’s DataStream API is the low-level API for stream processing. It provides fine-grained control over time, state, and watermarks.
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventProcessor {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000); // Checkpoint every 10 seconds
        env.getCheckpointConfig().setCheckpointStorage("s3://flink-checkpoints/");

        DataStream<Event> events = env
            .fromSource(
                KafkaSource.<Event>builder()
                    .setBootstrapServers("kafka:9092")
                    .setTopics("events")
                    .setGroupId("flink-consumer")
                    // EventDeserializer implements DeserializationSchema<Event> (assumed)
                    .setValueOnlyDeserializer(new EventDeserializer())
                    .build(),
                WatermarkStrategy
                    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(30))
                    .withTimestampAssigner((event, timestamp) -> event.getEventTime()),
                "Kafka Source"
            );

        DataStream<Alert> alerts = events
            .keyBy(Event::getUserId)
            .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
            .process(new FraudDetectionProcessFunction());

        // KafkaSink replaces the deprecated FlinkKafkaProducer;
        // AlertSerializer implements SerializationSchema<Alert> (assumed)
        alerts.sinkTo(KafkaSink.<Alert>builder()
            .setBootstrapServers("kafka:9092")
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("alerts")
                .setValueSerializationSchema(new AlertSerializer())
                .build())
            .build());

        env.execute("Fraud Detection Pipeline");
    }
}
Event Time and Watermarks
The hardest problem in stream processing is handling data that arrives late or out of order. Network partitions, upstream retries, and scheduled batch jobs all cause events to arrive after they should have.
Flink solves this with event time and watermarks.
Event time is the time embedded in the event itself (when the event actually happened). Processing time is when the system processes the event. Using event time lets you reason about the data correctly regardless of when it arrived.
Watermarks are progress markers in the event-time stream. A watermark with timestamp T tells Flink: “no more events with event time at or below T are expected to arrive.” When a watermark arrives, Flink advances its view of time and closes windows that should be complete.
WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(30)) // Allow 30 seconds late
.withTimestampAssigner((event, timestamp) -> event.getEventTime())
.withIdleness(Duration.ofMinutes(1)); // Handle idle partitions
The watermark strategy declares how out-of-order you expect your data to be. With a 30-second bound, the watermark trails the highest event time seen so far by 30 seconds, so a window stays open until events 30 seconds newer than its end have been observed.
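To make the mechanics concrete, here is a plain-Java sketch of the bookkeeping behind forBoundedOutOfOrderness. This is an illustrative simplification, not Flink's actual classes: the watermark always trails the highest event time seen so far by the configured bound.

```java
// Plain-Java sketch of bounded-out-of-orderness watermarking (illustrative,
// not Flink's API): track the max event time seen, and derive a watermark
// that trails it by the out-of-orderness bound.
class BoundedOutOfOrdernessTracker {
    private final long outOfOrdernessMillis;
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedOutOfOrdernessTracker(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
    }

    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Watermark T asserts: no more events with timestamp <= T are expected.
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    boolean isLate(long eventTimestamp) {
        return eventTimestamp <= currentWatermark();
    }
}
```

With a 30-second bound, an event 31 seconds behind the newest one seen is late; an event 29 seconds behind is not.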
Late data handling
When events arrive after the watermark has passed their window, they are considered late. Flink offers three options for handling late data: drop them (the default), divert them to a side output, or let them update already-emitted window results:
// Option 1 (the default): late events are silently dropped
// Option 2: divert late events to a side output
OutputTag<Event> lateEventsTag = new OutputTag<Event>("late-events"){};
SingleOutputStreamOperator<Alert> alerts = events
    .keyBy(Event::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .sideOutputLateData(lateEventsTag)
    .process(new AggregationProcessFunction());
DataStream<Event> lateEvents = alerts.getSideOutput(lateEventsTag);
// Option 3: allow late events to update already-emitted window results
DataStream<Alert> updatedAlerts = events
    .keyBy(Event::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(2)) // Re-fire the window for up to 2 minutes
    .process(new AggregationProcessFunction());
Windowing
Flink supports several window types:
Tumbling windows: Fixed-size, non-overlapping windows. Every 5 minutes. Events belong to exactly one window.
Sliding windows: Fixed-size with overlap. Every 5 minutes, but windows overlap by 1 minute. Events can belong to multiple windows.
Session windows: Windows defined by gaps in activity. A session window closes when there is a gap of inactivity longer than the session timeout.
// Tumbling window: count events every 5 minutes
events
.keyBy(Event::getUserId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new CountAggregator());
// Sliding window: rolling 10-minute average with 1-minute updates
events
.keyBy(Event::getMetricName)
.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)))
.aggregate(new AverageAggregator());
// Session window: aggregate until 5 minutes of inactivity
events
.keyBy(Event::getSessionId)
.window(EventTimeSessionWindows.withGap(Time.minutes(5)))
.process(new SessionAggregator());
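The session assigner's gap logic can be sketched in plain Java. This is an illustrative simplification, not Flink's merging-window implementation (which assigns each event its own window and merges overlapping ones): sort events by timestamp and cut a new session wherever the gap between consecutive events exceeds the configured gap.

```java
import java.util.*;

// Plain-Java sketch of session-window assignment: a new session starts
// whenever the gap to the previous event exceeds the session gap.
class Sessionizer {
    static List<List<Long>> sessionize(List<Long> timestamps, long gapMillis) {
        List<Long> sorted = new ArrayList<>(timestamps);
        Collections.sort(sorted);
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sorted) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gapMillis) {
                sessions.add(current);      // gap exceeded: close the session
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) sessions.add(current);
        return sessions;
    }
}
```

Events at 0s, 1s, 10s, and 11s with a 5-second gap form two sessions: {0s, 1s} and {10s, 11s}.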
State and Fault Tolerance
Flink’s state is first-class. Every operator can maintain state, and Flink checkpoints that state to durable storage. On failure, Flink restores operator state from checkpoints and resumes processing.
// Runs on a keyed stream, e.g. events.keyBy(Event::getUserId).process(...)
public class RunningAverageFunction
        extends KeyedProcessFunction<String, Event, Double> {
    private transient ValueState<Long> countState;
    private transient ValueState<Double> sumState;

    @Override
    public void open(Configuration parameters) {
        countState = getRuntimeContext()
            .getState(new ValueStateDescriptor<>("count", Long.class));
        sumState = getRuntimeContext()
            .getState(new ValueStateDescriptor<>("sum", Double.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Double> out)
            throws Exception {
        Long currentCount = countState.value();
        Double currentSum = sumState.value();
        if (currentCount == null) {
            currentCount = 0L;
            currentSum = 0.0;
        }
        countState.update(currentCount + 1);
        sumState.update(currentSum + event.getValue());
        out.collect(sumState.value() / countState.value());
    }
}
Flink’s checkpointing uses asynchronous barrier snapshotting, a variant of the Chandy-Lamport algorithm for distributed snapshots. Checkpoint barriers flow through the stream graph alongside the data. When an operator has received the barrier for a checkpoint on all of its inputs, it snapshots its state; once every operator has acknowledged its snapshot, the checkpoint is complete. This provides exactly-once state consistency without stopping the pipeline.
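A toy model of barrier alignment helps make this concrete. It is an illustrative simplification (real Flink also supports unaligned checkpoints and barrier cancellation): once a channel delivers the barrier, its subsequent records are buffered until every channel has delivered it, at which point the operator snapshots and releases the buffer.

```java
import java.util.*;

// Toy model of aligned checkpoint barriers at a two-input operator.
class BarrierAligner {
    private final int numChannels;
    private final Set<Integer> blocked = new HashSet<>();
    private final List<String> buffered = new ArrayList<>();
    final List<String> processed = new ArrayList<>();
    boolean checkpointTriggered = false;

    BarrierAligner(int numChannels) { this.numChannels = numChannels; }

    void onRecord(int channel, String record) {
        if (blocked.contains(channel)) {
            buffered.add(record);          // channel already saw the barrier: hold back
        } else {
            processed.add(record);         // pre-barrier record: process immediately
        }
    }

    void onBarrier(int channel) {
        blocked.add(channel);
        if (blocked.size() == numChannels) {
            checkpointTriggered = true;    // all inputs aligned: snapshot state here
            processed.addAll(buffered);    // release buffered post-barrier records
            buffered.clear();
            blocked.clear();
        }
    }
}
```

Records arriving on an already-aligned channel never mix into the pre-barrier state, which is what makes the snapshot consistent.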
Table API and SQL
Flink provides a Table API and SQL interface for stream processing. This makes complex stream analytics accessible to anyone who knows SQL.
-- Flink SQL: compute running revenue per customer
SELECT
customer_id,
TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
SUM(amount) AS total_revenue,
COUNT(*) AS transaction_count
FROM transactions
GROUP BY
customer_id,
TUMBLE(event_time, INTERVAL '5' MINUTE);
-- Detect anomalies: flag customers with unusual spike in activity
WITH customer_activity AS (
SELECT
customer_id,
HOP_START(event_time, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE) AS window_start,
COUNT(*) AS activity_count
FROM events
GROUP BY
customer_id,
HOP(event_time, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE)
),
averages AS (
SELECT
AVG(activity_count) AS avg_activity,
STDDEV(activity_count) AS stddev_activity
FROM customer_activity
)
SELECT
ca.customer_id,
ca.window_start,
ca.activity_count
FROM customer_activity ca
CROSS JOIN averages a
WHERE ca.activity_count > a.avg_activity + 3 * a.stddev_activity;
Connectors
Flink connectors integrate with external systems:
Sources: Kafka, Kinesis, Pulsar, JDBC, file systems, custom sources
Sinks: Kafka, Kinesis, Pulsar, JDBC, file systems, Elasticsearch, HTTP, custom sinks
Async I/O: For integrating with external services that have high latency (REST APIs, databases), Flink provides async I/O operators that handle thousands of concurrent requests efficiently.
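The idea can be sketched with plain Java futures. This is a simplification, not the real mechanism (Flink's AsyncDataStream with an AsyncFunction): cap the number of in-flight requests and collect results without blocking per record.

```java
import java.util.*;
import java.util.concurrent.*;

// Sketch of capped-concurrency async enrichment (plain Java, not Flink's API).
class AsyncLookup {
    static List<String> enrichAll(List<String> keys, int capacity, ExecutorService pool) {
        Semaphore inFlight = new Semaphore(capacity);
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String key : keys) {
            inFlight.acquireUninterruptibly();   // cap concurrent in-flight requests
            futures.add(CompletableFuture.supplyAsync(() -> {
                try {
                    return key + ":enriched";    // stand-in for a slow REST/DB lookup
                } finally {
                    inFlight.release();
                }
            }, pool));
        }
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> f : futures) {
            results.add(f.join());               // emit in input order
        }
        return results;
    }
}
```

Flink's ordered async I/O preserves input order the same way; its unordered mode emits results as they complete for lower latency.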
When to Use Flink
Flink is the right choice when:
- You need sophisticated windowing (session windows, multiple window types)
- Your data arrives out of order and you need event-time processing
- You need exactly-once processing with non-Kafka sources or sinks
- You need very large state (with the RocksDB backend, Flink jobs scale to terabytes of state)
- You need SQL-based stream analytics
Kafka Streams is simpler for straightforward filter-map-join-aggregate pipelines where all sources and sinks are Kafka topics.
Production Failure Scenarios
Flink’s distributed nature creates failure modes that single-node processing sidesteps entirely.
JobManager single point of failure
The JobManager coordinates everything — task scheduling, checkpoints, failure recovery. If it dies without HA configured, the job stops dead. No coordination happens, no recovery triggers.
Fix: run JobManager HA with standby instances. On Kubernetes this is straightforward. On YARN, use the built-in HA mode. For anything that matters, a single JobManager is not acceptable.
Checkpoint overload
Checkpoints save state to S3 or HDFS. When state hits terabytes, checkpoint files get large. If checkpoint duration exceeds the checkpoint interval, the next checkpoint starts before the previous one finishes. The backlog grows, state falls behind, and you enter a death spiral.
Fix: target checkpoint duration at less than 60% of your checkpoint interval. If checkpoints are consistently taking too long, either shrink your state or widen the interval.
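The budget math is simple enough to encode. The 60% target is this article's rule of thumb, not a Flink constant:

```java
// Rule-of-thumb checkpoint budget: keep observed checkpoint duration at or
// below 60% of the interval to leave headroom for spikes.
class CheckpointBudget {
    static boolean healthy(long durationMs, long intervalMs) {
        return (double) durationMs / intervalMs <= 0.6;
    }

    // Smallest interval that keeps the observed duration at 60% utilization.
    static long minSafeIntervalMs(long observedDurationMs) {
        return (long) (observedDurationMs / 0.6);
    }
}
```

A 45-second checkpoint against a 60-second interval is 75% utilization: widen the interval to at least 75 seconds or shrink the state.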
State backend mismatches production load
Flink has two main state backends: HashMap (heap-based: fast but memory-bound) and EmbeddedRocksDB (disk-backed, scalable). Teams pick the heap backend for speed and hit OOM in production. TaskManagers crash and restart in a loop, each time reloading the same oversized state.
Fix: RocksDB for any state that could grow past a few gigabytes. Profile under realistic load before drawing conclusions about memory requirements.
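As a sketch, switching the backend in code looks like this (assuming the flink-statebackend-rocksdb dependency is on the classpath; the same can be set cluster-wide with state.backend: rocksdb in flink-conf.yaml):

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// true enables incremental checkpoints: only changed SST files are uploaded
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
```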
Watermark stalls on idle partitions
A Kafka partition stops producing — upstream drained it or a rebalance is in progress. Other partitions keep going. The watermark stalls. Windows never close because one partition’s watermark never moves. Meanwhile, late events pile up.
Fix: use withIdleness() to exempt dead partitions from watermark advancement. Alert on watermark stalls per partition — a frozen watermark is a clear signal something is wrong upstream.
Backpressure cascading through the DAG
A sink starts throttling — Elasticsearch bulk rejections, a database lock, an HTTP endpoint returning 429. Input buffers fill. This backpressures upstream operators, which backpressure their upstream, all the way to the source.
Fix: isolate sinks that can throttle in their own operator chain with independent parallelism. Monitor buffer utilization and queue lengths. Design sinks to handle backpressure explicitly, not silently.
Trade-off Table: Apache Flink vs Spark Streaming vs Kafka Streams
| Aspect | Apache Flink | Spark Streaming | Kafka Streams |
|---|---|---|---|
| Processing model | Native streaming | Micro-batch | Native streaming |
| Latency | Sub-second | ~500ms minimum | Sub-second |
| State management | Distributed RocksDB | In-memory + disk spill | Local RocksDB |
| Exactly-once | End-to-end | Micro-batch guarantee | Kafka-only |
| Windowing | Time, session, count, global | Time (micro-batch) | Time, session, sliding |
| Event time | Native | Simulated (watermarks) | Supported |
| SQL support | Native Table API + SQL | Spark SQL | No (use ksqlDB) |
| Operational complexity | High | Medium | Low |
| Scaling | Fine-grained, dynamic | Coarse (partition-based) | Partition count ceiling |
| Connectors | Many | Many | Kafka only |
| Best for | Complex event processing, large state | Batch migration, simpler streaming | Kafka-native microservices |
Flink wins for complex event patterns and large-scale stateful processing. Spark Streaming wins for teams migrating from batch Spark. Kafka Streams wins when you want to avoid operating a processing cluster entirely.
Capacity Estimation
State sizing
Flink state lives in RocksDB (default for large state) or on the JVM heap.
// Estimate RocksDB state size per TaskManager
// Assumptions: 50M keys, 4 TaskManagers, 200 bytes per value, 2.5x RocksDB overhead
long totalKeys = 50_000_000L;
int numTaskManagers = 4;
long keysPerTM = totalKeys / numTaskManagers; // 12.5M keys
long bytesPerValue = 200L;
double rocksDBMultiplier = 2.5; // keys + indexes + bloom filters + tombstones
long stateBytesPerTM = (long) (keysPerTM * bytesPerValue * rocksDBMultiplier);
// ≈ 6.25GB per TaskManager
// Add 30% for incremental checkpoint overhead during recovery
long totalStatePerTM = (long) (stateBytesPerTM * 1.3); // ≈ 8.1GB
For heap-based state, account for JVM heap after operator memory, network buffers, and GC headroom. A 16GB TaskManager with 4GB for state, 4GB for network, and 4GB for operators leaves 4GB for heap — tight.
Checkpoint storage bandwidth
Checkpoints write full state periodically and incremental diffs between. Storage bandwidth matters for checkpoint duration.
| Checkpoint interval | State size | Write rate |
|---|---|---|
| 60 seconds | 100GB | ~1.7 GB/sec |
| 60 seconds | 1TB | ~17 GB/sec |
| 5 minutes | 100GB | ~0.33 GB/sec |
For S3, make sure your network link handles the write rate. Recovery reads are sequential and bulk — same bandwidth, same constraint.
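The write-rate column above is just state size divided by interval; a sketch that reproduces it (ignoring incremental checkpoints, which lower the steady-state rate):

```java
// Sustained checkpoint write bandwidth: full state size over the interval.
class CheckpointBandwidth {
    static double gbPerSecond(double stateGb, long intervalSeconds) {
        return stateGb / intervalSeconds;
    }
}
```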
Throughput
Per core, assuming CPU is the bottleneck:
| Operation | Records/sec per core |
|---|---|
| Stateless filter/map | 100–200K |
| Simple window aggregate | 50–100K |
| Stateful join | 20–50K |
| Complex CEP pattern | 5–20K |
A 4-core TaskManager doing a stateful join handles 80–200K records/sec. Profile with production data shapes before committing to a cluster size.
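A back-of-envelope sizing helper based on the table above (the per-core rates are rough assumptions, so treat the result as a starting point, not a commitment):

```java
// Divide target throughput by per-core rate and round up.
class ClusterSizing {
    static int coresNeeded(long recordsPerSec, long recordsPerSecPerCore) {
        return (int) ((recordsPerSec + recordsPerSecPerCore - 1) / recordsPerSecPerCore);
    }
}
```

For example, 500K records/sec of simple window aggregation at 50K records/sec per core needs roughly 10 cores before any headroom.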
Observability Checklist
Flink exposes metrics through its REST API and through metric reporters such as Prometheus; dashboards typically live in Grafana.
Checkpoints: lastCheckpointDuration, lastCheckpointSize, numberOfInProgressCheckpoints. Checkpoint duration trending toward the checkpoint interval means the next checkpoint will start late.
State: estimated state size per operator (with RocksDB, enable the state.backend.rocksdb.metrics.* gauges, e.g. estimate-live-data-size). State growing without a corresponding business reason means a leak or a missing TTL.
Backpressure: busyTimeMsPerSecond and numRecordsOutPerSecond together. High busy time with low output throughput means a bottleneck — either CPU-bound or blocked downstream.
Watermarks: currentInputWatermark per operator. A watermark that stalls while others advance means an idle partition is blocking window closure.
# Checkpoint health
curl -s "http://flink-jobmanager:8081/jobs/<job_id>/metrics?get=numberOfFailedCheckpoints,lastCheckpointDuration,lastCheckpointSize"
# Backpressure — busy vs back-pressured time per task (Flink 1.13+)
curl -s "http://flink-jobmanager:8081/jobs/<job_id>/vertices/<vertex_id>/metrics?get=busyTimeMsPerSecond,backPressuredTimeMsPerSecond"
Alert on: any failed checkpoint, checkpoint duration above 80% of interval, watermark stall longer than 5 minutes, TaskManager heap above 80% sustained.
Common Pitfalls and Anti-Patterns
Forgetting to set state TTL
State lives forever unless you configure a TTL. A session window that never closes because the session gap is never reached holds state indefinitely. Months later you discover tens of gigabytes of stale state on every TaskManager.
Always set TTL on state descriptors. The state.clear() pattern is not enough — it only clears on the happy path.
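A sketch of what this looks like on a state descriptor, using Flink's StateTtlConfig:

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

// Expire entries 7 days after the last write; never serve expired values.
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

ValueStateDescriptor<Long> countDescriptor =
    new ValueStateDescriptor<>("count", Long.class);
countDescriptor.enableTimeToLive(ttlConfig);
```

The 7-day TTL is illustrative; pick a value tied to how long your business logic actually needs the state.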
Using processing time for correctness-sensitive workloads
Processing time is easier. It is also wrong the moment there is any lag. A 30-second network hiccup shifts your window results. Always use event time when correctness matters; processing time only belongs in dashboards where approximate is fine.
Ignoring key distribution skew
State is partitioned by key. If a few keys dominate your data (a popular user, a common category), those partitions accumulate far more state than others. The overloaded TaskManager OOMs while neighbours sit half-empty.
Profile your key distribution before going to production. Use composite keys or add a hash prefix to redistribute skewed key spaces.
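One way to sketch the hash-prefix approach (illustrative; the fanout of 8 and the '#' separator are arbitrary choices): aggregate per salted subkey first, then strip the salt and merge the partial results per original key in a second, much cheaper keyBy.

```java
// Salt a hot key into N subkeys so its state and load spread across subtasks.
class KeySalting {
    static String salt(String key, Object record, int fanout) {
        int bucket = Math.floorMod(record.hashCode(), fanout);
        return key + "#" + bucket; // e.g. "user42#0" … "user42#7"
    }

    static String unsalt(String saltedKey) {
        return saltedKey.substring(0, saltedKey.lastIndexOf('#'));
    }
}
```

The trade-off is a second aggregation stage: acceptable for sums and counts, awkward for non-decomposable aggregates like exact distinct counts.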
Checkpoint interval set by habit
A 1-second checkpoint interval on a 100GB state is 100GB written to storage every second. The pipeline spends more time checkpointing than processing. Size your checkpoint interval based on your recovery point objective. If losing a minute of work is acceptable, checkpoint every 60 seconds.
Quick Recap
- Flink runs as a cluster — size, monitor, and manage it as infrastructure.
- Use RocksDB for any state that could grow past a few GB.
- Set a checkpoint interval based on your recovery point objective, not habit.
- JobManager HA is not optional for any production job.
- Watch checkpoint duration, watermark progress, and heap usage as primary signals.
- Set state TTL on every stateful operator.
Conclusion
Apache Flink is the most capable open-source stream processing framework. Its support for event time, sophisticated windowing, and massive state makes it the right choice for complex real-time analytics and event processing.
The operational complexity is higher than Kafka Streams. Flink runs as a cluster that must be sized, monitored, and managed. The tradeoff is capability. For advanced stream processing requirements, Flink delivers what Kafka Streams cannot.
For related reading on stream processing patterns, see Apache Kafka and Kafka Streams.