Kafka Streams: Real-Time Stream Processing Applications
Kafka Streams is a client library for real-time stream processing. Learn stream primitives, state stores, exactly-once processing, and scaling.
Kafka Streams is not a separate processing cluster you operate. It is a library that you embed in your application. Your application reads from Kafka, processes, and writes back to Kafka. No separate cluster, no resource allocation for processing, no additional operational overhead.
This makes Kafka Streams unusual among stream processing frameworks. Most frameworks (Flink, Spark Streaming) run on dedicated clusters. Kafka Streams runs on application infrastructure you already manage.
Stream Processing Basics
A stream is an unbounded sequence of records. A stream processor reads from input streams, applies transformations, and writes to output streams.
flowchart LR
Input[Input Topic] --> KS[Kafka Streams App]
KS --> Output1[Output Topic 1]
KS --> Output2[Output Topic 2]
Kafka Streams programs define processing logic using the Streams DSL or the Processor API. The DSL provides functional-style operations on streams. The Processor API provides lower-level control.
The Streams DSL
The Streams DSL exposes KStream and KTable abstractions.
KStream: An unbounded sequence of record events. Each event is an independent occurrence. The same key can appear multiple times.
KTable: A mutable view of a changelog stream. Updates to a key replace the previous value. A KTable is backed by a state store that maintains the latest value for each key.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Grouped;

StreamsBuilder builder = new StreamsBuilder();

// KStream: unbounded sequence of page view events
KStream<String, PageViewEvent> pageViews = builder.stream("page-views");

// KTable: latest customer info per customer ID
KTable<String, CustomerInfo> customers = builder.table("customer-info",
        Materialized.as("customer-info-store"));

// Join page views to customer info
KStream<String, EnrichedPageView> enrichedPageViews = pageViews
        .filter((userId, event) -> event.getDuration() > 10) // keep views longer than 10 seconds
        .join(customers, (event, info) -> new EnrichedPageView(event, info));

// Aggregate page views per user (the record key is already the user ID,
// so groupByKey avoids an unnecessary repartition)
KTable<String, Long> viewCounts = pageViews
        .groupByKey()
        .count(Materialized.as("view-counts-store"));

// Write to output topics
enrichedPageViews.to("enriched-page-views");
viewCounts.toStream().to("view-counts");
Stateful Operations
Kafka Streams maintains state in state stores. State stores are local RocksDB databases (by default) that store the current state for each stream partition your application instance processes.
Stateful operations include:
- join: Join two streams or a stream with a table
- aggregate: Maintain running aggregates (count, sum, max)
- reduce: Combine records with the same key
import java.time.Duration;
import java.time.Instant;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.SessionWindows;

// Maintain per-user session state, closing a session after 5 minutes of inactivity.
// Note: the three-argument aggregate (with a session merger) exists only on
// session-windowed streams, so this uses SessionWindows rather than TimeWindows.
KStream<String, Event> events = builder.stream("events");
events
    .groupByKey(Grouped.with(Serdes.String(), eventSerde))
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5)))
    .aggregate(
        () -> new SessionState(0, Instant.MIN),       // initializer
        (key, event, state) -> state.addEvent(event), // aggregator
        (key, left, right) -> left.merge(right),      // merger for overlapping sessions
        Materialized.as("session-store")
    )
    .toStream()
    .to("session-output");
The state store is partitioned by key. Each Kafka Streams instance hosts a subset of partitions and thus a subset of state store partitions. When a Kafka Streams instance fails, another instance takes over its partitions and rebuilds its state store from the changelog topic.
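Failover speed depends on how much changelog has to be replayed. One lever is standby replicas, which keep warm copies of state stores on other instances so a promoted standby skips most of the replay. A minimal config sketch (the application id and broker address are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "page-view-app");      // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
// Maintain one warm replica of each state store on another instance.
// On failover the standby is promoted instead of replaying the full changelog.
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
```

Standby replicas trade extra disk and changelog-consumption bandwidth on the standby instances for much shorter recovery windows.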
Exactly-Once Processing
Kafka Streams provides exactly-once processing within the Kafka ecosystem. A record is processed exactly once even if failures occur.
The key mechanism is Kafka transactions. Kafka Streams writes output records and commits input offsets atomically inside a single transaction. If a processor crashes after writing output but before committing its offsets, the transaction aborts and consumers reading with read_committed isolation never see the aborted output.
// Enable exactly-once processing
Properties props = new Properties();
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-exactly-once-app");
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> input = builder.stream("input-topic");
KStream<String, String> output = input.mapValues(value -> process(value));
output.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
Exactly-once within Kafka means exactly-once relative to Kafka. If your processor reads from Kafka, writes to an external database, and then crashes before committing the Kafka offset, the external database write happened but the offset was not committed. On restart, the record is reprocessed and the external write repeats.
For true end-to-end exactly-once with external systems, you need the transactional outbox pattern. See Exactly-Once Semantics.
Scaling Kafka Streams Applications
Kafka Streams scales by running multiple instances of the same application. Each instance processes a subset of partitions. More instances means more parallelism up to the number of partitions.
flowchart LR
subgraph Kafka[Kafka Cluster]
P0[Partition 0]
P1[Partition 1]
P2[Partition 2]
end
subgraph Instances[Kafka Streams App]
I1[Instance 1: P0, P1]
I2[Instance 2: P2]
end
P0 --> I1
P1 --> I1
P2 --> I2
Adding instances is handled automatically. When an instance joins, partitions are rebalanced. When an instance leaves, its partitions are reassigned. The state store for each partition is recreated from the changelog.
The scaling ceiling is the number of partitions. If you have 6 partitions, you can have at most 6 Kafka Streams instances processing in parallel. This is one reason partition count matters for Kafka applications.
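The arithmetic is simple: effective parallelism is the minimum of total stream threads and input partitions. A self-contained sketch with illustrative counts:

```java
public class ParallelismCeiling {
    public static void main(String[] args) {
        int inputPartitions = 6;     // partitions on the input topic
        int instances = 4;           // app instances in the consumer group
        int threadsPerInstance = 2;  // num.stream.threads per instance

        int workers = instances * threadsPerInstance;                  // 8 threads total
        int effectiveParallelism = Math.min(workers, inputPartitions); // capped by partitions
        System.out.println(effectiveParallelism); // prints 6: the 2 extra threads sit idle
    }
}
```

Adding a fifth instance here buys nothing; only repartitioning the topic raises the ceiling.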
Interactive Queries
Kafka Streams exposes state stores via interactive queries. Your application can query its local state store directly without going through Kafka.
// Expose state store as a queryable API
ReadOnlyKeyValueStore<String, SessionState> store = streams.store(
    StoreQueryParameters.fromNameAndType(
        "session-store",
        QueryableStoreTypes.<String, SessionState>keyValueStore()));
SessionState state = store.get("user-123");
This is powerful for building real-time applications that need low-latency access to computed state. A sessionization application can serve session data directly from its local state store without querying a database.
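Local means local: each instance only holds the partitions assigned to it, so serving an arbitrary key first requires locating the owning instance. A sketch of that lookup, assuming each instance sets application.server so instances know each other's endpoints (the store name and key are the ones from the example above):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.state.HostInfo;

// Find which instance hosts the partition holding "user-123".
KeyQueryMetadata metadata = streams.queryMetadataForKey(
        "session-store", "user-123", Serdes.String().serializer());
HostInfo owner = metadata.activeHost();
// If owner is not this instance, forward the lookup over your own
// HTTP/RPC layer; Kafka Streams does not route queries for you.
```

The forwarding layer is your responsibility: Kafka Streams exposes the metadata, not a query router.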
Anti-Patterns and Common Mistakes
Storing too much in state stores
State stores are local RocksDB databases. They are designed for incremental state updates, not for storing millions of records per partition. If your state store grows unbounded, your application runs out of disk and crashes.
Always use windowed state stores with appropriate retention. Configure retention and grace periods so that expired windows are purged and late-arriving data cannot keep old windows alive indefinitely.
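A bounded windowed count might look like this (the topic and store names are illustrative; retention must be at least window size plus grace):

```java
import java.time.Duration;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

// 5-minute windows, 1-minute grace for late records, 10-minute store
// retention: old windows are purged instead of accumulating forever.
builder.<String, String>stream("events")
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1)))
    .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("bounded-counts")
        .withRetention(Duration.ofMinutes(10)));
```

The retention value is the real disk bound: once a window falls outside it, RocksDB can drop the segment.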
Not handling out-of-order data
Kafka guarantees ordering within a partition. If your processing logic depends on temporal ordering across partitions, you must handle out-of-order arrivals explicitly.
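Handling out-of-order data starts with extracting time from the event itself rather than from arrival order. A sketch of a custom TimestampExtractor, assuming a hypothetical Event type with a getTimestamp() accessor:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Window on event time, not arrival time, so late arrivals land in the
// window they belong to (subject to the grace period).
public class EventTimeExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        if (record.value() instanceof Event) {
            return ((Event) record.value()).getTimestamp(); // hypothetical accessor
        }
        return partitionTime; // fall back to the last valid timestamp seen
    }
}
```

Register it via the default.timestamp.extractor config or per-source with Consumed.with(...).withTimestampExtractor(...).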
Using global state stores when partitioned would work
Global state stores replicate all data to every Kafka Streams instance. They are convenient but do not scale. Use partitioned state stores whenever possible.
When to Use Kafka Streams
Kafka Streams is appropriate when:
- Your entire pipeline is Kafka-based (sources and sinks are Kafka topics)
- You want to embed processing in your application rather than operate a separate cluster
- You need exactly-once within the Kafka ecosystem
- Your processing is relatively straightforward (filter, map, join, aggregate)
Kafka Streams is not appropriate when:
- You need complex event processing (CEP), matching patterns across sequences of events, beyond what the DSL's joins and windows can express
- You need to integrate with multiple heterogeneous sources (Kafka, Kinesis, Pub/Sub, file systems)
- You need advanced windowing (count windows, multiple windows, late triggers)
Trade-off Table: Kafka Streams vs Flink vs Spark Streaming
| Aspect | Kafka Streams | Apache Flink | Spark Streaming |
|---|---|---|---|
| Deployment model | Embedded library | Separate cluster | Separate cluster |
| Operational complexity | Low | High | High |
| State management | Local RocksDB | Pluggable backends (RocksDB, heap) | HDFS-backed (RocksDB optional) |
| Exactly-once guarantee | Within Kafka only | End-to-end | Micro-batch exactly-once |
| Latency | Milliseconds | Milliseconds | ~500ms+ (micro-batch) |
| Windowing support | Tumbling, hopping, sliding, session | Time, session, count, global | Tumbling, sliding (session in newer versions) |
| CEP / pattern detection | Limited | Rich (MATCH_RECOGNIZE) | No |
| Job recovery | Task migration + changelog restore | Checkpoint restore, fine-grained restart | Checkpoint restart |
| Scaling | Partition count ceiling | Rescales dynamically | Partition count ceiling |
| Kafka dependency | Required | Optional (many connectors) | Optional |
| Best for | Kafka-native microservices | Complex event processing, large-scale streaming | Batch-oriented streaming, legacy migration |
Kafka Streams works best when everything lives in Kafka and you want to keep operations simple. Flink handles complex event patterns and large-scale state. Spark Streaming suits teams moving from batch Spark who want a familiar API.
Production Failure Scenarios
Stateful Kafka Streams applications fail in ways stateless ones do not. These are predictable failure modes — plan for them before they happen.
State store corruption
RocksDB can corrupt after a disk write failure or an unclean shutdown. When the application restarts, it crashes trying to read the corrupted store.
Mitigation: enable RocksDB checksum verification (paranoid checks, set through a custom RocksDBConfigSetter). Monitor disk health. If corruption happens, delete the local store and let it rebuild from the changelog topic. Recovery time is roughly changelog_records / restore_throughput: a 10M record changelog restoring at 50K records/sec takes about 200 seconds, so over three minutes before the app can serve traffic again.
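In Kafka Streams, RocksDB options are not set directly; they go through the RocksDBConfigSetter hook. A sketch that turns on paranoid checks, which include checksum verification:

```java
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

public class ChecksumConfigSetter implements RocksDBConfigSetter {
    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        // Verify checksums aggressively so corruption fails fast at open/read
        // time instead of surfacing as wrong answers later.
        options.setParanoidChecks(true);
    }

    @Override
    public void close(String storeName, Options options) {
        // Nothing to release; close any Options-owned objects you created here.
    }
}

// Register it on the Streams config:
// props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, ChecksumConfigSetter.class);
```

The setter runs once per store, so it is also the place for block cache and write buffer tuning.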
Rebalance deadlock
An unclean shutdown (SIGKILL, OOM kill) triggers a rebalance. During rebalance, no instance processes the affected partitions. If the shutdown itself was caused by a processing bug — say, a join that deadlocked on a shared resource — new instances taking over hit the same bug and also crash.
Mitigation: idempotent processing so reprocessing is safe. Set max.poll.interval.ms high enough to absorb processing spikes. Use exactly_once_v2 so offsets and output commit atomically.
Duplicate output from crash during stateful join
A stateful join buffers records in co-partitioned state stores, fed through repartition topics when the inputs are not already co-partitioned. If the instance crashes after writing to the output topic but before committing the offset, the offset rolls back. On restart the join re-executes and, without transactions, produces duplicates.
Mitigation: exactly_once_v2 guarantee. Design downstream consumers idempotently — use primary key upserts, not inserts.
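Downstream idempotency usually means keying writes on a primary key. A JDBC sketch against a hypothetical PostgreSQL sessions table (table name, columns, and the connection are all illustrative assumptions):

```java
import java.sql.Connection;
import java.sql.PreparedStatement;

// Upsert keyed on user_id: a replayed duplicate overwrites the same row
// instead of inserting a second one (PostgreSQL ON CONFLICT syntax).
String upsert =
    "INSERT INTO sessions (user_id, view_count) VALUES (?, ?) "
  + "ON CONFLICT (user_id) DO UPDATE SET view_count = EXCLUDED.view_count";
try (PreparedStatement stmt = connection.prepareStatement(upsert)) {
    stmt.setString(1, userId);
    stmt.setLong(2, viewCount);
    stmt.executeUpdate();
}
```

With upserts, at-least-once delivery into the database converges to the same final state as exactly-once.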
Silent data loss from grace period violations
Records that arrive after the grace period are dropped silently, with no exception raised. If your timestamp extractor mixes event time and wall-clock time, you can lose events without noticing.
Mitigation: monitor dropped-records-total per task. Alert when stream time stalls: if stream time stops advancing while records keep arriving, more and more records fall outside the grace period and are dropped.
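These counters can be read straight off the embedded client. A sketch that scans streams.metrics() for dropped records (the metric name dropped-records-total is the task-level name in recent Kafka versions; verify against your client version):

```java
import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

// Scan task-level metrics for records dropped as too late.
for (Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
    MetricName name = entry.getKey();
    if ("dropped-records-total".equals(name.name())) {
        System.out.printf("task %s dropped: %s%n",
                name.tags().get("task-id"), entry.getValue().metricValue());
    }
}
```

In production you would export these through JMX or a metrics reporter rather than polling in-process, but the names are the same.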
Capacity Estimation
State store sizing
Keys are spread across state store partitions by the partitioner, so each partition holds roughly total_keys / partition_count keys. Work out the per-partition size before production.
// State store size estimation per partition
// Assumptions: 100M total keys, 100 partitions, 500 bytes per value, 2x RocksDB overhead
long totalKeys = 100_000_000L;
int numPartitions = 100;
long keysPerPartition = totalKeys / numPartitions; // 1M keys per partition
long bytesPerValue = 500L;
long rocksDBOverheadFactor = 2L; // indexes, bloom filters, tombstones
long stateStoreBytes = keysPerPartition * bytesPerValue * rocksDBOverheadFactor;
// = 1M * 500 * 2 ≈ 1GB per partition
// With 3 instances, each hosts ~33 partitions, so ~33GB of state per instance
long partitionsPerInstance = numPartitions / 3;
long perInstanceBytes = stateStoreBytes * partitionsPerInstance;
// Add 50% headroom for changelog restore during recovery
long totalLocalStorageNeeded = perInstanceBytes * 3 / 2; // ~50GB per instance
Sizing checklist: divide total keys by partition count to get keys per partition, multiply by average value size and a 2x RocksDB overhead factor for per-partition size, multiply by the number of partitions each instance hosts, then add 50-100% headroom for changelog restore. On disk, RocksDB typically occupies 1.5-3x the raw key-value size. Validate the estimate in production against RocksDB metrics such as total-sst-files-size per store.
Throughput
Kafka Streams scales linearly with partition count until you hit a bottleneck — CPU, network, or disk IO.
| Operation | Records/sec per core |
|---|---|
| Stateless map/filter | 50–100K |
| Stateful join/aggregate | 10–30K |
| Windowed sessionization | 5–15K |
An 8-core machine hits 400–800K records/sec stateless. Stateful work drops that to 80–240K records/sec. Profile with your actual data shapes — the variance is significant.
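To turn the table into a capacity plan, divide the target throughput by the per-core rate of your dominant operation. A self-contained sketch with assumed numbers (use your own measured rates):

```java
public class CoreEstimate {
    public static void main(String[] args) {
        long targetRecordsPerSec = 500_000L; // assumed peak input rate
        long ratePerCore = 20_000L;          // mid-range stateful join/aggregate rate
        // Ceiling division: cores needed to sustain the target rate
        long coresNeeded = (targetRecordsPerSec + ratePerCore - 1) / ratePerCore;
        System.out.println(coresNeeded); // prints 25, e.g. four 8-core instances
    }
}
```

Leave headroom above the raw number: rebalances and changelog restores temporarily steal CPU from processing.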
Observability Checklist
Monitor these metrics in production:
Throughput: records-consumed-rate, records-produced-rate, bytes-consumed-rate. Sudden drops mean a rebalance is in progress or an upstream producer failed.
State store: RocksDB size metrics such as total-sst-files-size per store. Growing stores without a corresponding business reason mean missing cleanup or an expanding key set.
Lag: records-lag per partition (consumer fetch metrics). Lag that grows without bound means the app cannot keep up with incoming data.
Rebalance: commit-latency-avg and rebalance-total / rebalance-rate-per-hour. Frequent rebalances without instance churn point to processing that exceeds max.poll.interval.ms or a deadlock.
Lateness: record-lateness-max and dropped-records-rate. Non-zero drops mean your grace period is misconfigured or event time is lagging far behind stream time.
Alert thresholds: rebalance count above 3 per hour outside planned restarts, consumer lag growing for more than 10 minutes, state store size growing more than 10% day-over-day without explanation.
Quick Recap
- Kafka Streams is a library, not a cluster — runs inside your application.
- Size state stores before production. RocksDB grows until it hits disk limits.
- Exactly-once only covers Kafka-to-Kafka. External database writes need the transactional outbox pattern.
- Partition count is your scaling ceiling — pick it with growth in mind.
- Rebalances pause processing. Avoid unclean shutdowns to keep them rare.
- Watch state store size, consumer lag, and rebalance frequency as your primary signals.
Conclusion
Kafka Streams is a powerful library for building stream processing applications that integrate tightly with Kafka. It provides exactly-once processing, stateful operations, and elastic scaling within the Kafka ecosystem.
The operational simplicity of Kafka Streams is its biggest advantage. There is no processing cluster to manage, no resource allocation to tune. The application is just another Kafka consumer group.
The limitation is the Kafka boundary. When your processing must span multiple systems, or when you need advanced windowing features, dedicated stream processing frameworks like Apache Flink become necessary.