Distributed Caching: Multi-Node Cache Clusters

Scale caching across multiple nodes. Learn about cache clusters, consistency models, session stores, and cache coherence patterns.

Reading time: 17 min

Distributed Caching

A single cache instance only gets you so far. At some point, your cache needs more memory than one machine can provide, or you need to handle more requests than one node can serve, or you need high availability so a cache node failure does not take down your application.

That is when you move to distributed caching. But distributed caching introduces problems that do not exist in single-node setups.


Why Distribute?

Single-node caching works until it doesn’t.

Limitation   | What Happens
Memory       | Your 16GB cache node fills up. You need 64GB.
Requests     | Your cache serves 100k requests/second. You need 500k.
Availability | Your cache node crashes. Every request hits the database.
Distributed caching solves all three. But it introduces coordination problems.


Multi-Node Architectures

Client-Side Sharding

The client decides which cache node to use based on the key. No proxy needed.

graph LR
    A[Client] --> B{Which node?}
    B -->|key1| C[Node 1]
    B -->|key2| D[Node 2]
    B -->|key3| E[Node 3]
import hashlib
from bisect import bisect

class ShardedCache:
    def __init__(self, nodes):
        self.nodes = nodes
        self.ring = {}
        self.sorted_keys = []
        self._build_ring()

    def _build_ring(self):
        # Each node needs a stable string form (e.g. "host:port") so
        # every client process builds an identical ring
        for node in self.nodes:
            for i in range(150):  # 150 virtual nodes per physical node
                key = f"{node}:{i}"
                hash_key = int(hashlib.md5(key.encode()).hexdigest(), 16)
                self.ring[hash_key] = node
                self.sorted_keys.append(hash_key)
        self.sorted_keys.sort()

    def _get_node(self, key):
        if not self.ring:
            return None
        hash_key = int(hashlib.md5(key.encode()).hexdigest(), 16)
        idx = bisect(self.sorted_keys, hash_key)
        if idx >= len(self.sorted_keys):
            idx = 0
        return self.ring[self.sorted_keys[idx]]

    def get(self, key):
        node = self._get_node(key)
        # Each entry in self.nodes is assumed to be a cache client
        return node.get(key) if node else None

    def set(self, key, value):
        node = self._get_node(key)
        if node:
            node.set(key, value)

Pros: No proxy layer, low latency, simple for small clusters. Cons: Every client must implement the sharding logic, and adding or removing nodes still remaps a fraction of keys.

Proxy-Based Routing

A proxy (such as Twemproxy) routes each request to the right node. Clients don't know about the cluster topology.

graph LR
    A[Client] --> B[Proxy]
    B --> C[Node 1]
    B --> D[Node 2]
    B --> E[Node 3]

Clients send all requests to the proxy. The proxy handles sharding logic. This is how managed services like Redis Enterprise and ElastiCache work.

Pros: Client simplicity, topology hidden from clients. Cons: An extra network hop adds latency, and the proxy is a SPOF unless it is deployed highly available.

Redis Cluster

Redis Cluster is Redis’s native distributed mode. It handles sharding, replication, and failover automatically.

# Minimum Redis Cluster setup (6 nodes = 3 primaries + 3 replicas)
redis-cli --cluster create \
  192.168.1.1:7000 \
  192.168.1.2:7000 \
  192.168.1.3:7000 \
  192.168.1.4:7000 \
  192.168.1.5:7000 \
  192.168.1.6:7000 \
  --cluster-replicas 1
import redis

# Redis Cluster client handles topology automatically
r = redis.RedisCluster(
    host='192.168.1.1',
    port=7000,
    skip_full_coverage_check=True
)

# Works the same as single Redis
r.set('key', 'value')
r.get('key')

Redis Cluster splits keys across 16,384 slots. Each primary node owns a subset of slots. If a primary fails, its replica promotes automatically.
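
Client libraries compute this key-to-slot mapping themselves. A minimal sketch of the function, including the {hash tag} rule that lets you pin related keys to one slot (the checksum below is the CRC16-XMODEM variant Redis Cluster documents):

```python
def crc16(data: bytes) -> int:
    # CRC16-CCITT (XMODEM), the checksum Redis Cluster uses for slots
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            crc = ((crc << 1) ^ 0x1021) if crc & 0x8000 else (crc << 1)
            crc &= 0xFFFF
    return crc

def key_slot(key: str) -> int:
    # If the key contains a non-empty {tag}, only the tag is hashed,
    # so {user:123}:cart and {user:123}:profile land in the same slot
    start = key.find('{')
    if start != -1:
        end = key.find('}', start + 1)
        if end > start + 1:
            key = key[start + 1:end]
    return crc16(key.encode()) % 16384
```

Hash tags matter because multi-key commands and transactions only work when every key involved lives in the same slot.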


Cache Coherence

When you have multiple cache nodes, coherence becomes a problem. If you update data in Node 1, how do you invalidate that same data in Node 2?

The Problem

graph TD
    A[Write to DB] --> B[Update Node 1]
    B -.->|Async| C[Node 2 has stale data]
    D[Read from Node 2] --> E[Stale data returned]

Solutions

1. Write-Through All Nodes

On write, update all cache nodes.

def write_to_all_nodes(key, value, nodes):
    for node in nodes:
        node.set(key, value)

# Problem: More write latency, potential inconsistency if one fails

This works but is slow and fragile. One slow node drags everything down.
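
One way to contain that fragility is to fan the writes out concurrently with a per-node timeout and report which nodes failed, so the caller can retry or fall back to invalidation. A sketch, assuming each node object exposes a set() method:

```python
from concurrent.futures import ThreadPoolExecutor

def write_to_all_nodes_concurrent(key, value, nodes, timeout=0.2):
    # Fan out in parallel so one slow node adds at most `timeout`,
    # not the sum of all node latencies
    failed = []
    with ThreadPoolExecutor(max_workers=max(len(nodes), 1)) as pool:
        futures = {pool.submit(node.set, key, value): node for node in nodes}
        for future, node in futures.items():
            try:
                future.result(timeout=timeout)
            except Exception:
                failed.append(node)  # caller decides: retry, invalidate, or alert
    return failed
```

An empty return value means every node acknowledged; anything else means the cluster is temporarily incoherent for that key.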

2. Invalidation Broadcasting

On write, broadcast invalidation to all nodes.

def invalidate_across_cluster(key, pubsub_client):
    # Publisher side: announce that the key changed
    pubsub_client.publish('cache:invalidate', key)

def run_invalidation_listener(local_cache, pubsub):
    # Subscriber side: each cache node runs this loop and
    # deletes the key from its own store
    pubsub.subscribe('cache:invalidate')
    for message in pubsub.listen():
        if message['type'] == 'message':
            local_cache.delete(message['data'])

Redis pub/sub is useful here. When data changes, publish an invalidation message. All cache instances subscribe and delete the key.

# On write
redis_cluster.publish('invalidation', json.dumps({'key': 'user:123'}))

# On each cache node
pubsub = redis_cluster.pubsub()
pubsub.subscribe('invalidation')

for message in pubsub.listen():
    if message['type'] == 'message':
        data = json.loads(message['data'])
        cache.delete(data['key'])

3. Consistent Hashing with Virtual Nodes

Use consistent hashing so keys map to the same nodes reliably. Adding a node only moves some keys.

class VirtualNodeSharding:
    def __init__(self, nodes, replicas=150):
        self.replicas = replicas
        self.ring = {}
        self.sorted_keys = []

        for node in nodes:
            for i in range(replicas):
                # Virtual node for better distribution
                key = f"{node}:vnodes:{i}"
                hash_key = self._hash(key)
                self.ring[hash_key] = node
                self.sorted_keys.append(hash_key)

        self.sorted_keys.sort()

    def _hash(self, key):
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def get_node(self, key):
        hash_key = self._hash(key)
        idx = bisect(self.sorted_keys, hash_key)
        if idx >= len(self.sorted_keys):
            idx = 0
        return self.ring[self.sorted_keys[idx]]
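
The "only some keys move" claim is easy to verify: build a ring with three nodes, rebuild it with four, and count how many keys change owner. A self-contained sketch (node names are illustrative):

```python
import hashlib
from bisect import bisect

def build_ring(nodes, vnodes=150):
    # Sorted (hash, node) pairs; virtual nodes smooth the distribution
    return sorted(
        (int(hashlib.md5(f"{node}:{i}".encode()).hexdigest(), 16), node)
        for node in nodes for i in range(vnodes)
    )

def owner(ring, key):
    h = int(hashlib.md5(key.encode()).hexdigest(), 16)
    idx = bisect(ring, (h,))
    return ring[idx % len(ring)][1]  # wrap around the ring

keys = [f"user:{i}" for i in range(10_000)]
before = build_ring(["node1", "node2", "node3"])
after = build_ring(["node1", "node2", "node3", "node4"])
moved = sum(owner(before, k) != owner(after, k) for k in keys)
# Expect roughly a quarter of keys to change owner going from 3 to 4
# nodes; naive modulo sharding would remap about three quarters
```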

Session Store Patterns

Sessions are a common distributed caching use case. User sessions need to be available across all application instances.

Redis Session Store

import redis
import json
import secrets
import time

class RedisSessionStore:
    def __init__(self, redis_cluster, ttl=86400):
        self.redis = redis_cluster
        self.ttl = ttl

    def create_session(self, user_id, data=None):
        session_id = secrets.token_urlsafe(32)
        session_key = f"session:{session_id}"

        session_data = {
            'user_id': user_id,
            'created_at': time.time(),
            'data': data or {}
        }

        self.redis.setex(
            session_key,
            self.ttl,
            json.dumps(session_data)
        )

        return session_id

    def get_session(self, session_id):
        session_key = f"session:{session_id}"
        data = self.redis.get(session_key)

        if not data:
            return None

        return json.loads(data)

    def update_session(self, session_id, data):
        session_key = f"session:{session_id}"
        session = self.get_session(session_id)

        if not session:
            return False

        session['data'].update(data)
        session['updated_at'] = time.time()

        self.redis.setex(
            session_key,
            self.ttl,
            json.dumps(session)
        )

        return True

    def destroy_session(self, session_id):
        session_key = f"session:{session_id}"
        self.redis.delete(session_key)

Session Affinity vs Distributed Sessions

If your application runs on multiple instances, sessions need to be shared. Two options:

  1. Sticky sessions: Route each user to the same instance. Simpler, but instance failure loses sessions.

  2. Distributed sessions: Sessions stored in Redis, available to all instances. More resilient, slightly slower.

For anything critical, use distributed sessions. Sticky sessions will ruin your day when an instance goes down.


High Availability Patterns

Replication

Add replica nodes that copy data from the primary. Reads can go to replicas, spreading load.

# Redis replication config (on replica)
replicaof 192.168.1.1 7000
replica-read-only yes
# Read from replica, write to primary
def get_from_replica(key):
    return replica_redis.get(key)

def set_to_primary(key, value):
    return primary_redis.setex(key, ttl, value)

def get_cached(key):
    # Try primary first (has freshest data)
    val = primary_redis.get(key)
    if val:
        return val

    # Fall back to replica
    return replica_redis.get(key)

Automatic Failover

Redis Sentinel handles failover automatically. When a primary fails, Sentinel promotes a replica and updates clients.

# Sentinel configuration
sentinel monitor mymaster 192.168.1.1 7000 2
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 10000
from redis.sentinel import Sentinel

sentinel = Sentinel([
    ('192.168.1.1', 26379),
    ('192.168.1.2', 26379),
    ('192.168.1.3', 26379)
], socket_timeout=0.1)

# Get master and replica connections
master = sentinel.master_for('mymaster')
replica = sentinel.slave_for('mymaster')

# Automatic failover handling
master.set('key', 'value')  # Writes always go to master
replica.get('key')           # Reads can go to replica

Monitoring Distributed Cache

You need visibility into your distributed cache. Key metrics:

# Check cluster health
def check_cluster_health(cluster):
    health = {
        'nodes': [],
        'total_keys': 0,
        'total_memory': 0,
        'hit_rate': 0
    }

    for node in cluster.get_nodes():
        info = node.info()
        health['nodes'].append({
            'host': node.host,
            'role': node.role,
            'connected': node.connected
        })
        health['total_keys'] += info.get('keys', 0)
        health['total_memory'] += info.get('used_memory', 0)

    return health

# Alert thresholds
ALERT_THRESHOLDS = {
    'memory_usage_percent': 80,  # Alert if >80% memory used
    'hit_rate_percent': 80,     # Alert if hit rate <80%
    'connected_clients': 10000, # Alert if >10k clients
    'replication_lag_ms': 100  # Alert if replica lag >100ms
}

When to Use / When Not to Use

Architecture | When to Use | When Not to Use
Client-Side Sharding | Small to medium clusters (<10 nodes); low latency critical; team comfortable with client logic | Large clusters; multi-language environments; teams want simplicity
Proxy-Based Routing | Multi-client support; topology hidden; managed service environments | Extra hop latency unacceptable; proxy is single point of failure
Redis Cluster | Large datasets (>10GB); need automatic failover; want native sharding | Small datasets; need strict ordering; transaction-heavy workloads
Write-Through All Nodes | Small clusters; strong consistency required; write volume manageable | Large clusters; high write volume; eventual consistency acceptable
Pub/Sub Invalidation | Multi-region; eventual consistency acceptable; need real-time invalidation | Strict consistency required; high invalidation frequency
Consistent Hashing | Frequent node additions/removals; want minimal key remapping | Fixed cluster size; need strict key distribution
Sticky Sessions | Simple deployments; session loss acceptable on failure; cost-sensitive | High availability required; sessions must survive instance failures
Distributed Sessions | HA critical; multi-instance deployments; sessions must persist | Simple single-instance apps; session loss tolerable

Decision Guide

graph TD
    A[Scale Need] --> B{Memory > Single Node?}
    B -->|Yes| C{Need HA?}
    B -->|No| D[Single Node Cache]
    C -->|Yes| E[Redis Cluster or Replicated Setup]
    C -->|No| F{Client Simplicity?}
    F -->|Yes| G[Proxy-Based Routing]
    F -->|No| H[Client-Side Sharding]
    E --> I{Need Automatic Failover?}
    I -->|Yes| J[Sentinel or Redis Cluster]
    I -->|No| K[Manual Replica Promotion]

Capacity Estimation: Cluster Memory Sizing and Slot Distribution

Planning a distributed cache cluster requires understanding how data and load distribute across nodes.

Cluster memory sizing formula:

total_cache_memory = sum(memory_per_node for each node)
effective_capacity = total_cache_memory × (1 - replication_overhead) × (1 - fragmentation_factor)
usable_capacity = effective_capacity × 0.7  # Keep 30% headroom for spikes

For a Redis Cluster with 3 masters × 16GB each and 1 replica per master (total 6 nodes):

  • Total raw memory: 6 nodes × 16GB = 96GB
  • Replication overhead: 50% with 1 replica per master (every byte is stored twice): 96GB × 0.5 = 48GB effective
  • Fragmentation factor: typically 1.1-1.5× depending on allocator: 48GB / 1.2 = 40GB
  • Usable at 70%: ~28GB for cached data
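
A few lines make the formula concrete. Note that with 1 replica per master, half of the raw memory holds duplicate data; the 1.2 fragmentation factor and 30% headroom are the assumptions stated above:

```python
def usable_cache_gb(masters, gb_per_node, replicas_per_master=1,
                    fragmentation=1.2, headroom=0.3):
    nodes = masters * (1 + replicas_per_master)
    total_raw = nodes * gb_per_node                  # every node's memory
    unique = total_raw / (1 + replicas_per_master)   # replicas duplicate masters
    defragmented = unique / fragmentation            # allocator overhead
    return defragmented * (1 - headroom)             # spare room for spikes
```

For the 3×16GB-with-replicas example this yields 28GB of cacheable data out of 96GB of raw memory paid for.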

Slot distribution in Redis Cluster: 16,384 slots divided across N masters. With 3 masters, each owns ~5,461 slots. When adding nodes, Redis Cluster migrates slots one at a time. The CLUSTER SLOTS command shows current ownership. Plan slot moves in advance and perform them with redis-cli --cluster reshard (or manual CLUSTER SETSLOT commands) during maintenance windows.

Hot key detection: Even with consistent hashing, hot keys cause uneven load. The formula for identifying hot keys:

hot_key_probability = (requests_per_key / total_requests) × number_of_nodes

If 1% of keys receive 50% of traffic on a 10-node cluster, the formula scores those keys at 5× the even-load baseline. Mitigation: replicate hot keys to all nodes and route reads randomly, use key spreading (split user:123 into user:123:shard0 through user:123:shard9), or move hot-key handling to a dedicated single-node Redis with replica read scaling.
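
Key spreading can be sketched as follows: writes copy the hot value under every shard suffix, and reads pick a suffix at random, so read traffic lands on N different slots (the cache client interface here is an assumption):

```python
import random

N_SHARDS = 10  # spread one hot key across 10 slots

def spread_set(cache, key, value, ttl=300):
    # Write amplification: N writes per update, acceptable for
    # read-heavy hot keys
    for i in range(N_SHARDS):
        cache.set(f"{key}:shard{i}", value, ttl)

def spread_get(cache, key):
    # Each read hits a random shard, hence (likely) a different node
    return cache.get(f"{key}:shard{random.randrange(N_SHARDS)}")
```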

Memcached cluster write capacity: With N nodes in a consistent hash ring, each write goes to one node. Write throughput = single_node_throughput × N (assuming even distribution). If one Memcached node handles 200k ops/sec, a 10-node cluster handles ~2M ops/sec total, but any single key’s write rate is still bound by one node.

Real-World Case Study: Redis Cluster Failure at Scale

A major e-commerce platform ran a 24-node Redis Cluster (12 masters, 12 replicas) for session storage and cart data. During a peak traffic event, a network switch failed causing a brief partition between 4 nodes and the rest of the cluster.

The cluster’s quorum-based failover correctly detected that the 4 isolated nodes could not form a majority and did not promote themselves — correct behavior. But the remaining 20 nodes experienced a cascade: the primary for one of the hot key slots was among the isolated nodes. The replica for that slot was in the majority partition. Failover promoted the replica to primary.

The problem: the application was using a slot-aware client that cached slot-to-node mappings. When failover changed slot ownership, the client’s cached mapping became stale. Some application instances still routed requests for the affected slot to the demoted node (now a replica). The replica rejected writes, and those application instances had no mechanism to refresh the slot map — they kept retrying the same failed node.

The recovery took 8 minutes. The fix: application code was updated to handle MOVED errors by refreshing the slot map and retrying, and the slot refresh timeout was reduced from 60 seconds to 5 seconds. Monitoring was added for MOVED error rates per application instance.

The lesson: cluster failover works, but your application’s cluster client must handle MOVED redirects correctly. A slot-aware client that does not honor MOVED creates a partial failure that bypasses cluster resilience. Test failover explicitly — trigger manual failover and observe whether your application recovers cleanly.
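
The fix boils down to a redirect-aware retry loop. A sketch with stand-in names (MovedError and refresh_slot_map are hypothetical; redis-py's cluster client implements this behavior internally):

```python
class MovedError(Exception):
    # Stand-in for the MOVED redirect a cluster node returns when
    # it no longer owns the requested slot
    pass

def get_with_moved_handling(client, key, max_redirects=3):
    for _ in range(max_redirects):
        try:
            return client.get(key)
        except MovedError:
            client.refresh_slot_map()  # re-fetch slot -> node ownership, retry
    raise RuntimeError(f"too many MOVED redirects for {key!r}")
```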


Production Failure Scenarios

Failure | Impact | Mitigation
Single cache node crashes | Keys on that node missed; potential database overload | Replica promotion; circuit breaker; replica reads during failover
Network partition | Cluster split-brain or unavailable | Redis Cluster handles automatically; Sentinel monitors
Proxy failure (Twemproxy) | All cache requests fail | Run multiple proxies with load balancer; keep-alive health checks
Shard rebalancing | Temporary key remapping; some misses during migration | Use stable cluster configuration; avoid frequent resharding
Invalidation broadcast lost | Stale data on some nodes | Implement periodic reconciliation; use version vectors
Replica lag | Stale reads from replicas | Monitor lag; route consistency-critical reads to primary
Sentinel election failure | No automatic failover possible | Run 3+ Sentinels; ensure proper quorum configuration

Observability Checklist

Metrics to Track

Cluster-Level:

  • cluster_nodes - Number of nodes, their states, and roles
  • cluster_slots_fail - Number of slots not covered by reachable nodes
  • cluster_known_nodes - Total known nodes in cluster

Node-Level:

  • connected_slaves - Number of replicas connected to each master
  • master_repl_offset - Replication position
  • slave_repl_offset - Replica lag calculation
# Cluster health check
def check_cluster_health(cluster):
    health = {
        'healthy': True,
        'nodes': {},
        'issues': []
    }

    for node in cluster.nodes:
        info = node.info()
        health['nodes'][node.name] = {
            'role': info.get('role'),
            'connected': info.get('connected') == 'true',
            'memory': info.get('used_memory'),
        }

        if info.get('role') == 'master' and int(info.get('connected_slaves', 0)) == 0:
            health['issues'].append(f"Master {node.name} has no replicas")

    health['healthy'] = len(health['issues']) == 0
    return health

# Monitor replication lag
def check_replication_lag(primary, replica):
    primary_offset = primary.info()['master_repl_offset']
    replica_info = replica.info()
    replica_offset = replica_info.get('slave_repl_offset', 0)
    lag_bytes = primary_offset - replica_offset

    # Rough estimate: 1MB = ~1 second at typical network speeds
    lag_ms = (lag_bytes / 1024 / 1024) * 1000

    if lag_ms > 100:
        logger.warning("replication_lag_high",
            lag_ms=lag_ms,
            lag_bytes=lag_bytes)

Logs to Capture

logger = structlog.get_logger()

# Log cluster topology changes
def log_cluster_event(event_type, node_id, old_state, new_state):
    logger.warning("cluster_topology_change",
        event_type=event_type,
        node_id=node_id,
        old_state=old_state,
        new_state=new_state,
        timestamp=time.time())

# Log failover events
def log_failover(primary_id, new_primary_id):
    logger.critical("cache_failover_completed",
        old_primary=primary_id,
        new_primary=new_primary_id,
        timestamp=time.time())

Alert Rules

- alert: CacheClusterNodeDown
  expr: cache_cluster_connected_nodes < expected_nodes
  for: 1m
  labels:
    severity: critical
  annotations:
    summary: "Cache cluster node(s) unavailable"

- alert: CacheReplicationLag
  expr: cache_replication_lag_bytes > 10485760 # 10MB
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "Cache replication lag exceeds 10MB"

- alert: CacheClusterSlotFail
  expr: cache_cluster_slots_fail > 0
  for: 30s
  labels:
    severity: critical
  annotations:
    summary: "Cache cluster has unreachable slots"

Security Checklist

  • Node authentication - Use Redis ACLs or cluster authentication
  • Inter-node encryption - Encrypt traffic between cluster nodes
  • Network isolation - Cluster nodes should be on private network
  • Key prefix namespacing - Prevent collisions in shared cluster
  • Monitor cluster events - Watch for unauthorized topology changes
  • Secure Sentinel - Sentinel instances should require authentication
  • Backup cluster state - Regularly backup cluster configuration
  • Validate node identity - Verify node certificates if using TLS
# Redis Cluster secure configuration
# On each node:
requirepass your-cluster-auth
cluster-preferred-endpoint-type ip
tls-cluster yes

# In redis.conf:
# Bind to internal network only
bind 10.0.1.1 -::1

# ACL for application user
user appuser on >apppassword ~app:* +get +set +del +exists -flushall -flushdb

Common Pitfalls / Anti-Patterns

1. Assuming Hashing Is Evenly Distributed

Poor hash choices cause hot spots, and naive modulo sharding remaps nearly every key whenever the node count changes.

# BAD: Modulo-based sharding — changing num_nodes remaps almost
# every key, and Python's built-in hash() is not stable across processes
def get_node(key, num_nodes):
    return hash(key) % num_nodes

# GOOD: Use consistent hashing with virtual nodes
import hashlib

class ConsistentHashSharding:
    def __init__(self, nodes, replicas=150):
        self.ring = {}
        for node in nodes:
            for i in range(replicas):
                vkey = f"{node}:{i}"
                hash_key = int(hashlib.md5(vkey.encode()).hexdigest(), 16)
                self.ring[hash_key] = node

2. Not Handling Node Failure Gracefully

Assuming all nodes always available leads to cascading failures.

# BAD: No failure handling
def get(key):
    return nodes[hash(key) % len(nodes)].get(key)  # Crashes if node down

# GOOD: Retry on different node, circuit breaker
def get_with_fallback(key, max_retries=3):
    for attempt in range(max_retries):
        try:
            node = get_node_for_key(key)
            return node.get(key)
        except ConnectionError:
            logger.warning("node_connection_failed", node=node, attempt=attempt)
            # Circuit breaker would prevent repeated attempts
    return get_from_database_fallback(key)

3. Pub/Sub Invalidation Without Guarantees

Pub/Sub is fire-and-forget; messages can be lost.

# BAD: Assuming pub/sub invalidation always works
def on_data_update(key, value):
    db.update(key, value)
    redis_cluster.publish('invalidation', key)  # Fire and forget

# GOOD: Verify invalidation, periodic reconciliation
def on_data_update(key, value):
    db.update(key, value)

    # Publish invalidation
    redis_cluster.publish('invalidation', key)

    # Also delete locally to handle the common case
    redis_cluster.delete(key)

    # For strict consistency: check all nodes periodically
    schedule_reconciliation(key)

4. Ignoring Slot Migration Complexity

Adding nodes to Redis Cluster requires migrating slots.

graph LR
    A[Before Migration] --> B[During Migration<br/>Slot 123 split]
    B --> C[Node1 owns 0-5460]
    B --> D[Node2 owns 5461-10922<br/>+ Slot 123 migrating]
    C --> E[After Migration<br/>Node1 owns 0-5460]
    D --> E
# During slot migration:
# 1. Cluster must be in stable state (no other migrations)
# 2. Target node must accept IMPORTING slot
# 3. Source node must accept MIGRATING slot
# 4. Keys must be moved one by one
# 5. Slot ownership transferred after all keys moved

# Don't add nodes during high traffic - migration causes temporary unavailability

5. Single Sentinel for HA

Sentinel needs quorum to elect a new master.

# BAD: Single Sentinel
# If Sentinel crashes, no failover possible

# GOOD: 3+ Sentinel processes on separate machines, each with the
# SAME monitor line in its own sentinel.conf:
sentinel monitor mymaster 192.168.1.1 7000 2  # quorum = 2 of 3 Sentinels
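
One number worth encoding in a review check: regardless of the configured quorum, authorizing a failover always requires a majority of Sentinel processes. A small sketch:

```python
def sentinel_majority(n_sentinels: int) -> int:
    # Minimum Sentinels that must be alive and in agreement
    # for a failover to be authorized
    return n_sentinels // 2 + 1
```

With 3 Sentinels the majority is 2, so you can lose one and still fail over; with 2 Sentinels, losing either one blocks failover, which is why 2 is no better than 1.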

Quick Recap

Key Bullets

  • Distributed caching solves memory, throughput, and availability limits of single node
  • Client-side sharding gives lowest latency but requires client logic
  • Proxy-based routing simplifies clients but adds hop and potential SPOF
  • Redis Cluster provides automatic sharding, replication, and failover
  • Cache coherence across nodes requires explicit strategy (pub/sub, write-all, etc.)
  • Session stores are a common distributed cache use case - distributed sessions beat sticky sessions
  • Sentinel handles failover for Redis masters; Redis Cluster handles it natively

Copy/Paste Checklist

# Redis Cluster minimum setup (6 nodes)
redis-cli --cluster create \
  192.168.1.1:7000 \
  192.168.1.2:7000 \
  192.168.1.3:7000 \
  192.168.1.4:7000 \
  192.168.1.5:7000 \
  192.168.1.6:7000 \
  --cluster-replicas 1

# Sentinel configuration (3 nodes)
sentinel monitor mymaster 192.168.1.1 7000 2
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 10000

# Pub/Sub invalidation subscription
redis-cli SUBSCRIBE cache:invalidate
# Message format: {"key": "user:123", "action": "delete"}

# Cluster health check
redis-cli cluster info
redis-cli cluster nodes | grep master

# Python Redis Cluster
r = redis.RedisCluster(
    host='192.168.1.1',
    port=7000,
    skip_full_coverage_check=True,
    read_from_replicas=True  # Read from replicas for scale
)

# Deployment checklist:
# [ ] Minimum 3 masters + replicas for HA
# [ ] Sentinel quorum = (sentinels / 2) + 1
# [ ] Monitor replication lag on all replicas
# [ ] Test failover manually before going to production
# [ ] Implement application-level retry logic for cluster operations
# [ ] Use slot-aware client for efficient routing
# [ ] Set up alerts for cluster node failures


Conclusion

Distributed caching solves scale and availability problems but introduces complexity. Start with a single node, add replication for read scaling, move to clustering when you need more memory or write throughput.

The coherence problem is real. Pick a strategy that matches your consistency requirements. Eventual consistency via pub/sub invalidation works for most use cases. Strong consistency is expensive.

Monitor everything. In distributed systems, you cannot eyeball whether the cache is healthy.
