Horizontal Sharding: Distribution Strategies for Scale

Learn database sharding strategies including shard key selection, consistent hashing, cross-shard queries, and operational procedures for distributed data.

Reading time: 36 min | Author: GeekWorkBench

Introduction

Databases have natural limits. A single server fits only so much data and handles only so many writes. When you exceed these limits, you distribute the load across multiple servers.

Sharding works when your data naturally partitions. User data partitions by user ID. Order data partitions by order ID. The shard key determines which server stores each record.

graph LR
    Users[Users Table] -->|user_id % 4| Shard0[(Shard 0)]
    Users -->|user_id % 4| Shard1[(Shard 1)]
    Users -->|user_id % 4| Shard2[(Shard 2)]
    Users -->|user_id % 4| Shard3[(Shard 3)]

This approach assumes data access follows the partition key. A user record is accessed using the user ID. Orders for that user are accessed using order ID plus user ID. As long as queries include the shard key, routing stays simple.
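Here is a minimal sketch of the routing rule shown in the diagram. It assumes integer user IDs and exactly four shards. The function names `route` and `shards_for_query` are hypothetical. The point is that a query carrying the shard key touches one shard, while a query without it has to visit all of them.

```python
NUM_SHARDS = 4  # matches the four shards in the diagram

def route(user_id: int) -> int:
    """Return the shard index for a user, using user_id % 4 as in the diagram."""
    return user_id % NUM_SHARDS

def shards_for_query(filters: dict) -> list[int]:
    """Pick the shards a query must visit.

    If the query filters on the shard key (user_id), routing is a single
    lookup; otherwise every shard has to be queried (scatter-gather).
    """
    if "user_id" in filters:
        return [route(filters["user_id"])]
    return list(range(NUM_SHARDS))

print(shards_for_query({"user_id": 42}))             # [2]
print(shards_for_query({"email": "a@example.com"}))  # [0, 1, 2, 3]
```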

Sharding Tools Landscape

Different tools sit at different layers of the sharding stack. Choose based on how much control you need:

| Tool | Type | Sharding Approach | Best For |
|---|---|---|---|
| Vitess | Middleware (MySQL) | Vertical sharding + horizontal sharding | MySQL workloads needing scale, YouTube-scale deployments |
| PlanetScale | Managed Vitess | Horizontal sharding via Vitess | Teams wanting MySQL compatibility without managing Vitess |
| CockroachDB | Distributed SQL | Automatic range-based sharding | Global applications needing strong consistency |
| Spanner | Distributed SQL | Automatic sharding, TrueTime | Large-scale global applications with budget for a managed solution |
| YugabyteDB | Distributed SQL | Hash and range sharding | PostgreSQL/Cassandra compatibility with scale |
| Citus | PostgreSQL extension | Hash-based sharding | Teams on PostgreSQL needing distributed scale |
| KScale | Managed sharding | Automatic sharding | Teams wanting managed sharding without database changes |

Vitess was built at YouTube to scale MySQL beyond what a single MySQL instance could handle. It provides connection pooling, query routing, and online resharding workflows. The tradeoff is operational complexity: Vitess itself requires expertise to run.

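CockroachDB and Spanner both present a single logical database while sharding automatically underneath. The main difference is how they order transactions in time. CockroachDB uses hybrid logical clocks (HLC) together with a configured maximum clock offset. Spanner relies on TrueTime, a clock API backed by GPS receivers and atomic clocks that exposes bounded uncertainty. TrueTime lets Spanner guarantee external consistency for global writes. The tradeoff is that Spanner is a managed Google Cloud service and is typically more expensive.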

For most teams, starting with a distributed database like CockroachDB or YugabyteDB eliminates the need for manual sharding entirely. The database handles distribution while you work with normal SQL.

Database-Specific Sharding Configuration

Each database handles sharding differently. Understanding the native options helps you decide whether to use built-in sharding or a middleware approach.

MongoDB Sharding

MongoDB provides built-in sharding with automatic chunk distribution:

// Enable sharding on the database
sh.enableSharding("myapp");

// Shard a collection by hashed shard key (for even distribution)
sh.shardCollection("myapp.orders", { order_id: "hashed" });

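// Shard by range (when range queries matter). Caution: a monotonically
// increasing key like timestamp sends every new insert to the last chunk;
// for high write rates prefer a compound key such as { device_id: 1, timestamp: 1 }.
sh.shardCollection("myapp.events", { timestamp: 1 });

// Check cluster status
sh.status();

// Manually split a chunk of the range-sharded collection at a given key
// (the balancer normally manages chunks; manual splits pre-split hot ranges)
db.adminCommand({
  split: "myapp.events",
  middle: { timestamp: ISODate("2024-01-01T00:00:00Z") },
});

// Move the chunk containing that key to a specific shard. For hashed keys
// such as orders.order_id, use `bounds` instead of `find`.
db.adminCommand({
  moveChunk: "myapp.events",
  find: { timestamp: ISODate("2024-01-01T00:00:00Z") },
  to: "shard2",
});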

Shard key selection for MongoDB:

| Use Case | Shard Key | Why |
|---|---|---|
| User data | user_id (hashed) | Even distribution, efficient user-specific queries |
| Time-series events | device_id + timestamp (compound) | Query by device range, timestamps within device |
| E-commerce | user_id + order_id (compound) | User orders together, order lookups by user |
| Multi-tenant SaaS | tenant_id (hashed) | Tenant isolation, even load per tenant |

PostgreSQL Citus

Citus extends PostgreSQL for distributed tables. It partitions tables across worker nodes while presenting a single PostgreSQL interface:

-- Install Citus extension on coordinator
CREATE EXTENSION IF NOT EXISTS citus;

-- Add worker nodes
SELECT citus_add_node('worker-1.internal', 5432);
SELECT citus_add_node('worker-2.internal', 5432);
SELECT citus_add_node('worker-3.internal', 5432);

-- Create distributed table (by hash)
CREATE TABLE users (
    user_id BIGSERIAL,
    email TEXT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT now(),
    PRIMARY KEY (user_id)
);
SELECT create_distributed_table('users', 'user_id', shard_count => 16);

-- Create reference table (replicated to all nodes)
CREATE TABLE countries (
    country_code TEXT PRIMARY KEY,
    name TEXT NOT NULL
);
SELECT create_reference_table('countries');

-- Check shard placement
SELECT * FROM citus_shards;
SELECT * FROM pg_dist_shard_placement;

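Citus supports two main table types. Reference tables are replicated to every node. Distributed tables are split into shards across workers. Distributed tables that share a distribution column can be co-located (see the colocate_with argument). With co-location, rows with the same key value sit on the same worker, so joins on that key run locally. The create_distributed_table function takes the table name and the distribution column, plus optional arguments such as shard_count.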

CockroachDB

CockroachDB shards automatically by default. You control data distribution through range splitting and leaseholder placement:

-- Split a table into ranges at primary-key values (assumes a STRING primary key)
ALTER TABLE users SPLIT AT VALUES ('m'), ('t');

-- Experimental: place replicas of the range containing key 'm' on stores 1, 2, and 3
ALTER TABLE users EXPERIMENTAL_RELOCATE VALUES (ARRAY[1, 2, 3], 'm');

-- Check range status
SELECT * FROM crdb_internal.ranges_no_leases;

-- Locality-aware placement (requires a multi-region database with regions configured)
ALTER TABLE users SET LOCALITY REGIONAL BY ROW;

CockroachDB automatically splits ranges when they exceed the target size (512 MiB by default). Its replication layer also rebalances replicas and leaseholders across nodes, so forcing specific placement is rarely needed.
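The SPLIT AT VALUES ('m'), ('t') statement above divides the primary-key space into three contiguous ranges. This sketch illustrates range-based routing; it is not CockroachDB's internal code. It finds the owning range with a binary search over the split points. The range labels are hypothetical.

```python
import bisect

# Split points from the example; a split point starts a new range, so the
# ranges are [min, 'm'), ['m', 't'), and ['t', max)
SPLIT_POINTS = ["m", "t"]
RANGES = ["range-1", "range-2", "range-3"]

def range_for_key(key: str) -> str:
    """Return the range that owns `key`.

    bisect_right counts the split points <= key, which is exactly the index
    of the owning range.
    """
    return RANGES[bisect.bisect_right(SPLIT_POINTS, key)]

print(range_for_key("alice"))  # range-1
print(range_for_key("m"))      # range-2 (the split point opens the right-hand range)
print(range_for_key("zoe"))    # range-3
```

Because adjacent keys share a range, range scans stay cheap. The downside is that a monotonically increasing key sends every new insert to the last range.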

When to Use Built-In vs Middleware Sharding

| Approach | Examples | Best For |
|---|---|---|
| Built-in database sharding | MongoDB sharding, CockroachDB, Spanner, YugabyteDB | When the database handles distribution natively |
| Extension-based sharding | Citus for PostgreSQL | When you want to stay on PostgreSQL and add scale |
| Middleware sharding | Vitess, PlanetScale | When you need MySQL compatibility with Vitess routing |
| Application-level sharding | Custom shard router | When no other option fits and you control all queries |

Operational Procedures

Adding a Shard

Adding a shard with consistent hashing is the safest approach. The ring limits redistribution to the keys in the ranges the new shard takes over:

def add_shard_with_consistent_hashing(router, new_shard_id, virtual_nodes=100, tolerance=0.05):
    """
    Add a new shard to an existing consistent hash ring.
    Only keys in the ring ranges claimed by the new shard are redistributed.
    `router` and `count_keys(router, shard)` are assumed helpers of your routing layer.
    """
    # Record per-shard key counts before the change
    before = {shard: count_keys(router, shard) for shard in router.shards}
    old_shard_count = len(before)

    # Add new shard to ring
    router.add_shard(new_shard_id, virtual_nodes)

    # Record per-shard key counts after the change
    after = {shard: count_keys(router, shard) for shard in router.shards}

    total_keys = sum(before.values())
    if total_keys == 0:
        return True  # empty ring: nothing to migrate

    # Every moved key leaves one shard and arrives at another, so halve the total change
    moved = sum(abs(after.get(s, 0) - before.get(s, 0))
                for s in set(after) | set(before)) // 2
    migration_fraction = moved / total_keys
    print(f"Migration: {migration_fraction:.1%} of keys moved")

    # With evenly spread virtual nodes the new shard should claim about 1/(N+1)
    # of the keys, so compare against that rather than a fixed percentage
    expected = 1 / (old_shard_count + 1)
    return abs(migration_fraction - expected) <= tolerance
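The function above assumes a `router` that already manages a ring. For reference, a minimal self-contained ring with virtual nodes might look like the sketch below. `ConsistentHashRing` is a hypothetical class. The sketch assumes string keys and shard IDs, and uses MD5 only as a well-spread placement hash.

```python
import bisect
import hashlib

class ConsistentHashRing:
    """Consistent hash ring with virtual nodes (illustrative sketch)."""

    def __init__(self, virtual_nodes: int = 100):
        self.virtual_nodes = virtual_nodes
        self._positions = []  # sorted ring positions of all virtual nodes
        self._owner = {}      # ring position -> shard id

    @staticmethod
    def _hash(value: str) -> int:
        # MD5 is used only for its even spread, not for security
        return int(hashlib.md5(value.encode()).hexdigest(), 16)

    def add_shard(self, shard_id: str) -> None:
        for i in range(self.virtual_nodes):
            pos = self._hash(f"{shard_id}#vn{i}")
            if pos not in self._owner:  # skip the astronomically unlikely collision
                bisect.insort(self._positions, pos)
                self._owner[pos] = shard_id

    def remove_shard(self, shard_id: str) -> None:
        self._positions = [p for p in self._positions if self._owner[p] != shard_id]
        self._owner = {p: s for p, s in self._owner.items() if s != shard_id}

    def get_shard(self, key: str) -> str:
        if not self._positions:
            raise LookupError("ring has no shards")
        # First virtual node clockwise from the key's position, wrapping at the end
        idx = bisect.bisect_right(self._positions, self._hash(key)) % len(self._positions)
        return self._owner[self._positions[idx]]

ring = ConsistentHashRing()
for shard in ["shard-0", "shard-1", "shard-2", "shard-3"]:
    ring.add_shard(shard)

keys = [f"user:{i}" for i in range(10_000)]
before = {k: ring.get_shard(k) for k in keys}
ring.add_shard("shard-4")
moved = [k for k in keys if ring.get_shard(k) != before[k]]
# Every moved key lands on shard-4; expect roughly 1/5 of the keys to move
print(f"{len(moved) / len(keys):.1%} of keys moved")
```

Removing shard-4 again restores every key to its previous owner, because the remaining virtual nodes are unchanged.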

For MongoDB, adding a shard triggers automatic rebalancing of chunks across available shards. The balancer runs in the background and moves chunks gradually to avoid impacting performance.

For Citus, adding a worker node requires rebalancing distributed tables:

-- Add new worker
SELECT citus_add_node('worker-4.internal', 5432);

-- Rebalance distributed tables (runs in background)
SELECT citus_rebalance_start();

Removing a Shard

Shard removal requires migrating data before decommissioning. Never remove a shard until all data has moved:

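def remove_shard(router, shard_to_remove):
    """
    Safely remove a shard by migrating data first.
    `router` is an assumed routing-layer object (not shown).
    """
    # 1. Stop routing new traffic to the shard; lookups now resolve to new owners
    router.deactivate_shard(shard_to_remove)

    # 2. Copy every key to the shard that now owns it
    for key in router.get_keys_for_shard(shard_to_remove):
        new_shard = router.get_shard(key)  # routes to the new owner
        data = router.get_data(shard_to_remove, key)
        new_shard.put(key, data)

    # 3. Verify each key exists on its new owner (the originals are still on the
    #    old shard, so counting keys there would never reach zero)
    missing = [key for key in router.get_keys_for_shard(shard_to_remove)
               if not router.get_shard(key).has_key(key)]
    if missing:
        raise ValueError(f"{len(missing)} keys not yet present on their new shards")

    # 4. Remove from ring
    router.remove_shard(shard_to_remove)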

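In MongoDB, run the removeShard command from mongosh (for example, db.adminCommand({ removeShard: "shard2" })). The balancer then drains the shard's chunks in the background. Rerun the command to check draining progress. Before the removal can complete, move any database whose primary shard is the one being removed (movePrimary). Verify chunk counts and record the migration before decommissioning the host.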

Shard Migration Playbook

Live shard migration involves four phases: setup, dual-write, cutover, and cleanup. Each phase has specific validation gates.

Phase 1 — Setup:

  1. Provision the new shard and verify it is healthy
  2. Update the routing layer to recognize the new shard
  3. Enable replication from source shard to new shard
  4. Verify initial sync is complete before proceeding

class MigrationError(Exception):
    """Raised when a migration validation gate fails."""

def migration_phase1_setup(router, source_shard, new_shard):
    """Phase 1: Initialize migration infrastructure."""
    # Verify new shard is reachable and healthy
    if not new_shard.is_healthy():
        raise MigrationError("New shard failed health check")

    # Configure replication from source to target
    router.configure_replication(source_shard, new_shard)

    # Verify initial sync catches up
    if not new_shard.is_synced(source_shard):
        raise MigrationError("Initial sync incomplete")

    print(f"Phase 1 complete: {source_shard.id} -> {new_shard.id} replication active")

def migration_phase2_dual_write(router, source_shard, new_shard):
    """Phase 2: Enable dual-write to both shards."""
    router.enable_dual_write(source_shard, new_shard)

    # Verify writes go to both
    test_key = f"__migration_test_{new_shard.id}__"
    router.write(test_key, {"test": True}, dual_write=True)

    if not new_shard.has_key(test_key):
        raise MigrationError("Dual-write verification failed")

    router.delete(test_key)
    print(f"Phase 2 complete: dual-write active for {new_shard.id}")

def migration_phase3_cutover(router, old_shard, new_shard):
    """Phase 3: Switch routing to new shard, stop writes to old."""
    # Verify all data migrated via checksum
    if not router.verify_checksum(old_shard, new_shard):
        raise MigrationError("Checksum mismatch — migration incomplete")

    # Switch routing
    router.switch_shard(old_shard, new_shard)
    router.disable_write(old_shard)

    print(f"Phase 3 complete: cutover to {new_shard.id}")

def migration_phase4_cleanup(router, old_shard, new_shard):
    """Phase 4: Remove old shard, verify new shard is healthy."""
    # Verify no traffic going to old shard
    if router.count_requests(old_shard) > 0:
        raise MigrationError("Old shard still receiving traffic")

    # Remove from ring
    router.remove_shard(old_shard)

    # Verify new shard handles all traffic
    if not new_shard.is_healthy():
        raise MigrationError("New shard unhealthy after cleanup")

    print(f"Phase 4 complete: {old_shard.id} removed, {new_shard.id} active")

def run_migration_playbook(router, source_shard, new_shard):
    """Execute all four phases of shard migration."""
    migration_phase1_setup(router, source_shard, new_shard)
    migration_phase2_dual_write(router, source_shard, new_shard)
    migration_phase3_cutover(router, source_shard, new_shard)
    migration_phase4_cleanup(router, source_shard, new_shard)
    print("Migration playbook complete")

Validation gates per phase:

| Phase | Gate |
|---|---|
| Phase 1: Setup | New shard healthy, initial sync complete |
| Phase 2: Dual Write | Verification write confirmed on new shard |
| Phase 3: Cutover | Checksum matches, zero divergence |
| Phase 4: Cleanup | Old shard has zero traffic, removed from ring |
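The Phase 3 gate depends on a checksum comparison. Here is a minimal sketch. It assumes each shard's contents can be read as a dict of string keys to JSON-serializable values. `shard_checksum`, `diverging_keys`, and `cutover_gate` are hypothetical names. A real implementation would checksum per key range in batches rather than load a whole shard into memory.

```python
import hashlib
import json

def shard_checksum(rows: dict) -> str:
    """Checksum over a shard's key/value pairs, independent of iteration order.

    Keys are visited in sorted order and values are serialized with sorted
    fields, so the same logical data always yields the same digest.
    """
    digest = hashlib.sha256()
    for key in sorted(rows):
        digest.update(json.dumps([key, rows[key]], sort_keys=True).encode())
    return digest.hexdigest()

def diverging_keys(old_rows: dict, new_rows: dict) -> set:
    """Keys missing on either side or holding different values."""
    return {k for k in old_rows.keys() | new_rows.keys()
            if old_rows.get(k) != new_rows.get(k)}

def cutover_gate(old_rows: dict, new_rows: dict) -> bool:
    """Phase 3 gate: allow cutover only when the checksums match."""
    if shard_checksum(old_rows) == shard_checksum(new_rows):
        return True
    print(f"Divergent keys: {sorted(diverging_keys(old_rows, new_rows))}")
    return False

old = {"user:1": {"name": "Ada"}, "user:2": {"name": "Lin"}}
new = {"user:2": {"name": "Lin"}, "user:1": {"name": "Ada"}}
print(cutover_gate(old, new))  # True: same data, different insertion order
new["user:2"] = {"name": "Lim"}
print(cutover_gate(old, new))  # prints the divergent key, then False
```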

Security checklist for migration operations:

  • Implement authentication at both router and shard level
  • Use TLS encryption for all router-to-shard communication
  • Apply least privilege for router database users
  • Audit-log all cross-shard operations
  • Encrypt data at rest on each shard
  • Implement network segmentation between shards
  • Secure shard-to-shard replication channels
  • Review routing logic for security regularly
  • Implement access controls for rebalancing operations
  • Back up encryption keys separately from shard data

Best Practices Summary

Design Phase:

  • Choose high-cardinality shard keys with even write distribution
  • Model access patterns before selecting shard key — >80% of queries should filter by the key
  • Test distribution with production-like data before going live
  • Start with consistent hashing rather than modulo hashing

Migration Phase:

  • Use virtual nodes for gradual, low-impact resharding
  • Implement dual-write during migration to maintain consistency
  • Validate checksum before cutover; verify no divergence between old and new shards
  • Set per-shard timeouts to prevent slow shards from blocking scatter-gather queries

Operations Phase:

  • Monitor per-shard latency, storage, and connection pool utilization
  • Alert on shard distribution skew >2x and storage >80% capacity
  • Maintain replication factor >= 2 per shard for fault tolerance
  • Plan shard split before hitting limits — do not wait for crisis

Anti-Patterns to Avoid:

  • Sharding before exhausting simpler scaling options (vertical, replicas, caching)
  • Using timestamp or date-based keys for high-write workloads
  • Cross-shard transactions as a primary pattern — redesign schema instead
  • Assuming linear scalability — adding shards gives diminishing returns

Real-World Case Studies

Instagram’s Sharding Evolution

Instagram’s user database started on a single PostgreSQL server. As they grew, they added read replicas, then started partitioning. Their sharding evolution went through three generations.

The first generation used simple modulo sharding: user_id modulo the shard count determined the shard. It was simple but inflexible. Adding shards required changing the modulo divisor, which remapped most users to new shards. They managed this with a migration that ran for months while serving live traffic.

Second generation moved to consistent hashing with virtual nodes. Adding a shard now moved only the neighboring keys instead of remapping all users. But they still had a problem: their shard key was user_id alone, and Instagram’s user distribution was not uniform. Some users were more active than others, and some users had relationships with much larger audiences, creating write hotspots for specific shards.

Third generation introduced application-level sharding: the application layer decided not just which shard a user’s data lived on, but also cached user data aggressively and used a fanout-on-read model for celebrity activity feeds. Write amplification for celebrity posts was managed by separating celebrity posts from regular user posts — celebrity posts were sharded differently (by post_id) than regular posts (by user_id).

The lesson: Instagram’s biggest challenge was not distributing data evenly but managing the cross-shard queries that their feed and notification systems required. They solved this with denormalization — pre-computing follower feeds and storing them close to the users who needed them. The shard key was not just a storage decision, it was an access pattern decision.

Real-World Failure Scenarios

Understanding how sharding systems fail in production helps you design more resilient architectures. These scenarios are drawn from real incidents and common pitfalls.

Hot Shard Due to Hash Collision

A ride-sharing company used driver_id as the shard key with hash-based partitioning. All drivers were assigned sequential IDs during onboarding. When their batch import process ran, it created thousands of drivers with consecutive IDs. Their partitioning function was effectively order-preserving: like range partitioning, a weak hash kept consecutive IDs in the same region of the ring. As a result, writes from the batch job all landed on a single shard. The resulting write hot spot caused 40x latency spikes during import windows.

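Fix: Hash the key with a function that mixes its bits well, such as MurmurHash or a truncated cryptographic hash, so consecutive IDs scatter across the ring. If you add a salt, derive it deterministically from the key, or use a small fixed set of salt buckets that reads query in parallel. A truly random salt makes the record impossible to locate on a point lookup. Never assume IDs have uniform distribution.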
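To see why an order-preserving placement concentrates a batch of sequential IDs, this sketch compares a range-style function with a hash that mixes its input, over 5,000 consecutive driver IDs. The 8-shard cluster and the ID space size are assumptions for illustration. SHA-256 stands in for whatever well-mixing hash you choose.

```python
import hashlib
from collections import Counter

NUM_SHARDS = 8
ID_SPACE = 8_000_000  # hypothetical ID range, split into 8 contiguous slices

def order_preserving_shard(driver_id: int) -> int:
    """Range-style placement: consecutive IDs fall in the same slice, hence the same shard."""
    return driver_id * NUM_SHARDS // ID_SPACE

def mixed_hash_shard(driver_id: int) -> int:
    """Hash the ID first so consecutive values scatter across shards."""
    digest = hashlib.sha256(str(driver_id).encode()).digest()
    return int.from_bytes(digest[:8], "big") % NUM_SHARDS

batch = range(1_000_000, 1_005_000)  # 5,000 drivers onboarded with consecutive IDs
print(Counter(order_preserving_shard(i) for i in batch))  # Counter({1: 5000})
print(Counter(mixed_hash_shard(i) for i in batch))        # roughly 625 per shard
```

Because the hash is deterministic, a point lookup recomputes the same shard for a given driver_id. A random salt would destroy that property.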

Router Table Corruption During Reshard

A social media company running on Vitess performed a resharding operation. During the migration, a network partition interrupted the routing table update. Some VTGate instances (the Vitess query routers) received the new routing configuration while others did not. Requests for migrated keys were routed to old shards that no longer held the data. Affected users saw missing posts and conversations for 3 hours before the issue was identified.

Fix: Use a two-phase routing update: (1) deploy new routing rules that check both old and new shard locations, (2) verify all keys migrated via checksum, (3) switch to new routing once migration is 100% complete. Implement circuit breakers that fail closed (return error) for keys that should be on a new shard but are not found.
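Here is a sketch of the read path during the transition window. It assumes plain dicts stand in for the old and new owning shards, and that the migration job records which keys it has verified. `read_during_reshard` and `MissingMigratedKey` are hypothetical names. Keys verified as migrated but missing from the new shard fail closed instead of silently returning nothing.

```python
class MissingMigratedKey(Exception):
    """A key marked as migrated is absent from its new shard."""

def read_during_reshard(key, new_owner: dict, old_owner: dict, migrated: set):
    """Read path while both routing rules are live.

    new_owner / old_owner stand in for the shards that own `key` under the new
    and old routing rules; `migrated` holds keys whose copy has been verified.
    """
    if key in new_owner:
        return new_owner[key]
    if key in migrated:
        # Should be on the new shard but is not: fail closed rather than return nothing
        raise MissingMigratedKey(f"{key!r} missing from its new shard")
    if key in old_owner:
        return old_owner[key]  # not migrated yet: fall back to the old location
    raise KeyError(key)  # absent under both rules

new_shard = {"post:1": "hello"}
old_shard = {"post:1": "hello", "post:2": "draft"}
print(read_during_reshard("post:1", new_shard, old_shard, {"post:1"}))  # hello
print(read_during_reshard("post:2", new_shard, old_shard, set()))       # draft
```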

Cross-Shard JOIN Timeout Cascade

A gaming company used a microservices architecture with a sharded PostgreSQL cluster. One service executed a report that required a JOIN across all shards. The query coordinator sent sub-queries to 8 shards. One shard had a slow disk because a failed drive was being rebuilt. That shard took 45 seconds or more to respond, while the others responded in under 100ms. The query coordinator had no per-shard timeout, so the entire report request waited on that shard until it timed out after 60 seconds. While waiting, it held connection slots on all shards and caused cascading timeouts for other services.

Fix: Implement per-shard query timeouts. If a shard exceeds its timeout, fail the request with a partial result error rather than blocking indefinitely. Monitor per-shard latency in the routing layer and mark slow shards as unhealthy until the issue is resolved.
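Here is a minimal scatter-gather sketch with a per-shard deadline, using Python's concurrent.futures. Each shard's sub-query is modeled as a zero-argument callable. That is an assumption: a real coordinator would issue queries through a database driver and cancel them server-side. Because all sub-queries start together, one shared deadline acts as a per-shard timeout. Shards that miss it are reported rather than waited on.

```python
import concurrent.futures as cf
import time

class PartialResultError(Exception):
    """Raised when some shards miss their deadline; carries the partial results."""

    def __init__(self, results, slow_shards):
        super().__init__(f"shards timed out: {sorted(slow_shards)}")
        self.results = results          # rows from shards that answered in time
        self.slow_shards = slow_shards  # candidates to mark unhealthy

def scatter_gather(shard_queries, per_shard_timeout):
    """Run one sub-query per shard in parallel and never wait past the deadline.

    shard_queries maps shard id -> zero-argument callable returning rows.
    """
    pool = cf.ThreadPoolExecutor(max_workers=max(1, len(shard_queries)))
    futures = {shard: pool.submit(fn) for shard, fn in shard_queries.items()}
    deadline = time.monotonic() + per_shard_timeout
    results, slow = {}, set()
    for shard, fut in futures.items():
        try:
            results[shard] = fut.result(timeout=max(0.0, deadline - time.monotonic()))
        except cf.TimeoutError:
            slow.add(shard)
    # Return without joining stragglers (their threads finish in the background)
    pool.shutdown(wait=False, cancel_futures=True)
    if slow:
        raise PartialResultError(results, slow)
    return results

def fast_shard():
    return ["row-a"]

def slow_shard():
    time.sleep(0.5)  # simulates the shard with the failing disk
    return ["row-b"]

try:
    scatter_gather({"shard-1": fast_shard, "shard-2": slow_shard}, per_shard_timeout=0.1)
except PartialResultError as err:
    print(err.slow_shards, err.results)  # {'shard-2'} {'shard-1': ['row-a']}
```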

Schema Migration on Sharded Database

A fintech company needed to add a new column to a table spanning 16 shards. They ran ALTER TABLE directly on each shard during low-traffic windows. However, one shard had a significantly larger table due to uneven data distribution. The ALTER took 45 minutes on that shard, while the others completed in under 5 minutes. During that window, application code that expected the new schema was already running in production. The large shard still had the old schema while the smaller shards had the new one. Queries that joined data across shards failed because of type mismatches on the new column.

Fix: For sharded schema migrations, run ALTERs in stages: (1) add the column as nullable, (2) deploy application code that handles both schemas, (3) backfill values for existing rows on each shard, (4) add the NOT NULL constraint once all rows have valid values. On MySQL, pt-online-schema-change and similar tools apply each ALTER by copying rows into a shadow table in batches, which avoids long table locks. Always test schema changes on the largest shard first.
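The staged procedure can be sketched against in-memory SQLite databases standing in for shards. The `accounts` table, the `currency` column, and the 'USD' backfill value are hypothetical. SQLite cannot add NOT NULL to an existing column, so the sketch stops at the gate that would allow stage 4. Shards are processed largest first, matching the advice above.

```python
import sqlite3

def make_shard(rows: int) -> sqlite3.Connection:
    """Stand-in shard: an in-memory SQLite database with a hypothetical accounts table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance INTEGER)")
    conn.executemany("INSERT INTO accounts (id, balance) VALUES (?, ?)",
                     [(i, 100) for i in range(rows)])
    conn.commit()
    return conn

def row_count(conn: sqlite3.Connection) -> int:
    return conn.execute("SELECT COUNT(*) FROM accounts").fetchone()[0]

def staged_add_column(shards, batch_size=1000) -> bool:
    """Add a column across shards in stages; return True when NOT NULL is safe."""
    # Largest shard first, so the slowest ALTER and backfill surface early
    ordered = sorted(shards, key=row_count, reverse=True)

    # Stage 1: add the column as nullable on every shard
    for conn in ordered:
        conn.execute("ALTER TABLE accounts ADD COLUMN currency TEXT")

    # Stage 2 happens outside the database: deploy code that tolerates NULL currency

    # Stage 3: backfill in small batches so each transaction stays short
    for conn in ordered:
        while True:
            with conn:  # commits each batch
                cur = conn.execute(
                    "UPDATE accounts SET currency = 'USD' WHERE id IN "
                    "(SELECT id FROM accounts WHERE currency IS NULL LIMIT ?)",
                    (batch_size,))
            if cur.rowcount == 0:
                break

    # Stage 4 gate: enforce NOT NULL only when no shard has NULLs left
    # (SQLite cannot alter an existing column, so this sketch stops at the check)
    remaining = sum(
        conn.execute("SELECT COUNT(*) FROM accounts WHERE currency IS NULL").fetchone()[0]
        for conn in ordered)
    return remaining == 0

shards = [make_shard(n) for n in (500, 5000, 800)]
print(staged_add_column(shards))  # True
```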

Shard Rebalancing Data Loss

A distributed key-value store using consistent hashing performed a resharding operation. The rebalance process migrated keys from old shards to new shards. During migration, a write arrived for a key that had been partially migrated — the key existed on both old and new shards with different values. The router, configured to prefer the new shard, returned stale data to the user. When the user updated their data, the update went to the new shard while the old shard still held the previous version. When the old shard was decommissioned, the newer version was lost.

Fix: Use a write-lock during migration for active keys: either block writes to keys being migrated, or use a versioning scheme that lets you detect and resolve conflicts. Implement dual-write during migration so both shards see all writes, and only decommission the old shard once all writes for the migration window have been verified on the new shard.
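One way to implement the versioning fix assumes every write carries a per-key version that increases monotonically, for example a counter bumped on each update. Stale writes, such as a late backfill copy, are rejected instead of overwriting newer data. `Versioned`, `write_with_version`, `dual_write`, and `merge_before_decommission` are hypothetical names for illustration.

```python
from dataclasses import dataclass

@dataclass
class Versioned:
    value: object
    version: int  # assumed per-key counter, incremented on every write

def write_with_version(store: dict, key, value, version: int) -> bool:
    """Apply a write only if it is newer than what the shard already holds."""
    current = store.get(key)
    if current is not None and current.version >= version:
        return False  # stale or duplicate write: keep the newer value
    store[key] = Versioned(value, version)
    return True

def dual_write(old: dict, new: dict, key, value, version: int) -> None:
    """During migration, send every write to both shards."""
    write_with_version(old, key, value, version)
    write_with_version(new, key, value, version)

def merge_before_decommission(old: dict, new: dict) -> None:
    """Copy any entry the new shard is missing or holds at a lower version."""
    for key, entry in old.items():
        write_with_version(new, key, entry.value, entry.version)

old, new = {"profile:7": Versioned("v1", 1)}, {}
dual_write(old, new, "profile:7", "v2", version=2)  # user update mid-migration
write_with_version(new, "profile:7", "v1", 1)       # late backfill copy is rejected
merge_before_decommission(old, new)
print(new["profile:7"])  # Versioned(value='v2', version=2)
```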

Trade-off Analysis

Sharding involves fundamental trade-offs across multiple dimensions. Understanding these helps you make informed architectural decisions.

| Scenario | Modulo Sharding | Consistent Hashing | Distributed SQL (CockroachDB/Spanner) |
|---|---|---|---|
| Shard addition | Most keys move (~67% going from 8 to 12 shards; N/(N+1) when adding one shard to N) | Only keys claimed by new shards move (~33% going from 8 to 12 shards) | Automatic, ~0% application-level migration |
| Operational complexity | Low (simple math) | Medium (ring management) | Low (database handles it) |
| Hot spot risk | High (no virtual nodes) | Low (virtual nodes spread load evenly) | Medium (automatic, but unaware of access patterns) |
| Cross-shard query performance | Scatter-gather required | Scatter-gather required | Optimizer handles it, but distributed JOINs are expensive |
| Consistency model | Depends on database | Typically eventual | Strong by default (serializable in CockroachDB, externally consistent in Spanner); follower reads trade freshness for latency |
| Cost | Low (uses standard DB) | Medium (custom routing layer) | High (managed service or complex self-hosted) |
| Schema changes | Complex across shards | Complex across shards | Online schema changes supported |
| Write latency | Lowest (single shard) | Low (single shard per key) | Higher (distributed consensus required) |
| Read latency | Lowest | Low (local replica option) | Variable (follower reads vs primary) |

When to Choose Each Approach

Modulo sharding is acceptable for write-once read-many workloads where data is never re-sharded. It is the wrong choice for any production system expecting growth. If you are using modulo, plan your migration path to consistent hashing before you need it.

Consistent hashing is the right choice when you need fine-grained control over shard placement, you expect to add and remove shards frequently, or you want to minimize migration percentage when resharding. The tradeoff is building or maintaining a routing layer.

Distributed SQL is the right choice when you want to focus on application logic rather than database operations, you need strong consistency across global deployments, and your team lacks distributed systems expertise. The tradeoff is higher latency for distributed transactions and higher cost for managed solutions.

Shard Key Cardinality Trade-offs

| Shard Key Type | Max Shards Possible | Hot Spot Risk | Range Query Support |
|---|---|---|---|
| tenant_id (10 tenants) | 10 | High if tenant sizes differ | Efficient within tenant |
| user_id (millions of users) | Limited by hash space | Low | Requires scatter-gather |
| order_id (GUID/UUID) | Virtually unlimited | Very low | Requires scatter-gather |
| tenant_id + entity_id (composite) | Limited by tenant cardinality | Medium | Efficient within tenant + entity |
| hashed(user_id) | Virtually unlimited | Very low | Random distribution, no range support |

Interview Questions

1. You are designing a sharding strategy for a multi-tenant SaaS application. Each tenant has a different data volume — some have millions of rows, others have hundreds. What shard key do you choose and why?

Use tenant_id as the shard key if tenants are roughly equal in size and you frequently query by tenant. If tenant sizes are very uneven (some tenants have 10,000x more data than others), tenant_id alone creates hot spots for large tenants — those shards become overloaded while others sit idle. For uneven tenants, consider sharding by tenant_id plus a sub-key like entity_type or using hash partitioning on a composite of tenant_id and entity_id. An alternative for very large tenants is to give them dedicated shards, but this adds operational complexity.

2. Your application uses consistent hashing with 8 shards. You need to add 4 more shards to handle growth. How much data migration happens?

With consistent hashing and well-spread virtual nodes, each new shard claims roughly 1/(old_shard_count + new_shard_count) of the ring, here about 1/12. Only keys in the claimed ranges move. The four new shards together take about 4/12 of the key space, so roughly 33% of existing keys migrate when growing from 8 to 12 shards, all of them onto the new shards. Plain modulo hashing fares worse. A key stays put only when key mod 8 equals key mod 12, which holds for one third of keys, so about 67% of keys would migrate.
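The 33% and 67% figures can be checked with a few lines of arithmetic. This sketch assumes keys are spread uniformly. It also assumes the consistent hash ring has enough virtual nodes for each shard to own an equal share. The helper names are hypothetical.

```python
from fractions import Fraction
from math import lcm

def modulo_moved_fraction(old_n: int, new_n: int) -> Fraction:
    """Fraction of keys whose shard changes when key % old_n becomes key % new_n.

    The pair (key % old_n, key % new_n) repeats every lcm(old_n, new_n) keys,
    so counting one period gives the exact fraction for uniformly spread keys.
    """
    period = lcm(old_n, new_n)
    moved = sum(1 for k in range(period) if k % old_n != k % new_n)
    return Fraction(moved, period)

def consistent_moved_fraction(old_n: int, new_n: int) -> Fraction:
    """Expected fraction moved with consistent hashing when growing old_n -> new_n:
    the new shards claim (new_n - old_n) / new_n of an evenly divided ring."""
    return Fraction(new_n - old_n, new_n)

print(modulo_moved_fraction(8, 12))      # 2/3
print(consistent_moved_fraction(8, 12))  # 1/3
```

Doubling from 8 to 16 shards moves half the keys under either scheme. Consistent hashing pays off when growth is not a clean multiple and when adding shards one at a time.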

3. What happens to your cross-shard JOIN queries when one shard is significantly slower than others?

The slow shard becomes the bottleneck for the entire query. In a scatter-gather query across 8 shards, if one shard has a disk I/O problem and takes 5 seconds while others take 100ms, the entire query takes 5 seconds — the query cannot return until the slowest shard responds. The fix is to implement per-shard timeouts and either fail the query or return partial results with a warning. Alternatively, detect slow shards via health monitoring and route queries away from them while the issue is investigated.

4. When would you use a distributed SQL database like CockroachDB over manual sharding with PostgreSQL?

Use CockroachDB when you want the scalability benefits of sharding but without the operational complexity of managing shard distribution yourself. CockroachDB handles automatic resharding, rebalancing, and failover transparently. You write normal SQL and the database handles distributing data across nodes. The tradeoff is performance overhead — CockroachDB's distributed transactions have higher latency than single-node PostgreSQL, and you pay a premium for the strong consistency guarantees. If your team lacks the expertise to manage manual sharding and you can tolerate higher latency for distributed operations, CockroachDB is a reasonable choice.

5. You need to migrate from modulo-based sharding to consistent hashing without downtime. Walk through the migration strategy and how you handle the transition period.

Migration from modulo to consistent hashing requires a multi-phase approach. First, introduce a routing layer that can translate between old modulo placements and new consistent hash ring positions. This is the dual-write phase: new writes go to both schemes while reads check both. Second, backfill historical data from the old shards to the new shard locations, verifying that each key's checksum matches. Third, migrate reads to prefer the new consistent hash routing while still falling back to the old shard for any keys not yet migrated. Fourth, once all reads and writes use the new scheme and the old shards receive zero traffic, decommission the old routing logic. The critical path is verifying data integrity during the transition. Run a background checksum job that compares old and new values for every key and alerts on divergence. The total migration time depends on data volume; for large datasets, expect days to weeks of dual-write operation.

6. How do you design a sharding strategy for a social media application where celebrity users have orders of magnitude more activity than regular users?

Celebrity hot spots require separating the hot data from the cold data. One approach is a two-tier sharding scheme. Regular users are sharded by `user_id` hash as normal, but posts from celebrity users (identified by an `is_celebrity` flag or follower count threshold) are sharded by `post_id` hash instead. This prevents celebrity write volume from overwhelming the shard containing their user record. Alternatively, swap fanout-on-write for fanout-on-read for celebrities. Celebrity posts are stored keyed by `post_id` rather than copied into every follower's feed. When a user's feed is loaded, the system fetches recent celebrity posts at read time rather than pre-computing them. Another option is dedicated shard assignment for top-K celebrities: these users get their own shard or set of shards, with a higher replication factor for availability. The trade-off is added query complexity for celebrity content versus write hot-spot risk.

7. Explain how you would implement shard-aware connection pooling. How does connection management differ from a single-node database?

In a sharded environment, each shard needs its own connection pool. A naive approach of a single global pool to a proxy routing layer works but hides the per-shard connection state from the application. A better approach is a router-level pool that maintains pools per shard: the router holds connections to each physical shard, and the application gets connections from the router without knowing which shard a given key maps to. Per-shard connection pools allow independent tuning — hot shards can have more connections while cold shards conserve resources. The challenge is connection lifetime: when a shard is added or removed, existing connections are disrupted. Use circuit breakers per shard to isolate failures: if shard-2 is unreachable, the router marks it unhealthy, returns errors for keys on shard-2, and continues serving other shards rather than having one shard's failure cascade to all requests.
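Here is a minimal sketch of per-shard pools with one circuit breaker per shard. The connection factory, pool sizes, failure threshold, and reset interval are assumptions. `ShardPool` and `ShardRouter` are hypothetical classes, not a driver API. The point is that exhausting or breaking one shard's pool leaves the others untouched.

```python
import queue
import time

class CircuitOpen(Exception):
    """Raised instead of calling a shard whose breaker is open."""

class ShardPool:
    """Fixed-size connection pool for one shard, with a simple circuit breaker."""

    def __init__(self, connect, size, failure_threshold=3, reset_after=30.0):
        self._idle = queue.Queue()
        for _ in range(size):
            self._idle.put(connect())
        self.failures = 0
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after
        self.opened_at = None  # monotonic time when the breaker opened

    def run(self, fn, timeout=1.0):
        """Borrow a connection, run fn(conn), and return the connection afterwards."""
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_after:
                raise CircuitOpen("shard marked unhealthy")
            # Cool-down elapsed: allow attempts again; failures is still at the
            # threshold, so one more failure reopens the breaker immediately
            self.opened_at = None
        conn = self._idle.get(timeout=timeout)  # raises queue.Empty if the pool is exhausted
        try:
            result = fn(conn)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()
            raise
        finally:
            self._idle.put(conn)
        self.failures = 0
        return result

class ShardRouter:
    """Keeps one independent pool per shard so a failing shard stays isolated."""

    def __init__(self, pools, shard_for_key):
        self.pools = pools
        self.shard_for_key = shard_for_key

    def run(self, key, fn):
        return self.pools[self.shard_for_key(key)].run(fn)

def healthy(conn):
    return "ok"

def broken(conn):
    raise ConnectionError("disk failure")

router = ShardRouter(
    # object() stands in for a real connection factory
    {"shard-1": ShardPool(object, size=4), "shard-2": ShardPool(object, size=2)},
    shard_for_key=lambda key: "shard-1" if key % 2 else "shard-2",
)
for _ in range(3):  # three failures on shard-2 open its breaker
    try:
        router.run(2, broken)
    except ConnectionError:
        pass
print(router.run(1, healthy))  # ok: shard-1 is unaffected
```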

8. A customer reports their queries are timing out after you added 2 new shards to a 4-shard cluster. The old 4 shards are healthy. What could be wrong?

With consistent hashing, adding shards only affects keys in the new shards' hash ranges — existing keys on old shards should be unaffected. The most likely culprit is a bug in the routing table update: if the new shards were added but the router's ring was not updated correctly, queries for keys that should route to the new shards instead hit a fallback path (e.g., scatter-gather to all shards) that times out. Another possibility is that the new shards are under-provisioned — they are serving requests but running at high CPU/disk utilization causing slow responses. A third possibility is that the new shard addition triggered a rebalance that is still in progress, and the old shards are overloaded handling both their existing traffic and the migration data stream. Check the router's shard membership, verify the new shards' resource utilization, and inspect whether rebalancing has completed.

9. How do you handle schema changes across shards? Specifically, how do you add a new column to a table that spans 8 shards without taking the database offline?

Schema changes in a sharded database require a multi-phase approach. First, deploy an application code change that handles both the old schema (without the new column) and the new schema (with the column). The application must tolerate the column being absent. Second, run an online schema migration: for each shard, issue an ALTER TABLE statement during low-traffic windows. Use pt-online-schema-change (for MySQL) or similar tools that create a shadow table, copy data in batches, then swap. Third, populate the new column for existing rows. You can backfill in batches or, where the database supports it, rely on the column default. PostgreSQL 11 and later, for example, store a constant DEFAULT without rewriting the table. Fourth, once every shard has the new schema and the application uses the new column, remove the code path that handles the old schema. The key constraint is that the application must handle both schemas simultaneously during the transition. The migration is only safe if old and new code can run side by side.

10. What is shard affinity and how does it affect multi-region sharding deployments? How do you handle the case where most of your traffic comes from one region but your shards are evenly distributed?

Shard affinity means routing requests to the shard that is geographically closest to the user making the request. In a multi-region deployment, if your shards are evenly distributed but 90% of traffic originates from us-east-1, most requests incur cross-region latency hitting shards in eu-west or ap-south. The fix is to replicate data widely for reads (all regions have copies of hot data) but route writes to the primary shard regardless of region. For read-heavy workloads, use a regional read-replica of each shard — the routing layer directs reads to the local replica and only routes writes to the primary. CockroachDB and Spanner handle this automatically via follower reads. Another approach is to skew shard placement so hot shards (those serving the majority of traffic) live in the region with most users. For YouTube-scale, you accept that writes to celebrity posts may be slow but reads are fast.

11. You are running a sharded PostgreSQL with Citus. A query that used to take 50ms now takes 3 seconds. The only change is that one tenant's data grew 100x. Walk through your diagnosis.

With Citus, rows that share a distribution column value are stored on the same shard, so if tenant_id is the distribution column, this tenant's data lives on a single shard that now holds 100x more rows. First, find which shard and worker node host the tenant (Citus's get_shard_id_for_distribution_column function maps a value to its shard) and check whether that node's disk I/O has saturated. A shard with 100x more data reads far more pages per query, and plans that used to be cheap index scans may have turned into sequential scans; running EXPLAIN (ANALYZE, BUFFERS) on the slow query shows which. Second, check whether the working set still fits in memory: with 100x more data, the buffer cache is likely overwhelmed and queries spill to disk. Third, verify that table statistics are up to date (run ANALYZE if needed), since stale estimates can lead the planner to a poor plan. Fourth, check for lock contention, for example many concurrent writes to the same hot rows, using pg_locks and pg_stat_activity. The fix likely involves partitioning the tenant's table further within the shard, adding an index on the most common filter, or splitting the tenant across shards using a sub-tenant key.

12. How does read repair work in an eventually-consistent sharded database? Under what conditions could a read repair cause data loss?

Read repair runs during read operations: when a read contacts multiple replicas and finds that their versions differ, the coordinator writes the newest version back to the stale replicas. It repairs divergence lazily, as a side effect of reads, rather than relying only on a background process. The gap is that a replica that is down during the read misses the repair and keeps stale data until a later read (or background anti-entropy) reaches it. Read repair can contribute to data loss in two ways. First, if the most recent write was accepted by only one replica (write quorum W=1) and that replica fails before the write propagates, the write is gone; later reads simply return, and propagate, the previous value, because read repair cannot recover writes that were never replicated. Second, in systems that resolve conflicts with last-write-wins timestamps, clock skew can give an older write a higher timestamp, and read repair will then spread that older value over the newer one. This is why Dynamo-style systems recommend W > 1 for critical data: with W=1 and R=1, a single node failure can lose writes.
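A minimal read-repair sketch follows. It assumes each value carries a single integer version that only grows; real systems use timestamps or vector clocks, and the `Replica` and `Versioned` types here are hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class Versioned:
    value: str
    version: int  # assumed monotonically increasing per key


class Replica:
    def __init__(self, name: str):
        self.name = name
        self.data: Dict[str, Versioned] = {}
        self.up = True


def read_with_repair(replicas, key, r):
    """Read from the first r live replicas, return the newest value, and
    write it back to any contacted replica that was stale."""
    contacted = [rep for rep in replicas if rep.up][:r]
    if len(contacted) < r:
        raise RuntimeError("read quorum not met")
    newest: Optional[Versioned] = None
    for rep in contacted:
        v = rep.data.get(key)
        if v is not None and (newest is None or v.version > newest.version):
            newest = v
    if newest is not None:
        for rep in contacted:
            current = rep.data.get(key)
            if current is None or current.version < newest.version:
                rep.data[key] = newest  # the repair itself
    return newest


a, b, c = Replica("a"), Replica("b"), Replica("c")
a.data["k"] = Versioned("v2", 2)  # only a saw the latest write
b.data["k"] = Versioned("v1", 1)
c.data["k"] = Versioned("v1", 1)
c.up = False                      # c is down during the read
result = read_with_repair([a, b, c], "k", r=2)
print(result.value, b.data["k"].version, c.data["k"].version)  # v2 2 1
```

Replica c stays stale because it was not contacted. If replica a had crashed before the read, no read could ever return "v2", which is the W=1 loss case described above.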

13. Design a rate-limiting system that works correctly across sharded database instances. The requirement is that each user can make 100 API calls per minute across all shards.

A naïve per-shard counter fails because it allows 100 calls per shard: with 4 shards, a user gets 400 calls per minute. You need a shared counter. The simplest correct approach is a centralized rate-limit store, such as Redis holding a sliding-window log or counter keyed by `user_id`. Each API request first checks the store: if the user has already made 100 calls in the current window, reject immediately; otherwise, record the call and proceed. The check and the increment must be atomic, or concurrent requests can slip past the limit. This central store can become a bottleneck at high traffic. A more distributed approach uses two levels: each shard keeps a local approximate count and periodically syncs it to the central store, trading some accuracy for fewer round-trips. A fully local alternative gives each shard its own token bucket, refilled from elapsed time, with no cross-shard coordination. This is simple, but the limit is only approximate: if every shard allows the full 100 calls per minute, a user spreading requests across 4 shards could reach 400, while splitting the budget (25 per shard) bounds the total at 100 but rejects users whose traffic happens to concentrate on one shard.
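The centralized sliding-window log can be sketched as follows. An in-process dictionary stands in for the shared store; in Redis this is commonly built on a per-user sorted set, with the trim, count, and insert done atomically. Timestamps are passed in explicitly so the behavior is deterministic.

```python
from collections import defaultdict, deque


class SlidingWindowLimiter:
    """Sliding-window log limiter. The dict below stands in for the central
    store that every API server or shard would share."""

    def __init__(self, limit=100, window_seconds=60.0):
        self.limit = limit
        self.window = window_seconds
        self._log = defaultdict(deque)  # user_id -> timestamps of accepted calls

    def allow(self, user_id, now):
        log = self._log[user_id]
        # Drop calls that have slid out of the window.
        while log and log[0] <= now - self.window:
            log.popleft()
        if len(log) >= self.limit:
            return False
        log.append(now)
        return True


limiter = SlidingWindowLimiter(limit=100, window_seconds=60.0)
# 150 calls from one user in 15 seconds: only the first 100 are accepted.
accepted = sum(limiter.allow("u1", now=i * 0.1) for i in range(150))
print(accepted)  # 100
```

A log gives an exact count but stores one entry per call; a fixed-window counter uses constant memory per user at the cost of allowing bursts at window boundaries.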

14. Your sharded database has a hot shard despite using consistent hashing with 150 virtual nodes per shard. Analysis shows one particular user_id range has 10x more writes than other ranges. The data is not skewed by user — the hot range contains many different users. What could be causing this and how do you fix it?

Consistent hashing with virtual nodes only spreads load evenly if the mapping from key to ring position spreads keys evenly. If a batch job, cron job, or background worker writes with sequential or monotonic user IDs (for example, processing users in order of creation) and the hash function is weak (an identity or modulo mapping, or one that mixes bits poorly), those sequential IDs land in a narrow band of the hash space. No number of virtual nodes helps then: the band falls within one or a few vnode arcs, so the writes concentrate on one shard even though they come from many different users. A well-mixed hash such as MurmurHash or MD5 scatters adjacent IDs across the ring. The fix is to add entropy to the mapping: hash user_id with a strong hash, optionally combined with a fixed salt, or include a sub-key like entity_type. For write-heavy batch workloads, you can also prefix the key with one of a fixed number of salt buckets; writes spread across shards, at the cost of reads fanning out over every bucket. A second cause: if you use a composite key like user_id + timestamp with range-based placement and the batch job happens to process users whose timestamps all fall within a recent window, those composite keys cluster on one shard. Use a hash of the full composite key rather than range-based placement within the composite.
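The effect of a weak hash can be reproduced with a small consistent-hash ring of 4 shards and 150 virtual nodes each. The identity "hash" is a deliberately weak, hypothetical stand-in; MD5 is used only because it mixes bits well, not for security.

```python
import bisect
import hashlib
from collections import Counter

RING_SIZE = 2**32


def md5_position(value: str) -> int:
    """Well-mixed position on the ring from the first 4 bytes of MD5."""
    return int.from_bytes(hashlib.md5(value.encode()).digest()[:4], "big")


class HashRing:
    def __init__(self, shards, vnodes=150):
        self._ring = sorted(
            (md5_position(f"{shard}#vnode{i}"), shard)
            for shard in shards
            for i in range(vnodes)
        )
        self._positions = [pos for pos, _ in self._ring]

    def shard_for_position(self, position: int) -> str:
        # First vnode clockwise from the key's position, wrapping at the end.
        idx = bisect.bisect_left(self._positions, position % RING_SIZE)
        if idx == len(self._positions):
            idx = 0
        return self._ring[idx][1]


def weak_position(user_id: int) -> int:
    return user_id % RING_SIZE  # identity: sequential IDs stay adjacent


def strong_position(user_id: int) -> int:
    return md5_position(str(user_id))


def max_share(ring, position_fn, user_ids) -> float:
    """Fraction of keys that land on the busiest shard."""
    counts = Counter(ring.shard_for_position(position_fn(u)) for u in user_ids)
    return max(counts.values()) / len(user_ids)


ring = HashRing(["shard-0", "shard-1", "shard-2", "shard-3"], vnodes=150)
batch_ids = range(1, 1001)  # a batch job walking users in creation order
weak = max_share(ring, weak_position, batch_ids)
strong = max_share(ring, strong_position, batch_ids)
print(f"busiest shard share: weak hash {weak:.0%}, md5 {strong:.0%}")
```

With the identity mapping, the 1,000 sequential IDs occupy a sliver of the 2^32 ring that almost always falls inside one vnode's arc, so one shard takes nearly every write; with MD5 each shard gets roughly a quarter.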

15. What are the trade-offs between using a shard key with high cardinality versus low cardinality? Provide examples of when each approach makes sense.

High-cardinality shard keys (like user_id, order_id) distribute writes evenly across shards because their many distinct values spread across the whole hash ring. This maximizes parallelism and write throughput. The downside is that queries not filtered by the key, such as range queries across all users ("find all orders from the last 30 days"), become scatter-gather operations hitting every shard.

Low-cardinality shard keys (like tenant_id in a SaaS app with only 10 tenants) limit the maximum shard count — you cannot have more shards than unique key values. This creates hot spots if one tenant generates far more traffic than others. Low-cardinality keys work well when: you need to co-locate related data (all data for a tenant on one shard), queries almost always filter by the low-cardinality key, and the key values have similar write volumes.

16. Your engineering team is debating between Vitess (middleware sharding) and CockroachDB (distributed SQL). What factors determine which is the right choice?

Choose Vitess when: you have existing MySQL expertise, your application is already MySQL-compatible, you need fine-grained control over sharding behavior, and you are willing to invest in operating the Vitess layer itself. Vitess is battle-tested at YouTube-scale but adds operational complexity — you are managing both MySQL and the routing middleware.

Choose CockroachDB when: you want minimal operational overhead, you need strong consistency guarantees across global deployments, you prefer writing standard SQL without thinking about shard routing, and you can tolerate higher latency for distributed transactions (CockroachDB's distributed writes have more overhead than single-node MySQL). CockroachDB handles resharding and rebalancing automatically.

The key decision factor is whether you prioritize operational control (Vitess) or operational simplicity (CockroachDB). If your team lacks distributed systems expertise and you need global strong consistency, CockroachDB is lower risk. If you have MySQL expertise and need fine-grained tuning, Vitess gives you more levers.

17. How does the choice between synchronous and asynchronous replication affect your shard quorum configuration?

Synchronous replication makes the primary wait for replica acknowledgments before a write is considered complete. In a quorum system with N replicas, a write succeeds once W of them acknowledge it, so write latency is set by the W-th fastest acknowledgment rather than by the slowest replica. With N=3, W=2, and R=2, every read quorum overlaps every write quorum (R + W > N), which gives strong consistency at the cost of write latency: each write waits for at least two replicas.

Asynchronous replication acknowledges writes immediately after the primary commits, then replicates to replicas in the background. This gives lower write latency but creates a window where data can be lost if the primary fails before replication completes. For quorum configurations with asynchronous replication, you typically set W=1 (primary accepts alone) and rely on read repair or background replication to propagate updates — but this sacrifices strong consistency.

The tradeoff: synchronous replication + quorum writes = strong consistency, higher latency; asynchronous + W=1 = lower latency, risk of lost writes. Many production MySQL deployments use semisynchronous replication, where the primary waits for at least one replica to acknowledge receipt, to balance these concerns.
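The two quorum rules above, the overlap condition and the W-th fastest acknowledgment, fit in a few lines. The acknowledgment latencies in the example are made-up numbers for two nearby replicas and one cross-region replica.

```python
def quorums_overlap(n: int, r: int, w: int) -> bool:
    """True if every read quorum intersects every write quorum (R + W > N)."""
    return r + w > n


def quorum_write_latency(ack_latencies_ms, w: int) -> float:
    """Latency of a write that waits for W acknowledgments: the W-th
    fastest replica, not the slowest one."""
    if not 1 <= w <= len(ack_latencies_ms):
        raise ValueError("w must be between 1 and the number of replicas")
    return sorted(ack_latencies_ms)[w - 1]


acks = [9.0, 120.0, 4.0]  # hypothetical ack times in ms for N=3 replicas
for w in (1, 2, 3):
    print(w, quorum_write_latency(acks, w), quorums_overlap(3, 2, w))
```

With W=2 the cross-region replica no longer sits on the write path, yet with R=2 reads still overlap every write quorum; W=1 with R=2 is faster but breaks the overlap guarantee.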

18. What is the impact of shard key colocation on join performance? How do you decide whether to co-locate or distribute related data?

When related data (e.g., a user's orders) is co-located on the same shard as the user record, joins are fast — they execute within a single shard without cross-shard communication. When data is distributed across shards, joins require either shuffling data between shards (expensive) or denormalizing to avoid joins altogether.

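The decision depends on query patterns: if two entities frequently appear together in the same query, co-locate them by sharding both tables on the shared parent key. For example, shard users by user_id and shard orders by the user_id column of a composite primary key (user_id, order_id), so every order lives on the same shard as its user. Hashing the full (user_id, order_id) pair would scatter a user's orders across shards. If joins are rare and most queries are point lookups by a single entity, distribute freely to maximize write parallelism.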

Citus addresses this with co-location concepts — tables can be co-located so that rows with the same tenant_id are on the same worker node, enabling efficient joins within a tenant without network overhead. The tradeoff is that co-location constrains your shard key choice and can create hotspots if the co-located key has uneven distribution.
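The difference between sharding orders on the parent key and sharding them on the full composite key can be seen directly. The shard count, user, and order IDs are hypothetical, and MD5 is used only for an even spread.

```python
import hashlib

NUM_SHARDS = 4


def shard_for(*key_parts) -> int:
    """Hash the given key parts to a shard index."""
    raw = "|".join(str(p) for p in key_parts).encode()
    return int.from_bytes(hashlib.md5(raw).digest()[:8], "big") % NUM_SHARDS


def order_shard(order, colocate: bool) -> int:
    user_id, order_id = order
    # Co-located: route by the parent key only, so orders follow the user row.
    # Not co-located: route by the full composite key.
    return shard_for(user_id) if colocate else shard_for(user_id, order_id)


orders = [(42, order_id) for order_id in range(1000, 1020)]  # one user's orders
colocated = {order_shard(o, colocate=True) for o in orders}
scattered = {order_shard(o, colocate=False) for o in orders}
print(sorted(colocated), sorted(scattered))
```

In the co-located case the set holds a single shard, the same one as `shard_for(42)` for the user row, so a user-to-orders join stays local; in the composite case the orders spread over several shards and the join needs a scatter-gather.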

19. Describe a scenario where shard rebalancing fails mid-migration. How do you detect the failure and recover safely?

Shard rebalancing can fail if the destination shard becomes unavailable, if network connectivity between source and destination is interrupted, or if the migration job crashes after partially migrating some keys but before updating the routing table. The danger is a split-brain-like state: some keys have migrated and route to the new shard, while others still route to the old shard.

Detection: compare key counts between source and destination shards; a large discrepancy indicates incomplete migration. Run checksum validation on a sample of migrated keys to verify data integrity. Monitor the routing table version — if it was updated before failure, some requests may already be routing to the new shard.

Recovery: pause all write traffic, determine the last successfully migrated key (from migration logs), then either complete the migration from that point or roll back by re-routing all keys to the old shard and resuming migration. Use a dual-write window during migration so that both old and new shards stay consistent even during interruptions. The safest approach is to never update the routing table until migration is 100% complete and verified.
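The detection step, key counts plus checksums on a sample of keys, can be sketched with plain dictionaries standing in for the source and destination shards. The row layout and the crash point are hypothetical.

```python
import hashlib
import random


def row_checksum(row: dict) -> str:
    # Canonical encoding with sorted field names so column order does not matter.
    canonical = "|".join(f"{k}={row[k]}" for k in sorted(row))
    return hashlib.sha256(canonical.encode()).hexdigest()


def validate_migration(source: dict, destination: dict, sample_size: int, seed: int = 0):
    """Compare key counts, then checksums for a random sample of source keys."""
    rng = random.Random(seed)
    keys = sorted(source)
    sample = rng.sample(keys, min(sample_size, len(keys)))
    missing = [k for k in sample if k not in destination]
    mismatched = [
        k for k in sample
        if k in destination and row_checksum(source[k]) != row_checksum(destination[k])
    ]
    return {
        "count_gap": len(source) - len(destination),
        "missing": sorted(missing),
        "mismatched": sorted(mismatched),
    }


source = {i: {"id": i, "name": f"user{i}"} for i in range(100)}
# Simulate a job that crashed after copying 60 keys, with one corrupted row.
destination = {k: dict(v) for k, v in source.items() if k < 60}
destination[5]["name"] = "corrupted"
report = validate_migration(source, destination, sample_size=100)
print(report["count_gap"], report["mismatched"])  # 40 [5]
```

A non-zero count gap or any mismatch means the routing table must not be switched yet; the lowest missing key also hints at where the migration stopped.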

20. How does geographic shard placement affect read latency? What strategies minimize cross-region traffic while maintaining consistency?

In a multi-region deployment, if shards are evenly distributed but 80% of traffic originates from us-east-1, most reads incur cross-region latency hitting shards in eu-west or ap-south. The farther the user is from the shard, the higher the latency: light in fiber travels at roughly 200,000 km/s, so every 100 km adds about 1 ms of round-trip time before any routing or queuing overhead.
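The physics sets a floor on cross-region reads. The function below computes that lower bound from the fiber propagation speed above; the 6,000 km path is an assumed example distance, and real round-trip times are higher because cables do not follow straight lines and routers add delay.

```python
FIBER_KM_PER_MS = 200.0  # light in fiber: roughly 200,000 km/s


def min_rtt_ms(distance_km: float) -> float:
    """Lower bound on round-trip time over fiber, ignoring routing detours,
    queuing, and processing."""
    return 2 * distance_km / FIBER_KM_PER_MS


print(min_rtt_ms(100))   # 1.0
print(min_rtt_ms(6000))  # 60.0 for a hypothetical 6,000 km path
```

Even this idealized bound means a single cross-region round trip costs tens of milliseconds, which is why serving reads from a local replica matters.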

Strategies to minimize cross-region traffic: (1) Follower reads — replicate hot data to all regions, route reads to the local replica, and only route writes to the primary shard in its home region. (2) Skew shard placement — place more shard replicas in regions with higher traffic so reads are served locally. (3) Geolocation-based routing — route requests to the nearest shard based on user IP, with writes always going to the primary.

The consistency tradeoff: follower reads in eventually-consistent replicas may return stale data. For strong consistency, reads must go to the primary or wait for quorum acknowledgment from replicas in multiple regions, which increases latency. CockroachDB and Spanner expose follower read APIs with consistency guarantees that let you choose between low-latency stale reads and higher-latency consistent reads per query.

Further Reading

Papers:

  • “Dynamo: Amazon’s Highly Available Key-value Store” — foundational paper on consistent hashing and eventual consistency in production systems
  • “Bigtable: A Distributed Storage System for Structured Data” — sharding model behind Google’s database infrastructure
  • “Spanner: Google’s Globally Distributed Database” — TrueTime and global consistency guarantees

Books:

  • Designing Data-Intensive Applications by Martin Kleppmann — comprehensive coverage of partitioning, replication, and consistency models
  • Database Internals by Alex Petrov — deep dive into storage engines, B-trees, and LSM trees underlying distributed databases

Conclusion

Use this checklist when designing or reviewing a sharding strategy.

Shard Key Selection: high cardinality, even write distribution, queries include the key, no monotonic patterns.

Consistent Hashing: use virtual nodes, plan for resharding before hitting limits, verify that adding a shard to N existing shards moves only about 1/(N+1) of the keys (about 10% or less once you have nine or more shards).

Cross-Shard Query Design: denormalize to avoid JOINs across shards, per-shard timeouts, circuit breakers per shard.

Operations: monitor per-shard latency and storage, alert on skew over 2x and storage over 80%, maintain replication factor at least 2, test shard operations in staging first.
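The operational thresholds in the checklist can be turned into a simple alerting check. The metric fields are hypothetical, and "skew" is assumed here to mean writes per second relative to the median shard; adapt both to whatever your monitoring system actually exports.

```python
from statistics import median

SKEW_THRESHOLD = 2.0      # alert when a shard exceeds 2x the median write rate
STORAGE_THRESHOLD = 0.80  # alert when a shard is more than 80% full
MIN_REPLICATION = 2


def shard_alerts(shards):
    """shards: dicts with hypothetical fields name, writes_per_sec,
    used_bytes, capacity_bytes, replicas."""
    alerts = []
    typical = median(s["writes_per_sec"] for s in shards)
    for s in shards:
        if typical > 0 and s["writes_per_sec"] > SKEW_THRESHOLD * typical:
            alerts.append(f"{s['name']}: write skew {s['writes_per_sec'] / typical:.1f}x median")
        used = s["used_bytes"] / s["capacity_bytes"]
        if used > STORAGE_THRESHOLD:
            alerts.append(f"{s['name']}: storage {used:.0%} full")
        if s["replicas"] < MIN_REPLICATION:
            alerts.append(f"{s['name']}: replication factor {s['replicas']} < {MIN_REPLICATION}")
    return alerts


fleet = [
    {"name": "shard-0", "writes_per_sec": 100, "used_bytes": 50, "capacity_bytes": 100, "replicas": 3},
    {"name": "shard-1", "writes_per_sec": 120, "used_bytes": 85, "capacity_bytes": 100, "replicas": 3},
    {"name": "shard-2", "writes_per_sec": 110, "used_bytes": 50, "capacity_bytes": 100, "replicas": 1},
    {"name": "shard-3", "writes_per_sec": 400, "used_bytes": 50, "capacity_bytes": 100, "replicas": 3},
]
for alert in shard_alerts(fleet):
    print(alert)
```

Comparing against the median rather than the mean keeps a single hot shard from inflating the baseline it is measured against.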
