Apache Beam: Portable Batch and Streaming Pipelines

Discover how Apache Beam's unified programming model lets you write batch and streaming pipelines once and run them on Spark, Flink, or cloud runners.

Reading time: 11 min read

Apache Beam: The Portable Framework for Batch and Streaming

Data pipelines come in two flavors: batch jobs process accumulated data, while streaming jobs handle data as it arrives. Historically, you would write different code using different frameworks for each: batch might use Spark, streaming might use Flink or Kafka Streams. When requirements changed or infrastructure evolved, you would port everything.

Apache Beam proposed a different approach: write your pipeline once using a unified model, then execute it on the runner that fits your needs. The same code runs on Spark, Flink, Google Cloud Dataflow, or portable runners. You choose the execution environment based on operational requirements, not lock-in.

The Core Abstraction: PCollections and Transforms

Beam pipelines center on PCollections (parallel collections) and Transforms (operations). A PCollection represents a distributed dataset—bounded for batch, unbounded for streaming. A Transform is a processing step that reads from one or more PCollections and outputs to another.

// Java SDK example: Count words in a collection of text
public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);

    PCollection<String> lines = pipeline
        .apply("ReadLines", TextIO.read().from("gs://data-bucket/input/*.txt"));

    PCollection<String> words = lines
        .apply("ExtractWords", FlatMapElements.into(TypeDescriptors.strings())
            .via(line -> Arrays.asList(line.split("\\s+"))));

    PCollection<KV<String, Long>> wordCounts = words
        .apply("CountWords", Count.perElement());

    wordCounts.apply("WriteResults", TextIO.write().to("gs://data-bucket/output/wordcounts")
        .withNumShards(1));

    pipeline.run().waitUntilFinish();
}

The Python SDK reads similarly, trading Java’s verbosity for Python’s readability:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions()
with beam.Pipeline(options=options) as p:
    (p
     | 'ReadLines' >> beam.io.ReadFromText('gs://data-bucket/input/*.txt')
     | 'ExtractWords' >> beam.FlatMap(lambda x: x.split())
     | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
     | 'GroupAndSum' >> beam.CombinePerKey(sum)
     | 'WriteResults' >> beam.io.WriteToText('gs://data-bucket/output/wordcounts'))

The key insight is that the pipeline definition is independent of execution. pipeline.run() targets whatever runner you’ve configured—Flink, Spark, Dataflow, or a local runner for testing.

Windowing: Handling Unbounded Data

Unbounded PCollections (streaming data) require windowing to divide time into finite chunks for aggregation. Beam provides several windowing strategies.

flowchart TD
    subgraph Fixed["Fixed Windows (5-min tumbling)"]
        F1["00:00-00:05"]
        F2["00:05-00:10"]
        F3["00:10-00:15"]
    end
    subgraph Sliding["Sliding Windows (1-hour, slide 5-min)"]
        S1["00:00-01:00"]
        S2["00:05-01:05"]
        S3["00:10-01:10"]
    end
    subgraph Session["Session Windows (30-min gap)"]
        E1(["event A"])
        E2(["event B"])
        E3(["event C"])
        GAP1["30-min gap"]
        E4(["event D"])
        E5(["event E"])
        W1["Session 1: A, B, C"]
        W2["Session 2: D, E"]
    end
    E1 --> W1
    E2 --> W1
    E3 --> W1
    GAP1 ---|no events| W2
    E4 --> W2
    E5 --> W2

Fixed windows create non-overlapping intervals of fixed duration. “Tumbling windows of 5 minutes” means every 5 minutes you emit the aggregate for that window. Fixed windows are the simplest model and match most alerting and monitoring use cases.
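
The assignment is simple modular arithmetic. A plain-Python sketch (hypothetical helper, timestamps in seconds):

```python
def fixed_window(ts, size=300):
    """Assign an event timestamp to its tumbling window [start, start + size)."""
    start = ts - (ts % size)  # round down to the nearest window boundary
    return (start, start + size)
```

An event at second 437 lands in the (300, 600) window; every event maps to exactly one window.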

Sliding windows overlap. “Windows of 1 hour, sliding every 5 minutes” produces outputs every 5 minutes, each covering the preceding hour. Sliding windows smooth out fluctuations and are useful for moving averages.
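
Each event therefore belongs to size / period windows at once. A plain-Python sketch of the assignment (hypothetical helper, seconds):

```python
def sliding_windows(ts, size=3600, period=300):
    """Return every sliding window [start, start + size) containing ts;
    window starts fall on multiples of period."""
    latest = ts - (ts % period)  # newest window that contains ts
    return [(s, s + size) for s in range(latest, latest - size, -period)]
```

With a 1-hour size and 5-minute period, each event is counted in 12 overlapping windows.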

Session windows group events by activity gaps. If no events arrive for a customer for 30 minutes, a session window closes. Session windows adapt to data patterns rather than fixed time boundaries.
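
A plain-Python sketch of the gap logic (hypothetical helper; timestamps in seconds, 30-minute gap by default):

```python
def sessionize(timestamps, gap=1800):
    """Group sorted event timestamps into sessions; a new session starts
    whenever the quiet period since the previous event reaches `gap`."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] < gap:
            sessions[-1].append(ts)  # continue the current session
        else:
            sessions.append([ts])    # gap exceeded: open a new session
    return sessions
```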

# Sliding window: 1-hour windows updated every 5 minutes
(p
 | 'WindowIntoSliding' >> beam.WindowInto(
     beam.window.SlidingWindows(3600, 300))  # 1-hour size, 5-min period (seconds)
 | 'Aggregate' >> beam.CombinePerKey(average_revenue))  # average_revenue: your CombineFn

Watermarks handle late-arriving data. When a window closes, you might still receive events that belong to that window from before it closed. The watermark estimates when all data for a given window should have arrived. Events after the watermark but before garbage collection are considered late and can trigger a late-fire output or be dropped.
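
That lifecycle can be sketched in plain Python (hypothetical helper; times in seconds, allowed lateness is an assumed 10 minutes):

```python
def classify_event(window_end, watermark, allowed_lateness=600):
    """Where does an arriving event land in its window's lifecycle?"""
    if watermark < window_end:
        return 'on-time'   # window still open; counted in the on-time pane
    if watermark < window_end + allowed_lateness:
        return 'late'      # may produce a late firing
    return 'dropped'       # window garbage-collected; event is discarded
```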

The Runner Landscape

Beam’s portability means you can run the same pipeline on different runners. Each runner has different strengths.

Apache Flink is a full-featured streaming engine with exactly-once semantics, sophisticated windowing, and checkpoint-based fault tolerance. Running Beam on Flink gives you Flink’s operational features with Beam’s programming model.

Apache Spark is the dominant batch processing runner. Beam on Spark handles batch workloads and structured streaming through Spark’s micro-batch engine.

Google Cloud Dataflow is a fully managed runner on Google Cloud. You write Beam code, submit it to Dataflow, and Google handles provisioning, scaling, and monitoring. No cluster management. Dataflow’s dynamic rebalancing automatically distributes work across workers, even when data is skewed.

Runner Comparison

| Aspect | DirectRunner | FlinkRunner | SparkRunner | DataflowRunner |
| --- | --- | --- | --- | --- |
| Use case | Local testing | Production streaming | Batch + micro-batch | Managed cloud |
| Exactly-once | No | Yes | Yes (with checkpointing) | Yes |
| Latency | N/A (local) | Sub-second | Seconds (micro-batch) | Sub-second |
| Scaling | None (single node) | Manual cluster config | Manual cluster config | Auto-scaling, managed |
| State management | In-memory | RocksDB state backend | Checkpointed RDDs | Streaming engine |
| Windowing support | Full | Full | Limited (structured streaming) | Full |
| Side inputs | Yes | Yes | Yes | Yes |
| Operational burden | Low | High | High | Low (managed) |
| Cost model | Local only | Your infrastructure | Your infrastructure | Pay-per-use (GCP) |
| Best for | CI/CD, local dev | On-prem streaming | Spark shops migrating | GCP-native teams |
# Run same pipeline on different runners
# Local (for testing)
python my_pipeline.py --runner=DirectRunner

# Flink cluster
python my_pipeline.py --runner=FlinkRunner --flink_master=flink:8081

# Google Cloud Dataflow
python my_pipeline.py --runner=DataflowRunner \
    --project=my-project \
    --region=us-central1 \
    --temp_location=gs://my-temp-bucket/tmp

Side Inputs and Side Outputs

Beyond the main data flow, Beam supports side inputs (additional data available at runtime) and side outputs (routing data to different sinks based on content).

Side inputs let a DoFn access data from another PCollection at runtime. If you have a lookup table of customer attributes and you need to enrich events with those attributes, side input provides the lookup:

# Side input for customer enrichment
customer_lookup = (p
    | 'ReadCustomers' >> beam.io.ReadFromJdbc(
        driver_class_name='org.postgresql.Driver',
        jdbc_url='jdbc:postgresql://db:5432/warehouse',
        table_name='dim_customer')
    | 'AsDict' >> beam.Map(lambda row: (row['customer_id'], row)))

enriched_events = (
    events
    | 'EnrichWithCustomer' >> beam.Map(
        lambda event, cust: {**event,
                             'customer_name': cust.get(event['customer_id'], {}).get('name')},
        cust=beam.pvalue.AsDict(customer_lookup)))  # side input passed as an extra argument

Side outputs route records to different destinations based on logic:

# Side output: separate clean and dirty records
clean, dirty = (
    records
    | 'ParseRecords' >> beam.ParDo(ParseRecords()).with_outputs('clean', 'dirty')
)

clean | 'WriteClean' >> beam.io.WriteToBigQuery('project:dataset.clean')
dirty | 'WriteDirty' >> beam.io.WriteToText('gs://dirty-records-bucket/dirty')
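
The ParseRecords DoFn above is assumed rather than shown; its routing logic might look like this plain-Python sketch, where the first tuple element would become the TaggedOutput tag:

```python
import json

def route_record(raw):
    """Hypothetical routing logic a ParseRecords DoFn could wrap:
    parseable JSON goes to 'clean', everything else to 'dirty'."""
    try:
        return ('clean', json.loads(raw))
    except (json.JSONDecodeError, TypeError):
        return ('dirty', raw)
```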

When Beam Makes Sense

Beam is valuable when portability across runners matters. If you’re running on-premises today but planning a cloud migration, you can write pipelines in Beam and avoid rewrites. If you’re comparing runners for performance or operational reasons, Beam lets you test without rewriting.

Beam’s unified model also clarifies thinking. Batch and streaming often have the same logical structure—read, transform, write—with different windowing semantics. Beam forces you to be explicit about windowing, watermarks, and triggering, which prevents subtle batch-versus-streaming bugs.

The cost is abstraction overhead. Beam’s model is more constrained than native Flink or Spark. If you need advanced features that Beam doesn’t expose, you end up fighting the abstraction or dropping to runner-specific APIs.

For most teams building new pipelines, Beam is worth considering if you value portability or if you’re already on Google Cloud and want managed execution through Dataflow. For teams deeply invested in Spark or Flink ecosystems, native APIs sometimes offer more capabilities.

Production Failure Scenarios

Watermark stalls causing window emission delays

A pipeline reads from Kafka with a watermark that estimates when all events for a time window should have arrived. A network partition causes a 10-minute delay in one Kafka partition. The watermark stalls, delaying window triggering for all partitions. On-time aggregations wait; late data accumulates. Dashboard queries show gaps until the watermark advances.

Mitigation: Set withAllowedLateness() explicitly to hold windows open longer. Use AfterWatermark.pastEndOfWindow().withEarlyFirings(...) to emit speculative partial results before the watermark passes the end of the window. Monitor watermark lag per key with custom metrics.

Late-fire trigger firing multiple times

A session window uses AfterWatermark to fire on-time and AfterPane.elementCountAtLeast(1) for late data. When late elements arrive after watermark, the trigger fires once per element instead of once per window. Side effect outputs (BigQuery writes, Pub/Sub sends) accumulate duplicate actions.

Mitigation: Use AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(N)) so late panes fire after batches of N late elements rather than once per element. Design side effects to be idempotent or use exactly-once delivery guarantees at the sink.

Checkpoint state loss on DirectRunner

A test pipeline uses DirectRunner for production (a mistake). Checkpoint state is stored in memory and lost on restart. The pipeline reprocesses from the beginning, producing duplicate outputs at non-idempotent sinks and incorrect results for aggregations that assume each record is seen once.

Mitigation: Use DirectRunner only for local development and CI, never in production. On production runners, configure checkpointing explicitly (for example, Flink's checkpointing_interval). Add pipeline start timestamps to outputs for duplicate detection.

Dataflow autoscaling causing memory pressure

Dataflow’s autoscaler adds workers mid-pipeline when CPU utilization exceeds 75%. A heavy GroupByKey operation then shuffles data across the new workers, exceeding memory limits on individual workers. The job fails with OOM rather than gracefully redistributing state.

Mitigation: Estimate peak memory per worker before scaling events. Set machine_type explicitly rather than letting Dataflow choose. Monitor system_pass_progress_wait_time in Dataflow metrics—if it’s high, the pipeline is waiting on resources, not just scheduling.

Apache Beam Observability

Track these metrics to keep Beam pipelines healthy:

# Key Beam metrics to monitor via runner-specific monitoring
beam_metrics = {
    # Latency
    "element_count": "Input elements per bundle",
    "wall_time_ms": "Bundle processing time",
    "system_pass_progress_wait_time": "Time waiting for resources",

    # Watermark progress
    "watermark_delay_sec": "How far behind the event time watermark is",
    "unprocessed_records": "Records waiting in buffers",

    # Output freshness
    "output_watermark_delay_sec": "Latency between processing and output",
    "late_data_dropped_count": "Records dropped after watermark close",

    # Dataflow-specific
    "DataflowJob_currentCPI": "CPU utilization (alert if > 75%)",
    "DataflowJob_pendingness": "Bundles waiting for workers"
}

# Python: access Beam metrics in a DoFn
class MetricsDoFn(beam.DoFn):
    def __init__(self):
        super().__init__()
        # Create the counter once, not on every element
        self.processed = beam.metrics.Metrics.counter(self.__class__, 'processed_elements')

    def process(self, element):
        self.processed.inc()
        yield element

Log per bundle: bundle ID, start/end time, element count, watermark at start. Alert on: watermark lag > 5 minutes, late data rate > 1%, Dataflow pendingness sustained > 100 bundles.
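
Those alert conditions can be encoded as a trivial check (hypothetical helper; threshold values taken from the text above):

```python
def should_page(watermark_lag_sec, late_data_rate, pending_bundles):
    """Alert when any threshold is breached: watermark lag > 5 minutes,
    late-data rate > 1%, or sustained pendingness > 100 bundles."""
    return (watermark_lag_sec > 300
            or late_data_rate > 0.01
            or pending_bundles > 100)
```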

Apache Beam Anti-Patterns

Using DirectRunner in production. DirectRunner is single-threaded, stores state in memory, and has no fault tolerance. It is designed only for local development and testing. Production DirectRunner pipelines will lose state on failure and cannot scale horizontally.

Ignoring watermark configuration. Default watermark behavior is conservative. If your data source has known latencies (e.g., mobile events batched overnight), set withAllowedLateness() explicitly, or windows will close before late-arriving data lands.

Non-idempotent sinks with at-least-once runners. FlinkRunner with checkpointing gives exactly-once guarantees within the pipeline, but side effects (database writes, API calls) are at-least-once. Design sinks to handle duplicates, or use exactly-once sinks (BigQuery streaming inserts with deduplication).

GroupByKey before windowing. GroupByKey materializes all elements for a key in memory before passing to downstream transforms. For high-cardinality keys or unbounded groups, this causes memory pressure. Use CombinePerKey with incremental aggregation instead, or window before grouping.
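
The memory difference is easy to see in plain Python: incremental combining keeps one accumulator per key, the way CombinePerKey does, instead of materializing every value for a key the way GroupByKey does. A hypothetical sketch:

```python
def combine_per_key(pairs, combine=lambda acc, v: acc + v, init=0):
    """Incremental per-key aggregation: memory is O(distinct keys),
    not O(total elements), because values are folded in as they arrive."""
    acc = {}
    for k, v in pairs:
        acc[k] = combine(acc.get(k, init), v)
    return acc
```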

Quick Recap

  • Beam’s unified model expresses batch and streaming with the same API—write once, run on Flink, Spark, Dataflow, or DirectRunner.
  • Windowing divides unbounded streams into finite chunks: fixed (non-overlapping), sliding (overlapping), or session (gap-based).
  • Watermarks estimate when all data for a window has arrived; withAllowedLateness() controls how late data is handled.
  • Use DirectRunner for local testing only; FlinkRunner for on-prem streaming; DataflowRunner for managed GCP execution.
  • Design all side effects to be idempotent since pipeline delivery guarantees differ from sink guarantees.

For distributed batch processing at scale, see Apache Spark. For real-time stream processing, Apache Flink provides native streaming with advanced state management.

Conclusion

Apache Beam landed as a response to a real problem: writing separate code for batch and streaming, then being stuck with whatever runner you chose. Its unified model means the same pipeline handles both, and the runner is a deployment choice rather than a design constraint.

The tradeoff is real. Beam cannot express everything that native Flink or Spark can do, and when you hit those edges, you end up either fighting the abstraction or reaching for runner-specific APIs. For GCP teams who want managed execution, DataflowRunner removes a lot of operational overhead. For teams already running Flink or Spark, the portability benefit shrinks.

What Beam did well was articulate what batch and streaming share. That framing helped teams reason about their pipelines more clearly, even if they ultimately used runner-specific APIs instead.

For single-node analytical queries, DuckDB offers excellent performance without cluster overhead.
