process.rs

Overview

This file implements the core logic for block production within a multi-threaded blockchain node environment. It defines the TVMBlockProducerProcess struct, which manages block production threads, state transitions, and communication with other system components such as repositories, shared services, and external message queues.

The primary responsibilities include managing the lifecycle of block production threads, orchestrating the production of new blocks using the TVM (TON Virtual Machine) executor, aggregating acknowledgments (acks) and negative acknowledgments (nacks) from other nodes, handling cross-thread references, and coordinating synchronization with external state sharing services.

Main Structs and Enums

TVMBlockProducerProcess

A builder-constructible struct that encapsulates state and services required for producing blocks in the TVM environment.

Fields

The struct's fields and their types are listed in the class diagram in the Visual Diagram section.

Usage

TVMBlockProducerProcess is instantiated via the builder pattern and manages starting, stopping, and controlling block production for individual threads in the blockchain node.
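The builder API itself is not shown in this document, so the following is only a minimal sketch of the pattern. It assumes a hand-written builder; `ProducerProcess` and `ProducerProcessBuilder` are hypothetical names, and only three of the real struct's fields are mirrored. The actual code may well generate its builder with a derive macro instead.

```rust
use std::sync::{Arc, Mutex};
use std::time::Duration;

/// Hypothetical, heavily reduced stand-in for `TVMBlockProducerProcess`.
#[derive(Debug)]
pub struct ProducerProcess {
    pub thread_count_soft_limit: usize,
    pub parallelization_level: usize,
    pub block_produce_timeout: Arc<Mutex<Duration>>,
}

/// Hypothetical builder: every field starts unset and is checked in `build`.
#[derive(Default)]
pub struct ProducerProcessBuilder {
    thread_count_soft_limit: Option<usize>,
    parallelization_level: Option<usize>,
    block_produce_timeout: Option<Duration>,
}

impl ProducerProcessBuilder {
    pub fn thread_count_soft_limit(mut self, value: usize) -> Self {
        self.thread_count_soft_limit = Some(value);
        self
    }

    pub fn parallelization_level(mut self, value: usize) -> Self {
        self.parallelization_level = Some(value);
        self
    }

    pub fn block_produce_timeout(mut self, value: Duration) -> Self {
        self.block_produce_timeout = Some(value);
        self
    }

    /// Fails with a message naming the first missing field.
    pub fn build(self) -> Result<ProducerProcess, String> {
        let timeout = self
            .block_produce_timeout
            .ok_or("block_produce_timeout is required")?;
        Ok(ProducerProcess {
            thread_count_soft_limit: self
                .thread_count_soft_limit
                .ok_or("thread_count_soft_limit is required")?,
            parallelization_level: self
                .parallelization_level
                .ok_or("parallelization_level is required")?,
            // The timeout is wrapped so it can later be shared with a producer thread.
            block_produce_timeout: Arc::new(Mutex::new(timeout)),
        })
    }
}
```

A caller would chain the setters and finish with `build()`, for example `ProducerProcessBuilder::default().thread_count_soft_limit(100).parallelization_level(4).block_produce_timeout(Duration::from_millis(330)).build()`.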


ProcudeNextResult

An enum representing the result of attempting to produce the next block.

Variants:


Key Methods and Functions

produce_next

fn produce_next(
    node_config: Config,
    initial_state: &mut OptimisticStateImpl,
    blockchain_config: Arc<BlockchainConfig>,
    producer_node_id: NodeIdentifier,
    thread_count_soft_limit: usize,
    parallelization_level: usize,
    block_keeper_epoch_code_hash: String,
    block_keeper_preepoch_code_hash: String,
    produced_blocks: Arc<Mutex<Vec<ProducedBlock>>>,
    timeout: Arc<Mutex<Duration>>,
    timeout_correction: &mut ProductionTimeoutCorrection,
    thread_id_clone: ThreadIdentifier,
    epoch_block_keeper_data_rx: &InstrumentedReceiver<BlockKeeperData>,
    shared_services: &mut SharedServices,
    active_block_producer_threads: &mut Vec<(Cell, ActiveThread)>,
    received_acks: Arc<Mutex<Vec<Envelope<GoshBLS, AckData>>>>,
    received_nacks: Arc<Mutex<Vec<Envelope<GoshBLS, NackData>>>>,
    block_state_repo: BlockStateRepository,
    accounts_repo: AccountsRepository,
    external_control_rx: &InstrumentedReceiver<()>,
    metrics: Option<BlockProductionMetrics>,
    wasm_cache: WasmNodeCache,
    external_messages_queue: &mut ExternalMessagesThreadState,
    repository: &RepositoryImpl,
    is_state_sync_requested: Arc<Mutex<Option<BlockSeqNo>>>,
    share_service: Option<ExternalFileSharesBased>,
    round: BlockRound,
    parent_block_state: BlockState,
) -> anyhow::Result<(ProcudeNextResult, BlockState)>

Description

Executes one iteration of the block production process. This involves:

Parameters

Returns

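An anyhow::Result containing a tuple (ProcudeNextResult, BlockState): the outcome of the production attempt and the block state returned alongside it (bound to new_state in the example below).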

Example Usage

let (result, new_state) = TVMBlockProducerProcess::produce_next(
    node_config,
    &mut initial_state,
    blockchain_config,
    producer_node_id,
    thread_count_soft_limit,
    parallelization_level,
    epoch_code_hash,
    preepoch_code_hash,
    produced_blocks,
    timeout,
    &mut timeout_correction,
    thread_id,
    &epoch_data_rx,
    &mut shared_services,
    &mut active_threads,
    received_acks,
    received_nacks,
    block_state_repo,
    accounts_repo,
    &external_control_rx,
    metrics,
    wasm_cache,
    &mut external_messages,
    &repository,
    is_state_sync_requested,
    share_service,
    round,
    parent_block_state,
)?;

start_thread_production

pub fn start_thread_production(
    &mut self,
    thread_id: &ThreadIdentifier,
    prev_block_id: &BlockIdentifier,
    received_acks: Arc<Mutex<Vec<Envelope<GoshBLS, AckData>>>>,
    received_nacks: Arc<Mutex<Vec<Envelope<GoshBLS, NackData>>>>,
    block_state_repository: BlockStateRepository,
    external_messages: ExternalMessagesThreadState,
    is_state_sync_requested: Arc<Mutex<Option<BlockSeqNo>>>,
    initial_round: BlockRound,
) -> anyhow::Result<()>

Description

Starts a new block production thread for the given blockchain thread identifier. It initializes the optimistic state from the cache or, failing that, from the repository, sets up the necessary communication channels, and spawns a dedicated thread that produces blocks in a loop until an interrupt signal is received.
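The spawn-and-loop structure described above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the actual implementation: `State` and `spawn_producer` are hypothetical names, a plain `std::sync::mpsc` channel stands in for the instrumented control channel, and incrementing a counter stands in for the call to produce_next.

```rust
use std::sync::mpsc::{self, Sender, TryRecvError};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical stand-in for the optimistic state carried between iterations.
#[derive(Debug, Clone, PartialEq)]
pub struct State {
    pub seq_no: u32,
}

/// Spawns a producer thread and returns its join handle plus the control sender.
/// The loop ends when `()` arrives on the control channel or the sender is dropped.
pub fn spawn_producer(
    initial: State,
    produced: Arc<Mutex<Vec<u32>>>,
) -> std::io::Result<(JoinHandle<State>, Sender<()>)> {
    let (control_tx, control_rx) = mpsc::channel::<()>();
    let handle = thread::Builder::new()
        .name("block-producer".to_string())
        .spawn(move || {
            let mut state = initial;
            loop {
                // Check for the interrupt signal without blocking production.
                match control_rx.try_recv() {
                    Ok(()) | Err(TryRecvError::Disconnected) => break,
                    Err(TryRecvError::Empty) => {}
                }
                // Placeholder for one production iteration.
                state.seq_no += 1;
                produced.lock().unwrap().push(state.seq_no);
                thread::sleep(Duration::from_millis(1));
            }
            // The final state is handed back through the join handle.
            state
        })?;
    Ok((handle, control_tx))
}
```

Returning the last state from the thread closure is one way to make it available to whoever joins the thread; whether the real code does this or uses a separate channel is not stated here.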

Parameters

Returns

anyhow::Result<()> indicating success or failure in starting the production thread.

Implementation Details


stop_thread_production

pub fn stop_thread_production(&mut self, thread_id: &ThreadIdentifier) -> anyhow::Result<()>

Description

Stops the active block production thread for the specified blockchain thread. It sends a control signal to terminate the production loop, waits for the thread to finish, caches the last optimistic state, and clears the produced blocks.
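The shutdown sequence (signal, join, cache, clear) might look roughly like the sketch below. All names (`Process`, `State`, `stop`) are hypothetical, and treating "no active thread" as a successful no-op is an assumption; the real method may report an error in that case.

```rust
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;

/// Hypothetical stand-in for the optimistic state returned by the producer thread.
#[derive(Debug, Clone, PartialEq)]
pub struct State {
    pub seq_no: u32,
}

/// Hypothetical, reduced process holding only what the shutdown path needs.
pub struct Process {
    pub active: Option<(JoinHandle<State>, Sender<()>)>,
    pub state_cache: Option<State>,
    pub produced: Arc<Mutex<Vec<u32>>>,
}

impl Process {
    pub fn stop(&mut self) -> Result<(), String> {
        // Taking the handle out of the Option marks the thread as no longer active.
        if let Some((handle, control_tx)) = self.active.take() {
            // A send error only means the thread has already exited; joining is still valid.
            let _ = control_tx.send(());
            let last_state = handle
                .join()
                .map_err(|_| "producer thread panicked".to_string())?;
            // Keep the last state so a later restart can reuse it.
            self.state_cache = Some(last_state);
            self.produced.lock().map_err(|e| e.to_string())?.clear();
        }
        Ok(())
    }
}
```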

Parameters

thread_id: identifier of the blockchain thread whose block production should be stopped.

Returns

anyhow::Result<()> indicating success or failure.


get_produced_blocks

pub fn get_produced_blocks(&mut self) -> Vec<ProducedBlock>

Description

Retrieves and clears the list of blocks produced by the process. If the active production thread has finished, it clears the active thread handle and returns an empty vector.

The returned blocks are sorted by sequence number, and their memento initialization time is updated to the current instant.

Returns

A sorted Vec<ProducedBlock> containing blocks produced so far.
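As a rough illustration of the retrieve, clear, sort, and re-stamp steps, consider the sketch below. `Block` and `take_sorted` are hypothetical names, and the `init_time` field is an assumed stand-in for the memento initialization time; the check for a finished producer thread is left out.

```rust
use std::sync::{Arc, Mutex};
use std::time::Instant;

/// Hypothetical stand-in for `ProducedBlock`.
#[derive(Debug)]
pub struct Block {
    pub seq_no: u32,
    pub init_time: Instant,
}

/// Drains the shared buffer and returns the blocks ordered by sequence number.
pub fn take_sorted(produced: &Arc<Mutex<Vec<Block>>>) -> Vec<Block> {
    // `mem::take` swaps in an empty Vec, so the lock is held only for the swap.
    let mut blocks = std::mem::take(&mut *produced.lock().unwrap());
    blocks.sort_by_key(|block| block.seq_no);
    // Re-stamp every block with the moment it was handed to the caller.
    let now = Instant::now();
    for block in &mut blocks {
        block.init_time = now;
    }
    blocks
}
```

Swapping the buffer out under the lock, rather than sorting in place, keeps the producer thread from waiting on the sort.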


set_timeout

pub fn set_timeout(&mut self, timeout: Duration)

Description

Sets the timeout that limits how long block production may take.
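Because the timeout is held as an `Arc<Mutex<Duration>>` (see block_produce_timeout in the class diagram), an update made through the setter is visible to any thread holding a clone of the `Arc`. A minimal sketch of that sharing, with `TimeoutHandle` and `share` as hypothetical names:

```rust
use std::sync::{Arc, Mutex};
use std::time::Duration;

/// Hypothetical holder for the shared production timeout.
pub struct TimeoutHandle {
    timeout: Arc<Mutex<Duration>>,
}

impl TimeoutHandle {
    pub fn new(initial: Duration) -> Self {
        Self { timeout: Arc::new(Mutex::new(initial)) }
    }

    /// Overwrites the shared value in place; existing clones see the new value.
    pub fn set_timeout(&mut self, timeout: Duration) {
        *self.timeout.lock().unwrap() = timeout;
    }

    /// Hands out another reference to the same value, e.g. for a producer thread.
    pub fn share(&self) -> Arc<Mutex<Duration>> {
        Arc::clone(&self.timeout)
    }
}
```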

Parameters

timeout: the new block production timeout (Duration).


send_epoch_message

pub fn send_epoch_message(&self, data: BlockKeeperData)

Description

Sends epoch-specific data to the block production thread via the configured channel.

Parameters

data: the BlockKeeperData to forward to the block production thread.


Helper Functions

aggregate_acks

fn aggregate_acks(
    received_acks: Vec<Envelope<GoshBLS, AckData>>
) -> anyhow::Result<Vec<Envelope<GoshBLS, AckData>>>

Aggregates multiple acknowledgment envelopes by merging signature occurrences and combining aggregated BLS signatures.
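The merging step can be sketched as follows. This is a simplified model, not the real cryptography: `Envelope` here is a hypothetical plain struct, envelopes are assumed to be grouped by block ID, and the "aggregated signature" is just a concatenated byte list standing in for a genuine BLS aggregate.

```rust
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, HashMap};

pub type SignerIndex = u16;

/// Hypothetical, simplified envelope.
#[derive(Debug, Clone, PartialEq)]
pub struct Envelope {
    pub block_id: u64,
    /// How many times each signer's signature is included.
    pub signature_occurrences: HashMap<SignerIndex, u16>,
    /// Placeholder for the aggregated signature; real BLS aggregation combines curve points.
    pub signature_parts: Vec<u8>,
}

/// Merges envelopes that refer to the same block into one envelope per block.
pub fn aggregate(received: Vec<Envelope>) -> Vec<Envelope> {
    let mut by_block: BTreeMap<u64, Envelope> = BTreeMap::new();
    for env in received {
        match by_block.entry(env.block_id) {
            Entry::Occupied(mut slot) => {
                let acc = slot.get_mut();
                // Sum occurrence counts signer by signer.
                for (signer, count) in env.signature_occurrences {
                    *acc.signature_occurrences.entry(signer).or_insert(0) += count;
                }
                // Stand-in for combining the two aggregated signatures.
                acc.signature_parts.extend(env.signature_parts);
            }
            Entry::Vacant(slot) => {
                slot.insert(env);
            }
        }
    }
    // BTreeMap iteration yields the result ordered by block ID.
    by_block.into_values().collect()
}
```

Counting occurrences per signer matters because an aggregate that includes the same signature twice must be verified against that signer's key counted twice.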

aggregate_nacks

fn aggregate_nacks(
    received_nacks: Vec<Envelope<GoshBLS, NackData>>
) -> anyhow::Result<Vec<Envelope<GoshBLS, NackData>>>

Aggregates negative acknowledgment envelopes in the same way as aggregate_acks. The nack aggregation is marked with a TODO noting that aggregating solely by block ID may be insufficient.


Important Implementation Details and Algorithms


Interaction with Other System Components


Visual Diagram

classDiagram
class TVMBlockProducerProcess {
-node_config: Config
-blockchain_config: Arc<BlockchainConfig>
-repository: RepositoryImpl
-produced_blocks: Arc<Mutex<Vec<ProducedBlock>>>
-active_producer_thread: Option<(JoinHandle, InstrumentedSender)>
-block_produce_timeout: Arc<Mutex<Duration>>
-optimistic_state_cache: Option<Arc<OptimisticStateImpl>>
-epoch_block_keeper_data_senders: Option<InstrumentedSender<BlockKeeperData>>
-shared_services: SharedServices
-producer_node_id: NodeIdentifier
-thread_count_soft_limit: usize
-parallelization_level: usize
-block_keeper_epoch_code_hash: String
-block_keeper_preepoch_code_hash: String
-metrics: Option<BlockProductionMetrics>
-wasm_cache: WasmNodeCache
-share_service: Option<ExternalFileSharesBased>
-save_optimistic_service_sender: InstrumentedSender<OptimisticStateImpl>
+start_thread_production()
+stop_thread_production()
-produce_next()
+get_produced_blocks()
+set_timeout()
+send_epoch_message()
}
class TVMBlockProducer {
+produce()
}
TVMBlockProducerProcess "1" --> "1" TVMBlockProducer : uses
TVMBlockProducerProcess "1" --> "1" RepositoryImpl : interacts with
TVMBlockProducerProcess "1" --> "1" SharedServices : uses
TVMBlockProducerProcess "1" --> "*" ProducedBlock : manages
TVMBlockProducerProcess "1" --> "*" Thread : spawns
TVMBlockProducerProcess "1" --> "*" InstrumentedSender : sends control and data

Additional Notes on Testing

The module contains integration tests in the tests submodule that simulate the lifecycle of a block producer process: starting production, receiving blocks, measuring production time, and stopping production. The tests use in-memory repositories and stub services to isolate the block production logic.

