Exactly-Once Semantics in Stream Processing
Exactly-once ensures each event processes without duplicates or loss. Learn how it works, when you need it, and the true cost of implementation.
Every message processes exactly once. No duplicates. No data loss. This is exactly-once semantics, and it is the gold standard for stream processing correctness.
The name is misleading. Nothing processes exactly once in the physical sense. A message might be read, processed, and written, and then a failure occurs before the offset is committed. On restart, the message is read again. That is two physical reads. Exactly-once semantics guarantee that the output is as if the message had been processed only once.
Exactly-once is hard because processing involves multiple systems: the message broker, the processor, and the output system. A failure at any point can break the guarantee.
Why Exactly-Once Is Hard
Consider the canonical stream processing path:
Kafka (or other source) -> Processor -> Database (or other sink)
For exactly-once to hold, three things must be true:
- The source must not replay the same message after the processor has acknowledged it
- The processor must not produce output twice for the same message
- The sink must not write twice for the same message
Failure can happen anywhere. A crash after writing to the database but before committing the source offset means the message will be replayed. A crash after committing the offset but before writing to the database means the message is lost. Both are failures of exactly-once.
flowchart LR
Source -->|1. read| Processor[Processor]
Processor -->|2. process| DB[(Database)]
Processor -->|3. commit offset| Source
DB -->|4. ack| Processor
If the processor commits the offset (step 3) and then crashes before the write (step 2) completes, the offset has advanced and the write is lost on restart. If the write completes but the processor crashes before committing the offset, the message is replayed and the write is duplicated on restart.
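The two failure orderings can be reproduced with a toy simulation (all names hypothetical): crashing between the write and the offset commit yields a duplicate, while crashing in the other order yields a loss.

```python
def run(order, crash_between_steps):
    """Process one message with steps in `order`; optionally crash
    between the two steps, then restart and redo the message only if
    its offset was never committed (how real replay works)."""
    sink, offset_committed = [], False

    def attempt(steps):
        nonlocal offset_committed
        for i, step in enumerate(steps):
            if step == "write":
                sink.append("msg-1")
            else:  # "commit"
                offset_committed = True
            if crash_between_steps and i == 0:
                return False  # crashed after the first step
        return True

    if not attempt(order) and not offset_committed:
        attempt(order)  # restart: message is replayed from the source
    return sink

# write-then-commit, crash in between: message replayed -> duplicate
assert run(["write", "commit"], True) == ["msg-1", "msg-1"]
# commit-then-write, crash in between: never replayed -> lost
assert run(["commit", "write"], True) == []
# no crash: exactly one write either way
assert run(["write", "commit"], False) == ["msg-1"]
```

Neither ordering alone is safe; this is why the commit must cover both sides at once.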
The Two-Phase Commit Pattern
The classic solution is two-phase commit (2PC). The processor writes to a staging area and coordinates a commit that covers both the output and the offset.
- Read the message
- Write to a staging area (not visible to consumers yet)
- Commit the offset
- Make the staged write visible (commit the output)
If the processor crashes before the offset commit, the staged write is discarded and the message is replayed. If it crashes after the offset commit but before the staged write becomes visible, recovery finds the pending transaction and re-commits it. Making the commit step idempotent and retriable is what lets the two phases behave as one atomic operation.
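A minimal sketch of the staged-write idea, with a hypothetical `StagedSink` API — the key property is that `commit` is idempotent, so recovery can safely retry it:

```python
class StagedSink:
    """Toy two-phase sink (hypothetical API): writes land in a staging
    area first and become visible only on commit()."""
    def __init__(self):
        self.staging = {}   # txn_id -> staged records
        self.visible = []

    def stage(self, txn_id, record):
        self.staging.setdefault(txn_id, []).append(record)

    def commit(self, txn_id):
        # Idempotent: committing an already-committed txn is a no-op.
        self.visible.extend(self.staging.pop(txn_id, []))

    def abort(self, txn_id):
        self.staging.pop(txn_id, None)

sink = StagedSink()
sink.stage("txn-1", "order-42")
offset_committed = True       # phase 1: offset durably recorded with txn-1
# -- crash here: output is staged but not yet visible --
# On restart, recovery re-drives the decision from the durable offset log:
if offset_committed:
    sink.commit("txn-1")      # re-commit the pending transaction
else:
    sink.abort("txn-1")       # roll back; the message will be replayed

assert sink.visible == ["order-42"]
sink.commit("txn-1")          # a retried commit is harmless
assert sink.visible == ["order-42"]
```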
Flink implements this with its checkpointing mechanism. Flink draws distributed snapshots using a variant of the Chandy-Lamport algorithm (asynchronous barrier snapshotting). Checkpoint barriers flow through the stream graph. When all inputs have received the barrier, the checkpoint is complete. The checkpoint includes both the stream position (offset) and the operator state.
// Flink: Enable exactly-once checkpointing
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000); // Checkpoint every 10 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().enableExternalizedCheckpoints(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
On failure, Flink restores from the last successful checkpoint. State is restored, stream position (offset) is restored, and processing resumes from exactly that point.
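The restore-and-resume behavior can be illustrated with a toy stateful counter that snapshots state and offset together (hypothetical code, not Flink's API): because both are restored atomically, the final counts come out the same no matter where the crash lands.

```python
def run_with_checkpoints(events, checkpoint_every, crash_at):
    """Count events with periodic (state, offset) snapshots; crash once
    at input index `crash_at`, restore the last snapshot, and resume."""
    state, offset = {}, 0
    checkpoint = ({}, 0)           # (state copy, offset) taken together
    crashed = False
    while offset < len(events):
        if offset == crash_at and not crashed:
            crashed = True
            state, offset = dict(checkpoint[0]), checkpoint[1]  # restore
            continue
        key = events[offset]
        state[key] = state.get(key, 0) + 1
        offset += 1
        if offset % checkpoint_every == 0:
            checkpoint = (dict(state), offset)  # atomic snapshot of both
    return state

events = ["a", "b", "a", "c", "b", "a"]
# Crashing late or immediately gives the same counts as never crashing.
assert run_with_checkpoints(events, 2, 5) == {"a": 3, "b": 2, "c": 1}
assert run_with_checkpoints(events, 2, 0) == {"a": 3, "b": 2, "c": 1}
```

Snapshotting state and offset separately would reintroduce the dual-write problem; the atomicity of the pair is the whole point.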
Kafka Transactions
Kafka provides exactly-once semantics through transactions. A Kafka transaction atomically commits a consumer offset and a set of producer records.
// Kafka Streams: exactly-once with transactions
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
KafkaStreams streams = new KafkaStreams(builder.build(), props);
// The exactly_once_v2 setting uses Kafka transactions internally:
// - Consumer offsets are written to the __consumer_offsets topic
// - Output records are written to output topics
// - Both are committed in a single transaction
The producer in exactly-once mode is idempotent by default. Each record includes a producer ID and sequence number. Duplicate records with the same sequence number are ignored by the broker.
// Idempotent producer (already enabled by default)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
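Broker-side deduplication can be sketched as follows (a simplification with hypothetical names — the real broker also rejects out-of-order sequence numbers rather than just ignoring stale ones):

```python
class Broker:
    """Toy broker-side dedup: a record is appended only if its
    (producer_id, sequence) is newer than the last one seen."""
    def __init__(self):
        self.log = []
        self.last_seq = {}   # producer_id -> highest sequence appended

    def append(self, producer_id, seq, record):
        if seq <= self.last_seq.get(producer_id, -1):
            return False     # duplicate retry: silently ignored
        self.last_seq[producer_id] = seq
        self.log.append(record)
        return True

b = Broker()
assert b.append("p1", 0, "x") is True
assert b.append("p1", 0, "x") is False   # retried send, deduplicated
assert b.append("p1", 1, "y") is True
assert b.log == ["x", "y"]
```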
The Transactional Outbox Pattern
Exactly-once within Kafka is straightforward. Exactly-once with external systems is harder. When your processor writes to a database, Kafka transactions do not cover the database write.
The transactional outbox pattern solves this. Instead of writing to the database and to Kafka as two separate operations, write the business data and an outbox record (the event to publish) in a single local database transaction. A separate relay process reads the outbox and publishes the events to Kafka.
flowchart LR
App[Application] -->|1. begin tx| DB[(Database)]
App -->|2. write to outbox| DB
App -->|3. commit tx| DB
DB -->|4. read outbox| Relay[Outbox Relay]
Relay -->|5. publish to Kafka| Kafka
Relay -->|6. mark published| DB
The database transaction guarantees atomicity of the business operation and the outbox write. The relay guarantees that the Kafka message is published only after the outbox record is committed. If the relay crashes after publishing but before marking the record, it republishes the same message on restart — outbox delivery is at-least-once, so downstream consumers must deduplicate, for example on an event ID carried in the payload.
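A compact sketch of the pattern, using SQLite as a stand-in for the database and a plain list as a stand-in for Kafka (schema and names are hypothetical):

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE orders (order_id TEXT PRIMARY KEY, data TEXT)")
db.execute("CREATE TABLE outbox (id INTEGER PRIMARY KEY, payload TEXT,"
           " published INTEGER DEFAULT 0)")

def place_order(order_id, data):
    # Business write and outbox write share one local transaction:
    # either both are committed or neither is.
    with db:
        db.execute("INSERT INTO orders VALUES (?, ?)", (order_id, data))
        db.execute("INSERT INTO outbox (payload) VALUES (?)",
                   (f"OrderPlaced:{order_id}",))

kafka = []  # stand-in for the real broker

def relay_once():
    # Publish unpublished outbox rows, then mark them. A crash between
    # the publish and the mark re-publishes on the next run, so delivery
    # is at-least-once and consumers must deduplicate.
    rows = db.execute(
        "SELECT id, payload FROM outbox WHERE published = 0").fetchall()
    for row_id, payload in rows:
        kafka.append(payload)
        with db:
            db.execute("UPDATE outbox SET published = 1 WHERE id = ?",
                       (row_id,))

place_order("o-1", "widget")
relay_once()
relay_once()  # nothing left to publish: the second run is a no-op
assert kafka == ["OrderPlaced:o-1"]
```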
Idempotent Consumers
Most exactly-once implementations ultimately rely on idempotency. Exactly-once means: if you run the pipeline multiple times with the same input, you get the same output.
An idempotent consumer ignores duplicates:
def process_order(order_id, order_data):
    """
    Idempotent order processing.
    Uses the order_id as a deduplication key.
    """
    existing = db.query("SELECT * FROM orders WHERE order_id = %s", order_id)
    if existing:
        return  # Already processed, skip
    db.execute(
        "INSERT INTO orders (order_id, data) VALUES (%s, %s)",
        order_id, order_data
    )
    db.commit()
Even if the consumer processes the same order_id twice, only one record is inserted. The database's unique constraint on order_id enforces idempotency even when the check-then-insert sequence races (see the pitfalls below).
For stateful aggregation, idempotency is more complex. You cannot just insert or update. You need exactly-once aggregation, which requires the checkpoint-based approaches described above.
When You Need Exactly-Once
Exactly-once is the right guarantee for:
- Financial transactions: A payment must not be processed twice
- Inventory updates: An item must not be reserved twice
- Billing records: A customer must not be charged twice
Exactly-once is overkill for:
- Metrics aggregation: If you miss one page view in a count of millions, the error is negligible
- Click tracking: Same reasoning as metrics
- Non-critical logs: Missing or duplicating a log entry rarely matters
Exactly-once has a real cost. It requires coordination between systems (checkpointing, transactions, distributed consensus). This coordination adds latency and reduces throughput. For many pipelines, at-least-once with idempotent consumers is faster and simpler.
At-Least-Once vs Exactly-Once
At-least-once is the pragmatic middle ground. Every message is processed at least once. If failures occur, messages are replayed. The result may contain duplicates, but no message is lost.
For most use cases, at-least-once with idempotent consumers produces correct results. If your consumer is idempotent, replaying a message produces the same result as processing it once. The duplicates are harmless.
# This consumer is idempotent. At-least-once with this consumer == exactly-once in practice.
@task
def load_orders_to_warehouse(orders):
    """
    Idempotent load: uses UPSERT (INSERT ON CONFLICT).
    Processing the same order twice produces the same result.
    """
    for order in orders:
        warehouse.execute("""
            INSERT INTO orders (order_id, data)
            VALUES (%s, %s)
            ON CONFLICT (order_id) DO UPDATE SET data = EXCLUDED.data
        """, order.order_id, order.data)
Measuring Exactly-Once
How do you know if your pipeline is truly exactly-once? You test it.
Chaos testing: Kill processes mid-execution. Restart. Verify that output is correct and no duplicates exist.
Inject duplicates: Intentionally inject duplicate messages into the stream. Verify that output contains each record exactly once.
Check output cardinality: For a given input, verify that output count matches input count (no duplicates, no losses).
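The duplicate-injection and cardinality checks above can be expressed as a small test against a toy pipeline (all names hypothetical):

```python
def upsert_pipeline(stream):
    """Toy idempotent pipeline: keyed upsert, so duplicate input
    events do not change the output."""
    out = {}
    for key, value in stream:
        out[key] = value
    return out

base = [("e1", 10), ("e2", 20), ("e3", 30)]
with_dups = base + [("e2", 20), ("e1", 10)]   # injected duplicates

clean = upsert_pipeline(base)
dirty = upsert_pipeline(with_dups)

# Cardinality check: output count matches distinct input count,
# and injected duplicates do not change the result.
assert len(dirty) == len({k for k, _ in base})
assert dirty == clean
```

The same shape works against a real pipeline: run it twice (once clean, once with injected duplicates and kills) and diff the outputs.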
Common Exactly-Once Pitfalls
Assuming Kafka exactly-once covers everything
Kafka transactions guarantee exactly-once within Kafka. If your pipeline writes to an external system (database, HTTP endpoint), Kafka transactions do not cover those writes. You need additional idempotency or outbox patterns.
Checkpointing too infrequently
Long checkpoint intervals mean long recovery times. If checkpoints run every 10 minutes and the last one completed 9 minutes before a crash, recovery must replay up to 9 minutes of input on top of the restore itself. For critical pipelines, choose checkpoint intervals that match your recovery time objective.
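A back-of-envelope way to size the interval, with hypothetical rates:

```python
def worst_case_recovery_s(checkpoint_interval_s, restore_s,
                          replay_rate, ingest_rate):
    """Worst case: the crash lands just before the next checkpoint, so
    up to one full interval of records must be replayed, after restoring
    the last snapshot."""
    backlog_records = checkpoint_interval_s * ingest_rate
    return restore_s + backlog_records / replay_rate

# Hypothetical: 100K rec/s ingest, 500K rec/s replay, 20 s restore,
# 10-minute checkpoint interval.
recovery = worst_case_recovery_s(600, 20, 500_000, 100_000)
assert recovery == 140.0   # 20 s restore + 120 s replaying 60M records
```

Invert the formula against your recovery time objective to get the longest acceptable interval.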
State store growth
With exactly-once checkpointing, state stores can grow unbounded if watermarks are not set correctly. Always configure state TTL and watermark grace periods to allow state cleanup.
Production Failure Scenarios
Duplicate output after crash during dual write
The processor writes to the database and commits the Kafka offset as two separate operations that are intended to behave as one. The database transaction commits, then the processor crashes before the offset commit completes. On restart, the database write is committed, the offset is not, and the message is replayed. The database write runs again — duplicate output.
The problem: the transaction boundary and the offset commit are not truly atomic from the processor’s perspective. Fix: the transactional outbox pattern, or a recovery validation step that reconciles database state with offset position after restart.
Checkpoint restore produces inconsistent window state
A stateful Flink job crashes. On restart, it restores from the last checkpoint, which was taken mid-window. Records between the checkpoint and the crash are reprocessed and combined with the restored partial aggregate. That is correct only if processing is deterministic and no pre-crash output escaped to a non-transactional sink; if either condition fails, the window emits wrong numbers.
Test checkpoint restores explicitly. Inject failures during window computations and verify that restored state produces correct results. The timing of the checkpoint relative to window boundaries matters.
Unique constraint race in idempotent consumers
Two instances process the same order_id simultaneously. Both check — neither finds a match. Both attempt to insert. One succeeds. The other hits a unique constraint and fails.
If the failed instance retries, it succeeds. Net result: correct. But the race creates brief inconsistency windows. Use INSERT ... ON CONFLICT DO UPDATE instead of check-then-insert. The database resolves the race atomically.
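A runnable version of the atomic upsert, using SQLite (3.24+) as a stand-in database with a hypothetical schema:

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE orders (order_id TEXT PRIMARY KEY, data TEXT)")

def process_order(order_id, data):
    # No read-then-write gap: the database resolves concurrent inserts
    # of the same key atomically inside the statement itself.
    with db:
        db.execute("""
            INSERT INTO orders (order_id, data) VALUES (?, ?)
            ON CONFLICT (order_id) DO UPDATE SET data = excluded.data
        """, (order_id, data))

process_order("o-1", "first")
process_order("o-1", "retry")   # duplicate delivery: harmless overwrite
rows = db.execute("SELECT order_id, data FROM orders").fetchall()
assert rows == [("o-1", "retry")]
```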
Kafka exactly-once skipping writes to non-transactional sinks
Kafka transactions make the producer idempotent and the offset commit atomic with the record write. But appending to a Parquet file or writing to S3 via multipart upload is not part of the Kafka transaction. A crash after the Kafka commit but before the filesystem write completes means the write is missing. On restart, the offset has advanced and the record is skipped.
Use transactional sinks, or build a reconciliation process to detect and fill gaps.
Trade-off Table: Exactly-Once Approaches
| Aspect | Kafka Transactions | Flink Checkpointing | Spark Checkpointing | Idempotent Consumer |
|---|---|---|---|---|
| Scope | Kafka-to-Kafka only | End-to-end | End-to-end | Sink-side only |
| State management | Local RocksDB | Distributed RocksDB | RocksDB | No state |
| Recovery time | Milliseconds | Depends on checkpoint | Depends on checkpoint | Immediate |
| Throughput cost | Low | Medium | Medium | None |
| External sink support | No | Yes (if transactional) | Yes (if transactional) | Yes |
| Complexity | Low | High | Medium | Low |
| Exactly-once for aggregates | No | Yes | Yes | No |
| Best for | Kafka-native pipelines | Stateful Flink jobs | Stateful Spark jobs | External database sinks |
Kafka transactions for pure Kafka pipelines. Idempotent consumers with at-least-once delivery for external sinks. Flink or Spark checkpointing for stateful aggregations across arbitrary sources.
Capacity Estimation
Transaction overhead
Kafka transactions add latency: the producer accumulates records and commits them atomically. Overhead depends on how often you commit — in Kafka Streams this is commit.interval.ms — while transaction.timeout.ms only bounds how long a transaction may stay open before the broker aborts it.
# Frequent commits: lower end-to-end latency, more commit round-trips
commit.interval.ms=100
batch.size=16384
# Infrequent commits: higher latency, fewer commit round-trips
commit.interval.ms=30000
batch.size=524288
transaction.timeout.ms=300000
A transaction committing every 10 seconds at 100K records/sec commits 1M records. The commit is a round-trip to the broker leader. At 10ms per round-trip, that is fine. At 500ms, it becomes a bottleneck.
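The commit overhead can be estimated directly (toy arithmetic):

```python
def commit_overhead(commit_interval_s, commit_rtt_s):
    """Fraction of wall time the producer spends blocked on
    transaction commits."""
    return commit_rtt_s / (commit_interval_s + commit_rtt_s)

# With a 10-second commit interval: a 10 ms commit round-trip is
# noise (~0.1%), a 500 ms round-trip costs ~5% of throughput.
assert round(commit_overhead(10, 0.010), 4) == 0.001
assert round(commit_overhead(10, 0.500), 4) == 0.0476
```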
Checkpoint storage
Checkpoint size drives recovery time and storage cost.
# Estimate Flink checkpoint size
# 10M state entries, 500 bytes per entry, 2x serialization overhead
state_entries=10000000
bytes_per_entry=500
serialization_factor=2
checkpoint_size_mb=$((state_entries * bytes_per_entry * serialization_factor / 1024 / 1024))
# ≈ 9.3 GB per checkpoint
# 3 checkpoints retained (recovery + cleanup gap):
total_storage_gb=$((checkpoint_size_mb * 3 / 1024))
# ≈ 28 GB
A 10GB checkpoint writing to S3 in 60 seconds requires ~170 MB/sec sustained bandwidth. Profile this before production.
Recovery time
Recovery time is checkpoint restore time plus replay time.
# Restore: read checkpoint from storage
checkpoint_size_gb=10
storage_read_bandwidth_mbps=500 # S3 or HDFS
restore_seconds=$((checkpoint_size_gb * 1024 / storage_read_bandwidth_mbps))
# ≈ 20 seconds for 10GB at 500 MB/sec
# Replay: records from the offset gap
# depends on processing rate
Flink also redistributes state across TaskManagers on recovery. A 10TB state with 100 partitions means ~100GB per TaskManager over the network. At 1Gbps, that is ~800 seconds. Use incremental checkpoints to reduce this.
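The redistribution estimate, worked out (toy arithmetic):

```python
def redistribution_seconds(state_bytes, task_managers, link_gbps):
    """Time to ship each TaskManager's share of state over its
    network link during recovery."""
    per_tm_bytes = state_bytes / task_managers
    link_bytes_per_s = link_gbps * 1e9 / 8
    return per_tm_bytes / link_bytes_per_s

# 10 TB of state across 100 TaskManagers on 1 Gbps links:
# ~100 GB per TaskManager -> ~800 seconds.
assert round(redistribution_seconds(10e12, 100, 1.0)) == 800
```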
Quick Recap
- Exactly-once means “output is correct despite multiple physical processing attempts” — not one physical read.
- The hard part is crossing system boundaries. Kafka-to-Kafka is tractable. Kafka-to-database requires more.
- Kafka transactions only cover Kafka. External sinks need idempotency or outbox.
- At-least-once with idempotent consumers is the pragmatic default. Reserve exactly-once for financial or inventory pipelines.
- Recovery time is checkpoint_interval plus replay. Size your interval by your recovery time objective.
Conclusion
Exactly-once semantics are achievable but not free. The key is understanding where your pipeline crosses system boundaries and adding appropriate coordination at those points.
Within Kafka: Kafka transactions provide exactly-once. With external systems: idempotent consumers or the transactional outbox pattern. For stateful processing: Flink’s checkpointing or Spark’s checkpoint-based state management.
For most pipelines, at-least-once with idempotent consumers is the pragmatic choice. It is simpler to implement and performs better. Reserve exactly-once for pipelines where duplicate output has serious consequences.
For related reading, see Apache Kafka, Kafka Streams, and Apache Flink.