GCP Data Services: Dataflow, BigQuery, and Pub/Sub

Guide to Google Cloud data services for building pipelines. Compare Dataflow vs Kafka, leverage BigQuery for analytics, use Pub/Sub, and design data lakes.

Reading time: 14 min

GCP Data Services: Dataflow, BigQuery, Pub/Sub, and Cloud Storage

Google Cloud Platform has a mature set of data services that work well together. Pub/Sub handles messaging, Dataflow handles processing, and BigQuery handles analytics. This combination forms the backbone of many GCP data architectures.

This guide covers each service, explains how they work together, and shows implementation patterns.

Service Overview

| Service | Type | Use Case |
| --- | --- | --- |
| Pub/Sub | Messaging | Async messaging, event streaming |
| Dataflow | Stream/Batch Processing | ETL, stream analytics |
| BigQuery | Data Warehouse | Analytical queries, BI |
| Cloud Storage | Object Storage | Data lake, raw storage |
| Bigtable | Wide-column Store | High-throughput time series |
| Dataproc | Managed Spark/Hadoop | Batch processing |

Google Cloud Pub/Sub

Pub/Sub gives you durable, at-least-once messaging by default, which is usually what you want. Exactly-once delivery is also available for pull subscriptions when your sink cannot tolerate duplicates.

Pub/Sub Topics and Subscriptions

from google.cloud import pubsub_v1
import json

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

# Create topic
topic_path = publisher.topic_path('project-id', 'events-topic')
publisher.create_topic(request={'name': topic_path})

# Publish message
future = publisher.publish(
    topic_path,
    data=json.dumps({'event': 'user_signup', 'user_id': '123'}).encode('utf-8'),
    attributes={'env': 'production'}
)

# Batch publishing: the client batches automatically based on BatchSettings
batch_publisher = pubsub_v1.PublisherClient(
    batch_settings=pubsub_v1.types.BatchSettings(
        max_messages=100,       # flush after 100 messages
        max_bytes=1024 * 1024,  # or 1 MB
        max_latency=0.05        # or 50 ms, whichever comes first
    )
)

futures = [
    batch_publisher.publish(
        topic_path,
        data=json.dumps(event).encode('utf-8'),
        attributes={'timestamp': str(event['timestamp'])}
    )
    for event in events
]
for f in futures:
    f.result()  # Block until each publish completes

Subscription Patterns

# Streaming pull subscription
subscription_path = subscriber.subscription_path('project-id', 'events-sub')

def callback(message):
    try:
        data = json.loads(message.data.decode('utf-8'))
        process_event(data)
        message.ack()
    except Exception as e:
        logger.error(f"Failed to process: {e}")
        message.nack()

# Subscribe with flow control to cap in-flight messages
streaming_pull_future = subscriber.subscribe(
    subscription_path,
    callback=callback,
    flow_control=pubsub_v1.types.FlowControl(
        max_messages=10
    )
)
streaming_pull_future.result()  # Block the main thread; cancel() to stop

Push Subscriptions

from google.cloud import pubsub_v1

# Create push subscription
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

# Create topic
topic_path = publisher.topic_path('project-id', 'webhook-topic')
subscription_path = subscriber.subscription_path('project-id', 'webhook-sub')

# Create subscription with push endpoint
subscription = subscriber.create_subscription(
    request={
        'name': subscription_path,
        'topic': topic_path,
        'push_config': pubsub_v1.types.PushConfig(
            push_endpoint='https://my-service.example.com/webhook',
            attributes={'x-goog-version': 'v1'}
        ),
        'ack_deadline_seconds': 30
    }
)

Google Cloud Dataflow

Dataflow provides serverless, autoscaling stream and batch processing using Apache Beam.

Beam Programming Model

Apache Beam provides a unified programming model for batch and streaming:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

pipeline_options = PipelineOptions([
    '--project=project-id',
    '--runner=DataflowRunner',
    '--staging_location=gs://dataflow-staging/',
    '--temp_location=gs://dataflow-temp/',
    '--region=us-central1'
])

with beam.Pipeline(options=pipeline_options) as p:
    (p
     | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(topic='projects/project-id/topics/events')
     | 'ParseJSON' >> beam.Map(lambda x: json.loads(x))
     | 'FilterValid' >> beam.Filter(lambda x: x.get('event_type'))
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
         'project-id:analytics.events',
         schema='event_type:STRING, user_id:STRING, timestamp:TIMESTAMP, properties:STRING',
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
     ))

Stream Processing with Windows

import apache_beam as beam
from apache_beam.transforms import window, trigger

with beam.Pipeline(options=pipeline_options) as p:
    (p
     | 'ReadFromKafka' >> beam.io.ReadFromKafka(
         consumer_config={
             'bootstrap.servers': 'kafka:9092',
             'group.id': 'dataflow-consumer',
             'auto.offset.reset': 'latest'
         },
         topics=['events']
     )
     | 'ParseEvents' >> beam.Map(parse_kafka_message)
     | 'FixedWindows' >> beam.WindowInto(
         window.FixedWindows(60),  # 1-minute windows
         trigger=trigger.AfterWatermark(),
         accumulation_mode=trigger.AccumulationMode.DISCARDING,
         allowed_lateness=30  # seconds of tolerated lateness
     )
     | 'CountPerWindow' >> beam.combiners.Count.PerElement()
     | 'FormatOutput' >> beam.Map(format_counts)
     | 'WriteToGCS' >> beam.io.WriteToText(
         'gs://data-lake/processed/aggregates',
         file_name_suffix='.json'
     )
    )

Side Inputs and Late Data

with beam.Pipeline(options=pipeline_options) as p:
    # Side input: (key, value) pairs materialized as a dict
    reference_data = (
        p
        | 'ReadReferenceData' >> beam.io.ReadFromText('gs://data/reference/*.csv')
        | 'ParseReference' >> beam.Map(parse_csv)  # must emit (user_id, info) tuples
    )

    main_data = (
        p
        | 'ReadEvents' >> beam.io.ReadFromPubSub(topic='projects/project-id/topics/events')
        | 'ParseJSON' >> beam.Map(lambda x: json.loads(x))
    )

    # Join with the side input by passing it as an extra argument to the Map fn
    result = (
        main_data
        | 'JoinWithReference' >> beam.Map(
            lambda elem, ref: {**elem, 'reference_info': ref.get(elem['user_id'])},
            ref=beam.pvalue.AsDict(reference_data)
        )
        | 'WriteResults' >> beam.io.WriteToBigQuery(...)
    )

Google BigQuery

BigQuery is a serverless, highly scalable data warehouse.

BigQuery ML

from google.cloud import bigquery

client = bigquery.Client()

# Create ML model
query = """
CREATE MODEL IF NOT EXISTS `analytics.user_churn_model`
OPTIONS(
    model_type='logistic_reg',
    input_label_cols=['churned']
) AS
SELECT
    *,
    CASE WHEN last_login < DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
         THEN TRUE ELSE FALSE
    END AS churned
FROM `analytics.user_features`
WHERE signup_date >= '2025-01-01'
"""

job = client.query(query)
job.result()  # Wait for completion

# Predict with model
predict_query = """
SELECT
    user_id,
    predicted_churned,
    predicted_churned_probs[OFFSET(0)].prob AS churn_probability
FROM ML.PREDICT(MODEL `analytics.user_churn_model`,
    (
        SELECT * FROM `analytics.user_features`
        WHERE DATE_DIFF(CURRENT_DATE(), last_login, DAY) > 7
    )
)
ORDER BY churn_probability DESC
LIMIT 100
"""

Streaming Inserts

from google.cloud import bigquery

client = bigquery.Client()

# Insert rows one at a time (streaming)
rows_to_insert = [
    {'user_id': '123', 'event': 'page_view', 'timestamp': '2026-03-27T10:00:00'},
    {'user_id': '456', 'event': 'purchase', 'timestamp': '2026-03-27T10:05:00'}
]

errors = client.insert_rows_json('project-id.analytics.events', rows_to_insert)

if errors:
    print(f"Insert errors: {errors}")

Partitioning and Clustering

-- Create partitioned and clustered table
CREATE TABLE analytics.events (
    event_id STRING,
    user_id STRING,
    event_type STRING,
    properties STRING,
    timestamp TIMESTAMP
)
PARTITION BY DATE(timestamp)
CLUSTER BY user_id, event_type
OPTIONS(
    partition_expiration_days=90,
    description='User events partitioned by day'
);

-- Query that prunes partitions
SELECT
    user_id,
    COUNT(*) as event_count
FROM analytics.events
WHERE DATE(timestamp) BETWEEN '2026-03-01' AND '2026-03-27'
  AND event_type = 'purchase'
GROUP BY user_id;

Cloud Storage

Cloud Storage provides durable, globally available object storage.

Data Lake Structure

from google.cloud import storage

client = storage.Client()

# Create buckets
def create_data_lake_buckets():
    buckets = [
        ('raw-events', 'Raw event data'),
        ('processed-data', 'Processed and transformed data'),
        ('analytics-export', 'Exports for BI tools'),
        ('ml-training-data', 'Training datasets for ML')
    ]

    for bucket_name, description in buckets:
        bucket = client.create_bucket(bucket_name, location='US')

        # Tier objects down as they age, then delete after a year
        bucket.add_lifecycle_set_storage_class_rule('NEARLINE', age=30)
        bucket.add_lifecycle_set_storage_class_rule('COLDLINE', age=90)
        bucket.add_lifecycle_delete_rule(age=365)
        bucket.patch()

create_data_lake_buckets()

Transfer Service for S3 Migration

from google.cloud import storage_transfer

transfer_client = storage_transfer.StorageTransferServiceClient()

# Create a daily transfer from S3 to GCS.
# Credentials are inlined here for illustration only; in practice, pull
# them from Secret Manager rather than source code.
transfer_job = {
    'project_id': 'project-id',
    'description': 'Daily S3 to GCS sync',
    'status': storage_transfer.TransferJob.Status.ENABLED,
    'transfer_spec': {
        'aws_s3_data_source': {
            'bucket_name': 'source-s3-bucket',
            'aws_access_key': {
                'access_key_id': 'AKIA...',
                'secret_access_key': 'secret...'
            }
        },
        'gcs_data_sink': {
            'bucket_name': 'raw-events',
            'path': 's3-import/'
        },
        'transfer_options': {
            'delete_objects_from_source_after_transfer': False,
            'overwrite_objects_already_existing_in_sink': True
        }
    },
    'schedule': {
        'schedule_start_date': {
            'year': 2026,
            'month': 3,
            'day': 27
        },
        'start_time_of_day': {
            'hours': 2,
            'minutes': 0
        }
    }
}

result = transfer_client.create_transfer_job(
    request={'transfer_job': transfer_job}
)

Architecture Patterns

Streaming Pipeline Pattern

flowchart TD
    A[App] -->|Pub/Sub| B[Pub/Sub Topic]
    B --> C[Dataflow Streaming]
    C --> D[BigQuery]
    C --> E[Cloud Storage]
    E --> F[Dataflow Batch]
    F --> D

Lambda Architecture on GCP

# Speed layer - streaming
speed_pipeline = (
    p
    | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(topic='projects/project-id/topics/events')
    | 'ProcessStream' >> beam.Map(process_event)
    | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
        'project:analytics.events_realtime'
    )
)

# Batch layer - batch processing
batch_pipeline = (
    p
    | 'ReadFromStorage' >> beam.io.ReadFromText('gs://raw-events/*.json')
    | 'BatchProcess' >> beam.Map(batch_aggregate)
    | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
        'project:analytics.events_daily'
    )
)

Integration with External Systems

BigQuery Connections

from google.cloud import bigquery

client = bigquery.Client()

# Connections to external databases are created via the BigQuery Connection
# API (the google-cloud-bigquery-connection package) or the bq CLI, e.g.:
#   bq mk --connection --connection_type=CLOUD_SQL --location=US \
#     --properties='{"instanceId":"project:region:mysql-instance","database":"users","type":"MYSQL"}' \
#     external_mysql

# Query external database directly
external_query = """
SELECT u.user_id, u.email, o.total_purchases
FROM EXTERNAL_QUERY('analytics.external_mysql',
    'SELECT * FROM users WHERE active = true') u
JOIN (
    SELECT user_id, SUM(amount) as total_purchases
    FROM orders
    GROUP BY user_id
) o ON u.user_id = o.user_id
"""

Cost Optimization

Key GCP-specific cost optimization strategies:

  • Use BigQuery on-demand pricing (per TB scanned) for unpredictable workloads
  • Switch to flat-rate when query volume is predictable
  • Enable partition pruning to scan less data
  • Use Cloud Storage lifecycle policies to auto-tier data
  • Consider Preemptible VMs for Dataflow batch workers
  • Use BigQuery reservations for consistent, predictable workloads
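These levers are easier to reason about with a back-of-the-envelope cost model. A minimal sketch, assuming an on-demand rate of roughly $6.25 per TiB scanned (an assumption — check current pricing for your region):

```python
# Rough on-demand cost model; the per-TiB rate is an assumption.
PRICE_PER_TIB = 6.25

def estimate_on_demand_cost(bytes_scanned: int) -> float:
    """Estimate the on-demand cost of a query from bytes scanned."""
    tib = bytes_scanned / (1024 ** 4)
    return round(tib * PRICE_PER_TIB, 4)

# A query scanning a full 10 TiB unpartitioned table:
full_scan = estimate_on_demand_cost(10 * 1024 ** 4)  # $62.50

# The same query with partition pruning down to ~3% of the data:
pruned = estimate_on_demand_cost(int(0.03 * 10 * 1024 ** 4))
```

Running the query with a dry run first (and reading the reported bytes processed) lets you apply this check before spending anything.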

GCP Services Production Failure Scenarios

Pub/Sub delivery failure causes event gap

A Pub/Sub subscription has ack_deadline_seconds: 10. A Dataflow job processes messages but takes 15 seconds on a slow record. The message is not acknowledged before the deadline. Pub/Sub redelivers it. The downstream system processes the same event twice. An order fulfillment system ships the same order twice.

Mitigation: Set ack_deadline_seconds to at least 2x your expected processing time. Enable exactly-once delivery where your sink supports it. Add idempotency keys so duplicate deliveries are harmless.
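The idempotency-key mitigation can be sketched in a few lines. This assumes each message carries a unique event_id field (a hypothetical name); the in-memory set stands in for a real dedup store such as Redis or a database unique constraint:

```python
# In-memory dedup store; production would use Redis/Firestore/a unique index.
processed_ids = set()
shipped_orders = []

def handle_order_event(event: dict) -> bool:
    """Process an event at most once per event_id; return True if work was done."""
    event_id = event['event_id']
    if event_id in processed_ids:
        return False  # Redelivered duplicate: ack it, but do no work
    processed_ids.add(event_id)
    shipped_orders.append(event['order_id'])  # the side effect we must not repeat
    return True

handle_order_event({'event_id': 'e1', 'order_id': 'o42'})
handle_order_event({'event_id': 'e1', 'order_id': 'o42'})  # duplicate: no-op
```

With this guard in place, a redelivery after an expired ack deadline costs a lookup instead of a double shipment.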

Dataflow job fails silently with no monitoring alert

A Dataflow streaming job enters a retry loop due to a transient BigQuery error. Each retry succeeds but clears the watermark, causing a 30-minute data gap. No alert fires because the job stays in a “running” state. Dashboards show stale data. The issue is discovered only when a business user notices.

Mitigation: Monitor the Dataflow job/system_lag and job/data_watermark_age metrics in Cloud Monitoring. Set alerts on job state changes and on watermark age exceeding your data-freshness SLO.

BigQuery slot exhaustion during quarterly reports

Quarterly financial report queries run concurrently, consuming all available BigQuery slots. Interactive queries submitted by analysts start queuing. Query queuing time exceeds 5 minutes. Engineers scramble to cancel running queries or increase reserved slots mid-quarter.

Mitigation: Use BigQuery reservations to guarantee slot capacity for production workloads. Submit heavy report queries with BATCH priority so interactive work is not starved. Monitor slot utilization against reservation capacity and alert when it exceeds 80%.
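A small routing policy in the query-submission layer is one way to apply priorities systematically. The 80% threshold below is an illustrative assumption; the returned string maps onto the priority you would set when submitting the job:

```python
def choose_priority(slot_utilization: float, is_report_query: bool) -> str:
    """Pick a BigQuery priority: queue heavy reports when slots are scarce."""
    if is_report_query and slot_utilization > 0.8:
        return 'BATCH'        # waits for idle slots instead of competing
    return 'INTERACTIVE'

# During the quarterly crunch (90% utilization), reports queue politely:
choose_priority(0.9, is_report_query=True)   # 'BATCH'
choose_priority(0.9, is_report_query=False)  # 'INTERACTIVE'
```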

Cloud Storage bucket deleted by accidental lifecycle rule

A lifecycle rule is configured with delete() action but incorrect prefix filter. All objects in the bucket are deleted within 24 hours. There is no cross-region replication enabled. The backup from 2 days ago is restored but 2 days of data are permanently lost.

Mitigation: Enable Object Versioning and Soft Delete so deleted objects can be recovered. Review lifecycle rules carefully before applying them — a delete rule applies to every object matching its conditions, so test prefix filters on a staging bucket first. Restrict storage.buckets.delete and lifecycle-update permissions to a small group, and consider a retention policy with Bucket Lock for critical data.
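A pre-apply safety check can catch the dangerous rule shape before it reaches a bucket. A sketch, assuming rules are dicts shaped like the GCS lifecycle JSON:

```python
def validate_lifecycle_rules(rules: list, min_delete_age_days: int = 30) -> list:
    """Return human-readable problems with delete rules; empty list means OK."""
    problems = []
    for i, rule in enumerate(rules):
        if rule.get('action', {}).get('type') != 'Delete':
            continue  # tiering rules are not destructive
        cond = rule.get('condition', {})
        age = cond.get('age')
        scoped = 'matchesPrefix' in cond or 'matchesStorageClass' in cond
        # An unscoped, short-fuse delete rule is the bucket-wiping shape
        if age is None or (age < min_delete_age_days and not scoped):
            problems.append(f"rule {i}: unscoped delete at age {age} days")
    return problems

rules = [{'action': {'type': 'Delete'}, 'condition': {'age': 1}}]
validate_lifecycle_rules(rules)  # flags the unscoped 1-day delete
```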

Dataflow late data watermark stalls permanently

Late data arrives after the watermark advances past a trigger threshold. The trigger fires with partial data. Because accumulation_mode=DISCARDING, late data is dropped. Quarterly financial aggregates are permanently undercounted. No error is raised. The wrong number ships to stakeholders.

Mitigation: Use accumulation_mode=ACCUMULATING during periods where late data is expected. Set max_lateness explicitly. Add monitoring on watermark progress vs expected data arrival. Add a validation step after triggers to check row counts against expected ranges.
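The validation step can be as simple as a range check against recent history; the 20% tolerance below is an illustrative assumption:

```python
def validate_window_count(actual: int, expected: int, tolerance: float = 0.2) -> bool:
    """Accept a windowed count only if it is within +/- tolerance of expected."""
    lower = expected * (1 - tolerance)
    upper = expected * (1 + tolerance)
    return lower <= actual <= upper

validate_window_count(actual=950, expected=1000)  # within 20%: True
validate_window_count(actual=400, expected=1000)  # undercounted: False
```

A failed check should page someone before the undercounted aggregate ships to stakeholders.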

GCP Services Trade-Offs

| Service | When to Use | Key Risk |
| --- | --- | --- |
| Pub/Sub | At-least-once messaging, global messaging, IoT | Duplicates by default; ordering only per ordering key |
| Dataflow | Unified batch/streaming, autoscaling, Beam portability | Vendor lock-in with DataflowRunner; debugging harder |
| BigQuery | Petabyte-scale analytics, serverless SQL, ML | On-demand can surprise on large scans; slots needed for consistency |
| Cloud Storage | Data lake, archival, lifecycle management | No low-latency access tier (use Memorystore for that) |
| Bigtable | High-throughput time series, IoT sensor data | No SQL; key-based lookups and range scans only |
| Dataproc | When you need Spark/Hadoop control, cost-sensitive batch | More ops overhead than Dataflow; not serverless |

GCP vs AWS Data Services

| Capability | GCP | AWS |
| --- | --- | --- |
| Messaging | Pub/Sub | Kinesis / SQS |
| Stream Processing | Dataflow (Apache Beam) | Kinesis Data Analytics / Flink on EMR |
| Serverless ETL | Dataflow Templates | AWS Glue |
| Data Warehouse | BigQuery | Redshift / Redshift Serverless |
| Object Storage | Cloud Storage | S3 |
| Time Series | Bigtable | Timestream (limited) |
| Managed Spark | Dataproc | EMR |

GCP Services Capacity Estimation

Pub/Sub Throughput

def estimate_pubsub_throughput(
    messages_per_second: int,
    avg_message_size_kb: float,
    retention_hours: int = 24
) -> dict:
    """
    Estimate Pub/Sub capacity requirements.
    Throughput limits: ~1GB/s per project per region
    """
    throughput_mbps = (messages_per_second * avg_message_size_kb) / 1024

    return {
        'messages_per_second': messages_per_second,
        'throughput_mbps': round(throughput_mbps, 2),
        'retention_hours': retention_hours,
        'storage_per_topic_gb': round(messages_per_second * avg_message_size_kb * retention_hours * 3600 / (1024 * 1024), 2),
        'note': 'Pub/Sub auto-scales; no manual shard management needed'
    }

# Example: 10k msg/s at 2 KB each
result = estimate_pubsub_throughput(10000, 2, 24)
# ~19.5 MB/s throughput; ~1,648 GB retained per topic over 24h

Dataflow Worker Sizing

def estimate_dataflow_workers(
    streaming: bool,
    cpu_per_worker: int = 4,
    max_workers: int = 20
) -> dict:
    """
    Estimate Dataflow worker requirements.
    Default: n1-standard-4 per worker (4 vCPU, 15GB RAM).
    Streaming jobs need more memory per worker due to pipeline state.
    """
    return {
        'cpu_per_worker': cpu_per_worker,
        'default_machine_type': 'n1-standard-4',
        'recommended_mem_gb': 26 if streaming else 15,  # highmem for stateful streaming
        'max_workers': max_workers,
        'estimated_hourly_cost_per_worker': 0.20 * cpu_per_worker,  # ~$0.80/hr for n1-standard-4
        'note': 'Dataflow autoscaling handles scaling automatically'
    }

# Example:
result = estimate_dataflow_workers(streaming=True, max_workers=50)
# Estimated max hourly cost: 50 * $0.80 = $40/hr

BigQuery Slot Planning

def estimate_bigquery_slots(
    queries_per_hour: int,
    avg_query_duration_seconds: int,
    avg_slots_per_query: int = 10
) -> dict:
    """
    Estimate BigQuery slot requirements for flat-rate pricing.
    Average concurrency follows Little's law:
    arrival rate x average duration.
    """
    avg_concurrent = queries_per_hour * (avg_query_duration_seconds / 3600)
    slots_needed = int(avg_concurrent * avg_slots_per_query)

    return {
        'queries_per_hour': queries_per_hour,
        'avg_query_duration_seconds': avg_query_duration_seconds,
        'avg_concurrent_queries': round(avg_concurrent, 1),
        'slots_needed': slots_needed,
        'flat_rate_monthly_cost_per_slot': 80,  # Approximate
        'estimated_monthly': slots_needed * 80,
        'note': 'On-demand is usually cheaper if you need < 100 slots consistently'
    }

# Example: 60 queries/hour, 120s each, 10 slots each
result = estimate_bigquery_slots(60, 120, 10)
# ~2 concurrent queries, ~20 slots, ~$1,600/month flat-rate

GCP Services Observability Hooks

| Service | Metric | Alert Threshold |
| --- | --- | --- |
| Pub/Sub | subscription/backlog_bytes | > 1GB for > 10m |
| Pub/Sub | subscription/expired_ack_deadlines_count | > 0 |
| Dataflow | job/element_count | sudden drop to 0 |
| Dataflow | job/system_lag | > 300s for streaming |
| BigQuery | query/execution_times | > 30s p95 |
| BigQuery | slots/total_allocated | > 90% of reservation |
| BigQuery | query queueing (via INFORMATION_SCHEMA.JOBS_TIMELINE) | queued > 60s |
| Cloud Storage | storage/object_count | unexpected growth |
| Cloud Storage | api/request_count (error responses) | > baseline |

GCP Services Security Checklist

  • IAM service accounts follow least-privilege — no broad permissions on production projects
  • Pub/Sub subscriptions use VPC Service Controls for boundary security
  • Cloud Storage buckets use uniform bucket-level access; no fine-grained ACLs
  • BigQuery dataset access controlled via IAM, not legacy ACLs
  • Dataflow runs in private networks with no public IPs
  • Cloud KMS used for encryption keys; no customer-managed keys stored in code
  • VPC Service Controls enabled for BigQuery to prevent data exfiltration
  • Cloud Audit Logs enabled on all services (Admin Activity, Data Access)
  • Cloud Storage objects encrypted by default (Google-managed keys); use CMEK for compliance requirements

GCP Services Anti-Patterns

Using Pub/Sub for ordered delivery without ordering keys. Pub/Sub does not guarantee order by default; per-key ordering requires enabling message ordering on the subscription and publishing with an ordering key, and there is no total order across keys or regions. Assuming global ordering leads to data inconsistencies.

Running Dataflow without monitoring watermark progress. If you do not track whether your watermark is advancing, you will not know when late data is being silently dropped. Always monitor watermark metrics.

BigQuery streaming inserts for batch workloads. Streaming inserts are billed per GB ingested, while batch load jobs are free to ingest. Using streaming for bulk data loads is a common cost mistake.
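The cost gap is easy to quantify. A sketch, assuming the legacy streaming-insert rate of roughly $0.05 per GB (an assumption — check current pricing; batch load jobs have no ingestion charge):

```python
# Assumed legacy insertAll rate; verify against current pricing.
STREAMING_PRICE_PER_GB = 0.05

def ingestion_cost(gb_per_day: float, streaming: bool) -> float:
    """Monthly ingestion cost in dollars for a given daily volume."""
    if not streaming:
        return 0.0  # batch load jobs are not billed for ingestion
    return round(gb_per_day * 30 * STREAMING_PRICE_PER_GB, 2)

# 500 GB/day of bulk data:
ingestion_cost(500, streaming=True)   # $750.00/month via streaming inserts
ingestion_cost(500, streaming=False)  # $0.00 via batch loads
```

Reserve streaming (or the Storage Write API) for data that genuinely needs second-level freshness.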

Not using BigQuery partitioning on time. An unpartitioned BigQuery table scanned by date-range queries costs the same as scanning the full table. Always partition on a timestamp column when your data has a time dimension.

Storing data in Cloud Storage without lifecycle rules. Objects default to Standard storage forever. Without lifecycle rules, costs grow indefinitely. Set lifecycle rules on day one.

Quick Recap

  • Pub/Sub provides at-least-once durable messaging with global reach; set ack_deadline_seconds to 2x expected processing time
  • Dataflow runs serverless Apache Beam pipelines with autoscaling; monitor watermark progress to catch late data issues
  • BigQuery is serverless SQL analytics at petabyte scale; use partitioning, clustering, and reservations for cost control
  • Cloud Storage is the foundation of GCP data architectures; always set lifecycle rules
  • Monitor Pub/Sub backlog, Dataflow system latency, BigQuery slot utilization, and Cloud Storage object counts
  • Use VPC Service Controls and IAM least-privilege to protect data boundaries

For more on event-driven patterns, see our Pub-Sub Patterns and Event-Driven Architecture guides.

Key Takeaways:

  • Pub/Sub for durable, at-least-once messaging with push or pull delivery
  • Dataflow for serverless, autoscaling stream and batch processing with Apache Beam
  • BigQuery for serverless analytics with ML built in
  • Cloud Storage for object storage with lifecycle management

Architecture Decision Guide:

  • Real-time streaming: Pub/Sub + Dataflow streaming
  • ETL pipelines: Dataflow batch with templates
  • Analytics warehouse: BigQuery with partitioning and clustering
  • Data lake: Cloud Storage with Dataflow for processing
  • Event-driven microservices: Pub/Sub with Cloud Functions

Integration Points:

  • Pub/Sub connects applications to Dataflow
  • Dataflow writes to BigQuery and Cloud Storage
  • BigQuery ML for machine learning on warehouse data
  • Cloud Storage as staging and archive layer

