chain_pulse_monitor.rs
Overview
This file implements the ChainPulseMonitor, a module that monitors the activity and health of threads involved in block processing within a blockchain system. It manages timeouts and deadlines for thread operations, detects stalled block producer threads, and calls into the system Authority so it can respond to these conditions. The monitor listens for chain pulse events, such as block finalizations or thread startups, and maintains an internal deadline per thread to track progress.
The core functionality revolves around managing deadlines for thread operations, identifying stalled threads, and triggering authority actions to maintain consensus and block production liveness.
Structs and Types
Deadline
#[derive(TypedBuilder, Clone, Getters)]
pub struct Deadline {
block_height: Option<BlockHeight>,
timestamp: Instant,
}
Purpose: Represents a deadline associated with a specific thread operation, optionally linked to a particular blockchain height.
Fields:
block_height: Option<BlockHeight>: The block height relevant to the deadline, if any.
timestamp: Instant: The instant at which the deadline expires.
Derives:
TypedBuilder for ergonomic construction.
Clone for duplication.
Getters to access private fields.
Usage Example:
use std::time::{Duration, Instant};

// A deadline five seconds from now, tied to block height 100.
let deadline = Deadline::builder()
    .block_height(Some(100))
    .timestamp(Instant::now() + Duration::from_secs(5))
    .build();
ChainPulseMonitor
pub struct ChainPulseMonitor {
_handler: JoinHandle<()>,
monitor: Sender<ChainPulseEvent>,
stalled_threads: Arc<Mutex<HashSet<ThreadIdentifier>>>,
}
Purpose: Manages the lifecycle and monitoring of blockchain thread activity by receiving chain pulse events and tracking stalled threads.
Fields:
_handler: JoinHandle<()>: Handle to the background thread running the monitoring loop.
monitor: Sender<ChainPulseEvent>: Channel sender for submitting chain pulse events to the monitor.
stalled_threads: Arc<Mutex<HashSet<ThreadIdentifier>>>: Shared, thread-safe collection of currently stalled thread identifiers.
Methods:
monitor(&mut self) -> Sender<ChainPulseEvent>: Returns a clone of the event sender, used to submit new chain pulse events.
stalled_threads(&self) -> Arc<Mutex<HashSet<ThreadIdentifier>>>: Returns a shared reference to the set of stalled threads.
Usage Example:
// authority_arc: Arc<Mutex<Authority>>
let mut monitor = ChainPulseMonitor::bind(authority_arc);
let tx = monitor.monitor();
tx.send(ChainPulseEvent::BlockFinalized(block_event)).unwrap();
let stalled = monitor.stalled_threads();
Functions
move_deadline
fn move_deadline(
deadlines: &mut HashMap<ThreadIdentifier, Deadline>,
thread_identifier: &ThreadIdentifier,
next_deadline: Deadline,
)
Purpose: Updates or inserts a deadline entry for a given thread identifier in the deadlines hashmap.
Parameters:
deadlines: Mutable reference to a hashmap mapping thread IDs to their deadlines.
thread_identifier: Reference to the thread identifier for which to set the deadline.
next_deadline: The new Deadline to assign, or to compare against the existing one.
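Behavior: If the thread has no deadline yet, next_deadline is inserted. Otherwise, the existing deadline is replaced only if next_deadline has a defined block height that differs from the current one, or a later timestamp than the current one. An earlier timestamp at the same block height is ignored.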
Usage: Used internally to maintain up-to-date deadlines for each thread.
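The update rule above can be sketched in plain std Rust. This is a minimal sketch under stated assumptions, not the module's actual code. BlockHeight and ThreadIdentifier are assumed to be u64 and u32 aliases. Deadline is written out by hand instead of using TypedBuilder and Getters. The replacement condition follows the Behavior description literally.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::time::Instant;

type BlockHeight = u64; // hypothetical stand-in for types::BlockHeight
type ThreadIdentifier = u32; // hypothetical stand-in for types::ThreadIdentifier

// Hand-written equivalent of the documented Deadline (no TypedBuilder/Getters).
#[derive(Clone, Debug)]
struct Deadline {
    block_height: Option<BlockHeight>,
    timestamp: Instant,
}

fn move_deadline(
    deadlines: &mut HashMap<ThreadIdentifier, Deadline>,
    thread_identifier: &ThreadIdentifier,
    next_deadline: Deadline,
) {
    match deadlines.entry(thread_identifier.clone()) {
        // First deadline for this thread: just record it.
        Entry::Vacant(slot) => {
            slot.insert(next_deadline);
        }
        Entry::Occupied(mut slot) => {
            let current = slot.get();
            // A defined block height that differs replaces the deadline regardless of timestamp.
            let height_changed = next_deadline.block_height.is_some()
                && next_deadline.block_height != current.block_height;
            // Otherwise the deadline can only move later, never earlier.
            let later = next_deadline.timestamp > current.timestamp;
            if height_changed || later {
                slot.insert(next_deadline);
            }
        }
    }
}
```

Using the HashMap entry API keeps the insert-or-compare decision to a single hash lookup.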
Main Implementation Detail: bind
pub fn bind(authority: Arc<Mutex<Authority>>) -> ChainPulseMonitor
Purpose: Creates and starts the monitoring thread that listens for chain pulse events, tracks deadlines, detects stalled threads, and interacts with the Authority.
Parameters:
authority: Shared, thread-safe reference to the Authority component, which manages consensus and thread control.
Returns: A ChainPulseMonitor instance with an active background thread and communication channel.
Functionality:
Creates an MPSC channel (tx, rx) for receiving ChainPulseEvents.
Maintains a HashMap of deadlines for each thread.
Maintains a HashMap of block candidates per thread.
Runs an infinite loop in a dedicated thread to:
Check the shutdown flag and exit if it is set.
Calculate the next deadline among all tracked threads.
Wait for the next event, or time out at that deadline.
On timeout, identify stalled threads by comparing the current time with their deadlines.
Update the stalled_threads set.
Invoke methods on Authority for the stalled threads to handle recovery or to initiate the next round.
On receiving chain pulse events (BlockFinalized, BlockPrefinalized, StartThread, BlockApplied), update deadlines and remove the affected threads from the stalled set.
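The loop described above can be approximated with std::sync::mpsc::Receiver::recv_timeout. This sketch is illustrative only, and several names are hypothetical: spawn_monitor, PulseEvent, STALL_TIMEOUT, and IDLE_POLL. The following stand-ins are also assumptions:
- A static AtomicBool stands in for SHUTDOWN_FLAG.
- std::sync::Mutex stands in for parking_lot::Mutex.
- A callback stands in for the Authority call.
- A single Progress variant stands in for the four event kinds.

The sketch departs from the description in two deliberate ways. First, it re-checks deadlines after every wakeup rather than only on timeout, so a steady stream of events for one thread cannot hide a stall in another. Second, it re-arms an expired deadline, so the callback fires once per stall interval instead of on every wakeup.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, RecvTimeoutError, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

type ThreadIdentifier = u32; // hypothetical stand-in for types::ThreadIdentifier

// Hypothetical single-variant event: "this thread made progress".
enum PulseEvent {
    Progress(ThreadIdentifier),
}

// Hypothetical stand-in for the global SHUTDOWN_FLAG.
static SHUTDOWN_FLAG: AtomicBool = AtomicBool::new(false);

const STALL_TIMEOUT: Duration = Duration::from_millis(660); // 2 * 330 ms
const IDLE_POLL: Duration = Duration::from_millis(100); // bounds how long a shutdown can go unnoticed

type StalledSet = Arc<Mutex<HashSet<ThreadIdentifier>>>;

fn spawn_monitor<F>(on_stalled: F) -> (JoinHandle<()>, Sender<PulseEvent>, StalledSet)
where
    F: Fn(ThreadIdentifier) + Send + 'static,
{
    let (tx, rx) = channel::<PulseEvent>();
    let stalled: StalledSet = Arc::new(Mutex::new(HashSet::new()));
    let stalled_in_loop = Arc::clone(&stalled);

    let handle = thread::spawn(move || {
        let mut deadlines: HashMap<ThreadIdentifier, Instant> = HashMap::new();
        while !SHUTDOWN_FLAG.load(Ordering::Relaxed) {
            // Sleep until the earliest tracked deadline, but never longer than IDLE_POLL.
            let now = Instant::now();
            let wait = deadlines
                .values()
                .min()
                .map_or(IDLE_POLL, |d| d.saturating_duration_since(now).min(IDLE_POLL));

            match rx.recv_timeout(wait) {
                Ok(PulseEvent::Progress(id)) => {
                    // Progress pushes the deadline out and clears any stalled status.
                    deadlines.insert(id, Instant::now() + STALL_TIMEOUT);
                    stalled_in_loop.lock().unwrap().remove(&id);
                }
                Err(RecvTimeoutError::Timeout) => {}
                Err(RecvTimeoutError::Disconnected) => break,
            }

            // Check every deadline after each wakeup, not only on timeout.
            let now = Instant::now();
            let mut set = stalled_in_loop.lock().unwrap();
            for (id, deadline) in deadlines.iter_mut() {
                if *deadline <= now {
                    set.insert(*id);
                    on_stalled(*id); // stands in for the Authority call
                    *deadline = now + STALL_TIMEOUT; // re-arm: report again only after another full interval
                }
            }
        }
    });

    (handle, tx, stalled)
}
```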
Interaction:
Uses Authority to trigger consensus recovery mechanisms on stall detection.
Uses SHUTDOWN_FLAG to terminate monitoring gracefully.
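The delegation to Authority can be sketched as a minimal example that assumes only what this page states: the monitor holds an Arc<Mutex<Authority>> and calls on_block_producer_stalled() for stalled threads. The Authority type here is a hypothetical stand-in that merely records reports, and the method's argument list is a guess. std::sync::Mutex replaces parking_lot::Mutex, whose lock() returns the guard directly, without the unwrap().

```rust
use std::sync::{Arc, Mutex};

type ThreadIdentifier = u32; // hypothetical stand-in for types::ThreadIdentifier

// Hypothetical stand-in for action_lock::Authority; only the stall hook is modeled.
#[derive(Default)]
struct Authority {
    stall_reports: Vec<ThreadIdentifier>,
}

impl Authority {
    // Hypothetical signature: the real on_block_producer_stalled() may take other arguments.
    fn on_block_producer_stalled(&mut self, thread: ThreadIdentifier) {
        // A real Authority would start recovery or the next round here.
        self.stall_reports.push(thread);
    }
}

/// Called from the monitor thread with the threads whose deadlines just expired.
fn report_stalls(authority: &Arc<Mutex<Authority>>, stalled: &[ThreadIdentifier]) {
    // One lock acquisition for the whole batch of stalled threads.
    let mut guard = authority.lock().unwrap();
    for thread in stalled {
        guard.on_block_producer_stalled(*thread);
    }
}
```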
Important Timings:
Deadlines are typically set to now + 660 ms (2 * 330 ms). The source marks this value with a TODO to make it configurable.
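The default timing is simple arithmetic on Duration. The sketch below is hypothetical: BASE_INTERVAL and next_deadline are illustrative names. Passing the interval as a parameter is one possible shape for the configuration the TODO asks for, not the project's design.

```rust
use std::time::{Duration, Instant};

// Hypothetical name for the hardcoded 330 ms interval (marked TODO in the source).
const BASE_INTERVAL: Duration = Duration::from_millis(330);

/// Deadline for a thread's next sign of progress: two base intervals after `now`.
fn next_deadline(now: Instant, base_interval: Duration) -> Instant {
    now + 2 * base_interval
}
```

With the default interval, next_deadline(now, BASE_INTERVAL) lies 660 ms after now.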
Event Handling:
ChainPulseEvent::BlockFinalized and similar events update deadlines and clear stalled status.
ChainPulseEvent::StartThread initializes block candidates and deadlines.
On stall detection, Authority::on_block_producer_stalled() is called to handle recovery.
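The per-event bookkeeping can be sketched as a single match. Everything here is a simplified assumption:
- The real ChainPulseEvent variants carry payloads not shown on this page.
- The block candidate type is unknown, so a HashSet<BlockHeight> placeholder is used.
- MonitorState is a hypothetical container for the loop's local maps.

Following the Functionality list, every event kind pushes the thread's deadline out and clears its stalled status. StartThread additionally creates an empty candidate entry.

```rust
use std::collections::{HashMap, HashSet};
use std::time::{Duration, Instant};

type ThreadIdentifier = u32; // hypothetical stand-in
type BlockHeight = u64; // hypothetical stand-in

// Hypothetical, payload-free version of chain_pulse::events::ChainPulseEvent.
enum ChainPulseEvent {
    StartThread(ThreadIdentifier),
    BlockFinalized(ThreadIdentifier),
    BlockPrefinalized(ThreadIdentifier),
    BlockApplied(ThreadIdentifier),
}

const STALL_TIMEOUT: Duration = Duration::from_millis(660);

#[derive(Default)]
struct MonitorState {
    deadlines: HashMap<ThreadIdentifier, Instant>,
    // Hypothetical candidate type; this sketch only initializes it.
    block_candidates: HashMap<ThreadIdentifier, HashSet<BlockHeight>>,
    stalled: HashSet<ThreadIdentifier>,
}

impl MonitorState {
    fn handle(&mut self, event: ChainPulseEvent) {
        let thread = match event {
            ChainPulseEvent::StartThread(thread) => {
                // A newly started thread gets an empty candidate collection.
                self.block_candidates.entry(thread).or_default();
                thread
            }
            ChainPulseEvent::BlockFinalized(thread)
            | ChainPulseEvent::BlockPrefinalized(thread)
            | ChainPulseEvent::BlockApplied(thread) => thread,
        };
        // Every event is a sign of progress: push the deadline out and clear stalled status.
        self.deadlines.insert(thread, Instant::now() + STALL_TIMEOUT);
        self.stalled.remove(&thread);
    }
}
```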
Event Types Interaction
The monitor reacts to variants of ChainPulseEvent (imported from chain_pulse::events), such as:
BlockFinalized
BlockPrefinalized
StartThread
BlockApplied
These events notify about block processing stages or thread lifecycle changes and influence the deadlines and stalled threads tracking.
Thread Safety and Synchronization
Uses Arc<Mutex<...>> for sharing mutable state (stalled_threads and Authority) safely across threads.
Uses parking_lot::Mutex for efficient locking.
Employs a channel (std::sync::mpsc::channel) for asynchronous event communication.
Implements the AllowGuardedMut trait to enable guarded mutation on collections.
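External code that reads the shared stalled_threads set should hold the lock only briefly, since the monitor thread needs the same lock to update it. The sketch below shows one way to do that, assuming u32 thread identifiers and std::sync::Mutex in place of parking_lot::Mutex. snapshot_stalled and StalledThreads are hypothetical names.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

type ThreadIdentifier = u32; // hypothetical stand-in
type StalledThreads = Arc<Mutex<HashSet<ThreadIdentifier>>>; // shape returned by stalled_threads()

/// Copies the current stalled set out under a short-lived lock, so a status
/// reader blocks the monitor thread only for the duration of the copy.
fn snapshot_stalled(stalled: &StalledThreads) -> Vec<ThreadIdentifier> {
    let mut ids: Vec<ThreadIdentifier> = stalled.lock().unwrap().iter().copied().collect();
    // The temporary lock guard was dropped at the end of the statement above.
    ids.sort_unstable();
    ids
}
```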
Interaction with Other System Components
Authority: The monitor delegates the response to stalled threads to the Authority component, which presumably manages consensus authority and thread control logic.
ChainPulseEvent: The monitor receives these events from other parts of the system, which report block processing progress through them.
SHUTDOWN_FLAG: A global shutdown indicator used to exit the monitoring loop cleanly.
Block Processing: Works closely with block producer threads by tracking their deadlines and stalled states.
Visual Diagram: ChainPulseMonitor Structure and Workflow
flowchart TD
A[ChainPulseMonitor]
A -->|spawns| B[Monitor Thread]
B -->|receives| C["ChainPulseEvent Channel (rx)"]
B -->|updates| D[Deadlines HashMap]
B -->|updates| E[BlockCandidates HashMap]
B -->|updates| F[StalledThreads Set]
F -->|shared| G["Arc<Mutex<HashSet<ThreadIdentifier>>>"]
B -->|invokes| H[Authority GuardedMut]
C -->|receives events| I[BlockFinalized]
C -->|receives events| J[BlockPrefinalized]
C -->|receives events| K[StartThread]
C -->|receives events| L[BlockApplied]
H -->|handles| M[BlockProducerStalled]
B -->|checks| N[SHUTDOWN_FLAG]
Explanation:
The ChainPulseMonitor starts a background thread that listens to chain pulse events.
Events update deadlines and block candidate collections.
The monitor tracks stalled threads and informs the Authority if needed.
The shared stalled_threads set can be accessed externally for status.
The monitoring thread terminates when the global shutdown flag is set.
References to Related Topics
For details on the Authority role and methods, see action_lock::Authority.
For the types of chain pulse events handled, see chain_pulse::events::ChainPulseEvent.
For understanding thread synchronization and guarded mutability, see utilities::guarded.
For block height and thread identifier types, see types::BlockHeight and types::ThreadIdentifier.
For thread spawning utilities and critical thread handling, see utilities::thread_spawn_critical::SpawnCritical.