Apache ZooKeeper: Consensus and Coordination

Explore ZooKeeper's Zab consensus protocol, hierarchical znodes, watches, leader election, and practical use cases for distributed coordination.


Apache ZooKeeper: Consensus and Coordination for Distributed Systems

Yahoo! Research started ZooKeeper in 2007 as a Hadoop subproject. The problem they tackled is universal in distributed systems: coordinating multiple processes is hard, and everyone was writing their own coordination code badly.

The insight: distributed coordination has common patterns. Rather than every system reinventing leader election, distributed locks, and configuration management, ZooKeeper could provide these as building blocks.

ZooKeeper became the coordination layer for HBase, Kafka, Drill, and many others relying on leader election and cluster state management.


Data Model: Hierarchical Namespaces

ZooKeeper exposes a hierarchical namespace of znodes, like a filesystem. Each znode holds data and can have children, forming a tree.

/zookeeper           # Reserved namespace for ZooKeeper internals
/config              # Application configuration
/config/cluster      # Cluster-wide settings
/services            # Service registry
/services/api        # API service endpoints
/locks               # Distributed locks
/election            # Leader election nodes

Unlike filesystems, ZooKeeper znodes are not directories for general use. They are lightweight coordination primitives for small amounts of critical data: configuration values, lock states, leader information.

# Create a znode (persistent by default)
create /config/database "db.example.com:5432"

# Get znode data
get /config/database

# Create an ephemeral znode (deleted when client disconnects)
create -e /services/api/worker-1 "192.168.1.10:8080"

# Create a sequential znode (ZooKeeper appends a monotonically increasing counter)
create -s /locks/job- "worker-1"
# Result: /locks/job-0000000001

Znodes have special flags:

  • Ephemeral: Automatically deleted when client session ends
  • Sequential: ZooKeeper appends a monotonically increasing counter to the name
  • Container: For leader election and similar patterns; deleted when empty
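For reference, the sequence suffix is taken from the parent znode's child-version counter and zero-padded to ten digits. A tiny sketch of the naming convention (an illustration, not ZooKeeper's actual server code):

```java
// Sketch of how ZooKeeper builds sequential znode names: the server appends
// the parent's child-version counter formatted as a 10-digit zero-padded number.
public class SequentialName {
    static String format(String prefix, int counter) {
        return prefix + String.format("%010d", counter);
    }

    public static void main(String[] args) {
        System.out.println(format("/locks/job-", 1));   // /locks/job-0000000001
        System.out.println(format("/locks/job-", 42));  // /locks/job-0000000042
    }
}
```

Because the names sort lexicographically, a plain string sort of the children yields creation order, which the lock and election recipes below rely on.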

Sessions and Watches

Client sessions are the foundation. Connect to ZooKeeper and establish a session with a configurable timeout. Sessions are maintained via periodic pings; miss too many and the session expires.
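The requested timeout is not taken at face value: the server clamps it to its configured bounds, which default to 2x and 20x tickTime. A sketch of that negotiation, assuming the default bounds:

```java
// Sketch of server-side session timeout negotiation: the client's requested
// timeout is clamped to [minSessionTimeout, maxSessionTimeout], which default
// to 2x and 20x tickTime respectively.
public class SessionTimeout {
    static int negotiate(int requestedMs, int tickTimeMs) {
        int min = 2 * tickTimeMs;   // minSessionTimeout default
        int max = 20 * tickTimeMs;  // maxSessionTimeout default
        return Math.max(min, Math.min(max, requestedMs));
    }

    public static void main(String[] args) {
        // With the default tickTime of 2000 ms, a 30 s request is accepted as-is...
        System.out.println(negotiate(30000, 2000)); // 30000
        // ...but a 1 s request is raised to the 4 s floor.
        System.out.println(negotiate(1000, 2000));  // 4000
    }
}
```

This is why a client asking for a very short timeout still sees multi-second session expiry: the floor wins.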

// Java client example
ZooKeeper zk = new ZooKeeper(
    "zoo1:2181,zoo2:2181,zoo3:2181",  // Connection string
    30000,                              // Session timeout (ms)
    event -> {
        switch (event.getState()) {
            case SyncConnected:
                System.out.println("Connected to ZooKeeper");
                break;
            case Disconnected:
                System.out.println("Disconnected from ZooKeeper");
                break;
            case Expired:
                System.out.println("Session expired");
                break;
        }
    }
);

Watches are one-time notifications when a znode changes. Set a watch, the znode changes, ZooKeeper sends a notification.

// Set a watch and handle changes
zk.getData("/config/database", event -> {
    if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
        System.out.println("Configuration changed!");
        // Reload configuration
    }
}, new Stat());
sequenceDiagram
    participant C as Client
    participant Z as ZooKeeper

    C->>Z: getData("/config", watch=true)
    Z-->>C: Current data

    Note over Z: Data changed
    Z->>C: Watch triggered
    C->>Z: getData("/config", watch=true)
    Z-->>C: New data

Watches are one-time triggers. After a watch fires, you must set a new watch to keep monitoring. This simplifies the server implementation but requires the client to re-arm on each notification.


The Zab Protocol

ZooKeeper uses ZooKeeper Atomic Broadcast (Zab) for consensus. Zab is similar to Paxos in guarantees (linearizable writes) but different in approach, optimized for ZooKeeper’s workload.

Zab has two main phases:

Recovery (Catch-up): When the leader fails, a new leader takes over and followers synchronize with it. The new leader is guaranteed to hold the most recent committed updates; followers replay any transactions they missed.

Broadcast: During normal operation, leader broadcasts transactions to followers. Transaction commits when a majority acknowledges.

graph TD
    A[Leader] -->|PROPOSE| B[Follower 1]
    A -->|PROPOSE| C[Follower 2]
    A -->|PROPOSE| D[Follower 3]

    B -->|ACK| A
    C -->|ACK| A
    D -->|ACK| A

    A -->|COMMIT| B
    A -->|COMMIT| C
    A -->|COMMIT| D

The key property: ZooKeeper guarantees FIFO ordering per client. If a client submits A then B, ZooKeeper applies A before B, even across leader changes.
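Zab stamps every transaction with a 64-bit zxid: the high 32 bits identify the leader epoch and the low 32 bits count transactions within that epoch (you can see one in the "srvr" command output later in this article). A small sketch of decoding one:

```java
// Decode a zxid into its two components: leader epoch (high 32 bits)
// and per-epoch transaction counter (low 32 bits).
public class Zxid {
    static long epoch(long zxid)   { return zxid >>> 32; }
    static long counter(long zxid) { return zxid & 0xFFFFFFFFL; }

    public static void main(String[] args) {
        long z = 0x10000002cL;
        System.out.println("epoch=" + epoch(z) + " counter=" + counter(z)); // epoch=1 counter=44
    }
}
```

The epoch component is how Zab preserves ordering across leader changes: a new leader bumps the epoch, so transactions from a deposed leader can never be mistaken for newer ones.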


Leader Election

ZooKeeper implements leader election through ephemeral sequential znodes:

public class LeaderElection {
    private static final String ELECTION_PATH = "/election";

    public void runForLeadership() throws KeeperException, InterruptedException {
        // Create ephemeral sequential znode
        String znode = zk.create(
            ELECTION_PATH + "/candidate-",
            myAddress.getBytes(),
            ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.EphemeralSequential
        );

        // Get all candidates sorted by znode name (which includes sequence number)
        List<String> candidates = zk.getChildren(ELECTION_PATH, false);
        Collections.sort(candidates);

        // If my znode sorts first, I'm the leader
        String myName = znode.substring(znode.lastIndexOf('/') + 1);
        if (candidates.get(0).equals(myName)) {
            System.out.println("I am the leader!");
        } else {
            // Otherwise watch the immediate predecessor (not the leader znode)
            // so that a leader failure wakes only one candidate
        }
    }
}

When the leader crashes, its ephemeral znode disappears and the next candidate in sequence becomes leader. Kafka uses this pattern for controller leader election.
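The snippet above only detects whether you won. In practice, each non-leader should watch its immediate predecessor rather than the leader znode, so a failure wakes exactly one candidate instead of the whole herd. A sketch of the predecessor selection (pure list logic, no ZooKeeper calls):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Given the children of the election znode, a candidate is leader if its
// znode sorts first; otherwise it should watch only its immediate predecessor.
public class Predecessor {
    // Returns null if `mine` is the leader, else the znode name to watch.
    static String toWatch(List<String> children, String mine) {
        List<String> sorted = new ArrayList<>(children);
        Collections.sort(sorted);
        int idx = sorted.indexOf(mine);
        return idx <= 0 ? null : sorted.get(idx - 1);
    }

    public static void main(String[] args) {
        List<String> c = List.of("candidate-0000000003",
                                 "candidate-0000000001",
                                 "candidate-0000000002");
        System.out.println(toWatch(c, "candidate-0000000001")); // null -> leader
        System.out.println(toWatch(c, "candidate-0000000003")); // candidate-0000000002
    }
}
```

When the watched predecessor disappears, the candidate re-runs this check rather than assuming leadership, since an intermediate candidate may have crashed instead.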


Distributed Locks

ZooKeeper implements distributed locks via ephemeral znodes and watches:

  1. Create an ephemeral sequential znode under the lock path
  2. Get all children and sort them
  3. If your znode is first, you hold the lock
  4. If not, watch the previous znode
  5. When the previous znode disappears, re-check whether you are now first

public class DistributedLock {
    private static final String LOCK_PATH = "/locks/my-resource";

    public boolean acquire() throws KeeperException, InterruptedException {
        // Create an ephemeral sequential znode
        myZnode = zk.create(LOCK_PATH + "/lock-",
            myId.getBytes(),
            ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.EphemeralSequential
        );
        String myName = myZnode.substring(myZnode.lastIndexOf('/') + 1);

        // If our znode sorts first, we hold the lock
        List<String> locks = zk.getChildren(LOCK_PATH, false);
        Collections.sort(locks);
        if (locks.get(0).equals(myName)) {
            return true;  // We hold the lock
        }

        // Otherwise watch only the immediate predecessor to avoid a herd effect
        String previous = locks.get(locks.indexOf(myName) - 1);
        zk.getData(LOCK_PATH + "/" + previous, watch, new Stat());
        return false;  // Not our turn yet; retry when the watch fires
    }

    public void release() throws KeeperException, InterruptedException {
        zk.delete(myZnode, -1);
    }
}

This lock implementation is correct but has a scalability limitation: every acquisition creates a znode and reads the full children list. For high-contention locks, consider systems optimized for that workload, such as etcd or Redis-based locks.


Common Use Cases

ZooKeeper handles several coordination patterns:

Service Discovery: Register services as ephemeral znodes, discover by reading the directory. Service dies, znode disappears automatically.

/services/http/api    # Ephemeral znode for each API instance

Configuration Management: Store configuration in ZooKeeper, watch for changes. Config updates notify all interested parties.

/config/application   # Contains JSON or properties
/config/database
/config/features

Leader Election: As shown above, for selecting a primary in a replicated system.

Barrier: Coordinate when multiple processes should proceed. Processes block while a barrier znode exists (or until enough participants have registered), then all proceed together.

Double Barrier: Ensure processes start and end together. Enter when N processes arrive, release when N exit.


Working with ZooKeeper

The ZooKeeper distribution includes a Java client and a CLI. For Java applications, most teams use Apache Curator, which adds retry logic, connection management, and higher-level recipes.

# Start ZooKeeper in standalone mode (testing only)
bin/zkServer.sh start-foreground

# CLI commands
zkCli.sh -server localhost:2181
[zookeeper] ls /
[zookeeper] create /test "hello"
[zookeeper] get /test
[zookeeper] set /test "world"
[zookeeper] delete /test

// Using Curator for higher-level patterns
CuratorFramework client = CuratorFrameworkFactory.newClient(
    "zoo1:2181,zoo2:2181,zoo3:2181",
    new RetryNTimes(3, 1000)
);
client.start();

// Leader latch (simpler leader election)
LeaderLatch latch = new LeaderLatch(client, "/election/leader");
latch.start();
latch.await();
if (latch.hasLeadership()) {
    // I am the leader
}

Limitations and Best Practices

Limitations:

  • Not for data storage: ZooKeeper is for coordination, not application data. Keep znodes small (KB range).
  • Scalability ceiling: Single ensemble handles roughly 10k-100k requests per second. For more, use multiple ensembles or etcd.
  • Watch reliability: Watches are one-time; handle missed notifications during reconnection.
  • Session timeout: Too-aggressive timeouts cause spurious disconnections during GC pauses.

Best practices:

  • Use an odd number of nodes (3, 5, 7) for fault tolerance
  • Co-locate ZooKeeper with low-latency network links
  • Monitor the outstanding requests queue length
  • Set appropriate tickTime, initLimit/syncLimit, and maxClientCnxns limits
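The odd-node recommendation falls out of simple quorum arithmetic: a write needs a strict majority, so an ensemble of N voters tolerates floor((N-1)/2) failures.

```java
// Quorum arithmetic behind the "odd number of nodes" rule: a 4-node ensemble
// tolerates no more failures than a 3-node one, but pays more per write.
public class Quorum {
    static int quorumSize(int n)        { return n / 2 + 1; }
    static int toleratedFailures(int n) { return (n - 1) / 2; }

    public static void main(String[] args) {
        for (int n : new int[]{3, 4, 5}) {
            System.out.println(n + " nodes: quorum=" + quorumSize(n)
                + ", tolerates " + toleratedFailures(n) + " failure(s)");
        }
    }
}
```

A 4th node only raises the quorum from 2 to 3 without improving fault tolerance, which is why even ensemble sizes waste capacity.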

When to Use / When Not to Use ZooKeeper

When to Use ZooKeeper

ZooKeeper shines for distributed locking and leader election. It offers simple, proven recipes using ephemeral nodes and sequences — locks, barriers, and leader election all come out of the box without custom implementation. Configuration management works well too: you store shared configuration in ZooKeeper and watch for changes, updating all interested parties in real time. Service discovery fits naturally since you can register endpoints, discover active instances, and track health through ephemeral nodes. If you need barrier or queue patterns, ZooKeeper’s built-in recipes cover most coordination needs.

When Not to Use ZooKeeper

ZooKeeper stumbles when data access is frequent. It’s not a database, so storing anything beyond small coordination metadata quickly causes performance problems. At very large scale, a single ensemble maxes out around 10k-100k requests per second — if you need more, split across multiple ensembles or consider migrating to etcd. Multi-tenant environments also pose challenges since without namespace isolation, workloads can interfere with each other. And if you’re running cloud-native workloads without StatefulSets, the operational overhead of managing ZooKeeper becomes significant — operators or managed services are easier paths.

Production Failure Scenarios

| Failure | Impact | Mitigation |
|---|---|---|
| Ensemble loses quorum (N/2+ nodes down) | ZooKeeper becomes unavailable; all coordination operations halt | Run an odd number of nodes (3, 5, 7); spread ensemble members across availability zones; monitor ZK service health |
| Leader election storm | Network blip triggers leader election; all in-flight operations fail | Set appropriate tickTime, initLimit, and syncLimit; ensure network stability; avoid GC pauses that look like failures |
| Session timeout misconfiguration | Clients disconnect during normal GC pauses; ephemeral nodes disappear; locks are released | Keep tickTime at 2000 ms or higher; set maxSessionTimeout generously; handle Disconnected events gracefully |
| Watch delivery delay | During leader election, watch notifications are delayed; clients miss state changes | Monitor watch count per client; set an appropriate session timeout; avoid watch storms |
| Snapshot size explosion | Database snapshot grows too large; restart takes too long | Set snapCount appropriately; enable auto-purge; keep the data directory on fast storage |
| Client reconnect flood | When ZooKeeper recovers, thousands of clients reconnect simultaneously and overwhelm the leader | Use exponential backoff on reconnect; stagger client startup times; rate-limit reconnection attempts |
| Znode data too large | Storing multi-MB data in a znode degrades performance | Keep znode data under 1 MB; store large data elsewhere and reference it (e.g. S3/HDFS paths) |

ACLs and Authentication

ZooKeeper supports fine-grained access control through ACLs. Each znode has its own ACL list governing who can read, write, delete, or administer it.

SASL/Kerberos authentication:

# In zoo.cfg - enable SASL authentication
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
# Requires kinit setup on each ZooKeeper node and client

IP-based ACLs:

// Restrict a znode to specific IP addresses
List<ACL> acl = Arrays.asList(
    new ACL(Perms.READ, new Id("ip", "192.168.1.0/24")),
    new ACL(Perms.ALL, new Id("ip", "192.168.1.100"))
);
zk.create("/config/database", data, acl, CreateMode.PERSISTENT);

The danger of OPEN_ACL_UNSAFE:

// This is in the ZooKeeper source code - OPEN_ACL_UNSAFE grants full permissions to anyone
ZooDefs.Ids.OPEN_ACL_UNSAFE  // = Perms.ALL for scheme "world", id "anyone"

// NEVER use OPEN_ACL_UNSAFE in production:
zk.create("/locks/my-lock", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EphemeralSequential);
// Any client that can reach ZooKeeper can delete or modify this znode!

Production ACL patterns:

// Use world:anyone with READ only for public znodes
new ACL(Perms.READ, new Id("world", "anyone"))

// Use auth: anyone authenticated can access
new ACL(Perms.ALL, new Id("auth", ""))

// Digest authentication - username/password based
Id digestId = new Id("digest", DigestAuthenticationProvider.generateDigest("admin:admin123"));
new ACL(Perms.ALL, digestId)
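For reference, the digest scheme hashes the user:password pair with SHA-1 and base64-encodes the result. The sketch below mirrors what DigestAuthenticationProvider.generateDigest produces; it is my own reimplementation for illustration, not the ZooKeeper class itself:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Reimplementation (for illustration) of ZooKeeper's digest ACL scheme:
// "user:password" -> "user:" + base64(SHA-1("user:password")).
public class DigestAcl {
    static String generateDigest(String idPassword) {
        try {
            String user = idPassword.split(":", 2)[0];
            byte[] sha = MessageDigest.getInstance("SHA-1")
                    .digest(idPassword.getBytes(StandardCharsets.UTF_8));
            return user + ":" + Base64.getEncoder().encodeToString(sha);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);  // SHA-1 is always available
        }
    }

    public static void main(String[] args) {
        System.out.println(generateDigest("admin:admin123"));
    }
}
```

Pre-computing digests like this is handy when you need to put a hashed super-user credential into zoo.cfg without pulling in the ZooKeeper jar.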

SASL (Kerberos) in a Hadoop-style cluster:

# In zoo.cfg for a Kerberized cluster
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
kerberos.removeHostAddress=true
kerberos.removeUserAddress=true

# Client JVM needs jaas.conf:
# Client {
#   com.sun.security.auth.module.Krb5LoginModule required
#   useKeyTab=true keyTab="/path/to/keytab" principal="zookeeper/node@REALM";
# };

Multi-Cluster Deployment

Running separate ZooKeeper ensembles for different applications provides fault isolation and prevents noisy-neighbor problems.

When to use separate ensembles:

| Scenario | Recommendation |
|---|---|
| Kafka and HBase share a cluster | Split into dedicated ensembles |
| Production and staging share | Always use separate ensembles |
| Different security requirements | Separate ensembles with different ACLs |
| Different latency requirements | Separate ensembles co-located per tier |

Deployment architecture:

# Ensemble A - for Kafka (3 nodes, low latency)
/zoo-a/  # 10.0.1.1:2181, 10.0.1.2:2181, 10.0.1.3:2181

# Ensemble B - for HBase (3 nodes, low latency)
/zoo-b/  # 10.0.2.1:2181, 10.0.2.2:2181, 10.0.2.3:2181

# Ensemble C - for monitoring/coordination (3 nodes)
/zoo-c/  # 10.0.3.1:2181, 10.0.3.2:2181, 10.0.3.3:2181

Client configuration for specific ensemble:

// Connect to only the correct ensemble
ZooKeeper zk = new ZooKeeper(
    "zoo-a-1:2181,zoo-a-2:2181,zoo-a-3:2181",  // Only Kafka ensemble
    30000,
    event -> { /* handler */ }
);

Naming conventions in shared namespace deployments:

If you run one ensemble but need logical separation, use top-level znodes as namespaces:

# Logical namespace per application (but same physical ensemble)
/
/kafka        # Kafka's znodes
/hbase        # HBase's znodes
/monitoring   # Monitoring service's znodes

This is less isolated than separate ensembles but simpler to operate. Use separate ensembles when one application’s load or faults could impact another.


Rolling Upgrade Procedures

ZooKeeper supports rolling upgrades where you upgrade one node at a time while keeping the ensemble available.

Pre-upgrade checklist:

# 1. Verify ensemble is healthy
echo "srvr" | nc localhost 2181
# Look for mode: leader or mode: follower, and all connections established

# 2. Check for any pending transactions
echo "mntr" | nc localhost 2181 | grep zk_pending
# Should be near zero before upgrading

# 3. Back up current data
tar -czf /backup/zookeeper-$(date +%Y%m%d).tar.gz /var/lib/zookeeper/data

Rolling upgrade steps:

# For each ZooKeeper node (one at a time):

# 1. Stop the node
systemctl stop zookeeper

# 2. Update ZooKeeper binary to new version
# rpm -Uvh zookeeper-3.9.x-server.rpm  (or apt, tarball, etc.)

# 3. Start the node
systemctl start zookeeper

# 4. Wait for node to rejoin and catch up
sleep 10
echo "srvr" | nc localhost 2181
# Verify the node rejoined: Mode should be follower or leader

# 5. Verify ensemble health before proceeding to next node
echo "srvr" | nc any-healthy-node 2181
# Check that every node reports Mode: leader or follower and accepts connections

Version compatibility:

| Upgrade Path | Behavior |
|---|---|
| 3.4.x -> 3.5.x | Rolling upgrade supported |
| 3.5.x -> 3.6.x | Rolling upgrade supported |
| 3.4.x -> 3.6.x | Go through 3.5.x first |
| 3.4/3.5 -> 3.9.x | Upgrade one minor version at a time |
| Major version jump | Requires full cluster restart |

Never upgrade more than one minor version at a time. For example, 3.4 -> 3.6 should go through 3.5.


Disk I/O Sensitivity

ZooKeeper is extremely sensitive to disk I/O latency, particularly WAL fsync operations.

Why ZooKeeper is I/O sensitive:

# Every write operation:
#   1. Append the transaction to the on-disk transaction log
#   2. fsync the log to durable storage
#   3. Update in-memory state
#   4. Acknowledge to the client only after the fsync completes

# fsync latency therefore directly bounds write throughput:
#   fsync P99 = 10ms -> max ~100 writes/second per node
#   fsync P99 = 1ms  -> max ~1000 writes/second per node
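The back-of-envelope numbers above come straight from the fact that fsyncs serialize writes, so per-node throughput is bounded by the inverse of fsync latency:

```java
// Write-throughput ceiling when every write requires a serialized fsync:
// max writes/second = 1000 ms / fsync latency in ms.
public class FsyncCeiling {
    static double maxWritesPerSecond(double fsyncLatencyMs) {
        return 1000.0 / fsyncLatencyMs;
    }

    public static void main(String[] args) {
        System.out.println(maxWritesPerSecond(10.0)); // 100.0
        System.out.println(maxWritesPerSecond(1.0));  // 1000.0
    }
}
```

In practice ZooKeeper batches some log writes, so real throughput can exceed this naive bound, but fsync latency remains the dominant factor.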

SSD requirements for ZooKeeper:

| Environment | Minimum | Recommended |
|---|---|---|
| Development | SATA SSD | NVMe SSD |
| Production | NVMe SSD | NVMe SSD with >50K IOPS |
| WAN deployment | NVMe SSD + separate disk | Dedicated NVMe for dataLogDir |

Disk configuration:

# Use separate disks for dataDir and dataLogDir (transaction log)
# dataLogDir should be on the fastest disk (SSD/NVMe)
dataDir=/var/lib/zookeeper/data
dataLogDir=/var/lib/zookeeper/txnlog  # NVMe SSD

# In zoo.cfg - Preallocates transaction log files
# Increase if you have many writes
preAllocSize=65536

# Snapshots - less I/O sensitive but still important
# Increase snapshot count to reduce frequency
snapCount=100000

# Monitor disk I/O latency
iostat -x 1
# Key metric: avgqu-sz should stay < 4 for sustained periods
# If avgqu-sz > 16, your disk is severely overloaded

Do not use network-attached storage (NAS, NFS, EBS network volumes) for ZooKeeper data or transaction logs. Network latency adds directly to fsync latency and causes follower lag that triggers leader elections.


Curator Recipes Catalog

Curator is the most widely-used ZooKeeper Java client. Beyond locks, it provides higher-level recipes:

GroupMember recipe (for service membership):

// GroupMember tracks which nodes are currently active members of a group
// Much simpler than building this with ephemeral znodes manually

CuratorFramework client = CuratorFrameworkFactory.newClient(
    "zoo1:2181,zoo2:2181,zoo3:2181",
    new RetryNTimes(3, 1000)
);
client.start();

GroupMember<String> group = new GroupMember<String>(
    client,
    "/services/myapp/members",  // Group path
    UUID.randomUUID().toString()  // Our member ID
);

// Listen for membership changes
group.addMembershipChangeListener(event -> {
    System.out.println("Members: " + event.getMembers());
});

// Start - this creates our ephemeral znode
group.start();

// On shutdown
group.close();
client.close();

DistributedDoubleBarrier recipe (processes start and end together):

// DoubleBarrier - enter N processes, block until all arrive, then release all together
// Useful for distributed batch jobs that need all participants ready before starting

DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(
    client,
    "/barriers/batch-job",
    3  // Wait for 3 processes
);

barrier.enter();  // Blocks until 3 processes call enter()
System.out.println("All participants ready - starting batch job");

// ... do batch work ...

barrier.leave();  // Blocks until 3 processes call leave()
System.out.println("All participants finished");

Other useful Curator recipes:

// LeaderLatch - simple leader election (non-blocking)
LeaderLatch latch = new LeaderLatch(client, "/election/leader");
latch.start();
latch.await();  // Block until we become leader
if (latch.hasLeadership()) {
    // We are leader
}

// DistributedQueue - FIFO queue backed by ZooKeeper znodes
DistributedQueue<String> queue = new DistributedQueue<String>(
    client,
    "/queues/work-queue",
    new StringCodec()
);
queue.put("task-1");
String task = queue.take();  // Blocks until item available

// DistributedPriorityQueue - items consumed in priority order
DistributedPriorityQueue<Integer> priorityQueue =
    new DistributedPriorityQueue<Integer>(client, "/queues/priority", 5);

When to use Curator recipes vs. raw ZooKeeper:

| Use Case | Recommendation |
|---|---|
| Leader election | Curator LeaderLatch (simpler) |
| Distributed locks | Curator InterProcessMutex |
| Double barriers | Curator DistributedDoubleBarrier |
| Service discovery/membership | Curator GroupMember |
| Configuration management | Raw ZooKeeper + watches (Curator is overkill) |

Four-Letter Command Monitoring

ZooKeeper exposes management commands over its client port (default 2181) as four-letter words.

The three most useful four-letter commands:

# srvr - server statistics
echo "srvr" | nc localhost 2181
# Output:
# Zookeeper version: 3.9.3
# Latency min/avg/max: 0/0/3
# Packets: sent=12345 recv=12344
# Outstanding: 0
# Zxid: 0x10000002c
# Mode: leader
# Node count: 1543

# stat - connection statistics + srvr
echo "stat" | nc localhost 2181
# Adds: Connected count, IP list, Session info

# mntr - machine-readable metrics (key and value, tab-separated;
# commonly scraped by monitoring exporters)
echo "mntr" | nc localhost 2181
# zk_version  3.9.3
# zk_server_state  leader
# zk_avg_latency  0.5
# zk_max_latency  15
# zk_min_latency  0
# zk_packets_sent  12345
# zk_packets_received  12344
# zk_num_alive_connections  2
# zk_pending_syncs  0
# zk_znode_count  1543
# zk_ephemerals_count  45
# zk_approximate_data_size  524288
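Since each mntr line is just a key and a value separated by whitespace, parsing the output for a homegrown monitoring agent is straightforward. A small sketch:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Parse "mntr" output (one whitespace-separated key/value pair per line)
// into a map, e.g. for feeding a custom metrics agent.
public class MntrParser {
    static Map<String, String> parse(String output) {
        Map<String, String> metrics = new LinkedHashMap<>();
        for (String line : output.split("\n")) {
            String[] kv = line.trim().split("\\s+", 2);
            if (kv.length == 2 && kv[0].startsWith("zk_")) {
                metrics.put(kv[0], kv[1]);
            }
        }
        return metrics;
    }

    public static void main(String[] args) {
        String sample = "zk_version\t3.9.3\nzk_avg_latency\t0.5\nzk_server_state\tleader\n";
        Map<String, String> m = parse(sample);
        System.out.println(m.get("zk_server_state")); // leader
    }
}
```

Filtering on the zk_ prefix skips any banner or error lines the server may emit before the metrics themselves.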

Key mntr metrics to alert on:

| Metric | Warning Threshold | Critical Threshold |
|---|---|---|
| zk_num_alive_connections | > 100 | > 500 |
| zk_pending_syncs | > 10 | > 100 |
| zk_outstanding_requests | > 10 | > 100 |
| zk_avg_latency | > 10 ms | > 100 ms |
| zk_max_latency | > 100 ms | > 1000 ms |
| zk_ephemerals_count | > 10000 | > 50000 |
| zk_watch_count | > 50000 | > 200000 |

Automating four-letter commands in scripts:

#!/bin/bash
# health_check.sh - run via cron or monitoring agent

ZK_HOST="${ZK_HOST:-localhost}"
ZK_PORT="${ZK_PORT:-2181}"

# Check if ZooKeeper is responding
if echo "ruok" | nc -w 3 $ZK_HOST $ZK_PORT | grep -q "imok"; then
    echo "ZooKeeper is healthy"
else
    echo "ZooKeeper is NOT healthy"
    exit 1
fi

# Check latency
LATENCY=$(echo "mntr" | nc -w 3 $ZK_HOST $ZK_PORT | grep "^zk_avg_latency" | cut -f2)
if (( $(echo "$LATENCY > 10" | bc -l) )); then
    echo "WARNING: ZooKeeper latency is ${LATENCY}ms"
fi

On ZooKeeper 3.5+, four-letter commands must be explicitly whitelisted (e.g. 4lw.commands.whitelist=ruok,srvr,stat,mntr in zoo.cfg). Enable them only on internal network interfaces, never on public IPs: they expose detailed cluster state that could aid attackers.


initLimit and syncLimit Tuning for WAN Deployments

ZooKeeper has two timing parameters that become critical when ensemble members are spread across WAN locations.

tickTime (base timing unit in ms, default: 2000):

# tickTime = 2000ms (default)
# All other timing values are expressed in tickTime units

initLimit (ticks allowed for a follower's initial sync with the leader):

# initLimit = 10 (default)
# Time allowed = 10 * tickTime = 20 seconds
# Increase this if followers need more time to catch up on initial startup

syncLimit (ticks allowed for follower to sync with leader during normal operation):

# syncLimit = 5 (default)
# Time allowed = 5 * tickTime = 10 seconds
# This is how far behind a follower can drift before being considered dead

WAN tuning example (us-east to us-west):

# WAN RTT between regions: ~70ms
# tickTime = 2000ms
# initLimit = 100  # 200 seconds for initial sync (followers may need to load a
#                  # large snapshot and replay a long transaction log)
# syncLimit = 10   # 20 seconds for sync (catches transient network blips)

# In zoo.cfg:
tickTime=2000
initLimit=100
syncLimit=10

Practical syncLimit tuning guidance:

| Network Type | RTT | syncLimit Recommendation |
|---|---|---|
| Same datacenter | < 1 ms | syncLimit=5 (default) |
| Same region (AZ) | 1-5 ms | syncLimit=5-10 |
| Cross-region | 20-100 ms | syncLimit=10-50 |
| WAN/multi-region | 100-300 ms | syncLimit=50-150 |

initLimit tuning for large data volumes:

# If your ZooKeeper database is large (>1GB),
# a follower may need significant time to load snapshot + replay WAL

# Estimate initLimit needed:
# initLimit >= (time to load snapshot + WAL replay) / tickTime

# For 5GB database with slow disk:
# Snapshot load: 30 seconds
# WAL replay (100K entries): 20 seconds
# Total: 50 seconds
# initLimit = 50 / 2 = 25 (at tickTime=2000ms)

# Recommendation: always over-provision initLimit for WAN
# better to have extra time than to have followers excluded
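The arithmetic above, as code: initLimit must cover snapshot load plus transaction-log replay, expressed in tickTime units and rounded up.

```java
// Estimate the minimum initLimit: total recovery time in ms, divided by
// tickTime, with ceiling division so partial ticks are not dropped.
public class InitLimit {
    static int required(int snapshotLoadSec, int replaySec, int tickTimeMs) {
        int totalMs = (snapshotLoadSec + replaySec) * 1000;
        return (totalMs + tickTimeMs - 1) / tickTimeMs;  // ceiling division
    }

    public static void main(String[] args) {
        // 30 s snapshot load + 20 s replay at tickTime=2000ms -> initLimit >= 25
        System.out.println(required(30, 20, 2000)); // 25
    }
}
```

Treat the result as a floor, not a target: over-provisioning initLimit costs nothing in normal operation, while undershooting it excludes slow followers from the ensemble.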

Monitoring when syncLimit is too tight:

# Check for disconnections in logs
grep "closing client connection" /var/log/zookeeper/zookeeper.log

# Or via four-letter command
echo "srvr" | nc localhost 2181 | grep "Outstanding"
# If Outstanding stays above zero continuously, the server is falling behind
# on requests, which can eventually push followers past syncLimit


Summary

ZooKeeper provides essential coordination primitives for distributed systems:

  • Leader election: Through ephemeral sequential znodes
  • Distributed locks: Through znode ordering and watches
  • Configuration management: Through persistent znodes with watches
  • Service discovery: Through ephemeral znodes that vanish with the services that registered them

The Zab protocol delivers linearizable writes essential for coordination correctness. Without this guarantee, distributed locks could have race conditions that appear correct but fail under leader elections.

ZooKeeper’s age shows in some design decisions: one-time watches need client re-arming, the C client library has rough edges. But for leader election and coordination, ZooKeeper remains solid and widely-deployed.

For greenfield projects, etcd is often a better choice today with a cleaner API and better performance. But ZooKeeper’s maturity and battle-testing in production Hadoop and Kafka clusters keep it relevant, especially when integrating with existing systems.


Observer Nodes: Read Scaling Extension

Standard ZooKeeper ensembles require a majority of voters to acknowledge writes. This caps write scalability, and adding followers to scale reads makes it worse, since every new voter also participates in the write quorum.

Observer nodes solve this by participating in ZooKeeper without voting. They receive proposals from the leader and apply transactions, but do not count toward majority for write commitment.

graph TD
    A[Leader] -->|PROPOSE| B[Follower 1]
    A -->|PROPOSE| C[Follower 2]
    A -->|PROPOSE| D[Observer 1]
    A -->|PROPOSE| E[Observer 2]

    B -->|ACK| A
    C -->|ACK| A
    D -.->|No ACK| A
    E -.->|No ACK| A

    A -->|COMMIT| B
    A -->|COMMIT| C
    A -->|COMMIT| D
    A -->|COMMIT| E

Why use observers:

| Without Observers | With Observers |
|---|---|
| Reads limited by follower count | Scale reads independently |
| Cross-region reads add WAN latency | Observers in each region serve local reads |
| All voters participate in the write quorum | Only voting members are needed for writes |
| Adding nodes increases write cost | Add observers without affecting writes |

Configuration:

# In zoo.cfg for observer node
peerType=observer
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888
server.4=zoo4:2888:3888:observer   # Observer

Use case - geo-distributed reads:

us-east-1: 3 voters (leader preferred here)
us-west-2: 2 observers for local reads
eu-west-1: 1 observer for local reads

Clients in us-west-2 can read from their local observer without crossing the WAN. Writes still go to us-east-1 majority.
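Because observers never vote, the write quorum depends only on the voter count. A sketch for the layout above (3 voters plus 3 observers):

```java
// The write quorum is a majority of voters only; observers are deliberately
// excluded, which is why adding them never slows down writes.
public class ObserverQuorum {
    static int writeQuorum(int voters, int observers) {
        return voters / 2 + 1;  // observers intentionally ignored
    }

    public static void main(String[] args) {
        System.out.println(writeQuorum(3, 0)); // 2
        System.out.println(writeQuorum(3, 3)); // 2 -- observers don't change it
    }
}
```

The flip side: observers add no fault tolerance either, so the voter set still determines how many failures the ensemble survives.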

#distributed-systems #leader-election #consensus