block_request_service.rs
Overview
This file defines the BlockRequestService component responsible for handling incoming block requests in a distributed blockchain or ledger system. It manages requests for blocks within specified ranges from other nodes, resends finalized blocks, and coordinates state synchronization processes. The service interacts with the underlying block repository, block state repository, network communication channels, and caches for unprocessed blocks, ensuring efficient and timely delivery of block data to requesting nodes.
The service runs as a dedicated thread, receiving BlockRequestParams through a channel, processing these requests by retrieving and sending the appropriate blocks, and managing synchronization requests when needed.
Key Structures and Types
BlockRequestParams
Represents parameters for a block request received from a remote node.
Fields:
start: BlockSeqNo— The starting sequence number of blocks requested (inclusive).end: BlockSeqNo— The ending sequence number of blocks requested (exclusive).node_id: NodeIdentifier— The identifier of the node making the request.at_least_n_blocks: Option<usize>— Optional minimum number of blocks to send.last_state_sync_executed: Arc<Mutex<std::time::Instant>>— Timestamp of the last state synchronization executed.is_state_sync_requested: Arc<Mutex<Option<BlockSeqNo>>>— Optionally tracks the sequence number at which state sync was requested.thread_id: ThreadIdentifier— Identifier of the thread (shard) for which blocks are requested.
BlockRequestService
The main service struct responsible for processing block requests.
Fields:
config: Config— Configuration parameters.repository: RepositoryImpl— Repository interface for finalized blocks.shared_services: SharedServices— Shared services including throttling.block_state_repository: BlockStateRepository— Repository for block states.network_direct_tx: NetDirectSender<NodeIdentifier, NetworkMessage>— Direct network channel sender to send messages to nodes.rx: Receiver<BlockRequestParams>— Receiver channel for incoming block requests.metrics: Option<BlockProductionMetrics>— Optional metrics for block production and resends.unprocessed_blocks_cache: UnfinalizedCandidateBlockCollection— Cache of unprocessed candidate blocks.
Functions and Methods
BlockRequestService::start
Starts the block request service thread.
pub fn start(
config: Config,
shared_services: SharedServices,
repository: RepositoryImpl,
block_state_repository: BlockStateRepository,
network_direct_tx: NetDirectSender<NodeIdentifier, NetworkMessage>,
metrics: Option<BlockProductionMetrics>,
unprocessed_blocks_cache: UnfinalizedCandidateBlockCollection,
) -> anyhow::Result<(Sender<BlockRequestParams>, JoinHandle<()>)>
Parameters:
Configuration, repositories, shared services, network sender, optional metrics, and unprocessed block cache.
Returns:
A tuple containing a
Senderto sendBlockRequestParamsto the service and aJoinHandlefor the spawned thread.
Usage:
Call this method to initialize and run the block request service in a dedicated thread.
Implementation Details:
Creates an MPSC channel.
Spawns a thread named
"BlockRequestService".The thread listens on the receiver channel and delegates requests to
on_incoming_block_request.
BlockRequestService::on_incoming_block_request
Processes an incoming block request.
fn on_incoming_block_request(&self, params: BlockRequestParams) -> anyhow::Result<()>
Parameters:
params: Struct with block range, node ID, thread ID, and synchronization state.
Returns:
Result indicating success or failure.
Behavior:
Attempts to fulfill the request by calling
inner.On error, logs the failure.
Checks time elapsed since last state sync; if sufficient time passed, triggers state synchronization by calling
send_sync_from.
Important:
Uses locking and guarded access to shared synchronization timestamps.
Ensures that state sync is only triggered under controlled timing constraints.
BlockRequestService::inner
Core logic for fulfilling a block request.
fn inner(
&self,
from_inclusive: BlockSeqNo,
to_exclusive: BlockSeqNo,
node_id: NodeIdentifier,
thread_id: &ThreadIdentifier,
at_least_n_blocks: Option<usize>,
) -> anyhow::Result<()>
Parameters:
from_inclusive,to_exclusive: Range of block sequence numbers requested.node_id: Requesting node.thread_id: Thread/shard identifier.at_least_n_blocks: Minimum number of blocks to send.
Returns:
Result indicating success or failure.
Functionality:
Throttles requests per node.
Retrieves the last finalized block for the thread.
Resends finalized blocks starting from the last finalized block to the requested start sequence number.
Collects demanded blocks from unprocessed blocks cache and block state repository.
Sends candidate blocks that meet the demand and fall within the requested sequence range.
Implementation Notes:
Uses breadth-first search-like iteration over descendants to determine blocks to resend.
Uses guarded access to block state.
Interacts with caches and repositories to optimize block delivery.
Calls
resend_finalizedandresendhelper methods.
BlockRequestService::send_sync_from
Sends a synchronization directive message to a node.
fn send_sync_from(
&self,
node_id: NodeIdentifier,
thread_id: &ThreadIdentifier,
from_seq_no: BlockSeqNo,
) -> anyhow::Result<()>
Parameters:
node_id: Node to send sync directive.thread_id: Thread/shard ID.from_seq_no: Sequence number from which to sync.
Returns:
Result indicating success or failure.
Behavior:
Sends a
NetworkMessage::SyncFrommessage over the direct sender channel.Logs the event.
Usage:
Used internally to instruct peers to synchronize from a given block sequence number.
BlockRequestService::resend_finalized
Resends a chain of finalized blocks up to a cutoff sequence number.
fn resend_finalized(
&self,
thread_id: &ThreadIdentifier,
destination_node_id: NodeIdentifier,
tail: BlockIdentifier,
cutoff: BlockSeqNo,
) -> anyhow::Result<usize>
Parameters:
thread_id: Target thread/shard.destination_node_id: Node to send blocks to.tail: The starting block identifier (typically the last finalized block).cutoff: Sequence number cutoff; resend stops when reaching below this.
Returns:
Number of blocks sent.
Implementation Details:
Iteratively traverses the chain backward from
tailusing parent references.Collects blocks into a cache vector.
Enforces a maximum resend limit (
MAX_TO_RESEND= 200) to avoid excessive history requests.Calls
resendto send collected blocks.Uses guarded access to block states.
Error Handling:
Returns errors if blocks or states are missing or if exceeding max resend limit.
BlockRequestService::resend
Sends a collection of candidate blocks to a destination node.
fn resend(
&self,
thread_id: &ThreadIdentifier,
destination: NodeIdentifier,
blocks: Vec<Arc<<Self as NodeAssociatedTypes>::CandidateBlock>>,
) -> anyhow::Result<usize>
Parameters:
thread_id: Thread/shard for metrics.destination: Node to send blocks.blocks: Vector of candidate blocks to send.
Returns:
Number of blocks sent.
Functionality:
Reports resend metrics if enabled.
Iterates over blocks, sending each with
send_candidate_block.
Usage:
Called internally after assembling blocks to be sent.
BlockRequestService::send_candidate_block
Sends a single candidate block to a node.
pub(crate) fn send_candidate_block(
&self,
candidate_block: <Self as NodeAssociatedTypes>::CandidateBlock,
node_id: NodeIdentifier,
) -> anyhow::Result<()>
Parameters:
candidate_block: The block to send.node_id: Destination node.
Returns:
Success or error.
Behavior:
Logs sending action.
Traces block flow for diagnostics.
Sends the block wrapped in a
NetworkMessageover the direct network sender.
Usage:
Used internally by
resendand other sending operations.
Important Implementation Details and Algorithms
Throttling: Requests are throttled per node using
shared_services.throttle.Block Resending: Combines finalized blocks and cached unprocessed blocks to fulfill requests.
Breadth-First Descendant Traversal: To collect candidate blocks for resending, the service performs a breadth-first expansion of children blocks up to the requested count.
State Synchronization: When block requests fail or when conditions meet, the service triggers state synchronization by sending a
SyncFrommessage to requesting nodes, ensuring eventual consistency.Concurrency and Synchronization: Uses
Arc<Mutex<>>wrapped timestamps and optional sequence numbers to manage synchronization state safely across threads.Error Handling: Uses
anyhow::Resultfor error propagation; logs errors and may bail out on critical failures (e.g., network receiver gone).
Interaction With Other System Components
Network Layer: Uses
NetDirectSenderto sendNetworkMessageinstances directly to nodes. This is the communication channel for block and sync messages.Repositories: Interacts with
RepositoryImplfor finalized blocks andBlockStateRepositoryfor block states and metadata.Shared Services: Uses shared services to implement request throttling and potentially other cross-service utilities.
Block Cache: Uses
UnfinalizedCandidateBlockCollectionto access unprocessed candidate blocks which may not yet be finalized.Metrics: Optionally reports resend metrics via
BlockProductionMetrics.Thread Model: Operates per thread/shard (
ThreadIdentifier), ensuring sharded or parallel processing support.
Visual Diagram: Class Structure and Method Relationships
classDiagram
class BlockRequestService {
-config: Config
-repository: RepositoryImpl
-shared_services: SharedServices
-block_state_repository: BlockStateRepository
-network_direct_tx: NetDirectSender
-rx: Receiver<BlockRequestParams>
-metrics: Option<BlockProductionMetrics>
-unprocessed_blocks_cache: UnfinalizedCandidateBlockCollection
+start()
-on_incoming_block_request()
-inner()
-send_sync_from()
-resend_finalized()
-resend()
+send_candidate_block()
}
class BlockRequestParams {
+start: BlockSeqNo
+end: BlockSeqNo
+node_id: NodeIdentifier
+at_least_n_blocks: Option<usize>
+last_state_sync_executed: Arc<Mutex<std::time::Instant>>
+is_state_sync_requested: Arc<Mutex<Option<BlockSeqNo>>>
+thread_id: ThreadIdentifier
}
BlockRequestService "1" --o "1" BlockRequestParams : uses
Usage Example
// Initialize the service and get the sender handle and thread handle
let (block_request_sender, block_request_handle) = BlockRequestService::start(
config,
shared_services,
repository,
block_state_repository,
network_direct_tx,
Some(metrics),
unprocessed_blocks_cache,
)?;
// Send a block request
let params = BlockRequestParams {
start: 100,
end: 150,
node_id: requesting_node_id,
at_least_n_blocks: Some(10),
last_state_sync_executed: Arc::new(Mutex::new(std::time::Instant::now())),
is_state_sync_requested: Arc::new(Mutex::new(None)),
thread_id: thread_id,
};
block_request_sender.send(params)?;
This example shows how to start the service and send a block request for a range of blocks to be processed asynchronously.
References
For details on
BlockSeqNo,NodeIdentifier, andThreadIdentifier, see Types.For networking message formats and channel communication, see Network Communication.
For block management and candidate block structures, see Block State Management.
For concurrency utilities like Guarded and
Mutexusage, see Concurrency Utilities.For throttling mechanisms, see Shared Services Throttling.