Data Migration: Strategies and Patterns for Moving Data

Learn proven strategies for migrating data between systems with minimal downtime. Covers bulk migration, CDC patterns, validation, and rollback.



Moving production data is one of the riskiest operations in data engineering. Get it wrong and you lose data, corrupt records, or cause outages that affect customers. Get it right and you migrate seamlessly with zero downtime.

This guide covers migration strategies, CDC patterns, validation techniques, and rollback planning.

Migration Strategies

Each strategy has a distinct rhythm and risk profile. The diagram below shows how they differ.

flowchart TD
    subgraph BigBang["Big Bang Migration"]
        A1[Stop writes to source] --> A2[Extract all data]
        A2 --> A3[Load to target]
        A3 --> A4[Validate]
        A4 --> A5[Restart on target]
    end

    subgraph Phased["Phased Migration"]
        B1[Extract batch 1] --> B2[Load batch 1]
        B2 --> B3[Validate batch 1]
        B3 --> B4[Extract batch 2]
        B4 --> B5[Load batch 2]
        B5 --> B6[Validate batch 2]
        B6 --> B7[... repeat]
    end

    subgraph CDC["Change Data Capture"]
        C1[Initial bulk load] --> C2[Enable CDC]
        C2 --> C3[Apply changes to target]
        C3 --> C4[Validate continuously]
        C4 --> C5[Cut over when aligned]
    end

    subgraph DualWrite["Dual-Write"]
        D1[Write to source + shadow] --> D2[Validate shadow = source]
        D2 --> D3[Read from target]
        D3 --> D4[Switch reads when stable]
    end

Strategy 1: Big Bang Migration

Migrate everything in a single batch window. You stop writes, migrate the data, validate, then restart. Simple in concept. The problem is the downtime window.

# Big bang migration
def big_bang_migrate(source_conn, target_conn):
    """Migrate all data at once"""
    # Extract all from source
    records = source_conn.extract_all('users')

    # Load all to target
    target_conn.load('users', records)

    # Validate
    assert source_conn.count('users') == target_conn.count('users')

Pros: Simple, quick.
Cons: High risk, long downtime window, no easy rollback.
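
Before committing to a big bang window, sanity-check that the transfer fits your maintenance window. A rough sketch, where the throughput figure is an assumption you would measure in staging, not a benchmark:

```python
def estimate_downtime(row_count, rows_per_second, validation_overhead=0.2):
    """Rough downtime estimate for a big bang window (illustrative).

    rows_per_second: measured extract+load throughput from a staging run.
    validation_overhead: fractional headroom for count/checksum validation.
    """
    transfer_seconds = row_count / rows_per_second
    return transfer_seconds * (1 + validation_overhead)
```

If the estimate exceeds the window you can negotiate, big bang is the wrong strategy and one of the incremental approaches below fits better.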

Strategy 2: Phased Migration

Migrate in phases, moving a subset of data at a time. This reduces risk per phase and lets you validate incrementally.

# Phased migration
def phased_migrate(source_conn, target_conn, batch_size=10000):
    """Migrate in phases by ID range"""
    min_id, max_id = source_conn.get_id_range('users')

    offset = min_id
    while offset <= max_id:
        records = source_conn.extract_range(
            'users',
            offset=offset,
            limit=batch_size
        )
        target_conn.load('users', records)
        offset += batch_size

        # Validate each batch
        assert source_conn.count('users', f'id <= {offset}') == \
               target_conn.count('users', f'id <= {offset}')

Pros: Lower risk per phase, can validate incrementally.
Cons: More complex, longer total migration time.
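
One refinement worth sketching: persist the batch offset so a failed phase resumes from the last completed batch instead of restarting from scratch. The `checkpoint_store` below is a hypothetical key-value interface (get/set), not a real library:

```python
def resumable_phased_migrate(source_conn, target_conn, checkpoint_store,
                             table='users', batch_size=10000):
    """Phased migration that resumes after a failure (illustrative sketch).

    checkpoint_store is a hypothetical key-value store with get/set.
    """
    # Resume from the last persisted offset, or start at 0
    offset = checkpoint_store.get(f'migrate_{table}', 0)
    while True:
        records = source_conn.extract_range(table, offset=offset, limit=batch_size)
        if not records:
            break
        target_conn.load(table, records)
        offset += batch_size
        # Persist progress only after the batch has landed in the target
        checkpoint_store.set(f'migrate_{table}', offset)
```

Writing the checkpoint after the load (never before) means a crash re-migrates at most one batch, which is safe if loads are idempotent or upsert-based.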

Strategy 3: Dual-Write with Shadow Table

Write to both systems during transition, validate, then switch reads.

class DualWrite:
    def __init__(self, primary_conn, shadow_conn):
        self.primary = primary_conn
        self.shadow = shadow_conn

    def write(self, table, record):
        # Write to both
        primary_id = self.primary.insert(table, record)
        shadow_id = self.shadow.insert(table, {**record, 'dual_write_id': primary_id})

        # Track correlation
        self.primary.update(table, primary_id, {'shadow_id': shadow_id})

        return primary_id

    def read(self, table, query):
        # Compare reads during validation period
        primary_result = self.primary.query(table, query)
        shadow_result = self.shadow.query(table, query)

        if primary_result != shadow_result:
            logger.warning(f"Data mismatch: {query}")

        return primary_result

Pros: Zero downtime, can validate before switching.
Cons: Application changes required, added complexity.
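
The class above switches reads all at once; in practice teams often ramp reads gradually. The sketch below routes a configurable percentage of reads to the target, where `target_read_pct` is a hypothetical config value you would ramp from 0 to 100 while watching error rates:

```python
import random

def read_with_rollout(primary, target, table, query, target_read_pct=10):
    """Route a percentage of reads to the target during cutover.

    Illustrative sketch; target_read_pct would come from runtime config
    so it can be ramped (or dropped back to 0) without a deploy.
    """
    if random.randint(1, 100) <= target_read_pct:
        return target.query(table, query)
    return primary.query(table, query)
```

Setting the percentage back to 0 acts as an instant read-path rollback, which is the main operational advantage of a gradual switch.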

Strategy 4: Change Data Capture (CDC)

Capture ongoing changes from source and apply to target.

import json
from datetime import datetime

class CDCMigration:
    def __init__(self, source, target, checkpoint_table):
        self.source = source
        self.target = target
        self.checkpoint = checkpoint_table

    def get_last_checkpoint(self, stream):
        row = self.checkpoint.get(f'cdc_{stream}')
        if row:
            return json.loads(row['value'])
        return {'sequence': 0, 'timestamp': '1970-01-01'}

    def save_checkpoint(self, stream, sequence, timestamp):
        self.checkpoint.set(f'cdc_{stream}', json.dumps({
            'sequence': sequence,
            'timestamp': timestamp
        }))

    def capture_changes(self, stream):
        checkpoint = self.get_last_checkpoint(stream)

        changes = self.source.get_changes(
            stream,
            since_sequence=checkpoint['sequence'],
            since_timestamp=checkpoint['timestamp']
        )

        for change in changes:
            apply_change(self.target, change)
            self.save_checkpoint(stream, change.sequence, change.timestamp)

def apply_change(target, change):
    """Apply a CDC change to the target"""
    if change.operation == 'INSERT':
        target.insert(change.table, change.data)
    elif change.operation == 'UPDATE':
        target.update(change.table, change.key, change.data)
    elif change.operation == 'DELETE':
        target.delete(change.table, change.key)

CDC Implementation Patterns

AWS DMS (Database Migration Service)

# Configure DMS replication task
import boto3

dms = boto3.client('dms')

# Create endpoint for source
dms.create_endpoint(
    EndpointIdentifier='source-mysql',
    EndpointType='source',
    EngineName='mysql',
    MySQLSettings={
        'ServerName': 'source-db.example.com',
        'Port': 3306,
        'DatabaseName': 'production',
        'Username': 'dms_user',
        'Password': 'password'
    }
)

# Create endpoint for target
dms.create_endpoint(
    EndpointIdentifier='target-postgres',
    EndpointType='target',
    EngineName='postgres',
    PostgresqlSettings={
        'ServerName': 'target-db.example.com',
        'Port': 5432,
        'DatabaseName': 'production',
        'Username': 'dms_user',
        'Password': 'password'
    }
)

# Create replication instance
dms.create_replication_instance(
    ReplicationInstanceIdentifier='dms-instance',
    ReplicationInstanceClass='dms.t3.medium',
    AllocatedStorage=50,
    VpcSecurityGroupIds=['sg-123456'],
    AvailabilityZone='us-east-1a'
)

# Create migration task
dms.create_replication_task(
    ReplicationTaskIdentifier='migrate-users',
    SourceEndpointArn='arn:aws:dms:us-east-1:123456789:endpoint:SOURCE-ID',
    TargetEndpointArn='arn:aws:dms:us-east-1:123456789:endpoint:TARGET-ID',
    ReplicationInstanceArn='arn:aws:dms:...',
    MigrationType='full-load-and-cdc',
    TableMappings=json.dumps({
        'rules': [
            {
                'rule-type': 'selection',
                'rule-id': '1',
                'rule-name': 'migrate-users',
                'object-locator': {
                    'schema-name': 'production',
                    'table-name': 'users'
                },
                'rule-action': 'include'
            }
        ]
    })
)

Debezium for CDC

# Debezium connector configuration for Kafka
debezium_config = {
    "name": "users-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "mysql-source",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "password",
        "database.server.id": "184054",
        "topic.prefix": "dbz",
        "database.include.list": "production",
        "table.include.list": "production.users,production.orders",
        "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
        "schema.history.internal.kafka.topic": "schema-changes",
        "include.schema.changes": "false",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "poll.interval.ms": "1000"
    }
}

# Consume CDC events
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'dbz.production.users',
    bootstrap_servers=['kafka:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=True
)

for message in consumer:
    change = message.value
    if change['op'] in ['c', 'u', 'd', 'r']:  # create, update, delete, read
        apply_to_target(change['payload'])

GCP Datastream

# GCP Datastream for MySQL to BigQuery migration
from google.cloud import datastream

client = datastream.DatastreamClient()

# Create source connection profile (MySQL)
source_profile = {
    "mysql_profile": {
        "hostname": "mysql-source.example.com",
        "port": 3306,
        "username": "datastream_user",
        "password": "password",
        "database": "production"
    }
}

source_connection = client.create_connection_profile(
    parent="projects/project-id/locations/us-central1",
    connection_profile_id="mysql-source",
    connection_profile=source_profile
)

# Create destination connection profile (GCS)
destination_profile = {
    "gcs_profile": {
        "bucket": "datastream-staging",
        "path": "backfill/",
        "gcs_bucket_folder": "backfill"
    }
}

destination_connection = client.create_connection_profile(
    parent="projects/project-id/locations/us-central1",
    connection_profile_id="gcs-destination",
    connection_profile=destination_profile
)

# Create stream
stream = client.create_stream(
    parent="projects/project-id/locations/us-central1",
    stream_id="users-stream",
    source_connection_profile_name=source_connection.name,
    destination_connection_profile_name=destination_connection.name,
    backfill_all=True,
    stream={"mysql_exports": {"migrate_objects": [{"mysql_table": {"table_name": "users"}}]}}
)

Migration Validation

Record Count Validation

def validate_record_count(source, target, tables):
    """Validate record counts match"""
    results = {}
    for table in tables:
        source_count = source.count(table)
        target_count = target.count(table)
        results[table] = {
            'source': source_count,
            'target': target_count,
            'match': source_count == target_count
        }
    return results

# Example output
{
    'users': {'source': 1000000, 'target': 1000000, 'match': True},
    'orders': {'source': 5000000, 'target': 4999999, 'match': False}  # Problem!
}

Checksum Validation

def compute_table_checksum(conn, table, columns=None):
    """Compute an MD5 checksum over table data.

    MySQL syntax shown (use STRING_AGG on PostgreSQL). Assumes an `id`
    column for deterministic ordering; watch group_concat_max_len on
    large tables.
    """
    if columns is None:
        columns = conn.get_columns(table)

    cols = ', '.join(f"COALESCE(CAST({c} AS CHAR), '')" for c in columns)
    query = f"""
        SELECT MD5(
            GROUP_CONCAT(CONCAT_WS('|', {cols}) ORDER BY id)
        ) as checksum
        FROM {table}
    """

    result = conn.execute(query)
    return result[0]['checksum']

def validate_checksums(source, target, table):
    """Validate checksums match"""
    columns = source.get_columns(table)
    source_checksum = compute_table_checksum(source, table, columns)
    target_checksum = compute_table_checksum(target, table, columns)

    return {
        'source': source_checksum,
        'target': target_checksum,
        'match': source_checksum == target_checksum
    }

Sample Validation

import random

def sample_validation(source, target, table, sample_size=1000):
    """Compare random sample of records"""
    source_records = source.sample(table, sample_size)
    target_records = target.sample(table, sample_size)

    # Build lookup for target
    target_lookup = {r['id']: r for r in target_records}

    mismatches = []
    for record in source_records:
        target_record = target_lookup.get(record['id'])
        if not target_record:
            mismatches.append({'id': record['id'], 'issue': 'missing in target'})
        elif record != target_record:
            # Find differences
            diffs = {k: {'source': record[k], 'target': target_record[k]}
                     for k in record if record[k] != target_record.get(k)}
            mismatches.append({'id': record['id'], 'issue': 'value_mismatch', 'diffs': diffs})

    return {
        'sampled': sample_size,
        'mismatches': len(mismatches),
        'match_rate': (sample_size - len(mismatches)) / sample_size,
        'samples': mismatches[:10]  # First 10 mismatches
    }

Data Type Mapping

Different databases have different data types. Map carefully.

Common Mappings

Source (MySQL)     Target (PostgreSQL)
-----------------  -------------------
TINYINT            SMALLINT
MEDIUMINT          INTEGER
BIGINT             BIGINT
VARCHAR(255)       VARCHAR(255)
TEXT               TEXT
DATETIME           TIMESTAMP
TIMESTAMP          TIMESTAMP
DECIMAL(10,2)      NUMERIC(10,2)
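
Encoding the mapping table directly keeps type conversion explicit and auditable. A minimal sketch, where the dictionary covers only the rows above and you would extend it for your schema:

```python
# Illustrative MySQL -> PostgreSQL base-type mapping; extend as needed
MYSQL_TO_POSTGRES = {
    'TINYINT': 'SMALLINT',
    'MEDIUMINT': 'INTEGER',
    'BIGINT': 'BIGINT',
    'TEXT': 'TEXT',
    'DATETIME': 'TIMESTAMP',
    'TIMESTAMP': 'TIMESTAMP',
    'DECIMAL': 'NUMERIC',
}

def map_type(mysql_type):
    """Map a MySQL column type to PostgreSQL, preserving length/precision."""
    # Split 'DECIMAL(10,2)' into base 'DECIMAL' and args '(10,2)'
    base, sep, args = mysql_type.upper().partition('(')
    mapped = MYSQL_TO_POSTGRES.get(base, base)  # unmapped types pass through
    return mapped + sep + args
```

Passing unmapped types through unchanged (rather than raising) is a deliberate choice here; in a real migration you may prefer to fail loudly on unknown types.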

Data Type Validation

def validate_types(source, target, table):
    """Validate data types are compatible"""
    source_types = source.get_column_types(table)
    target_types = target.get_column_types(table)

    mismatches = []
    for col in source_types:
        if col not in target_types:
            mismatches.append(f"Column {col} missing in target")
        elif not compatible_types(source_types[col], target_types[col]):
            mismatches.append(
                f"Type mismatch for {col}: "
                f"{source_types[col]} -> {target_types[col]}"
            )

    return {
        'source_types': source_types,
        'target_types': target_types,
        'mismatches': mismatches
    }

def compatible_types(source_type, target_type):
    """Check if types are compatible"""
    # Simplified check
    numeric = ['INT', 'BIGINT', 'SMALLINT', 'DECIMAL', 'NUMERIC', 'FLOAT', 'DOUBLE']
    string = ['VARCHAR', 'CHAR', 'TEXT']
    date = ['DATE', 'TIME', 'DATETIME', 'TIMESTAMP']

    for category in [numeric, string, date]:
        if any(t in source_type.upper() for t in category) and \
           any(t in target_type.upper() for t in category):
            return True

    return source_type.upper() == target_type.upper()

Handling Schema Changes

Migrations often involve schema changes.

Add Column Migration

def add_column_migration(source, target, table, column_def):
    """Add column to target with default value"""
    # 1. Add column with default (non-blocking)
    target.execute(f"""
        ALTER TABLE {table}
        ADD COLUMN {column_def['name']} {column_def['type']}
        DEFAULT {column_def['default']}
    """)

    # 2. Backfill existing records
    backfill_column(source, target, table, column_def)

    # 3. Add NOT NULL constraint if needed
    if column_def.get('not_null'):
        target.execute(f"""
            ALTER TABLE {table}
            ALTER COLUMN {column_def['name']} SET NOT NULL
        """)

def backfill_column(source, target, table, column_def):
    """Backfill the new column on already-migrated rows"""
    batch_size = 10000
    offset = 0

    while True:
        records = source.fetch_range(
            table,
            offset=offset,
            limit=batch_size
        )

        if not records:
            break

        # Update only the new column; inserting would duplicate migrated rows
        for record in records:
            value = transform_value(record, column_def)
            target.update(table, record['id'], {column_def['name']: value})

        offset += batch_size

Cutover Planning

Pre-Cutover Checklist

## Migration Cutover Checklist

### 1 Week Before

- [ ] Final validation runs clean
- [ ] Rollback procedure documented and tested
- [ ] All stakeholders informed
- [ ] Monitoring dashboards ready

### 1 Day Before

- [ ] Freeze application writes (if applicable)
- [ ] Run final sync
- [ ] Take application snapshots
- [ ] Verify backup integrity

### Cutover Day

- [ ] Stop writes to source
- [ ] Run final CDC sync
- [ ] Validate record counts
- [ ] Run checksum validation
- [ ] Point application to target
- [ ] Verify application works
- [ ] Monitor error rates

### Post-Cutover

- [ ] Monitor for 24 hours
- [ ] Keep source available for 1 week (emergency rollback)
- [ ] Decommission source after 1 week
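
The cutover-day steps lend themselves to a small runner that executes each step in order and halts at the first failure, so a failed validation blocks the switch instead of being noticed afterward. A minimal sketch, with placeholder step callables standing in for the real operations:

```python
def run_cutover(steps):
    """Execute ordered cutover steps; abort at the first failure.

    steps: list of (name, callable) pairs where each callable returns
    True on success. Illustrative sketch; the callables are placeholders.
    """
    completed = []
    for name, step in steps:
        if not step():
            # Stop immediately so the system is never left half cut-over
            return {'status': 'aborted', 'failed_step': name,
                    'completed': completed}
        completed.append(name)
    return {'status': 'done', 'completed': completed}
```

Returning the list of completed steps matters: it tells the on-call engineer exactly which actions need to be reversed if the cutover aborts midway.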

Rollback Procedure

def rollback_migration(source, target, application_config):
    """Rollback to source database"""
    # 1. Stop application
    stop_application()

    # 2. Point to source
    application_config.update({
        'database_host': 'source-db.example.com',
        'database_port': 3306
    })

    # 3. Restart application
    restart_application()

    # 4. Verify
    assert application_config.is_connected_to_source()

Production Failure Scenarios

Migrations fail in ways that are predictable if you know where to look. Four scenarios come up repeatedly in production.

Scenario 1: Silent Data Loss During Bulk Load

Bulk load completes with a record count match but some rows are silently dropped due to type coercion or truncation errors.

What happens: A VARCHAR(50) column in the source contains 60-character strings. The target has VARCHAR(50). The load does not error — it truncates. Your application keeps running, but some customer records now have truncated names and nobody notices until customers call support.

Mitigation:

def validate_no_truncation(source, target, table):
    """Check for truncation by comparing string lengths"""
    source_lengths = source.execute(f"""
        SELECT id, LENGTH(string_column) as len
        FROM {table} WHERE LENGTH(string_column) > 50
    """)
    if source_lengths:
        raise ValueError(f"Found {len(source_lengths)} rows that would truncate in target")

    # Also check for silent type coercion
    target_rows = target.execute(f"SELECT COUNT(*) FROM {table}")
    source_rows = source.execute(f"SELECT COUNT(*) FROM {table}")
    if target_rows[0]['count'] != source_rows[0]['count']:
        raise DataLossError(f"Row count mismatch: {source_rows} vs {target_rows}")

Scenario 2: CDC Lag Exceeding Cutover Window

CDC replication falls behind during peak traffic. The lag grows beyond the cutover window, making the migration take much longer than planned.

What happens: Your CDC migration looks healthy during low-traffic hours. During peak traffic, the source generates changes faster than the CDC pipeline can apply them. What should have been a 2-hour lag becomes 18 hours. Your cutover is delayed and the downtime window expands.

Mitigation:

def monitor_cdc_lag(cdc_client, stream_name, max_acceptable_lag_minutes=30):
    """Alert if CDC lag exceeds acceptable window"""
    lag = cdc_client.get_lag(stream_name)
    lag_minutes = lag / 60

    if lag_minutes > max_acceptable_lag_minutes:
        send_alert(
            f"CDC lag is {lag_minutes:.0f} minutes — "
            f"cutover window at risk. "
            f"Consider pausing source writes or scaling CDC."
        )
    return lag_minutes

Scenario 3: Application Cutover to Corrupt Target State

Application points to target before validation completes. The target has post-migration inconsistencies that were not caught.

What happens: Application switches to target database mid-validation. A batch of records had datetime timezone inconsistencies during migration — times shifted by hours. The application starts making decisions on bad data before the validation report finishes running.

Mitigation:

def safe_cutover(source, target, application_config, validation_queries):
    """Only cut over after all validation queries pass"""
    results = {}
    for name, query in validation_queries.items():
        source_result = source.execute(query)
        target_result = target.execute(query)

        match = compare_results(source_result, target_result)
        results[name] = {'match': match, 'source': source_result, 'target': target_result}

    failures = [k for k, v in results.items() if not v['match']]
    if failures:
        raise SafetyCheckFailed(
            f"Validation failed for: {failures}. "
            f"Cutover blocked. Review results before proceeding."
        )

    # All validations passed; safe to cut over
    application_config.update({'database_host': target.host})  # assumes target exposes its host
    return results

Scenario 4: Rollback Fails Due to Source Write Lock

You attempt rollback after cutover, but source database rejects writes because the application has not fully released its connections.

What happens: After pointing the application to the target, you attempt to “rollback” by re-enabling writes on the source. But some application instances are still connected to the source, holding locks. Writes fail. The application experiences errors on both systems simultaneously.

Mitigation:

import time

def rollback_with_connection_drain(source, application_config, drain_timeout_seconds=60):
    """Safely drain source connections before rollback"""
    # 1. Confirm all app instances have switched
    remaining_connections = source.get_active_connections()
    if remaining_connections > 0:
        print(f"Waiting for {remaining_connections} source connections to drain...")
        time.sleep(drain_timeout_seconds)
        remaining_connections = source.get_active_connections()

    if remaining_connections > 0:
        raise RollbackBlocked(
            f"{remaining_connections} connections still on source. "
            f"Force-killing may cause data loss. "
            f"Investigate connection pool settings."
        )

    # 2. Now safe to re-enable writes on source
    source.enable_writes()

Performance Optimization

Bulk Loading

# PostgreSQL COPY for fast bulk loads
import psycopg2
from io import StringIO

def bulk_load_postgres(target_conn, table, records):
    """Use COPY for fast bulk loading"""
    if not records:
        return

    columns = list(records[0].keys())
    buffer = StringIO()

    # Write data rows only; COPY FROM STDIN does not expect a header line
    for record in records:
        values = [str(v).replace('\n', ' ') for v in record.values()]
        buffer.write(','.join(values) + '\n')

    buffer.seek(0)

    cursor = target_conn.cursor()
    cursor.copy_from(
        buffer,
        table,
        sep=',',
        null='',
        columns=columns
    )
    target_conn.commit()

Parallel Loads

from concurrent.futures import ThreadPoolExecutor

def parallel_load(target, table, records, num_workers=4):
    """Load records in parallel"""
    batch_size = max(1, len(records) // num_workers)
    batches = [
        records[i:i + batch_size]
        for i in range(0, len(records), batch_size)
    ]

    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = [
            executor.submit(target.load_batch, table, batch)
            for batch in batches
        ]

    # Wait for all to complete
    results = [f.result() for f in futures]
    return all(results)

Quick Recap

Key Takeaways:

  • Choose migration strategy based on downtime tolerance and risk
  • CDC enables continuous migration with minimal downtime
  • Always validate record counts, checksums, and samples
  • Plan rollback procedures before starting migration
  • Test the full migration process in staging first

Strategy Selection:

  • Zero downtime required: CDC with dual-write
  • Small dataset, low risk: Big bang with validation
  • Large dataset, risk-averse: Phased migration
  • Complex schema changes: Dual-write with shadow table

Critical Checks:

  • Record counts match between source and target
  • Checksums match (MD5 or similar)
  • Data types are compatible
  • No data corruption during transfer
  • Application works against target
  • Rollback procedure tested
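
These checks can be wrapped in a single gate that reports every failure at once rather than stopping at the first, which shortens the fix-and-retest loop before cutover. A minimal sketch, where the check callables are placeholders for the validation functions in this guide:

```python
def run_critical_checks(checks):
    """Run all pre-cutover checks and report every failure together.

    checks: dict of name -> callable returning True when the check passes.
    Illustrative sketch; plug in the count/checksum/sample validators above.
    """
    failures = [name for name, check in checks.items() if not check()]
    return {'passed': not failures, 'failures': failures}
```

A zero-failure result from this gate is a reasonable precondition for flipping application traffic to the target.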

For more on data pipeline patterns, see our AWS Data Services, GCP Data Services, and Azure Data Services guides. For cloud cost considerations during migration, our Cost Optimization guide covers reserved instances and storage tiering.
