lib.rs
Overview
This file implements a core component of a distributed gossip protocol system designed for cluster membership and state synchronization. The main abstraction is the Chitchat struct, which maintains cluster state, detects node failures, processes gossip messages, and manages node liveness. It provides mechanisms for nodes to exchange state digests and deltas, detect membership changes, and notify listeners about key-value updates in the cluster.
The file also defines constants, utility types, and test cases validating the system’s behavior under various cluster conditions such as node joins, leaves, network partitions, and key-value synchronization.
Constants
MAX_UDP_DATAGRAM_PAYLOAD_SIZE: usize
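Defines the maximum UDP datagram payload size as 65,507 bytes, the largest payload that fits in a single IPv4 packet (65,535 bytes minus a 20-byte IP header and an 8-byte UDP header).
This is far larger than a typical Ethernet MTU of about 1,500 bytes, so large messages are split into several IP fragments on the wire. The large limit is chosen so that a node can send its full self-digest in a single datagram.
Relevant to message size constraints and fragmentation handling in the gossip protocol.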
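The arithmetic behind the 65,507-byte figure can be sketched as follows. The 1,500-byte MTU and the helper `ip_fragments_needed` are illustrative assumptions and are not part of the crate.

```rust
/// Largest value of the 16-bit IPv4 total-length field.
const IPV4_MAX_PACKET_SIZE: usize = 65_535;
/// Minimum IPv4 header size (no options).
const IPV4_HEADER_SIZE: usize = 20;
/// Fixed UDP header size.
const UDP_HEADER_SIZE: usize = 8;

/// Maximum UDP payload that fits in a single IPv4 packet.
const fn max_udp_payload() -> usize {
    IPV4_MAX_PACKET_SIZE - IPV4_HEADER_SIZE - UDP_HEADER_SIZE
}

/// Number of IP fragments needed to carry `payload_len` bytes of UDP payload
/// over a link with the given MTU (hypothetical helper for illustration).
fn ip_fragments_needed(payload_len: usize, mtu: usize) -> usize {
    // Every fragment carries its own IP header, and fragment data must be
    // a multiple of 8 bytes (except in the last fragment).
    let per_fragment = (mtu - IPV4_HEADER_SIZE) / 8 * 8;
    // The UDP header travels inside the fragmented IP payload.
    let total = payload_len + UDP_HEADER_SIZE;
    (total + per_fragment - 1) / per_fragment
}
```

Under these assumptions, a maximum-size datagram on a 1,500-byte MTU link is carried by several dozen fragments, and losing any one of them loses the whole datagram.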
Structs and Types
Chitchat
Central struct representing a node's view and participation in the cluster gossip protocol.
Fields
config: ChitchatConfig - Configuration parameters controlling node behavior, failure detection, and cluster identification.
cluster_state: ClusterState - The local view of the cluster membership and key-value states.
failure_detector: FailureDetector - Tracks heartbeat information to infer node liveness.
previous_live_nodes: HashMap<ChitchatId, Version> - Cache of previously known live nodes and their versions for change detection.
live_nodes_watcher_tx: watch::Sender<BTreeMap<ChitchatId, NodeState>> - Async channel sender notifying listeners of live node updates.
live_nodes_watcher_rx: watch::Receiver<BTreeMap<ChitchatId, NodeState>> - Receiver counterpart for live nodes watcher stream.
max_datagram_payload_size: usize - Max UDP payload size used to bound the size of outgoing messages.
Implementation Details
Uses a failure detector based on the Phi Accrual algorithm to determine node health.
Maintains cluster state with versioned key-value pairs and tracks deletions.
Supports asynchronous notification for membership changes via Tokio watch channels.
Processes SYN, SYN-ACK, ACK, and BadCluster messages to implement the gossip handshake.
Implements garbage collection of keys marked for deletion and removal of dead nodes.
Supports subscription to key change events with callbacks filtered by key prefix.
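As a rough illustration of how a Phi Accrual style detector turns heartbeat timing into a liveness decision, here is a minimal sketch. It assumes an exponential model of heartbeat intervals and uses plain `f64` timestamps in seconds; `PhiEstimator` and its methods are hypothetical names, and the crate's `FailureDetector` may compute phi differently.

```rust
/// Sliding window of heartbeat intervals for one node (illustrative only).
struct PhiEstimator {
    intervals: Vec<f64>,         // seconds between consecutive heartbeats
    last_heartbeat: Option<f64>, // timestamp of the latest heartbeat, in seconds
    max_samples: usize,
}

impl PhiEstimator {
    fn new(max_samples: usize) -> Self {
        PhiEstimator { intervals: Vec::new(), last_heartbeat: None, max_samples }
    }

    /// Records a heartbeat observed at time `now`.
    fn report_heartbeat(&mut self, now: f64) {
        if let Some(last) = self.last_heartbeat {
            if self.intervals.len() == self.max_samples {
                self.intervals.remove(0); // drop the oldest sample
            }
            self.intervals.push(now - last);
        }
        self.last_heartbeat = Some(now);
    }

    /// phi = -log10(P(silence lasts at least `elapsed`)).
    /// With an exponential model, P = exp(-elapsed / mean),
    /// so phi = elapsed / (mean * ln 10).
    fn phi(&self, now: f64) -> Option<f64> {
        let last = self.last_heartbeat?;
        if self.intervals.is_empty() {
            return None; // not enough samples yet
        }
        let mean = self.intervals.iter().sum::<f64>() / self.intervals.len() as f64;
        if mean <= 0.0 {
            return None;
        }
        Some((now - last) / (mean * std::f64::consts::LN_10))
    }

    /// A node is considered alive while phi stays at or below the threshold.
    fn is_alive(&self, now: f64, threshold: f64) -> bool {
        self.phi(now).map_or(false, |phi| phi <= threshold)
    }
}
```

The appeal of this approach is that the threshold is expressed in terms of suspicion level rather than a fixed timeout, so the detector adapts to the observed heartbeat rate.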
Key Methods
with_chitchat_id_and_seeds
pub fn with_chitchat_id_and_seeds(
    config: ChitchatConfig,
    seed_addrs: watch::Receiver<HashSet<SocketAddr>>,
    initial_key_values: Vec<(String, String)>,
    max_datagram_payload_size: usize,
) -> Self
Constructs a new Chitchat instance with the given configuration, seeds, initial key-values, and payload size.
Initializes the failure detector, the cluster state with seeds, and the live nodes watcher channels.
Marks the current node as alive and sets initial key-values.
Usage example:
let chitchat = Chitchat::with_chitchat_id_and_seeds(
    config,
    seed_addrs_rx,
    vec![("config_version".to_string(), "1".to_string())],
    MAX_UDP_DATAGRAM_PAYLOAD_SIZE,
);
create_syn_message
pub(crate) fn create_syn_message(&self) -> ChitchatMessage
Creates a SYN message containing the cluster ID and a digest of the local node's state.
Used to initiate gossip handshakes with peers.
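To make the role of the digest more concrete, the sketch below shows one plausible shape: a map from node to the highest version known for that node, from which a peer can work out which key-values the sender is missing. All types here (`NodeKvs`, `Versioned`, `Digest`, `Delta`) are simplified stand-ins invented for this example, not the crate's `Digest` or `Delta`.

```rust
use std::collections::BTreeMap;

type NodeId = String; // stand-in for ChitchatId
type Version = u64;

#[derive(Clone, Debug, PartialEq)]
struct Versioned {
    value: String,
    version: Version,
}

/// Key-values owned by a single node.
#[derive(Default)]
struct NodeKvs {
    key_values: BTreeMap<String, Versioned>,
}

impl NodeKvs {
    fn max_version(&self) -> Version {
        self.key_values.values().map(|v| v.version).max().unwrap_or(0)
    }
}

type ClusterKvs = BTreeMap<NodeId, NodeKvs>;
/// Digest: for each node, the highest version the sender knows about.
type Digest = BTreeMap<NodeId, Version>;
/// Delta: for each node, the key-values the peer has not seen yet.
type Delta = BTreeMap<NodeId, Vec<(String, Versioned)>>;

fn compute_digest(state: &ClusterKvs) -> Digest {
    state.iter().map(|(id, node)| (id.clone(), node.max_version())).collect()
}

/// Collects every key-value whose version is newer than what the peer's
/// digest reports for the owning node.
fn compute_delta(state: &ClusterKvs, peer_digest: &Digest) -> Delta {
    let mut delta = Delta::new();
    for (id, node) in state {
        let known = peer_digest.get(id).copied().unwrap_or(0);
        let newer: Vec<(String, Versioned)> = node
            .key_values
            .iter()
            .filter(|(_, vv)| vv.version > known)
            .map(|(k, vv)| (k.clone(), vv.clone()))
            .collect();
        if !newer.is_empty() {
            delta.insert(id.clone(), newer);
        }
    }
    delta
}
```

Because the digest carries only one version per node, its size grows with the number of nodes rather than with the number of keys, which is why it is cheap to send at the start of every handshake.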
process_message
pub(crate) fn process_message(&mut self, msg: ChitchatMessage) -> Option<ChitchatMessage>
Processes incoming gossip messages.
Handles message types:
Syn: validates cluster ID, reports heartbeats, computes digest, and returns SynAck.
SynAck: reports heartbeats, applies delta, computes delta, and returns Ack.
Ack: applies delta; no response.
BadCluster: logs warning; no response.
Updates own heartbeat before processing.
Returns an optional response message for further gossip exchange.
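The dispatch described above can be sketched as a small state machine. This is a simplified model, not the crate's code: node state is reduced to one version counter per owner, heartbeat reporting is omitted, and replying with `BadCluster` on a cluster ID mismatch is an assumption. `Msg`, `GossipNode`, and their methods are hypothetical names.

```rust
use std::collections::BTreeMap;

/// Owner id -> highest version known for that owner (stand-in for real state).
type Versions = BTreeMap<String, u64>;

#[derive(Debug, PartialEq)]
enum Msg {
    Syn { cluster_id: String, digest: Versions },
    SynAck { digest: Versions, delta: Versions },
    Ack { delta: Versions },
    BadCluster,
}

struct GossipNode {
    cluster_id: String,
    versions: Versions,
}

impl GossipNode {
    fn create_syn(&self) -> Msg {
        Msg::Syn { cluster_id: self.cluster_id.clone(), digest: self.versions.clone() }
    }

    /// Entries that are newer locally than in the peer's digest.
    fn delta_for(&self, digest: &Versions) -> Versions {
        self.versions
            .iter()
            .filter(|(owner, version)| **version > digest.get(*owner).copied().unwrap_or(0))
            .map(|(owner, version)| (owner.clone(), *version))
            .collect()
    }

    /// Merges a delta, keeping the highest version per owner.
    fn apply(&mut self, delta: Versions) {
        for (owner, version) in delta {
            let entry = self.versions.entry(owner).or_insert(0);
            *entry = (*entry).max(version);
        }
    }

    fn process(&mut self, msg: Msg) -> Option<Msg> {
        match msg {
            Msg::Syn { cluster_id, digest } => {
                if cluster_id != self.cluster_id {
                    return Some(Msg::BadCluster); // assumed reply on mismatch
                }
                let delta = self.delta_for(&digest);
                Some(Msg::SynAck { digest: self.versions.clone(), delta })
            }
            Msg::SynAck { digest, delta } => {
                self.apply(delta);
                Some(Msg::Ack { delta: self.delta_for(&digest) })
            }
            Msg::Ack { delta } => {
                self.apply(delta);
                None
            }
            Msg::BadCluster => None, // the real implementation logs a warning
        }
    }
}
```

After a full Syn, SynAck, Ack exchange between two such nodes in the same cluster, both hold the union of their version maps.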
update_nodes_liveness
pub(crate) fn update_nodes_liveness(&mut self)
Updates node liveness states based on failure detector results.
Sends notifications via live_nodes_watcher_tx when live nodes change.
Performs garbage collection of nodes considered dead.
Removes nodes that have been garbage collected from cluster state.
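The change detection step can be pictured as comparing the current set of live nodes (with their versions) against the cached `previous_live_nodes` map and notifying only on a difference. The sketch below is an assumption about the general idea; `LivenessTracker` is a hypothetical type, and a `Vec` of snapshots stands in for the Tokio watch channel.

```rust
use std::collections::HashMap;

/// Tracks the last published view of live nodes (illustrative only).
#[derive(Default)]
struct LivenessTracker {
    /// Node id -> max version at the time of the last notification.
    previous_live_nodes: HashMap<String, u64>,
    /// Stand-in for the watch channel: every published snapshot is recorded.
    notifications: Vec<Vec<String>>,
}

impl LivenessTracker {
    /// Publishes a new snapshot only if membership or any version changed.
    /// Returns true when a notification was sent.
    fn update(&mut self, current_live_nodes: HashMap<String, u64>) -> bool {
        if self.previous_live_nodes == current_live_nodes {
            return false; // nothing changed, stay quiet
        }
        let mut snapshot: Vec<String> = current_live_nodes.keys().cloned().collect();
        snapshot.sort();
        self.notifications.push(snapshot);
        self.previous_live_nodes = current_live_nodes;
        true
    }
}
```

Caching versions alongside node ids means a subscriber is also woken up when a live node's state advances, not only when a node joins or leaves.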
reset_node_state
pub fn reset_node_state(
    &mut self,
    chitchat_id: &ChitchatId,
    key_values: impl Iterator<Item = (String, VersionedValue)>,
    max_version: Version,
    last_gc_version: Version,
)
Resets the entire state of a node given by chitchat_id.
Updates key-values with the provided iterator; removes keys not present in the input.
Only applies if the new max version exceeds the current max version.
Ensures node is tracked by the failure detector to prevent premature GC.
Triggers listeners for updated keys.
Usage example:
chitchat.reset_node_state(
    &node_id,
    vec![
        ("key1".to_string(), VersionedValue::new("value1".to_string(), 2, false)),
    ].into_iter(),
    2,
    100,
);
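The reset semantics described above (replace everything, but only when the incoming max version is strictly newer) can be modelled in a few lines. `NodeSketch` and its fields are hypothetical; the real method also registers the node with the failure detector and triggers listeners, which this sketch leaves out.

```rust
use std::collections::BTreeMap;

/// Simplified per-node state: key -> (value, version).
struct NodeSketch {
    key_values: BTreeMap<String, (String, u64)>,
    max_version: u64,
    last_gc_version: u64,
}

impl NodeSketch {
    /// Replaces the whole state if `max_version` is newer than the current one.
    /// Returns true when the reset was applied.
    fn reset(
        &mut self,
        key_values: impl Iterator<Item = (String, (String, u64))>,
        max_version: u64,
        last_gc_version: u64,
    ) -> bool {
        if max_version <= self.max_version {
            return false; // local state is already at least as recent
        }
        // Collecting into a fresh map drops every key absent from the input.
        self.key_values = key_values.collect();
        self.max_version = max_version;
        self.last_gc_version = last_gc_version;
        true
    }
}
```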
subscribe_event
pub fn subscribe_event(
    &self,
    key_prefix: impl ToString,
    callback: impl Fn(KeyChangeEvent) + 'static + Send + Sync,
) -> ListenerHandle
Subscribes a callback to be invoked whenever keys with the specified prefix are inserted or updated.
The callback receives a KeyChangeEvent containing the key suffix, new value, and originating node.
The callback must be lightweight and should not access cluster state.
Returns a handle to manage the subscription.
Deleted keys do not trigger callbacks.
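A prefix-filtered subscription of this kind can be sketched with a list of `(prefix, callback)` pairs. `PrefixListeners` and its methods are hypothetical, the callback takes plain `&str` arguments instead of a `KeyChangeEvent`, and unsubscription via a handle is not modelled.

```rust
/// Boxed callback invoked with (key suffix, new value).
type Callback = Box<dyn Fn(&str, &str) + Send + Sync>;

/// Registry of prefix subscriptions (illustrative only).
#[derive(Default)]
struct PrefixListeners {
    callbacks: Vec<(String, Callback)>,
}

impl PrefixListeners {
    fn subscribe(
        &mut self,
        key_prefix: impl ToString,
        callback: impl Fn(&str, &str) + Send + Sync + 'static,
    ) {
        let callback: Callback = Box::new(callback);
        self.callbacks.push((key_prefix.to_string(), callback));
    }

    /// Invokes every callback whose prefix matches `key`.
    /// Deletions are ignored, mirroring the behavior described above.
    fn notify(&self, key: &str, value: &str, deleted: bool) {
        if deleted {
            return;
        }
        for (prefix, callback) in &self.callbacks {
            if let Some(suffix) = key.strip_prefix(prefix.as_str()) {
                callback(suffix, value);
            }
        }
    }
}
```

Since callbacks run inline during notification in this model, a slow callback would delay every other listener, which is one reason to keep them lightweight.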
live_nodes_watch_stream
pub fn live_nodes_watch_stream(&self) -> WatchStream<BTreeMap<ChitchatId, NodeState>>
Returns an asynchronous stream emitting updates whenever the live node set changes.
Useful for consumers interested in membership changes and node version updates.
Other Methods
node_states, node_state, self_node_state: Accessors for node state information.
live_nodes, dead_nodes, scheduled_for_deletion_nodes: Iterators over node sets based on liveness.
seed_nodes: Returns the set of seed node addresses.
cluster_id, self_chitchat_id: Accessors for cluster and self node identifiers.
state_snapshot: Produces a serializable snapshot of the entire cluster state.
gc_keys_marked_for_deletion: Cleans up keys marked for deletion after the grace period.
update_self_heartbeat: Increments the local node's heartbeat counter.
cluster_state: Returns a reference to the current cluster state.
compute_digest: Computes a digest summarizing the cluster state for delta computations.
KeyChangeEvent<'a>
Represents an event triggered when a key matching a subscribed prefix is inserted or updated.
Fields
key: &'a str - The key without the subscription prefix.
value: &'a str - The new value of the key.
node: &'a ChitchatId - The node owning the key-value pair.
Methods
strip_key_prefix(&self, prefix: &str) -> Option<KeyChangeEvent<'_>>
Returns a new KeyChangeEvent with the prefix stripped from the key if it matches, otherwise None.
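A borrowed event with this kind of prefix stripping can be written without any allocation, because `str::strip_prefix` returns a subslice of the original key. The `KeyEvent` type below is a stand-in for illustration, with the node identifier reduced to a `&str`.

```rust
/// Borrowed view of a key change (stand-in for KeyChangeEvent).
#[derive(Debug, Clone, Copy, PartialEq)]
struct KeyEvent<'a> {
    key: &'a str,
    value: &'a str,
    node: &'a str, // the real type borrows a ChitchatId here
}

impl<'a> KeyEvent<'a> {
    /// Returns a copy of the event whose key has `prefix` removed,
    /// or None when the key does not start with `prefix`.
    fn strip_key_prefix(&self, prefix: &str) -> Option<KeyEvent<'_>> {
        let key = self.key.strip_prefix(prefix)?;
        Some(KeyEvent { key, value: self.value, node: self.node })
    }
}
```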
Important Implementation Details
Failure Detection: The file uses a configurable failure detector based on the Phi Accrual algorithm to track node heartbeats and determine which nodes are live, dead, or scheduled for deletion.
State Synchronization: Uses digests and deltas to efficiently exchange cluster state updates during gossip handshakes, respecting MTU and payload size constraints.
Garbage Collection: Dead nodes and keys marked for deletion are cleaned up after configured grace periods, preventing stale data buildup.
Event Subscription: Provides a mechanism to allow external components to subscribe to key change notifications filtered by key prefix, enabling reactive behavior on cluster state changes.
Concurrency and Async: Uses Tokio's watch channels and streams to asynchronously notify about live node changes.
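As an illustration of the garbage collection point, the sketch below removes tombstoned keys once a grace period has elapsed and reports the highest version it removed. `KvEntry`, `gc_tombstones`, and the use of `Duration` offsets as timestamps are assumptions made for this example, and tying the return value to a last-GC version is a guess at how such bookkeeping could work.

```rust
use std::collections::BTreeMap;
use std::time::Duration;

/// A key-value entry that may carry a deletion timestamp (tombstone).
struct KvEntry {
    value: String,
    version: u64,
    /// Time of deletion, measured from an arbitrary epoch; None if live.
    deleted_at: Option<Duration>,
}

/// Drops tombstones older than `grace_period` and returns the highest
/// version among the removed entries (0 if nothing was removed).
fn gc_tombstones(
    key_values: &mut BTreeMap<String, KvEntry>,
    now: Duration,
    grace_period: Duration,
) -> u64 {
    let mut highest_removed = 0;
    key_values.retain(|_, entry| match entry.deleted_at {
        Some(deleted_at) if now.saturating_sub(deleted_at) >= grace_period => {
            highest_removed = highest_removed.max(entry.version);
            false // tombstone expired: remove it
        }
        _ => true, // live entry or tombstone still within the grace period
    });
    highest_removed
}
```

Keeping tombstones around for a grace period gives the deletion time to propagate to peers before the evidence of it disappears.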
Interactions with Other Modules
configuration: Defines ChitchatConfig used for node and cluster configuration.
delta: Provides Delta structs representing changes applied to cluster state.
digest: Implements Digest structures summarizing cluster membership and state versions.
failure_detector: Contains FailureDetector and related config; manages heartbeat tracking and node liveness.
listener: Manages event listeners for key changes (ListenerHandle).
message: Defines ChitchatMessage types used for gossip protocol communication.
server: Provides functions and handles for spawning and interacting with the gossip server (spawn_chitchat, ChitchatHandle, ChitchatRef).
state: Implements ClusterState, NodeState, and snapshot types representing cluster membership and key-value state.
serialize: Exposes serialization traits (Serializable, Deserializable) used for network message encoding.
transport: Abstracts the network transport layer for message delivery.
types: Defines types like ChitchatId, Heartbeat, Version, and VersionedValue used throughout the system.
Visual Diagram
classDiagram
class Chitchat {
-config: ChitchatConfig
-cluster_state: ClusterState
-failure_detector: FailureDetector
-previous_live_nodes: HashMap<ChitchatId, Version>
-live_nodes_watcher_tx: watch::Sender<BTreeMap<ChitchatId, NodeState>>
-live_nodes_watcher_rx: watch::Receiver<BTreeMap<ChitchatId, NodeState>>
-max_datagram_payload_size: usize
+with_chitchat_id_and_seeds()
+create_syn_message()
+process_message()
+update_nodes_liveness()
+reset_node_state()
+subscribe_event()
+live_nodes_watch_stream()
+gc_keys_marked_for_deletion()
+update_self_heartbeat()
+node_states()
+node_state()
+self_node_state()
+live_nodes()
+dead_nodes()
+scheduled_for_deletion_nodes()
+seed_nodes()
+cluster_id()
+self_chitchat_id()
+state_snapshot()
+compute_digest()
}
class KeyChangeEvent {
+key: &str
+value: &str
+node: &ChitchatId
+strip_key_prefix()
}
Chitchat "1" o-- "1" ClusterState
Chitchat "1" o-- "1" FailureDetector
Chitchat "1" o-- "*" ListenerHandle
Chitchat "1" o-- "1" ChitchatConfig
Chitchat ..> ChitchatMessage
KeyChangeEvent ..> ChitchatId
Tests
The file contains comprehensive unit and integration tests validating handshake protocols, node liveness transitions, cluster state synchronization, event subscriptions, garbage collection, and resilience under network partitions.
Tests use mock transports and simulate multiple nodes joining, leaving, and rejoining the cluster.
Includes async tests using Tokio to verify timing-sensitive behaviors like heartbeat detection and catch-up callback invocation.
Validates that key-value pairs are correctly synchronized and that event listeners receive appropriate notifications.
Usage Example of Gossip Handshake
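// The handshake assumes both configs use the same cluster ID.
let mut node1 = Chitchat::with_chitchat_id_and_seeds(config1, seed_addrs1, vec![], MAX_UDP_DATAGRAM_PAYLOAD_SIZE);
let mut node2 = Chitchat::with_chitchat_id_and_seeds(config2, seed_addrs2, vec![], MAX_UDP_DATAGRAM_PAYLOAD_SIZE);
// node1 opens the handshake with a digest of its state.
let syn = node1.create_syn_message();
// node2 answers with a SynAck.
if let Some(syn_ack) = node2.process_message(syn) {
    // node1 applies the received delta and replies with an Ack.
    if let Some(ack) = node1.process_message(syn_ack) {
        // node2 applies the final delta; an Ack produces no response.
        node2.process_message(ack);
    }
}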
This sequence demonstrates the typical three-way handshake for cluster state exchange.
Related Topics
Failure Detector Configuration: For tuning liveness detection sensitivity.
Cluster State Management: For understanding ClusterState and NodeState data structures.
Gossip Protocol Messages: For message types and serialization formats.
Event Listener System: For subscribing to key-value change events.
Transport Layer Abstraction: For network message delivery mechanisms.