server.rs
Overview
This file implements the UDP-based server component of the Chitchat gossip protocol, which nodes in a distributed cluster use to communicate peer-to-peer. It contains the core logic to send and receive gossip messages, track node liveness, and run periodic gossip exchanges that synchronize cluster state. The server also re-resolves DNS for seed nodes given as hostnames, so the cluster can keep reaching a seed after its IP address changes.
Key functionalities include:
Running an asynchronous UDP server to receive and process Chitchat messages.
Periodic gossip with a subset of live, dead, and seed nodes.
Managing the lifecycle of the server including shutdown and explicit gossip triggering.
Resolving DNS for seed nodes that require hostname resolution.
Providing a handle (ChitchatHandle) to control and interact with the running server.
Public Types and Functions
ChitchatHandle
A handle representing the running Chitchat server instance. It encapsulates control channels and the running task handle.
Fields
chitchat_id: ChitchatId - Identifier of the local node.
command_tx: UnboundedSender<Command> - Channel to send commands like gossip or shutdown.
chitchat: ChitchatRef - Thread-safe reference to the internal Chitchat state.
join_handle: JoinHandle<Result<(), anyhow::Error>> - Tokio task handle for the server process.
Methods
abort(&self)
Aborts the server task immediately.
chitchat_id(&self) -> &ChitchatId
Returns a reference to the local node's ChitchatId.
chitchat(&self) -> ChitchatRef
Returns a clone of the internal ChitchatRef for direct access.
with_chitchat<F, T>(&self, fun: F) -> T where F: FnMut(&mut Chitchat) -> T
Allows executing a closure with mutable access to the underlying Chitchat instance asynchronously.
shutdown(self) -> Result<(), anyhow::Error>
Triggers a graceful shutdown by sending a shutdown command and awaiting the server task completion.
gossip(&self, addr: SocketAddr) -> Result<(), anyhow::Error>
Manually initiates a gossip exchange with a specified remote peer.
Usage Example
let handle: ChitchatHandle = spawn_chitchat(config, initial_kvs, &transport).await?;
handle.gossip(peer_addr)?;
handle.shutdown().await?;
spawn_chitchat
pub async fn spawn_chitchat(
config: ChitchatConfig,
initial_key_values: Vec<(String, String)>,
transport: &dyn Transport,
) -> anyhow::Result<ChitchatHandle>
Starts a new Chitchat UDP server as a Tokio background task.
Parameters:
config: ChitchatConfig - Configuration including listen address, cluster ID, seed nodes, etc.
initial_key_values: Vec<(String, String)> - Initial key-value pairs to seed the gossip state.
transport: &dyn Transport - Abstraction over the UDP transport layer.
Returns:
Result<ChitchatHandle, anyhow::Error> - Handle to control the spawned server, or an error on failure.
Details:
Opens a UDP socket on the configured listen address.
Spawns a task running the Server struct's main loop.
Initiates the DNS refresh loop for seed nodes requiring hostname resolution.
ChitchatRef
A concurrency-safe wrapper around Chitchat that provides mutex-guarded access.
Wraps Chitchat inside an Arc<parking_lot::Mutex<Chitchat>>.
Methods:
new(chitchat: Chitchat) -> Self
lock(&self) -> parking_lot::MutexGuard<'_, Chitchat>
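The wrapper pattern described above can be sketched with the standard library alone. This is a minimal illustration, not the crate's code: it substitutes std::sync::Mutex for parking_lot::Mutex (so lock() has to deal with poisoning), the Chitchat struct here is a hypothetical stand-in with a single field, and with_chitchat is written as a synchronous free function only to show the closure-under-lock shape.

```rust
use std::sync::{Arc, Mutex, MutexGuard};

/// Hypothetical stand-in for the real `Chitchat` state.
#[derive(Debug, Default)]
pub struct Chitchat {
    pub heartbeat: u64,
}

/// Cloneable reference to shared state, in the spirit of `ChitchatRef`.
/// Cloning copies the `Arc`, so every clone points at the same `Chitchat`.
#[derive(Clone)]
pub struct ChitchatRef {
    inner: Arc<Mutex<Chitchat>>,
}

impl ChitchatRef {
    pub fn new(chitchat: Chitchat) -> Self {
        ChitchatRef {
            inner: Arc::new(Mutex::new(chitchat)),
        }
    }

    /// Blocks until the lock is free. Unlike `parking_lot::Mutex`,
    /// `std::sync::Mutex` can be poisoned, hence the `expect`.
    pub fn lock(&self) -> MutexGuard<'_, Chitchat> {
        self.inner.lock().expect("chitchat mutex poisoned")
    }
}

/// Runs a closure with mutable access to the shared state and returns
/// its result. The guard is dropped (and the lock released) on return.
pub fn with_chitchat<F, T>(chitchat: &ChitchatRef, mut fun: F) -> T
where
    F: FnMut(&mut Chitchat) -> T,
{
    let mut guard = chitchat.lock();
    fun(&mut *guard)
}
```

Keeping the guard inside a short helper like this is one way to make sure the lock is never held longer than a single closure call.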
Internal Types and Functions
Server
The core UDP server struct that handles message reception, processing, and gossiping.
Fields
command_rx: UnboundedReceiver<Command> - Receives commands such as gossip triggers or shutdown.
chitchat: ChitchatRef - Shared state of the local node.
socket: Box<dyn Socket> - UDP socket abstraction.
rng: SmallRng - Random number generator used for gossip peer selection.
Methods
async fn new(command_rx: UnboundedReceiver<Command>, chitchat: ChitchatRef, socket: Box<dyn Socket>) -> Self
Creates a new server instance with initialized RNG.
async fn run(&mut self) -> anyhow::Result<()>
Main event loop that concurrently:
Receives and processes UDP messages.
Periodically triggers gossip with peers.
Responds to commands such as gossip requests or shutdown.
async fn handle_message(&mut self, from_addr: SocketAddr, message: ChitchatMessage) -> anyhow::Result<()>
Processes an incoming message and sends a reply if applicable.
async fn gossip_multiple(&mut self)
Selects multiple nodes (live, dead, seed) and gossips with them.
async fn gossip(&mut self, addr: SocketAddr) -> anyhow::Result<()>
Sends a SYN message to a specified peer to initiate gossip.
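To make "sends a reply if applicable" concrete, here is a rough sketch of the reply decision for a three-step gossip handshake. It rests on assumptions: the Message enum is a simplified, hypothetical version of ChitchatMessage (the digests and deltas a real exchange carries are omitted), and the rule that a Syn from a different cluster is answered with BadCluster is inferred from the message names listed in the Testing section rather than confirmed by this page.

```rust
/// Simplified, hypothetical message type. The real `ChitchatMessage`
/// carries state digests and deltas that are left out here.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Message {
    Syn { cluster_id: String },
    SynAck,
    Ack,
    BadCluster,
}

/// Returns the reply to send back to the peer, or `None` when the
/// exchange is finished and nothing should be sent.
pub fn reply_for(local_cluster_id: &str, message: &Message) -> Option<Message> {
    match message {
        // A peer from our own cluster opened an exchange: answer it.
        Message::Syn { cluster_id } if cluster_id.as_str() == local_cluster_id => {
            Some(Message::SynAck)
        }
        // A peer from another cluster: tell it so (assumed behaviour).
        Message::Syn { .. } => Some(Message::BadCluster),
        // The peer answered our Syn: close the handshake.
        Message::SynAck => Some(Message::Ack),
        // Terminal messages need no reply.
        Message::Ack | Message::BadCluster => None,
    }
}
```

In the server this decision would sit between decoding the datagram and writing to the socket; keeping it a pure function makes it straightforward to unit test without any transport.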
Command Enum
enum Command {
Gossip(SocketAddr),
Shutdown,
}
Commands sent to the server task for control operations:
Gossip(SocketAddr) - Request to perform gossip with the specified address.
Shutdown - Request to stop the server gracefully.
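The way such commands might drive a server loop can be sketched without Tokio. In this illustration std::sync::mpsc::channel (which is also unbounded) stands in for the Tokio channel, and recv_timeout stands in for selecting between the command channel and an interval timer. The Event type and run_loop function are hypothetical names introduced only so the loop's decisions can be inspected.

```rust
use std::net::SocketAddr;
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

#[derive(Debug)]
pub enum Command {
    Gossip(SocketAddr),
    Shutdown,
}

/// Hypothetical record of what the loop did on each iteration.
#[derive(Debug, PartialEq, Eq)]
pub enum Event {
    GossipedWith(SocketAddr),
    PeriodicRound,
}

/// Processes commands until `Shutdown` arrives or all senders are gone.
/// Note: the timeout restarts after every command, so this only
/// approximates a fixed-period interval timer.
pub fn run_loop(command_rx: Receiver<Command>, gossip_interval: Duration) -> Vec<Event> {
    let mut events = Vec::new();
    loop {
        match command_rx.recv_timeout(gossip_interval) {
            // Explicit request to gossip with one peer.
            Ok(Command::Gossip(addr)) => events.push(Event::GossipedWith(addr)),
            // Graceful stop: leave the loop and let the caller clean up.
            Ok(Command::Shutdown) => break,
            // No command arrived within the interval: run a periodic round.
            Err(RecvTimeoutError::Timeout) => events.push(Event::PeriodicRound),
            // Every sender was dropped, so no further command can arrive.
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    events
}
```

A caller would keep the Sender half (the role command_tx plays in ChitchatHandle), send Command::Gossip(addr) to trigger an exchange, and send Command::Shutdown before joining the thread or task that runs the loop.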
DNS Seed Resolution
spawn_dns_refresh_loop(seeds: &[String]) -> watch::Receiver<HashSet<SocketAddr>>
Spawns a background task that periodically resolves DNS for seed nodes that are hostnames instead of IP addresses.
dns_refresh_loop(...)
The actual loop that performs hostname resolution every 60 seconds and updates listeners via a watch channel.
resolve_seed_host(seed_host: &str, seed_addrs: &mut HashSet<SocketAddr>)
Performs asynchronous DNS lookup for a seed hostname and updates the set of seed addresses.
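The split between literal addresses and hostnames, and the effect of one resolution pass, might look roughly like the following. This is a blocking sketch built on std::net::ToSocketAddrs instead of an asynchronous resolver; partition_seeds and resolve_seed_host_blocking are hypothetical names, and returning the number of newly added addresses is a choice made for this example, not the documented signature.

```rust
use std::collections::HashSet;
use std::net::{SocketAddr, ToSocketAddrs};

/// Splits seeds into those that already parse as socket addresses and
/// those that are `host:port` strings needing DNS resolution.
pub fn partition_seeds(seeds: &[String]) -> (HashSet<SocketAddr>, Vec<String>) {
    let mut addrs = HashSet::new();
    let mut hostnames = Vec::new();
    for seed in seeds {
        match seed.parse::<SocketAddr>() {
            Ok(addr) => {
                addrs.insert(addr);
            }
            Err(_) => hostnames.push(seed.clone()),
        }
    }
    (addrs, hostnames)
}

/// Resolves one `host:port` seed (blocking) and inserts every address
/// found into `seed_addrs`. Returns how many addresses were new.
/// A failed lookup leaves the set unchanged and returns 0.
pub fn resolve_seed_host_blocking(seed_host: &str, seed_addrs: &mut HashSet<SocketAddr>) -> usize {
    match seed_host.to_socket_addrs() {
        Ok(addrs) => addrs.filter(|addr| seed_addrs.insert(*addr)).count(),
        Err(_) => 0,
    }
}
```

A refresh loop would call the resolver for each hostname once per polling period, rebuild the address set, and publish it to whoever watches for seed changes.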
Gossip Peer Selection
select_nodes_for_gossip<R>(...) -> (Vec<SocketAddr>, Option<SocketAddr>, Option<SocketAddr>)
Selects a set of nodes for gossiping:
Picks GOSSIP_COUNT live nodes (or peers if no live nodes).
Probabilistically includes one random dead node.
Probabilistically includes one random seed node.
select_dead_node_to_gossip_with<R>(...) -> Option<SocketAddr>
Chooses a dead node with a probability proportional to the number of dead nodes.
select_seed_node_to_gossip_with<R>(...) -> Option<SocketAddr>
Chooses a seed node with probability related to seed node count and live/dead node counts.
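The probabilistic inclusion of a dead node and a seed node can be illustrated with two small decision functions. The exact formulas are assumptions: this page only says the probabilities depend on the node counts, so dead / (live + 1) and seeds / (live + dead) are used here as plausible choices, not as the crate's confirmed behaviour. To stay free of external crates and easy to test, the random draw is passed in by the caller as a value in [0, 1) instead of coming from an RNG.

```rust
/// Decides whether this round should also contact one dead node.
/// `draw` is a uniform sample from [0, 1) supplied by the caller.
/// ASSUMPTION: probability = dead / (live + 1), so the chance grows
/// with the share of dead nodes and is zero when none are dead.
pub fn should_gossip_with_dead_node(live: usize, dead: usize, draw: f64) -> bool {
    let probability = dead as f64 / (live + 1) as f64;
    draw < probability
}

/// Decides whether this round should also contact one seed node.
/// ASSUMPTION: a seed is always contacted when no node is live (the
/// node is isolated or just started); otherwise the probability is
/// seeds / (live + dead).
pub fn should_gossip_with_seed_node(seeds: usize, live: usize, dead: usize, draw: f64) -> bool {
    if seeds == 0 {
        // Nothing to pick from.
        return false;
    }
    if live == 0 {
        return true;
    }
    let probability = seeds as f64 / (live + dead) as f64;
    draw <= probability
}
```

Passing the draw as a parameter serves the same purpose as the mocked RNG mentioned in the Testing section: each branch can be exercised deterministically.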
Implementation Details and Algorithms
Periodic Gossiping: The server uses a Tokio interval timer based on gossip_interval from the config to trigger gossip rounds. Each round selects a mix of live, dead, and seed nodes to gossip with, balancing network traffic against cluster state convergence.
DNS Refreshing for Seed Nodes: Seed nodes specified as hostnames are re-resolved every 60 seconds to handle dynamic IP changes (e.g., in Kubernetes environments), which helps prevent cluster splits.
Concurrency Control: Shared Chitchat state is protected by a mutex (parking_lot::Mutex) wrapped in an Arc. Server operations lock this mutex during message processing and gossip peer selection.
Message Processing Timeout: Incoming message handling is constrained by a 2-second timeout to prevent blocking the server event loop.
Randomized Peer Selection: Uses an RNG from rand::prelude (the server's SmallRng) for probabilistic selection of gossip peers, so that gossip targets vary from round to round.
Commands Handling: Commands are received over an unbounded Tokio MPSC channel, allowing external triggers to initiate gossip or shutdown.
Interaction with Other Components
Chitchat: The server holds and manipulates the Chitchat instance that contains the cluster state, node information, and message processing logic.
Transport: Abstracted via the Transport trait and Socket trait, allowing the server to operate over different UDP socket implementations.
Message: Uses the ChitchatMessage enum to encode/decode gossip messages exchanged between nodes.
Tracing: Uses the tracing crate to log debug, info, warning, and error messages for observability.
Tokio Runtime: Relies on Tokio's async runtime for concurrency, timing, and task spawning.
Visual Diagram
classDiagram
class ChitchatHandle {
+chitchat_id: ChitchatId
+command_tx: UnboundedSender<Command>
+chitchat: ChitchatRef
+join_handle: JoinHandle<Result<(), anyhow::Error>>
+abort()
+chitchat_id()
+chitchat()
+with_chitchat()
+shutdown()
+gossip()
}
class Server {
-command_rx: UnboundedReceiver<Command>
-chitchat: ChitchatRef
-socket: Box<dyn Socket>
-rng: SmallRng
+new()
+run()
+handle_message()
+gossip_multiple()
+gossip()
}
class ChitchatRef {
-inner: Arc<Mutex<Chitchat>>
+new()
+lock()
}
class Command {
<<enum>>
Gossip
Shutdown
}
ChitchatHandle "1" o-- "1" ChitchatRef
Server "1" o-- "1" ChitchatRef
Server "1" ..> Command : receives
ChitchatHandle "1" --> Command : sends
Testing
The module includes extensive tests validating:
Gossip message exchange (Syn, SynAck, BadCluster).
Seed node resolution and gossip initiation.
Heartbeat propagation and cluster membership changes.
Gossip peer selection logic with mocked RNG.
Tests use in-memory channel transport and simulated cluster configurations to verify behavior without real network dependencies.
Constants
GOSSIP_COUNT: usize = 3 - Number of peers selected per gossip round.
DNS_POLLING_DURATION: Duration = 60 seconds - Interval for DNS refresh for seed hostnames.
Related Topics
Chitchat Protocol Overview — Details the gossip protocol and message types.
Transport Abstraction — Interfaces for UDP socket operations.
Cluster Membership Management — Managing node states and liveness in the cluster.