failure_detector.rs
Overview
This file implements a phi accrual failure detector mechanism. The failure detector is used to monitor the health of nodes (identified by ChitchatId) in a distributed system by analyzing heartbeat intervals and computing a suspicion level (phi value) that indicates the likelihood of node failure.
The primary goal is to classify nodes as live or dead based on the statistical analysis of heartbeat arrival times, and to support garbage collection of nodes that have been dead for a configurable grace period. The implementation uses additive smoothing to stabilize estimates during early startup and to reduce false positives caused by transient network or node delays.
Main Components
FailureDetector
Description
The core struct that maintains heartbeat samples for each node, tracks live and dead nodes, and applies failure detection logic based on phi accrual.
Fields
node_samples: HashMap<ChitchatId, SamplingWindow>
Stores heartbeat intervals (sampling windows) indexed by node identifiers.
config: FailureDetectorConfig
Configuration parameters for the failure detector.
live_nodes: HashSet<ChitchatId>
Set of nodes currently considered alive.
dead_nodes: HashMap<ChitchatId, Instant>
Map of dead nodes to the time they were marked dead.
Methods
new(config: FailureDetectorConfig) -> Self
Creates a new failure detector instance with the specified configuration.
get_or_create_sampling_window(&mut self, chitchat_id: &ChitchatId) -> &mut SamplingWindow
Retrieves or initializes a SamplingWindow for a given node. Used internally to track heartbeat intervals.
report_heartbeat(&mut self, chitchat_id: &ChitchatId)
Records a heartbeat arrival for the node, updating its sampling window.
update_node_liveness(&mut self, chitchat_id: &ChitchatId)
Updates the liveness state of the node by computing its phi value and comparing it to the configured threshold. If phi ≤ phi_threshold, the node is marked alive. Otherwise, the node is marked dead, and its sampling window is reset for fresh tracking upon recovery.
garbage_collect(&mut self) -> Vec<ChitchatId>
Removes nodes that have been dead longer than the configured grace period from internal tracking structures and returns the list of such nodes.
live_nodes(&self) -> impl Iterator<Item = &ChitchatId>
Returns an iterator over currently live nodes.
dead_nodes(&self) -> impl Iterator<Item = &ChitchatId>
Returns an iterator over currently dead nodes.
scheduled_for_deletion_nodes(&self) -> impl Iterator<Item = &ChitchatId>
Returns nodes that have been dead for at least half of the dead node grace period and are candidates for imminent garbage collection.
phi(&mut self, chitchat_id: &ChitchatId) -> Option<f64>
Computes and returns the current phi value for a node if sufficient heartbeat data exists (at least two samples); otherwise returns None.
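The threshold rule applied by update_node_liveness can be sketched as below. This is a minimal illustration under stated assumptions, not the file's actual code: LivenessTracker, update, is_live, and is_dead are hypothetical names, node IDs are plain strings instead of ChitchatId, std::time::Instant stands in for the timer type, the phi value is passed in rather than computed, and the sampling window reset is omitted.

```rust
use std::collections::{HashMap, HashSet};
use std::time::Instant;

/// Hypothetical, simplified stand-in for the live/dead bookkeeping.
/// Node IDs are plain strings here instead of `ChitchatId`.
pub struct LivenessTracker {
    phi_threshold: f64,
    live_nodes: HashSet<String>,
    dead_nodes: HashMap<String, Instant>,
}

impl LivenessTracker {
    pub fn new(phi_threshold: f64) -> Self {
        Self {
            phi_threshold,
            live_nodes: HashSet::new(),
            dead_nodes: HashMap::new(),
        }
    }

    /// Applies the threshold rule to a precomputed phi value.
    /// Assumption of this sketch: a missing phi (not enough data)
    /// is treated as "not alive".
    pub fn update(&mut self, node: &str, phi: Option<f64>, now: Instant) {
        let is_alive = phi.map_or(false, |phi| phi <= self.phi_threshold);
        if is_alive {
            self.live_nodes.insert(node.to_string());
            self.dead_nodes.remove(node);
        } else {
            self.live_nodes.remove(node);
            // Keep the original time of death if the node was already dead.
            self.dead_nodes.entry(node.to_string()).or_insert(now);
        }
    }

    pub fn is_live(&self, node: &str) -> bool {
        self.live_nodes.contains(node)
    }

    pub fn is_dead(&self, node: &str) -> bool {
        self.dead_nodes.contains_key(node)
    }
}
```

Passing `now` explicitly keeps the sketch deterministic; whether the real detector preserves the first time of death for an already-dead node is not stated in this document and is assumed here.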
Usage Example
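// Build a detector with the default configuration.
let config = FailureDetectorConfig::default();
let mut detector = FailureDetector::new(config);
let node_id = ChitchatId::for_local_test(42);
// Record a heartbeat, then re-evaluate the node's liveness.
detector.report_heartbeat(&node_id);
detector.update_node_liveness(&node_id);
// Note: phi needs enough heartbeat data, so after a single heartbeat this loop may print nothing.
for live_node in detector.live_nodes() {
    println!("Live node: {}", live_node.node_id);
}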
FailureDetectorConfig
Description
Configuration struct defining parameters controlling the behavior and sensitivity of the failure detector.
Fields
phi_threshold: f64
Threshold above which a node is considered faulty.
sampling_window_size: usize
Number of heartbeat intervals to track in the sampling window.
max_interval: Duration
Maximum heartbeat interval considered valid; longer intervals are ignored.
initial_interval: Duration
Initial guess for heartbeat interval used during startup smoothing.
dead_node_grace_period: Duration
Time after which dead nodes are eligible for removal from the cluster.
Methods
new(phi_threshold: f64, sampling_window_size: usize, max_interval: Duration, initial_interval: Duration, dead_node_grace_period: Duration) -> Self
Creates a new configuration instance.
default() -> Self
Provides default configuration values:
phi_threshold = 8.0
sampling_window_size = 1000
max_interval = 10 seconds
initial_interval = 5 seconds
dead_node_grace_period = 24 hours
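To make the defaults concrete, the sketch below mirrors the fields and default values listed above in a standalone struct. DetectorConfigSketch and sensitive_config are hypothetical names introduced for illustration; the value 4.0 is an arbitrary example, not a recommendation from the source.

```rust
use std::time::Duration;

/// Hypothetical mirror of the configuration described above.
#[derive(Debug, Clone, PartialEq)]
pub struct DetectorConfigSketch {
    pub phi_threshold: f64,
    pub sampling_window_size: usize,
    pub max_interval: Duration,
    pub initial_interval: Duration,
    pub dead_node_grace_period: Duration,
}

impl Default for DetectorConfigSketch {
    fn default() -> Self {
        Self {
            phi_threshold: 8.0,
            sampling_window_size: 1_000,
            max_interval: Duration::from_secs(10),
            initial_interval: Duration::from_secs(5),
            dead_node_grace_period: Duration::from_secs(24 * 60 * 60),
        }
    }
}

/// Example override: a lower threshold flags nodes sooner,
/// at the cost of more false positives. 4.0 is an arbitrary value.
pub fn sensitive_config() -> DetectorConfigSketch {
    DetectorConfigSketch {
        phi_threshold: 4.0,
        ..DetectorConfigSketch::default()
    }
}
```

Struct update syntax (`..Default::default()`) changes one parameter while keeping the remaining defaults.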
SamplingWindow
Description
A fixed-size window that stores recent heartbeat intervals for a single node. It computes the phi value to estimate node failure likelihood. Uses additive smoothing to mitigate fluctuations during startup.
Fields
intervals: BoundedArrayStats
Circular buffer holding recent heartbeat intervals.
last_heartbeat: Option<Instant>
Timestamp of the last recorded heartbeat.
max_interval: Duration
Maximum interval duration considered valid for inclusion.
additive_smoothing: AdditiveSmoothing
Smoothing parameters used to compute a stable mean.
Methods
new(window_size: usize, max_interval: Duration, prior_interval: Duration) -> Self
Creates a new sampling window with given size and smoothing parameters.
report_heartbeat(&mut self)
Records a new heartbeat arrival time and updates the interval statistics if the interval is within allowed bounds.
reset(&mut self)
Clears all recorded intervals, typically called when a node is marked dead.
phi(&self) -> Option<f64>
Computes the phi value using the formula:
\[
\phi = \frac{\text{elapsed time since last heartbeat}}{\text{smoothed mean interval}}
\]
Returns None if insufficient data exists (less than 2 heartbeat intervals).
Implementation Details
Additive Smoothing:
Uses a prior mean and weight to smooth the mean interval calculation, improving stability during initial heartbeat samples.
Heartbeat Interval Filtering:
Intervals longer than max_interval are ignored to avoid skewing statistics due to network delays or node pauses.
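The interval filtering and phi ratio described above can be sketched as follows. This is an illustration under assumptions, not the actual SamplingWindow: WindowSketch is a hypothetical name, a VecDeque replaces BoundedArrayStats, timestamps are passed in explicitly as std::time::Instant values, the prior weight of 5.0 is an arbitrary choice, and the smoothed mean is assumed to be a weighted average of the prior and the observed intervals.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Hypothetical, simplified sampling window. Timestamps are passed in
/// explicitly so that the behavior is deterministic.
pub struct WindowSketch {
    intervals: VecDeque<f64>, // seconds between consecutive heartbeats
    capacity: usize,
    last_heartbeat: Option<Instant>,
    max_interval: Duration,
    prior_mean: f64,   // seconds
    prior_weight: f64, // assumed weight of the prior
}

impl WindowSketch {
    pub fn new(capacity: usize, max_interval: Duration, prior_interval: Duration) -> Self {
        assert!(capacity > 0, "capacity must be positive");
        Self {
            intervals: VecDeque::with_capacity(capacity),
            capacity,
            last_heartbeat: None,
            max_interval,
            prior_mean: prior_interval.as_secs_f64(),
            prior_weight: 5.0, // arbitrary value chosen for this sketch
        }
    }

    pub fn report_heartbeat(&mut self, now: Instant) {
        if let Some(last) = self.last_heartbeat {
            let interval = now.duration_since(last);
            // Intervals above max_interval are not recorded.
            if interval <= self.max_interval {
                if self.intervals.len() == self.capacity {
                    self.intervals.pop_front(); // drop the oldest sample
                }
                self.intervals.push_back(interval.as_secs_f64());
            }
        }
        self.last_heartbeat = Some(now);
    }

    /// phi = elapsed time since last heartbeat / smoothed mean interval.
    /// Following the text above, fewer than 2 recorded intervals is
    /// treated as insufficient data.
    pub fn phi(&self, now: Instant) -> Option<f64> {
        let last = self.last_heartbeat?;
        if self.intervals.len() < 2 {
            return None;
        }
        let sum: f64 = self.intervals.iter().sum();
        let mean = (sum + self.prior_weight * self.prior_mean)
            / (self.intervals.len() as f64 + self.prior_weight);
        Some(now.duration_since(last).as_secs_f64() / mean)
    }
}
```

In this sketch a filtered-out interval still updates the last heartbeat timestamp, so a node returning after a long pause restarts from phi = 0 without distorting the mean.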
AdditiveSmoothing
Description
Utility struct that applies additive smoothing to interval mean calculations to reduce volatility when sample sizes are small.
Fields
prior_mean: f64
The prior expected mean interval (seconds).
prior_weight: f64
The weight of the prior in the smoothing calculation.
Methods
compute_mean(&self, len: NonZeroUsize, sum: f64) -> f64
Computes the smoothed mean interval based on observed samples and prior values.
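One common way to combine a prior with observed samples is a weighted average, sketched below. The exact formula used by compute_mean is not given in this document, so the expression here is an assumption; Smoothing is a hypothetical name for the illustration.

```rust
use std::num::NonZeroUsize;

/// Hypothetical re-creation of the smoothing helper.
pub struct Smoothing {
    pub prior_mean: f64,   // prior expected interval, in seconds
    pub prior_weight: f64, // how many "virtual samples" the prior counts for
}

impl Smoothing {
    /// Assumed formula: treat the prior as `prior_weight` extra samples
    /// whose value is `prior_mean`, then take the ordinary mean.
    pub fn compute_mean(&self, len: NonZeroUsize, sum: f64) -> f64 {
        (sum + self.prior_weight * self.prior_mean)
            / (len.get() as f64 + self.prior_weight)
    }
}
```

With few samples the result stays close to prior_mean; as len grows, the observed mean dominates. For example, with a prior of 5 s and weight 5, one 1 s sample gives (1 + 25) / 6 ≈ 4.33 s, while 1000 samples of 1 s give 1025 / 1005 ≈ 1.02 s.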
BoundedArrayStats
Description
A fixed-size circular buffer storing floating-point values representing heartbeat intervals. It tracks the sum of values for efficient mean calculation.
Fields
values: Box<[f64]>
Fixed-size array storing interval values.
is_filled: bool
Indicates if the buffer has been filled at least once.
index: usize
Current insertion position.
sum: f64
Sum of values in the buffer.
Methods
with_capacity(capacity: usize) -> Self
Creates a new bounded array with specified capacity.
sum(&self) -> f64
Returns the sum of stored values.
append(&mut self, interval: f64)
Adds a new interval value, updating the sum and overwriting oldest values if full.
clear(&mut self)
Clears all stored values and resets state.
len(&self) -> usize
Returns the number of stored values (capacity if filled, else current index).
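The circular buffer with a running sum can be sketched as below, using the fields and method names listed above. RingStats is a hypothetical name, and the details (such as the capacity check and leaving stale values in place on clear) are assumptions of this sketch rather than confirmed behavior of BoundedArrayStats.

```rust
/// Hypothetical sketch of a fixed-capacity ring buffer with a running sum.
pub struct RingStats {
    values: Box<[f64]>,
    is_filled: bool,
    index: usize,
    sum: f64,
}

impl RingStats {
    pub fn with_capacity(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be positive");
        Self {
            values: vec![0.0; capacity].into_boxed_slice(),
            is_filled: false,
            index: 0,
            sum: 0.0,
        }
    }

    pub fn append(&mut self, value: f64) {
        if self.is_filled {
            // Overwriting: subtract the value about to be replaced.
            self.sum -= self.values[self.index];
        }
        self.values[self.index] = value;
        self.sum += value;
        self.index += 1;
        if self.index == self.values.len() {
            // Wrap around; from now on every append overwrites a value.
            self.is_filled = true;
            self.index = 0;
        }
    }

    pub fn sum(&self) -> f64 {
        self.sum
    }

    pub fn len(&self) -> usize {
        if self.is_filled {
            self.values.len()
        } else {
            self.index
        }
    }

    pub fn clear(&mut self) {
        // Stale values stay in the array but are never read again,
        // because `is_filled` is false until the buffer wraps once more.
        self.index = 0;
        self.is_filled = false;
        self.sum = 0.0;
    }
}
```

Tracking the sum incrementally makes both append and the mean computation O(1). A long-running buffer can accumulate floating-point rounding error in the sum; this sketch does not address that.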
Algorithms and Implementation Details
Phi Accrual Failure Detection:
The failure detector calculates a phi value for each node representing the suspicion level of failure, defined as the ratio of time elapsed since last heartbeat to the expected mean heartbeat interval. Nodes with phi exceeding the phi_threshold are marked dead.
Additive Smoothing:
To avoid false positives especially at startup when few heartbeat samples exist, the mean interval calculation incorporates a prior mean and weight. This smooths the mean and stabilizes phi computations.
Garbage Collection:
Dead nodes are tracked with timestamps. Once a node has been dead longer than the configured grace period, it is removed from the internal state to free resources.
Heartbeat Interval Filtering:
Heartbeat intervals longer than max_interval are discarded to prevent skewing statistics with outliers or network delays.
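The garbage collection rule, together with the half-grace-period check used by scheduled_for_deletion_nodes, can be sketched as follows. DeadNodes and mark_dead are hypothetical names, node IDs are plain strings, and the current time is passed in as a std::time::Instant; the real implementation may differ in all of these respects.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical registry of dead nodes keyed by a string ID.
pub struct DeadNodes {
    time_of_death: HashMap<String, Instant>,
    grace_period: Duration,
}

impl DeadNodes {
    pub fn new(grace_period: Duration) -> Self {
        Self {
            time_of_death: HashMap::new(),
            grace_period,
        }
    }

    /// Records the time of death, keeping the earliest one on repeats.
    pub fn mark_dead(&mut self, node: &str, now: Instant) {
        self.time_of_death.entry(node.to_string()).or_insert(now);
    }

    /// Nodes dead for at least half of the grace period (sorted by ID).
    pub fn scheduled_for_deletion(&self, now: Instant) -> Vec<String> {
        let half = self.grace_period / 2;
        let mut nodes: Vec<String> = self
            .time_of_death
            .iter()
            .filter(|(_, died)| now.duration_since(**died) >= half)
            .map(|(node, _)| node.clone())
            .collect();
        nodes.sort();
        nodes
    }

    /// Removes and returns nodes dead longer than the grace period.
    pub fn garbage_collect(&mut self, now: Instant) -> Vec<String> {
        let grace = self.grace_period;
        let mut removed = Vec::new();
        self.time_of_death.retain(|node, died| {
            let expired = now.duration_since(*died) > grace;
            if expired {
                removed.push(node.clone());
            }
            !expired
        });
        removed.sort();
        removed
    }
}
```

With a 24-hour grace period, a node that died at time 0 would appear in scheduled_for_deletion from hour 12 onward and be removed by the first garbage_collect call after hour 24.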
Interaction with Other Components
Uses ChitchatId to uniquely identify nodes in the system.
Relies on tokio::time::Instant for precise timing and interval measurement.
Uses serde::{Serialize, Deserialize} for configuration serialization support.
Employs logging via tracing::debug to monitor heartbeat reporting and node liveness updates.
Integrates with the rest of the system by providing APIs to report heartbeats, update node states, and retrieve lists of live/dead nodes.
Can be integrated with cluster membership or failure management modules to trigger recovery or removal actions.
Visual Diagram
classDiagram
class FailureDetector {
- node_samples: HashMap<ChitchatId, SamplingWindow>
- config: FailureDetectorConfig
- live_nodes: HashSet<ChitchatId>
- dead_nodes: HashMap<ChitchatId, Instant>
+ new()
+ report_heartbeat()
+ update_node_liveness()
+ garbage_collect()
+ live_nodes()
+ dead_nodes()
+ scheduled_for_deletion_nodes()
- phi()
}
class FailureDetectorConfig {
+ phi_threshold: f64
+ sampling_window_size: usize
+ max_interval: Duration
+ initial_interval: Duration
+ dead_node_grace_period: Duration
+ new()
+ default()
}
class SamplingWindow {
- intervals: BoundedArrayStats
- last_heartbeat: Option<Instant>
- max_interval: Duration
- additive_smoothing: AdditiveSmoothing
+ new()
+ report_heartbeat()
+ reset()
+ phi()
}
class AdditiveSmoothing {
- prior_mean: f64
- prior_weight: f64
+ compute_mean()
}
class BoundedArrayStats {
- values: Box<[f64]>
- is_filled: bool
- index: usize
- sum: f64
+ with_capacity()
+ sum()
+ append()
+ clear()
+ len()
}
FailureDetector --> FailureDetectorConfig
FailureDetector --> SamplingWindow
SamplingWindow --> BoundedArrayStats
SamplingWindow --> AdditiveSmoothing