shared_services.rs

Overview

The shared_services.rs file defines the SharedServices struct and its associated Container struct, which collectively manage and coordinate various multithreading and routing services within the system. It provides a centralized interface for thread synchronization, load balancing, routing, cross-thread reference data management, and rate limiting of incoming block requests from different nodes.

This file primarily facilitates the orchestration of concurrent operations related to block processing and thread management, incorporating rate limiting to control request throughput per node. It also contains mechanisms to prevent redundant event handling via internal caches of processed blocks.


Structs and Their Functionality

SharedServices

SharedServices is a clonable, thread-safe container that holds shared references to various multithreading services encapsulated within a mutex-protected Container. It also maintains rate limiting, block production metrics, and tracks the timestamp of the last block finalization event.

Fields

Key Methods

start
pub fn start(
    router: RoutingService,
    data_dir: PathBuf,
    metrics: Option<BlockProductionMetrics>,
    thread_load_threshold: usize,
    thread_load_window_size: usize,
    rate_limit_on_incoming_block_req: u32,
    thread_cnt_soft_limit: usize,
    crossref_db: CrossRefStorage,
) -> Self

Initializes a new instance of SharedServices with the provided services and configuration parameters. It sets up all internal services including thread synchronization, tracking, load balancing, routing, and cross-thread reference data management.

let shared_services = SharedServices::start(
    router_instance,
    PathBuf::from("/data"),
    Some(metrics),
    100,
    50,
    500,
    10,
    crossref_storage_instance,
);
exec
pub fn exec<F, R>(&mut self, f: F) -> R
where
    F: FnOnce(&mut Container) -> R,

Provides safe, synchronized access to the inner Container by locking its mutex and executing a closure f that operates on the mutable reference of the container.

shared_services.exec(|services| {
    services.threads_tracking.some_method();
});
on_block_finalized
pub fn on_block_finalized<TOptimisticState>(
    &mut self,
    block: &AckiNackiBlock,
    state: Arc<TOptimisticState>,
) where
    TOptimisticState: OptimisticState,

Handles the event of a block being finalized. It updates internal tracking structures and notifies relevant services to update their state accordingly.

shared_services.on_block_finalized(&block, optimistic_state_arc);
throttle
pub fn throttle(&self, node_id: &NodeIdentifier) -> anyhow::Result<()>

Checks if a node identified by node_id is allowed to proceed based on the configured rate limiter.

match shared_services.throttle(&node_id) {
    Ok(()) => { /* proceed */ },
    Err(e) => { /* handle rate limit exceeded */ }
}
duration_since_last_finalization
pub fn duration_since_last_finalization(&self) -> std::time::Duration

Returns the duration elapsed since the last block finalization event.


Container

Container holds the concrete implementations of various services used by SharedServices. It is wrapped in a mutex within SharedServices to provide synchronized access.

Fields

Notes


Important Implementation Details


Interaction with Other System Components


Visual Diagram of SharedServices Structure

classDiagram
class SharedServices {
+metrics: Option<BlockProductionMetrics>
+last_finalization_timestamp: AtomicU64
+throttle()
+on_block_finalized()
+exec()
}
class Container {
+threads_tracking: ThreadsTrackingService
+thread_sync: ThreadSyncService
+load_balancing: LoadBalancingService
+router: RoutingService
+cross_thread_ref_data_service: CrossThreadRefDataRepository
+dirty_hack__appended_blocks: FixedSizeHashSet
+dirty_hack__finalized_blocks: FixedSizeHashSet
+dirty_hack__invalidated_blocks: FixedSizeHashSet
}
SharedServices "1" *-- "1" Container : contains
SharedServices o-- DefaultKeyedRateLimiter : limiter
Container --> ThreadsTrackingService
Container --> ThreadSyncService
Container --> LoadBalancingService
Container --> RoutingService
Container --> CrossThreadRefDataRepository

Testing

The module includes unit tests for the throttling functionality:

These tests demonstrate the correctness of the rate limiting mechanism under concurrent access conditions.


Related Topics