block_request_service.rs

Overview

This file defines BlockRequestService, the component that handles incoming block requests from other nodes in a distributed blockchain or ledger system. It serves requests for blocks within a given sequence-number range, resends finalized blocks, and coordinates state synchronization. To do so, it works with the block repository, the block state repository, the direct network channel, and the cache of unprocessed blocks.

The service runs on a dedicated thread. It receives BlockRequestParams through a channel, retrieves and sends the requested blocks, and issues a synchronization request when one is needed.
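The thread-plus-channel arrangement described above can be sketched with the standard library alone. This is a minimal illustration, not the service's actual code: RangeRequest and start_worker are hypothetical names, std::sync::mpsc is assumed as the channel type, and std::io::Result stands in for the anyhow::Result used by the real start function.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for `BlockRequestParams`; only the range is modeled.
#[derive(Debug, Clone, PartialEq)]
pub struct RangeRequest {
    pub start: u32,
    pub end: u32,
}

/// Spawns a named worker thread that drains requests from a channel.
/// `handle` is a placeholder for the real per-request logic.
pub fn start_worker<F>(
    mut handle: F,
) -> std::io::Result<(Sender<RangeRequest>, JoinHandle<()>)>
where
    F: FnMut(RangeRequest) + Send + 'static,
{
    let (tx, rx): (Sender<RangeRequest>, Receiver<RangeRequest>) = mpsc::channel();
    let join = thread::Builder::new()
        .name("block-request-service".to_string())
        .spawn(move || {
            // `recv` returns Err once every sender is dropped, ending the loop.
            while let Ok(request) = rx.recv() {
                handle(request);
            }
        })?;
    Ok((tx, join))
}
```

In this sketch the caller keeps the Sender to submit requests and the JoinHandle to wait for shutdown; dropping every Sender is what lets the worker exit.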


Key Structures and Types

BlockRequestParams

Represents parameters for a block request received from a remote node.
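The class diagram lists a last_state_sync_executed field of type Arc<Mutex<std::time::Instant>> among these parameters. The source does not say how the field is used. One plausible reading is a shared timestamp that keeps state synchronization from being triggered too often. The sketch below illustrates that reading only; try_claim_sync and min_interval are hypothetical names, and the current time is passed in explicitly so the behaviour is deterministic.

```rust
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Returns true and records `now` if at least `min_interval` has passed
/// since the last recorded sync; otherwise leaves the timestamp untouched.
/// Assumption: this throttling policy is illustrative, not taken from the source.
pub fn try_claim_sync(
    last: &Arc<Mutex<Instant>>,
    now: Instant,
    min_interval: Duration,
) -> bool {
    let mut guard = last.lock().expect("timestamp mutex poisoned");
    // `duration_since` saturates to zero if `now` is earlier than the stored value.
    if now.duration_since(*guard) >= min_interval {
        *guard = now;
        true
    } else {
        false
    }
}
```

Holding the lock across both the check and the update means two threads cannot both claim the same interval.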


BlockRequestService

The main service struct responsible for processing block requests.


Functions and Methods

BlockRequestService::start

Starts the block request service thread.

pub fn start(
    config: Config,
    shared_services: SharedServices,
    repository: RepositoryImpl,
    block_state_repository: BlockStateRepository,
    network_direct_tx: NetDirectSender<NodeIdentifier, NetworkMessage>,
    metrics: Option<BlockProductionMetrics>,
    unprocessed_blocks_cache: UnfinalizedCandidateBlockCollection,
) -> anyhow::Result<(Sender<BlockRequestParams>, JoinHandle<()>)>

BlockRequestService::on_incoming_block_request

Processes an incoming block request.

fn on_incoming_block_request(&self, params: BlockRequestParams) -> anyhow::Result<()>

BlockRequestService::inner

Core logic for fulfilling a block request.

fn inner(
    &self,
    from_inclusive: BlockSeqNo,
    to_exclusive: BlockSeqNo,
    node_id: NodeIdentifier,
    thread_id: &ThreadIdentifier,
    at_least_n_blocks: Option<usize>,
) -> anyhow::Result<()>
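The parameter names from_inclusive and to_exclusive indicate a half-open range. A minimal sketch of selecting blocks from such a range follows, assuming a hypothetical cache keyed by a plain u32 sequence number with String payloads. The at_least_n_blocks parameter is left out because the source does not describe its semantics.

```rust
use std::collections::BTreeMap;

/// Returns the cached blocks whose sequence number lies in
/// `[from_inclusive, to_exclusive)`, in ascending order.
/// The cache layout and the `String` payload are illustrative placeholders.
pub fn blocks_in_range<'a>(
    cache: &'a BTreeMap<u32, String>,
    from_inclusive: u32,
    to_exclusive: u32,
) -> Vec<&'a String> {
    // An empty or inverted range yields nothing; `BTreeMap::range`
    // would panic on an inverted range, so guard against it first.
    if from_inclusive >= to_exclusive {
        return Vec::new();
    }
    cache
        .range(from_inclusive..to_exclusive)
        .map(|(_, block)| block)
        .collect()
}
```

With a half-open range, consecutive requests such as 100..150 and 150..200 cover every sequence number exactly once.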

BlockRequestService::send_sync_from

Sends a synchronization directive message to a node.

fn send_sync_from(
    &self,
    node_id: NodeIdentifier,
    thread_id: &ThreadIdentifier,
    from_seq_no: BlockSeqNo,
) -> anyhow::Result<()>

BlockRequestService::resend_finalized

Resends a chain of finalized blocks up to a cutoff sequence number.

fn resend_finalized(
    &self,
    thread_id: &ThreadIdentifier,
    destination_node_id: NodeIdentifier,
    tail: BlockIdentifier,
    cutoff: BlockSeqNo,
) -> anyhow::Result<usize>
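Given a tail identifier and a cutoff, one way to build such a chain is to follow parent links backwards from the tail until the sequence number drops below the cutoff. The sketch below assumes a hypothetical in-memory store with u64 identifiers and u32 sequence numbers. Whether the real cutoff is inclusive is not stated in the source; this sketch treats it as inclusive.

```rust
use std::collections::HashMap;

/// Hypothetical minimal block record: sequence number plus parent link.
#[derive(Debug, Clone)]
pub struct BlockMeta {
    pub seq_no: u32,
    pub parent: Option<u64>,
}

/// Walks parent links from `tail` and collects block ids whose
/// `seq_no >= cutoff`, returned oldest first so they can be sent in order.
pub fn chain_down_to_cutoff(
    store: &HashMap<u64, BlockMeta>,
    tail: u64,
    cutoff: u32,
) -> Vec<u64> {
    let mut chain = Vec::new();
    let mut cursor = Some(tail);
    while let Some(id) = cursor {
        // Stop if the block is unknown to the store.
        let Some(meta) = store.get(&id) else { break };
        // Stop once the walk goes past the cutoff.
        if meta.seq_no < cutoff {
            break;
        }
        chain.push(id);
        cursor = meta.parent;
    }
    chain.reverse();
    chain
}
```

The length of the returned vector corresponds to the usize count that resend_finalized reports, under the assumption that the count is the number of blocks sent.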

BlockRequestService::resend

Sends a collection of candidate blocks to a destination node.

fn resend(
    &self,
    thread_id: &ThreadIdentifier,
    destination: NodeIdentifier,
    blocks: Vec<Arc<<Self as NodeAssociatedTypes>::CandidateBlock>>,
) -> anyhow::Result<usize>

BlockRequestService::send_candidate_block

Sends a single candidate block to a node.

pub(crate) fn send_candidate_block(
    &self,
    candidate_block: <Self as NodeAssociatedTypes>::CandidateBlock,
    node_id: NodeIdentifier,
) -> anyhow::Result<()>

Important Implementation Details and Algorithms


Interaction With Other System Components


Visual Diagram: Class Structure and Method Relationships

classDiagram
class BlockRequestService {
-config: Config
-repository: RepositoryImpl
-shared_services: SharedServices
-block_state_repository: BlockStateRepository
-network_direct_tx: NetDirectSender
-rx: Receiver<BlockRequestParams>
-metrics: Option<BlockProductionMetrics>
-unprocessed_blocks_cache: UnfinalizedCandidateBlockCollection
+start()
-on_incoming_block_request()
-inner()
-send_sync_from()
-resend_finalized()
-resend()
~send_candidate_block()
}
class BlockRequestParams {
+start: BlockSeqNo
+end: BlockSeqNo
+node_id: NodeIdentifier
+at_least_n_blocks: Option<usize>
+last_state_sync_executed: Arc<Mutex<std::time::Instant>>
+is_state_sync_requested: Arc<Mutex<Option<BlockSeqNo>>>
+thread_id: ThreadIdentifier
}
BlockRequestService "1" --o "1" BlockRequestParams : uses

Usage Example

// Initialize the service and get the sender handle and thread handle
let (block_request_sender, block_request_handle) = BlockRequestService::start(
    config,
    shared_services,
    repository,
    block_state_repository,
    network_direct_tx,
    Some(metrics),
    unprocessed_blocks_cache,
)?;

// Send a block request.
// The integer literals assume BlockSeqNo can be written this way;
// adapt them to its actual constructor if it is a wrapper type.
let params = BlockRequestParams {
    start: 100,
    end: 150,
    node_id: requesting_node_id,
    at_least_n_blocks: Some(10),
    last_state_sync_executed: Arc::new(Mutex::new(std::time::Instant::now())),
    is_state_sync_requested: Arc::new(Mutex::new(None)),
    thread_id,
};

block_request_sender.send(params)?;

This example shows how to start the service and send a block request for a range of blocks to be processed asynchronously.

