
Apache Flink: Stateful Stream Processing for Real-Time Analytics

James Okafor
Lead Data Engineer
December 22, 2027 · 13 min read

Apache Flink is the leading open-source framework for stateful stream processing — true event-time semantics, exactly-once guarantees, scalable state management, and sub-second latency. This guide covers the Flink execution model, DataStream API, stateful operations, watermarks, windowing, and when Flink is the right choice for real-time data pipelines.

What Apache Flink Is

Apache Flink is an open-source stream processing framework designed for stateful, fault-tolerant processing of unbounded (streaming) and bounded (batch) data at scale. Where Spark Streaming micro-batches data in small intervals, Flink processes truly one event at a time — giving sub-second end-to-end latency with strong consistency guarantees.

Flink's distinguishing capabilities: true event-time semantics, exactly-once state consistency via distributed snapshots, scalable managed state (checkpointed to durable storage and redistributed automatically when parallelism changes), and expressive windowing APIs for complex temporal aggregations.

Flink Execution Model

**TaskManagers and JobManager**: The JobManager coordinates execution. It receives jobs, splits them into tasks, distributes those tasks to TaskManagers, and coordinates checkpoints. TaskManagers are the worker processes, and each one offers a fixed number of task slots. By default, Flink shares slots across operators, so one slot can hold one parallel instance of every operator in the pipeline. A TaskManager with 4 slots can therefore run 4 parallel copies of the whole pipeline simultaneously.

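**Parallelism**: Each operator in a Flink job runs with a configurable parallelism, which is the number of parallel instances (subtasks). A Kafka source with parallelism 4 runs 4 reader subtasks that divide the topic's partitions among them. Downstream operators can have a different parallelism, and Flink redistributes data between operators as needed. For example, it hash-partitions records by key after keyBy, or rebalances them round-robin when the parallelism changes.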
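The following is a rough sizing sketch. It assumes every operator stays in Flink's default slot-sharing group. Under that assumption, a job needs as many slots as its most parallel operator. JobSizing and its methods are illustrative names, not a Flink API.

```java
import java.util.Arrays;

// Hypothetical sizing helper: assumes all operators share the default slot-sharing group.
final class JobSizing {

    // With slot sharing, one slot hosts one parallel instance of every operator,
    // so the job needs as many slots as its most parallel operator.
    static int requiredSlots(int[] operatorParallelism) {
        return Arrays.stream(operatorParallelism).max().orElse(0);
    }

    // TaskManagers needed to provide that many slots, rounding up.
    static int requiredTaskManagers(int slots, int slotsPerTaskManager) {
        return (slots + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        // Example parallelism: source = 4, keyed window = 8, sink = 2
        int slots = requiredSlots(new int[] {4, 8, 2});
        int taskManagers = requiredTaskManagers(slots, 4);
        System.out.println(slots + " slots on " + taskManagers + " TaskManagers"); // 8 slots on 2 TaskManagers
    }
}
```

Operators placed in separate slot-sharing groups would need additional slots beyond this estimate.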

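**Checkpoints**: Flink achieves exactly-once state consistency by periodically snapshotting all operator state, together with the source offsets, to durable storage (S3, HDFS). If a task fails, Flink restores the operators to the last successful checkpoint and replays source events from the checkpointed offsets. End-to-end exactly-once output also requires a transactional or idempotent sink. The checkpoint interval trades recovery time against runtime overhead. Shorter intervals mean less data to replay after a failure, but more snapshot work during normal processing. Intervals of 30 seconds are common in production.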
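Exactly-once state depends on restoring the state and the source offsets together. The toy model below illustrates this with a single-threaded "job" that counts events per key from an in-memory log. The job snapshots its counts along with its read offset, fails, restores, and replays. CheckpointSim is a hypothetical, conceptual simulation in plain Java, not Flink code. Flink's real mechanism, which takes barrier-based snapshots across parallel operators, is considerably more involved.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual, single-threaded model of "restore state + rewind source" (not Flink code).
final class CheckpointSim {

    // A snapshot pairs operator state with the source offset it corresponds to.
    static final class Checkpoint {
        final Map<String, Long> counts;
        final int offset;

        Checkpoint(Map<String, Long> counts, int offset) {
            this.counts = counts;
            this.offset = offset;
        }
    }

    private final List<String> log; // stands in for a replayable source such as a Kafka partition
    private Map<String, Long> counts = new HashMap<>();
    private int offset = 0;

    CheckpointSim(List<String> log) {
        this.log = log;
    }

    // Consume up to n events starting at the current offset, counting per key.
    void process(int n) {
        for (int i = 0; i < n && offset < log.size(); i++, offset++) {
            counts.merge(log.get(offset), 1L, Long::sum);
        }
    }

    // Copy the state so that later updates cannot alter the snapshot.
    Checkpoint checkpoint() {
        return new Checkpoint(new HashMap<>(counts), offset);
    }

    // Recovery: restore the state AND rewind the source to the snapshot's offset.
    void restore(Checkpoint cp) {
        counts = new HashMap<>(cp.counts);
        offset = cp.offset;
    }

    Map<String, Long> counts() {
        return counts;
    }

    public static void main(String[] args) {
        CheckpointSim job = new CheckpointSim(List.of("a", "b", "a", "c", "a", "b"));
        job.process(3);                   // counts a=2, b=1
        Checkpoint cp = job.checkpoint(); // snapshot taken at offset 3
        job.process(2);                   // counts c and a, then the job "fails"
        job.restore(cp);                  // state and offset roll back together
        job.process(Integer.MAX_VALUE);   // replay from offset 3 to the end of the log
        System.out.println(job.counts()); // {a=3, b=2, c=1}: every event counted exactly once
    }
}
```

Suppose the job had restored only its counts but resumed reading at offset 5, or had rewound the offset without restoring the counts. Events would then have been lost or counted twice.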

**Savepoints**: Manually triggered snapshots used for planned maintenance, such as upgrading the Flink version, changing parallelism, or redeploying a job with code changes. Savepoints preserve job state across redeployments. Unlike checkpoints, they are not deleted automatically.

DataStream API

The DataStream API is Flink's primary interface for streaming computation:

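StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30_000); // 30-second checkpoint interval

// KafkaSource/KafkaSink replace the deprecated FlinkKafkaConsumer/FlinkKafkaProducer.
// Event, EventDeserializer, CustomerRevenue, CustomerRevenueSerializer and SumAggregator are
// application-defined (hypothetical) classes; "brokers" holds the Kafka bootstrap servers.
KafkaSource<Event> kafkaSource = KafkaSource.<Event>builder()
    .setBootstrapServers(brokers)
    .setTopics("events-topic")
    .setValueOnlyDeserializer(new EventDeserializer())
    .build();

// Event-time windows only fire as watermarks advance, so attach a WatermarkStrategy at the source.
DataStream<Event> source = env.fromSource(
    kafkaSource,
    WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(30))
        .withTimestampAssigner((event, timestamp) -> event.eventTimeMs),
    "events-source");

DataStream<CustomerRevenue> result = source
    .filter(event -> event.amount > 0)
    .keyBy(event -> event.customerId)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .aggregate(new SumAggregator()); // AggregateFunction<Event, ?, CustomerRevenue>

// EXACTLY_ONCE writes through Kafka transactions that are committed when a checkpoint completes.
result.sinkTo(KafkaSink.<CustomerRevenue>builder()
    .setBootstrapServers(brokers)
    .setRecordSerializer(KafkaRecordSerializationSchema.<CustomerRevenue>builder()
        .setTopic("output-topic")
        .setValueSerializationSchema(new CustomerRevenueSerializer())
        .build())
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("customer-revenue")
    .build());

env.execute("Customer Revenue Pipeline");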

**Key operations**:

- filter: discard events that do not match a predicate

- map: one-to-one transformation

- flatMap: one-to-many transformation (zero or more outputs per input)

- keyBy: logically partition the stream by key; all events with the same key go to the same parallel instance, which enables per-key state

- window: group the events of a keyed stream into finite buckets, usually defined by time

- process: general-purpose ProcessFunction with access to state, timers, and side outputs
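The routing behind keyBy can be sketched in a few lines. Flink assigns each key to one of maxParallelism key groups, then gives every parallel instance a contiguous range of key groups. The sketch below simplifies this in one respect. KeyRouting is a hypothetical class, and it uses the key's raw hashCode() where Flink first applies a murmur hash. As a result, real instance assignments will differ, although the structure is the same.

```java
// Simplified model of keyBy routing: key -> key group -> parallel instance (not Flink code).
// Flink scrambles key.hashCode() with a murmur hash first; this sketch uses the raw hashCode.
final class KeyRouting {

    // The key group depends only on the key and maxParallelism, never on the current parallelism.
    static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Each parallel instance owns a contiguous range of key groups.
    static int operatorIndex(Object key, int maxParallelism, int parallelism) {
        return keyGroup(key, maxParallelism) * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // Flink's default max parallelism for small jobs
        for (String customer : new String[] {"c-17", "c-42", "c-17", "c-99"}) {
            // "c-17" appears twice and always lands on the same instance
            System.out.println(customer + " -> instance " + operatorIndex(customer, maxParallelism, 4));
        }
    }
}
```

The key-to-key-group mapping is fixed by maxParallelism, so Flink can rescale a job by moving whole key groups, together with their state, between instances. This is also why max parallelism cannot simply be changed when restoring from a savepoint.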

Event Time and Watermarks

With event-time processing, Flink uses the timestamp embedded in each event rather than the time the event arrives at Flink. This lets it handle out-of-order and late events correctly. For example, an event stamped 14:58 that reaches Flink at 15:05 because of network delay still contributes to the 14:00-15:00 window, as long as the watermark has not yet passed the end of that window.

**WatermarkStrategy**: Defines how Flink assigns timestamps to events and generates watermarks.

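// Event is an application-defined type; eventTimeMs is its event time in epoch milliseconds.
WatermarkStrategy<Event> strategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(30))
    .withTimestampAssigner((event, timestamp) -> event.eventTimeMs);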

forBoundedOutOfOrderness(Duration.ofSeconds(30)) assumes that events arrive at most 30 seconds out of order. Flink keeps the watermark 30 seconds (plus 1 ms) behind the maximum event timestamp seen so far. As a result, events up to 30 seconds behind the newest event are still included in their windows before those windows fire.
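The watermark arithmetic can be checked with a small model. WatermarkSim is a hypothetical class, not Flink's generator. It recomputes the watermark after every event, whereas Flink emits it periodically (every 200 ms by default). It uses the firing rule of an event-time window [start, end), which fires once the watermark reaches end - 1.

```java
// Plain-Java model of a bounded-out-of-orderness watermark generator (not Flink code).
final class WatermarkSim {
    private final long boundMs;
    private long maxTimestamp = Long.MIN_VALUE;

    WatermarkSim(long boundMs) {
        this.boundMs = boundMs;
    }

    // Track the largest event timestamp seen; out-of-order events never lower it.
    void onEvent(long eventTimeMs) {
        maxTimestamp = Math.max(maxTimestamp, eventTimeMs);
    }

    // Watermark = max timestamp seen - bound - 1 ms.
    long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - boundMs - 1;
    }

    // An event-time window [start, end) fires once the watermark reaches end - 1.
    boolean windowFired(long windowEndMs) {
        return currentWatermark() >= windowEndMs - 1;
    }

    // Milliseconds since midnight, for readable examples.
    static long at(int h, int m, int s) {
        return ((h * 60L + m) * 60L + s) * 1000L;
    }

    public static void main(String[] args) {
        WatermarkSim wm = new WatermarkSim(30_000);     // forBoundedOutOfOrderness(30 s)
        long windowEnd = at(15, 0, 0);                  // the [14:00, 15:00) window
        wm.onEvent(at(14, 59, 50));
        wm.onEvent(at(15, 0, 10));                      // watermark is now 14:59:39.999
        System.out.println(wm.windowFired(windowEnd));  // false: window still open
        wm.onEvent(at(14, 58, 0));                      // out-of-order event, still counted
        wm.onEvent(at(15, 0, 30));                      // watermark reaches 14:59:59.999
        System.out.println(wm.windowFired(windowEnd));  // true: window fires
    }
}
```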

**Late data handling**: Events that arrive after the watermark has passed the end of their window can be:

- Discarded (the default)

- Routed to a side output with sideOutputLateData(OutputTag) and handled separately

- Accepted via allowedLateness(Time.minutes(5)): the window's state is kept until the watermark passes the window end plus 5 minutes, and each late event in that period triggers an updated result for the window
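The three outcomes above can be expressed as a small decision function. LatenessSim is a hypothetical plain-Java model, not Flink code. It assumes tumbling event-time windows with offset 0, and follows the rule as we read Flink's documentation. A window fires when the watermark reaches its last millisecond (end - 1). Its state is then kept until the watermark reaches that point plus allowedLateness.

```java
// Plain-Java model of how an event-time tumbling window treats an incoming event, given the
// current watermark and an allowedLateness setting (not Flink code).
final class LatenessSim {

    enum Outcome { ON_TIME, LATE_FIRING, DROPPED_OR_SIDE_OUTPUT }

    static Outcome classify(long eventTs, long watermark, long windowSize, long allowedLateness) {
        long windowStart = eventTs - Math.floorMod(eventTs, windowSize);
        long windowMaxTs = windowStart + windowSize - 1;       // last millisecond in the window
        if (windowMaxTs > watermark) {
            return Outcome.ON_TIME;                            // window has not fired yet
        }
        if (windowMaxTs + allowedLateness > watermark) {
            return Outcome.LATE_FIRING;                        // fired, state kept: result is updated
        }
        return Outcome.DROPPED_OR_SIDE_OUTPUT;                 // window state already cleaned up
    }

    // Milliseconds since midnight.
    static long at(int h, int m) {
        return (h * 60L + m) * 60_000L;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L, fiveMinutes = 300_000L;
        System.out.println(classify(at(15, 1), at(15, 2), hour, fiveMinutes));  // ON_TIME
        System.out.println(classify(at(14, 58), at(15, 2), hour, fiveMinutes)); // LATE_FIRING
        System.out.println(classify(at(14, 58), at(15, 6), hour, fiveMinutes)); // DROPPED_OR_SIDE_OUTPUT
    }
}
```

With allowedLateness set to 0, the middle case disappears. Any event whose window has already fired is either dropped or, if configured, sent to the side output.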

State in Flink

Flink's state management is a core differentiator. State is co-located with the operator that uses it and is included in checkpoints. With the RocksDB state backend, which keeps state on local disk rather than on the JVM heap, state can grow to hundreds of GB.

**State types**:

- ValueState: single value per key

- ListState: list of values per key

- MapState: key-value map per key

- ReducingState: single value aggregated by reduce function

- AggregatingState: generalised aggregation

Example — count events per customer using keyed ValueState:

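import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Keyed by customer ID; Event is an application-defined type with a public customerId field.
public class CountFn extends KeyedProcessFunction<String, Event, Tuple2<String, Long>> {

    // Per-key state: Flink scopes it to the key of the element currently being processed.
    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration config) { // Flink 1.x signature; recent releases prefer open(OpenContext)
        countState = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        Long count = countState.value(); // null the first time this key is seen
        if (count == null) count = 0L;
        count++;
        countState.update(count);
        out.collect(Tuple2.of(event.customerId, count));
    }
}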

State is automatically included in checkpoints. If the job restarts after a failure, countState is restored to its checkpointed value and processing resumes from the checkpointed Kafka offsets.

Windowing

Windows group stream events into finite buckets for aggregation:

**Tumbling windows**: Non-overlapping fixed-size buckets. TumblingEventTimeWindows.of(Time.hours(1)) groups events into consecutive 1-hour buckets.

**Sliding windows**: Overlapping buckets. SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(15)) produces a window result every 15 minutes, each covering the last hour.

**Session windows**: Activity-based. EventTimeSessionWindows.withGap(Time.minutes(30)) groups events into sessions separated by gaps longer than 30 minutes. Session windows are variable-length — user activity determines window boundaries.

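**Global windows**: A single window per key that never closes on its own. Its default trigger never fires, so global windows are paired with custom triggers (countWindow is built this way) rather than used for time-based aggregation.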
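The tumbling, sliding, and session assigners reduce to simple arithmetic on timestamps. The sketch below is a plain-Java approximation under these assumptions: a window offset of 0, a single key, and epoch-millisecond timestamps. WindowAssign and its methods are illustrative names, not Flink classes. The session logic assumes, as we understand Flink's merge rule, that windows which touch are merged as well as windows which overlap.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of event-time window assignment (not Flink code). Windows are [start, end).
final class WindowAssign {

    // Start of the size-aligned window containing ts; floorMod keeps negative timestamps correct.
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Tumbling: exactly one window per event.
    static long[] tumbling(long ts, long size) {
        long start = windowStart(ts, size);
        return new long[] {start, start + size};
    }

    // Sliding: each event belongs to size / slide overlapping windows.
    static List<long[]> sliding(long ts, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        for (long start = windowStart(ts, slide); start > ts - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    // Session (single key): each event opens [ts, ts + gap); overlapping or touching windows merge,
    // so a new session begins only when the gap between events exceeds `gap`.
    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] ts = timestamps.clone();
        Arrays.sort(ts); // merging does not depend on arrival order; sorting keeps the sketch simple
        List<long[]> result = new ArrayList<>();
        for (long t : ts) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && t <= last[1]) {
                last[1] = Math.max(last[1], t + gap); // extend the open session
            } else {
                result.add(new long[] {t, t + gap});  // gap exceeded: start a new session
            }
        }
        return result;
    }

    static String hhmm(long ms) {
        return String.format("%02d:%02d", ms / 3_600_000, (ms / 60_000) % 60);
    }

    public static void main(String[] args) {
        long min = 60_000L;
        long ts = (14 * 60 + 37) * min; // an event at 14:37
        long[] t = tumbling(ts, 60 * min);
        System.out.println("tumbling [" + hhmm(t[0]) + ", " + hhmm(t[1]) + ")");
        for (long[] w : sliding(ts, 60 * min, 15 * min)) {
            System.out.println("sliding  [" + hhmm(w[0]) + ", " + hhmm(w[1]) + ")");
        }
        long[] clicks = {(9 * 60) * min, (9 * 60 + 10) * min, (9 * 60 + 25) * min, (10 * 60 + 10) * min};
        for (long[] s : sessions(clicks, 30 * min)) {
            System.out.println("session  [" + hhmm(s[0]) + ", " + hhmm(s[1]) + ")");
        }
    }
}
```

For an event at 14:37, the tumbling assigner yields [14:00, 15:00). The 1-hour/15-minute sliding assigner yields four windows, starting at 14:30, 14:15, 14:00, and 13:45. Clicks at 09:00, 09:10, 09:25, and 10:10 with a 30-minute gap form two sessions, [09:00, 09:55) and [10:10, 10:40), because each session ends one gap after its last event.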

Flink SQL and Table API

Flink provides a SQL interface over both bounded and unbounded streams:

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

tEnv.executeSql(
    "CREATE TABLE orders (" +
    "  order_id STRING, customer_id STRING, amount DOUBLE, event_time TIMESTAMP(3)," +
    "  WATERMARK FOR event_time AS event_time - INTERVAL '30' SECOND" +
    ") WITH ('connector' = 'kafka', 'topic' = 'orders', ...)"
);

// revenue_hourly must already be registered as a sink table (CREATE TABLE revenue_hourly ... WITH (...)).
// TUMBLE here is the window table-valued function; it adds window_start and window_end columns.
tEnv.executeSql(
    "INSERT INTO revenue_hourly " +
    "SELECT customer_id, window_start, SUM(amount) AS revenue " +
    "FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(event_time), INTERVAL '1' HOUR)) " +
    "GROUP BY customer_id, window_start, window_end"
);

Flink SQL enables analytics engineers familiar with SQL to write streaming pipelines without Java or Python. The TUMBLE, HOP (sliding), and SESSION window table-valued functions expose windowing directly in SQL.

Flink vs Spark Streaming

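**Latency**: Flink processes each event as it arrives, which gives sub-second latency. Spark Structured Streaming runs micro-batches by default, so end-to-end latency is typically hundreds of milliseconds to seconds, depending on the trigger interval and batch duration. Spark also has an experimental continuous processing mode, but it supports only a limited set of operations.

**State**: Flink has first-class stateful processing primitives (ValueState, MapState, timers). Spark's arbitrary stateful operations (mapGroupsWithState, flatMapGroupsWithState) have historically been less expressive.

**Fault tolerance**: Flink provides exactly-once state through asynchronous barrier snapshots, a variant of the Chandy-Lamport distributed snapshot algorithm. Spark achieves exactly-once through checkpointed offsets and write-ahead logs combined with replayable sources and idempotent sinks, so its end-to-end guarantees depend on the sink implementation.

**SQL**: Both engines support time-window aggregations over streams; Spark does so through its window and session_window functions. Flink SQL goes further with streaming OVER aggregations, window table-valued functions, and windowed joins. Spark does not support OVER-clause window functions on streaming DataFrames.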

**Ecosystem**: Spark has a larger ML ecosystem (MLlib, Spark ML) and broader batch processing adoption. Flink is the preferred choice when streaming correctness, latency, and stateful processing are primary requirements.

Managed Flink on AWS: Amazon Managed Service for Apache Flink

AWS offers fully managed Flink as Amazon Managed Service for Apache Flink (formerly Kinesis Data Analytics). No cluster management — specify parallelism, upload JAR or Python file, Flink handles the rest. Integrates with Kinesis Data Streams as source and Kinesis Firehose, S3, Redshift as sinks. Appropriate for teams on AWS who want Flink correctness without cluster operations.

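On Google Cloud, Flink can run on Dataproc, which offers Flink as an optional component. Dataflow itself is a managed runner for Apache Beam pipelines, and Beam pipelines can alternatively execute on Flink through Beam's Flink Runner. Confluent Cloud includes Flink SQL as a managed service on top of Confluent Kafka clusters.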

Our data architecture practice designs and builds real-time streaming architectures using Flink, Kafka, and cloud-native services — contact us to discuss your streaming pipeline requirements.
