GCP Data Services: Dataflow, BigQuery, Pub/Sub, and Cloud Storage
Google Cloud Platform has a mature set of data services that work well together. Pub/Sub handles messaging, Dataflow handles processing, and BigQuery handles analytics. This combination forms the backbone of many GCP data architectures.
This guide covers each service, explains how they work together, and shows implementation patterns.
Service Overview
| Service | Type | Use Case |
|---|---|---|
| Pub/Sub | Messaging | Async messaging, event streaming |
| Dataflow | Stream/Batch Processing | ETL, stream analytics |
| BigQuery | Data Warehouse | Analytical queries, BI |
| Cloud Storage | Object Storage | Data lake, raw storage |
| Bigtable | Wide-column Store | High-throughput time series |
| Dataproc | Managed Spark/Hadoop | Batch processing |
Google Cloud Pub/Sub
Pub/Sub gives you durable, at-least-once messaging by default. Exactly-once delivery is available as a pull-subscription setting (with regional constraints), but at-least-once delivery paired with idempotent consumers is the default posture and usually what you want.
Pub/Sub Topics and Subscriptions
from google.cloud import pubsub_v1
import json
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
# Create topic
topic_path = publisher.topic_path('project-id', 'events-topic')
publisher.create_topic(request={'name': topic_path})
# Publish a message; attributes are passed as keyword arguments
future = publisher.publish(
    topic_path,
    data=json.dumps({'event': 'user_signup', 'user_id': '123'}).encode('utf-8'),
    env='production'
)
message_id = future.result()  # blocks until the server acks

# Batching is automatic; tune it with BatchSettings on the client
batch_publisher = pubsub_v1.PublisherClient(
    batch_settings=pubsub_v1.types.BatchSettings(
        max_messages=100,   # flush after 100 messages
        max_latency=0.05    # or after 50 ms, whichever comes first
    )
)
for event in events:
    batch_publisher.publish(
        topic_path,
        data=json.dumps(event).encode('utf-8'),
        timestamp=str(event['timestamp'])  # attribute values must be strings
    )
Subscription Patterns
# Streaming pull subscription (asynchronous; one callback per message)
subscription_path = subscriber.subscription_path('project-id', 'events-sub')

def callback(message):
    try:
        data = json.loads(message.data.decode('utf-8'))
        process_event(data)
        message.ack()
    except Exception as e:
        logger.error(f"Failed to process: {e}")
        message.nack()  # nack triggers redelivery

subscriber.subscribe(subscription_path, callback=callback)

# Pull with flow control to bound in-flight messages
streaming_pull_future = subscriber.subscribe(
    subscription_path,
    callback=callback,
    flow_control=pubsub_v1.types.FlowControl(max_messages=10)
)
Push Subscriptions
from google.cloud import pubsub_v1
# Create push subscription
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
# Create topic
topic_path = publisher.topic_path('project-id', 'webhook-topic')
subscription_path = subscriber.subscription_path('project-id', 'webhook-sub')
# Create subscription with a push endpoint
subscription = subscriber.create_subscription(
    request={
        'name': subscription_path,
        'topic': topic_path,
        'push_config': pubsub_v1.types.PushConfig(
            push_endpoint='https://my-service.example.com/webhook',
            attributes={'x-goog-version': 'v1'}
        ),
        'ack_deadline_seconds': 30
    }
)
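On the receiving side, Pub/Sub push delivery wraps each message in a JSON envelope with a base64-encoded `data` field. A minimal parser for that envelope (the envelope shape is the documented push format; the function name is illustrative):

```python
import base64
import json

def parse_push_envelope(envelope: dict) -> dict:
    """Decode a Pub/Sub push delivery envelope into the original payload.

    Push requests arrive as JSON:
    {"message": {"data": "<base64>", "attributes": {...},
     "messageId": "..."}, "subscription": "..."}
    """
    message = envelope['message']
    payload = json.loads(base64.b64decode(message['data']).decode('utf-8'))
    return {
        'payload': payload,
        'attributes': message.get('attributes', {}),
        'message_id': message.get('messageId'),
    }

# Your HTTP handler should return a 2xx quickly; any other status
# (or a timeout) causes Pub/Sub to retry the delivery.
```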
Google Cloud Dataflow
Dataflow provides serverless, autoscaling stream and batch processing using Apache Beam.
Beam Programming Model
Apache Beam provides a unified programming model for batch and streaming:
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

pipeline_options = PipelineOptions([
    '--project=project-id',
    '--runner=DataflowRunner',
    '--staging_location=gs://dataflow-staging/',
    '--temp_location=gs://dataflow-temp/',
    '--region=us-central1',
    '--streaming'  # required when reading from Pub/Sub
])

with beam.Pipeline(options=pipeline_options) as p:
    (p
     | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(topic='projects/project-id/topics/events')
     | 'ParseJSON' >> beam.Map(json.loads)
     | 'FilterValid' >> beam.Filter(lambda x: x.get('event_type'))
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
         'project-id:analytics.events',
         schema='event_type:STRING,user_id:STRING,timestamp:TIMESTAMP,properties:STRING',
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
     ))
Stream Processing with Windows
import apache_beam as beam
from apache_beam.transforms import trigger, window

with beam.Pipeline(options=pipeline_options) as p:
    (p
     | 'ReadFromKafka' >> beam.io.ReadFromKafka(
         consumer_config={
             'bootstrap.servers': 'kafka:9092',
             'group.id': 'dataflow-consumer',
             'auto.offset.reset': 'latest'
         },
         topics=['events']
     )
     | 'ParseEvents' >> beam.Map(parse_kafka_message)
     | 'FixedWindows' >> beam.WindowInto(
         window.FixedWindows(60),  # 1-minute windows
         trigger=trigger.AfterWatermark(),
         allowed_lateness=30,  # seconds of late data to accept
         accumulation_mode=trigger.AccumulationMode.DISCARDING
     )
     | 'CountPerWindow' >> beam.combiners.Count.PerElement()
     | 'FormatOutput' >> beam.Map(format_counts)
     | 'WriteToGCS' >> beam.io.WriteToText(
         'gs://data-lake/processed/aggregates',
         file_name_suffix='.json'
     )
    )
Side Inputs and Late Data
with beam.Pipeline(options=pipeline_options) as p:
    # Build a side input from static reference data
    reference_data = (
        p
        | 'ReadReferenceData' >> beam.io.ReadFromText('gs://data/reference/*.csv')
        | 'ParseReference' >> beam.Map(parse_csv)  # must yield (key, value) pairs
    )

    main_data = (
        p
        | 'ReadEvents' >> beam.io.ReadFromPubSub(topic='projects/project-id/topics/events')
        | 'ParseJSON' >> beam.Map(json.loads)
    )

    # Join with the side input: extra keyword arguments to Map are resolved
    # per window, so AsDict(reference_data) arrives in the lambda as `ref`
    result = (
        main_data
        | 'JoinWithReference' >> beam.Map(
            lambda elem, ref: {**elem, 'reference_info': ref.get(elem['user_id'])},
            ref=beam.pvalue.AsDict(reference_data)
        )
        | 'WriteResults' >> beam.io.WriteToBigQuery(...)
    )
Google BigQuery
BigQuery is a serverless, highly scalable data warehouse.
BigQuery ML
from google.cloud import bigquery
client = bigquery.Client()
# Create ML model
query = """
CREATE MODEL IF NOT EXISTS `analytics.user_churn_model`
OPTIONS(
model_type='logistic_reg',
input_label_cols=['churned']
) AS
SELECT
*,
CASE WHEN last_login < DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
THEN TRUE ELSE FALSE
END AS churned
FROM `analytics.user_features`
WHERE signup_date >= '2025-01-01'
"""
job = client.query(query)
job.result() # Wait for completion
# Predict with model
predict_query = """
SELECT
user_id,
predicted_churned,
predicted_churned_probs[OFFSET(0)].prob AS churn_probability
FROM ML.PREDICT(MODEL `analytics.user_churn_model`,
(
SELECT * FROM `analytics.user_features`
WHERE DATE_DIFF(CURRENT_DATE(), last_login, DAY) > 7
)
)
ORDER BY churn_probability DESC
LIMIT 100
"""
Streaming Inserts
from google.cloud import bigquery
client = bigquery.Client()
# Insert rows one at a time (streaming)
rows_to_insert = [
    {'user_id': '123', 'event': 'page_view', 'timestamp': '2026-03-27T10:00:00'},
    {'user_id': '456', 'event': 'purchase', 'timestamp': '2026-03-27T10:05:00'}
]

errors = client.insert_rows_json('project-id.analytics.events', rows_to_insert)
if errors:
    print(f"Insert errors: {errors}")
Partitioning and Clustering
-- Create partitioned and clustered table
CREATE TABLE analytics.events (
event_id STRING,
user_id STRING,
event_type STRING,
properties STRING,
timestamp TIMESTAMP
)
PARTITION BY DATE(timestamp)
CLUSTER BY user_id, event_type
OPTIONS(
partition_expiration_days=90,
description='User events partitioned by day'
);
-- Query that prunes partitions
SELECT
user_id,
COUNT(*) as event_count
FROM analytics.events
WHERE DATE(timestamp) BETWEEN '2026-03-01' AND '2026-03-27'
AND event_type = 'purchase'
GROUP BY user_id;
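Partition pruning matters because on-demand queries are billed by bytes scanned. A small helper for turning a byte count into an estimated cost; the ~$6.25/TB figure is an assumed US on-demand rate (check current pricing), and the dry-run usage in the comment is a sketch:

```python
def on_demand_cost_usd(bytes_processed: int, price_per_tb: float = 6.25) -> float:
    """Estimate on-demand query cost from bytes scanned.

    BigQuery bills on-demand queries per TB scanned, so partition pruning
    and clustering reduce cost in direct proportion to bytes_processed.
    """
    return round(bytes_processed / 1e12 * price_per_tb, 4)

# Bytes scanned can be obtained without running the query via a dry run:
#   job = client.query(sql, job_config=bigquery.QueryJobConfig(dry_run=True))
#   cost = on_demand_cost_usd(job.total_bytes_processed)
print(on_demand_cost_usd(250 * 10**9))  # 250 GB scanned -> 1.5625
```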
Cloud Storage
Cloud Storage provides durable, globally available object storage.
Data Lake Structure
from google.cloud import storage

client = storage.Client()

# Create data lake buckets with tiering lifecycle rules
def create_data_lake_buckets():
    buckets = [
        ('raw-events', 'Raw event data'),
        ('processed-data', 'Processed and transformed data'),
        ('analytics-export', 'Exports for BI tools'),
        ('ml-training-data', 'Training datasets for ML')
    ]
    for bucket_name, description in buckets:
        bucket = client.bucket(bucket_name)
        bucket.storage_class = 'STANDARD'
        client.create_bucket(bucket, location='US')

        # Tier objects down as they age, then delete after a year
        bucket.add_lifecycle_set_storage_class_rule('NEARLINE', age=30)
        bucket.add_lifecycle_set_storage_class_rule('COLDLINE', age=90)
        bucket.add_lifecycle_delete_rule(age=365)
        bucket.patch()  # persist the lifecycle rules

create_data_lake_buckets()
Transfer Service for S3 Migration
from google.cloud import storage_transfer

transfer_client = storage_transfer.StorageTransferServiceClient()

# Create a scheduled transfer from S3 to GCS.
# Load AWS credentials from a secret manager, never from source code.
transfer_job = {
    'project_id': 'project-id',
    'description': 'Daily S3 to GCS sync',
    'status': storage_transfer.TransferJob.Status.ENABLED,
    'transfer_spec': {
        'aws_s3_data_source': {
            'bucket_name': 'source-s3-bucket',
            'aws_access_key': {
                'access_key_id': aws_key_id,          # from Secret Manager
                'secret_access_key': aws_secret_key   # from Secret Manager
            }
        },
        'gcs_data_sink': {
            'bucket_name': 'raw-events',
            'path': 's3-import/'
        },
        'transfer_options': {
            'delete_objects_from_source_after_transfer': False,
            'overwrite_objects_already_existing_in_sink': True
        }
    },
    'schedule': {
        'schedule_start_date': {'year': 2026, 'month': 3, 'day': 27},
        'start_time_of_day': {'hours': 2, 'minutes': 0}
    }
}

result = transfer_client.create_transfer_job(
    request={'transfer_job': transfer_job}
)
Architecture Patterns
Streaming Pipeline Pattern
flowchart TD
A[App] -->|publish| B[Pub/Sub Topic]
B --> C[Dataflow Streaming]
C --> D[BigQuery]
C --> E[Cloud Storage]
E --> F[Dataflow Batch]
F --> D
Lambda Architecture on GCP
# Speed layer - streaming
speed_pipeline = (
    p
    | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(topic='projects/project-id/topics/events')
    | 'ProcessStream' >> beam.Map(process_event)
    | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('project:analytics.events_realtime')
)

# Batch layer - periodic reprocessing from raw storage
batch_pipeline = (
    p
    | 'ReadFromStorage' >> beam.io.ReadFromText('gs://raw-events/*.json')
    | 'BatchProcess' >> beam.Map(batch_aggregate)
    | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('project:analytics.events_daily')
)
Integration with External Systems
BigQuery Connections
from google.cloud import bigquery_connection_v1

conn_client = bigquery_connection_v1.ConnectionServiceClient()

# Create a connection to a Cloud SQL instance
connection = conn_client.create_connection(
    parent='projects/project-id/locations/us',
    connection_id='external_mysql',
    connection=bigquery_connection_v1.Connection(
        cloud_sql=bigquery_connection_v1.CloudSqlProperties(
            instance_id='project-id:us-central1:mysql-instance',
            database='users',
            type_=bigquery_connection_v1.CloudSqlProperties.DatabaseType.MYSQL,
            credential=bigquery_connection_v1.CloudSqlCredential(
                username='bq_reader',
                password=db_password  # from Secret Manager, not source code
            )
        )
    )
)

# Query the external database directly from BigQuery
external_query = """
SELECT u.user_id, u.email, o.total_purchases
FROM EXTERNAL_QUERY('us.external_mysql',
    'SELECT user_id, email FROM users WHERE active = true') u
JOIN (
    SELECT user_id, SUM(amount) AS total_purchases
    FROM `project-id.analytics.orders`
    GROUP BY user_id
) o ON u.user_id = o.user_id
"""
Cost Optimization
Key GCP-specific cost optimization strategies:
- Use BigQuery on-demand pricing (billed per TB scanned) for unpredictable workloads
- Switch to flat-rate when query volume is predictable
- Enable partition pruning to scan less data
- Use Cloud Storage lifecycle policies to auto-tier data
- Use Dataflow FlexRS (flexible resource scheduling on preemptible capacity) for delay-tolerant batch jobs
- Use BigQuery reservations for consistent, predictable workloads
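A rough way to decide between on-demand and reserved capacity is the monthly scan volume at which the two prices cross. A sketch with illustrative prices (~$6.25/TB on-demand, ~$2,000/month per 100 slots; both are assumptions, check current pricing):

```python
def breakeven_tb_per_month(reservation_monthly_usd: float = 2000.0,
                           on_demand_per_tb: float = 6.25) -> float:
    """TB scanned per month above which a reservation beats on-demand pricing."""
    return reservation_monthly_usd / on_demand_per_tb

# Below this volume, stay on-demand; above it, price out a reservation.
print(breakeven_tb_per_month())  # 320.0
```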
GCP Services Production Failure Scenarios
Pub/Sub delivery failure causes event gap
A Pub/Sub subscription has ack_deadline_seconds: 10. A Dataflow job processes messages but takes 15 seconds on a slow record. The message is not acknowledged before the deadline. Pub/Sub redelivers it. The downstream system processes the same event twice. An order fulfillment system ships the same order twice.
Mitigation: Set ack_deadline_seconds to at least 2x your expected processing time. Enable Pub/Sub exactly-once delivery on the subscription where supported, and Dataflow exactly-once processing for sinks that support it. Add idempotency keys to your message processing.
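Because redelivery can always happen under at-least-once semantics, the consumer itself should be idempotent. A minimal sketch using an in-memory set of processed message IDs (a production system would use a durable store such as Redis or a database unique constraint; all names here are illustrative):

```python
processed_ids: set[str] = set()

def handle_once(message_id: str, payload: dict, process) -> bool:
    """Process a payload only if its message ID has not been seen before.

    Returns True if processed, False if it was a redelivered duplicate.
    """
    if message_id in processed_ids:
        return False  # duplicate: still ack it, but do not re-process
    process(payload)
    processed_ids.add(message_id)  # record only after successful processing
    return True

shipped = []
handle_once('msg-1', {'order': 42}, shipped.append)
handle_once('msg-1', {'order': 42}, shipped.append)  # redelivery
print(len(shipped))  # 1 -- the order ships once
```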
Dataflow job fails silently with no monitoring alert
A Dataflow streaming job enters a retry loop due to a transient BigQuery error. Each retry succeeds but clears the watermark, causing a 30-minute data gap. No alert fires because the job stays in a “running” state. Dashboards show stale data. The issue is discovered only when a business user notices.
Mitigation: Monitor Dataflow's job/system_lag and job/data_watermark_age metrics in Cloud Monitoring. Alert on job state changes and on watermark age exceeding your freshness SLO, not only on outright job failure.
BigQuery slot exhaustion during quarterly reports
Quarterly financial report queries run concurrently, consuming all available BigQuery slots. Interactive queries submitted by analysts start queuing. Query queuing time exceeds 5 minutes. Engineers scramble to cancel running queries or increase reserved slots mid-quarter.
Mitigation: Use BigQuery reservations to guarantee slot capacity for production workloads. Set query priority at submission time. Monitor slot utilization (e.g. the slots/allocated reservation metric) and alert when it exceeds 80%.
Cloud Storage bucket deleted by accidental lifecycle rule
A lifecycle rule is configured with delete() action but incorrect prefix filter. All objects in the bucket are deleted within 24 hours. There is no cross-region replication enabled. The backup from 2 days ago is restored but 2 days of data are permanently lost.
Mitigation: Review lifecycle rule prefix filters carefully and test them on a staging bucket before applying them to production. Enable object versioning and soft delete so deleted objects remain recoverable. Keep cross-region backups for critical data, and restrict storage.objects.delete and bucket-update IAM permissions on production buckets.
Dataflow late data watermark stalls permanently
Late data arrives after the watermark advances past a trigger threshold. The trigger fires with partial data. Because accumulation_mode=DISCARDING, late data is dropped. Quarterly financial aggregates are permanently undercounted. No error is raised. The wrong number ships to stakeholders.
Mitigation: Use accumulation_mode=ACCUMULATING during periods where late data is expected. Set max_lateness explicitly. Add monitoring on watermark progress vs expected data arrival. Add a validation step after triggers to check row counts against expected ranges.
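The validation step mentioned in the mitigation can be as simple as comparing an emitted row count against an expected range before publishing the aggregate. A sketch (the tolerance and numbers are illustrative):

```python
def validate_window_count(actual: int, expected: int, tolerance: float = 0.05) -> bool:
    """Flag windows whose row count deviates more than `tolerance` from expectation.

    A failing check should block publication and page an operator rather
    than silently shipping an undercounted aggregate.
    """
    if expected == 0:
        return actual == 0
    return abs(actual - expected) / expected <= tolerance

print(validate_window_count(9800, 10000))  # True  (2% low: within tolerance)
print(validate_window_count(7000, 10000))  # False (30% low: likely dropped late data)
```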
GCP Services Trade-Offs
| Service | When to Use | Key Risk |
|---|---|---|
| Pub/Sub | At-least-once messaging, global messaging, IoT | Ordering only per ordering key; exactly-once has regional constraints |
| Dataflow | Unified batch/streaming, autoscaling, Beam portability | Vendor lock-in with DataflowRunner; debugging harder |
| BigQuery | Petabyte-scale analytics, serverless SQL, ML | On-demand can surprise on large scans; slots needed for consistency |
| Cloud Storage | Data lake, archival, lifecycle management | No real-time access tier (use Memorystore for that) |
| Bigtable | High-throughput time series, IoT sensor data | No SQL joins; schema must be designed around row-key access patterns |
| Dataproc | When you need Spark/Hadoop control, cost-sensitive batch | More ops overhead than Dataflow; not serverless |
GCP vs AWS Data Services
| Capability | GCP | AWS |
|---|---|---|
| Messaging | Pub/Sub | Kinesis / SQS |
| Stream Processing | Dataflow (Apache Beam) | Kinesis Data Analytics / Flink on EMR |
| Serverless ETL | Dataflow Templates | AWS Glue |
| Data Warehouse | BigQuery | Redshift / Redshift Serverless |
| Object Storage | Cloud Storage | S3 |
| Time Series | Bigtable | Timestream (limited) |
| Managed Spark | Dataproc | EMR |
GCP Services Capacity Estimation
Pub/Sub Throughput
def estimate_pubsub_throughput(
    messages_per_second: int,
    avg_message_size_kb: float,
    retention_hours: int = 24
) -> dict:
    """
    Estimate Pub/Sub capacity requirements.
    Publisher throughput quotas are regional and high (on the order of GB/s);
    Pub/Sub scales automatically with no shard management.
    """
    throughput_mb_per_sec = (messages_per_second * avg_message_size_kb) / 1024
    storage_gb = messages_per_second * avg_message_size_kb * retention_hours * 3600 / (1024 * 1024)
    return {
        'messages_per_second': messages_per_second,
        'throughput_mb_per_sec': round(throughput_mb_per_sec, 2),
        'retention_hours': retention_hours,
        'storage_per_topic_gb': round(storage_gb, 2),
        'note': 'Pub/Sub auto-scales; no manual shard management needed'
    }

# Example:
result = estimate_pubsub_throughput(10000, 2, 24)
# ~19.53 MB/s throughput; ~1,648 GB retained per topic over 24h
Dataflow Worker Sizing
def estimate_dataflow_workers(
    streaming: bool,
    cpu_per_worker: int = 4,
    max_workers: int = 20
) -> dict:
    """
    Estimate Dataflow worker requirements.
    Default machine: n1-standard-4 (4 vCPU, 15 GB RAM).
    Streaming jobs hold state, so highmem machines (26 GB) are often a better fit.
    """
    return {
        'cpu_per_worker': cpu_per_worker,
        'default_machine_type': 'n1-standard-4',
        'recommended_mem_gb': 26 if streaming else 15,
        'max_workers': max_workers,
        'estimated_hourly_cost_per_worker': 0.20 * cpu_per_worker,  # rough all-in ~$0.80/hr for 4 vCPU
        'note': 'Dataflow autoscaling adjusts the worker count up to max_workers'
    }

# Example:
result = estimate_dataflow_workers(streaming=True, max_workers=50)
# Estimated max hourly cost: 50 * $0.80 = $40/hr
BigQuery Slot Planning
def estimate_bigquery_slots(
    queries_per_hour: int,
    avg_query_duration_seconds: int,
    avg_slots_per_query: int = 10
) -> dict:
    """
    Estimate BigQuery slot requirements for reserved capacity.
    A slot is a unit of query compute; one query uses many slots in parallel.
    Little's law: average concurrency = arrival rate * duration.
    """
    avg_concurrent = queries_per_hour * avg_query_duration_seconds / 3600
    slots_needed = int(avg_concurrent * avg_slots_per_query)
    monthly_cost_per_slot = 20  # approximate; check current reservation pricing
    return {
        'queries_per_hour': queries_per_hour,
        'avg_query_duration_seconds': avg_query_duration_seconds,
        'avg_concurrent_queries': round(avg_concurrent, 1),
        'slots_needed': slots_needed,
        'estimated_monthly': slots_needed * monthly_cost_per_slot,
        'note': 'On-demand is usually cheaper when sustained slot needs are low'
    }

# Example: 60 queries/hour, 120s each, 10 slots each
result = estimate_bigquery_slots(60, 120, 10)
# ~2 concurrent queries -> ~20 slots; reservations typically start at 100 slots
GCP Services Observability Hooks
| Service | Metric | Alert Threshold |
|---|---|---|
| Pub/Sub | subscription/backlog_bytes | > 1GB for > 10m |
| Pub/Sub | subscription/expired_ack_deadlines_count | > 0 sustained |
| Dataflow | job/elements_produced_count | sudden drop to 0 |
| Dataflow | system_latency | > 300s for streaming |
| BigQuery | query/execution_times | > 30s p95 |
| BigQuery | slots/allocated | > 90% of reservation |
| BigQuery | query/queueing_time | > 60s |
| Cloud Storage | storage/object_count | unexpected growth |
| Cloud Storage | api/request_count error ratio | > 1% 4xx/5xx |
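Thresholds like "> 1GB for > 10m" mean the condition must hold continuously, not spike once. A sketch of evaluating a sustained breach over metric samples (Cloud Monitoring's alignment windows do this for you; this logic is purely illustrative):

```python
def sustained_breach(samples, threshold: float, min_consecutive: int) -> bool:
    """True if `threshold` is exceeded for at least `min_consecutive` samples in a row."""
    run = 0
    for value in samples:
        run = run + 1 if value > threshold else 0
        if run >= min_consecutive:
            return True
    return False

# backlog_bytes sampled once per minute; alert on > 1 GB sustained for 10 minutes
print(sustained_breach([2e9] * 11, threshold=1e9, min_consecutive=10))  # True
print(sustained_breach([2e9, 5e8] * 6, 1e9, 10))  # False -- breaches never sustained
```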
GCP Services Security Checklist
- IAM service accounts follow least-privilege — no broad permissions on production projects
- Pub/Sub subscriptions use VPC Service Controls for boundary security
- Cloud Storage buckets use uniform bucket-level access; no fine-grained ACLs
- BigQuery dataset access controlled via IAM, not legacy ACLs
- Dataflow runs in private networks with no public IPs
- Cloud KMS used for customer-managed encryption keys (CMEK); never store key material in code
- VPC Service Controls enabled for BigQuery to prevent data exfiltration
- Cloud Audit Logs enabled on all services (Admin Activity, Data Access)
- Cloud Storage objects encrypted by default (Google-managed keys); use CMEK for compliance requirements
GCP Services Anti-Patterns
Using Pub/Sub for ordered delivery without ordering keys. Pub/Sub guarantees ordering only for messages that share an ordering key, published in the same region, on a subscription with message ordering enabled. Assuming order is preserved across keys or regions leads to data inconsistencies; if you need a strictly ordered log, use ordering keys deliberately or choose a partitioned log system.
Running Dataflow without monitoring watermark progress. If you do not track whether your watermark is advancing, you will not know when late data is being silently dropped. Always monitor watermark metrics.
BigQuery streaming inserts for batch workloads. Streaming inserts have no minimum batch size but are significantly more expensive than batch loads. Using streaming for bulk data loads is a common cost mistake.
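To see the cost gap concretely: legacy streaming inserts bill per data volume while batch loads from Cloud Storage are free of load charges. A sketch, assuming the legacy ~$0.01 per 200 MB streaming rate (check current pricing):

```python
def streaming_insert_cost_usd(gb_per_day: float, days: int = 30,
                              price_per_200mb: float = 0.01) -> float:
    """Monthly streaming-insert cost, versus $0 in load fees for batch loads."""
    units_per_gb = 1024 / 200  # number of 200 MB billing units per GB
    return round(gb_per_day * days * units_per_gb * price_per_200mb, 2)

print(streaming_insert_cost_usd(100))  # 100 GB/day for 30 days -> 153.6
# Batch-loading the same volume from Cloud Storage incurs no load fee.
```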
Not using BigQuery partitioning on time. An unpartitioned BigQuery table scanned by date-range queries costs the same as scanning the full table. Always partition on a timestamp column when your data has a time dimension.
Storing data in Cloud Storage without lifecycle rules. Objects default to Standard storage forever. Without lifecycle rules, costs grow indefinitely. Set lifecycle rules on day one.
Quick Recap
- Pub/Sub provides at-least-once durable messaging with global reach; set ack_deadline_seconds to 2x expected processing time
- Dataflow runs serverless Apache Beam pipelines with autoscaling; monitor watermark progress to catch late data issues
- BigQuery is serverless SQL analytics at petabyte scale; use partitioning, clustering, and reservations for cost control
- Cloud Storage is the foundation of GCP data architectures; always set lifecycle rules
- Monitor Pub/Sub backlog, Dataflow system latency, BigQuery slot utilization, and Cloud Storage object counts
- Use VPC Service Controls and IAM least-privilege to protect data boundaries
For more on event-driven patterns, see our Pub-Sub Patterns and Event-Driven Architecture guides.
Key Takeaways:
- Pub/Sub for durable, at-least-once messaging with push or pull delivery
- Dataflow for serverless, autoscaling stream and batch processing with Apache Beam
- BigQuery for serverless analytics with ML built in
- Cloud Storage for object storage with lifecycle management
Architecture Decision Guide:
- Real-time streaming: Pub/Sub + Dataflow streaming
- ETL pipelines: Dataflow batch with templates
- Analytics warehouse: BigQuery with partitioning and clustering
- Data lake: Cloud Storage with Dataflow for processing
- Event-driven microservices: Pub/Sub with Cloud Functions
Integration Points:
- Pub/Sub connects applications to Dataflow
- Dataflow writes to BigQuery and Cloud Storage
- BigQuery ML for machine learning on warehouse data
- Cloud Storage as staging and archive layer