state.rs
Overview
This file implements the core data structures and logic for managing the state of nodes and clusters in a distributed gossip protocol system. It maintains versioned key-value data per node, tracks heartbeats, and applies state deltas to reconcile differences between nodes, supporting efficient incremental synchronization. The module also supports garbage collection of deleted entries (tombstones) and prioritizes stale nodes for gossip updates based on a staleness metric.
Key responsibilities include:
Representing the state of individual nodes, including their key-value entries and metadata.
Managing a cluster-wide view of multiple node states.
Applying deltas (incremental updates) to node states.
Computing digests and partial deltas for efficient synchronization.
Garbage collecting keys marked for deletion after a grace period.
Prioritizing nodes for gossip based on the freshness of their data.
Structs and Their Functionality
NodeState
Represents the state of a single node in the cluster. Each NodeState contains:
chitchat_id: Unique identifier of the node.
heartbeat: Heartbeat counter indicating node liveness.
key_values: Sorted map of keys to VersionedValues, storing the node's key-value data.
listeners: Event listeners for key changes (not serialized).
max_version: Maximum version number of the last known update.
last_gc_version: Version number of the last tombstone garbage collection.
Important Notes on last_gc_version and max_version
max_version reflects the latest version of the snapshot including non-deleted key-values.
last_gc_version marks the oldest version from which deltas can be safely emitted, reflecting tombstone garbage collection progress.
After a reset or node join, it is possible for last_gc_version to be higher than max_version.
Methods
new(chitchat_id, listeners) -> NodeState: Creates a new NodeState with default values.
for_test() -> NodeState: Returns a test instance with a fixed chitchat_id.
Accessors:
chitchat_id() -> &ChitchatId
heartbeat() -> Heartbeat
max_version() -> Version
last_gc_version() -> Version
Mutators:
set_max_version(version: Version)
set_last_gc_version(version: Version)
Key-value access:
key_values_including_deleted() -> Iterator<Item = (&str, &VersionedValue)>: Returns an iterator over all key-values, including those marked for deletion.
key_values() -> Iterator<Item = (&str, &str)>: Returns an iterator over non-deleted key-values.
iter_prefix(prefix: &str) -> Iterator<Item = (&str, &VersionedValue)>: Returns key-values whose keys start with the given prefix, excluding deleted keys.
num_key_values() -> usize: Returns the count of non-deleted key-values.
contains_key(key: &str) -> bool: Checks if a non-deleted key exists.
get(key: &str) -> Option<&str>: Returns the value of a key if it exists and is not deleted.
get_versioned(key: &str) -> Option<&VersionedValue>: Returns the full versioned value, including tombstones.
Key-value modifications:
set(key: impl ToString, value: impl ToString): Sets or updates a key-value pair. Increments max_version if the value changes.
set_with_ttl(key: impl ToString, value: impl ToString): Sets a key-value pair with a time-to-live, marking it for deletion after the TTL.
delete(key: &str): Marks a key as deleted immediately, creating a tombstone.
delete_after_ttl(key: &str): Schedules key deletion after a grace period (TTL).
set_versioned_value(key: String, versioned_value_update: VersionedValue): Sets a versioned value; updates max_version if necessary. Ignores obsolete versions.
set_with_version(key, value, version): Sets a key-value pair with an explicit version.
remove_key_value_internal(key: &str): Removes a key-value pair without tombstoning (internal use).
Heartbeat management:
inc_heartbeat(): Increments the node's heartbeat.
try_set_heartbeat(new_heartbeat: Heartbeat) -> bool: Attempts to update the heartbeat if the new value is greater.
Delta application:
prepare_apply_delta(node_delta: &NodeDelta) -> bool: Checks if the delta can be applied without reset; resets state if necessary.
apply_delta(node_delta: NodeDelta, now: Instant): Applies a delta to the node state, updating key-values and versions.
Garbage collection:
gc_keys_marked_for_deletion(grace_period: Duration): Removes keys whose tombstones have exceeded the grace period.
Utility:
digest() -> NodeDigest: Creates a digest summarizing heartbeat, max_version, and last_gc_version.
Usage Example
let mut node_state = NodeState::new(chitchat_id, listeners);
node_state.set("key1", "value1");
let value = node_state.get("key1");
node_state.delete("key1");
node_state.gc_keys_marked_for_deletion(Duration::from_secs(60));
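The difference between get and get_versioned after a delete can be illustrated with a small stand-alone model. This is only a sketch, not the crate's code: MiniNodeState, MiniVersionedValue, and the deleted_at field are hypothetical names, and the tombstone is modeled as a plain Option<Instant> rather than the crate's DeletionStatus.

```rust
use std::collections::BTreeMap;
use std::time::Instant;

/// Hypothetical, simplified stand-in for the crate's VersionedValue.
#[derive(Debug, Clone, PartialEq)]
struct MiniVersionedValue {
    value: String,
    version: u64,
    /// `Some(t)` marks the entry as a tombstone created at time `t`.
    deleted_at: Option<Instant>,
}

/// Hypothetical, simplified stand-in for NodeState.
#[derive(Debug, Default)]
struct MiniNodeState {
    key_values: BTreeMap<String, MiniVersionedValue>,
    max_version: u64,
}

impl MiniNodeState {
    /// Sets a key. The version is bumped only if the stored value changes.
    fn set(&mut self, key: &str, value: &str) {
        if let Some(existing) = self.key_values.get(key) {
            if existing.deleted_at.is_none() && existing.value == value {
                return;
            }
        }
        self.max_version += 1;
        let entry = MiniVersionedValue {
            value: value.to_string(),
            version: self.max_version,
            deleted_at: None,
        };
        self.key_values.insert(key.to_string(), entry);
    }

    /// Turns a live entry into a tombstone that carries a fresh version.
    fn delete(&mut self, key: &str) {
        if let Some(entry) = self.key_values.get_mut(key) {
            if entry.deleted_at.is_some() {
                return;
            }
            self.max_version += 1;
            entry.value.clear();
            entry.version = self.max_version;
            entry.deleted_at = Some(Instant::now());
        }
    }

    /// Hides tombstones from the caller.
    fn get(&self, key: &str) -> Option<&str> {
        self.key_values
            .get(key)
            .filter(|entry| entry.deleted_at.is_none())
            .map(|entry| entry.value.as_str())
    }

    /// Returns the raw entry, tombstones included.
    fn get_versioned(&self, key: &str) -> Option<&MiniVersionedValue> {
        self.key_values.get(key)
    }
}
```

In this model a deleted key disappears from get but stays visible through get_versioned until garbage collection physically removes it, which is what lets the deletion itself be gossiped to other nodes.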
ClusterState
Represents the cluster-wide state, managing multiple NodeStates.
Fields:
node_states: BTreeMap<ChitchatId, NodeState>: Map of node IDs to their states.
seed_addrs: watch::Receiver<HashSet<SocketAddr>>: Receiver for seed addresses in the cluster.
listeners: Listeners: Event listeners for cluster-wide events.
Methods
with_seed_addrs(seed_addrs: watch::Receiver<HashSet<SocketAddr>>) -> ClusterState: Creates a new cluster state with initial seed addresses.
node_state_mut(chitchat_id: &ChitchatId) -> &mut NodeState: Returns a mutable reference to the node state, creating it if missing.
node_state(chitchat_id: &ChitchatId) -> Option<&NodeState>: Returns an immutable reference to the node state if present.
nodes() -> Iterator<Item = &ChitchatId>: Iterator over all node IDs.
seed_addrs() -> HashSet<SocketAddr>: Returns the current set of seed addresses.
remove_node(chitchat_id: &ChitchatId): Removes a node from the cluster state.
apply_delta(delta: Delta): Applies a cluster-wide delta composed of multiple node deltas.
compute_digest(scheduled_for_deletion: &HashSet<&ChitchatId>) -> Digest: Computes a digest of all nodes except those scheduled for deletion.
gc_keys_marked_for_deletion(grace_period: Duration): Runs garbage collection on all nodes' tombstones.
compute_partial_delta_respecting_mtu(digest: &Digest, mtu: usize, scheduled_for_deletion: &HashSet<&ChitchatId>) -> Delta: Computes a partial delta for synchronization, respecting a maximum transmission unit (MTU) and prioritizing nodes by staleness.
Usage Example
let mut cluster_state = ClusterState::with_seed_addrs(seed_addrs_rx);
let node_state = cluster_state.node_state_mut(&some_chitchat_id);
node_state.set("foo", "bar");
let digest = cluster_state.compute_digest(&HashSet::new());
let delta = cluster_state.compute_partial_delta_respecting_mtu(&digest, 1024, &HashSet::new());
cluster_state.apply_delta(delta);
Staleness
A struct representing the staleness score of a node, used to prioritize gossip updates. Fields:
is_unknown: bool: Whether the node is unknown (floor_version == 0).
max_version: u64: Maximum version of the node state.
num_stale_key_values: usize: Number of key-values newer than the floor version.
Ordering for prioritization:
Unknown nodes are prioritized first, with lower max_version first.
Known nodes are prioritized by the number of stale key-values, descending.
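The two ordering rules above can be written as a comparator. The sketch below reuses the field names listed for Staleness, but the gossip_priority function is hypothetical, and it encodes priority as "sorts first", which may be oriented differently from the crate's own Ord implementation.

```rust
use std::cmp::Ordering;

/// Mirror of the fields described above, redefined here so the sketch is self-contained.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Staleness {
    is_unknown: bool,
    max_version: u64,
    num_stale_key_values: usize,
}

/// Hypothetical comparator: returns `Ordering::Less` when `a` should be gossiped before `b`.
fn gossip_priority(a: &Staleness, b: &Staleness) -> Ordering {
    match (a.is_unknown, b.is_unknown) {
        // Unknown nodes always come before known nodes.
        (true, false) => Ordering::Less,
        (false, true) => Ordering::Greater,
        // Among unknown nodes, the lower max_version goes first.
        (true, true) => a.max_version.cmp(&b.max_version),
        // Among known nodes, more stale key-values goes first (descending).
        (false, false) => b.num_stale_key_values.cmp(&a.num_stale_key_values),
    }
}
```

Sorting a Vec<Staleness> with sort_by(gossip_priority) then yields the nodes in the order in which they would be offered to the delta.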
SortedStaleNodes<'a>
Maintains a sorted collection of stale nodes for prioritization.
stale_nodes: BTreeMap<Staleness, Vec<StaleNode<'a>>>
Methods:
offer(chitchat_id, node_state, from_version_excluded): Adds a stale node if it has fresher key-values than the floor version.
into_iter() -> impl Iterator<Item=StaleNode<'a>>: Returns an iterator over nodes sorted by staleness, shuffling nodes with equal staleness for fairness.
StaleNode<'a>
Represents a node with stale information compared to a digest.
Fields:
chitchat_id: &ChitchatId
node_state: &NodeState
from_version_excluded: u64
Methods:
stale_key_values() -> Iterator<Item=(&str, &VersionedValue)>: Returns an iterator over key-values with versions strictly greater than from_version_excluded.
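The "strictly greater than" filter can be sketched as a free function over a sorted map. The VersionedValue struct here is a reduced, hypothetical version with only a value and a version; the real type carries more state.

```rust
use std::collections::BTreeMap;

/// Reduced, hypothetical stand-in for the crate's VersionedValue.
#[derive(Debug, Clone, PartialEq)]
struct VersionedValue {
    value: String,
    version: u64,
}

/// Yields the entries whose version is strictly greater than `from_version_excluded`.
fn stale_key_values<'a>(
    key_values: &'a BTreeMap<String, VersionedValue>,
    from_version_excluded: u64,
) -> impl Iterator<Item = (&'a str, &'a VersionedValue)> + 'a {
    key_values
        .iter()
        .filter(move |(_, versioned)| versioned.version > from_version_excluded)
        .map(|(key, versioned)| (key.as_str(), versioned))
}
```

An entry whose version equals from_version_excluded is not returned, since the peer is assumed to have already seen it.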
ClusterStateSnapshot
Serializable snapshot of the cluster state used for persistence or transmission.
Fields:
node_states: Vec<NodeState>
seed_addrs: HashSet<SocketAddr>
Conversion:
From<&ClusterState>: Converts a live cluster state into a snapshot.
Important Functionality and Algorithms
Delta Application and Reset Logic (NodeState::prepare_apply_delta and apply_delta)
Checks if an incoming delta is applicable based on from_version_excluded and last_gc_version.
Determines if a reset of the node state is necessary due to garbage collection or version gaps.
If reset is required, the node state is cleared and re-initialized with the delta's chitchat_id and versions.
Applies each key-value mutation from the delta, updating or skipping based on version comparisons and deletion status.
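The version comparisons in the last step can be sketched in isolation. The model below is deliberately reduced and rests on assumptions: MiniDelta and MiniNodeState are hypothetical types, a delta is rejected when it starts after the local max_version (a version gap), and an update is skipped when the local entry is at least as recent. Reset handling and deletion status are left out.

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, PartialEq)]
struct VersionedValue {
    value: String,
    version: u64,
}

/// Hypothetical delta for a single node.
struct MiniDelta {
    from_version_excluded: u64,
    key_values: Vec<(String, VersionedValue)>,
}

/// Hypothetical, reduced node state.
#[derive(Default)]
struct MiniNodeState {
    key_values: BTreeMap<String, VersionedValue>,
    max_version: u64,
}

impl MiniNodeState {
    /// Returns false if the delta was rejected, true if it was applied.
    fn apply_delta(&mut self, delta: MiniDelta) -> bool {
        // Assumed rule: a delta that starts beyond what we know would leave a gap.
        if delta.from_version_excluded > self.max_version {
            return false;
        }
        for (key, update) in delta.key_values {
            // Skip updates that are not newer than the local entry.
            let is_obsolete = self
                .key_values
                .get(&key)
                .map_or(false, |existing| existing.version >= update.version);
            if is_obsolete {
                continue;
            }
            self.max_version = self.max_version.max(update.version);
            self.key_values.insert(key, update);
        }
        true
    }
}
```

Applying the same delta twice is harmless in this model, because the second pass finds every update obsolete.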
Garbage Collection of Tombstones (gc_keys_marked_for_deletion)
Iterates over key-values checking if they are marked for deletion.
Compares the tombstone's deletion start time with the grace period.
Removes entries whose tombstones have expired.
Updates last_gc_version to reflect the highest version of deleted entries.
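The four steps above map onto a single pass over the map. This is a sketch under assumptions rather than the crate's code: Entry and gc_tombstones are hypothetical names, the tombstone is an Option<Instant>, and the current time is passed in explicitly so the behavior is deterministic.

```rust
use std::collections::BTreeMap;
use std::time::{Duration, Instant};

/// Hypothetical entry: `deleted_at` is `Some` for tombstones.
struct Entry {
    version: u64,
    deleted_at: Option<Instant>,
}

/// Removes tombstones older than `grace_period` and raises `last_gc_version`
/// to the highest version among the removed entries.
fn gc_tombstones(
    key_values: &mut BTreeMap<String, Entry>,
    last_gc_version: &mut u64,
    grace_period: Duration,
    now: Instant,
) {
    let mut max_removed_version = *last_gc_version;
    key_values.retain(|_, entry| {
        let deleted_at = match entry.deleted_at {
            Some(deleted_at) => deleted_at,
            // Live entries are always kept.
            None => return true,
        };
        if now.saturating_duration_since(deleted_at) < grace_period {
            // The tombstone is still within its grace period.
            return true;
        }
        max_removed_version = max_removed_version.max(entry.version);
        false
    });
    *last_gc_version = max_removed_version;
}
```

Recording the highest removed version is what later allows a node to detect that a peer asking for a delta from an older version may have missed a deletion.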
Partial Delta Computation with MTU Consideration (ClusterState::compute_partial_delta_respecting_mtu)
Iterates over all nodes excluding those scheduled for deletion.
For each node, determines if a reset is needed based on garbage collection versions.
Uses SortedStaleNodes to prioritize nodes with the most stale key-values.
Adds nodes and key-values incrementally to a DeltaSerializer, stopping when the MTU is reached.
Ensures that even nodes with no key-values to send contribute a max version update to maintain synchronization.
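The "add incrementally, stop at the MTU" step can be sketched with a byte budget. Everything here is illustrative: partial_delta is a hypothetical function, the per-entry cost (key length plus value length plus 8 bytes for the version) is an assumed encoding, and the input is expected to be already sorted by staleness.

```rust
/// Assumed fixed cost of encoding a version alongside each key-value.
const VERSION_BYTES: usize = 8;

/// Walks nodes in priority order and selects (node_id, key) pairs
/// until the next entry would exceed the `mtu` budget.
fn partial_delta<'a>(
    nodes: &'a [(String, Vec<(String, String)>)],
    mtu: usize,
) -> Vec<(&'a str, &'a str)> {
    let mut used_bytes = 0usize;
    let mut selected = Vec::new();
    for (node_id, key_values) in nodes {
        for (key, value) in key_values {
            let cost = key.len() + value.len() + VERSION_BYTES;
            if used_bytes + cost > mtu {
                // The budget is exhausted: return what fits so far.
                return selected;
            }
            used_bytes += cost;
            selected.push((node_id.as_str(), key.as_str()));
        }
    }
    selected
}
```

Because the walk stops at the first entry that does not fit, the nodes that sort first are the ones guaranteed to make progress in each gossip round.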
Staleness Score Calculation (staleness_score)
Calculates the number of key-values newer than a given floor version.
Prioritizes unknown nodes (floor version == 0) by count of key-values.
Returns None if no stale key-values exist, excluding the node from gossip.
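The counting and the None case can be shown with a reduced signature. This sketch takes only a slice of entry versions, which is an assumption made for brevity; the function described above works on a NodeState and returns a Staleness rather than a bare count.

```rust
/// Counts the versions strictly newer than `floor_version`.
/// Returns `None` when nothing is stale, so the node can be skipped.
fn staleness_score(entry_versions: &[u64], floor_version: u64) -> Option<usize> {
    let num_stale = entry_versions
        .iter()
        .filter(|&&version| version > floor_version)
        .count();
    if num_stale == 0 {
        None
    } else {
        Some(num_stale)
    }
}
```

With a floor version of 0, every entry counts as stale, which matches the unknown-node case where the peer has seen nothing yet.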
Interaction with Other Modules
Uses types from crate::delta such as Delta, NodeDelta, DeltaSerializer.
Uses Digest and NodeDigest from crate::digest for summarization.
Employs Listeners from crate::listener to trigger events on key changes.
Integrates DeletionStatus and DeletionStatusMutation from crate::types to manage tombstones.
Uses ChitchatId, Heartbeat, Version, and VersionedValue types defined elsewhere in the crate.
Relies on tokio::sync::watch for receiving cluster seed addresses.
Uses randomization from rand to shuffle nodes with equal staleness.
Makes use of the tracing crate for debug and warning logs.
Visual Diagram: Class Structure and Relationships
classDiagram
class NodeState {
-chitchat_id: ChitchatId
-heartbeat: Heartbeat
-key_values: BTreeMap<String, VersionedValue>
-listeners: Listeners
-max_version: Version
-last_gc_version: Version
+new()
+set()
+set_with_ttl()
+delete()
+delete_after_ttl()
+apply_delta()
+prepare_apply_delta()
+gc_keys_marked_for_deletion()
+get()
+get_versioned()
+contains_key()
+iter_prefix()
}
class ClusterState {
-node_states: BTreeMap<ChitchatId, NodeState>
-seed_addrs: watch::Receiver<HashSet<SocketAddr>>
-listeners: Listeners
+with_seed_addrs()
+node_state_mut()
+node_state()
+apply_delta()
+compute_digest()
+compute_partial_delta_respecting_mtu()
+gc_keys_marked_for_deletion()
}
class Staleness {
-is_unknown: bool
-max_version: u64
-num_stale_key_values: usize
+cmp()
}
class SortedStaleNodes {
-stale_nodes: BTreeMap<Staleness, Vec<StaleNode>>
+offer()
+into_iter()
}
class StaleNode {
-chitchat_id: &ChitchatId
-node_state: &NodeState
-from_version_excluded: u64
+stale_key_values()
}
class ClusterStateSnapshot {
-node_states: Vec<NodeState>
-seed_addrs: HashSet<SocketAddr>
}
ClusterState "1" o-- "many" NodeState : manages
SortedStaleNodes "1" o-- "many" StaleNode : contains
StaleNode "1" --> NodeState : references
NodeState ..> Listeners : uses
ClusterState ..> Listeners : uses
ClusterStateSnapshot ..> ClusterState : created from
Notes on Key Implementation Details
The versioning system uses monotonically increasing Version numbers to track changes.
Tombstones are key-value pairs marked with a deletion status and a timestamp, allowing delayed physical deletion.
Deltas carry a from_version_excluded version to indicate the minimal version known by the target node.
Reset logic ensures that nodes can recover from missed tombstones due to garbage collection.
The gossip protocol uses staleness ordering to prioritize nodes with the most updates to propagate.
The use of BTreeMap for key_values allows efficient prefix queries (iter_prefix).
Random shuffling of nodes with equal staleness prevents starvation and promotes fairness.
The cluster state maintains a watch channel for seed addresses, facilitating dynamic cluster membership.
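As an illustration of why a sorted map suits prefix queries, the sketch below seeks to the first key at or after the prefix and stops at the first key that no longer matches. It is a minimal example over BTreeMap<String, String>, not the crate's iter_prefix, which also has to skip deleted entries.

```rust
use std::collections::BTreeMap;
use std::ops::Bound;

/// Returns the entries whose key starts with `prefix`, in key order.
fn iter_prefix<'a>(
    map: &'a BTreeMap<String, String>,
    prefix: &'a str,
) -> impl Iterator<Item = (&'a str, &'a str)> + 'a {
    // Keys sharing a prefix are contiguous in sorted order, so a range scan
    // starting at the prefix followed by `take_while` visits only the matches.
    map.range::<str, _>((Bound::Included(prefix), Bound::Unbounded))
        .take_while(move |(key, _)| key.starts_with(prefix))
        .map(|(key, value)| (key.as_str(), value.as_str()))
}
```

The scan does work proportional to the number of matching keys plus a logarithmic seek, instead of visiting every entry in the map.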
Additional Details
The file contains extensive test coverage verifying behavior such as delta application, garbage collection, staleness ordering, and serialization.
Uses conditional compilation to provide deterministic random generators during testing.
Extensive logging via tracing is included for debugging and monitoring during delta application and state resets.