process.rs
Overview
This file implements the core logic for block production within a multi-threaded blockchain node environment. It defines the TVMBlockProducerProcess struct, which manages block production threads, state transitions, and communication with other system components such as repositories, shared services, and external message queues.
The primary responsibilities include managing the lifecycle of block production threads, orchestrating the production of new blocks using the TVM (TON Virtual Machine) executor, aggregating acknowledgments (acks) and negative acknowledgments (nacks) from other nodes, handling cross-thread references, and coordinating synchronization with external state sharing services.
Main Structs and Enums
TVMBlockProducerProcess
A builder-constructible struct that encapsulates state and services required for producing blocks in the TVM environment.
Fields
node_config: Config
Configuration parameters for the node operation.blockchain_config: Arc<BlockchainConfig>
Shared blockchain configuration.repository: RepositoryImpl
Repository interface for blockchain data storage and retrieval.produced_blocks: Arc<Mutex<Vec<ProducedBlock>>>
Thread-safe collection of blocks produced by this process.active_producer_thread: Option<(JoinHandle<OptimisticStateImpl>, InstrumentedSender<()>)>
Handle and control channel to the currently active block production thread.block_produce_timeout: Arc<Mutex<Duration>>
Timeout duration controlling the maximum allowed block production time.optimistic_state_cache: Option<Arc<OptimisticStateImpl>>
Optional cached optimistic state used for efficient block production.epoch_block_keeper_data_senders: Option<InstrumentedSender<BlockKeeperData>>
Sender channel for epoch-specific data updates.shared_services: SharedServices
Services shared across node components, including synchronization and cross-thread reference services.producer_node_id: NodeIdentifier
Identifier of the node producing blocks.thread_count_soft_limit: usize
Soft limit on the number of threads used concurrently for block production.parallelization_level: usize
Level of parallelization allowed during block production.block_keeper_epoch_code_hash: String
Hash identifying the epoch block keeper code.block_keeper_preepoch_code_hash: String
Hash identifying the pre-epoch block keeper code.metrics: Option<BlockProductionMetrics>
Optional metrics collector for block production activities.wasm_cache: WasmNodeCache
Cache for WebAssembly modules used during block production.share_service: Option<ExternalFileSharesBased>
Optional external service for sharing state snapshots.save_optimistic_service_sender: InstrumentedSender<Arc<OptimisticStateImpl>>
Channel used to send optimistic states to other system components for persistence or further processing.
Usage
TVMBlockProducerProcess is instantiated via the builder pattern and manages starting, stopping, and controlling block production for individual threads in the blockchain node.
ProcudeNextResult
An enum representing the result of attempting to produce the next block.
Variants:
Stopped
Indicates that block production was stopped.Continues
Indicates that block production should continue.
Key Methods and Functions
produce_next
fn produce_next(
node_config: Config,
initial_state: &mut OptimisticStateImpl,
blockchain_config: Arc<BlockchainConfig>,
producer_node_id: NodeIdentifier,
thread_count_soft_limit: usize,
parallelization_level: usize,
block_keeper_epoch_code_hash: String,
block_keeper_preepoch_code_hash: String,
produced_blocks: Arc<Mutex<Vec<ProducedBlock>>>,
timeout: Arc<Mutex<Duration>>,
timeout_correction: &mut ProductionTimeoutCorrection,
thread_id_clone: ThreadIdentifier,
epoch_block_keeper_data_rx: &InstrumentedReceiver<BlockKeeperData>,
shared_services: &mut SharedServices,
active_block_producer_threads: &mut Vec<(Cell, ActiveThread)>,
received_acks: Arc<Mutex<Vec<Envelope<GoshBLS, AckData>>>>,
received_nacks: Arc<Mutex<Vec<Envelope<GoshBLS, NackData>>>>,
block_state_repo: BlockStateRepository,
accounts_repo: AccountsRepository,
external_control_rx: &InstrumentedReceiver<()>,
metrics: Option<BlockProductionMetrics>,
wasm_cache: WasmNodeCache,
external_messages_queue: &mut ExternalMessagesThreadState,
repository: &RepositoryImpl,
is_state_sync_requested: Arc<Mutex<Option<BlockSeqNo>>>,
share_service: Option<ExternalFileSharesBased>,
round: BlockRound,
parent_block_state: BlockState,
) -> anyhow::Result<(ProcudeNextResult, BlockState)>
Description
Executes one iteration of the block production process. This involves:
Reading external messages and aggregating acks and nacks.
Constructing a
TVMBlockProducerwith the current context.Gathering cross-thread references and applying delay logic for finalized blocks.
Optionally sharing state snapshots if requested.
Spawning a dedicated thread to produce the block within specified time limits.
Handling timing and control signals to stop production.
Updating the optimistic state and managed active threads.
Persisting produced block information and cross-thread reference data.
Reporting metrics for block production time.
Parameters
node_config: Node configuration parameters.initial_state: Mutable reference to the current optimistic state.blockchain_config: Blockchain configuration shared across the node.producer_node_id: Identifier of the producer node.thread_count_soft_limit: Soft limit for thread count during production.parallelization_level: Level of parallelization allowed.block_keeper_epoch_code_hash: Epoch keeper code hash.block_keeper_preepoch_code_hash: Pre-epoch keeper code hash.produced_blocks: Shared collection of produced blocks.timeout: Timeout duration for block production.timeout_correction: Correction adjustments to production timeouts.thread_id_clone: Identifier of the thread producing the block.epoch_block_keeper_data_rx: Receiver for epoch-specific data.shared_services: Shared node services.active_block_producer_threads: Mutable list of active block producer threads.received_acks: Shared collection of received acknowledgments.received_nacks: Shared collection of received negative acknowledgments.block_state_repo: Repository for block states.accounts_repo: Repository for account data.external_control_rx: Receiver for external control signals.metrics: Optional metrics collector.wasm_cache: Cache for WebAssembly execution.external_messages_queue: Queue of external messages.repository: General blockchain repository.is_state_sync_requested: Flag indicating if state synchronization is requested.share_service: Optional service for external state sharing.round: Current block round.parent_block_state: State of the parent block.
Returns
An anyhow::Result containing a tuple:
ProcudeNextResult: Indicates whether production should continue or stop.BlockState: The new block state after production.
Example Usage
let (result, new_state) = TVMBlockProducerProcess::produce_next(
node_config,
&mut initial_state,
blockchain_config,
producer_node_id,
thread_count_soft_limit,
parallelization_level,
epoch_code_hash,
preepoch_code_hash,
produced_blocks,
timeout,
&mut timeout_correction,
thread_id,
&epoch_data_rx,
&mut shared_services,
&mut active_threads,
received_acks,
received_nacks,
block_state_repo,
accounts_repo,
&external_control_rx,
metrics,
wasm_cache,
&mut external_messages,
&repository,
is_state_sync_requested,
share_service,
round,
parent_block_state,
)?;
start_thread_production
pub fn start_thread_production(
&mut self,
thread_id: &ThreadIdentifier,
prev_block_id: &BlockIdentifier,
received_acks: Arc<Mutex<Vec<Envelope<GoshBLS, AckData>>>>,
received_nacks: Arc<Mutex<Vec<Envelope<GoshBLS, NackData>>>>,
block_state_repository: BlockStateRepository,
external_messages: ExternalMessagesThreadState,
is_state_sync_requested: Arc<Mutex<Option<BlockSeqNo>>>,
initial_round: BlockRound,
) -> anyhow::Result<()>
Description
Starts a new block production thread for the given blockchain thread identifier. It initializes the optimistic state either from cache or repository, sets up necessary communication channels, and spawns a dedicated thread that runs an infinite loop producing blocks until an interrupt signal is received.
Parameters
thread_id: The identifier of the blockchain thread for which production is started.prev_block_id: Identifier of the previous block to base production on.received_acks: Shared collection of acknowledgments received.received_nacks: Shared collection of negative acknowledgments received.block_state_repository: Repository interface for block states.external_messages: External messages queue state.is_state_sync_requested: Flag indicating whether a state sync is requested.initial_round: The initial round number for block production.
Returns
anyhow::Result<()> indicating success or failure in starting the production thread.
Implementation Details
Checks if a production thread is already active to avoid duplication.
Loads optimistic state from cache or repository.
Sets up control and epoch data channels.
Spawns a thread with a production loop calling
produce_next.Stores handles and control senders for managing the thread.
stop_thread_production
pub fn stop_thread_production(&mut self, thread_id: &ThreadIdentifier) -> anyhow::Result<()>
Description
Stops the active block production thread for the specified blockchain thread. Sends a control signal to terminate the production loop, waits for thread completion, caches the last optimistic state, and clears produced blocks.
Parameters
thread_id: Identifier of the thread to stop production for.
Returns
anyhow::Result<()> indicating success or failure.
get_produced_blocks
pub fn get_produced_blocks(&mut self) -> Vec<ProducedBlock>
Description
Retrieves and clears the list of blocks produced by the process. If the active production thread has finished, it clears the active thread handle and returns an empty vector.
The returned blocks are sorted by sequence number, and their memento initialization time is updated to the current instant.
Returns
A sorted Vec<ProducedBlock> containing blocks produced so far.
set_timeout
pub fn set_timeout(&mut self, timeout: Duration)
Description
Sets the block production timeout duration used to limit production time.
Parameters
timeout: The new timeout duration.
send_epoch_message
pub fn send_epoch_message(&self, data: BlockKeeperData)
Description
Sends epoch-specific data to the block production thread via the configured channel.
Parameters
data: TheBlockKeeperDatamessage to send.
Helper Functions
aggregate_acks
fn aggregate_acks(
received_acks: Vec<Envelope<GoshBLS, AckData>>
) -> anyhow::Result<Vec<Envelope<GoshBLS, AckData>>>
Aggregates multiple acknowledgment envelopes by merging signature occurrences and combining aggregated BLS signatures.
Uses a hashmap keyed by block identifier to accumulate merged acknowledgments.
Merges signatures using the underlying
GoshBLSsignature scheme.Clears the input vector after aggregation.
aggregate_nacks
fn aggregate_nacks(
received_nacks: Vec<Envelope<GoshBLS, NackData>>
) -> anyhow::Result<Vec<Envelope<GoshBLS, NackData>>>
Aggregates negative acknowledgment envelopes similarly to aggregate_acks. Note that the aggregation logic for nacks is marked with a TODO indicating potential limitations in aggregation based solely on block IDs.
Important Implementation Details and Algorithms
Threaded Block Production: The design spawns dedicated OS threads for each blockchain thread's block production. These threads run infinite loops producing blocks, controlled via instrumented channels for signaling.
Optimistic State Management: The block production processes maintain and update optimistic states representing the latest known blockchain state, which supports efficient block generation.
Cross-thread Reference Handling: To handle dependencies across blockchain threads, the production process collects and filters cross-thread references, sometimes delaying references to finalized blocks based on configurable features.
Acknowledgment Aggregation: Received acks and nacks are aggregated to reduce communication overhead and improve consensus efficiency. This is done by merging BLS signatures and maintaining counts of signature occurrences.
Timeout and Correction Logic: Production timeouts are enforced with corrections to handle timing variances, ensuring production respects configured limits while allowing some flexibility.
State Sharing: Optionally integrates with external file-sharing services to share snapshots of the blockchain state, facilitating synchronization and redundancy.
Instrumentation and Metrics: Uses tracing and telemetry utilities to instrument execution, collect metrics, and trace block flow, which aids in monitoring and debugging.
Interaction with Other System Components
Repository (
RepositoryImpl): Used extensively to retrieve and store blockchain state, optimistic states, and to access account and message databases.Shared Services (
SharedServices): Provides access to thread synchronization, cross-thread reference data services, and other node-wide shared resources.Block Producer (
TVMBlockProducer): The actual block production logic is delegated to this component, which uses the TVM executor to create blocks.External Messages (
ExternalMessagesThreadState): Manages incoming external messages that influence block production.BLS Signature Scheme (
GoshBLS): Handles cryptographic signature aggregation and verification during consensus message processing.Metrics (
BlockProductionMetrics): Collects and reports production performance metrics.External File Shares (
ExternalFileSharesBased): Used for sharing blockchain state snapshots externally.Control Channels: Use instrumented channels for thread control and data passing between block production threads and the main process.
Visual Diagram
classDiagram
class TVMBlockProducerProcess {
-node_config: Config
-blockchain_config: Arc<BlockchainConfig>
-repository: RepositoryImpl
-produced_blocks: Arc<Mutex<Vec<ProducedBlock>>>
-active_producer_thread: Option<(JoinHandle, InstrumentedSender)>
-block_produce_timeout: Arc<Mutex<Duration>>
-optimistic_state_cache: Option<Arc<OptimisticStateImpl>>
-epoch_block_keeper_data_senders: Option<InstrumentedSender<BlockKeeperData>>
-shared_services: SharedServices
-producer_node_id: NodeIdentifier
-thread_count_soft_limit: usize
-parallelization_level: usize
-block_keeper_epoch_code_hash: String
-block_keeper_preepoch_code_hash: String
-metrics: Option<BlockProductionMetrics>
-wasm_cache: WasmNodeCache
-share_service: Option<ExternalFileSharesBased>
-save_optimistic_service_sender: InstrumentedSender<OptimisticStateImpl>
+start_thread_production()
+stop_thread_production()
+produce_next()
+get_produced_blocks()
+set_timeout()
+send_epoch_message()
}
class TVMBlockProducer {
+produce()
}
TVMBlockProducerProcess "1" --> "1" TVMBlockProducer : uses
TVMBlockProducerProcess "1" --> "1" RepositoryImpl : interacts with
TVMBlockProducerProcess "1" --> "1" SharedServices : uses
TVMBlockProducerProcess "1" --> "*" ProducedBlock : manages
TVMBlockProducerProcess "1" --> "*" Thread : spawns
TVMBlockProducerProcess "1" --> "*" InstrumentedSender : sends control and data
Additional Notes on Testing
The module contains integration tests under the tests submodule which simulate a block producer process lifecycle, including starting production, receiving blocks, measuring production time, and stopping production. The test uses in-memory repositories and stub services to isolate the block production logic.