Apache Spark Streaming: Micro-Batch Processing
Apache Spark started as a batch processing engine. Adding streaming to Spark meant rethinking the streaming problem as fast batches. Instead of processing one event at a time, Spark Streaming processes micro-batches: small batches of events collected over short intervals (seconds or sub-seconds).
This micro-batch approach gives Spark Streaming the full power of the Spark ecosystem for streaming workloads. If your team already knows Spark, Spark Streaming is a natural extension. The tradeoff is latency. True streaming frameworks process event-by-event with sub-second latency. Spark Streaming processes micro-batches with typical latencies of 1-5 seconds.
Two Streaming APIs
Spark Streaming has two APIs:
DStreams (Discretized Streams): The original streaming API built on RDDs. DStreams break the stream into micro-batches represented as sequences of RDDs.
Structured Streaming: The newer API built on the Spark SQL engine. Structured Streaming treats streaming data as an unbounded table, letting you write streaming queries using the same SQL API you use for batch queries.
Structured Streaming is the recommended API for new development. It is more expressive, better optimized, and has richer support for event-time processing and watermarking.
Structured Streaming Fundamentals
Structured Streaming represents a stream as an unbounded table. New data arrives as new rows appended to the table. Queries on the table run continuously, producing results incrementally.
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, from_json
from pyspark.sql.types import StructType, StringType, TimestampType, DoubleType

spark = SparkSession.builder \
    .appName("streaming-analytics") \
    .getOrCreate()

# Schema for the incoming JSON events (adjust to your payload)
event_schema = StructType() \
    .add("event_id", StringType()) \
    .add("event_type", StringType()) \
    .add("event_time", TimestampType()) \
    .add("user_id", StringType()) \
    .add("product_id", StringType()) \
    .add("amount", DoubleType())

# Define streaming source
lines = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load()

# Parse JSON events (Kafka delivers values as bytes)
events = lines.select(
    col("value").cast("string").alias("json")
).select(
    from_json(col("json"), event_schema).alias("data")
).select("data.*")

# Windowed aggregation
windowed_counts = events \
    .groupBy(
        window(col("event_time"), "5 minutes"),
        col("user_id")
    ) \
    .count()
# Write to sink (the Kafka sink expects a string or binary "value" column)
query = windowed_counts \
    .selectExpr("to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "aggregated-events") \
    .option("checkpointLocation", "s3://spark-checkpoints/") \
    .outputMode("complete") \
    .start()
query.awaitTermination()
The query runs continuously. By default, Spark starts the next micro-batch as soon as the previous one finishes; with a processing-time trigger, it checks Kafka for new data on that interval instead. Each micro-batch reads the new data, updates the result, and writes to the sink.
Output Modes and Triggers
Structured Streaming supports different output modes:
Append mode: Only new rows appended to the result table are written to the sink. Appropriate for stateless queries where rows are never updated.
Complete mode: The entire result table is written to the sink on every trigger. Appropriate for aggregations where the complete result changes on every trigger.
Update mode: Only rows that changed since the last trigger are written to the sink. Appropriate for aggregations written to sinks that support upserts.
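To make the three modes concrete, here is a toy, pure-Python model (not the Spark API) of what each mode emits for one trigger, given the result table before and after:

```python
# Illustrative only: model the result table as {key: count} dicts and
# compute what each output mode would emit after one trigger.

def emitted_rows(before, after, mode):
    if mode == "complete":   # entire result table, every trigger
        return dict(after)
    if mode == "append":     # only keys that did not exist before
        return {k: v for k, v in after.items() if k not in before}
    if mode == "update":     # keys that are new or whose value changed
        return {k: v for k, v in after.items() if before.get(k) != v}
    raise ValueError(mode)

before = {"u1": 3, "u2": 1}
after = {"u1": 5, "u2": 1, "u3": 2}   # u1 updated, u3 appended

print(emitted_rows(before, after, "append"))    # {'u3': 2}
print(emitted_rows(before, after, "update"))    # {'u1': 5, 'u3': 2}
print(emitted_rows(before, after, "complete"))  # the full table
```

One caveat the toy model omits: on a streaming aggregation, append mode only emits a row once the watermark finalizes its window; stateless queries append immediately.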
# Append mode: stateless stream processing
query = events \
    .filter(col("event_type") == "purchase") \
    .writeStream \
    .format("parquet") \
    .option("path", "s3://warehouse/purchases/") \
    .option("checkpointLocation", "s3://spark-checkpoints/purchases/") \
    .outputMode("append") \
    .trigger(processingTime="2 seconds") \
    .start()
# Complete mode: streaming aggregation with the full result written each trigger
from pyspark.sql.functions import sum as sum_  # avoid shadowing the builtin sum

aggregated = events \
    .groupBy("product_id") \
    .agg(sum_("amount").alias("total_sales"))

query = aggregated \
    .writeStream \
    .format("memory") \
    .queryName("product_sales") \
    .outputMode("complete") \
    .start()
Event Time and Watermarking
Like Flink, Structured Streaming handles event time and late-arriving data through watermarks.
from pyspark.sql.functions import window, col, sum as sum_

# Define watermark on event time
events_with_watermark = events \
    .select(
        col("user_id"),
        col("product_id"),
        col("event_time").cast("timestamp").alias("event_time"),
        col("amount")
    ) \
    .withWatermark("event_time", "30 seconds")  # tolerate 30 seconds of lateness

# Windowed aggregation with watermark
# Data arriving after the watermark passes a window is dropped
windowed_revenue = events_with_watermark \
    .groupBy(
        window(col("event_time"), "5 minutes"),
        col("product_id")
    ) \
    .agg(sum_("amount").alias("revenue"))

# Append mode with a watermark emits each window once, after the
# watermark passes the window's end and its result is final
query = windowed_revenue \
    .writeStream \
    .format("parquet") \
    .option("path", "s3://warehouse/windowed-revenue/") \
    .option("checkpointLocation", "s3://spark-checkpoints/windowed-revenue/") \
    .outputMode("append") \
    .start()
The watermark tells Spark when it can drop state for old event times. Without watermarks, Spark must maintain state forever for aggregations over unbounded streams. With watermarks, Spark drops state once it is confident no more late data will arrive for that time window.
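The bookkeeping reduces to simple arithmetic, sketched here outside Spark: the watermark trails the maximum event time seen by the allowed lateness, and a window's state becomes droppable once the watermark passes the window's end.

```python
from datetime import datetime, timedelta

def watermark(max_event_time, allowed_lateness):
    # Watermark = max event time seen so far minus the lateness allowance
    return max_event_time - allowed_lateness

def window_closed(window_end, wm):
    # State for a window can be dropped once the watermark passes its end
    return wm >= window_end

max_seen = datetime(2024, 1, 1, 12, 10, 0)
wm = watermark(max_seen, timedelta(seconds=30))  # 12:09:30

# 12:00-12:05 window: closed; late events for it are dropped
print(window_closed(datetime(2024, 1, 1, 12, 5), wm))   # True
# 12:05-12:10 window: still open; late events are still accepted
print(window_closed(datetime(2024, 1, 1, 12, 10), wm))  # False
```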
State Management
Structured Streaming manages state automatically for aggregations. For more complex stateful operations, Scala and Java expose the mapGroupsWithState and flatMapGroupsWithState APIs; the PySpark counterpart is applyInPandasWithState (Spark 3.4+).
import pandas as pd
from pyspark.sql.streaming.state import GroupStateTimeout

def update_session_info(key, pdf_iter, state):
    """
    Maintain running session totals per user (a sketch; the output and
    state schemas below are illustrative). Sessions time out after 10
    minutes of inactivity.
    """
    if state.exists:
        (event_count, total_amount) = state.get
    else:
        event_count, total_amount = 0, 0.0
    if state.hasTimedOut:
        # Inactivity timeout fired: emit the finished session, clear state
        state.remove()
        yield pd.DataFrame({"user_id": [key[0]],
                            "event_count": [event_count],
                            "total_amount": [total_amount]})
    else:
        for pdf in pdf_iter:
            event_count += len(pdf)
            total_amount += float(pdf["amount"].sum())
        state.update((event_count, total_amount))
        # Close the session 10 minutes after the last batch for this user
        state.setTimeoutDuration(10 * 60 * 1000)
        yield pd.DataFrame({"user_id": [key[0]],
                            "event_count": [event_count],
                            "total_amount": [total_amount]})

sessionized = events \
    .groupBy("user_id") \
    .applyInPandasWithState(
        update_session_info,
        outputStructType="user_id STRING, event_count LONG, total_amount DOUBLE",
        stateStructType="event_count LONG, total_amount DOUBLE",
        outputMode="update",
        timeoutConf=GroupStateTimeout.ProcessingTimeTimeout
    )
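The session-gap rule itself is plain timestamp arithmetic. A standalone sketch (no Spark involved) of splitting one user's ordered event times into sessions separated by a 10-minute inactivity gap:

```python
from datetime import datetime, timedelta

GAP = timedelta(minutes=10)

def sessionize(event_times):
    """Split an ordered list of timestamps into sessions separated by >GAP."""
    sessions, current = [], []
    for t in event_times:
        if current and t - current[-1] > GAP:
            sessions.append(current)  # gap exceeded: close the session
            current = []
        current.append(t)
    if current:
        sessions.append(current)
    return sessions

ts = [datetime(2024, 1, 1, 9, 0), datetime(2024, 1, 1, 9, 5),
      datetime(2024, 1, 1, 9, 30)]          # 25-minute gap before 9:30
print([len(s) for s in sessionize(ts)])      # [2, 1]
```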
Integration with Kafka
Structured Streaming integrates deeply with Kafka for both sources and sinks.
# Read from Kafka with starting offsets
kafka_source = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "input-topic") \
.option("startingOffsets", "earliest") \
.load()
# Write to Kafka; the sink is at-least-once on replay. Producer configs
# pass through with a "kafka." prefix; enable.idempotence guards against
# duplicates from producer retries (not from replayed micro-batches)
kafka_sink = output.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "output-topic") \
    .option("checkpointLocation", "s3://spark-checkpoints/kafka-sink/") \
    .option("kafka.enable.idempotence", "true") \
    .start()
Exactly-Once in Spark Streaming
Spark Streaming provides end-to-end exactly-once processing when:
- The source is replayable (Kafka qualifies), with offsets tracked in Spark's checkpoint rather than Kafka consumer commits
- The sink supports transactions or idempotent writes, so replayed micro-batches do not create duplicates
- Checkpointing is enabled, so offsets, state, and watermarks survive restarts
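The interaction of replay and idempotence can be sketched without Spark at all: a failed micro-batch is replayed in full, and a sink keyed on (batch_id, record_id) absorbs the duplicate writes. The class and keys here are illustrative, not a Spark API.

```python
# Illustrative only: an idempotent sink deduplicates replayed micro-batches
# by keying writes on (batch_id, record_id).

class IdempotentSink:
    def __init__(self):
        self.rows = {}

    def write(self, batch_id, records):
        for rec_id, value in records:
            self.rows[(batch_id, rec_id)] = value  # overwrite = no duplicate

sink = IdempotentSink()
batch = [("e1", 10), ("e2", 20)]

sink.write(7, batch)   # first attempt (suppose the job crashed mid-commit)
sink.write(7, batch)   # full replay after recovery: same keys, no dupes

print(len(sink.rows))  # 2, not 4
```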
spark = SparkSession.builder \
.appName("exactly-once-pipeline") \
.config("spark.sql.streaming.checkpointLocation", "s3://spark-checkpoints/") \
.getOrCreate()
# Offsets are tracked in the Spark checkpoint, not via Kafka consumer
# commits, and advance only after the micro-batch commits successfully.
# (The source disables Kafka auto-commit itself; do not re-enable it.)
kafka_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "input-topic") \
    .option("startingOffsets", "latest") \
    .load()
# Process and write to a sink that supports idempotent or transactional
# writes (Kafka with idempotence, or a transactional database)
output_df = process(kafka_df)
# Producer idempotence dedupes retries; strict end-to-end exactly-once
# into Kafka still needs transactional writes or downstream deduplication
query = output_df \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "output-topic") \
.option("kafka.enable.idempotence", "true") \
.option("checkpointLocation", "s3://spark-checkpoints/exactly-once/") \
.start()
For deeper discussion of exactly-once semantics, see Exactly-Once Semantics.
Performance Tuning
Spark Streaming performance depends on:
Batch interval: Smaller batches mean lower latency but higher overhead. The sweet spot is typically 1-5 seconds for most workloads.
Partitioning: The number of Spark partitions controls parallelism. More partitions mean more parallelism but also more overhead.
State store backend: By default, Structured Streaming keeps state in an in-memory, HDFS-backed store (HDFSBackedStateStoreProvider). RocksDB is available as an opt-in provider (Spark 3.2+) that spills state to local disk and handles large state with less JVM memory pressure.
spark = SparkSession.builder \
    .config("spark.sql.streaming.stateStore.providerClass",
            "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider") \
    .getOrCreate()
When to Use Spark Streaming
Spark Streaming is appropriate when:
- Your team already knows Spark and wants to use the same APIs for streaming and batch
- You need tight integration with the Spark ecosystem (MLlib, Spark SQL)
- Latency in the 1-5 second range is acceptable
- You need to process both streaming and batch data with the same engine
Spark Streaming is not ideal when:
- Sub-second latency is required (use Flink or Kafka Streams)
- Your workload is primarily event-by-event processing (not micro-batch aggregations)
- You need very sophisticated windowing (session windows with complex gap logic)
Structured Streaming Architecture
Structured Streaming processes data as a series of micro-batches. Each batch triggers on a schedule, reads new data from sources, applies transformations, and writes outputs:
flowchart TD
subgraph Sources[Streaming Sources]
Kafka[Kafka Topics]
Kinesis[Amazon Kinesis]
Socket[Socket Source]
end
subgraph MicroBatch[Micro-Batch Trigger]
Query[Continuous Query on Unbounded Table]
end
subgraph Sinks[Streaming Sinks]
KafkaOut[Kafka]
Parquet[S3 / Parquet]
Memory[Memory / Delta]
end
subgraph State[State Store]
RocksDB[(RocksDB)]
Checkpoint[(Checkpoint S3)]
end
Kafka --> Query
Kinesis --> Query
Query -->|state| RocksDB
Query -->|checkpoint| Checkpoint
Query --> KafkaOut
Query --> Parquet
Query --> Memory
Every trigger interval, Spark reads new data from sources, runs the query plan, updates state in RocksDB, writes outputs to sinks, and commits offsets. The checkpoint on S3 stores offsets, state, and watermark positions for recovery.
Production Failure Scenarios
Checkpoint incompatibility
Structured Streaming checkpoints store source offsets, commit logs, and operator state tied to the query's physical plan (legacy DStream checkpoints went further and serialized JVM objects). Change the stateful shape of the query in an application update and the checkpoint no longer matches. Spark throws an exception on restart and refuses to resume.
The fix: treat checkpoints as tightly coupled to the query that created them. Simple changes such as added filters or projections are usually checkpoint-compatible; changes to stateful operators generally are not. After an incompatible change, start fresh from the source or migrate state explicitly. Checkpoints do not survive arbitrary upgrades.
State store OOM with unbounded aggregations
Structured Streaming maintains state for aggregations until the watermark purges it. Set the watermark too permissively and state accumulates faster than it purges. A traffic spike can grow the state store until executors OOM.
Monitor numRowsTotal and memoryUsedBytes under stateOperators in the query progress. Set watermark bounds conservatively. Alert on state size growing beyond expected ranges between triggers.
Silent data loss from misconfigured watermarks
Once the watermark advances past a window’s event time, Spark drops that window’s late data. If the watermark is driven by processing time instead of event time, or set too aggressively, valid data disappears with no error raised.
Always derive the watermark from an event-time column. Validate watermark behavior with test data before going to production. If numRowsDroppedByWatermark in the query progress is non-zero, data is being dropped; decide deliberately whether that is acceptable.
Offset drift from dual commit strategies
With both Spark checkpointing and Kafka offset commits in play (a legacy DStreams job with auto-commit enabled, or a separate consumer sharing the group id), the two mechanisms track progress independently. On restart, they disagree on what was committed. Records get skipped or duplicated.
Let Spark checkpointing be the single source of truth and disable Kafka auto-commit anywhere it could apply. One mechanism only.
S3 consistency during recovery
Amazon S3 has offered strong read-after-write and list consistency since December 2020, but older Hadoop/S3A releases and some S3-compatible object stores still behave as eventually consistent. Spark lists the checkpoint directory on restart to find committed batches; a stale listing can miss recently written files and trigger unnecessary reprocessing.
On affected stacks, keep checkpoints on a strongly consistent store such as HDFS. (S3Guard addressed this historically but is deprecated in recent Hadoop releases.)
Trade-off Table: Spark Streaming vs Flink vs Kafka Streams
| Aspect | Spark Streaming | Apache Flink | Kafka Streams |
|---|---|---|---|
| Processing model | Micro-batch | Native streaming | Native streaming |
| Latency | 1–5 seconds | Sub-second | Sub-second |
| State backend | HDFS-backed in-memory or RocksDB | Heap or RocksDB | Local RocksDB |
| Exactly-once | Micro-batch guarantee | End-to-end | Kafka-only |
| Event time | Watermarks | Native | Supported |
| Windowing | Time windows | Time, session, count, global | Time, session, sliding |
| SQL support | Spark SQL | Native Table API + SQL | No |
| Operational complexity | Medium | High | Low |
| Scaling | Partition-based | Fine-grained | Partition count ceiling |
| Best for | Batch-to-stream migration, Spark teams | Complex event processing | Kafka-native microservices |
Spark Streaming wins for teams already on Spark who want to extend to streaming. Flink wins for low latency and complex event patterns. Kafka Streams wins for pure Kafka pipelines where operational overhead matters most.
Observability Checklist
Track these metrics on every production Spark Streaming deployment:
Throughput: inputRowsPerSecond, processedRowsPerSecond. If processed is significantly below input, the batch interval is too short or the query is overloaded.
State metrics: numRowsTotal and memoryUsedBytes under stateOperators in the query progress. State size growing without bound means the watermark is not purging old data.
Latency: the durationMs breakdown per trigger (triggerExecution, addBatch, latestOffset). Trigger durations consistently above the trigger interval mean the query is falling behind.
Watermark: eventTime.watermark per query, and numRowsDroppedByWatermark — non-zero means late data is being silently dropped.
# Checkpoint and state store metrics via the Spark UI or the progress API.
# StreamingQueryProgress exposes, among others:
#   - "stateOperators" -> "numRowsTotal", "numRowsDroppedByWatermark"
#   - "eventTime" -> "watermark"
#   - "durationMs" per trigger phase
for query in spark.streams.active:
    progress = query.lastProgress  # a property, not a method
    if progress is None:
        continue  # no trigger has completed yet
    print(f"Query: {query.name}")
    if progress["stateOperators"]:
        print(f"  State store rows: {progress['stateOperators'][0]['numRowsTotal']}")
    print(f"  Watermark: {progress['eventTime'].get('watermark')}")
    print(f"  Durations: {progress['durationMs']}")
Alert on: state store rows growing more than 10% between triggers, numRowsDroppedByWatermark above zero, processing lag exceeding 2x the batch interval, any query terminated with an exception.
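Those alert rules can be expressed as a small checker over a StreamingQueryProgress dictionary. The field names below are real progress fields; the sample snapshots are fabricated for illustration.

```python
def alerts(prev_progress, progress, batch_interval_ms):
    """Return alert strings for one StreamingQueryProgress snapshot."""
    out = []
    prev_rows = prev_progress["stateOperators"][0]["numRowsTotal"]
    rows = progress["stateOperators"][0]["numRowsTotal"]
    if rows > prev_rows * 1.10:
        out.append("state grew >10% between triggers")
    if progress["stateOperators"][0]["numRowsDroppedByWatermark"] > 0:
        out.append("late rows dropped by watermark")
    if progress["durationMs"]["triggerExecution"] > 2 * batch_interval_ms:
        out.append("processing lag exceeds 2x batch interval")
    return out

# Fabricated sample snapshots for illustration
prev = {"stateOperators": [{"numRowsTotal": 1000, "numRowsDroppedByWatermark": 0}],
        "durationMs": {"triggerExecution": 900}}
curr = {"stateOperators": [{"numRowsTotal": 1200, "numRowsDroppedByWatermark": 3}],
        "durationMs": {"triggerExecution": 4500}}

print(alerts(prev, curr, batch_interval_ms=2000))  # all three alerts fire
```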
Common Anti-Patterns
Setting batch interval by habit
A 500ms batch interval on a query that processes millions of rows per second creates more overhead than throughput. Spark spends more time scheduling batches than doing actual work. Size the batch interval based on your data volume and SLA, not the default.
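A back-of-the-envelope model makes the point: assume a fixed per-batch scheduling and planning overhead, and compute how much of each trigger does useful work. The 200 ms figure is an illustrative assumption, not a Spark constant.

```python
def useful_fraction(batch_interval_s, fixed_overhead_s):
    """Fraction of each trigger spent processing rather than scheduling."""
    return max(0.0, (batch_interval_s - fixed_overhead_s) / batch_interval_s)

# Assume ~200 ms of fixed scheduling/planning overhead per micro-batch
print(round(useful_fraction(0.5, 0.2), 2))  # 0.6  -> 40% of each trigger is overhead
print(round(useful_fraction(5.0, 0.2), 2))  # 0.96 -> 4% overhead
```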
Using complete mode on high-cardinality aggregations
Complete mode writes the entire result table on every trigger. On a query that aggregates by user ID with millions of users, every trigger writes millions of rows to the sink. The sink becomes the bottleneck, and the pipeline falls behind.
Use complete mode only when the result set is small. For high-cardinality aggregations, use append or update mode.
Ignoring watermark behavior on restart
On restart from checkpoint, Spark replays the entire stateful query from the last committed offset. If the watermark advanced significantly before the crash, state is reprocessed but the watermark alignment may not match. This causes duplicate state entries or incorrect aggregations during the recovery batch.
Validate checkpoint state after any restart before treating the output as authoritative.
Mixing checkpoint-based and auto-commit offset management
Running Spark checkpointing alongside a Kafka consumer that commits offsets for the same group (auto-commit in a legacy DStreams job, or a reused group.id) creates two independent offset tracks. On restart, Spark may skip or duplicate records depending on which track advanced further.
Choose one offset management strategy: Spark checkpointing (recommended, and the only mechanism the Structured Streaming Kafka source uses by default), never both.
Quick Recap
- Spark Streaming uses micro-batches — 1–5 seconds of latency is the floor, not a bug.
- Structured Streaming is the modern API — DStreams is legacy.
- Use event time watermarks, not processing time — otherwise you silently drop valid late data.
- Size the batch interval to your data volume, not to the default.
- Checkpoints store state and offsets — treat them as critical infrastructure, not an afterthought.
- Choose one offset management strategy (Spark checkpoint OR Kafka auto-commit), never both.
Conclusion
Spark Streaming brings the Spark ecosystem to stream processing. The micro-batch model trades latency for throughput and operational simplicity. If your team knows Spark, you can extend that knowledge to streaming workloads.
Structured Streaming is the modern API, with a clean table-based mental model and good support for event time and watermarking. The integration with Kafka is solid, and exactly-once processing is achievable with proper configuration.
For related reading on other stream processing frameworks, see Apache Kafka and Apache Flink.