block_producer.rs
Overview
The block_producer.rs file implements the BlockProducer struct, which is responsible for orchestrating the production of blocks within a specific thread of the blockchain system. It manages the lifecycle of block production, including starting and stopping block generation, handling production timeouts, updating candidate blocks with required attestations, and broadcasting produced blocks to the network.
This module interacts heavily with block state repositories, the underlying repository implementation, cryptographic BLS signing utilities, network messaging, and various supporting services such as attestation target evaluation and optimistic state management. It also manages synchronization with external commands and ensures proper handling of production continuation after restarts.
Key Components
Structs and Enums
ProductionStatus
Purpose: Keeps track of the current production parameters and the last produced block for a block producer thread.
Fields:
init_params: StartBlockProducerThreadInitialParameters- Initial parameters used to start the block production thread.last_produced: Option<(BlockIdentifier, BlockSeqNo)>- Optionally stores the last produced block's identifier and sequence number.
BlockProducer
Purpose: Main struct responsible for managing block production for a particular thread.
Fields:
self_addr: SocketAddr- Network address of the current node.thread_id: ThreadIdentifier- Identifier of the thread this producer manages.repository: RepositoryImpl- Main repository interface to blockchain data.block_state_repository: BlockStateRepository- Repository for managing block states.production_process: TVMBlockProducerProcess- The process managing actual block production logic.feedback_sender: InstrumentedSender<ExtMsgFeedbackList>- Channel to send feedback messages externally.received_acks: Arc<Mutex<Vec<Envelope<GoshBLS, AckData>>>>- Thread-safe storage for received acknowledgments.received_nacks: Arc<Mutex<Vec<Envelope<GoshBLS, NackData>>>>- Thread-safe storage for received negative acknowledgments.shared_services: SharedServices- Shared services used across the node.bls_keys_map: Arc<Mutex<HashMap<PubKey, (Secret, RndSeed)>>>- BLS cryptographic keys map.last_broadcasted_produced_candidate_block_time: std::time::Instant- Timestamp of the last broadcasted candidate block.last_block_attestations: Arc<Mutex<CollectedAttestations>>- Storage for collected attestations on the last block.attestations_target_service: AttestationTargetsService- Service for evaluating attestation targets.self_tx: XInstrumentedSender<(NetworkMessage, SocketAddr)>- Channel for sending messages to self.self_authority_tx: XInstrumentedSender<(NetworkMessage, SocketAddr)>- Channel for sending authority-related messages.broadcast_tx: NetBroadcastSender<NetworkMessage>- Network broadcast sender.node_identifier: NodeIdentifier- Identifier of the node.production_timeout: Duration- Timeout duration for production cycles.save_state_frequency: u32- Frequency of saving state snapshots.bp_production_count: Arc<AtomicI32>- Atomic counter for active block producers.production_status: Option<ProductionStatus>- Current production status.producing_status: bool- Flag indicating if production is currently active.external_messages: ExternalMessagesThreadState- State of external messages.is_state_sync_requested: Arc<Mutex<Option<BlockSeqNo>>>- Flag and sequence number for state sync requests.control_rx: std::sync::mpsc::Receiver<BlockProducerCommand>- Receiver for control commands.save_optimistic_service_sender: InstrumentedSender<Arc<OptimisticStateImpl>>- Sender for optimistic state saving service.
UpdateCommonSectionResult
Purpose: Enum to express the result of attempting to update the common section of a candidate block.
Variants:
Success - Update succeeded.
FailedToBuildChainToTheLastFinalizedBlock - Unable to build a valid chain to the last finalized block.
NotReadyYet - Insufficient data or conditions to update.
AncestorIsNotReadyYet - Ancestor blocks are not ready.
AbortNotInBKSet- The node is no longer in the block keeper set, abort production.
Functions and Methods
BlockProducer::main_loop
pub fn main_loop(&mut self) -> anyhow::Result<()>
Description: This is the main execution loop for the block producer thread. It listens for control commands, manages production state, starts or stops block production accordingly, and handles timeouts.
Behavior:
Listens for
BlockProducerCommandmessages to control production.Starts production if not currently producing.
Handles production timeout via
on_production_timeout.Manages production status and updates metrics.
Sleeps for appropriate durations based on production activity.
Return: Returns
anyhow::Result<()>indicating success or failure.Usage Example:
let mut producer = BlockProducer::builder()...build(); producer.main_loop()?;
BlockProducer::start_production
pub fn start_production(&mut self, next_bp_command: Option<BlockProducerCommand>) -> anyhow::Result<Option<(BlockIdentifier, BlockSeqNo)>>
Description: Initiates block production based on the latest command and internal state.
Parameters:
next_bp_command: Optional command to guide production start.
Returns: Optionally returns
(BlockIdentifier, BlockSeqNo)tuple indicating the block to continue production from.Details:
Calls
find_thread_last_block_id_this_node_can_continueto find the last block to continue production from.Starts the production process if possible.
Updates production status flags.
Errors: Returns error if production cannot start.
Usage Example:
let continuation = producer.start_production(Some(BlockProducerCommand::Start(params)))?;
BlockProducer::find_thread_last_block_id_this_node_can_continue
pub(crate) fn find_thread_last_block_id_this_node_can_continue(
&mut self,
next_bp_command: Option<BlockProducerCommand>
) -> anyhow::Result<Option<(BlockIdentifier, BlockSeqNo, BlockRound)>>
Description: Determines the last block ID and sequence number that the node's thread can continue producing from.
Parameters:
next_bp_command: Optional command that may update production parameters.
Returns: Optionally returns a tuple containing block identifier, sequence number, and round number to continue from.
Implementation Details:
Processes control commands from the
control_rx.Evaluates if production status needs updating.
Checks against the repository for the last finalized block.
Returns
Noneif production cannot continue.
Errors: Returns errors if control channel is disconnected unexpectedly.
Usage Example:
let last_block = producer.find_thread_last_block_id_this_node_can_continue(None)?;
BlockProducer::on_production_timeout
pub(crate) fn on_production_timeout(
&mut self,
producer_tails: &mut Option<(BlockIdentifier, BlockSeqNo)>,
memento: Option<BlockProducerMemento>
) -> anyhow::Result<(bool, Option<BlockProducerMemento>)>
Description: Handles production timeout events by processing and broadcasting any produced blocks, updating block states, and managing production continuation or termination.
Parameters:
producer_tails: Mutable reference to current production continuation state.memento: Optional memento capturing production state.
Returns: Tuple containing a boolean indicating if broadcasting occurred and an optional updated memento.
Key Steps:
Waits for attestation notifications if necessary.
Processes produced blocks in order.
Checks block validity against the latest finalized blocks.
Updates candidate blocks' common sections.
Signs blocks and prepares network messages.
Broadcasts candidate blocks and stores optimistic states.
Stops production if invalidation conditions are met.
Errors: Returns errors on failure to process or broadcast.
Notes: Uses synchronization via mutexes and conditional variables on attestations.
Usage Example:
let (broadcasted, new_memento) = producer.on_production_timeout(&mut tails, memento)?;
BlockProducer::_does_block_have_a_valid_sibling
pub(crate) fn _does_block_have_a_valid_sibling(
&self,
candidate_block: &Envelope<GoshBLS, AckiNackiBlock>
) -> anyhow::Result<bool>
Description: Checks if a block has any sibling blocks (blocks with the same parent) already known to the block state repository.
Parameters:
candidate_block: The candidate block envelope to check.
Returns: Boolean indicating presence of valid sibling blocks.
Usage: This method is currently unused but available for potential block validation logic.
BlockProducer::update_candidate_common_section
fn update_candidate_common_section(
&mut self,
candidate_block: &mut AckiNackiBlock,
share_state_ids: Option<HashMap<ThreadIdentifier, BlockIdentifier>>,
optimistic_state: &OptimisticStateImpl,
) -> anyhow::Result<UpdateCommonSectionResult>
Description: Updates the common section of a candidate block with aggregated attestations, directives, and producer selector information.
Parameters:
candidate_block: Mutable reference to the candidate block to update.share_state_ids: Optional map of thread identifiers to block identifiers for state sharing directives.optimistic_state: The optimistic state associated with the block.
Returns: An
UpdateCommonSectionResultindicating the success or failure of the update.Implementation Details:
Aggregates attestations required by ancestor blocks.
Evaluates attestation targets via
AttestationTargetsService.Updates block directives and thread tables if sharing state.
Adjusts the producer selector according to the block keeper set.
Handles errors related to block state readiness and invalidation.
Usage Example:
let result = producer.update_candidate_common_section(&mut block, share_state, &optimistic_state)?;
BlockProducer::get_producer_selector
pub fn get_producer_selector(
&self,
parent_block_id: &BlockIdentifier,
) -> anyhow::Result<ProducerSelector>
Description: Retrieves or constructs the
ProducerSelectorfor the producer based on the parent block.Parameters:
parent_block_id: Reference to the parent block identifier.
Returns: A
ProducerSelectorinstance.Details:
If the thread is spawning from the last finalized block, creates a new selector.
Otherwise, attempts to retrieve the selector from the block state repository.
Errors: Returns error if the selector cannot be found.
Usage Example:
let selector = producer.get_producer_selector(&parent_block_id)?;
BlockProducer::broadcast_candidate_block
pub(crate) fn broadcast_candidate_block(
&self,
block_id: &BlockIdentifier,
candidate_block: NetworkMessage,
mut ext_msg_feedbacks: ExtMsgFeedbackList,
) -> anyhow::Result<()>
Description: Broadcasts a candidate block network message to peers and sends any external message feedback.
Parameters:
block_id: Identifier of the block being broadcast.candidate_block: The network message encapsulating the candidate block.ext_msg_feedbacks: Feedback messages related to external messages.
Returns: Result indicating success or failure.
Details:
Sends the candidate block over the broadcast channel.
Updates feedback messages with the block hash.
Sends feedback messages via the
feedback_sender.
Usage Example:
producer.broadcast_candidate_block(&block_id, net_msg, feedbacks)?;
BlockProducer::_broadcast_candidate_block_that_was_possibly_produced_by_another_node
pub(crate) fn _broadcast_candidate_block_that_was_possibly_produced_by_another_node(
&self,
candidate_block: Envelope<GoshBLS, AckiNackiBlock>,
) -> anyhow::Result<()>
Description: Rebroadcasts a candidate block that might have been produced by another node, ensuring dissemination in the network.
Parameters:
candidate_block: The envelope of the candidate block.
Returns: Result indicating success or failure.
Usage: Used internally for propagating blocks received from others.
BlockProducer::execute_restarted_producer
fn execute_restarted_producer(
&mut self,
_block_id_to_continue: BlockIdentifier,
_block_seq_no_to_continue: BlockSeqNo,
) -> anyhow::Result<SynchronizationResult<NetworkMessage>>
Description: Placeholder method intended for logic to execute when the producer thread is restarted and needs to continue production from a specified block.
Parameters:
_block_id_to_continue: The block identifier to continue from._block_seq_no_to_continue: The sequence number of the block.
Returns: A synchronization result wrapped in a Result.
Current Implementation: Returns an immediate
Ok(SynchronizationResult::Ok)with commented-out logic for possible future implementation.Notes: The commented code implies future support for resending blocks upon restart to synchronize network state.
Important Implementation Details and Algorithms
Production Control Loop: The
main_loopmethod implements an event-driven loop that reacts to control commands and production timeouts. It coordinates starting and stopping production and ensures the node remains synchronized with the network state.Block Production Continuation: The
find_thread_last_block_id_this_node_can_continuemethod ensures that production continues from the correct block, respecting finalized block boundaries and reacting to control messages.Timeout Handling and Block Broadcasting: The
on_production_timeoutmethod processes blocks produced during the production window, validates them, updates their common sections with aggregated attestations, signs, and broadcasts them. It also manages optimistic state caching and triggers production halts if necessary.Attestation Aggregation and Verification: The block producer aggregates attestations required by ancestor blocks using the
AttestationTargetsServiceto verify that the candidate block can be finalized.Producer Selector Management: The producer selector is dynamically adjusted based on the current block keeper set to ensure correct producer rotation and block assignment.
Thread-safe Data Structures: The use of
Arc<Mutex<>>for shared mutable state (received acks, nacks, attestations, BLS keys) ensures thread-safe concurrent access within asynchronous production workflows.Memento Pattern for Production State: Production state is optionally saved and restored via
BlockProducerMementoto allow for recovery and continuation after interruptions.
Interactions with Other System Components
Repository and Block State Repository: Reads and writes block states and optimistic states to persistent and cache layers.
TVMBlockProducerProcess: Delegates actual block production and generation tasks.
Network Messaging: Sends and broadcasts
NetworkMessagetypes representing candidate blocks or authority switch messages.BLS Cryptography Modules: Uses BLS envelope signing for block validation and proof of authenticity.
Attestation Targets Service: Evaluates if blocks meet attestation thresholds required for finalization.
External Messages and Feedback: Interacts with external messaging systems to provide feedback on block processing.
Control Channel: Receives commands to start or stop production, allowing external control over block production behavior.
Optimistic State Service: Sends optimistic states to be saved asynchronously.
Visual Diagram: Class Diagram of BlockProducer
classDiagram
class BlockProducer {
+self_addr: SocketAddr
+thread_id: ThreadIdentifier
+repository: RepositoryImpl
+block_state_repository: BlockStateRepository
+production_process: TVMBlockProducerProcess
+feedback_sender: InstrumentedSender<ExtMsgFeedbackList>
+received_acks: Arc<Mutex<Vec<Envelope>>>
+received_nacks: Arc<Mutex<Vec<Envelope>>>
+shared_services: SharedServices
+bls_keys_map: Arc<Mutex<HashMap>>
+last_broadcasted_produced_candidate_block_time: Instant
+last_block_attestations: Arc<Mutex<CollectedAttestations>>
+attestations_target_service: AttestationTargetsService
+self_tx: XInstrumentedSender<(NetworkMessage, SocketAddr)>
+self_authority_tx: XInstrumentedSender<(NetworkMessage, SocketAddr)>
+broadcast_tx: NetBroadcastSender<NetworkMessage>
+node_identifier: NodeIdentifier
+production_timeout: Duration
+save_state_frequency: u32
+bp_production_count: Arc<AtomicI32>
+production_status: Option<ProductionStatus>
+producing_status: bool
+external_messages: ExternalMessagesThreadState
+is_state_sync_requested: Arc<Mutex<Option<BlockSeqNo>>>
+control_rx: Receiver<BlockProducerCommand>
+save_optimistic_service_sender: InstrumentedSender<Arc<OptimisticStateImpl>>
+main_loop()
+start_production()
+find_thread_last_block_id_this_node_can_continue()
+on_production_timeout()
+update_candidate_common_section()
+get_producer_selector()
+broadcast_candidate_block()
+execute_restarted_producer()
}
class ProductionStatus {
+init_params: StartBlockProducerThreadInitialParameters
+last_produced: Option<(BlockIdentifier, BlockSeqNo)>
}
BlockProducer --> ProductionStatus
Usage Notes and Examples
Starting the Production Loop:
InstantiateBlockProducerusing the builder pattern (via TypedBuilder), configure necessary parameters, and callmain_loop()to start production in the thread context.Handling Commands:
The block producer listens oncontrol_rxforBlockProducerCommandmessages such