mod.rs
Overview
This file defines the core Node struct and its initialization logic, which serves as a central component for managing blockchain node operations in a multi-threaded, distributed environment. The Node struct encapsulates essential services and state management mechanisms required for block production, validation, synchronization, and network communication within the node's thread context. It integrates with cryptographic components, networking channels, storage, and consensus-related services to facilitate the node's participation in the blockchain network.
Struct: Node<TStateSyncService, TRandomGenerator>
The Node struct is generic over two types:
TStateSyncService: A service that implements theStateSyncServicetrait, responsible for state synchronization.TRandomGenerator: A random number generator implementingrand::Rng, used for randomized processes like block keeper selection and producer election.
Fields
repository (
RepositoryImpl): Interface for blockchain data repository operations.shared_services (
SharedServices): Shared utilities and services used across node components.state_sync_service (
TStateSyncService): Service handling blockchain state synchronization.network_rx (
XInstrumentedReceiver<(NetworkMessage, SocketAddr)>): Receiver channel for incoming network messages paired with their sender addresses.network_broadcast_tx (
NetBroadcastSender<NetworkMessage>): Sender channel for broadcasting network messages.network_direct_tx (
NetDirectSender<NodeIdentifier, NetworkMessage>): Sender channel for direct network messages to specific nodes.raw_block_tx (
InstrumentedSender<(NodeIdentifier, Vec<u8>)>): Channel for sending raw block data.bls_keys_map (
Arc<Mutex<HashMap<PubKey, (Secret, RndSeed)>>): Thread-safe map of BLS public keys to their secret keys and random seeds.last_block_attestations (
Arc<Mutex<CollectedAttestations>>): Thread-safe storage of attestations for the last block.received_acks (
Arc<Mutex<Vec<Envelope<GoshBLS, AckData>>>>): Received acknowledgment envelopes.sent_acks (
BTreeMap<BlockSeqNo, Envelope<GoshBLS, AckData>>): Sent acknowledgment envelopes indexed by block sequence number.received_nacks (
Arc<Mutex<Vec<Envelope<GoshBLS, NackData>>>>): Received negative acknowledgment envelopes.config (
Config): Configuration parameters for the node.received_attestations (
BTreeMap<BlockSeqNo, HashMap<BlockIdentifier, HashSet<SignerIndex>>>): Attestations received indexed by block sequence number.block_keeper_rng (
TRandomGenerator): RNG for block keeper selection.producer_election_rng (
TRandomGenerator): RNG for block producer election.attestations_to_send (
BTreeMap<BlockSeqNo, Vec<Envelope<GoshBLS, AttestationData>>>): Attestations queued for sending.ack_cache (
BTreeMap<BlockSeqNo, Vec<Envelope<GoshBLS, AckData>>>): Cache for acknowledgments.nack_cache (
BTreeMap<BlockSeqNo, Vec<Envelope<GoshBLS, NackData>>>): Cache for negative acknowledgments.thread_id (
ThreadIdentifier): Identifier for the node thread.is_spawned_from_node_sync (
bool): Flag indicating if the node was spawned from a synchronization operation.block_state_repository (
BlockStateRepository): Repository for block states.block_processor_service (
BlockProcessorService): Service managing block processing.attestation_send_service (
AttestationSendServiceHandler): Service handling sending of attestations.validation_service (
ValidationServiceInterface): Service for block and transaction validation.skipped_attestation_ids (
Arc<Mutex<HashSet<BlockIdentifier>>>): Set of attestation IDs that were skipped.message_db (
MessageDurableStorage): Storage for durable external messages.last_broadcasted_produced_candidate_block_time (
std::time::Instant): Timestamp of the last broadcasted produced candidate block.finalization_loop (
JoinHandle<()>): Handle for the finalization thread loop.producer_service (
ProducerService): Service responsible for block production.metrics (
Option<BlockProductionMetrics>): Optional metrics collector for block production.external_messages (
ExternalMessagesThreadState): State management for external messages.is_state_sync_requested (
Arc<Mutex<Option<BlockSeqNo>>>): Flag indicating if a state sync request is active.blk_req_tx (
Sender<BlockRequestParams>): Channel sender for block requests.ext_msg_receiver (
JoinHandle<()>): Handle for the external messages receiver thread.authority_state (
Arc<Mutex<Authority>>): Authority state management.unprocessed_blocks_cache (
UnfinalizedCandidateBlockCollection): Cache of unfinalized candidate blocks.stop_result_tx (
Sender<()>): Channel sender to signal stop results.stalled_threads (
Arc<Mutex<HashSet<ThreadIdentifier>>>): Set of stalled thread identifiers.last_synced_state (
Option<(BlockIdentifier, BlockSeqNo, HashMap<ThreadIdentifier, BlockIdentifier>)>): Last synced blockchain state recorded.chain_pulse_monitor (
Sender<ChainPulseEvent>): Channel sender for chain pulse monitoring events.authority_handler (
JoinHandle<()>): Handle for the authority management thread.
Usage
The Node struct represents an active participant in the blockchain network, managing block production, validation, and synchronization within a dedicated thread. It interacts extensively with services such as StateSyncService for state synchronization, BlockProcessorService for processing incoming blocks, and ProducerService for block production. Networking channels facilitate message exchange, while cryptographic keys and attestations ensure security and consensus.
Method: new
pub fn new(
shared_services: SharedServices,
state_sync_service: TStateSyncService,
production_process: TVMBlockProducerProcess,
repository: RepositoryImpl,
network_rx: XInstrumentedReceiver<(NetworkMessage, SocketAddr)>,
network_broadcast_tx: NetBroadcastSender<NetworkMessage>,
network_direct_tx: NetDirectSender<NodeIdentifier, NetworkMessage>,
raw_block_tx: InstrumentedSender<(NodeIdentifier, Vec<u8>)>,
bls_keys_map: Arc<Mutex<HashMap<PubKey, (Secret, RndSeed)>>>,
config: Config,
block_keeper_rng: TRandomGenerator,
producer_election_rng: TRandomGenerator,
thread_id: ThreadIdentifier,
feedback_sender: InstrumentedSender<ExtMsgFeedbackList>,
_update_producer_group: bool,
block_state_repository: BlockStateRepository,
block_processor_service: BlockProcessorService,
attestations_target_service: AttestationTargetsService,
validation_service: ValidationServiceInterface,
skipped_attestation_ids: Arc<Mutex<HashSet<BlockIdentifier>>>,
metrics: Option<BlockProductionMetrics>,
self_tx: XInstrumentedSender<(NetworkMessage, SocketAddr)>,
external_messages: ExternalMessagesThreadState,
message_db: MessageDurableStorage,
last_block_attestations: Arc<Mutex<CollectedAttestations>>,
bp_production_count: Arc<AtomicI32>,
blk_req_tx: Sender<BlockRequestParams>,
attestation_send_service: AttestationSendServiceHandler,
ext_msg_receiver: JoinHandle<()>,
authority_state: Arc<Mutex<Authority>>,
unprocessed_blocks_cache: UnfinalizedCandidateBlockCollection,
stop_result_tx: Sender<()>,
stalled_threads: Arc<Mutex<HashSet<ThreadIdentifier>>>,
chain_pulse_monitor: Sender<ChainPulseEvent>,
authority_handler: JoinHandle<()>,
self_authority_tx: XInstrumentedSender<(NetworkMessage, SocketAddr)>,
save_optimistic_service_sender: InstrumentedSender<Arc<OptimisticStateImpl>>,
) -> Self
Parameters
shared_services: Shared utility services accessed by the node.state_sync_service: Service responsible for synchronizing state with the network.production_process: Encapsulates the logic for producing TVM blocks.repository: Blockchain data repository interface.network_rx: Receiver channel for network messages.network_broadcast_tx: Sender for broadcasting messages to the network.network_direct_tx: Sender for direct node-to-node messaging.raw_block_tx: Sender for raw block data.bls_keys_map: Shared BLS key pairs map.config: Node configuration settings.block_keeper_rng: RNG for block keeper selection.producer_election_rng: RNG for producer election.thread_id: Identifier for this node thread.feedback_sender: Channel for sending feedback on external messages._update_producer_group: Flag for producer group updates (currently unused).block_state_repository: Repository managing block state data.block_processor_service: Service handling block processing.attestations_target_service: Service managing attestation targets.validation_service: Service interface for validation.skipped_attestation_ids: Set of attestation IDs skipped during processing.metrics: Optional metrics collector for block production.self_tx: Instrumented sender for self-sent network messages.external_messages: State for handling external messages.message_db: Storage for durable external messages.last_block_attestations: Attestations collected for the last block.bp_production_count: Atomic counter for block production.blk_req_tx: Channel sender for block requests.attestation_send_service: Service for sending attestations.ext_msg_receiver: Handle for external messages receiver thread.authority_state: Authority state management.unprocessed_blocks_cache: Cache for unprocessed candidate blocks.stop_result_tx: Channel sender to signal stop results.stalled_threads: Set of stalled thread identifiers.chain_pulse_monitor: Channel sender for chain pulse events.authority_handler: Handle for authority handler thread.self_authority_tx: Instrumented sender for authority-related messages.save_optimistic_service_sender: Sender for saving optimistic state.
Returns
A fully initialized instance of
Node.
Description
The new method initializes the Node with all its dependencies and starts critical internal services and threads, including:
Registers block producer control channel with the authority.
Initializes caches and collections for attestations and acknowledgments.
Starts a dedicated thread for block finalization processing, which runs the
finalization_loopfunction.Starts the block producer service with provided parameters.
The method ensures thread-safe setup of state and communication channels and integrates the various subsystems required for node operation.
Usage Example
let node = Node::new(
shared_services,
state_sync_service,
production_process,
repository,
network_rx,
network_broadcast_tx,
network_direct_tx,
raw_block_tx,
bls_keys_map,
config,
block_keeper_rng,
producer_election_rng,
thread_id,
feedback_sender,
false,
block_state_repository,
block_processor_service,
attestations_target_service,
validation_service,
skipped_attestation_ids,
Some(metrics),
self_tx,
external_messages,
message_db,
last_block_attestations,
bp_production_count,
blk_req_tx,
attestation_send_service,
ext_msg_receiver,
authority_state,
unprocessed_blocks_cache,
stop_result_tx,
stalled_threads,
chain_pulse_monitor,
authority_handler,
self_authority_tx,
save_optimistic_service_sender,
);
Implementation Details and Algorithms
Threaded Finalization Loop: The file spawns a dedicated thread named
"Block finalization loop {thread_id}"that runs thefinalization_loopfunction. This loop handles the finalization of blocks in the blockchain, involving repository interaction, block state updates, and message broadcasting.Authority Registration: The
Noderegisters its block producer control channel and self-node authority sender with theAuthoritystate, allowing for controlled block production and authority state changes.Block Production Service: The
ProducerServiceis started within the node context, coordinating block production timing, feedback handling, attestation management, and interaction with the shared services and state synchronization service.Use of
ArcandMutex: For thread-safe shared mutable state, fields likebls_keys_map,last_block_attestations,received_acks,received_nacks, and others are wrapped inArc<Mutex<...>>ensuring safe concurrent access.Randomness Sources: Two separate random number generators (
block_keeper_rngandproducer_election_rng) are maintained for different randomized consensus or election tasks.Metrics Collection: Optionally collects and reports metrics related to block production if a
BlockProductionMetricsinstance is provided.Message Channels: Utilizes instrumented channels (
XInstrumentedReceiver,InstrumentedSender) for network communication and internal messaging, enabling telemetry and observability.
Interactions with Other System Components
State Synchronization Service (
TStateSyncService): Coordinates synchronization of blockchain state across the network.Repository Layer (
RepositoryImpl,BlockStateRepository): Interfaces with persistent storage for blockchain data and block states.Network Messaging (
network_messagemodule): ExchangesNetworkMessageinstances over network channels to communicate with other nodes.Cryptographic Services (
blsmodules): Uses BLS cryptography for signing and verifying attestations, acknowledgments, and other consensus messages.Block Processing Services: Collaborates with
BlockProcessorServiceandValidationServiceInterfacefor validating and processing incoming blocks.Attestation Handling: Uses
AttestationSendServiceHandlerand attestation-related data structures to manage consensus attestations.Authority Management: Coordinates with
Authorityto manage authority state and block production control.External Messages Handling: Manages external messages via
ExternalMessagesThreadStateand related channels.Unprocessed Blocks Cache: Maintains a cache of unfinalized candidate blocks for later processing.
Metrics and Monitoring: Reports metrics and monitors chain pulse events to track blockchain progress.
Visual Diagram
classDiagram
class Node {
-repository: RepositoryImpl
-shared_services: SharedServices
-state_sync_service: TStateSyncService
-network_rx: XInstrumentedReceiver
-network_broadcast_tx: NetBroadcastSender
-network_direct_tx: NetDirectSender
-raw_block_tx: InstrumentedSender
-bls_keys_map: Arc<Mutex<HashMap>>
-last_block_attestations: Arc<Mutex<CollectedAttestations>>
-received_acks: Arc<Mutex<Vec<Envelope<AckData>>>>
-sent_acks: BTreeMap<BlockSeqNo, Envelope<AckData>>
-received_nacks: Arc<Mutex<Vec<Envelope<NackData>>>>
-config: Config
-received_attestations: BTreeMap<BlockSeqNo, HashMap<BlockIdentifier, HashSet<SignerIndex>>>
-block_keeper_rng: TRandomGenerator
-producer_election_rng: TRandomGenerator
-attestations_to_send: BTreeMap<BlockSeqNo, Vec<Envelope<AttestationData>>>
-ack_cache: BTreeMap<BlockSeqNo, Vec<Envelope<AckData>>>
-nack_cache: BTreeMap<BlockSeqNo, Vec<Envelope<NackData>>>
-thread_id: ThreadIdentifier
-is_spawned_from_node_sync: bool
-block_state_repository: BlockStateRepository
-block_processor_service: BlockProcessorService
-attestation_send_service: AttestationSendServiceHandler
-validation_service: ValidationServiceInterface
-skipped_attestation_ids: Arc<Mutex<HashSet<BlockIdentifier>>>
-message_db: MessageDurableStorage
-last_broadcasted_produced_candidate_block_time: Instant
-finalization_loop: JoinHandle
-producer_service: ProducerService
-metrics: Option<BlockProductionMetrics>
-external_messages: ExternalMessagesThreadState
-is_state_sync_requested: Arc<Mutex<Option<BlockSeqNo>>>
-blk_req_tx: Sender<BlockRequestParams>
-ext_msg_receiver: JoinHandle
-authority_state: Arc<Mutex<Authority>>
-unprocessed_blocks_cache: UnfinalizedCandidateBlockCollection
-stop_result_tx: Sender
-stalled_threads: Arc<Mutex<HashSet<ThreadIdentifier>>>
-last_synced_state: Option<(BlockIdentifier, BlockSeqNo, HashMap<ThreadIdentifier, BlockIdentifier>)>
-chain_pulse_monitor: Sender<ChainPulseEvent>
-authority_handler: JoinHandle
+new()
}
Node --> "1" RepositoryImpl
Node --> "1" SharedServices
Node --> "1" TStateSyncService
Node --> "1" BlockProcessorService
Node --> "1" AttestationSendServiceHandler
Node --> "1" ValidationServiceInterface
Node --> "1" BlockStateRepository
Node --> "1" ProducerService
Node --> "1" Authority
Node --> "1" UnfinalizedCandidateBlockCollection
Node --> "1" ExternalMessagesThreadState
Node --> "many" NetworkMessage
Node --> "many" Envelope
Node --> "many" BlockIdentifier
Node --> "many" BlockSeqNo
This diagram shows the primary properties and key relationships of the Node struct with core services and data types it depends on or manages.