mod.rs
Overview
This file implements the AuthoritySwitchService, a core service that handles AuthoritySwitchProtocol network messages within the authority switch mechanism. It processes the variants of the AuthoritySwitch enum: requests to switch authority, switch success notifications, rejections, and failures. The service works with the network layer, the block state repository, the unprocessed blocks cache, and the chain pulse monitor to coordinate authority transitions for a single thread.
The service maintains synchronization state, verifies signatures on blocks and attestations, handles broadcasting and direct messaging, and manages the lifecycle of authority switching rounds. It is designed to run in a loop, continuously receiving and processing incoming network messages related to authority switching.
Structures
AuthoritySwitchService
A service struct responsible for processing authority switch protocol messages.
Fields
self_addr: SocketAddr
The socket address of the local node.
rx: XInstrumentedReceiver<(NetworkMessage, SocketAddr)>
Receiver channel for incoming network messages paired with their sender addresses.
self_node_tx: XInstrumentedSender<(NetworkMessage, SocketAddr)>
Sender channel for network messages originating from this node.
network_direct_tx: NetDirectSender<NodeIdentifier, NetworkMessage>
Channel for sending direct network messages to other nodes.
thread_id: ThreadIdentifier
Identifier for the thread this service is associated with.
unprocessed_blocks_cache: UnfinalizedCandidateBlockCollection
Cache storing unprocessed candidate blocks.
thread_authority: Arc<Mutex<ThreadAuthority>>
Shared, mutex-protected state of the thread's authority logic.
network_broadcast_tx: NetBroadcastSender<NetworkMessage>
Channel for broadcasting network messages to all nodes.
block_state_repository: BlockStateRepository
Repository managing the state of blocks.
chain_pulse_monitor: Sender<ChainPulseEvent>
Sender channel for chain pulse events, such as block prefinalization notifications.
sync_timeout_duration: Duration
Timeout duration used to trigger synchronization if needed.
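The shared thread_authority field can be illustrated with a minimal sketch of the Arc<Mutex<_>> pattern. The ThreadAuthority struct below is a hypothetical stand-in with a single invented field (last_successful_round), and record_round is an illustrative helper, not part of the real module; the sketch only shows how several threads can safely mutate one shared value.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for the real ThreadAuthority type.
#[derive(Debug, Default)]
struct ThreadAuthority {
    last_successful_round: u64, // invented field, for illustration only
}

/// Records a round under the mutex, keeping the highest round seen so far.
fn record_round(authority: &Arc<Mutex<ThreadAuthority>>, round: u64) {
    let mut guard = authority.lock().expect("thread_authority mutex poisoned");
    guard.last_successful_round = guard.last_successful_round.max(round);
}

fn main() {
    let authority = Arc::new(Mutex::new(ThreadAuthority::default()));

    // Each worker gets its own Arc handle to the same mutex-protected state.
    let handles: Vec<_> = (1..=4u64)
        .map(|round| {
            let shared = Arc::clone(&authority);
            thread::spawn(move || record_round(&shared, round))
        })
        .collect();
    for handle in handles {
        handle.join().expect("worker thread panicked");
    }

    let last = authority.lock().unwrap().last_successful_round;
    println!("last successful round: {last}");
}
```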
Methods
fn run(&mut self) -> anyhow::Result<()>
Starts the main processing loop of the AuthoritySwitchService. It continuously listens for incoming messages on rx and handles them according to their variant.
Behavior
Receives (NetworkMessage, SocketAddr) tuples.
Handles only NetworkMessage::AuthoritySwitchProtocol messages with variants:
AuthoritySwitch::Request
AuthoritySwitch::Switched
AuthoritySwitch::Reject
AuthoritySwitch::RejectTooOld
AuthoritySwitch::Failed
Ignores or panics on unexpected message types.
For requests, it processes incoming authority switch requests, potentially triggering block production or broadcasting switch success.
For switched messages, it verifies signatures, checks consistency, and calls on_authority_switch_success.
For reject messages, it verifies the prefinalized blocks and updates block states accordingly.
For reject-too-old messages, it triggers synchronization if the last sync happened longer ago than the timeout.
For failed messages, it resends candidate blocks and attestations.
Returns an error if the receiver disconnects.
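The dispatch behavior listed above can be sketched as a receive loop that matches on the message variant. This is a simplified illustration, not the actual implementation: the enums are payload-free stand-ins, std::sync::mpsc replaces the instrumented channel types, the handler_name helper and the log vector are hypothetical, and unexpected messages are merely recorded here instead of ignored or panicked on.

```rust
use std::net::SocketAddr;
use std::sync::mpsc::{channel, Receiver, RecvError};

// Simplified stand-ins: the real variants carry payloads.
#[allow(dead_code)]
#[derive(Debug)]
enum AuthoritySwitch {
    Request,
    Switched,
    Reject,
    RejectTooOld,
    Failed,
}

#[allow(dead_code)]
#[derive(Debug)]
enum NetworkMessage {
    AuthoritySwitchProtocol(AuthoritySwitch),
    Other, // hypothetical placeholder for every other message kind
}

/// Picks the handler for a message, or None for unexpected message types.
fn handler_name(msg: &NetworkMessage) -> Option<&'static str> {
    match msg {
        NetworkMessage::AuthoritySwitchProtocol(inner) => Some(match inner {
            AuthoritySwitch::Request => "request",
            AuthoritySwitch::Switched => "switched",
            AuthoritySwitch::Reject => "reject",
            AuthoritySwitch::RejectTooOld => "reject_too_old",
            AuthoritySwitch::Failed => "failed",
        }),
        NetworkMessage::Other => None,
    }
}

/// Loops until the channel disconnects, then returns the receive error.
fn run(
    rx: Receiver<(NetworkMessage, SocketAddr)>,
    log: &mut Vec<&'static str>,
) -> Result<(), RecvError> {
    loop {
        let (msg, _sender) = rx.recv()?;
        log.push(handler_name(&msg).unwrap_or("unexpected"));
    }
}

fn main() {
    let (tx, rx) = channel();
    let addr: SocketAddr = "127.0.0.1:8500".parse().unwrap();
    tx.send((NetworkMessage::AuthoritySwitchProtocol(AuthoritySwitch::Request), addr)).unwrap();
    tx.send((NetworkMessage::AuthoritySwitchProtocol(AuthoritySwitch::RejectTooOld), addr)).unwrap();
    drop(tx); // dropping the last sender disconnects the receiver

    let mut log = Vec::new();
    let result = run(rx, &mut log);
    println!("handled {log:?}, disconnected: {}", result.is_err());
}
```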
Usage Example
let mut service = AuthoritySwitchService::builder()
    // initialize builder fields
    .build();
service.run()?;
fn on_authority_switch_success(&mut self, next_round_success: Envelope<GoshBLS, NextRoundSuccess>) -> anyhow::Result<()>
Processes a successful authority switch notification.
Parameters
next_round_success: Envelope containing the NextRoundSuccess data with signatures and proposed block.
Processing Steps
Sends the proposed block as a NetworkMessage::Candidate to self_node_tx.
If attestations are aggregated:
Checks if attestation counts meet required thresholds.
Verifies signatures against the block key set.
Marks the block as prefinalized in the repository.
Sends a ChainPulseEvent::block_prefinalized event.
If attestation target is missing, adds detached attestations to the block state.
Sends block attestation messages for the aggregated attestations.
Updates thread_authority state with the new successful round.
Checks for block round mismatches to detect stalled producers and triggers appropriate recovery.
Return
Ok(()) on success or an error if any step fails.
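The attestation part of the processing steps can be sketched roughly as follows. Everything in this block is an assumption made for illustration: BlockState, Outcome, and apply_attestation are hypothetical names, signature verification is reduced to a boolean input, and the real code works with BLS envelopes and the block state repository instead of plain counters.

```rust
// Hypothetical, simplified view of a block's attestation state.
#[derive(Debug, Default)]
struct BlockState {
    /// Required number of attestations; None while the target is unknown.
    attestation_target: Option<usize>,
    prefinalized: bool,
    /// Signer counts of attestations stored until the target becomes known.
    detached_attestations: Vec<usize>,
}

#[derive(Debug, PartialEq)]
enum Outcome {
    Prefinalized,
    Detached,
    BelowThreshold,
    BadSignature,
}

/// Applies one aggregated attestation: threshold check first, then the
/// signature check, and only then the prefinalized mark.
fn apply_attestation(state: &mut BlockState, signer_count: usize, signature_ok: bool) -> Outcome {
    let target = match state.attestation_target {
        Some(target) => target,
        None => {
            // Target missing: keep the attestation as detached.
            state.detached_attestations.push(signer_count);
            return Outcome::Detached;
        }
    };
    if signer_count < target {
        return Outcome::BelowThreshold;
    }
    if !signature_ok {
        return Outcome::BadSignature;
    }
    state.prefinalized = true;
    Outcome::Prefinalized
}

fn main() {
    let mut state = BlockState { attestation_target: Some(3), ..Default::default() };
    println!("{:?}", apply_attestation(&mut state, 2, true));
    println!("{:?}", apply_attestation(&mut state, 3, true));
    println!("prefinalized: {}", state.prefinalized);
}
```

In the real service, reaching the prefinalized state is also the point at which a chain pulse event would be emitted; that side effect is left out of the sketch.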
Usage Example
let success_msg: Envelope<GoshBLS, NextRoundSuccess> = ...;
service.on_authority_switch_success(success_msg)?;
Important Implementation Details
Signature Verification:
The service verifies signatures on proposed blocks and attestations using the block key set retrieved from the parent block's state. This is critical to ensure message authenticity and prevent malicious activity.
Thread Safety:
The thread_authority state is wrapped in an Arc<Mutex<_>>, ensuring safe concurrent access when handling incoming requests and state mutations.
Network Messaging:
The service uses both direct and broadcast network senders to communicate with other nodes. It sends wrapped messages with labels for telemetry and tracing.
Error Handling and Panics:
The service will panic on critical failures in sending messages unless the shutdown flag is set, indicating intentional shutdown.
Synchronization Timeout:
The service keeps track of the last synchronization execution time and triggers a new synchronization event if a RejectTooOld message arrives and the timeout has elapsed.
Unprocessed Blocks Cache:
Clones of this cache are passed to authority logic for handling incoming requests, ensuring up-to-date block data is available.
Chain Pulse Notifications:
Upon prefinalization of blocks, the service emits chain pulse events to notify other components about block state progress.
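The synchronization timeout described above amounts to a small throttle around the last sync time. The sketch below is one possible shape, assuming a hypothetical SyncThrottle type; whether the very first RejectTooOld message triggers a sync immediately is an assumption made here, not something the source states.

```rust
use std::time::{Duration, Instant};

// Hypothetical helper; the real service keeps this state internally.
struct SyncThrottle {
    last_sync: Option<Instant>,
    timeout: Duration,
}

impl SyncThrottle {
    fn new(timeout: Duration) -> Self {
        Self { last_sync: None, timeout }
    }

    /// Returns true (and records `now`) when a sync should be triggered.
    /// Assumption: with no previous sync, the first call always triggers.
    fn should_sync(&mut self, now: Instant) -> bool {
        let due = match self.last_sync {
            None => true,
            Some(last) => now.saturating_duration_since(last) >= self.timeout,
        };
        if due {
            self.last_sync = Some(now);
        }
        due
    }
}

fn main() {
    let mut throttle = SyncThrottle::new(Duration::from_secs(10));
    let start = Instant::now();
    // Simulated arrival times of RejectTooOld messages.
    for offset in [0u64, 3, 12] {
        let now = start + Duration::from_secs(offset);
        println!("t+{offset}s -> sync: {}", throttle.should_sync(now));
    }
}
```

Passing the current time in as a parameter, instead of calling Instant::now() inside the method, keeps the throttle easy to test with simulated timestamps.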
Interactions with Other Components
Network Layer:
Receives and sends NetworkMessage variants, interacting with nodes using both direct and broadcast communication channels.
Block State Repository:
Reads and mutates block states, including marking blocks as prefinalized and managing attestations.
Thread Authority:
Coordinates the authority logic for block production and reaction to incoming switch requests.
Unfinalized Candidate Blocks Collection:
Provides access to unprocessed blocks needed for request evaluation.
Chain Pulse Monitor:
Receives events about block finalization stages.
Shutdown Flag:
Controls panic behavior on critical network send failures.
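The role of the shutdown flag can be sketched with a small send helper. This is an assumption-laden illustration: send_or_panic is a hypothetical function, the flag is modelled as an AtomicBool, and std::sync::mpsc stands in for the real network senders.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Sender};

/// Sends `msg`; a failed send panics unless `shutdown` is set, in which
/// case the failure is treated as expected and `false` is returned.
fn send_or_panic<T>(tx: &Sender<T>, msg: T, shutdown: &AtomicBool) -> bool {
    match tx.send(msg) {
        Ok(()) => true,
        Err(_) if shutdown.load(Ordering::Relaxed) => false,
        Err(err) => panic!("critical send failure: {err}"),
    }
}

fn main() {
    let shutdown = AtomicBool::new(false);
    let (tx, rx) = channel::<&str>();

    // Receiver alive: the send succeeds.
    println!("delivered: {}", send_or_panic(&tx, "candidate block", &shutdown));

    // Simulate an intentional shutdown: flag set, receiver gone.
    shutdown.store(true, Ordering::Relaxed);
    drop(rx);
    println!("delivered: {}", send_or_panic(&tx, "attestation", &shutdown));
}
```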
Visual Diagram
classDiagram
class AuthoritySwitchService {
+self_addr: SocketAddr
+rx: XInstrumentedReceiver
+self_node_tx: XInstrumentedSender
+network_direct_tx: NetDirectSender
+thread_id: ThreadIdentifier
+unprocessed_blocks_cache: UnfinalizedCandidateBlockCollection
+thread_authority: Arc<Mutex<ThreadAuthority>>
+network_broadcast_tx: NetBroadcastSender
+block_state_repository: BlockStateRepository
+chain_pulse_monitor: Sender<ChainPulseEvent>
+sync_timeout_duration: Duration
+run()
-on_authority_switch_success()
}
AuthoritySwitchService --> "1" XInstrumentedReceiver : rx
AuthoritySwitchService --> "1" XInstrumentedSender : self_node_tx
AuthoritySwitchService --> "1" NetDirectSender : network_direct_tx
AuthoritySwitchService --> "1" NetBroadcastSender : network_broadcast_tx
AuthoritySwitchService --> "1" Arc~Mutex~ThreadAuthority~~ : thread_authority
AuthoritySwitchService --> "1" BlockStateRepository : block_state_repository
AuthoritySwitchService --> "1" UnfinalizedCandidateBlockCollection : unprocessed_blocks_cache
AuthoritySwitchService --> "1" Sender~ChainPulseEvent~ : chain_pulse_monitor
Message Handling Workflow
flowchart TD
Start[Start Receiving Messages]
ReceiveMsg[Receive NetworkMessage from rx]
CheckMsgType{Is AuthoritySwitchProtocol?}
HandleRequest[Handle AuthoritySwitch::Request]
HandleSwitched[Handle AuthoritySwitch::Switched]
HandleReject[Handle AuthoritySwitch::Reject]
HandleRejectTooOld[Handle AuthoritySwitch::RejectTooOld]
HandleFailed[Handle AuthoritySwitch::Failed]
UnexpectedMsg[Unexpected Message - Panic]
LoopBack[Back to Receiving Messages]
Error[Error / Disconnected]
Start --> ReceiveMsg --> CheckMsgType
CheckMsgType -->|Yes - Request| HandleRequest
CheckMsgType -->|Yes - Switched| HandleSwitched
CheckMsgType -->|Yes - Reject| HandleReject
CheckMsgType -->|Yes - RejectTooOld| HandleRejectTooOld
CheckMsgType -->|Yes - Failed| HandleFailed
CheckMsgType -- No --> UnexpectedMsg
HandleRequest --> LoopBack
HandleSwitched --> LoopBack
HandleReject --> LoopBack
HandleRejectTooOld --> LoopBack
HandleFailed --> LoopBack
UnexpectedMsg --> LoopBack
ReceiveMsg -- Err --> Error