Data Migration: Strategies and Patterns for Moving Data
Learn proven strategies for migrating data between systems with minimal downtime. Covers bulk migration, CDC patterns, validation, and rollback.
Moving production data is one of the riskiest operations in data engineering. Get it wrong and you lose data, corrupt records, or cause outages that affect customers. Get it right and you migrate seamlessly with zero downtime.
This guide covers migration strategies, CDC patterns, validation techniques, and rollback planning.
Migration Strategies
Each strategy has a distinct rhythm and risk profile. The diagram below shows how they differ.
flowchart TD
subgraph BigBang["Big Bang Migration"]
A1[Stop writes to source] --> A2[Extract all data]
A2 --> A3[Load to target]
A3 --> A4[Validate]
A4 --> A5[Restart on target]
end
subgraph Phased["Phased Migration"]
B1[Extract batch 1] --> B2[Load batch 1]
B2 --> B3[Validate batch 1]
B3 --> B4[Extract batch 2]
B4 --> B5[Load batch 2]
B5 --> B6[Validate batch 2]
B6 --> B7[... repeat]
end
subgraph CDC["Change Data Capture"]
C1[Initial bulk load] --> C2[Enable CDC]
C2 --> C3[Apply changes to target]
C3 --> C4[Validate continuously]
C4 --> C5[Cut over when aligned]
end
subgraph DualWrite["Dual-Write"]
D1[Write to source + shadow] --> D2[Validate shadow = source]
D2 --> D3[Read from target]
D3 --> D4[Switch reads when stable]
end
Strategy 1: Big Bang Migration
Migrate everything in a single batch window. You stop writes, migrate the data, validate, then restart. Simple in concept. The problem is the downtime window.
# Big bang migration
def big_bang_migrate(source_conn, target_conn):
"""Migrate all data at once"""
# Extract all from source
records = source_conn.extract_all('users')
# Load all to target
target_conn.load('users', records)
# Validate
assert source_conn.count('users') == target_conn.count('users')
Pros: Simple, quick.
Cons: High risk, long downtime window, no easy rollback.
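Before committing to a big bang, estimate whether the downtime window is even feasible. A back-of-the-envelope sketch (the throughput and overhead numbers are assumptions you would measure in staging, not defaults from any library):

```python
def estimate_downtime_hours(row_count, avg_row_bytes,
                            throughput_mb_per_s, validation_factor=1.3):
    """Rough big-bang window: transfer time plus a validation overhead factor."""
    total_mb = row_count * avg_row_bytes / 1_000_000
    transfer_seconds = total_mb / throughput_mb_per_s
    return transfer_seconds * validation_factor / 3600

# 100M rows at ~500 bytes each over a 25 MB/s effective pipeline:
hours = estimate_downtime_hours(100_000_000, 500, 25)  # roughly 0.72 hours
```

If the estimate exceeds your maintenance window before you even account for index rebuilds on the target, big bang is off the table.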
Strategy 2: Phased Migration
Migrate in phases, moving a subset of data at a time. This reduces risk per phase and lets you validate incrementally.
# Phased migration
def phased_migrate(source_conn, target_conn, batch_size=10000):
"""Migrate in phases by ID range"""
min_id, max_id = source_conn.get_id_range('users')
offset = min_id
while offset <= max_id:
records = source_conn.extract_range(
'users',
offset=offset,
limit=batch_size
)
target_conn.load('users', records)
offset += batch_size
# Validate each batch
assert source_conn.count('users', f'id <= {offset}') == \
target_conn.count('users', f'id <= {offset}')
Pros: Lower risk per phase, can validate incrementally.
Cons: More complex, longer total migration time.
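One complexity worth paying for: if the loop above crashes mid-migration, it restarts from the beginning. Persisting a checkpoint after each batch makes it resumable. A sketch, where `checkpoint_store` is an assumed key-value interface with `get`/`set`:

```python
def resumable_phased_migrate(source, target, checkpoint_store,
                             table, batch_size=10000):
    """Phased migration that resumes from the last committed batch."""
    min_id, max_id = source.get_id_range(table)
    # Resume from the saved offset if a previous run was interrupted
    offset = max(checkpoint_store.get(f'migrate_{table}', 0), min_id)
    while offset <= max_id:
        records = source.extract_range(table, offset=offset, limit=batch_size)
        target.load(table, records)
        offset += batch_size
        # Persist progress only after the batch is durably loaded
        checkpoint_store.set(f'migrate_{table}', offset)
```

The ordering matters: write the checkpoint after the load commits, so a crash replays at most one batch instead of losing track of progress.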
Strategy 3: Dual-Write with Shadow Table
Write to both systems during transition, validate, then switch reads.
class DualWrite:
def __init__(self, primary_conn, shadow_conn):
self.primary = primary_conn
self.shadow = shadow_conn
def write(self, table, record):
# Write to both
primary_id = self.primary.insert(table, record)
shadow_id = self.shadow.insert(table, {**record, 'dual_write_id': primary_id})
# Track correlation
self.primary.update(table, primary_id, {'shadow_id': shadow_id})
return primary_id
def read(self, table, query):
# Compare reads during validation period
primary_result = self.primary.query(table, query)
shadow_result = self.shadow.query(table, query)
        if primary_result != shadow_result:
            # 'logger' assumed configured via logging.getLogger(__name__)
            logger.warning(f"Data mismatch: {query}")
return primary_result
Pros: Zero downtime, can validate before switching.
Cons: Application changes required, added complexity.
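A common refinement is to switch reads gradually rather than all at once. A sketch of percentage-based read routing (in practice the rollout percentage would come from a config service, not a hardcoded value):

```python
import random

class GradualReadSwitch:
    """Route a configurable fraction of reads to the target system."""
    def __init__(self, source_reader, target_reader, target_read_pct=0):
        self.source = source_reader
        self.target = target_reader
        self.target_read_pct = target_read_pct  # 0..100

    def read(self, table, query):
        # Dial target_read_pct up (1 -> 10 -> 50 -> 100) as confidence grows
        if random.random() * 100 < self.target_read_pct:
            return self.target.query(table, query)
        return self.source.query(table, query)
```

Starting at 1% limits the blast radius of any target-side inconsistency to a small slice of traffic.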
Strategy 4: Change Data Capture (CDC)
Capture ongoing changes from source and apply to target.
import json
from datetime import datetime
class CDCMigration:
def __init__(self, source, target, checkpoint_table):
self.source = source
self.target = target
self.checkpoint = checkpoint_table
def get_last_checkpoint(self, stream):
row = self.checkpoint.get(f'cdc_{stream}')
if row:
return json.loads(row['value'])
return {'sequence': 0, 'timestamp': '1970-01-01'}
def save_checkpoint(self, stream, sequence, timestamp):
self.checkpoint.set(f'cdc_{stream}', json.dumps({
'sequence': sequence,
'timestamp': timestamp
}))
def capture_changes(self, stream):
checkpoint = self.get_last_checkpoint(stream)
changes = self.source.get_changes(
stream,
since_sequence=checkpoint['sequence'],
since_timestamp=checkpoint['timestamp']
)
        for change in changes:
            self.apply_change(change)
            self.save_checkpoint(stream, change.sequence, change.timestamp)

    def apply_change(self, change):
        """Apply a CDC change to the target"""
        if change.operation == 'INSERT':
            self.target.insert(change.table, change.data)
        elif change.operation == 'UPDATE':
            self.target.update(change.table, change.key, change.data)
        elif change.operation == 'DELETE':
            self.target.delete(change.table, change.key)
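Because the consumer can crash after applying a change but before saving its checkpoint, the same change may be replayed on restart. Making each apply idempotent keeps replays harmless. A sketch against an assumed target interface with `upsert` and `delete_if_exists` methods:

```python
def apply_change_idempotent(target, change):
    """Apply a CDC change so that replaying it is a no-op."""
    if change.operation in ('INSERT', 'UPDATE'):
        # Upsert: replaying an already-applied change rewrites the same row
        target.upsert(change.table, change.key, change.data)
    elif change.operation == 'DELETE':
        # Deleting an already-deleted row must not raise
        target.delete_if_exists(change.table, change.key)
```

This is the standard trade: at-least-once delivery plus idempotent applies gives you effectively-once semantics without distributed transactions.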
CDC Implementation Patterns
AWS DMS (Database Migration Service)
# Configure DMS replication task
import boto3
dms = boto3.client('dms')
# Create endpoint for source
dms.create_endpoint(
EndpointIdentifier='source-mysql',
EndpointType='source',
EngineName='mysql',
MySQLSettings={
'ServerName': 'source-db.example.com',
'Port': 3306,
'DatabaseName': 'production',
'Username': 'dms_user',
'Password': 'password'
}
)
# Create endpoint for target
dms.create_endpoint(
EndpointIdentifier='target-postgres',
EndpointType='target',
EngineName='postgres',
PostgresqlSettings={
'ServerName': 'target-db.example.com',
'Port': 5432,
'DatabaseName': 'production',
'Username': 'dms_user',
'Password': 'password'
}
)
# Create replication instance
dms.create_replication_instance(
ReplicationInstanceIdentifier='dms-instance',
ReplicationInstanceClass='dms.t3.medium',
AllocatedStorage=50,
VpcSecurityGroupIds=['sg-123456'],
AvailabilityZone='us-east-1a'
)
# Create migration task
dms.create_replication_task(
ReplicationTaskIdentifier='migrate-users',
SourceEndpointArn='arn:aws:dms:us-east-1:123456789:endpoint:SOURCE-ID',
TargetEndpointArn='arn:aws:dms:us-east-1:123456789:endpoint:TARGET-ID',
ReplicationInstanceArn='arn:aws:dms:...',
MigrationType='full-load-and-cdc',
TableMappings=json.dumps({
'rules': [
{
'rule-type': 'selection',
'rule-id': '1',
'rule-name': 'migrate-users',
'object-locator': {
'schema-name': 'production',
'table-name': 'users'
},
'rule-action': 'include'
}
]
})
)
Debezium for CDC
# Debezium connector configuration for Kafka
debezium_config = {
"name": "users-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql-source",
"database.port": "3306",
"database.user": "debezium",
"database.password": "password",
"database.server.id": "184054",
"topic.prefix": "dbz",
"database.include.list": "production",
"table.include.list": "production.users,production.orders",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes",
"include.schema.changes": "false",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"poll.interval.ms": "1000"
}
}
# Consume CDC events
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'dbz.production.users',
bootstrap_servers=['kafka:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='earliest',
enable_auto_commit=True
)
for message in consumer:
change = message.value
if change['op'] in ['c', 'u', 'd', 'r']: # create, update, delete, read
apply_to_target(change['payload'])
GCP Datastream
# GCP Datastream for MySQL to BigQuery migration
from google.cloud import datastream
client = datastream.DatastreamClient()
# Create source connection profile (MySQL)
source_profile = {
"mysql_profile": {
"hostname": "mysql-source.example.com",
"port": 3306,
"username": "datastream_user",
"password": "password",
"database": "production"
}
}
source_connection = client.create_connection_profile(
parent="projects/project-id/locations/us-central1",
connection_profile_id="mysql-source",
connection_profile=source_profile
)
# Create destination connection profile (GCS)
destination_profile = {
    "gcs_profile": {
        "bucket": "datastream-staging",
        "root_path": "/backfill"
    }
}
# Create the destination connection profile, then the stream
destination_connection = client.create_connection_profile(
    parent="projects/project-id/locations/us-central1",
    connection_profile_id="gcs-destination",
    connection_profile=destination_profile
)
# Create stream (simplified; the real API nests the source and destination
# configs inside the stream resource rather than as top-level arguments)
stream = client.create_stream(
    parent="projects/project-id/locations/us-central1",
    stream_id="users-stream",
    stream={
        "source_config": {"source_connection_profile": source_connection.name},
        "destination_config": {"destination_connection_profile": destination_connection.name},
        "backfill_all": {}
    }
)
Migration Validation
Record Count Validation
def validate_record_count(source, target, tables):
"""Validate record counts match"""
results = {}
for table in tables:
source_count = source.count(table)
target_count = target.count(table)
results[table] = {
'source': source_count,
'target': target_count,
'match': source_count == target_count
}
return results
# Example output
{
'users': {'source': 1000000, 'target': 1000000, 'match': True},
'orders': {'source': 5000000, 'target': 4999999, 'match': False} # Problem!
}
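When a total count mismatches (like the `orders` table above), counting by partition narrows down where the missing rows live instead of leaving you to diff five million records. A sketch that bisects by ID range, assuming the `count` method accepts a predicate string as in the earlier examples:

```python
def find_mismatched_ranges(source, target, table, min_id, max_id, threshold=10000):
    """Recursively bisect an ID range to locate count mismatches."""
    pred = f'id >= {min_id} AND id <= {max_id}'
    if source.count(table, pred) == target.count(table, pred):
        return []
    if max_id - min_id <= threshold:
        return [(min_id, max_id)]  # small enough to inspect row by row
    mid = (min_id + max_id) // 2
    return (find_mismatched_ranges(source, target, table, min_id, mid, threshold)
            + find_mismatched_ranges(source, target, table, mid + 1, max_id, threshold))
```

Each level of the bisection costs two counts per side, so localizing one missing row in a 5M-row table takes on the order of tens of queries, not millions of comparisons.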
Checksum Validation
import hashlib
def compute_table_checksum(conn, table, columns=None):
    """Compute an MD5 checksum over table data (PostgreSQL syntax)"""
    if columns is None:
        columns = conn.get_columns(table)
    col_expr = " || ".join(
        f"COALESCE(CAST({c} AS TEXT), '')" for c in columns
    )
    # ORDER BY makes the row order deterministic so both sides hash identically
    query = f"""
        SELECT MD5(STRING_AGG({col_expr}, '' ORDER BY {columns[0]})) AS checksum
        FROM {table}
    """
    result = conn.execute(query)
    return result[0]['checksum']
def validate_checksums(source, target, table):
"""Validate checksums match"""
columns = source.get_columns(table)
source_checksum = compute_table_checksum(source, table, columns)
target_checksum = compute_table_checksum(target, table, columns)
return {
'source': source_checksum,
'target': target_checksum,
'match': source_checksum == target_checksum
}
Sample Validation
import random
def sample_validation(source, target, table, sample_size=1000):
    """Compare a random sample of source records against the target"""
    source_records = source.sample(table, sample_size)
    # Fetch the same IDs from the target; sampling both sides independently
    # would compare unrelated rows
    ids = [r['id'] for r in source_records]
    target_records = target.fetch_by_ids(table, ids)
    target_lookup = {r['id']: r for r in target_records}
mismatches = []
for record in source_records:
target_record = target_lookup.get(record['id'])
if not target_record:
mismatches.append({'id': record['id'], 'issue': 'missing in target'})
elif record != target_record:
# Find differences
diffs = {k: {'source': record[k], 'target': target_record[k]}
for k in record if record[k] != target_record.get(k)}
mismatches.append({'id': record['id'], 'issue': 'value_mismatch', 'diffs': diffs})
return {
'sampled': sample_size,
'mismatches': len(mismatches),
'match_rate': (sample_size - len(mismatches)) / sample_size,
'samples': mismatches[:10] # First 10 mismatches
}
Data Type Mapping
Different databases have different data types. Map carefully.
Common Mappings
| Source (MySQL) | Target (PostgreSQL) |
|---|---|
| TINYINT | SMALLINT |
| MEDIUMINT | INTEGER |
| BIGINT | BIGINT |
| VARCHAR(255) | VARCHAR(255) |
| TEXT | TEXT |
| DATETIME | TIMESTAMP |
| TIMESTAMP | TIMESTAMP |
| DECIMAL(10,2) | NUMERIC(10,2) |
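The table above can be encoded directly as a lookup used during DDL translation. A minimal sketch covering only the mappings listed; any type not in the table falls through unchanged, and length/precision suffixes are carried over as-is:

```python
import re

MYSQL_TO_POSTGRES = {
    'TINYINT': 'SMALLINT',
    'MEDIUMINT': 'INTEGER',
    'BIGINT': 'BIGINT',
    'TEXT': 'TEXT',
    'DATETIME': 'TIMESTAMP',
    'TIMESTAMP': 'TIMESTAMP',
    'DECIMAL': 'NUMERIC',
}

def map_type(mysql_type):
    """Translate a MySQL column type to its PostgreSQL equivalent."""
    # Split 'DECIMAL(10,2)' into base type and precision suffix
    match = re.match(r'([A-Za-z]+)(\(.*\))?', mysql_type.strip())
    base, suffix = match.group(1).upper(), match.group(2) or ''
    return MYSQL_TO_POSTGRES.get(base, base) + suffix
```

So `map_type('DECIMAL(10,2)')` yields `NUMERIC(10,2)`, while `VARCHAR(255)` passes through untouched.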
Data Type Validation
def validate_types(source, target, table):
"""Validate data types are compatible"""
source_types = source.get_column_types(table)
target_types = target.get_column_types(table)
mismatches = []
for col in source_types:
if col not in target_types:
mismatches.append(f"Column {col} missing in target")
elif not compatible_types(source_types[col], target_types[col]):
mismatches.append(
f"Type mismatch for {col}: "
f"{source_types[col]} -> {target_types[col]}"
)
return {
'source_types': source_types,
'target_types': target_types,
'mismatches': mismatches
}
def compatible_types(source_type, target_type):
"""Check if types are compatible"""
# Simplified check
numeric = ['INT', 'BIGINT', 'SMALLINT', 'DECIMAL', 'NUMERIC', 'FLOAT', 'DOUBLE']
string = ['VARCHAR', 'CHAR', 'TEXT']
date = ['DATE', 'TIME', 'DATETIME', 'TIMESTAMP']
for category in [numeric, string, date]:
if any(t in source_type.upper() for t in category) and \
any(t in target_type.upper() for t in category):
return True
return source_type.upper() == target_type.upper()
Handling Schema Changes
Migrations often involve schema changes.
Add Column Migration
def add_column_migration(source, target, table, column_def):
"""Add column to target with default value"""
# 1. Add column with default (non-blocking)
target.execute(f"""
ALTER TABLE {table}
ADD COLUMN {column_def['name']} {column_def['type']}
DEFAULT {column_def['default']}
""")
# 2. Backfill existing records
backfill_column(source, target, table, column_def)
    # 3. Add NOT NULL constraint if needed
    if column_def.get('not_null'):
        target.execute(f"""
            ALTER TABLE {table}
            ALTER COLUMN {column_def['name']} SET NOT NULL
        """)
def backfill_column(source, target, table, column_def):
"""Backfill column with values from source"""
batch_size = 10000
offset = 0
while True:
records = source.fetch_range(
table,
offset=offset,
limit=batch_size
)
if not records:
break
        # Compute the new value and update the existing target rows;
        # inserting here would duplicate rows that were already migrated
        for record in records:
            target.update(
                table,
                record['id'],
                {column_def['name']: transform_value(record, column_def)}
            )
        offset += batch_size
Cutover Planning
Pre-Cutover Checklist
## Migration Cutover Checklist
### 1 Week Before
- [ ] Final validation runs clean
- [ ] Rollback procedure documented and tested
- [ ] All stakeholders informed
- [ ] Monitoring dashboards ready
### 1 Day Before
- [ ] Freeze application writes (if applicable)
- [ ] Run final sync
- [ ] Take application snapshots
- [ ] Verify backup integrity
### Cutover Day
- [ ] Stop writes to source
- [ ] Run final CDC sync
- [ ] Validate record counts
- [ ] Run checksum validation
- [ ] Point application to target
- [ ] Verify application works
- [ ] Monitor error rates
### Post-Cutover
- [ ] Monitor for 24 hours
- [ ] Keep source available for 1 week (emergency rollback)
- [ ] Decommission source after 1 week
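The cutover-day items in the checklist can be wrapped in an automated go/no-go gate so no step is skipped under pressure. A minimal sketch, where each check is a callable returning True or False and the check names are illustrative:

```python
def cutover_gate(checks):
    """Run named pre-cutover checks; return (go, list_of_failures)."""
    failures = [name for name, check in checks.items() if not check()]
    return (len(failures) == 0, failures)

# Usage: cutover proceeds only when every check passes
# go, failed = cutover_gate({
#     'record_counts_match': lambda: validate_counts(),
#     'checksums_match': lambda: validate_all_checksums(),
#     'cdc_lag_zero': lambda: get_cdc_lag() == 0,
# })
```

Printing the failure list into the runbook gives the on-call engineer an unambiguous abort reason rather than a judgment call at 2 a.m.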
Rollback Procedure
def rollback_migration(source, target, application_config):
"""Rollback to source database"""
# 1. Stop application
stop_application()
# 2. Point to source
application_config.update({
'database_host': 'source-db.example.com',
'database_port': 3306
})
# 3. Restart application
restart_application()
# 4. Verify
assert application_config.is_connected_to_source()
Production Failure Scenarios
Migrations fail in ways that are predictable if you know where to look. Four scenarios come up repeatedly in production.
Scenario 1: Silent Data Loss During Bulk Load
Bulk load completes with a record count match but some rows are silently dropped due to type coercion or truncation errors.
What happens: A VARCHAR(50) column in the source contains 60-character strings. The target has VARCHAR(50). The load does not error — it truncates. Your application keeps running, but some customer records now have truncated names and nobody notices until customers call support.
Mitigation:
def validate_no_truncation(source, target, table):
"""Check for truncation by comparing string lengths"""
    source_lengths = source.execute(f"""
        SELECT id, LENGTH(string_column) AS len
        FROM {table}
        WHERE LENGTH(string_column) > 50
    """)
if source_lengths:
raise ValueError(f"Found {len(source_lengths)} rows that would truncate in target")
# Also check for silent type coercion
target_rows = target.execute(f"SELECT COUNT(*) FROM {table}")
source_rows = source.execute(f"SELECT COUNT(*) FROM {table}")
if target_rows[0]['count'] != source_rows[0]['count']:
raise DataLossError(f"Row count mismatch: {source_rows} vs {target_rows}")
Scenario 2: CDC Lag Exceeding Cutover Window
CDC replication falls behind during peak traffic. The lag grows beyond the cutover window, making the migration take much longer than planned.
What happens: Your CDC migration looks healthy during low-traffic hours. During peak traffic, the source generates changes faster than the CDC pipeline can apply them. What should have been a 2-hour lag becomes 18 hours. Your cutover is delayed and the downtime window expands.
Mitigation:
def monitor_cdc_lag(cdc_client, stream_name, max_acceptable_lag_minutes=30):
"""Alert if CDC lag exceeds acceptable window"""
lag = cdc_client.get_lag(stream_name)
lag_minutes = lag / 60
if lag_minutes > max_acceptable_lag_minutes:
send_alert(
f"CDC lag is {lag_minutes:.0f} minutes — "
f"cutover window at risk. "
f"Consider pausing source writes or scaling CDC."
)
return lag_minutes
Scenario 3: Application Cutover to Corrupt Target State
Application points to target before validation completes. The target has post-migration inconsistencies that were not caught.
What happens: Application switches to target database mid-validation. A batch of records had datetime timezone inconsistencies during migration — times shifted by hours. The application starts making decisions on bad data before the validation report finishes running.
Mitigation:
def safe_cutover(source, target, application_config, validation_queries):
"""Only cut over after all validation queries pass"""
results = {}
for name, query in validation_queries.items():
source_result = source.execute(query)
target_result = target.execute(query)
match = compare_results(source_result, target_result)
results[name] = {'match': match, 'source': source_result, 'target': target_result}
failures = [k for k, v in results.items() if not v['match']]
if failures:
raise SafetyCheckFailed(
f"Validation failed for: {failures}. "
f"Cutover blocked. Review results before proceeding."
)
    # All validations passed — safe to cut over
    # ('host' is assumed to be an attribute on the target connection object)
    application_config.update({'database_host': target.host})
return results
Scenario 4: Rollback Fails Due to Source Write Lock
You attempt rollback after cutover, but source database rejects writes because the application has not fully released its connections.
What happens: After pointing the application to the target, you attempt to “rollback” by re-enabling writes on the source. But some application instances are still connected to the source, holding locks. Writes fail. The application experiences errors on both systems simultaneously.
Mitigation:
import time

def rollback_with_connection_drain(source, application_config, drain_timeout_seconds=60):
"""Safely drain source connections before rollback"""
# 1. Confirm all app instances have switched
remaining_connections = source.get_active_connections()
if remaining_connections > 0:
print(f"Waiting for {remaining_connections} source connections to drain...")
time.sleep(drain_timeout_seconds)
remaining_connections = source.get_active_connections()
if remaining_connections > 0:
raise RollbackBlocked(
f"{remaining_connections} connections still on source. "
f"Force-killing may cause data loss. "
f"Investigate connection pool settings."
)
# 2. Now safe to re-enable writes on source
source.enable_writes()
Performance Optimization
Bulk Loading
# PostgreSQL COPY for fast bulk loads
import psycopg2
from io import StringIO
def bulk_load_postgres(target_conn, table, records):
"""Use COPY for fast bulk loading"""
    buffer = StringIO()
    columns = list(records[0].keys()) if records else []
    for record in records:
        # Crude escaping; production code should use csv.writer for proper quoting
        values = [str(v).replace('\n', ' ').replace(',', ' ') for v in record.values()]
        buffer.write(','.join(values) + '\n')
    buffer.seek(0)
    cursor = target_conn.cursor()
    # copy_from treats every line as data, so no header row is written;
    # pass the column order explicitly instead
    cursor.copy_from(
        buffer,
        table,
        sep=',',
        null='',
        columns=columns
    )
    target_conn.commit()
Parallel Loads
from concurrent.futures import ThreadPoolExecutor
def parallel_load(target, table, records, num_workers=4):
"""Load records in parallel"""
    # Guard against a zero batch size when there are fewer records than workers
    batch_size = max(1, len(records) // num_workers)
batches = [
records[i:i + batch_size]
for i in range(0, len(records), batch_size)
]
with ThreadPoolExecutor(max_workers=num_workers) as executor:
futures = [
executor.submit(target.load_batch, table, batch)
for batch in batches
]
# Wait for all to complete
results = [f.result() for f in futures]
return all(results)
Quick Recap
Key Takeaways:
- Choose migration strategy based on downtime tolerance and risk
- CDC enables continuous migration with minimal downtime
- Always validate record counts, checksums, and samples
- Plan rollback procedures before starting migration
- Test the full migration process in staging first
Strategy Selection:
- Zero downtime required: CDC with dual-write
- Small dataset, low risk: Big bang with validation
- Large dataset, risk-averse: Phased migration
- Complex schema changes: Dual-write with shadow table
Critical Checks:
- Record counts match between source and target
- Checksums match (MD5 or similar)
- Data types are compatible
- No data corruption during transfer
- Application works against target
- Rollback procedure tested
For more on data pipeline patterns, see our AWS Data Services, GCP Data Services, and Azure Data Services guides. For cloud cost considerations during migration, our Cost Optimization guide covers reserved instances and storage tiering.