Horizontal Sharding: Distribution Strategies for Scale
Sharding splits your data across multiple database servers. Each shard holds a subset of your total data. When done well, you can scale writes linearly by adding more shards.
The complexity cost is high. Sharding introduces challenges around query routing, data distribution, and cross-shard operations. Most applications reach for sharding too early. Vertical scaling and read replicas handle impressive workloads.
I have seen teams spend months implementing sharding when a larger server would have sufficed. The engineering cost is real. Only shard when you have exhausted simpler options.
Why Shard
Databases have natural limits. A single server fits only so much data and handles only so many writes. When you exceed these limits, you distribute the load across multiple servers.
Sharding works when your data naturally partitions. User data partitions by user ID. Order data partitions by order ID. The shard key determines which server stores each record.
```mermaid
graph LR
    Users[Users Table] -->|user_id % 4| Shard0[(Shard 0)]
    Users -->|user_id % 4| Shard1[(Shard 1)]
    Users -->|user_id % 4| Shard2[(Shard 2)]
    Users -->|user_id % 4| Shard3[(Shard 3)]
```
This approach assumes data access follows the partition key. A user record is accessed using the user ID. Orders for that user are accessed using order ID plus user ID. As long as queries include the shard key, routing stays simple.
Choosing a Shard Key
The shard key determines data distribution. A bad key creates hot spots and uneven load. The ideal key distributes writes evenly and allows efficient queries.
User ID or account ID often works well. These are high-cardinality, evenly distributed values. Queries naturally filter by user ID.
Time-based keys are common for event data. But time creates hot spots. New data all writes to the current time partition. Historical data becomes cold.
```python
# Good: user ID distributes writes evenly
shard_key = user_id
shard = hash(user_id) % num_shards

# Bad: timestamps cluster writes
shard_key = created_at
shard = hash(created_at) % num_shards  # Recent timestamps hit same shard
```
Composite keys can help. User ID plus entity type distributes writes but keeps related data together. This helps when you often query user data for a specific entity type.
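A composite scheme can be sketched like this (a hypothetical helper, assuming MD5-based hashing as used elsewhere in this post): the user ID alone picks the shard, while the entity type stays in the storage key so related records land together:

```python
import hashlib

def shard_for(user_id, entity_type, num_shards):
    # Shard is chosen by user_id only, so all of a user's data
    # (orders, settings, events) lands on the same shard.
    digest = hashlib.md5(str(user_id).encode()).hexdigest()
    shard = int(digest, 16) % num_shards
    # The storage key keeps entity_type so per-type scans stay cheap.
    storage_key = f"{user_id}:{entity_type}"
    return shard, storage_key

# All of user 42's entities map to one shard, under distinct keys.
print(shard_for(42, "orders", 8))
print(shard_for(42, "settings", 8))
```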
Shard Key Selection by Use Case
Choose your shard key based on the dominant access pattern. The key must appear in most queries to avoid scatter-gather across all shards:
| Use Case | Recommended Shard Key | Alternative Key | Avoid | Reason |
|---|---|---|---|---|
| User data / accounts | user_id (hash) | email | created_at | Even distribution, user queries are primary access |
| E-commerce orders | user_id (hash) + order_id | order_id alone | created_at | User orders clustered, range by date needs separate index |
| IoT sensor data | device_id (hash) | device_id + timestamp | timestamp alone | Devices are independent, timestamp alone clusters writes |
| Multi-tenant SaaS | tenant_id (hash) | tenant_id + entity_id | Shared keys across tenants | Tenant isolation, each tenant’s data separate |
| Time-series metrics | metric_name + time_bucket | time_bucket alone | timestamp alone | Write load distributed by metric, range queries within metric |
| Social feeds | user_id (hash) | post_id | created_at | Feed queries by user, posts distributed evenly |
| Financial transactions | account_id (hash) | transaction_id | timestamp alone | Account lookups primary, ordering within account |
| Log aggregation | source_id (hash) | source_id + timestamp | timestamp alone | Sources are independent, logs queried by source |
Decision rules:
- Identify your most frequent query pattern (point lookup, range scan, aggregation)
- Find a field that appears in >80% of queries
- Check cardinality — must be high enough to distribute writes evenly
- Test distribution with production-like data before going live
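The last step can be sanity-checked offline (a sketch assuming MD5 hashing, as elsewhere in this post): hash a sample of production-like keys and inspect the per-shard counts before any data moves:

```python
import hashlib
from collections import Counter

def distribution(keys, num_shards):
    # Count how many sample keys hash to each shard.
    counts = Counter(
        int(hashlib.md5(str(k).encode()).hexdigest(), 16) % num_shards
        for k in keys
    )
    return [counts.get(s, 0) for s in range(num_shards)]

# 100k sequential IDs should spread almost evenly over 8 shards.
counts = distribution(range(100_000), 8)
skew = max(counts) / min(counts)
print(counts, f"skew={skew:.2f}")  # skew close to 1.0 means even
```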
Operational Procedures
Adding a Shard
Adding a shard with consistent hashing is the safest approach. The ring structure limits redistribution to neighboring keys:
```python
def add_shard_with_consistent_hashing(router, new_shard_id, virtual_nodes=100):
    """
    Add a new shard to an existing consistent hash ring.
    Only keys in the new shard's ranges are redistributed.
    """
    # Record current state
    current_distribution = {
        shard: count_keys(router, shard)
        for shard in router.shards
    }

    # Add new shard to ring
    router.add_shard(new_shard_id, virtual_nodes)

    # Verify redistribution is limited to adjacent ranges
    new_distribution = {
        shard: count_keys(router, shard)
        for shard in router.shards
    }

    # Calculate migration percentage (each moved key appears as a
    # decrease on one shard and an increase on another, hence // 2)
    total_keys = sum(current_distribution.values())
    migrated = sum(abs(new_distribution.get(s, 0) - current_distribution.get(s, 0))
                   for s in set(new_distribution) | set(current_distribution)) // 2
    migration_pct = migrated / total_keys * 100
    print(f"Migration: {migration_pct:.1f}% of keys moved")
    return migration_pct < 10  # Expect <10% migration with consistent hashing
```
For MongoDB, adding a shard triggers automatic rebalancing of chunks across available shards. The balancer runs in the background and moves chunks gradually to avoid impacting performance.
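In mongosh, the corresponding commands (replica-set and host names are placeholders) look like:

```javascript
// Add the new shard (a replica set) to the cluster
sh.addShard("rs4/mongo-shard4.internal:27017");

// Confirm the balancer is running and watch chunk counts converge
sh.getBalancerState();
sh.status();
```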
For Citus, adding a worker node requires rebalancing distributed tables:
```sql
-- Add new worker
SELECT citus_add_node('worker-4.internal', 5432);

-- Rebalance distributed tables (runs in background)
SELECT citus_rebalance_start();
```
Removing a Shard
Shard removal requires migrating data before decommissioning. Never remove a shard until all data has moved:
```python
def remove_shard(router, shard_to_remove):
    """
    Safely remove a shard by migrating data first.
    """
    # 1. Stop routing new traffic to shard
    router.deactivate_shard(shard_to_remove)

    # 2. Migrate all data
    for key in router.get_keys_for_shard(shard_to_remove):
        new_shard_id = router.get_shard(key)  # Routes to the new owner
        data = router.get_data(shard_to_remove, key)
        router.put_data(new_shard_id, key, data)

    # 3. Verify migration complete
    remaining = router.count_keys(shard_to_remove)
    if remaining > 0:
        raise ValueError(f"Shard still has {remaining} keys")

    # 4. Remove from ring
    router.remove_shard(shard_to_remove)
```
In MongoDB, use mongosh to migrate chunks off a shard before removing it. Always verify chunk and document counts before decommissioning.
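A sketch of that workflow in mongosh (the shard name is a placeholder); rerun removeShard to poll until the drain reports completed:

```javascript
// Start draining; the balancer migrates chunks off the shard
db.adminCommand({ removeShard: "shard2" });

// Rerun to poll: state moves "started" -> "ongoing" -> "completed",
// and "remaining" reports chunks still to migrate
db.adminCommand({ removeShard: "shard2" });
```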
Handling Split-Brain Scenarios
Split-brain happens when network partition causes two shards to believe they own the same data. The best approach is prevention:
- Use replication factor >= 3: With 3 replicas, a partition minority cannot form a quorum.
- Implement fencing tokens: Before writing, acquire a fencing token from a consensus service.
- Detect stale writes: Include version vectors or timestamps in records. Reject updates from older versions.
```python
def safe_write(shard, key, value, fencing_token):
    """
    Write with fencing token to prevent split-brain.
    """
    current_token = shard.get_fencing_token(key)
    if fencing_token <= current_token:
        raise SplitBrainError(
            f"Stale write rejected: token {fencing_token} <= {current_token}"
        )
    shard.write_with_token(key, value, fencing_token)
```
If split-brain is detected, the safest recovery sequence is: stop all writes to the affected shards, determine which partition has the most recent data, redirect all traffic to that partition, and rebuild the other shard from it.
Sharding Tools Landscape
Different tools sit at different layers of the sharding stack. Choose based on how much control you need:
| Tool | Type | Sharding Approach | Best For |
|---|---|---|---|
| Vitess | Middleware (MySQL) | Vertical sharding + horizontal sharding | MySQL workloads needing scale, YouTube-scale deployments |
| PlanetScale | Managed Vitess | Horizontal sharding via Vitess | Teams wanting MySQL compatibility without managing Vitess |
| CockroachDB | Distributed SQL | Automatic range-based sharding | Global applications needing strong consistency |
| Spanner | Distributed SQL | Automatic sharding, TrueTime | Large-scale global applications with budget for managed solution |
| YugabyteDB | Distributed SQL | Hash and range sharding | PostgreSQL/Cassandra compatibility with scale |
| Citus | PostgreSQL extension | Hash-based sharding | Teams on PostgreSQL needing distributed scale |
| KScale | Managed sharding | Automatic sharding | Teams wanting managed sharding without database changes |
Vitess was built at YouTube to scale MySQL beyond what a single MySQL instance could handle. It handles connection pooling, query routing, and resharding automatically. The tradeoff is operational complexity. Vitess itself requires expertise to run.
CockroachDB and Spanner both present a single logical database while sharding automatically underneath. The main difference is the clock model: CockroachDB relies on hybrid logical clocks (HLC) with bounded clock skew, while Spanner uses TrueTime hardware. Spanner is more expensive, but TrueTime's tight clock-uncertainty bounds reduce the commit-wait cost of globally consistent transactions.
For most teams, starting with a distributed database like CockroachDB or YugabyteDB eliminates the need for manual sharding entirely. The database handles distribution while you work with normal SQL.
Consistent Hashing
Simple modulo hashing has a problem. When you add or remove shards, almost every key maps to a different shard. This triggers massive data migration.
Consistent hashing solves this. Keys map to points on a hash ring. Each shard owns a range of the ring. Adding a shard only affects its neighboring range.
```mermaid
graph TD
    Ring[Hash Ring] --> P1[Key A: hash=50]
    Ring --> P2[Key B: hash=150]
    Ring --> P3[Key C: hash=250]
    Ring --> S1[Shard 1: 0-100]
    Ring --> S2[Shard 2: 101-200]
    Ring --> S3[Shard 3: 201-255]
    S1 --> P1
    S2 --> P2
    S3 --> P3
```
With consistent hashing, adding Shard 4 between Shard 1 and Shard 2 only moves keys in the 100-150 range. Most keys stay on their current shard.
Virtual nodes improve distribution further. Each physical shard claims multiple points on the ring. This smooths out uneven data distribution that comes from any single large partition.
Hash-Based vs Range-Based Partitioning
When sharding your data, you choose between hash-based and range-based partitioning strategies. Each has distinct trade-offs:
| Aspect | Hash-Based Partitioning | Range-Based Partitioning |
|---|---|---|
| Data Distribution | Evenly distributed across shards | Can become uneven (hot/cold shards) |
| Shard Key Selection | Must use hash of key, not the key itself | Can use the key directly or a prefix |
| Range Queries | Poor - requires scatter-gather across all shards | Excellent - sequential access within shard |
| Adding Shards | Massive reshuffle with plain modulo; minimal with consistent hashing | Splitting a range moves only that range, but hot ranges need manual splits |
| Hot Spot Risk | Low (even distribution) | High (contiguous keys may receive heavy writes) |
| Write Throughput | High and evenly distributed | Can bottleneck on hot shards |
| Use Case Fit | Key lookups, random access patterns | Time-series data, sequential scans |
| Implementation Complexity | Lower (consistent hashing handles redistribution) | Higher (need to manage key ranges manually) |
When to Use Hash-Based Partitioning:
- Your access patterns are random (point queries by ID)
- You need even write distribution across shards
- You can afford scatter-gather for range queries
- Example: User data accessed by user_id, order data accessed by order_id
When to Use Range-Based Partitioning:
- Your queries naturally filter by a sequential key (timestamp, numeric range)
- You frequently scan data within a range
- Your data has natural temporal patterns
- Example: IoT sensor data partitioned by day/month, logs partitioned by time range
Hybrid Approaches:
Some systems combine both approaches. A composite shard key uses a hash of one field to determine shard, then range-based within that shard. This provides even distribution while enabling range queries within a single shard.
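A sketch of the hybrid scheme (hypothetical helper names, MD5 hashing assumed): hash the tenant ID to pick the shard, then keep a timestamp-ordered key within the shard so range scans stay local:

```python
import hashlib

def hybrid_key(tenant_id, timestamp, num_shards):
    # Hash component: even write distribution across shards.
    shard = int(hashlib.md5(str(tenant_id).encode()).hexdigest(), 16) % num_shards
    # Range component: rows sort by time inside the shard, so a
    # time-range query for one tenant touches a single shard.
    sort_key = (tenant_id, timestamp)
    return shard, sort_key

# Same tenant always maps to the same shard; keys order by time.
shard, key = hybrid_key("acme", 1700000000, 8)
```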
The consistent hashing scheme described earlier can be implemented compactly (MD5 is used for illustration; any uniform hash works):

```python
import bisect
import hashlib

class ConsistentHash:
    def __init__(self, num_slots=2**32):
        self.num_slots = num_slots
        self.ring = {}          # slot -> shard_id
        self.sorted_keys = []   # sorted slots for binary search

    def add_shard(self, shard_id, virtual_nodes=100):
        for i in range(virtual_nodes):
            key = self._hash(f"{shard_id}:vn{i}")
            self.ring[key] = shard_id
            bisect.insort(self.sorted_keys, key)

    def get_shard(self, key):
        hash_val = self._hash(key)
        # First slot clockwise from the key's position, wrapping around
        idx = bisect.bisect_right(self.sorted_keys, hash_val) % len(self.sorted_keys)
        return self.ring[self.sorted_keys[idx]]

    def _hash(self, key):
        return int(hashlib.md5(key.encode()).hexdigest(), 16) % self.num_slots
```
Database-Specific Sharding Configuration
Each database handles sharding differently. Understanding the native options helps you decide whether to use built-in sharding or a middleware approach.
MongoDB Sharding
MongoDB provides built-in sharding with automatic chunk distribution:
```javascript
// Enable sharding on the database
sh.enableSharding("myapp");

// Shard a collection by hashed shard key (for even distribution)
sh.shardCollection("myapp.orders", { order_id: "hashed" });

// Shard by range (when range queries matter)
sh.shardCollection("myapp.events", { timestamp: 1 });

// Check cluster status
sh.status();

// Manually split a chunk to enable migration
db.adminCommand({
  split: "myapp.orders",
  middle: { order_id: NumberLong(500000) },
});

// Move a chunk to a specific shard
db.adminCommand({
  moveChunk: "myapp.orders",
  find: { order_id: NumberLong(500000) },
  to: "shard2",
});
```
Shard key selection for MongoDB:
| Use Case | Shard Key | Why |
|---|---|---|
| User data | user_id (hashed) | Even distribution, user-specific queries efficient |
| Time-series events | device_id + timestamp (compound) | Query by device range, timestamps within device |
| E-commerce | user_id + order_id (compound) | User orders together, order lookups by user |
| Multi-tenant SaaS | tenant_id (hashed) | Tenant isolation, even load per tenant |
PostgreSQL Citus
Citus extends PostgreSQL for distributed tables. It partitions tables across worker nodes while presenting a single PostgreSQL interface:
```sql
-- Install Citus extension on coordinator
CREATE EXTENSION IF NOT EXISTS citus;

-- Add worker nodes
SELECT citus_add_node('worker-1.internal', 5432);
SELECT citus_add_node('worker-2.internal', 5432);
SELECT citus_add_node('worker-3.internal', 5432);

-- Create distributed table (by hash)
CREATE TABLE users (
    user_id BIGSERIAL,
    email TEXT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT now(),
    PRIMARY KEY (user_id)
);
SELECT create_distributed_table('users', 'user_id', shard_count => 16);

-- Create reference table (replicated to all nodes)
CREATE TABLE countries (
    country_code TEXT PRIMARY KEY,
    name TEXT NOT NULL
);
SELECT create_reference_table('countries');

-- Check shard placement
SELECT * FROM citus_shards;
SELECT * FROM pg_dist_shard_placement;
```
Citus supports co-located tables, where joined data stays on the same shard, and distributed tables that span shards. The create_distributed_table function takes the shard key and number of shards as arguments.
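For example, co-locating an orders table with the users table defined above keeps joins on user_id shard-local (`colocate_with` is the Citus argument for this; table definitions are illustrative):

```sql
CREATE TABLE orders (
    order_id BIGSERIAL,
    user_id BIGINT NOT NULL,
    total NUMERIC,
    PRIMARY KEY (user_id, order_id)
);
SELECT create_distributed_table('orders', 'user_id', colocate_with => 'users');

-- This join runs shard-local: matching users and orders rows
-- share a shard because both tables distribute on user_id.
SELECT u.email, count(*)
FROM users u JOIN orders o USING (user_id)
GROUP BY u.email;
```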
CockroachDB
CockroachDB shards automatically by default. You control data distribution through range splitting and leaseholder placement:
```sql
-- Split a table into ranges by primary key
ALTER TABLE users SPLIT AT VALUES ('m'), ('t');

-- Move a range's replicas to specific stores (store IDs, then key)
ALTER TABLE users EXPERIMENTAL_RELOCATE VALUES (ARRAY[1], 'm');

-- Check range status
SHOW RANGES FROM TABLE users;

-- Control placement with replication zone configs
ALTER TABLE users CONFIGURE ZONE USING
    num_replicas = 3, constraints = '[+region=us-east1]';
```
CockroachDB’s KV layer automatically splits ranges when they exceed the target size (default 512 MiB). Forcing specific placement is rarely needed. The optimizer handles most distribution decisions.
When to Use Built-In vs Middleware Sharding
| Approach | Examples | Best For |
|---|---|---|
| Built-in database sharding | MongoDB sharding, CockroachDB, Spanner | When the database handles distribution natively |
| Extension-based sharding | Citus for PostgreSQL, YugabyteDB | When you want PostgreSQL compatibility with scale |
| Middleware sharding | Vitess, PlanetScale | When you need MySQL compatibility with Vitess routing |
| Application-level sharding | Custom shard router | When no other option fits and you control all queries |
Cross-Shard Queries
Sharding creates query challenges. Queries that filter by the shard key hit one shard. Queries that do not include it must fan out to every shard.
```python
# Single-shard query: efficient
user = shards[get_shard(user_id)].query(
    "SELECT * FROM users WHERE id = ?", user_id
)

# Full-table query: must check all shards
all_users = []
for shard in shards.values():
    all_users.extend(shard.query("SELECT * FROM users"))
```
Aggregation across shards is expensive. Count, sum, average require querying all shards and combining results. Real-time analytics across sharded data is slow.
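A scatter-gather average illustrates the pattern (a sketch assuming each shard exposes a partial-aggregate query): each shard returns a partial result and the router combines them; averages must be combined from sums and counts, never by averaging per-shard averages:

```python
def global_avg(shards):
    # Gather partial (sum, count) pairs from every shard...
    total, n = 0, 0
    for shard in shards:
        s, c = shard.query_sum_count()
        total += s
        n += c
    # ...then combine. Averaging per-shard averages would be wrong
    # when shards hold different row counts.
    return total / n if n else 0.0

class FakeShard:
    """In-memory stand-in for a real shard connection."""
    def __init__(self, values):
        self.values = values
    def query_sum_count(self):
        return sum(self.values), len(self.values)

print(global_avg([FakeShard([1, 2, 3]), FakeShard([10])]))  # 4.0
```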
Denormalization helps. Pre-compute aggregations and store them on specific shards. Store user summary data on the user’s shard rather than computing it across shards.
Rebalancing Shards
Data grows unevenly. Some shards accumulate more data than others. You need to rebalance.
Rebalancing means moving data between shards. You add new shards, migrate data, then remove old shards. During migration, both old and new shards must serve traffic.
```python
def rebalance_shard(shards, old_shard_id, new_shard_ids):
    # 1. Add new shards
    for new_id in new_shard_ids:
        shards[new_id] = create_shard(new_id)

    # 2. Migrate data
    old_shard = shards[old_shard_id]
    for record in old_shard.fetch_all():
        new_shard = pick_shard_for(record)
        new_shard.insert(record)

    # 3. Update routing
    update_routing_table()

    # 4. Remove old shard
    delete_shard(old_shard_id)
```
The challenge is maintaining availability during rebalancing. Applications continue writing while data migrates. Dual-write during transition or careful sequencing minimizes inconsistencies.
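A minimal dual-write wrapper for the transition window can be sketched like this (hypothetical shard interface; routing tables and error handling omitted): writes go to both owners until migration completes, and reads prefer the new shard:

```python
class DualWriteRouter:
    """Route writes to both shards during a rebalance window."""

    def __init__(self, old_shard, new_shard):
        self.old, self.new = old_shard, new_shard

    def write(self, key, value):
        # New shard first: it becomes the source of truth after
        # cutover; the old copy stays current in case of rollback.
        self.new.put(key, value)
        self.old.put(key, value)

    def read(self, key):
        # Prefer the migrated copy; fall back while backfill runs.
        value = self.new.get(key)
        return value if value is not None else self.old.get(key)

class DictShard:
    """In-memory stand-in for a real shard connection."""
    def __init__(self):
        self.data = {}
    def put(self, key, value):
        self.data[key] = value
    def get(self, key):
        return self.data.get(key)
```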
When to Shard
Sharding is complexity. Only shard when simpler options fail.
Shard when you hit write limits that vertical scaling cannot address. If your database CPU maxes out at write throughput, adding replicas does not help. Writes go to primary.
Shard when data size exceeds single server capacity. If you need terabytes and a single server cannot hold it, distribute across shards.
Do not shard for read scale. Read replicas handle read scale. Sharding complicates queries. Use replicas first.
Most applications should not shard. Start with vertical scaling. Add replicas. Implement caching. These handle millions of requests per second. Sharding is for when you have exhausted all other options.
When to Use and When Not to Use Horizontal Sharding
When to Use Horizontal Sharding:
- Write throughput exceeds single primary server capacity after vertical scaling
- Data size exceeds single server storage capacity
- You have clear, stable access patterns that align with a shard key
- Your team has operational expertise for distributed databases
- You have exhausted vertical scaling and read replicas
When Not to Use Horizontal Sharding:
- Read replicas can solve your performance problem
- Your access patterns do not naturally partition by a shard key
- You need ad-hoc queries across all data
- Your team lacks experience with distributed database operations
- You can solve the problem with caching or optimization
Production Failure Scenarios
| Failure | Impact | Mitigation |
|---|---|---|
| Shard key hotspot | Uneven load, some shards overloaded | Choose high-cardinality keys, use consistent hashing with virtual nodes |
| Shard router failure | All queries fail | Implement router redundancy, use multiple routers |
| Cross-shard transaction failure | Partial data updates, inconsistency | Avoid cross-shard transactions, use saga pattern for multi-shard operations |
| Shard addition triggers reshuffle | Temporary performance degradation | Use consistent hashing, add capacity in larger increments |
| Shard hardware failure | Data unavailable, potential data loss | Maintain replication factor >= 2 per shard, regular backups |
| Rebalance in progress | Migrating data temporarily unavailable | Implement dual-write during transition, graceful failover |
| Cross-shard query timeout | Aggregations and joins fail | Set appropriate timeouts, pre-aggregate data, use materialized views |
| Stale routing table | Queries routed to wrong shard | Implement cache invalidation for routing table, version routing metadata |
Observability Checklist
Metrics to Monitor:
- Request latency by shard (detect hotspots)
- Request distribution across shards (detect skew)
- Shard storage utilization and growth rate
- Replication status per shard
- Rebalancing progress and impact
- Cross-shard query frequency and latency
- Shard router health and request queue
- Connection pool utilization per shard
Logs to Capture:
- Shard key distribution analysis
- Cross-shard query patterns (for optimization opportunities)
- Rebalancing events and progress
- Shard failures and failover events
- Routing table updates
- Slow query logs with shard identification
Alerts to Set:
- Shard storage > 80% capacity
- Request distribution skew > 2x between shards
- Replication lag per shard
- Cross-shard query latency spike
- Router failure or unavailability
- Rebalance progress stalled
- Connection pool exhaustion per shard
```python
# Example: Shard health monitoring
def check_shard_health(shards):
    health = {}
    for shard_id, shard in shards.items():
        metrics = shard.get_metrics()
        health[shard_id] = {
            'latency_p99': metrics['latency_p99'],
            'storage_used_pct': metrics['storage_used'] / metrics['storage_total'],
            'connections': metrics['active_connections'],
            'replication_lag': metrics.get('replication_lag', 0)
        }
    return health
```
Security Checklist
- Implement authentication at both router and shard level
- Use TLS encryption for all router-to-shard communication
- Apply least privilege for router database users
- Audit log all cross-shard operations
- Encrypt data at rest on each shard
- Implement network segmentation between shards
- Secure shard-to-shard replication channels
- Regular security reviews of routing logic
- Implement access controls for rebalancing operations
- Backup encryption keys separately from shard data
Common Pitfalls and Anti-Patterns
- Choosing a poor shard key: The wrong shard key creates hotspots and uneven distribution. Test distribution before production with realistic data.
- Underestimating cross-shard query cost: Queries across shards are slow and expensive. Design access patterns to minimize them before sharding.
- Rebalancing without planning: Adding shards without a rebalance strategy causes chaos. Use consistent hashing to minimize reshuffling.
- Sharding too early: Sharding adds significant operational complexity. Exhaust vertical scaling and caching first.
- Ignoring multi-shard transactions: Distributed transactions are complex and slow. Design your schema to avoid them.
- Not planning for shard growth: Some shards will accumulate more data. Monitor growth and plan for splits before hitting limits.
- Losing sight of the total dataset: With data distributed, it’s easy to lose track of total size and growth. Monitor cluster-wide metrics.
- Assuming linear scalability: Adding a shard does not double capacity. Rebalancing, router overhead, and cross-shard queries limit gains.
Quick Recap
Key Bullets:
- Sharding distributes writes across multiple servers using a shard key
- Choose high-cardinality shard keys that align with your access patterns
- Consistent hashing minimizes data movement when adding shards
- Cross-shard queries are expensive; design to minimize them
- Sharding should be a last resort after exhausting simpler options
- Monitor shard distribution and rebalance before hotspots become problems
Copy/Paste Checklist:
```python
# Consistent hashing for shard selection
import bisect
import hashlib

class ShardRouter:
    def __init__(self, shards, virtual_nodes=100):
        self.ring = {}
        self.shards = shards
        for shard in shards:
            for i in range(virtual_nodes):
                key = int(hashlib.md5(f"{shard}:vn{i}".encode()).hexdigest(), 16)
                self.ring[key] = shard
        self.sorted_keys = sorted(self.ring.keys())

    def get_shard(self, key):
        # Hash the key over the same space as the ring positions;
        # truncating with % 2**32 here would break the lookup,
        # since ring positions span the full 128-bit MD5 range.
        hash_val = int(hashlib.md5(str(key).encode()).hexdigest(), 16)
        idx = bisect.bisect_left(self.sorted_keys, hash_val) % len(self.sorted_keys)
        return self.ring[self.sorted_keys[idx]]

# Usage
router = ShardRouter(['shard-0', 'shard-1', 'shard-2', 'shard-3'])
shard = router.get_shard(user_id)
```
Capacity Estimation: Shard Count and Key Distribution
Sharding starts with estimating how many shards you need. The formula depends on your write throughput and storage requirements.
For write throughput: if your primary database handles 10,000 writes per second and each shard can handle 2,000 writes per second, you need at minimum 5 shards. Account for growth: design for 2-3x your current write volume. For 10,000 writes per second with 2x growth headroom, you need shards capable of 20,000 writes per second — at 2,000 per shard, that is 10 shards.
For storage: if your dataset is 2TB and each shard server has 1TB usable storage (with replication factor 2), you need at minimum 4 shards for storage alone. With replication factor 3, you need 6 shards.
The shard key distribution check: use the coefficient of variation (CV, the standard deviation of shard sizes divided by their mean) to detect hot spots. A CV above roughly 0.3 signals meaningful skew. The fix is either to rebalance or to change your shard key.
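The CV check is a few lines with the standard library:

```python
from statistics import mean, pstdev

def shard_cv(sizes):
    # Coefficient of variation: population stddev divided by mean.
    return pstdev(sizes) / mean(sizes)

print(f"{shard_cv([100, 110, 95, 105]):.2f}")   # ~0.05: healthy
print(f"{shard_cv([100, 100, 100, 400]):.2f}")  # ~0.74: hot shard
```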
For consistent hashing with virtual nodes, the minimum virtual nodes per shard should be 100-200 to ensure even distribution. With fewer virtual nodes, the hash ring has visible gaps and adding/removing shards causes noticeable redistribution. With 150 virtual nodes per shard and 6 shards, you have 900 points on the ring — fine-grained enough that adding one shard moves roughly 1/7 of keys.
Real-World Case Study: Instagram’s Sharding Evolution
Instagram’s user database started on a single PostgreSQL server. As they grew, they added read replicas, then started partitioning. Their sharding evolution went through three generations.
First generation was pure modulo sharding: user_id determined the shard via modulo. Simple but inflexible — adding shards required changing the modulo divisor, which meant remapping nearly every user to a new shard. They managed this with a migration that ran for months while serving live traffic.
Second generation moved to consistent hashing with virtual nodes. Adding a shard now moved only the neighboring keys instead of remapping all users. But they still had a problem: their shard key was user_id alone, and Instagram’s user distribution was not uniform. Some users were more active than others, and some users had relationships with much larger audiences, creating write hotspots for specific shards.
Third generation introduced application-level sharding: the application layer decided not just which shard a user’s data lived on, but also cached user data aggressively and used a fanout-on-read model for celebrity activity feeds. Write amplification for celebrity posts was managed by separating celebrity posts from regular user posts — celebrity posts were sharded differently (by post_id) than regular posts (by user_id).
The lesson: Instagram’s biggest challenge was not distributing data evenly but managing the cross-shard queries that their feed and notification systems required. They solved this with denormalization — pre-computing follower feeds and storing them close to the users who needed them. The shard key was not just a storage decision, it was an access pattern decision.
Interview Questions
Q: You are designing a sharding strategy for a multi-tenant SaaS application. Each tenant has a different data volume — some have millions of rows, others have hundreds. What shard key do you choose and why?
Use tenant_id as the shard key if tenants are roughly equal in size and you frequently query by tenant. If tenant sizes are very uneven (some tenants have 10,000x more data than others), tenant_id alone creates hot spots for large tenants — those shards become overloaded while others sit idle. For uneven tenants, consider sharding by tenant_id plus a sub-key like entity_type or using hash partitioning on a composite of tenant_id and entity_id. An alternative for very large tenants is to give them dedicated shards, but this adds operational complexity.
Q: Your application uses consistent hashing with 8 shards. You need to add 4 more shards to handle growth. How much data migration happens?
With consistent hashing and a well-distributed hash ring, each of the 4 new shards claims roughly 1/12 of the key space, so about 4/12, or one third, of existing keys move to the new shards. In general, growing from N to M shards with consistent hashing migrates roughly (M - N)/M of the key space; going from 8 to 12 shards therefore moves about 33% of keys. Without consistent hashing (plain modulo), nearly every key would remap.
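The estimate can be checked empirically with a small simulation (a sketch using MD5 and virtual nodes, in the style of the ring code earlier in this post):

```python
import bisect
import hashlib

def build_ring(shards, vnodes=150):
    # Ring of (position, shard) pairs, sorted by position.
    return sorted(
        (int(hashlib.md5(f"{s}:vn{i}".encode()).hexdigest(), 16), s)
        for s in shards for i in range(vnodes)
    )

def lookup(ring, key):
    h = int(hashlib.md5(str(key).encode()).hexdigest(), 16)
    # First ring position at or after the key's hash, wrapping around.
    i = bisect.bisect_right(ring, (h, ""))
    return ring[i % len(ring)][1]

before = build_ring([f"s{i}" for i in range(8)])
after = build_ring([f"s{i}" for i in range(12)])
keys = range(20_000)
moved = sum(lookup(before, k) != lookup(after, k) for k in keys)
print(f"{moved / len(keys):.0%} of keys moved")  # close to 1/3
```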
Q: What happens to your cross-shard JOIN queries when one shard is significantly slower than others?
The slow shard becomes the bottleneck for the entire query. In a scatter-gather query across 8 shards, if one shard has a disk I/O problem and takes 5 seconds while others take 100ms, the entire query takes 5 seconds — the query cannot return until the slowest shard responds. The fix is to implement per-shard timeouts and either fail the query or return partial results with a warning. Alternatively, detect slow shards via health monitoring and route queries away from them while the issue is investigated.
Q: When would you use a distributed SQL database like CockroachDB over manual sharding with PostgreSQL?
Use CockroachDB when you want the scalability benefits of sharding but without the operational complexity of managing shard distribution yourself. CockroachDB handles automatic resharding, rebalancing, and failover transparently. You write normal SQL and the database handles distributing data across nodes. The tradeoff is performance overhead — CockroachDB’s distributed transactions have higher latency than single-node PostgreSQL, and you pay a premium for the strong consistency guarantees. If your team lacks the expertise to manage manual sharding and you can tolerate higher latency for distributed operations, CockroachDB is a reasonable choice.
Conclusion
Sharding distributes writes across multiple servers. A good shard key evenly distributes load and allows efficient queries. Consistent hashing minimizes reshuffling when adding shards.
Cross-shard queries are expensive. Design your schema and access patterns to minimize them. Rebalancing is complex and requires careful planning.
Sharding should be a last resort. The engineering cost is significant. Only shard when you have exhausted simpler scaling approaches.
For related reading, see Database Scaling for broader scaling strategies, and Consistent Hashing for details on the hashing approach used in distributed systems.