lib.rs

Overview

This file implements a core component of a distributed gossip protocol system designed for cluster membership and state synchronization. The main abstraction is the Chitchat struct, which maintains cluster state, detects node failures, processes gossip messages, and manages node liveness. It provides mechanisms for nodes to exchange state digests and deltas, detect membership changes, and notify listeners about key-value updates in the cluster.

The file also defines constants, utility types, and test cases validating the system’s behavior under various cluster conditions such as node joins, leaves, network partitions, and key-value synchronization.


Constants

MAX_UDP_DATAGRAM_PAYLOAD_SIZE: usize
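
The value of this constant is not given here. As a hedged illustration only, the sketch below derives the largest payload a single UDP datagram can carry over IPv4, which is a common basis for a limit of this kind. The constant and function names in the block are hypothetical and are not taken from lib.rs.

```rust
/// Largest value of the 16-bit IPv4 total-length field.
const IPV4_MAX_PACKET_SIZE: usize = 65_535;
/// Minimum IPv4 header size (no options).
const IPV4_MIN_HEADER_SIZE: usize = 20;
/// Fixed UDP header size.
const UDP_HEADER_SIZE: usize = 8;

/// Largest UDP payload that fits in one IPv4 packet.
/// Hypothetical helper: an assumption about how such a limit can be derived,
/// not the definition used by the crate.
const fn max_udp_payload_ipv4() -> usize {
    IPV4_MAX_PACKET_SIZE - IPV4_MIN_HEADER_SIZE - UDP_HEADER_SIZE
}
```

Whatever the actual value, the limit is passed to the constructor as max_datagram_payload_size, so a message must fit within that many bytes.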


Structs and Types

Chitchat

Central struct representing a node's view and participation in the cluster gossip protocol.

Fields

Implementation Details

Key Methods

with_chitchat_id_and_seeds

pub fn with_chitchat_id_and_seeds(
    config: ChitchatConfig,
    seed_addrs: watch::Receiver<HashSet<SocketAddr>>,
    initial_key_values: Vec<(String, String)>,
    max_datagram_payload_size: usize,
) -> Self

let chitchat = Chitchat::with_chitchat_id_and_seeds(
    config,
    seed_addrs_rx,
    vec![("config_version".to_string(), "1".to_string())],
    MAX_UDP_DATAGRAM_PAYLOAD_SIZE,
);

create_syn_message

pub(crate) fn create_syn_message(&self) -> ChitchatMessage

process_message

pub(crate) fn process_message(&mut self, msg: ChitchatMessage) -> Option<ChitchatMessage>

update_nodes_liveness

pub(crate) fn update_nodes_liveness(&mut self)

reset_node_state

pub fn reset_node_state(
    &mut self,
    chitchat_id: &ChitchatId,
    key_values: impl Iterator<Item = (String, VersionedValue)>,
    max_version: Version,
    last_gc_version: Version,
)

chitchat.reset_node_state(
    &node_id,
    vec![
        ("key1".to_string(), VersionedValue::new("value1".to_string(), 2, false)),
    ].into_iter(),
    2, // max_version: matches the highest version among the key-values above
    0, // last_gc_version: illustrative value, kept at or below max_version
);

subscribe_event

pub fn subscribe_event(
    &self,
    key_prefix: impl ToString,
    callback: impl Fn(KeyChangeEvent) + 'static + Send + Sync,
) -> ListenerHandle

live_nodes_watch_stream

pub fn live_nodes_watch_stream(&self) -> WatchStream<BTreeMap<ChitchatId, NodeState>>

Other Methods


KeyChangeEvent<'a>

Represents an event triggered when a key matching a subscribed prefix is inserted or updated.
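
To make the prefix-matching behavior concrete, here is a minimal self-contained sketch. It does not use the crate's types: ToyKeyChangeEvent, ToyListeners, notify, and the signature given to strip_key_prefix are all hypothetical stand-ins, and the real dispatch logic may differ.

```rust
use std::sync::{Arc, Mutex};

/// Toy counterpart of `KeyChangeEvent` (hypothetical, not the crate's type).
struct ToyKeyChangeEvent<'a> {
    key: &'a str,
    value: &'a str,
    node: &'a str,
}

impl<'a> ToyKeyChangeEvent<'a> {
    /// Returns the key without `prefix`, or `None` if the key does not start with it.
    fn strip_key_prefix(&self, prefix: &str) -> Option<&'a str> {
        self.key.strip_prefix(prefix)
    }
}

type Callback = Box<dyn Fn(ToyKeyChangeEvent<'_>) + Send + Sync>;

/// Toy registry mapping key prefixes to callbacks (hypothetical).
#[derive(Default)]
struct ToyListeners {
    listeners: Vec<(String, Callback)>,
}

impl ToyListeners {
    fn subscribe(
        &mut self,
        key_prefix: impl ToString,
        callback: impl Fn(ToyKeyChangeEvent<'_>) + 'static + Send + Sync,
    ) {
        self.listeners.push((key_prefix.to_string(), Box::new(callback)));
    }

    /// Invokes every callback whose prefix matches the updated key.
    fn notify(&self, node: &str, key: &str, value: &str) {
        for (prefix, callback) in &self.listeners {
            if key.starts_with(prefix.as_str()) {
                callback(ToyKeyChangeEvent { key, value, node });
            }
        }
    }
}

/// Subscribes to the "shard:" prefix and records the events that are delivered.
fn subscription_demo() -> Vec<String> {
    let seen = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&seen);
    let mut listeners = ToyListeners::default();
    listeners.subscribe("shard:", move |event| {
        if let Some(suffix) = event.strip_key_prefix("shard:") {
            let line = format!("{}:{}={}", event.node, suffix, event.value);
            sink.lock().unwrap().push(line);
        }
    });
    listeners.notify("node1", "shard:7", "open");
    listeners.notify("node1", "config_version", "1"); // prefix does not match
    let result = seen.lock().unwrap().clone();
    result
}
```

The callback bound mirrors the 'static + Send + Sync bound in the subscribe_event signature, which is why the sketch shares its results through an Arc<Mutex<..>> rather than a plain local variable.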

Fields

Methods


Important Implementation Details


Interactions with Other Modules


Visual Diagram

classDiagram
class Chitchat {
-config: ChitchatConfig
-cluster_state: ClusterState
-failure_detector: FailureDetector
-previous_live_nodes: HashMap<ChitchatId, Version>
-live_nodes_watcher_tx: watch::Sender<BTreeMap<ChitchatId, NodeState>>
-live_nodes_watcher_rx: watch::Receiver<BTreeMap<ChitchatId, NodeState>>
-max_datagram_payload_size: usize
+with_chitchat_id_and_seeds()
+create_syn_message()
+process_message()
+update_nodes_liveness()
+reset_node_state()
+subscribe_event()
+live_nodes_watch_stream()
+gc_keys_marked_for_deletion()
+update_self_heartbeat()
+node_states()
+node_state()
+self_node_state()
+live_nodes()
+dead_nodes()
+scheduled_for_deletion_nodes()
+seed_nodes()
+cluster_id()
+self_chitchat_id()
+state_snapshot()
+compute_digest()
}
class KeyChangeEvent {
+key: &str
+value: &str
+node: &ChitchatId
+strip_key_prefix()
}
Chitchat "1" o-- "1" ClusterState
Chitchat "1" o-- "1" FailureDetector
Chitchat "1" o-- "*" ListenerHandle
Chitchat "1" o-- "1" ChitchatConfig
Chitchat ..> ChitchatMessage : creates and processes
KeyChangeEvent ..> ChitchatId

Tests


Usage Example of Gossip Handshake

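// config1/config2 and seed_addrs1/seed_addrs2 are assumed to be built elsewhere.
// create_syn_message and process_message are pub(crate), so this only compiles
// inside the crate (for example in its tests).
let mut node1 = Chitchat::with_chitchat_id_and_seeds(config1, seed_addrs1, vec![], MAX_UDP_DATAGRAM_PAYLOAD_SIZE);
let mut node2 = Chitchat::with_chitchat_id_and_seeds(config2, seed_addrs2, vec![], MAX_UDP_DATAGRAM_PAYLOAD_SIZE);

// node1 opens the exchange with a Syn message.
let syn = node1.create_syn_message();
// node2 answers the Syn with a SynAck.
if let Some(syn_ack) = node2.process_message(syn) {
    // node1 answers the SynAck with an Ack.
    if let Some(ack) = node1.process_message(syn_ack) {
        // node2 processes the final Ack.
        node2.process_message(ack);
    }
}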

This sequence is the three-message handshake used to exchange cluster state: node1 opens with a Syn, node2 answers with a SynAck, and node1 closes with an Ack that node2 processes.
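
The digest and delta exchange behind this handshake can be modeled with standard-library types only. The sketch below is a simplified toy, not the crate's implementation: ToyNode, ToyMessage, Digest, Delta, and the rule "send every entry newer than the peer's highest known version" are assumptions made for illustration.

```rust
use std::collections::BTreeMap;

type Version = u64;
/// Node id -> highest version known for that node (0 means "nothing known").
type Digest = BTreeMap<String, Version>;
/// Node id -> (key, value, version) entries the receiver is missing.
type Delta = BTreeMap<String, Vec<(String, String, Version)>>;

#[derive(Debug)]
enum ToyMessage {
    Syn { digest: Digest },
    SynAck { digest: Digest, delta: Delta },
    Ack { delta: Delta },
}

/// Toy gossip node: node id -> key -> (value, version).
#[derive(Default)]
struct ToyNode {
    states: BTreeMap<String, BTreeMap<String, (String, Version)>>,
}

impl ToyNode {
    fn set(&mut self, node: &str, key: &str, value: &str, version: Version) {
        let entry = (value.to_string(), version);
        self.states.entry(node.to_string()).or_default().insert(key.to_string(), entry);
    }

    /// Summarizes what this node knows as one version number per node.
    fn digest(&self) -> Digest {
        self.states
            .iter()
            .map(|(node, kvs)| (node.clone(), kvs.values().map(|(_, v)| *v).max().unwrap_or(0)))
            .collect()
    }

    /// Collects the entries that are newer than what `peer` reports knowing.
    fn delta_for(&self, peer: &Digest) -> Delta {
        let mut delta = Delta::new();
        for (node, kvs) in &self.states {
            let known = peer.get(node).copied().unwrap_or(0);
            let newer: Vec<_> = kvs
                .iter()
                .filter(|(_, (_, version))| *version > known)
                .map(|(k, (v, version))| (k.clone(), v.clone(), *version))
                .collect();
            if !newer.is_empty() {
                delta.insert(node.clone(), newer);
            }
        }
        delta
    }

    fn apply(&mut self, delta: Delta) {
        for (node, entries) in delta {
            for (key, value, version) in entries {
                self.set(&node, &key, &value, version);
            }
        }
    }

    fn create_syn(&self) -> ToyMessage {
        ToyMessage::Syn { digest: self.digest() }
    }

    /// Syn -> SynAck, SynAck -> Ack, Ack -> no reply.
    fn process(&mut self, msg: ToyMessage) -> Option<ToyMessage> {
        match msg {
            ToyMessage::Syn { digest } => Some(ToyMessage::SynAck {
                digest: self.digest(),
                delta: self.delta_for(&digest),
            }),
            ToyMessage::SynAck { digest, delta } => {
                self.apply(delta);
                Some(ToyMessage::Ack { delta: self.delta_for(&digest) })
            }
            ToyMessage::Ack { delta } => {
                self.apply(delta);
                None
            }
        }
    }
}

/// Runs one handshake between two toy nodes that each start with one key.
fn handshake_demo() -> (ToyNode, ToyNode) {
    let mut node1 = ToyNode::default();
    let mut node2 = ToyNode::default();
    node1.set("node1", "role", "indexer", 1);
    node2.set("node2", "role", "searcher", 1);
    let syn = node1.create_syn();
    if let Some(syn_ack) = node2.process(syn) {
        if let Some(ack) = node1.process(syn_ack) {
            node2.process(ack);
        }
    }
    (node1, node2)
}
```

In this toy model one round trip is enough for both nodes to hold the same state, because the SynAck carries what node1 is missing and the Ack carries what node2 is missing.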


Related Topics