Horizontal Sharding: Distribution Strategies for Scale

Learn database sharding strategies including shard key selection, consistent hashing, cross-shard queries, and operational procedures for distributed data.


Sharding splits your data across multiple database servers. Each shard holds a subset of your total data. When done well, you can scale writes linearly by adding more shards.

The complexity cost is high. Sharding introduces challenges around query routing, data distribution, and cross-shard operations. Most applications reach for sharding too early. Vertical scaling and read replicas handle impressive workloads.

I have seen teams spend months implementing sharding when a larger server would have sufficed. The engineering cost is real. Only shard when you have exhausted simpler options.

Why Shard

Databases have natural limits. A single server fits only so much data and handles only so many writes. When you exceed these limits, you distribute the load across multiple servers.

Sharding works when your data naturally partitions. User data partitions by user ID. Order data partitions by order ID. The shard key determines which server stores each record.

graph LR
    Users[Users Table] -->|user_id % 4| Shard0[(Shard 0)]
    Users -->|user_id % 4| Shard1[(Shard 1)]
    Users -->|user_id % 4| Shard2[(Shard 2)]
    Users -->|user_id % 4| Shard3[(Shard 3)]

This approach assumes data access follows the partition key. A user record is accessed using the user ID. Orders for that user are accessed using order ID plus user ID. As long as queries include the shard key, routing stays simple.

Choosing a Shard Key

The shard key determines data distribution. A bad key creates hot spots and uneven load. The ideal key distributes writes evenly and allows efficient queries.

User ID or account ID often works well. These are high-cardinality, evenly distributed values. Queries naturally filter by user ID.

Time-based keys are common for event data, but they create hot spots: all new writes land on the current time partition while historical partitions go cold.

# Good: user ID distributes writes evenly
shard_key = user_id
shard = hash(user_id) % num_shards

# Bad: timestamps cluster writes
shard_key = created_at
shard = hash(created_at) % num_shards  # Recent timestamps hit same shard

Composite keys can help. User ID plus entity type distributes writes but keeps related data together. This helps when you often query user data for a specific entity type.
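A composite key can be sketched as a hash over both fields. This is a minimal illustration; the function name and shard count are made up:

```python
import hashlib

def shard_for(user_id: int, entity_type: str, num_shards: int = 8) -> int:
    """Composite shard key: hash user ID plus entity type.

    The hash spreads writes across shards, while all records for one
    (user, entity type) pair stay together on a single shard.
    """
    key = f"{user_id}:{entity_type}".encode()
    return int(hashlib.md5(key).hexdigest(), 16) % num_shards

# All of a user's "order" rows route to one shard; a different entity
# type for the same user may land on a different shard.
print(shard_for(42, "order"), shard_for(42, "profile"))
```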

Shard Key Selection by Use Case

Choose your shard key based on the dominant access pattern. The key must appear in most queries to avoid scatter-gather across all shards:

| Use Case | Recommended Shard Key | Alternative Key | Avoid | Reason |
| --- | --- | --- | --- | --- |
| User data / accounts | user_id (hash) | email | created_at | Even distribution, user queries are primary access |
| E-commerce orders | user_id (hash) + order_id | order_id alone | created_at | User orders clustered, range by date needs separate index |
| IoT sensor data | device_id (hash) | device_id + timestamp | timestamp alone | Devices are independent, timestamp alone clusters writes |
| Multi-tenant SaaS | tenant_id (hash) | tenant_id + entity_id | Shared keys across tenants | Tenant isolation, each tenant's data separate |
| Time-series metrics | metric_name + time_bucket | time_bucket alone | timestamp alone | Write load distributed by metric, range queries within metric |
| Social feeds | user_id (hash) | post_id | created_at | Feed queries by user, posts distributed evenly |
| Financial transactions | account_id (hash) | transaction_id | timestamp alone | Account lookups primary, ordering within account |
| Log aggregation | source_id (hash) | source_id + timestamp | timestamp alone | Sources are independent, logs queried by source |

Decision rules:

  1. Identify your most frequent query pattern (point lookup, range scan, aggregation)
  2. Find a field that appears in >80% of queries
  3. Check cardinality — must be high enough to distribute writes evenly
  4. Test distribution with production-like data before going live
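Step 4 can be automated with a small skew check. This is a sketch that assumes MD5-based hash routing; the synthetic IDs stand in for a production key sample:

```python
import hashlib
from collections import Counter

def shard_skew(keys, num_shards):
    """Hash candidate shard-key values and measure distribution skew.

    Returns per-shard counts and the ratio of the fullest shard to the
    average; 1.0 is perfectly even, well above that signals a poor key.
    """
    counts = Counter(
        int(hashlib.md5(str(k).encode()).hexdigest(), 16) % num_shards
        for k in keys
    )
    avg = len(keys) / num_shards
    return counts, max(counts.values()) / avg

# Synthetic IDs stand in for a production key sample here.
counts, skew = shard_skew(range(100_000), 8)
print(f"max/avg skew: {skew:.2f}")
```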

Operational Procedures

Adding a Shard

Adding a shard with consistent hashing is the safest approach. The ring structure limits redistribution to neighboring keys:

def add_shard_with_consistent_hashing(router, new_shard_id, virtual_nodes=100):
    """
    Add a new shard to an existing consistent hash ring.
    Only keys in the new shard's range are redistributed.
    """
    # Record current state
    current_distribution = {
        shard: count_keys(router, shard)
        for shard in router.shards
    }

    # Add new shard to ring
    router.add_shard(new_shard_id, virtual_nodes)

    # Verify redistribution is limited to adjacent range
    new_distribution = {
        shard: count_keys(router, shard)
        for shard in router.shards
    }

    # Calculate migration percentage
    total_keys = sum(current_distribution.values())
    migrated = sum(abs(new_distribution.get(s, 0) - current_distribution.get(s, 0))
                   for s in set(new_distribution) | set(current_distribution)) // 2

    migration_pct = migrated / total_keys * 100
    print(f"Migration: {migration_pct:.1f}% of keys moved")

    return migration_pct < 10  # Expect <10% migration with consistent hashing

For MongoDB, adding a shard triggers automatic rebalancing of chunks across available shards. The balancer runs in the background and moves chunks gradually to avoid impacting performance.

For Citus, adding a worker node requires rebalancing distributed tables:

-- Add new worker
SELECT citus_add_node('worker-4.internal', 5432);

-- Rebalance distributed tables (runs in background)
SELECT citus_rebalance_start();

Removing a Shard

Shard removal requires migrating data before decommissioning. Never remove a shard until all data has moved:

def remove_shard(router, shard_to_remove):
    """
    Safely remove a shard by migrating data first.
    """
    # 1. Stop routing new traffic to shard
    router.deactivate_shard(shard_to_remove)

    # 2. Migrate all data
    for key in router.get_keys_for_shard(shard_to_remove):
        new_shard = router.get_shard(key)  # Will route to new owner
        data = router.get_data(shard_to_remove, key)
        new_shard.put(key, data)

    # 3. Verify migration complete
    remaining = router.count_keys(shard_to_remove)
    if remaining > 0:
        raise ValueError(f"Shard still has {remaining} keys")

    # 4. Remove from ring
    router.remove_shard(shard_to_remove)

In MongoDB, run the removeShard admin command from mongosh; the balancer drains chunks off the shard in the background, and the command reports draining progress. Verify chunk counts and document counts before decommissioning.

Handling Split-Brain Scenarios

Split-brain happens when a network partition causes two shards to believe they own the same data. The best approach is prevention:

  1. Use replication factor >= 3: With 3 replicas, a partition minority cannot form a quorum.
  2. Implement fencing tokens: Before writing, acquire a fencing token from a consensus service.
  3. Detect stale writes: Include version vectors or timestamps in records. Reject updates from older versions.

class SplitBrainError(Exception):
    """Raised when a write presents a stale fencing token."""


def safe_write(shard, key, value, fencing_token):
    """
    Write with fencing token to prevent split-brain.
    """
    current_token = shard.get_fencing_token(key)

    if fencing_token <= current_token:
        raise SplitBrainError(
            f"Stale write rejected: token {fencing_token} <= {current_token}"
        )

    shard.write_with_token(key, value, fencing_token)

If split-brain is detected, the safest recovery is:

  1. Stop all writes to the affected shards.
  2. Determine which partition has the most recent data.
  3. Redirect all traffic to that partition.
  4. Rebuild the other shard from the primary.

Sharding Tools Landscape

Different tools sit at different layers of the sharding stack. Choose based on how much control you need:

| Tool | Type | Sharding Approach | Best For |
| --- | --- | --- | --- |
| Vitess | Middleware (MySQL) | Vertical sharding + horizontal sharding | MySQL workloads needing scale, YouTube-scale deployments |
| PlanetScale | Managed Vitess | Horizontal sharding via Vitess | Teams wanting MySQL compatibility without managing Vitess |
| CockroachDB | Distributed SQL | Automatic range-based sharding | Global applications needing strong consistency |
| Spanner | Distributed SQL | Automatic sharding, TrueTime | Large-scale global applications with budget for managed solution |
| YugabyteDB | Distributed SQL | Hash and range sharding | PostgreSQL/Cassandra compatibility with scale |
| Citus | PostgreSQL extension | Hash-based sharding | Teams on PostgreSQL needing distributed scale |
| KScale | Managed sharding | Automatic sharding | Teams wanting managed sharding without database changes |

Vitess was built at YouTube to scale MySQL beyond what a single MySQL instance could handle. It handles connection pooling, query routing, and resharding automatically. The tradeoff is operational complexity. Vitess itself requires expertise to run.

CockroachDB and Spanner both present a single logical database while sharding automatically underneath. The main difference is the clock model: CockroachDB uses hybrid logical clocks (HLC), while Spanner relies on TrueTime, Google's GPS- and atomic-clock-backed time service. Spanner is more expensive, but TrueTime's tight uncertainty bounds let it commit global writes with less coordination latency.

For most teams, starting with a distributed database like CockroachDB or YugabyteDB eliminates the need for manual sharding entirely. The database handles distribution while you work with normal SQL.

Consistent Hashing

Simple modulo hashing has a problem. When you add or remove shards, almost every key maps to a different shard. This triggers massive data migration.

Consistent hashing solves this. Keys map to points on a hash ring. Each shard owns a range of the ring. Adding a shard only affects its neighboring range.

graph TD
    Ring[Hash Ring] --> P1[Key A: hash=50]
    Ring --> P2[Key B: hash=150]
    Ring --> P3[Key C: hash=250]
    Ring --> S1[Shard 1: 0-100]
    Ring --> S2[Shard 2: 101-200]
    Ring --> S3[Shard 3: 201-255]
    S1 --> P1
    S2 --> P2
    S3 --> P3

With consistent hashing, adding Shard 4 between Shard 1 and Shard 2 only moves keys in the 100-150 range. Most keys stay on their current shard.

Virtual nodes improve distribution further. Each physical shard claims multiple points on the ring, which smooths out the uneven ranges that random placement of only a few points would create.

Hash-Based vs Range-Based Partitioning

When sharding your data, you choose between hash-based and range-based partitioning strategies. Each has distinct trade-offs:

| Aspect | Hash-Based Partitioning | Range-Based Partitioning |
| --- | --- | --- |
| Data Distribution | Evenly distributed across shards | Can become uneven (hot/cold shards) |
| Shard Key Selection | Must use hash of key, not the key itself | Can use the key directly or a prefix |
| Range Queries | Poor - requires scatter-gather across all shards | Excellent - sequential access within shard |
| Adding Shards | Only affects neighboring range on hash ring | May require significant data movement |
| Hot Spot Risk | Low (even distribution) | High (contiguous keys may receive heavy writes) |
| Write Throughput | High and evenly distributed | Can bottleneck on hot shards |
| Use Case Fit | Key lookups, random access patterns | Time-series data, sequential scans |
| Implementation Complexity | Lower (consistent hashing handles redistribution) | Higher (need to manage key ranges manually) |

When to Use Hash-Based Partitioning:

  • Your access patterns are random (point queries by ID)
  • You need even write distribution across shards
  • You can afford scatter-gather for range queries
  • Example: User data accessed by user_id, order data accessed by order_id

When to Use Range-Based Partitioning:

  • Your queries naturally filter by a sequential key (timestamp, numeric range)
  • You frequently scan data within a range
  • Your data has natural temporal patterns
  • Example: IoT sensor data partitioned by day/month, logs partitioned by time range

Hybrid Approaches:

Some systems combine both approaches. A composite shard key uses a hash of one field to determine shard, then range-based within that shard. This provides even distribution while enabling range queries within a single shard.
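A hybrid key might look like this sketch, which hashes a device ID to pick the shard and keeps a time-ordered key within it (the names are illustrative):

```python
import hashlib

def hybrid_key(device_id: str, timestamp: int, num_shards: int = 8):
    """Hash device_id to pick the shard; keep a time-ordered key inside it.

    A time-range scan for one device then touches a single shard, while
    devices as a whole spread evenly across shards.
    """
    shard = int(hashlib.md5(device_id.encode()).hexdigest(), 16) % num_shards
    return shard, (device_id, timestamp)

shard, key = hybrid_key("sensor-17", 1_700_000_000)
print(shard, key)
```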

import bisect
import hashlib

class ConsistentHash:
    def __init__(self, num_slots=2**32):
        self.num_slots = num_slots
        self.ring = {}            # hash slot -> shard id
        self.sorted_keys = []     # sorted slots for binary search

    def add_shard(self, shard_id, virtual_nodes=100):
        for i in range(virtual_nodes):
            key = self._hash(f"{shard_id}:vn{i}")
            self.ring[key] = shard_id
            bisect.insort(self.sorted_keys, key)

    def get_shard(self, key):
        # Binary search for the first slot clockwise from the key's hash.
        hash_val = self._hash(key)
        idx = bisect.bisect_right(self.sorted_keys, hash_val)
        if idx == len(self.sorted_keys):
            idx = 0  # wrap around the ring
        return self.ring[self.sorted_keys[idx]]

    def _hash(self, key):
        return int(hashlib.md5(key.encode()).hexdigest(), 16) % self.num_slots

Database-Specific Sharding Configuration

Each database handles sharding differently. Understanding the native options helps you decide whether to use built-in sharding or a middleware approach.

MongoDB Sharding

MongoDB provides built-in sharding with automatic chunk distribution:

// Enable sharding on the database
sh.enableSharding("myapp");

// Shard a collection by hashed shard key (for even distribution)
sh.shardCollection("myapp.orders", { order_id: "hashed" });

// Shard by range (when range queries matter)
sh.shardCollection("myapp.events", { timestamp: 1 });

// Check cluster status
sh.status();

// Manually split a chunk to enable migration
db.adminCommand({
  split: "myapp.orders",
  middle: { order_id: NumberLong(500000) },
});

// Move a chunk to a specific shard
db.adminCommand({
  moveChunk: "myapp.orders",
  find: { order_id: NumberLong(500000) },
  to: "shard2",
});

Shard key selection for MongoDB:

| Use Case | Shard Key | Why |
| --- | --- | --- |
| User data | user_id (hashed) | Even distribution, user-specific queries efficient |
| Time-series events | device_id + timestamp (compound) | Query by device range, timestamps within device |
| E-commerce | user_id + order_id (compound) | User orders together, order lookups by user |
| Multi-tenant SaaS | tenant_id (hashed) | Tenant isolation, even load per tenant |

PostgreSQL Citus

Citus extends PostgreSQL for distributed tables. It partitions tables across worker nodes while presenting a single PostgreSQL interface:

-- Install Citus extension on coordinator
CREATE EXTENSION IF NOT EXISTS citus;

-- Add worker nodes
SELECT citus_add_node('worker-1.internal', 5432);
SELECT citus_add_node('worker-2.internal', 5432);
SELECT citus_add_node('worker-3.internal', 5432);

-- Create distributed table (by hash)
CREATE TABLE users (
    user_id BIGSERIAL,
    email TEXT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT now(),
    PRIMARY KEY (user_id)
);
SELECT create_distributed_table('users', 'user_id', shard_count => 16);

-- Create reference table (replicated to all nodes)
CREATE TABLE countries (
    country_code TEXT PRIMARY KEY,
    name TEXT NOT NULL
);
SELECT create_reference_table('countries');

-- Check shard placement
SELECT * FROM citus_shards;
SELECT * FROM pg_dist_shard_placement;

Citus supports co-located tables, where joined data stays on the same shard, and distributed tables that span shards. The create_distributed_table function takes the shard key and number of shards as arguments.

CockroachDB

CockroachDB shards automatically by default. You control data distribution through range splitting and leaseholder placement:

-- Split a table into ranges by primary key
ALTER TABLE users SPLIT AT VALUES ('m'), ('t');

-- Move a range's replicas to specific stores (store IDs, then range key)
ALTER TABLE users EXPERIMENTAL_RELOCATE VALUES (ARRAY[1], 'm');

-- Check range status
SELECT * FROM crdb_internal.ranges_no_leases;

-- Constrain replica placement by locality with a zone configuration
ALTER TABLE users CONFIGURE ZONE USING constraints = '[+region=us-east1]';

CockroachDB’s KV layer automatically splits ranges when they exceed the target size (default 512 MiB). Forcing specific placement is rarely needed. The optimizer handles most distribution decisions.

When to Use Built-In vs Middleware Sharding

| Approach | Examples | Best For |
| --- | --- | --- |
| Built-in database sharding | MongoDB sharding, CockroachDB, Spanner | When the database handles distribution natively |
| Extension-based sharding | Citus for PostgreSQL, YugabyteDB | When you want PostgreSQL compatibility with scale |
| Middleware sharding | Vitess, PlanetScale | When you need MySQL compatibility with Vitess routing |
| Application-level sharding | Custom shard router | When no other option fits and you control all queries |

Cross-Shard Queries

Sharding creates query challenges. Queries that filter by the shard key hit one shard. Queries that do not must fan out to every shard.

# Single shard query: efficient
user = shards[get_shard(user_id)].query(
    "SELECT * FROM users WHERE id = ?", user_id
)

# Full table query: must check all shards
all_users = []
for shard in shards.values():
    all_users.extend(shard.query("SELECT * FROM users"))

Aggregation across shards is expensive. Counts, sums, and averages require querying every shard and combining the results. Real-time analytics across sharded data is slow.

Denormalization helps. Pre-compute aggregations and store them on specific shards. Store user summary data on the user’s shard rather than computing it across shards.
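The idea can be sketched as counters updated on the same single-shard write path as the order itself (the class and field names are invented for illustration):

```python
# Sketch: denormalized per-user counters kept on the user's own shard.

class UserSummary:
    """Pre-computed aggregates stored alongside the user record."""
    def __init__(self):
        self.order_count = 0
        self.total_spent = 0

summaries = {}  # user_id -> UserSummary; lives on the user's shard

def record_order(user_id, amount):
    # Updated on the same single-shard write path as the order itself,
    # so reads never need a cross-shard aggregation.
    s = summaries.setdefault(user_id, UserSummary())
    s.order_count += 1
    s.total_spent += amount

record_order(42, 30)
record_order(42, 20)
print(summaries[42].order_count, summaries[42].total_spent)  # 2 50
```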

Rebalancing Shards

Data grows unevenly. Some shards accumulate more data than others. You need to rebalance.

Rebalancing means moving data between shards. You add new shards, migrate data, then remove old shards. During migration, both old and new shards must serve traffic.

def rebalance_shard(shards, old_shard_id, new_shard_ids):
    # 1. Add new shards
    for new_id in new_shard_ids:
        shards[new_id] = create_shard(new_id)

    # 2. Migrate data
    old_shard = shards[old_shard_id]
    for record in old_shard.fetch_all():
        new_shard = pick_shard_for(record)
        new_shard.insert(record)

    # 3. Update routing
    update_routing_table()

    # 4. Remove old shard
    delete_shard(old_shard_id)

The challenge is maintaining availability during rebalancing. Applications continue writing while data migrates. Dual-write during transition or careful sequencing minimizes inconsistencies.
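A dual-write transition can be sketched as a small router that writes to both shards until cutover. This is simplified: no retries, conflict handling, or persistence:

```python
class DualWriteRouter:
    """Sketch of dual-write during rebalancing.

    While a key range migrates, writes go to both the old and the new
    owner; reads prefer the new shard and fall back to the old one.
    """
    def __init__(self, old_shard, new_shard):
        self.old, self.new = old_shard, new_shard
        self.migrating = True

    def put(self, key, value):
        self.new[key] = value
        if self.migrating:
            self.old[key] = value  # keep old shard consistent until cutover

    def get(self, key):
        if key in self.new:
            return self.new[key]
        return self.old.get(key) if self.migrating else None

    def cutover(self):
        self.migrating = False  # routing now points only at the new shard

old, new = {}, {}
router = DualWriteRouter(old, new)
router.put("user:42", "alice")
print(router.get("user:42"), "user:42" in old)  # alice True
```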

When to Shard

Sharding is complexity. Only shard when simpler options fail.

Shard when you hit write limits that vertical scaling cannot address. If your primary's CPU saturates under write load, adding replicas does not help; writes still go to the primary.

Shard when data size exceeds single server capacity. If you need terabytes and a single server cannot hold it, distribute across shards.

Do not shard for read scale. Read replicas handle read scale. Sharding complicates queries. Use replicas first.

Most applications should not shard. Start with vertical scaling. Add replicas. Implement caching. These handle millions of requests per second. Sharding is for when you have exhausted all other options.

When to Use and When Not to Use Horizontal Sharding

When to Use Horizontal Sharding:

  • Write throughput exceeds single primary server capacity after vertical scaling
  • Data size exceeds single server storage capacity
  • You have clear, stable access patterns that align with a shard key
  • Your team has operational expertise for distributed databases
  • You have exhausted vertical scaling and read replicas

When Not to Use Horizontal Sharding:

  • Read replicas can solve your performance problem
  • Your access patterns do not naturally partition by a shard key
  • You need ad-hoc queries across all data
  • Your team lacks experience with distributed database operations
  • You can solve the problem with caching or optimization

Production Failure Scenarios

| Failure | Impact | Mitigation |
| --- | --- | --- |
| Shard key hotspot | Uneven load, some shards overloaded | Choose high-cardinality keys, use consistent hashing with virtual nodes |
| Shard router failure | All queries fail | Implement router redundancy, use multiple routers |
| Cross-shard transaction failure | Partial data updates, inconsistency | Avoid cross-shard transactions, use saga pattern for multi-shard operations |
| Shard addition triggers reshuffle | Temporary performance degradation | Use consistent hashing, add capacity in larger increments |
| Shard hardware failure | Data unavailable, potential data loss | Maintain replication factor >= 2 per shard, regular backups |
| Rebalance in progress | Migrating data temporarily unavailable | Implement dual-write during transition, graceful failover |
| Cross-shard query timeout | Aggregations and joins fail | Set appropriate timeouts, pre-aggregate data, use materialized views |
| Stale routing table | Queries routed to wrong shard | Implement cache invalidation for routing table, version routing metadata |

Observability Checklist

Metrics to Monitor:

  • Request latency by shard (detect hotspots)
  • Request distribution across shards (detect skew)
  • Shard storage utilization and growth rate
  • Replication status per shard
  • Rebalancing progress and impact
  • Cross-shard query frequency and latency
  • Shard router health and request queue
  • Connection pool utilization per shard

Logs to Capture:

  • Shard key distribution analysis
  • Cross-shard query patterns (for optimization opportunities)
  • Rebalancing events and progress
  • Shard failures and failover events
  • Routing table updates
  • Slow query logs with shard identification

Alerts to Set:

  • Shard storage > 80% capacity
  • Request distribution skew > 2x between shards
  • Replication lag per shard
  • Cross-shard query latency spike
  • Router failure or unavailability
  • Rebalance progress stalled
  • Connection pool exhaustion per shard

# Example: Shard health monitoring
def check_shard_health(shards):
    health = {}
    for shard_id, shard in shards.items():
        metrics = shard.get_metrics()
        health[shard_id] = {
            'latency_p99': metrics['latency_p99'],
            'storage_used_pct': metrics['storage_used'] / metrics['storage_total'],
            'connections': metrics['active_connections'],
            'replication_lag': metrics.get('replication_lag', 0)
        }
    return health

Security Checklist

  • Implement authentication at both router and shard level
  • Use TLS encryption for all router-to-shard communication
  • Apply least privilege for router database users
  • Audit log all cross-shard operations
  • Encrypt data at rest on each shard
  • Implement network segmentation between shards
  • Secure shard-to-shard replication channels
  • Regular security reviews of routing logic
  • Implement access controls for rebalancing operations
  • Backup encryption keys separately from shard data

Common Pitfalls and Anti-Patterns

  1. Choosing a poor shard key: The wrong shard key creates hotspots and uneven distribution. Test distribution before production with realistic data.

  2. Underestimating cross-shard query cost: Queries across shards are slow and expensive. Design access patterns to minimize them before sharding.

  3. Rebalancing without planning: Adding shards without a rebalance strategy causes chaos. Use consistent hashing to minimize reshuffling.

  4. Sharding too early: Sharding adds significant operational complexity. Exhaust vertical scaling and caching first.

  5. Ignoring multi-shard transactions: Distributed transactions are complex and slow. Design your schema to avoid them.

  6. Not planning for shard growth: Some shards will accumulate more data. Monitor growth and plan for split before hitting limits.

  7. Losing sight of total dataset: With data distributed, it’s easy to lose track of total size and growth. Monitor cluster-wide metrics.

  8. Assuming linear scalability: Adding a shard does not double capacity. Rebalancing, router overhead, and cross-shard queries limit gains.

Quick Recap

Key Bullets:

  • Sharding distributes writes across multiple servers using a shard key
  • Choose high-cardinality shard keys that align with your access patterns
  • Consistent hashing minimizes data movement when adding shards
  • Cross-shard queries are expensive; design to minimize them
  • Sharding should be a last resort after exhausting simpler options
  • Monitor shard distribution and rebalance before hotspots become problems

Copy/Paste Checklist:

# Consistent hashing for shard selection
import bisect
import hashlib

class ShardRouter:
    def __init__(self, shards, virtual_nodes=100):
        self.ring = {}
        self.shards = shards
        for shard in shards:
            for i in range(virtual_nodes):
                key = int(hashlib.md5(f"{shard}:vn{i}".encode()).hexdigest(), 16)
                self.ring[key] = shard
        self.sorted_keys = sorted(self.ring.keys())

    def get_shard(self, key):
        # Hash lookups with the same function and range as the ring points,
        # so keys and virtual nodes share one key space.
        hash_val = int(hashlib.md5(str(key).encode()).hexdigest(), 16)
        idx = bisect.bisect_left(self.sorted_keys, hash_val)
        if idx == len(self.sorted_keys):
            idx = 0  # wrap around the ring
        return self.ring[self.sorted_keys[idx]]

# Usage
router = ShardRouter(['shard-0', 'shard-1', 'shard-2', 'shard-3'])
shard = router.get_shard(user_id)

Capacity Estimation: Shard Count and Key Distribution

Sharding starts with estimating how many shards you need. The formula depends on your write throughput and storage requirements.

For write throughput: if your primary database handles 10,000 writes per second and each shard can handle 2,000 writes per second, you need at minimum 5 shards. Account for growth: design for 2-3x your current write volume. For 10,000 writes per second with 2x growth headroom, you need shards capable of 20,000 writes per second — at 2,000 per shard, that is 10 shards.

For storage: if your dataset is 2TB and each shard server has 1TB usable storage (with replication factor 2), you need at minimum 4 shards for storage alone. With replication factor 3, you need 6 shards.

The shard key distribution formula: use the coefficient of variation (CV) of shard sizes, the standard deviation divided by the mean, to detect hot spots. A CV above 0.3 (shard sizes typically deviating from the average by more than 30%) signals a distribution problem. The fix is either to rebalance or to change your shard key.

For consistent hashing with virtual nodes, the minimum virtual nodes per shard should be 100-200 to ensure even distribution. With fewer virtual nodes, the hash ring has visible gaps and adding/removing shards causes noticeable redistribution. With 150 virtual nodes per shard and 6 shards, you have 900 points on the ring — fine-grained enough that adding one shard moves roughly 1/7 of keys.
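The write, storage, and CV formulas above can be combined into a small estimator (a sketch; the function names are invented):

```python
import math
import statistics

def shards_needed(writes_per_sec, writes_per_shard, dataset_tb,
                  usable_tb_per_shard, replication_factor, growth=2.0):
    """Minimum shard count from write throughput and storage, whichever
    is larger. growth is the headroom multiplier on current write volume."""
    by_writes = math.ceil(writes_per_sec * growth / writes_per_shard)
    by_storage = math.ceil(dataset_tb * replication_factor / usable_tb_per_shard)
    return max(by_writes, by_storage)

def distribution_cv(shard_sizes):
    """Coefficient of variation of shard sizes; above ~0.3 suggests a
    hot spot or a bad shard key."""
    return statistics.pstdev(shard_sizes) / statistics.mean(shard_sizes)

# 10k writes/s at 2k per shard with 2x headroom, 2 TB at RF 2 on 1 TB nodes:
print(shards_needed(10_000, 2_000, 2, 1, 2))  # 10 (write-bound)
print(round(distribution_cv([100, 110, 90, 100]), 2))  # 0.07
```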

Real-World Case Study: Instagram’s Sharding Evolution

Instagram’s user database started on a single PostgreSQL server. As they grew, they added read replicas, then started partitioning. Their sharding evolution went through three generations.

First generation was pure UUID-based sharding: user_id determined the shard via modulo. Simple but inflexible — adding shards required changing the modulo divisor, which meant remapping every user to a new shard. They managed this with a migration that ran for months while serving live traffic.

Second generation moved to consistent hashing with virtual nodes. Adding a shard now moved only the neighboring keys instead of remapping all users. But they still had a problem: their shard key was user_id alone, and Instagram’s user distribution was not uniform. Some users were more active than others, and some users had relationships with much larger audiences, creating write hotspots for specific shards.

Third generation introduced application-level sharding: the application layer decided not just which shard a user’s data lived on, but also cached user data aggressively and used a fanout-on-read model for celebrity activity feeds. Write amplification for celebrity posts was managed by separating celebrity posts from regular user posts — celebrity posts were sharded differently (by post_id) than regular posts (by user_id).

The lesson: Instagram’s biggest challenge was not distributing data evenly but managing the cross-shard queries that their feed and notification systems required. They solved this with denormalization — pre-computing follower feeds and storing them close to the users who needed them. The shard key was not just a storage decision, it was an access pattern decision.

Interview Questions

Q: You are designing a sharding strategy for a multi-tenant SaaS application. Each tenant has a different data volume — some have millions of rows, others have hundreds. What shard key do you choose and why?

Use tenant_id as the shard key if tenants are roughly equal in size and you frequently query by tenant. If tenant sizes are very uneven (some tenants have 10,000x more data than others), tenant_id alone creates hot spots for large tenants — those shards become overloaded while others sit idle. For uneven tenants, consider sharding by tenant_id plus a sub-key like entity_type or using hash partitioning on a composite of tenant_id and entity_id. An alternative for very large tenants is to give them dedicated shards, but this adds operational complexity.

Q: Your application uses consistent hashing with 8 shards. You need to add 4 more shards to handle growth. How much data migration happens?

With consistent hashing and a well-distributed ring, each of the 4 new shards claims roughly 1/12 of the key space, so together they take about 4/12 ≈ 33% of keys from the existing shards. In general, with enough virtual nodes, the new shards collectively own new_shard_count / (old_shard_count + new_shard_count) of the ring, and only those keys move. Without consistent hashing (plain modulo), nearly 100% of keys would remap.
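A quick simulation makes the migration fraction concrete. This sketch builds two MD5-based rings with 150 virtual nodes per shard and counts keys whose owner changes when growing from 8 to 12 shards:

```python
import bisect
import hashlib

def build_ring(shards, vnodes=150):
    """Sorted (hash, shard) ring with virtual nodes."""
    ring = sorted(
        (int(hashlib.md5(f"{s}:vn{i}".encode()).hexdigest(), 16), s)
        for s in shards
        for i in range(vnodes)
    )
    return [h for h, _ in ring], [s for _, s in ring]

def owner(hashes, owners, key):
    """First ring point clockwise from the key's hash."""
    h = int(hashlib.md5(str(key).encode()).hexdigest(), 16)
    return owners[bisect.bisect_right(hashes, h) % len(hashes)]

keys = range(50_000)
before = build_ring([f"shard-{i}" for i in range(8)])
after = build_ring([f"shard-{i}" for i in range(12)])
moved = sum(owner(*before, k) != owner(*after, k) for k in keys)
print(f"{moved / len(keys):.0%} of keys moved")  # roughly a third
```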

Q: What happens to your cross-shard JOIN queries when one shard is significantly slower than others?

The slow shard becomes the bottleneck for the entire query. In a scatter-gather query across 8 shards, if one shard has a disk I/O problem and takes 5 seconds while others take 100ms, the entire query takes 5 seconds — the query cannot return until the slowest shard responds. The fix is to implement per-shard timeouts and either fail the query or return partial results with a warning. Alternatively, detect slow shards via health monitoring and route queries away from them while the issue is investigated.
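Per-shard timeouts with partial results can be sketched with a thread pool; the shard callables and timeout values here are stand-ins:

```python
import concurrent.futures as cf
import time

def scatter_gather(shard_fns, timeout_s=0.5):
    """Query all shards in parallel, collecting partial results.

    Shards that miss the per-shard timeout are flagged instead of
    stalling the whole query. Timeouts apply as results are collected,
    so the total wait can exceed timeout_s across several slow shards.
    """
    results, slow = {}, []
    with cf.ThreadPoolExecutor(max_workers=len(shard_fns)) as pool:
        futures = {pool.submit(fn): sid for sid, fn in shard_fns.items()}
        for fut, sid in futures.items():
            try:
                results[sid] = fut.result(timeout=timeout_s)
            except cf.TimeoutError:
                slow.append(sid)  # return partial results, flag the shard
    return results, slow

shards = {
    "shard-0": lambda: ["a", "b"],
    "shard-1": lambda: (time.sleep(1), ["c"])[1],  # simulated slow shard
}
results, slow = scatter_gather(shards, timeout_s=0.3)
print(sorted(results), slow)  # ['shard-0'] ['shard-1']
```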

Q: When would you use a distributed SQL database like CockroachDB over manual sharding with PostgreSQL?

Use CockroachDB when you want the scalability benefits of sharding but without the operational complexity of managing shard distribution yourself. CockroachDB handles automatic resharding, rebalancing, and failover transparently. You write normal SQL and the database handles distributing data across nodes. The tradeoff is performance overhead — CockroachDB’s distributed transactions have higher latency than single-node PostgreSQL, and you pay a premium for the strong consistency guarantees. If your team lacks the expertise to manage manual sharding and you can tolerate higher latency for distributed operations, CockroachDB is a reasonable choice.

Conclusion

Sharding distributes writes across multiple servers. A good shard key evenly distributes load and allows efficient queries. Consistent hashing minimizes reshuffling when adding shards.

Cross-shard queries are expensive. Design your schema and access patterns to minimize them. Rebalancing is complex and requires careful planning.

Sharding should be a last resort. The engineering cost is significant. Only shard when you have exhausted simpler scaling approaches.

For related reading, see Database Scaling for broader scaling strategies, and Consistent Hashing for details on the hashing approach used in distributed systems.
