data systems mechanics invariants in distributed architectures

Coordination Services

Chapter 20 of 28
Summary

This section examines coordination primitives in distributed systems, focusing on how services like ZooKeeper implement distributed locks using ephemeral sequential nodes and watches. The core guarantee is mutual exclusion (safety): each client creates an ephemeral sequential node under a lock path, and the client whose node has the smallest sequence number holds the lock. Every other client watches its immediate predecessor node and re-checks the lock when notified that the node was deleted. Session expiry automatically deletes a failed client's ephemeral nodes, which preserves liveness by releasing locks that crashed clients held. The section includes a Python 3.11+ implementation, built on an in-memory mock of ZooKeeper, that demonstrates lock acquisition, waiting on a predecessor node via a watch, lock release, and automatic cleanup on session expiry. Key concepts illustrated include ephemeral nodes for liveness tracking, sequential ordering for deterministic lock acquisition, watch-based notification to avoid polling, and session-based failure recovery, whose timeout embodies the fundamental trade-off between timely failure detection and false positives.

Coordination services such as Apache ZooKeeper and etcd are foundational to distributed systems: they provide strongly consistent primitives on which reliable coordination can be built. They are designed around the assumption that failure is the normal case, so their mechanisms never trade safety for liveness; progress is restored through automatic recovery instead. Higher-level abstractions such as distributed locks, leader election, and barriers are built on top of them, with strict invariants enforced through linearizable writes and session semantics.

The design of these services embodies inherent trade-offs described by the CAP theorem. ZooKeeper, for instance, is a CP system: during a network partition it preserves consistency at the cost of availability. Clients that cannot reach a quorum may fail to acquire a lock (the service is unavailable to them), but mutual exclusion, the core safety property, is preserved. This guarantee is not incidental; it follows from ZooKeeper's linearizable write ordering, the foundation on which every coordination primitive is built.
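To see why a partition cannot produce two lock holders, consider majority quorums. ZooKeeper commits a write only after a majority of the ensemble acknowledges it, and two disjoint groups of servers can never both contain a majority. The sketch below is a hypothetical arithmetic model: the names `has_quorum`, `writable_sides`, and `at_most_one_writable` are illustrative, not ZooKeeper APIs, and it ignores leader election and client reconnection.

```python
# Hypothetical model: a side of a partition can commit writes (and so grant
# a lock) only if it contains a strict majority of the whole ensemble.


def has_quorum(reachable: int, ensemble_size: int) -> bool:
    """True if `reachable` servers form a strict majority of the ensemble."""
    return reachable > ensemble_size // 2


def writable_sides(partition_sizes: list[int]) -> list[bool]:
    """For each side of a partition, report whether it can still commit writes."""
    ensemble_size = sum(partition_sizes)
    return [has_quorum(size, ensemble_size) for size in partition_sizes]


def at_most_one_writable(ensemble_size: int) -> bool:
    """Check every two-way split of the ensemble: never two writable sides."""
    return all(
        sum(writable_sides([k, ensemble_size - k])) <= 1
        for k in range(ensemble_size + 1)
    )


if __name__ == "__main__":
    print(writable_sides([3, 2]))     # [True, False]: only the 3-server side commits
    print(writable_sides([2, 2, 1]))  # [False, False, False]: no side can grant a lock
    print(writable_sides([2, 2]))     # [False, False]: 4 servers, neither side has 3
    print(all(at_most_one_writable(n) for n in range(1, 10)))  # True
```

The 2/2 split of a four-server ensemble also shows why ensembles usually have an odd number of servers: a fourth server tolerates no more failures than three do.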

Distributed Locks

Distributed locks must ensure mutual exclusion even in the presence of failures, and this requirement dictates how they are implemented. The lock's safety (at most one client holds it at any time) depends on ZooKeeper's linearizable writes. Its liveness, however, depends on the session timeout, which presents an unavoidable trade-off. A timeout that is too short expires healthy sessions during transient load or GC pauses, releasing locks spuriously; a paused client may then resume still believing it holds the lock. A timeout that is too long delays recovery when a client has actually failed, increasing downtime. This tension is inherent to any system that detects failures through heartbeats.
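The trade-off can be made concrete with a toy model of heartbeat-based expiry. `SessionTracker` and `negotiate_timeout_ms` are hypothetical names. The clamping rule mirrors ZooKeeper's documented defaults, under which the server bounds a client's requested session timeout between `minSessionTimeout` (2 * `tickTime`) and `maxSessionTimeout` (20 * `tickTime`), but the tracker itself is a simplification, not how the server works internally.

```python
# Hypothetical toy model of heartbeat-based session expiry; it illustrates
# the timeout trade-off, not a ZooKeeper server's implementation.
from dataclasses import dataclass


def negotiate_timeout_ms(requested_ms: int, tick_time_ms: int) -> int:
    """Clamp a requested session timeout to [2 * tickTime, 20 * tickTime],
    mirroring ZooKeeper's default minSessionTimeout / maxSessionTimeout."""
    return max(2 * tick_time_ms, min(requested_ms, 20 * tick_time_ms))


@dataclass
class SessionTracker:
    timeout_s: float               # negotiated session timeout
    last_heartbeat_s: float = 0.0

    def heartbeat(self, now_s: float) -> None:
        self.last_heartbeat_s = now_s

    def is_expired(self, now_s: float) -> bool:
        # Silence longer than the timeout means expiry; the tracker cannot
        # tell a paused client from a crashed one.
        return now_s - self.last_heartbeat_s > self.timeout_s


if __name__ == "__main__":
    print(negotiate_timeout_ms(1_000, tick_time_ms=2_000))   # 4000: raised to the minimum
    print(negotiate_timeout_ms(60_000, tick_time_ms=2_000))  # 40000: capped at the maximum

    # Short timeout: a healthy client stalls in a GC pause after t=10 s.
    short = SessionTracker(timeout_s=2.0)
    short.heartbeat(10.0)
    print(short.is_expired(12.5))  # True: false positive, lock released spuriously

    # Long timeout: the same pause is tolerated...
    long_timeout = SessionTracker(timeout_s=30.0)
    long_timeout.heartbeat(10.0)
    print(long_timeout.is_expired(12.5))  # False
    # ...but if the client really crashed at t=10 s, its lock stays held until t=40 s.
    print(long_timeout.is_expired(39.0), long_timeout.is_expired(40.5))  # False True
```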

Ephemeral nodes are essential because ZooKeeper deletes them automatically when their session ends, whether through explicit closure or session expiry. A failed client therefore releases its lock automatically, which prevents deadlocks caused by crashed or partitioned clients. The lock implementation treats failure as the default; with persistent nodes instead, a crashed holder's node would remain indefinitely and every waiter would block forever.
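A toy model makes the difference between ephemeral and persistent lock nodes visible. `ToyStore` and `lock_free_after_crash` are hypothetical names; the sketch models only node ownership and session cleanup, not ZooKeeper's API.

```python
# Hypothetical toy model: what happens to a lock node when its owner's session ends.


class ToyStore:
    def __init__(self) -> None:
        # path -> (owning session, is_ephemeral)
        self.nodes: dict[str, tuple[str, bool]] = {}

    def create(self, path: str, session: str, ephemeral: bool) -> None:
        self.nodes[path] = (session, ephemeral)

    def end_session(self, session: str) -> None:
        # Only ephemeral nodes share the session's lifetime; persistent nodes stay.
        self.nodes = {
            path: (owner, eph)
            for path, (owner, eph) in self.nodes.items()
            if not (eph and owner == session)
        }


def lock_free_after_crash(ephemeral: bool) -> bool:
    """Create a lock node owned by client-a, let client-a crash, report if the lock is free."""
    store = ToyStore()
    store.create("/lock/holder", session="client-a", ephemeral=ephemeral)
    store.end_session("client-a")  # crash -> session expiry
    return "/lock/holder" not in store.nodes


if __name__ == "__main__":
    print(lock_free_after_crash(ephemeral=True))   # True: waiters can proceed
    print(lock_free_after_crash(ephemeral=False))  # False: waiters block forever
```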

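Watches are one-time triggers: a watch fires at most once and must be re-registered to observe later changes. A notification says only that something changed, not what the new state is, and changes that occur between the notification and re-registration are not reported individually. Clients must therefore re-read the state after every watch fires instead of inferring it from the event, which keeps them correct in the face of concurrent modifications. In the lock recipe, this is why a waiter re-lists the children when its predecessor disappears: the predecessor may have been another waiter that gave up, not the lock holder.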
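The one-time behavior, and the re-read-and-re-arm pattern it forces on clients, can be shown with a small model. `WatchedNode` is a hypothetical class, loosely analogous to reading a znode with a watch set. It is single-threaded, so unlike real ZooKeeper no change can slip in between a notification and the re-registration.

```python
# Hypothetical single-threaded model of one-time watches on a single node.
from collections.abc import Callable
from typing import Optional


class WatchedNode:
    def __init__(self, value: int) -> None:
        self.value = value
        self._watchers: list[Callable[[], None]] = []

    def get(self, watcher: Optional[Callable[[], None]] = None) -> int:
        """Read the value, optionally leaving a one-time watch."""
        if watcher is not None:
            self._watchers.append(watcher)
        return self.value

    def set(self, value: int) -> None:
        self.value = value
        # One-time trigger: detach all watchers before firing them, so each
        # fires once and must be re-registered to hear about later changes.
        watchers, self._watchers = self._watchers, []
        for watcher in watchers:
            watcher()


if __name__ == "__main__":
    node = WatchedNode(0)
    events: list[str] = []

    # Naive use: one watch, two changes, one notification.
    node.get(watcher=lambda: events.append("changed"))
    node.set(1)
    node.set(2)  # no watch is registered any more, so nothing fires
    print(events)  # ['changed']

    # Correct pattern: on each event, re-read the state and re-arm the watch.
    def on_change() -> None:
        latest = node.get(watcher=on_change)
        events.append(f"saw {latest}")

    node.get(watcher=on_change)
    node.set(3)
    node.set(4)
    print(events)  # ['changed', 'saw 3', 'saw 4']
```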

Implementation Example

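The following Python code sketches the lock recipe against an in-memory mock of ZooKeeper's ephemeral sequential nodes and watches. The mock runs in a single process and does not model replication, so it illustrates how the recipe relies on session semantics rather than how linearizability is achieved: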

import asyncio
import uuid
from collections import defaultdict
from collections.abc import Callable
from typing import Optional


class MockZKServer:
    """In-memory stand-in for a ZooKeeper ensemble.

    Single process, no replication: it models ephemeral sequential nodes,
    per-parent sequence counters, and one-time watches, but not linearizability.
    """

    def __init__(self) -> None:
        self.nodes: dict[str, str] = {}  # full path -> owning session id
        self.seq_counters: dict[str, int] = defaultdict(int)  # parent path -> next sequence number
        self.watches: dict[str, list[Callable[[], None]]] = defaultdict(list)  # path -> one-time callbacks

    def create_ephemeral_sequential(self, session_id: str, path_prefix: str) -> str:
        # As in ZooKeeper, the sequence counter belongs to the parent node, so
        # all clients creating children under the same lock path share it.
        parent = path_prefix.rsplit("/", 1)[0]
        seq = self.seq_counters[parent]
        self.seq_counters[parent] += 1
        full_path = f"{path_prefix}{seq:010d}"
        self.nodes[full_path] = session_id
        print(f"Created ephemeral sequential node: {full_path}")
        return full_path

    def get_children(self, parent: str) -> list[str]:
        """Return child *names* directly under parent, ordered by sequence number."""
        prefix = parent + "/"
        names = [p[len(prefix):] for p in self.nodes if p.startswith(prefix)]
        names = [n for n in names if "/" not in n]  # direct children only
        return sorted(names, key=lambda n: int(n.rsplit("-", 1)[1]))

    def exists_with_watch(self, path: str, callback: Callable[[], None]) -> bool:
        """If path exists, leave a one-time watch on it and return True.

        Real ZooKeeper's exists() would also arm a watch on a missing node;
        the mock skips that, and the caller simply re-checks instead.
        """
        if path not in self.nodes:
            return False
        self.watches[path].append(callback)
        return True

    def delete(self, path: str) -> None:
        if self.nodes.pop(path, None) is None:
            return
        print(f"Deleted node: {path}")
        # One-time trigger: pop the callbacks so each watch fires at most once.
        for callback in self.watches.pop(path, []):
            callback()

    def expire_session(self, session_id: str) -> None:
        """Session expiry: delete every ephemeral node the session owns."""
        owned = [p for p, sid in self.nodes.items() if sid == session_id]
        for path in owned:
            self.delete(path)


class MockZKClient:
    """One client session against the shared mock server (async to mimic network calls)."""

    def __init__(self, server: MockZKServer) -> None:
        self.server = server
        self.session_id = str(uuid.uuid4())

    async def create_ephemeral_sequential(self, path_prefix: str) -> str:
        return self.server.create_ephemeral_sequential(self.session_id, path_prefix)

    async def get_children(self, path: str) -> list[str]:
        return self.server.get_children(path)

    async def exists_with_watch(self, path: str, callback: Callable[[], None]) -> bool:
        return self.server.exists_with_watch(path, callback)

    async def delete(self, path: str) -> None:
        self.server.delete(path)

    async def close_session(self) -> None:
        """Simulate session expiry (e.g. crash or partition): this session's ephemeral nodes vanish."""
        self.server.expire_session(self.session_id)


class DistributedLock:
    """Implements a distributed lock using ephemeral sequential nodes and watches."""

    def __init__(self, zk_client: MockZKClient, lock_path: str) -> None:
        self.zk = zk_client
        self.lock_path = lock_path
        self.my_node_path: Optional[str] = None

    async def acquire(self) -> bool:
        """Block until the lock is acquired."""
        # 1. Create an ephemeral sequential node under the lock path.
        self.my_node_path = await self.zk.create_ephemeral_sequential(f"{self.lock_path}/lock-")
        my_name = self.my_node_path.rsplit("/", 1)[1]

        while True:
            children = await self.zk.get_children(self.lock_path)
            if my_name not in children:
                # Our node was deleted: our own session expired while waiting.
                raise RuntimeError(f"Lost lock node {self.my_node_path}: session expired")
            # 2. The client with the smallest sequence number holds the lock.
            my_index = children.index(my_name)
            if my_index == 0:
                print(f"Lock acquired: {self.my_node_path}")
                return True
            # 3. Watch only the immediate predecessor, so each deletion wakes
            #    exactly one waiter instead of all of them.
            predecessor_path = f"{self.lock_path}/{children[my_index - 1]}"
            watch_fired = asyncio.Event()
            if not await self.zk.exists_with_watch(predecessor_path, watch_fired.set):
                continue  # Predecessor vanished before the watch was set; re-check.
            print(f"Lock not acquired. Watching predecessor: {predecessor_path}")
            await watch_fired.wait()
            # 4. The watch only says "something changed": loop and re-read the
            #    children rather than assuming we now hold the lock.
            print(f"Watch fired on {predecessor_path}, re-checking.")

    async def release(self) -> None:
        """Release the lock by deleting our ephemeral node."""
        if self.my_node_path:
            await self.zk.delete(self.my_node_path)
            self.my_node_path = None
            print("Lock released.")


async def main() -> None:
    """Two clients contend; the holder's session expires and the waiter takes over."""
    server = MockZKServer()
    client_a, client_b = MockZKClient(server), MockZKClient(server)
    lock_a = DistributedLock(client_a, "/shared_lock")
    lock_b = DistributedLock(client_b, "/shared_lock")

    print("\n--- Client A acquiring lock ---")
    await lock_a.acquire()

    print("\n--- Client B queues behind A ---")
    waiter = asyncio.create_task(lock_b.acquire())
    await asyncio.sleep(0.1)  # Let B create its node and set its watch.
    assert not waiter.done()  # Mutual exclusion: B is blocked while A holds the lock.

    # A never calls release(): its session expires (e.g. network partition),
    # its ephemeral node is deleted, and B's watch fires.
    print("\n--- Simulating session expiry of client A ---")
    await client_a.close_session()
    await waiter
    print("Client B now holds the lock.")

    print("\n--- Client B releasing lock ---")
    await lock_b.release()


if __name__ == "__main__":
    asyncio.run(main())
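The decision that acquire() makes on each pass can be isolated as a pure function, which is easier to test than the asynchronous loop. `sequence_of` and `node_to_watch` are hypothetical helper names; the sketch assumes ZooKeeper's convention of appending a 10-digit, zero-padded counter to sequential node names.

```python
# Hypothetical helpers isolating the per-pass decision in acquire():
# do we hold the lock, and if not, which single node should we watch?
from typing import Optional


def sequence_of(name: str) -> int:
    # ZooKeeper appends a 10-digit, zero-padded counter to sequential node
    # names (e.g. "lock-0000000007"). Parsing it, rather than sorting whole
    # names, keeps the order correct even if names carry differing prefixes.
    return int(name[-10:])


def node_to_watch(children: list[str], my_name: str) -> Optional[str]:
    """Return None if my_name holds the lock, else its immediate predecessor's name."""
    ordered = sorted(children, key=sequence_of)
    my_index = ordered.index(my_name)  # ValueError if our node is gone (session expired)
    return None if my_index == 0 else ordered[my_index - 1]


if __name__ == "__main__":
    children = ["lock-0000000003", "lock-0000000001", "lock-0000000007"]
    print(node_to_watch(children, "lock-0000000001"))  # None: smallest, lock held
    print(node_to_watch(children, "lock-0000000007"))  # lock-0000000003
    # If the waiter lock-0000000003 gives up, lock-0000000007 re-checks and
    # now watches lock-0000000001 instead of taking the lock.
    children.remove("lock-0000000003")
    print(node_to_watch(children, "lock-0000000007"))  # lock-0000000001
```

Because each waiter watches exactly one node, deleting the holder's node wakes only the next client in line, avoiding the herd effect that watching the lock's parent node would cause.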