Apache Spark Streaming: Micro-Batch Processing

Spark Streaming uses micro-batches for real-time processing. Learn about DStreams, Structured Streaming, watermarking, and exactly-once semantics.


Apache Spark Streaming: Micro-Batch Processing with DStreams and Structured Streaming

Apache Spark started as a batch processing engine. Adding streaming to Spark meant rethinking the streaming problem as fast batches. Instead of processing one event at a time, Spark Streaming processes micro-batches: small batches of events collected over short intervals (seconds or sub-seconds).

This micro-batch approach gives Spark Streaming the full power of the Spark ecosystem for streaming workloads. If your team already knows Spark, Spark Streaming is a natural extension. The tradeoff is latency. True streaming frameworks process event-by-event with sub-second latency. Spark Streaming processes micro-batches with typical latencies of 1-5 seconds.

Two Streaming APIs

Spark Streaming has two APIs:

DStreams (Discretized Streams): The original streaming API built on RDDs. DStreams break the stream into micro-batches represented as sequences of RDDs.

Structured Streaming: The newer API built on the Spark SQL engine. Structured Streaming treats streaming data as an unbounded table, letting you write streaming queries using the same SQL API you use for batch queries.

Structured Streaming is the recommended API for new development. It is more expressive, better optimized, and has richer support for event-time processing and watermarking.

Structured Streaming Fundamentals

Structured Streaming represents a stream as an unbounded table. New data arrives as new rows appended to the table. Queries on the table run continuously, producing results incrementally.

from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, from_json
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType

# Schema for the incoming JSON events
event_schema = StructType([
    StructField("user_id", StringType()),
    StructField("product_id", StringType()),
    StructField("event_type", StringType()),
    StructField("event_id", StringType()),
    StructField("event_time", TimestampType()),
    StructField("amount", DoubleType()),
])

spark = SparkSession.builder \
    .appName("streaming-analytics") \
    .getOrCreate()

# Define streaming source
lines = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load()

# Parse JSON events
events = lines.select(
    col("value").cast("string").alias("json")
).select(
    from_json(col("json"), event_schema).alias("data")
).select("data.*")

# Windowed aggregation
windowed_counts = events \
    .groupBy(
        window(col("event_time"), "5 minutes"),
        col("user_id")
    ) \
    .count()

# Write to sink (the Kafka sink requires a string or binary "value" column)
query = windowed_counts \
    .selectExpr("to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "aggregated-events") \
    .option("checkpointLocation", "s3://spark-checkpoints/") \
    .outputMode("complete") \
    .start()

query.awaitTermination()

The query runs continuously. By default, Spark starts the next micro-batch as soon as the previous one finishes; with a processing-time trigger, it checks for new data in Kafka on a fixed interval, processes it, updates the result, and writes to the sink.

Output Modes and Triggers

Structured Streaming supports different output modes:

Append mode: Only new rows appended to the result table are written to the sink. Appropriate for stateless queries where rows are never updated.

Complete mode: The entire result table is written to the sink on every trigger. Appropriate for aggregations where the complete result changes on every trigger.

Update mode: Only rows that changed since the last trigger are written to the sink.

# Append mode: stateless stream processing
query = events \
    .filter(col("event_type") == "purchase") \
    .writeStream \
    .format("parquet") \
    .option("path", "s3://warehouse/purchases/") \
    .option("checkpointLocation", "s3://spark-checkpoints/purchases/") \
    .outputMode("append") \
    .trigger(processingTime="2 seconds") \
    .start()

# Complete mode: streaming aggregation with complete result written each time
from pyspark.sql.functions import sum as spark_sum

aggregated = events \
    .groupBy("product_id") \
    .agg(spark_sum("amount").alias("total_sales"))

query = aggregated \
    .writeStream \
    .format("memory") \
    .queryName("product_sales") \
    .outputMode("complete") \
    .start()
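The third mode, update, has no example above. A minimal sketch, assuming the same events stream as the earlier snippets and using the console sink purely for illustration:

```python
# Update mode: only the rows whose counts changed this trigger are written
running_counts = events.groupBy("user_id").count()

query = running_counts \
    .writeStream \
    .format("console") \
    .outputMode("update") \
    .trigger(processingTime="5 seconds") \
    .start()
```

Compared to complete mode, the sink receives only the deltas, which keeps output volume proportional to change rate rather than result size.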

Event Time and Watermarking

Like Flink, Structured Streaming handles event time and late-arriving data through watermarks.

from pyspark.sql.functions import window, col, sum as spark_sum

# Define watermark on event time
events_with_watermark = events \
    .select(
        col("user_id"),
        col("event_time").cast("timestamp").alias("event_time"),
        col("product_id"),
        col("amount")
    ) \
    .withWatermark("event_time", "30 seconds")  # Allow events up to 30 seconds late

# Windowed aggregation with watermark
# Late data arriving after the watermark is dropped
windowed_revenue = events_with_watermark \
    .groupBy(
        window(col("event_time"), "5 minutes"),
        col("product_id")
    ) \
    .agg(spark_sum("amount").alias("revenue"))

# Append mode with a watermark: each window is emitted once,
# after the watermark passes the window's end
query = windowed_revenue \
    .writeStream \
    .format("parquet") \
    .option("path", "s3://warehouse/windowed-revenue/") \
    .option("checkpointLocation", "s3://spark-checkpoints/windowed-revenue/") \
    .outputMode("append") \
    .start()

The watermark tells Spark when it can drop state for old event times. Without watermarks, Spark must maintain state forever for aggregations over unbounded streams. With watermarks, Spark drops state once it is confident no more late data will arrive for that time window.
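The rule can be illustrated with plain arithmetic. This is a simplified model of the mechanism, not Spark's actual implementation: the watermark trails the maximum event time seen so far by the configured delay, and a window's state becomes droppable once the watermark passes the window's end.

```python
from datetime import datetime, timedelta

def watermark(max_event_time, delay):
    """Simplified model: watermark = max observed event time minus delay."""
    return max_event_time - delay

max_seen = datetime(2024, 1, 1, 10, 7, 0)   # latest event time observed
delay = timedelta(seconds=30)               # .withWatermark(..., "30 seconds")
wm = watermark(max_seen, delay)             # 10:06:30

# The 10:00-10:05 window can now be finalized and its state dropped,
# because the watermark has passed the window's end.
window_end = datetime(2024, 1, 1, 10, 5, 0)
print(wm > window_end)  # True
```

An event for the 10:00-10:05 window arriving after this point would be dropped, which is exactly the state-size-for-correctness trade the watermark encodes.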

State Management

Structured Streaming manages state automatically for aggregations. For more complex stateful operations, Scala and Java expose the mapGroupsWithState and flatMapGroupsWithState APIs; in PySpark (3.4+), the equivalent is applyInPandasWithState.

import pandas as pd
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

def update_session_info(key, pdfs, state: GroupState):
    """
    Maintain a running session per user.
    Sessions time out after 10 minutes of inactivity.
    """
    if state.exists:
        total_amount, event_count = state.get
    else:
        total_amount, event_count = 0.0, 0

    if state.hasTimedOut:
        # Session expired: emit the final totals and clear state
        state.remove()
    else:
        for pdf in pdfs:
            total_amount += float(pdf["amount"].sum())
            event_count += len(pdf)
        state.update((total_amount, event_count))
        state.setTimeoutDuration(10 * 60 * 1000)  # 10 minutes of inactivity

    yield pd.DataFrame({
        "user_id": [key[0]],
        "total_amount": [total_amount],
        "event_count": [event_count],
    })

sessionized = events \
    .groupBy("user_id") \
    .applyInPandasWithState(
        update_session_info,
        outputStructType="user_id string, total_amount double, event_count long",
        stateStructType="total_amount double, event_count long",
        outputMode="update",
        timeoutConf=GroupStateTimeout.ProcessingTimeTimeout,
    )

Integration with Kafka

Structured Streaming integrates deeply with Kafka for both sources and sinks.

# Read from Kafka with starting offsets
kafka_source = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "input-topic") \
    .option("startingOffsets", "earliest") \
    .load()

# Write to Kafka; checkpointing plus an idempotent producer minimizes duplicates
kafka_sink = output.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "output-topic") \
    .option("checkpointLocation", "s3://spark-checkpoints/kafka-sink/") \
    .option("kafka.enable.idempotence", "true") \
    .start()

Exactly-Once in Spark Streaming

Spark Streaming provides end-to-end exactly-once processing when:

  • The source is replayable (such as Kafka), with offsets tracked in Spark's checkpoint rather than committed back to Kafka
  • The sink supports transactions or idempotent writes
  • Checkpointing is enabled, so offsets and state survive restarts

spark = SparkSession.builder \
    .appName("exactly-once-pipeline") \
    .config("spark.sql.streaming.checkpointLocation", "s3://spark-checkpoints/") \
    .getOrCreate()

# Read from Kafka; offsets are tracked in the Spark checkpoint and
# recorded only after each micro-batch completes successfully
kafka_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "input-topic") \
    .option("startingOffsets", "latest") \
    .load()

# Process and write to a sink that supports exactly-once
# (Kafka with idempotence, or transactional databases)
output_df = process(kafka_df)

# Idempotent Kafka sink ensures exactly-once end-to-end
query = output_df \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "output-topic") \
    .option("kafka.enable.idempotence", "true") \
    .option("checkpointLocation", "s3://spark-checkpoints/exactly-once/") \
    .start()

For deeper discussion of exactly-once semantics, see Exactly-Once Semantics.

Performance Tuning

Spark Streaming performance depends on:

Trigger interval: Shorter triggers mean lower latency but higher scheduling overhead. The sweet spot is typically 1-5 seconds for most workloads.
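The trigger choices can be sketched as follows. This is illustrative only: df stands in for any streaming DataFrame, and availableNow requires Spark 3.3+.

```python
# Default: start the next micro-batch as soon as the previous one finishes
df.writeStream.format("console").start()

# Fixed interval: trade latency for lower scheduling overhead
df.writeStream.format("console") \
    .trigger(processingTime="10 seconds") \
    .start()

# Drain everything currently available, then stop (backfill-style runs)
df.writeStream.format("console") \
    .trigger(availableNow=True) \
    .start()
```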

Partitioning: The number of Spark partitions controls parallelism. More partitions mean more parallelism but also more overhead.
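Both knobs can be set explicitly. A sketch; 64 is an illustrative value, to be tuned against your core count and data volume:

```python
from pyspark.sql import SparkSession

# Shuffle partitions control parallelism of streaming aggregations
spark = SparkSession.builder \
    .appName("tuned-pipeline") \
    .config("spark.sql.shuffle.partitions", "64") \
    .getOrCreate()

# Repartition the stream by key so work spreads evenly across executors
balanced = events.repartition(64, "user_id")
```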

State store backend: By default, Structured Streaming keeps state in an in-memory, HDFS-backed state store. Since Spark 3.2 you can switch to RocksDB, which moves large state off the JVM heap at some access-latency cost.

spark = SparkSession.builder \
    .config(
        "spark.sql.streaming.stateStore.providerClass",
        "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
    ) \
    .getOrCreate()

When to Use Spark Streaming

Spark Streaming is appropriate when:

  • Your team already knows Spark and wants to use the same APIs for streaming and batch
  • You need tight integration with the Spark ecosystem (MLlib, Spark SQL)
  • Latency in the 1-5 second range is acceptable
  • You need to process both streaming and batch data with the same engine

Spark Streaming is not ideal when:

  • Sub-second latency is required (use Flink or Kafka Streams)
  • Your workload is primarily event-by-event processing (not micro-batch aggregations)
  • You need very sophisticated windowing (session windows with complex gap logic)

Structured Streaming Architecture

Structured Streaming processes data as a series of micro-batches. Each batch triggers on a schedule, reads new data from sources, applies transformations, and writes outputs:

flowchart TD
    subgraph Sources[Streaming Sources]
        Kafka[Kafka Topics]
        Kinesis[Amazon Kinesis]
        Socket[Socket Source]
    end
    subgraph MicroBatch[Micro-Batch Trigger]
        Query[Continuous Query on Unbounded Table]
    end
    subgraph Sinks[Streaming Sinks]
        KafkaOut[Kafka]
        Parquet[S3 / Parquet]
        Memory[Memory / Delta]
    end
    subgraph State[State Store]
        RocksDB[(RocksDB)]
        Checkpoint[(Checkpoint S3)]
    end
    Kafka --> Query
    Kinesis --> Query
    Query -->|state| RocksDB
    Query -->|checkpoint| Checkpoint
    Query --> KafkaOut
    Query --> Parquet
    Query --> Memory

Every trigger interval, Spark reads new data from sources, runs the query plan, updates state in RocksDB, writes outputs to sinks, and commits offsets. The checkpoint on S3 stores offsets, state, and watermark positions for recovery.

Production Failure Scenarios

Checkpoint corruption

Spark checkpoints store source offsets, query metadata, and operator state, all tied to the compiled query plan. Change the query in an incompatible way during an application update (a different aggregation, a changed schema, an added stateful operator) and the checkpoint no longer matches. Spark throws an exception on restart and refuses to resume.

The fix: treat checkpoints as tightly coupled to the exact query that created them. After any plan change, start fresh from source or use explicit checkpoint migration tooling. Checkpoints do not survive arbitrary upgrades.

State store OOM with unbounded aggregations

Structured Streaming maintains state for aggregations with watermarks. Set the watermark too permissive and state accumulates faster than it purges. A traffic spike can fill the state store to OOM.

Monitor numRowsTotal and numRowsUpdated under stateOperators in the query progress. Set watermark bounds conservatively. Alert on state size growing beyond expected ranges between triggers.

Silent data loss from misconfigured watermarks

Once the watermark advances past a window’s event time, Spark drops that window’s late data. If the watermark is driven by processing time instead of event time, or set too aggressively, valid data disappears with no error raised.

Always use event time for watermark assignment. Validate watermark behavior with test data before going to production. If numRowsDroppedByWatermark in the query progress is non-zero, something is wrong.

Offset drift from dual commit strategies

With the legacy DStreams API, enabling Spark checkpointing alongside Kafka's enable.auto.commit creates two independent offset tracks. On restart, they can disagree on what was committed, and records get skipped or duplicated. Structured Streaming sidesteps this: its Kafka source never commits offsets to Kafka and rejects the kafka.enable.auto.commit option outright.

Let the Spark checkpoint own the offsets. One mechanism only.

S3 eventual consistency during recovery

Historically, S3's eventually consistent listings meant new checkpoint files might not appear in listings immediately, so Spark could miss files on restart and reprocess from the beginning. Since December 2020, S3 provides strong read-after-write and list consistency, and S3Guard has been retired. If your checkpoints live on an object store that is still eventually consistent, use HDFS or another strongly consistent store instead.

Aspect                 | Spark Streaming                        | Apache Flink                 | Kafka Streams
Processing model       | Micro-batch                            | Native streaming             | Native streaming
Latency                | 1–5 seconds                            | Sub-second                   | Sub-second
State backend          | HDFS-backed or RocksDB                 | Distributed RocksDB          | Local RocksDB
Exactly-once           | Micro-batch guarantee                  | End-to-end                   | Kafka-only
Event time             | Watermarks                             | Native                       | Supported
Windowing              | Time windows                           | Time, session, count, global | Time, session, sliding
SQL support            | Spark SQL                              | Native Table API + SQL       | No
Operational complexity | Medium                                 | High                         | Low
Scaling                | Partition-based                        | Fine-grained                 | Partition count ceiling
Best for               | Batch-to-stream migration, Spark teams | Complex event processing     | Kafka-native microservices

Spark Streaming wins for teams already on Spark who want to extend to streaming. Flink wins for low latency and complex event patterns. Kafka Streams wins for pure Kafka pipelines where operational overhead matters most.

Observability Checklist

Track these metrics on every production Spark Streaming deployment:

Throughput: inputRowsPerSecond and processedRowsPerSecond. If processed is significantly below input, the trigger interval is too short or the query is overloaded.

State metrics: numRowsTotal and numRowsUpdated under stateOperators. State size growing without bound means the watermark is not purging old data.

Latency: triggerExecution in durationMs, and the gap between wall-clock time and the current watermark. Large gaps indicate processing is lagging behind event time.

Watermark: eventTime.watermark per query progress. numRowsDroppedByWatermark: non-zero means data is being silently lost.

# Checkpoint and state store metrics via the Spark UI or the progress API.
# StreamingQuery.lastProgress (a property, not a method) returns the most
# recent StreamingQueryProgress as a dict:
# - "stateOperators" -> "numRowsTotal", "numRowsDroppedByWatermark"
# - "eventTime" -> "watermark"
# - "durationMs" per pipeline stage

for query in spark.streams.active:
    progress = query.lastProgress  # None before the first completed batch
    if not progress:
        continue
    print(f"Query: {query.name}")
    for op in progress.get("stateOperators", []):
        print(f"  State store rows: {op['numRowsTotal']}")
    print(f"  Watermark: {progress.get('eventTime', {}).get('watermark')}")
    print(f"  Durations: {progress['durationMs']}")

Alert on: state store rows growing more than 10% between triggers, numRowsDroppedByWatermark above zero, processing lag exceeding 2x the trigger interval, any query in FAILED state.
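Those thresholds can be wired into a small helper that inspects a progress dict. A sketch using the real progress field names (stateOperators, numRowsTotal, numRowsDroppedByWatermark, durationMs); the 10% and 2x thresholds are the ones suggested above:

```python
def check_progress(progress: dict, prev_state_rows: int, trigger_ms: int) -> list:
    """Return alert messages for a StreamingQueryProgress-like dict."""
    alerts = []
    for op in progress.get("stateOperators", []):
        rows = op.get("numRowsTotal", 0)
        if prev_state_rows and rows > prev_state_rows * 1.10:
            alerts.append(f"state grew >10%: {prev_state_rows} -> {rows}")
        if op.get("numRowsDroppedByWatermark", 0) > 0:
            alerts.append("late rows dropped by watermark")
    if progress.get("durationMs", {}).get("triggerExecution", 0) > 2 * trigger_ms:
        alerts.append("processing lag exceeds 2x trigger interval")
    return alerts

# Example progress snapshot (hypothetical values)
sample = {
    "stateOperators": [{"numRowsTotal": 1200, "numRowsDroppedByWatermark": 3}],
    "durationMs": {"triggerExecution": 9000},
}
print(check_progress(sample, prev_state_rows=1000, trigger_ms=4000))
```

In production the dict would come from query.lastProgress, with prev_state_rows carried over from the previous check.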

Common Anti-Patterns

Setting the trigger interval by habit

A 500ms trigger interval on a query that processes millions of rows per second creates more overhead than throughput: Spark spends more time scheduling batches than doing actual work. Size the trigger interval based on your data volume and SLA, not the default.

Using complete mode on high-cardinality aggregations

Complete mode writes the entire result table on every trigger. On a query that aggregates by user ID with millions of users, every trigger writes millions of rows to the sink. The sink becomes the bottleneck, and the pipeline falls behind.

Use complete mode only when the result set is small. For high-cardinality aggregations, use append or update mode.

Ignoring watermark behavior on restart

On restart from checkpoint, Spark resumes from the last committed offsets and replays everything after them. The watermark position is restored from the checkpoint, but the data in the recovery batch is reprocessed; if the sink is not idempotent, that batch can write duplicates.

Validate checkpoint state after any restart before treating the output as authoritative.

Mixing checkpoint-based and auto-commit offset management

With the legacy DStreams API, or a custom consumer sharing the same group, checkpoint-based tracking and Kafka auto-commit create two independent offset tracks; on restart, records may be skipped or duplicated depending on which committed first. Structured Streaming rejects the kafka.enable.auto.commit option for exactly this reason.

Choose one offset management strategy. In Structured Streaming, that is the Spark checkpoint; never run a second commit mechanism beside it.

Quick Recap

  • Spark Streaming uses micro-batches — 1–5 seconds of latency is the floor, not a bug.
  • Structured Streaming is the modern API — DStreams is legacy.
  • Use event time watermarks, not processing time — otherwise you silently drop valid late data.
  • Size the batch interval to your data volume, not to the default.
  • Checkpoints store state and offsets — treat them as critical infrastructure, not an afterthought.
  • Choose one offset management strategy (Spark checkpoint OR Kafka auto-commit), never both.

Conclusion

Spark Streaming brings the Spark ecosystem to stream processing. The micro-batch model trades latency for throughput and operational simplicity. If your team knows Spark, you can extend that knowledge to streaming workloads.

Structured Streaming is the modern API, with a clean table-based mental model and good support for event time and watermarking. The integration with Kafka is solid, and exactly-once processing is achievable with proper configuration.

For related reading on other stream processing frameworks, see Apache Kafka and Apache Flink.


Related Posts

Apache Flink: Advanced Stream Processing at Scale

Apache Flink provides advanced stream processing with sophisticated windowing and event-time handling. Learn its architecture, programming model, and use cases.

#data-engineering #apache-flink #stream-processing

Apache Spark: The Workhorse of Distributed Data Processing

A deep dive into Apache Spark's architecture, RDDs, DataFrames, and how it processes massive datasets across clusters at unprecedented scale.

#data-engineering #apache-spark #distributed-computing

Exactly-Once Semantics in Stream Processing

Exactly-once ensures each event processes without duplicates or loss. Learn how it works, when you need it, and the true cost of implementation.

#data-engineering #exactly-once #stream-processing