Distributed Caching: Multi-Node Cache Clusters
Scale caching across multiple nodes. Learn about cache clusters, consistency models, session stores, and cache coherence patterns.
Distributed Caching
A single cache instance only gets you so far. At some point, your cache needs more memory than one machine can provide, or you need to handle more requests than one node can serve, or you need high availability so a cache node failure does not take down your application.
That is when you move to distributed caching. But distributed caching introduces problems that do not exist in single-node setups.
Why Distribute?
Single-node caching works until it doesn’t.
| Limitation | What Happens |
|---|---|
| Memory | Your 16GB cache node fills up. You need 64GB. |
| Requests | Your cache serves 100k requests/second. You need 500k. |
| Availability | Your cache node crashes. Every request hits the database. |
Distributed caching solves all three. But it introduces coordination problems.
Multi-Node Architectures
Client-Side Sharding
The client decides which cache node to use based on the key. No proxy needed.
```mermaid
graph LR
    A[Client] --> B{Which node?}
    B -->|key1| C[Node 1]
    B -->|key2| D[Node 2]
    B -->|key3| E[Node 3]
```
```python
import hashlib
from bisect import bisect

class ShardedCache:
    def __init__(self, nodes):
        self.nodes = nodes
        self.ring = {}
        self.sorted_keys = []
        self._build_ring()

    def _build_ring(self):
        for node in self.nodes:
            for i in range(150):  # 150 virtual nodes per physical node
                key = f"{node}:{i}"
                hash_key = int(hashlib.md5(key.encode()).hexdigest(), 16)
                self.ring[hash_key] = node
                self.sorted_keys.append(hash_key)
        self.sorted_keys.sort()

    def _get_node(self, key):
        if not self.ring:
            return None
        hash_key = int(hashlib.md5(key.encode()).hexdigest(), 16)
        idx = bisect(self.sorted_keys, hash_key)
        if idx >= len(self.sorted_keys):
            idx = 0  # Wrap around the ring
        return self.ring[self.sorted_keys[idx]]

    def get(self, key):
        node = self._get_node(key)
        return node.cache.get(key)

    def set(self, key, value):
        node = self._get_node(key)
        node.cache.set(key, value)
```
Pros: No proxy layer, low latency, simple for small clusters. Cons: Client manages sharding, adding nodes requires moving data.
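To see why the "adding nodes requires moving data" cost is bounded, here is a runnable sketch (node names and key patterns are illustrative) that builds two consistent-hash rings, one with three nodes and one with four, and measures how many keys change owner. With consistent hashing, only roughly 1/N of keys move when the Nth node joins, versus nearly all keys with naive modulo sharding:

```python
import hashlib
from bisect import bisect

def build_ring(nodes, vnodes=150):
    """Map hash positions to nodes using virtual nodes."""
    ring = {}
    for node in nodes:
        for i in range(vnodes):
            h = int(hashlib.md5(f"{node}:{i}".encode()).hexdigest(), 16)
            ring[h] = node
    return ring, sorted(ring)

def node_for(key, ring, sorted_keys):
    """First node clockwise from the key's hash position."""
    h = int(hashlib.md5(key.encode()).hexdigest(), 16)
    idx = bisect(sorted_keys, h)
    return ring[sorted_keys[idx % len(sorted_keys)]]

keys = [f"user:{i}" for i in range(10_000)]
ring3, sk3 = build_ring(["node1", "node2", "node3"])
ring4, sk4 = build_ring(["node1", "node2", "node3", "node4"])

before = {k: node_for(k, ring3, sk3) for k in keys}
moved = sum(1 for k in keys if node_for(k, ring4, sk4) != before[k])
print(f"{moved / len(keys):.0%} of keys remapped")  # expect roughly a quarter
```

The md5-based placement is deterministic, so the measured fraction is stable across runs; it hovers near 25% because the new node takes over about a quarter of the ring.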
Proxy-Based Routing
A proxy (like Twemproxy) routes requests. Clients don't know about the cluster topology.
```mermaid
graph LR
    A[Client] --> B[Proxy]
    B --> C[Node 1]
    B --> D[Node 2]
    B --> E[Node 3]
```
Clients send all requests to the proxy. The proxy handles sharding logic. This is how managed services like Redis Enterprise and ElastiCache work.
Pros: Client simplicity, topology hidden from clients. Cons: Extra network hop through the proxy, added latency, and the proxy is a single point of failure unless it is deployed in a highly available configuration.
Redis Cluster
Redis Cluster is Redis’s native distributed mode. It handles sharding, replication, and failover automatically.
```bash
# Minimum Redis Cluster setup (6 nodes = 3 primaries + 3 replicas)
redis-cli --cluster create \
  192.168.1.1:7000 \
  192.168.1.2:7000 \
  192.168.1.3:7000 \
  192.168.1.4:7000 \
  192.168.1.5:7000 \
  192.168.1.6:7000 \
  --cluster-replicas 1
```
```python
import redis

# Redis Cluster client handles topology automatically
r = redis.RedisCluster(
    host='192.168.1.1',
    port=7000,
    skip_full_coverage_check=True
)

# Works the same as single Redis
r.set('key', 'value')
r.get('key')
```
Redis Cluster splits keys across 16,384 slots. Each primary node owns a subset of slots. If a primary fails, its replica promotes automatically.
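The slot for a key is `CRC16(key) mod 16384`, where CRC16 is the XMODEM variant, and a hash tag in braces restricts hashing to that substring so related keys land in the same slot (required for multi-key operations). A minimal sketch of the mapping, not the real client implementation:

```python
def crc16_xmodem(data: bytes) -> int:
    """CRC16-CCITT (XMODEM): polynomial 0x1021, initial value 0."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            crc = ((crc << 1) ^ 0x1021) if (crc & 0x8000) else (crc << 1)
            crc &= 0xFFFF
    return crc

def key_slot(key: str) -> int:
    """Redis Cluster slot: hash only the {hash tag} if a non-empty one exists."""
    start = key.find('{')
    if start != -1:
        end = key.find('}', start + 1)
        if end != -1 and end != start + 1:  # tag must be non-empty
            key = key[start + 1:end]
    return crc16_xmodem(key.encode()) % 16384

# Keys sharing a hash tag land in the same slot
print(key_slot('{user:123}.cart'), key_slot('{user:123}.profile'))
```

This matches what `CLUSTER KEYSLOT` computes server-side, and is why patterns like `{user:123}.cart` and `{user:123}.profile` can participate in the same transaction.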
Cache Coherence
When you have multiple cache nodes, coherence becomes a problem. If you update data in Node 1, how do you invalidate that same data in Node 2?
The Problem
```mermaid
graph TD
    A[Write to DB] --> B[Update Node 1]
    B -.->|Async| C[Node 2 has stale data]
    D[Read from Node 2] --> E[Stale data returned]
```
Solutions
1. Write-Through All Nodes
On write, update all cache nodes.
```python
def write_to_all_nodes(key, value, nodes):
    for node in nodes:
        node.set(key, value)

# Problem: more write latency, potential inconsistency if one node fails
```
This works but is slow and fragile. One slow node drags everything down.
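One way to bound the "one slow node drags everything down" problem is to fan the writes out concurrently with a per-node timeout and report which nodes failed. This is a sketch under assumed node objects with a `set` method, not a specific client API:

```python
from concurrent.futures import ThreadPoolExecutor

def write_to_all_nodes(key, value, nodes, timeout=0.05):
    """Fan out writes concurrently; return the nodes that failed or timed out."""
    failed = []
    with ThreadPoolExecutor(max_workers=max(len(nodes), 1)) as pool:
        futures = {pool.submit(node.set, key, value): node for node in nodes}
        for future, node in futures.items():
            try:
                future.result(timeout=timeout)
            except Exception:
                failed.append(node)  # caller decides: retry, invalidate, or alert
    return failed

class FakeNode:
    """Hypothetical cache node stub for demonstration."""
    def __init__(self, name, healthy=True):
        self.name, self.healthy, self.data = name, healthy, {}

    def set(self, key, value):
        if not self.healthy:
            raise ConnectionError(f"{self.name} unreachable")
        self.data[key] = value

nodes = [FakeNode("n1"), FakeNode("n2", healthy=False), FakeNode("n3")]
failed = write_to_all_nodes("user:123", "v1", nodes)
print([n.name for n in failed])  # ['n2']
```

The slow node now costs at most the timeout instead of stalling the whole loop, but the consistency problem remains: a failed node holds stale data until it is invalidated or expires.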
2. Invalidation Broadcasting
On write, broadcast invalidation to all nodes.
```python
# Publisher side: on write, announce the key that changed
def invalidate_across_cluster(key, pubsub):
    pubsub.publish('cache:invalidate', key)

# Subscriber side: each cache node listens and deletes the key locally
pubsub.subscribe('cache:invalidate')
```
Redis pub/sub is useful here. When data changes, publish an invalidation message. All cache instances subscribe and delete the key.
```python
import json

# On write
redis_cluster.publish('invalidation', json.dumps({'key': 'user:123'}))

# On each cache node
pubsub = redis_cluster.pubsub()
pubsub.subscribe('invalidation')
for message in pubsub.listen():
    if message['type'] == 'message':
        data = json.loads(message['data'])
        cache.delete(data['key'])
```
3. Consistent Hashing with Virtual Nodes
Use consistent hashing so keys map to the same nodes reliably. Adding a node only moves some keys.
```python
import hashlib
from bisect import bisect

class VirtualNodeSharding:
    def __init__(self, nodes, replicas=150):
        self.replicas = replicas
        self.ring = {}
        self.sorted_keys = []
        for node in nodes:
            for i in range(replicas):
                # Virtual node for better distribution
                key = f"{node}:vnodes:{i}"
                hash_key = self._hash(key)
                self.ring[hash_key] = node
                self.sorted_keys.append(hash_key)
        self.sorted_keys.sort()

    def _hash(self, key):
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def get_node(self, key):
        hash_key = self._hash(key)
        idx = bisect(self.sorted_keys, hash_key)
        if idx >= len(self.sorted_keys):
            idx = 0  # Wrap around the ring
        return self.ring[self.sorted_keys[idx]]
```
Session Store Patterns
Sessions are a common distributed caching use case. User sessions need to be available across all application instances.
Redis Session Store
```python
import json
import secrets
import time

import redis

class RedisSessionStore:
    def __init__(self, redis_cluster, ttl=86400):
        self.redis = redis_cluster
        self.ttl = ttl

    def create_session(self, user_id, data=None):
        session_id = secrets.token_urlsafe(32)
        session_key = f"session:{session_id}"
        session_data = {
            'user_id': user_id,
            'created_at': time.time(),
            'data': data or {}
        }
        self.redis.setex(session_key, self.ttl, json.dumps(session_data))
        return session_id

    def get_session(self, session_id):
        data = self.redis.get(f"session:{session_id}")
        if not data:
            return None
        return json.loads(data)

    def update_session(self, session_id, data):
        session = self.get_session(session_id)
        if not session:
            return False
        session['data'].update(data)
        session['updated_at'] = time.time()
        self.redis.setex(f"session:{session_id}", self.ttl, json.dumps(session))
        return True

    def destroy_session(self, session_id):
        self.redis.delete(f"session:{session_id}")
```
Session affinity vs distributed sessions
If your application runs on multiple instances, sessions need to be shared. Two options:
- Sticky sessions: Route each user to the same instance. Simpler, but instance failure loses sessions.
- Distributed sessions: Sessions stored in Redis, available to all instances. More resilient, slightly slower.
For anything critical, use distributed sessions. Sticky sessions will ruin your day when an instance goes down.
High Availability Patterns
Replication
Add replica nodes that copy data from the primary. Reads can go to replicas, spreading load.
```conf
# Redis replication config (on replica)
replicaof 192.168.1.1 7000
replica-read-only yes
```

```python
# Read from replica, write to primary
def get_from_replica(key):
    return replica_redis.get(key)

def set_to_primary(key, value):
    return primary_redis.setex(key, ttl, value)

def get_cached(key):
    # Try primary first (has freshest data)
    val = primary_redis.get(key)
    if val:
        return val
    # Fall back to replica
    return replica_redis.get(key)
```
Automatic Failover
Redis Sentinel handles failover automatically. When a primary fails, Sentinel promotes a replica and updates clients.
```conf
# Sentinel configuration
sentinel monitor mymaster 192.168.1.1 7000 2
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 10000
```

```python
from redis.sentinel import Sentinel

sentinel = Sentinel([
    ('192.168.1.1', 26379),
    ('192.168.1.2', 26379),
    ('192.168.1.3', 26379)
], socket_timeout=0.1)

# Get master and replica connections
master = sentinel.master_for('mymaster')
replica = sentinel.slave_for('mymaster')

# Automatic failover handling
master.set('key', 'value')  # Writes always go to master
replica.get('key')          # Reads can go to replica
```
Monitoring Distributed Cache
You need visibility into your distributed cache. Key metrics:
```python
# Check cluster health
def check_cluster_health(cluster):
    health = {
        'nodes': [],
        'total_keys': 0,
        'total_memory': 0,
        'hit_rate': 0
    }
    for node in cluster.get_nodes():
        info = node.info()
        health['nodes'].append({
            'host': node.host,
            'role': node.role,
            'connected': node.connected
        })
        health['total_keys'] += info.get('keys', 0)
        health['total_memory'] += info.get('used_memory', 0)
    return health

# Alert thresholds
ALERT_THRESHOLDS = {
    'memory_usage_percent': 80,   # Alert if >80% memory used
    'hit_rate_percent': 80,       # Alert if hit rate <80%
    'connected_clients': 10000,   # Alert if >10k clients
    'replication_lag_ms': 100     # Alert if replica lag >100ms
}
```
When to Use / When Not to Use
| Architecture | When to Use | When Not to Use |
|---|---|---|
| Client-Side Sharding | Small to medium clusters (<10 nodes); low latency critical; team comfortable with client logic | Large clusters; multi-language environments; teams want simplicity |
| Proxy-Based Routing | Multi-client support; topology hidden; managed service environments | Extra hop latency unacceptable; proxy is single point of failure |
| Redis Cluster | Large datasets (>10GB); need automatic failover; want native sharding | Small datasets; need strict ordering; transaction-heavy workloads |
| Write-Through All Nodes | Small clusters; strong consistency required; write volume manageable | Large clusters; high write volume; eventual consistency acceptable |
| Pub/Sub Invalidation | Multi-region; eventual consistency acceptable; need real-time invalidation | Strict consistency required; high invalidation frequency |
| Consistent Hashing | Frequent node additions/removals; want minimal key remapping | Fixed cluster size; need strict key distribution |
| Sticky Sessions | Simple deployments; session loss acceptable on failure; cost-sensitive | High availability required; sessions must survive instance failures |
| Distributed Sessions | HA critical; multi-instance deployments; sessions must persist | Simple single-instance apps; session loss tolerable |
Decision Guide
```mermaid
graph TD
    A[Scale Need] --> B{Memory > Single Node?}
    B -->|Yes| C{Need HA?}
    B -->|No| D[Single Node Cache]
    C -->|Yes| E[Redis Cluster or Replicated Setup]
    C -->|No| F{Client Simplicity?}
    F -->|Yes| G[Proxy-Based Routing]
    F -->|No| H[Client-Side Sharding]
    E --> I{Need Automatic Failover?}
    I -->|Yes| J[Sentinel or Redis Cluster]
    I -->|No| K[Manual Replica Promotion]
```
Capacity Estimation: Cluster Memory Sizing and Slot Distribution
Planning a distributed cache cluster requires understanding how data and load distribute across nodes.
Cluster memory sizing formula:
```text
total_cache_memory = memory_per_node × number_of_nodes
effective_capacity = total_cache_memory / replication_factor / fragmentation_factor
usable_capacity    = effective_capacity × 0.7   # Keep 30% headroom for spikes
```
For a Redis Cluster with 3 masters × 16GB each and 1 replica per master (6 × 16GB nodes total):
- Total raw memory: 96GB
- Replication factor 2 (each key lives on a master and its replica): 96GB / 2 = 48GB effective
- Fragmentation factor: typically 1.1-1.5× depending on allocator: 48GB / 1.2 = 40GB
- Usable at 70%: ~28GB for cached data
Slot distribution in Redis Cluster: 16,384 slots divided across N masters. With 3 masters, each owns ~5,461 slots. When adding nodes, Redis Cluster migrates slots one at a time. The CLUSTER SLOTS command shows current ownership. Plan reshards in advance using redis-cli --cluster reshard, or manual CLUSTER SETSLOT commands, during maintenance windows.
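The slot arithmetic is easy to script when planning a reshard. A small helper (illustrative planning code, not a redis-cli wrapper) that splits the 16,384 slots into contiguous per-master ranges:

```python
TOTAL_SLOTS = 16384

def slot_ranges(num_masters):
    """Split the 16,384 hash slots into contiguous (start, end) ranges."""
    base, extra = divmod(TOTAL_SLOTS, num_masters)
    ranges, start = [], 0
    for i in range(num_masters):
        count = base + (1 if i < extra else 0)  # spread the remainder
        ranges.append((start, start + count - 1))
        start += count
    return ranges

print(slot_ranges(3))  # [(0, 5461), (5462, 10922), (10923, 16383)]
```

Comparing the output of such a plan against `CLUSTER SLOTS` before and after a reshard is a quick sanity check that every slot ended up owned by exactly one master.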
Hot key detection: Even with consistent hashing, hot keys cause uneven load. A rough formula for how much a single key loads its node relative to the per-node average:
hot_key_load_factor = (requests_per_key / total_requests) × number_of_nodes
If 1% of keys receive 50% of traffic on a 10-node cluster, those keys are 5× hotter than average. Mitigation: replicate hot keys to all nodes and route randomly, use key spreading (split user:123 into user:123:shard0 … user:123:shard9), or move hot key handling to a dedicated single-node Redis with replica read scaling.
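The key-spreading mitigation can be sketched as follows. The shard count and helper names are illustrative, not a library API: writes fan out to every shard-suffixed copy, and reads pick one copy at random, so each copy hashes to a (likely) different node and the read load spreads:

```python
import random

NUM_SHARDS = 10

def spread_set(cache, key, value):
    """Write the hot value under every shard-suffixed copy."""
    for i in range(NUM_SHARDS):
        cache[f"{key}:shard{i}"] = value

def spread_get(cache, key):
    """Read one random copy to spread load across nodes."""
    return cache.get(f"{key}:shard{random.randrange(NUM_SHARDS)}")

cache = {}  # Stand-in for a sharded cache client
spread_set(cache, "user:123", {"name": "Ada"})
print(spread_get(cache, "user:123"))
```

The trade-off: writes cost NUM_SHARDS operations and invalidation must delete all copies, so this only pays off for keys that are read far more often than they are written.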
Memcached cluster write capacity: With N nodes in a consistent hash ring, each write goes to one node. Write throughput = single_node_throughput × N (assuming even distribution). If one Memcached node handles 200k ops/sec, a 10-node cluster handles ~2M ops/sec total, but any single key’s write rate is still bound by one node.
Real-World Case Study: Redis Cluster Failure at Scale
A major e-commerce platform ran a 24-node Redis Cluster (12 masters, 12 replicas) for session storage and cart data. During a peak traffic event, a network switch failed causing a brief partition between 4 nodes and the rest of the cluster.
The cluster’s quorum-based failover correctly detected that the 4 isolated nodes could not form a majority and did not promote themselves — correct behavior. But the remaining 20 nodes experienced a cascade: the primary for one of the hot key slots was among the isolated nodes. The replica for that slot was in the majority partition. Failover promoted the replica to primary.
The problem: the application was using a slot-aware client that cached slot-to-node mappings. When failover changed slot ownership, the client’s cached mapping became stale. Some application instances still routed requests for the affected slot to the demoted node (now a replica). The replica rejected writes, and those application instances had no mechanism to refresh the slot map — they kept retrying the same failed node.
The recovery took 8 minutes. The fix: application code was updated to handle MOVED errors by refreshing the slot map and retrying, and the slot refresh timeout was reduced from 60 seconds to 5 seconds. Monitoring was added for MOVED error rates per application instance.
The lesson: cluster failover works, but your application’s cluster client must handle MOVED redirects correctly. A slot-aware client that does not honor MOVED creates a partial failure that bypasses cluster resilience. Test failover explicitly — trigger manual failover and observe whether your application recovers cleanly.
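The fix described in the case study, honoring MOVED redirects, can be sketched as a retry loop. Redis Cluster returns errors of the form `MOVED <slot> <host:port>`; the node and client classes below are stand-ins for real connections, used here only to demonstrate the refresh-and-retry behavior:

```python
class MovedError(Exception):
    """Simulates a Redis 'MOVED <slot> <host:port>' response."""
    def __init__(self, slot, address):
        self.slot, self.address = slot, address

def get_with_moved_handling(client, slot_map, key, slot, max_redirects=3):
    """Follow MOVED redirects, refreshing the cached slot map each time."""
    for _ in range(max_redirects):
        node = slot_map[slot]
        try:
            return node.get(key)
        except MovedError as e:
            slot_map[e.slot] = client.nodes[e.address]  # refresh stale mapping
    raise RuntimeError("too many redirects; refresh the full slot map")

class FakeNode:
    def __init__(self, address, data=None, redirect_to=None):
        self.address = address
        self.data = data or {}
        self.redirect_to = redirect_to  # (slot, address) if this node lost the slot

    def get(self, key):
        if self.redirect_to:
            raise MovedError(*self.redirect_to)
        return self.data.get(key)

class FakeClusterClient:
    def __init__(self, nodes):
        self.nodes = {n.address: n for n in nodes}

# Failover scenario: the client's slot map still points at the demoted node
old_primary = FakeNode("10.0.0.1:7000", redirect_to=(1234, "10.0.0.2:7000"))
new_primary = FakeNode("10.0.0.2:7000", data={"cart:42": "3 items"})
client = FakeClusterClient([old_primary, new_primary])
slot_map = {1234: old_primary}

result = get_with_moved_handling(client, slot_map, "cart:42", 1234)
print(result)
```

Production clients (redis-py's RedisCluster, Lettuce, etc.) implement exactly this loop internally; the lesson is to verify under a triggered failover that yours actually does.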
Production Failure Scenarios
| Failure | Impact | Mitigation |
|---|---|---|
| Single cache node crashes | Keys on that node missed; potential database overload | Replica promotion; circuit breaker; replica reads during failover |
| Network partition | Cluster split-brain or unavailable | Redis Cluster handles automatically; Sentinel monitors |
| Proxy failure (Twemproxy) | All cache requests fail | Run multiple proxies with load balancer; keep-alive health checks |
| Shard rebalancing | Temporary key remapping; some misses during migration | Use stable cluster configuration; avoid frequent resharding |
| Invalidation broadcast lost | Stale data on some nodes | Implement periodic reconciliation; use version vectors |
| Replica lag | Stale reads from replicas | Monitor lag; route consistency-critical reads to primary |
| Sentinel election failure | No automatic failover possible | Run 3+ Sentinels; ensure proper quorum configuration |
Observability Checklist
Metrics to Track
Cluster-Level:
- `cluster_nodes` - Number of nodes, their states, and roles
- `cluster_slots_fail` - Number of slots not covered by reachable nodes
- `cluster_known_nodes` - Total known nodes in cluster
Node-Level:
- `connected_slaves` - Number of replicas connected to each master
- `master_repl_offset` - Replication position
- `slave_repl_offset` - Replica lag calculation
```python
# Cluster health check
def check_cluster_health(cluster):
    health = {
        'healthy': True,
        'nodes': {},
        'issues': []
    }
    for node in cluster.nodes:
        info = node.info()
        health['nodes'][node.name] = {
            'role': info.get('role'),
            'connected': info.get('connected') == 'true',
            'memory': info.get('used_memory'),
        }
        if info.get('role') == 'master' and int(info.get('connected_slaves', 0)) == 0:
            health['issues'].append(f"Master {node.name} has no replicas")
    health['healthy'] = len(health['issues']) == 0
    return health
```
```python
# Monitor replication lag
def check_replication_lag(primary, replica):
    primary_offset = primary.info()['master_repl_offset']
    replica_info = replica.info()
    replica_offset = replica_info.get('master_repl_offset', 0)
    lag_bytes = primary_offset - replica_offset
    # Rough estimate: 1MB of lag = ~1 second at typical network speeds
    lag_ms = (lag_bytes / 1024 / 1024) * 1000
    if lag_ms > 100:
        logger.warning("replication_lag_high",
                       lag_ms=lag_ms,
                       lag_bytes=lag_bytes)
```
Logs to Capture
```python
import time

import structlog

logger = structlog.get_logger()

# Log cluster topology changes
def log_cluster_event(event_type, node_id, old_state, new_state):
    logger.warning("cluster_topology_change",
                   event_type=event_type,
                   node_id=node_id,
                   old_state=old_state,
                   new_state=new_state,
                   timestamp=time.time())

# Log failover events
def log_failover(primary_id, new_primary_id):
    logger.critical("cache_failover_completed",
                    old_primary=primary_id,
                    new_primary=new_primary_id,
                    timestamp=time.time())
```
Alert Rules
```yaml
- alert: CacheClusterNodeDown
  expr: cache_cluster_connected_nodes < expected_nodes
  for: 1m
  labels:
    severity: critical
  annotations:
    summary: "Cache cluster node(s) unavailable"

- alert: CacheReplicationLag
  expr: cache_replication_lag_bytes > 10485760  # 10MB
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "Cache replication lag exceeds 10MB"

- alert: CacheClusterSlotFail
  expr: cache_cluster_slots_fail > 0
  for: 30s
  labels:
    severity: critical
  annotations:
    summary: "Cache cluster has unreachable slots"
```
Security Checklist
- Node authentication - Use Redis ACLs or cluster authentication
- Inter-node encryption - Encrypt traffic between cluster nodes
- Network isolation - Cluster nodes should be on private network
- Key prefix namespacing - Prevent collisions in shared cluster
- Monitor cluster events - Watch for unauthorized topology changes
- Secure Sentinel - Sentinel instances should require authentication
- Backup cluster state - Regularly backup cluster configuration
- Validate node identity - Verify node certificates if using TLS
```conf
# Redis Cluster secure configuration (on each node)
requirepass your-cluster-auth
masterauth your-cluster-auth
cluster-preferred-endpoint-type ip
tls-cluster yes

# In redis.conf:
# Bind to internal network only
bind 10.0.1.1 -::1

# ACL for application user: limited commands, app:* keys only
user appuser on >apppassword ~app:* +get +set +del +exists -flushall -flushdb
```
Common Pitfalls / Anti-Patterns
1. Assuming Hashing Is Evenly Distributed
Poor hash functions or small keyspaces cause hot spots.
```python
import hashlib
from bisect import bisect

# BAD: Modulo-based sharding remaps most keys when the node count changes,
# and can be uneven for small keyspaces or weak hash functions
def get_node(key, num_nodes):
    hash_val = hash(key)
    return hash_val % num_nodes

# GOOD: Use consistent hashing with virtual nodes
class ConsistentHashSharding:
    def __init__(self, nodes, replicas=150):
        self.ring = {}
        self.sorted_keys = []
        for node in nodes:
            for i in range(replicas):
                key = f"{node}:{i}"
                h = int(hashlib.md5(key.encode()).hexdigest(), 16)
                self.ring[h] = node
                self.sorted_keys.append(h)
        self.sorted_keys.sort()
```
2. Not Handling Node Failure Gracefully
Assuming all nodes always available leads to cascading failures.
```python
# BAD: No failure handling
def get(key):
    return nodes[hash(key) % len(nodes)].get(key)  # Crashes if node is down

# GOOD: Retry on a different node; a circuit breaker prevents repeated attempts
def get_with_fallback(key, max_retries=3):
    for attempt in range(max_retries):
        try:
            node = get_node_for_key(key)
            return node.get(key)
        except ConnectionError:
            logger.warning("node_connection_failed", node=node, attempt=attempt)
    return get_from_database_fallback(key)
```
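The circuit breaker mentioned above can be sketched as a small wrapper: after a threshold of consecutive failures it stops calling the node for a cooldown period and fails fast instead of hammering a dead host. A minimal sketch with illustrative names, not a specific library:

```python
import time

class CircuitBreaker:
    """Fail fast after repeated errors; allow a trial call after a cooldown."""

    def __init__(self, failure_threshold=3, reset_timeout=30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None

    def call(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_timeout:
                raise ConnectionError("circuit open: failing fast")
            self.opened_at = None  # half-open: allow one trial call
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()
            raise
        self.failures = 0  # success closes the circuit
        return result

breaker = CircuitBreaker(failure_threshold=2, reset_timeout=60)

def flaky():
    raise ConnectionError("node down")

# Two real failures trip the breaker; later calls fail fast without I/O
for _ in range(2):
    try:
        breaker.call(flaky)
    except ConnectionError:
        pass
```

In production you would keep one breaker per cache node, so a single dead node fails fast while the rest of the ring keeps serving.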
3. Pub/Sub Invalidation Without Guarantees
Pub/Sub is fire-and-forget; messages can be lost.
```python
# BAD: Assuming pub/sub invalidation always works
def on_data_update(key, value):
    db.update(key, value)
    redis_cluster.publish('invalidation', key)  # Fire and forget

# GOOD: Delete locally, broadcast, and reconcile periodically
def on_data_update(key, value):
    db.update(key, value)
    # Publish invalidation for other nodes
    redis_cluster.publish('invalidation', key)
    # Also delete locally to handle the common case
    redis_cluster.delete(key)
    # For strict consistency: check all nodes periodically
    schedule_reconciliation(key)
```
4. Ignoring Slot Migration Complexity
Adding nodes to Redis Cluster requires migrating slots.
```mermaid
graph LR
    A[Before Migration] --> B[During Migration<br/>Slot 123 split]
    B --> C[Node1 owns 0-5460]
    B --> D[Node2 owns 5461-10922<br/>+ Slot 123 migrating]
    C --> E[After Migration<br/>Node1 owns 0-5460]
    D --> E
```
```text
During slot migration:
1. Cluster must be in a stable state (no other migrations in flight)
2. Target node marks the slot IMPORTING
3. Source node marks the slot MIGRATING
4. Keys are moved one by one
5. Slot ownership is transferred after all keys have moved

Don't add nodes during high traffic: migration adds latency and brief
per-key unavailability while keys are in flight.
```
5. Single Sentinel for HA
Sentinel needs quorum to elect a new master.
```conf
# BAD: a single Sentinel. If it crashes, no failover is possible.

# GOOD: 3+ Sentinel instances on separate machines, each configured
# with the SAME monitor line pointing at the current master:
sentinel monitor mymaster 192.168.1.1 7000 2   # Quorum = 2 of 3
```
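For quick reference, the safe quorum for a Sentinel deployment is a strict majority of the instances, which a one-line helper makes explicit (function name is illustrative):

```python
def sentinel_quorum(num_sentinels: int) -> int:
    """Majority quorum: enough Sentinels to agree the master is down."""
    return num_sentinels // 2 + 1

print([(n, sentinel_quorum(n)) for n in (1, 3, 5)])  # [(1, 1), (3, 2), (5, 3)]
```

Note the configured quorum only governs marking the master objectively down; electing the Sentinel that performs the failover always requires a majority of all known Sentinels, which is why an even count buys no extra safety.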
Quick Recap
Key Bullets
- Distributed caching solves memory, throughput, and availability limits of single node
- Client-side sharding gives lowest latency but requires client logic
- Proxy-based routing simplifies clients but adds hop and potential SPOF
- Redis Cluster provides automatic sharding, replication, and failover
- Cache coherence across nodes requires explicit strategy (pub/sub, write-all, etc.)
- Session stores are a common distributed cache use case - distributed sessions beat sticky sessions
- Sentinel handles failover for Redis masters; Redis Cluster handles it natively
Copy/Paste Checklist
```bash
# Redis Cluster minimum setup (6 nodes)
redis-cli --cluster create \
  192.168.1.1:7000 \
  192.168.1.2:7000 \
  192.168.1.3:7000 \
  192.168.1.4:7000 \
  192.168.1.5:7000 \
  192.168.1.6:7000 \
  --cluster-replicas 1
```

```conf
# Sentinel configuration (3 nodes)
sentinel monitor mymaster 192.168.1.1 7000 2
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 10000
```

```bash
# Pub/Sub invalidation subscription
redis-cli SUBSCRIBE cache:invalidate
# Message format: {"key": "user:123", "action": "delete"}

# Cluster health check
redis-cli cluster info
redis-cli cluster nodes | grep master
```

```python
# Python Redis Cluster
r = redis.RedisCluster(
    host='192.168.1.1',
    port=7000,
    skip_full_coverage_check=True,
    read_from_replicas=True  # Read from replicas for scale
)
```

Deployment checklist:
- [ ] Minimum 3 masters + replicas for HA
- [ ] Sentinel quorum = (sentinels / 2) + 1
- [ ] Monitor replication lag on all replicas
- [ ] Test failover manually before going to production
- [ ] Implement application-level retry logic for cluster operations
- [ ] Use slot-aware client for efficient routing
- [ ] Set up alerts for cluster node failures
See Also
- Caching Strategies — The strategies that power distributed caches
- Cache Eviction Policies — Eviction across distributed nodes
- Redis & Memcached — Redis Cluster and Memcached distributed modes
- CAP Theorem — Consistency trade-offs in distributed caching
Conclusion
Distributed caching solves scale and availability problems but introduces complexity. Start with a single node, add replication for read scaling, move to clustering when you need more memory or write throughput.
The coherence problem is real. Pick a strategy that matches your consistency requirements. Eventual consistency via pub/sub invalidation works for most use cases. Strong consistency is expensive.
Monitor everything. In distributed systems, you cannot eyeball whether the cache is healthy.