Serverless Data Processing: Building Elastic Pipelines
Serverless computing changes how you build data pipelines. Instead of provisioning servers and managing capacity, you write functions that run on-demand and scale automatically. For variable workloads or event-driven processing, this model can cut costs and reduce operational overhead.
This guide covers serverless data processing patterns across AWS, Azure, and GCP. It focuses on where serverless works well and where it falls short.
When Serverless Works
Serverless excels for event-driven processing like file uploads or queue messages, periodic batch jobs, variable or unpredictable workloads, prototyping, and simple transformations that do not need complex state.
Serverless struggles with long-running processes (functions have timeouts, AWS Lambda caps at 15 minutes), high-throughput continuous streaming (batch processing is cheaper here), complex state management, and heavy computational workloads that get expensive at scale.
AWS Lambda for Data Processing
Lambda integrates tightly with AWS data services.
Lambda with S3 Events
import json
import boto3

s3 = boto3.client('s3')

def process_s3_event(event, context):
    """Triggered when a new file lands in S3"""
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key']

    # Only process new files
    if not key.startswith('raw/'):
        return {'statusCode': 200, 'body': 'skipped'}

    # Determine output key
    output_key = key.replace('raw/', 'processed/')

    # Read and transform
    response = s3.get_object(Bucket=bucket, Key=key)
    data = json.loads(response['Body'].read())

    # Process data
    processed = transform_records(data)

    # Write output
    s3.put_object(
        Bucket=bucket,
        Key=output_key,
        Body=json.dumps(processed),
        ContentType='application/json'
    )
    return {'statusCode': 200, 'body': f'Processed {len(processed)} records'}

def transform_records(records):
    """Transform raw records to processed format"""
    processed = []
    for record in records:
        processed.append({
            'id': record['id'],
            'timestamp': record['timestamp'],
            'value': float(record['value']) * 100,  # Convert to cents
            'category': record.get('category', 'unknown').upper()
        })
    return processed
Lambda with Kinesis
import base64
import json
import boto3

kinesis = boto3.client('kinesis')

def process_kinesis_stream(event, context):
    """Process Kinesis records in batches"""
    records = []
    for record in event['Records']:
        # Kinesis delivers record payloads base64-encoded
        data = json.loads(base64.b64decode(record['kinesis']['data']))
        records.append(data)

    # Batch process for efficiency
    aggregated = aggregate_records(records)

    # Write aggregated results
    for result in aggregated:
        kinesis.put_record(
            StreamName='processed-stream',
            Data=json.dumps(result),
            PartitionKey=result['id']
        )
    return {'processed': len(records), 'output': len(aggregated)}
def aggregate_records(records):
    """Aggregate records by id"""
    aggregated = {}
    for record in records:
        record_id = record['id']
        if record_id not in aggregated:
            aggregated[record_id] = {
                'id': record_id,
                'count': 0,
                'total_value': 0,
                'min_timestamp': record['timestamp'],
                'max_timestamp': record['timestamp']
            }
        agg = aggregated[record_id]
        agg['count'] += 1
        agg['total_value'] += float(record['value'])
        agg['min_timestamp'] = min(agg['min_timestamp'], record['timestamp'])
        agg['max_timestamp'] = max(agg['max_timestamp'], record['timestamp'])
    return list(aggregated.values())
Lambda with SQS
import json
from datetime import datetime, timezone
import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('processing-state')

def process_sqs_batch(event, context):
    """Process messages from an SQS queue"""
    processed_count = 0
    for message in event['Records']:
        body = json.loads(message['body'])
        process_id = body['id']

        # Check if already processed (idempotency)
        existing = table.get_item(Key={'process_id': process_id})
        if existing.get('Item'):
            continue

        # Process
        result = do_processing(body)

        # Mark as processed
        table.put_item(Item={
            'process_id': process_id,
            'result': result,
            'processed_at': datetime.now(timezone.utc).isoformat()
        })
        processed_count += 1
    return {'processed': processed_count}
Google Cloud Functions
Cloud Functions are lightweight, single-purpose functions.
Cloud Functions with Pub/Sub
import base64
import json
from google.cloud import bigquery

bq = bigquery.Client()

def process_pubsub_event(event, context):
    """Triggered by a Pub/Sub message"""
    # Background functions receive the payload base64-encoded in event['data']
    data = json.loads(base64.b64decode(event['data']).decode('utf-8'))
    if data.get('type') != 'user_event':
        return

    # Enrich with reference data
    enriched = enrich_event(data)

    # Write to BigQuery (insert_rows_json accepts a "dataset.table" string)
    errors = bq.insert_rows_json('analytics.events', [enriched])
    if errors:
        print(f'BigQuery insert errors: {errors}')

def enrich_event(event):
    """Enrich event with reference data"""
    from google.cloud import datastore
    datastore_client = datastore.Client()
    key = datastore_client.key('User', event['user_id'])
    user = datastore_client.get(key)
    return {
        'event_id': event['id'],
        'user_id': event['user_id'],
        'event_type': event['type'],
        'timestamp': event['timestamp'],
        'user_tier': user.get('tier', 'free') if user else 'unknown',
        'country': user.get('country', 'unknown') if user else 'unknown'
    }
Cloud Functions with Cloud Storage
import json
from google.cloud import storage

storage_client = storage.Client()

def process_gcs_file(event, context):
    """Triggered when a file is uploaded to GCS"""
    bucket_name = event['bucket']
    file_name = event['name']
    if not file_name.startswith('raw/'):
        return

    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_name)

    # Read content
    content = blob.download_as_text()

    # Process
    processed = transform_csv(content)

    # Write to processed folder
    output_blob = bucket.blob(file_name.replace('raw/', 'processed/'))
    output_blob.upload_from_string(
        data=processed,
        content_type='application/json'
    )

def transform_csv(content):
    """Transform CSV to JSON"""
    import csv
    import io
    reader = csv.DictReader(io.StringIO(content))
    records = []
    for row in reader:
        records.append({
            'id': row['id'],
            'name': row['name'],
            'value': float(row['value']) * 100,
            'timestamp': row['timestamp']
        })
    return json.dumps(records)
Azure Functions
Azure Functions support multiple languages and triggers.
Azure Functions with Event Hubs
import json
import logging
import azure.functions as func

def main(events: list[func.EventHubEvent]) -> str:
    """Process Event Hubs messages"""
    processed = 0
    for event in events:
        # Decode message
        data = json.loads(event.get_body().decode('utf-8'))
        # Process
        result = process_event(data)
        if result:
            processed += 1
    logging.info(f'Processed {processed} events')
    return f'Processed {processed} events'

def process_event(event):
    """Process a single event"""
    if not event.get('timestamp'):
        return False
    # Business logic
    return True
Azure Functions with Blob Triggers
import json
import logging
import azure.functions as func
from azure.storage.blob import BlobClient

def main(inputblob: func.InputStream) -> None:
    """Process blob when uploaded"""
    blob_path = inputblob.name
    logger = logging.getLogger()
    if not blob_path.startswith('raw/'):
        return

    # Read content
    content = inputblob.read().decode('utf-8')

    # Process based on file type
    if blob_path.endswith('.csv'):
        result = process_csv(content)
    elif blob_path.endswith('.json'):
        result = process_json(content)
    else:
        logger.warning(f'Unsupported file type: {blob_path}')
        return

    # Write result to the processed container
    output_path = blob_path.replace('raw/', 'processed/')
    blob_client = BlobClient.from_connection_string(
        conn_str='DefaultEndpointsProtocol=https;AccountName=...',
        container_name='processed',
        blob_name=output_path
    )
    blob_client.upload_blob(json.dumps(result))
Cold Start Mitigation
Cold starts can be problematic for latency-sensitive workloads.
Keep Functions Warm
# CloudWatch Events rule to invoke Lambda every 5 minutes
Resources:
  KeepWarmRule:
    Type: AWS::Events::Rule
    Properties:
      ScheduleExpression: rate(5 minutes)
      Targets:
        - Id: KeepWarmTarget
          Arn: !GetAtt ProcessFunction.Arn
  Permission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref ProcessFunction
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com
      SourceArn: !GetAtt KeepWarmRule.Arn
Provisioned Concurrency (AWS)
# AWS Lambda provisioned concurrency
import boto3

lambda_client = boto3.client('lambda')

# Create provisioned concurrency configuration
lambda_client.put_provisioned_concurrency_config(
    FunctionName='process-function',
    Qualifier='production',
    ProvisionedConcurrentExecutions=10
)
Concurrency Limits
Each platform has concurrency limits:
| Platform | Default Limit | Can Increase? |
|---|---|---|
| AWS Lambda | 1,000 concurrent executions/region | Yes, via quota request |
| Azure Functions (Consumption plan) | 200 instances | Yes, with Premium/Dedicated tiers |
| GCP Cloud Functions | 1,000 concurrent/region | Yes |
| AWS Lambda unreserved pool | 100 minimum (cannot be reserved away) | No |
Managing Concurrency
# AWS Lambda reserved concurrency
lambda_client.put_function_concurrency(
    FunctionName='critical-function',
    ReservedConcurrentExecutions=50  # Reserve capacity
)

# AWS Lambda execution duration limit: 15 minutes max
# Note: For long-running tasks, consider AWS Step Functions
Cost Comparison
Serverless pricing varies:
| Provider | Pricing Model | Free Tier |
|---|---|---|
| AWS Lambda | Requests + Duration (GB-s) | 400,000 GB-s/month |
| Azure Functions | Executions + GB-s | 400,000 GB-s/month |
| GCP Cloud Functions | Invocations + GB-s + CPU-s | 400,000 GB-s/month |
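To see how the request and GB-second components combine, here is a rough back-of-the-envelope estimator. The rates are illustrative figures for AWS Lambda on-demand x86 pricing and may be out of date; check current pricing before budgeting on them.

```python
# Illustrative AWS Lambda on-demand rates (verify against current pricing)
PRICE_PER_GB_SECOND = 0.0000166667   # USD per GB-second of compute
PRICE_PER_MILLION_REQUESTS = 0.20    # USD per 1M invocations

def estimate_lambda_cost(invocations, avg_duration_ms, memory_mb):
    """Rough monthly cost estimate, ignoring the free tier."""
    gb_seconds = invocations * (avg_duration_ms / 1000) * (memory_mb / 1024)
    compute_cost = gb_seconds * PRICE_PER_GB_SECOND
    request_cost = (invocations / 1_000_000) * PRICE_PER_MILLION_REQUESTS
    return round(compute_cost + request_cost, 2)

# 10M invocations at 200ms and 512MB: about $18.67 before the free tier
cost = estimate_lambda_cost(10_000_000, 200, 512)
```

Note that doubling memory doubles the per-second rate, but if the extra memory halves the duration, the compute cost is a wash. That is why right-sizing memory matters.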
Cost Optimization
# Cost optimization strategies
# 1. Bundle dependencies to reduce deployment package size
#    Smaller packages = faster cold starts
# 2. Right-size memory allocation
#    Lambda bills memory x duration; more memory can finish faster and cost less overall
# 3. Use efficient serialization
#    CSV/Parquet instead of JSON when possible
# 4. Batch where possible
#    SQS batch receives, Kinesis batching windows
# 5. Consider Step Functions for complex workflows
#    Often cheaper than Lambda-to-Lambda chains
Step Functions for Complex Workflows
For workflows that span multiple steps, AWS Step Functions coordinates Lambda functions through a declarative state machine with built-in retries and error handling.
import json
import boto3

step_functions = boto3.client('stepfunctions')

# Define state machine
state_machine_def = {
    "Comment": "Data processing pipeline",
    "StartAt": "ValidateInput",
    "States": {
        "ValidateInput": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:validate",
            "Next": "ProcessData"
        },
        "ProcessData": {
            "Type": "Parallel",
            "Branches": [
                {
                    "StartAt": "ProcessPart1",
                    "States": {
                        "ProcessPart1": {
                            "Type": "Task",
                            "Resource": "arn:aws:lambda:region:account:function:process1",
                            "End": True
                        }
                    }
                },
                {
                    "StartAt": "ProcessPart2",
                    "States": {
                        "ProcessPart2": {
                            "Type": "Task",
                            "Resource": "arn:aws:lambda:region:account:function:process2",
                            "End": True
                        }
                    }
                }
            ],
            "Next": "AggregateResults"
        },
        "AggregateResults": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:aggregate",
            "End": True
        }
    }
}

# Create state machine
step_functions.create_state_machine(
    name='data-processing-pipeline',
    definition=json.dumps(state_machine_def),
    roleArn='arn:aws:iam::account:role/step-functions-role'
)
Common Patterns
Pattern 1: File Processing Pipeline
flowchart TD
    A[S3 Upload] -->|Trigger| B[Lambda Validator]
    B -->|Valid| C[Lambda Transformer]
    B -->|Invalid| D[Lambda DLQ Handler]
    C --> E[Processed S3]
    E -->|Trigger| F[Lambda Aggregator]
    F --> G[Redshift/Athena]
Pattern 2: Queue-Based Processing
flowchart TD
    A[Producer] --> B[SQS Queue]
    B -->|Batch| C[Lambda Processor]
    C --> D[DynamoDB]
    C -->|Failures| E[DLQ]
Pattern 3: Stream Enrichment
flowchart TD
    A[Kinesis] --> B[Lambda Enricher]
    B -->|Lookup| C[DynamoDB]
    B --> D[Lambda Formatter]
    D --> E[Firehose]
    E --> F[S3/Redshift]
Production Failure Scenarios
Serverless functions fail differently from long-running servers. The failure modes are predictable — knowing them helps you design around them.
Scenario 1: Cold Start Latency Causing Timeouts
Cold starts add seconds of latency on the first invocation after idle periods. For synchronous APIs, this causes timeouts if clients have short timeout thresholds.
What happens: Your Lambda is idle for 10 minutes. A new request arrives and Lambda starts a new execution environment. Cold start takes 3 seconds. The API Gateway timeout is 3 seconds. The request fails with a 504 before the function finishes initializing.
Mitigation:
# Keep functions warm with scheduled provisioned concurrency
import boto3

lambda_client = boto3.client('lambda')

def ensure_warm(function_name, provisioned_concurrency=5):
    """Ensure minimum provisioned concurrency before traffic spikes"""
    try:
        lambda_client.put_provisioned_concurrency_config(
            FunctionName=function_name,
            Qualifier='production',
            ProvisionedConcurrentExecutions=provisioned_concurrency
        )
    except lambda_client.exceptions.ResourceNotFoundException:
        print(f"Function {function_name} not found")
Scenario 2: Timeout During Variable-Size Processing
Lambda timeouts are fixed per invocation. A function processing a variable number of records may timeout on large batches while succeeding on small ones.
What happens: Your Lambda processes SQS messages. Most batches have 10 messages and complete in 2 seconds. A batch of 1000 messages causes 30 seconds of processing and hits the 30-second timeout. The messages return to the queue, are retried, and create duplicate processing.
Mitigation:
def process_sqs_batch(event, context):
    """Process with visibility timeout awareness"""
    records = event['Records']
    batch_size = len(records)

    # Estimate processing time: ~50ms per record
    estimated_time = batch_size * 0.05

    # If estimated to exceed 80% of remaining time, let it fail and retry
    remaining_time = context.get_remaining_time_in_millis() / 1000
    if estimated_time > remaining_time * 0.8:
        # Raise without deleting; messages go back to the queue for retry
        raise Exception(f"Estimated processing {estimated_time}s exceeds {remaining_time:.1f}s remaining")

    # Process normally
    for record in records:
        process_record(record)
    return {'processed': batch_size}
Scenario 3: Concurrency Limit Breach Creating Backpressure
Each Lambda function has reserved concurrency limits. When all concurrent executions are saturated, new invocations are throttled instead of queued.
What happens: Your fraud detection function receives a traffic spike. All 100 reserved concurrency slots are occupied by long-running fraud model inference. New fraud check requests are throttled with 429 errors. Transactions proceed without fraud review.
Mitigation:
# Set reserved concurrency to leave headroom for critical paths
lambda_client.put_function_concurrency(
    FunctionName='fraud-detection',
    ReservedConcurrentExecutions=80  # Leave 20 for lower-priority batch jobs
)

# Use SQS as a buffer between the spike and Lambda:
# SQS queues messages and Lambda processes at its own pace.
# Configure a redrive policy (max receive count + DLQ) to handle poison messages.
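As a sketch of that SQS buffering setup, the redrive policy below routes poison messages to a dead-letter queue after a few failed receives. The queue names and ARNs are placeholders, and the boto3 calls that would apply these settings are shown in comments.

```python
import json

def redrive_policy(dlq_arn, max_receive_count=5):
    """Build the SQS RedrivePolicy attribute: after max_receive_count
    failed receives, a message is moved to the dead-letter queue."""
    return {
        'RedrivePolicy': json.dumps({
            'deadLetterTargetArn': dlq_arn,
            'maxReceiveCount': str(max_receive_count)
        })
    }

# Hypothetical DLQ ARN; apply with
#   sqs.set_queue_attributes(QueueUrl=queue_url, Attributes=attrs)
attrs = redrive_policy('arn:aws:sqs:us-east-1:123456789012:my-dlq')

# The queue is wired to Lambda with an event source mapping, e.g.
#   lambda_client.create_event_source_mapping(
#       EventSourceArn=queue_arn, FunctionName='processor',
#       BatchSize=10, MaximumBatchingWindowInSeconds=5)
```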
Observability Hooks
Instrument serverless functions to catch degradation before it becomes an incident.
What to instrument:
import time
import logging
from functools import wraps

logger = logging.getLogger(__name__)

# Module scope survives across warm invocations, so this flag is True
# only on the first invocation of a new execution environment
_is_cold_start = True

def observe_lambda(handler):
    """Decorator that instruments a Lambda handler"""
    @wraps(handler)
    def wrapper(event, context):
        global _is_cold_start
        cold_start = _is_cold_start
        _is_cold_start = False
        start = time.time()
        try:
            result = handler(event, context)
            elapsed = time.time() - start
            logger.info(
                "lambda_invocation",
                extra={
                    'function_name': context.function_name,
                    'duration_ms': round(elapsed * 1000, 1),
                    'cold_start': cold_start,
                    'max_memory_mb': context.memory_limit_in_mb,
                    'remaining_time_ms': context.get_remaining_time_in_millis()
                }
            )
            return result
        except Exception as e:
            elapsed = time.time() - start
            logger.error(
                "lambda_error",
                extra={
                    'function_name': context.function_name,
                    'duration_ms': round(elapsed * 1000, 1),
                    'error': str(e),
                    'cold_start': cold_start
                }
            )
            raise
    return wrapper

@observe_lambda
def handler(event, context):
    # Your Lambda logic here
    pass
Metrics to alert on:
- Invocation duration p95 exceeding baseline by >50%
- Cold start rate >5% of invocations in a 5-minute window
- Error rate >1% of invocations
- Throttle count >0 (any throttling is a signal of undersized concurrency)
- Memory usage approaching limit (logged from context.memory_limit_in_mb)
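As one concrete example of wiring up the throttle alert, here is a parameter set for a CloudWatch alarm. The function name and SNS topic ARN are hypothetical placeholders; the dict would be applied with boto3.client('cloudwatch').put_metric_alarm(**throttle_alarm).

```python
# Alarm on any throttling of a specific function over a 5-minute window.
# Function name and SNS topic are hypothetical placeholders.
throttle_alarm = {
    'AlarmName': 'process-function-throttles',
    'Namespace': 'AWS/Lambda',
    'MetricName': 'Throttles',
    'Dimensions': [{'Name': 'FunctionName', 'Value': 'process-function'}],
    'Statistic': 'Sum',
    'Period': 300,               # 5-minute window
    'EvaluationPeriods': 1,
    'Threshold': 0,
    'ComparisonOperator': 'GreaterThanThreshold',  # fire on any throttle
    'AlarmActions': ['arn:aws:sns:us-east-1:123456789012:oncall'],
}
```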
Security Checklist
Serverless functions run with broad IAM permissions by default. Lock them down before production.
- IAM roles: least privilege. Functions should have exactly the permissions they need and nothing more. A function that reads from S3 and writes to DynamoDB needs s3:GetObject and dynamodb:PutItem, not full S3 and DynamoDB access. Review and rotate IAM roles regularly.
- Function permissions: no wildcard principals. When granting cross-account access or access from API Gateway, use specific resource ARNs, not *. For example, arn:aws:lambda:us-east-1:123456789:function:* should be arn:aws:lambda:us-east-1:123456789:function:my-specific-function.
- Secrets management. Never store credentials in plain environment variables. Use AWS Secrets Manager, Parameter Store, or KMS-encrypted configuration for sensitive values. Plain environment variables can be read by anyone with GetFunctionConfiguration permission.
- VPC consideration. If your function accesses VPC resources (RDS, ElastiCache), ensure the VPC has proper security groups. Functions in a VPC cannot access the internet unless the VPC has NAT Gateway or VPC endpoints configured.
- Dependency scanning. Serverless functions often bundle third-party libraries. Run pip-audit or npm audit on your dependencies before deployment. A vulnerable dependency in a Lambda is as exploitable as one on a server.
- Dead letter queues. Configure a DLQ for every asynchronously invoked function. Without one, failed invocations disappear and you lose the failed events permanently.
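To make the least-privilege point concrete, here is a minimal sketch of a scoped IAM policy for the S3-to-DynamoDB case above. The bucket name, table ARN, and role/policy names are placeholders, not real resources.

```python
import json

def least_privilege_policy(bucket, table_arn):
    """Minimal IAM policy for a function that reads one S3 prefix and
    writes one DynamoDB table. Bucket and table ARN are placeholders."""
    return {
        'Version': '2012-10-17',
        'Statement': [
            {
                'Effect': 'Allow',
                'Action': ['s3:GetObject'],
                'Resource': f'arn:aws:s3:::{bucket}/raw/*'
            },
            {
                'Effect': 'Allow',
                'Action': ['dynamodb:PutItem'],
                'Resource': table_arn
            }
        ]
    }

policy_doc = json.dumps(least_privilege_policy(
    'my-data-bucket',
    'arn:aws:dynamodb:us-east-1:123456789012:table/processing-state'
))
# Attach with iam.put_role_policy(RoleName=..., PolicyName=...,
#                                 PolicyDocument=policy_doc)
```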
Common Anti-Patterns
1. Deep Lambda Chains
Chaining Lambda-to-Lambda-to-Lambda creates tight coupling and adds latency. Each hop adds cold start time and per-hop costs. If function C in a 3-link chain fails, the entire pipeline breaks and debugging is painful. Use Step Functions for multi-step workflows instead. The state machine is visual, failures are identifiable, and retries are declarative.
2. Oversized Functions Doing Too Much
A single Lambda handling file upload, format validation, enrichment, transformation, and database writes is hard to test and debug. If the enrichment service is down, the entire function fails even though the upload and validation worked. Split into separate functions with an SQS queue between stages instead. Each function does one thing. Failures are isolated and retries are scoped to the failing stage.
3. Synchronous Waiting Inside a Function
Calling a downstream service synchronously inside a Lambda ties up the execution slot for the duration. If your Lambda calls a microservice that takes 5 seconds, you pay for 5 seconds of idle time. Use async patterns instead: publish to an SQS queue or SNS topic and return immediately. Let the downstream consumer process on its own timeline.
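A minimal sketch of the async hand-off, assuming an SQS queue sits between the function and the downstream consumer. The queue URL is a placeholder, and the actual send call (shown as a comment) would use boto3.client('sqs').send_message.

```python
import json

# QUEUE_URL is a hypothetical placeholder; in a deployed function the
# message would be sent with boto3.client('sqs').send_message(...)
QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/enrichment-queue'

def handler(event, context=None):
    """Accept the request, enqueue the work, and return immediately
    rather than blocking on the downstream service."""
    message = json.dumps({
        'id': event['id'],
        'action': 'enrich',
        'body': event.get('body', {})
    })
    # sqs.send_message(QueueUrl=QUEUE_URL, MessageBody=message)
    return {'statusCode': 202, 'body': message}  # 202 Accepted: work queued
```

The caller gets an immediate 202 and the execution slot frees up; the downstream consumer drains the queue on its own timeline.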
4. No Provisioned Concurrency for Latency-Critical Paths
Leaving cold starts to chance on user-facing paths means occasional 3-second delays — enough to send users elsewhere. Use provisioned concurrency for synchronous user-facing functions. The idle capacity cost during spikes is almost always cheaper than the alternative: losing users to timeouts.
Quick Recap
Key Takeaways:
- Serverless works best for event-driven processing, not continuous streaming
- Lambda/Cloud Functions/Functions scale automatically but have limits
- Cold starts can be mitigated with provisioned concurrency or scheduled warming
- Use Step Functions/Workflows for complex multi-step pipelines
- Monitor costs carefully at scale (batch processing often cheaper)
When to Use Serverless:
- Variable, unpredictable workloads
- Event-driven architectures
- Periodic batch jobs
- Prototyping and MVPs
- Simple transformations without complex state
When to Avoid Serverless:
- High-throughput continuous streaming
- Long-running processes (> 15 minutes)
- Very high compute requirements
- Consistent, predictable base load (reserved capacity cheaper)
Cost Tips:
- Monitor execution time and optimize
- Use appropriate memory settings
- Batch messages where possible
- Consider Step Functions for complex workflows
- Right-size concurrency reservations
For more on event-driven patterns, see our Event-Driven Architecture and Pub-Sub Patterns guides. For observability in serverless environments, our Distributed Tracing guide covers tracing across function invocations.