shared_services.rs
Overview
The shared_services.rs file defines the SharedServices struct and its associated Container struct, which collectively manage and coordinate various multithreading and routing services within the system. It provides a centralized interface for thread synchronization, load balancing, routing, cross-thread reference data management, and rate limiting of incoming block requests from different nodes.
This file primarily facilitates the orchestration of concurrent operations related to block processing and thread management, incorporating rate limiting to control request throughput per node. It also contains mechanisms to prevent redundant event handling via internal caches of processed blocks.
Structs and Their Functionality
SharedServices
SharedServices is a clonable, thread-safe container that holds shared references to various multithreading services encapsulated within a mutex-protected Container. It also maintains rate limiting, block production metrics, and tracks the timestamp of the last block finalization event.
Fields
container: Arc<Mutex<Container>>
Thread-safe shared ownership of the Container, which holds the core services.
metrics: Option<BlockProductionMetrics>
Optional metrics for block production, useful for monitoring and performance tracking.
limiter: Arc<DefaultKeyedRateLimiter<NodeIdentifier>>
Rate limiter keyed by NodeIdentifier to throttle incoming block requests on a per-node basis.
last_finalization_timestamp: Arc<AtomicU64>
Atomic timestamp recording the last finalized block time in milliseconds since the UNIX epoch.
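The field layout above can be illustrated with a minimal, std-only sketch. It assumes a simplified handle named SharedServicesSketch with a placeholder Container (handled_events is a hypothetical field), leaves out metrics and limiter, and only shows that every clone of the handle shares the same mutex-protected container and the same atomic timestamp.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for the real `Container`.
#[derive(Default)]
struct Container {
    handled_events: u64,
}

// Simplified handle: `metrics` and `limiter` are omitted in this sketch.
#[derive(Clone)]
struct SharedServicesSketch {
    container: Arc<Mutex<Container>>,
    last_finalization_timestamp: Arc<AtomicU64>,
}

impl SharedServicesSketch {
    fn new() -> Self {
        Self {
            container: Arc::new(Mutex::new(Container::default())),
            last_finalization_timestamp: Arc::new(AtomicU64::new(0)),
        }
    }
}

// Spawns `workers` threads, each owning a clone of the handle, and returns
// (events recorded in the shared container, highest value stored in the atomic).
fn run_workers(workers: u64) -> (u64, u64) {
    let services = SharedServicesSketch::new();
    let handles: Vec<_> = (0..workers)
        .map(|i| {
            let services = services.clone();
            thread::spawn(move || {
                // Mutex-protected state: every clone locks the same container.
                services.container.lock().unwrap().handled_events += 1;
                // Lock-free state: the atomic is updated without the mutex.
                services
                    .last_finalization_timestamp
                    .fetch_max(i + 1, Ordering::Relaxed);
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let events = services.container.lock().unwrap().handled_events;
    let latest = services.last_finalization_timestamp.load(Ordering::Relaxed);
    (events, latest)
}
```

Calling run_workers(4) returns (4, 4): all four clones incremented the same counter, and the atomic holds the largest value any worker stored.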
Key Methods
start
pub fn start(
    router: RoutingService,
    data_dir: PathBuf,
    metrics: Option<BlockProductionMetrics>,
    thread_load_threshold: usize,
    thread_load_window_size: usize,
    rate_limit_on_incoming_block_req: u32,
    thread_cnt_soft_limit: usize,
    crossref_db: CrossRefStorage,
) -> Self
Initializes a new instance of SharedServices with the provided services and configuration parameters. It sets up all internal services including thread synchronization, tracking, load balancing, routing, and cross-thread reference data management.
Parameters:
router: The routing service instance for message/event routing.
data_dir: Directory path for persistent cross-thread reference data storage.
metrics: Optional block production metrics object.
thread_load_threshold: Threshold to trigger load balancing actions.
thread_load_window_size: Window size for measuring thread load.
rate_limit_on_incoming_block_req: Maximum allowed incoming block requests per second, per node.
thread_cnt_soft_limit: Soft upper limit on the number of threads supported by the cross-thread reference data service.
crossref_db: Storage backend for cross-thread reference data.
Returns:
A fully initialized SharedServices object.
Usage Example:
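let shared_services = SharedServices::start(
    router_instance,
    PathBuf::from("/data"),    // data_dir
    Some(metrics),
    100,                       // thread_load_threshold
    50,                        // thread_load_window_size
    500,                       // rate_limit_on_incoming_block_req (per second, per node)
    10,                        // thread_cnt_soft_limit
    crossref_storage_instance, // crossref_db
);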
exec
pub fn exec<F, R>(&mut self, f: F) -> R
where
    F: FnOnce(&mut Container) -> R,
Provides safe, synchronized access to the inner Container by locking its mutex and executing a closure f that operates on the mutable reference of the container.
Parameters:
f: Closure that receives a mutable reference to Container and returns a result of type R.
Returns:
The result of closure f.
Details:
This method ensures thread-safe execution of operations that require mutable access to the core services. It also includes optional instrumentation to warn about long mutex lock durations when compiled with the fail_on_long_lock feature.
Usage Example:
shared_services.exec(|services| {
    services.threads_tracking.some_method();
});
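The locking pattern behind exec can be sketched with the standard library alone. The Container below is a placeholder with a hypothetical counter field, and the LONG_LOCK_WARNING threshold and the eprintln! warning are assumptions: the source only states that long lock durations are reported when fail_on_long_lock is enabled, not how.

```rust
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

// Hypothetical stand-in for the real `Container`.
#[derive(Default)]
struct Container {
    counter: u64,
}

#[derive(Clone, Default)]
struct SharedServicesSketch {
    container: Arc<Mutex<Container>>,
}

// Assumed threshold; the real value (if any) is not given in the source.
const LONG_LOCK_WARNING: Duration = Duration::from_millis(100);

impl SharedServicesSketch {
    // Locks the container, runs `f` on it, and returns whatever `f` returns.
    fn exec<F, R>(&mut self, f: F) -> R
    where
        F: FnOnce(&mut Container) -> R,
    {
        let mut guard = self.container.lock().expect("container mutex poisoned");
        let started = Instant::now();
        let result = f(&mut *guard);
        let held = started.elapsed();
        // Release the lock before doing any reporting.
        drop(guard);
        if held > LONG_LOCK_WARNING {
            eprintln!("container lock held for {:?}", held);
        }
        result
    }
}
```

Because the closure runs while the mutex is held, it should stay short and avoid blocking calls; any other clone of the handle waits until it returns.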
on_block_finalized
pub fn on_block_finalized<TOptimisticState>(
    &mut self,
    block: &AckiNackiBlock,
    state: Arc<TOptimisticState>,
) where
    TOptimisticState: OptimisticState,
Handles the event of a block being finalized. It updates internal tracking structures and notifies relevant services to update their state accordingly.
Parameters:
block: Reference to the finalized block (AckiNackiBlock).
state: An Arc to an optimistic state implementation that provides thread production data.
Implementation Details:
Ensures each block finalization is processed only once using an internal cache (dirty_hack__finalized_blocks).
Updates the timestamp of the last finalization.
Invokes thread_sync to synchronize thread state.
Calls threads_tracking to handle finalized block updates.
Invokes load_balancing to adjust load metrics based on the finalized block.
Usage Example:
shared_services.on_block_finalized(&block, optimistic_state_arc);
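The finalization steps listed above can be sketched as follows. This is an illustration under several assumptions: a u64 BlockId stands in for BlockIdentifier, a plain HashSet stands in for the fixed-size cache, the three services are replaced by a calls log that records the notification order, and the method returns a bool (processed or duplicate) purely so the behavior is observable. Whether the real method updates the timestamp while holding the lock is not stated in the source.

```rust
use std::collections::HashSet;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};

// Stand-in for `BlockIdentifier`.
type BlockId = u64;

#[derive(Default)]
struct Container {
    // Unbounded stand-in for `dirty_hack__finalized_blocks`.
    finalized_blocks: HashSet<BlockId>,
    // Records which (stubbed) service was notified, in order.
    calls: Vec<&'static str>,
}

#[derive(Clone, Default)]
struct SharedServicesSketch {
    container: Arc<Mutex<Container>>,
    last_finalization_timestamp: Arc<AtomicU64>,
}

impl SharedServicesSketch {
    // Returns `true` if the block was processed, `false` if it was a duplicate.
    fn on_block_finalized(&mut self, block_id: BlockId) -> bool {
        let mut container = self.container.lock().unwrap();
        // Step 1: skip blocks that were already handled.
        if !container.finalized_blocks.insert(block_id) {
            return false;
        }
        // Step 2: record the finalization time in milliseconds since the UNIX epoch.
        let now_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);
        self.last_finalization_timestamp.store(now_ms, Ordering::Relaxed);
        // Steps 3 to 5: notify the services in the documented order.
        container.calls.push("thread_sync");
        container.calls.push("threads_tracking");
        container.calls.push("load_balancing");
        true
    }
}
```

A second call with the same block identifier returns early, so the inner services never see the duplicate event.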
throttle
pub fn throttle(&self, node_id: &NodeIdentifier) -> anyhow::Result<()>
Checks if a node identified by node_id is allowed to proceed based on the configured rate limiter.
Parameters:
node_id: The identifier of the node making the request.
Returns:
Ok(()) if the request is within the rate limits.
An error if the rate limit is exceeded.
Usage Example:
match shared_services.throttle(&node_id) {
    Ok(()) => { /* proceed */ },
    Err(e) => { /* handle rate limit exceeded */ }
}
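To make the per-node behavior concrete, here is a std-only sketch of a keyed limiter. It is not the governor implementation: it uses a simple fixed-window counter per key, a u32 NodeId stands in for NodeIdentifier, errors are plain String values instead of anyhow errors, and the throttle_at method takes the current time as a parameter so the example is deterministic. All of these names are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, Instant};

// Stand-in for `NodeIdentifier`.
type NodeId = u32;

// Fixed-window limiter: at most `max_per_window` requests per key per window.
struct KeyedFixedWindowLimiter {
    max_per_window: u32,
    window: Duration,
    // Per-node state: (start of the current window, requests seen in it).
    state: Mutex<HashMap<NodeId, (Instant, u32)>>,
}

impl KeyedFixedWindowLimiter {
    fn per_second(max_per_second: u32) -> Self {
        Self {
            max_per_window: max_per_second,
            window: Duration::from_secs(1),
            state: Mutex::new(HashMap::new()),
        }
    }

    // Checks whether `node_id` may proceed at time `now`.
    fn throttle_at(&self, node_id: &NodeId, now: Instant) -> Result<(), String> {
        let mut state = self.state.lock().unwrap();
        let entry = state.entry(*node_id).or_insert((now, 0));
        // Start a fresh window once the previous one has elapsed.
        if now.saturating_duration_since(entry.0) >= self.window {
            *entry = (now, 0);
        }
        if entry.1 >= self.max_per_window {
            return Err(format!(
                "node {} exceeded {} requests per window",
                node_id, self.max_per_window
            ));
        }
        entry.1 += 1;
        Ok(())
    }
}
```

With a limit of 2 per second, a third request from the same node inside one window is rejected, a request from a different node is unaffected, and the first node is admitted again once the window has passed.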
duration_since_last_finalization
pub fn duration_since_last_finalization(&self) -> std::time::Duration
Returns the duration elapsed since the last block finalization event.
Returns:
A Duration representing the time since the last finalization timestamp.
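One way to derive such a duration from a millisecond timestamp stored in an AtomicU64 is sketched below. The helper names unix_millis_now and duration_since are hypothetical, and the saturating subtraction (returning zero if the stored timestamp lies in the future) is an assumption; the source does not describe how clock skew or a missing finalization is handled.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Current wall-clock time in milliseconds since the UNIX epoch.
fn unix_millis_now() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis() as u64)
        .unwrap_or(0)
}

// Time elapsed between the stored timestamp and `now_ms`.
// Saturates to zero if the stored timestamp is later than `now_ms`.
fn duration_since(last_finalization_ms: &AtomicU64, now_ms: u64) -> Duration {
    let last = last_finalization_ms.load(Ordering::Relaxed);
    Duration::from_millis(now_ms.saturating_sub(last))
}
```

In practice the second argument would be unix_millis_now(); passing it explicitly keeps the calculation easy to test.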
Container
Container holds the concrete implementations of various services used by SharedServices. It is wrapped in a mutex within SharedServices to provide synchronized access.
Fields
threads_tracking: ThreadsTrackingService
Tracks the processing status of threads and blocks.
thread_sync: ThreadSyncService
Synchronizes thread states during block finalization and other events.
load_balancing: LoadBalancingService
Monitors and balances load across threads.
router: RoutingService
Handles routing of messages/events between components.
cross_thread_ref_data_service: CrossThreadRefDataRepository
Manages cross-thread reference data, including persistent storage.
dirty_hack__appended_blocks: FixedSizeHashSet<BlockIdentifier>
Cache to prevent handling the same appended block event multiple times.
dirty_hack__finalized_blocks: FixedSizeHashSet<BlockIdentifier>
Cache to prevent handling the same finalized block event multiple times.
dirty_hack__invalidated_blocks: FixedSizeHashSet<BlockIdentifier>
Cache to prevent handling the same invalidated block event multiple times.
Notes
The dirty_hack__* caches are used as a workaround to handle duplicate event calls gracefully, as repeated calls are not expected by inner services.
The router field is public due to legacy reasons but is intended to be private.
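The idea of a bounded deduplication cache can be sketched as below. This is not the project's FixedSizeHashSet: the BoundedSet name is hypothetical, and the oldest-first eviction policy is an assumption, since the source does not say how the real type discards entries.

```rust
use std::collections::{HashSet, VecDeque};
use std::hash::Hash;

// Set that remembers at most `capacity` values, evicting the oldest first.
struct BoundedSet<T> {
    capacity: usize,
    order: VecDeque<T>,
    members: HashSet<T>,
}

impl<T: Clone + Eq + Hash> BoundedSet<T> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be positive");
        Self {
            capacity,
            order: VecDeque::with_capacity(capacity),
            members: HashSet::with_capacity(capacity),
        }
    }

    // Returns `true` if the value was new, `false` if it was already present.
    fn insert(&mut self, value: T) -> bool {
        if !self.members.insert(value.clone()) {
            return false;
        }
        self.order.push_back(value);
        // Evict the oldest entry once the capacity is exceeded.
        if self.order.len() > self.capacity {
            if let Some(oldest) = self.order.pop_front() {
                self.members.remove(&oldest);
            }
        }
        true
    }

    fn contains(&self, value: &T) -> bool {
        self.members.contains(value)
    }
}
```

A bounded cache keeps memory use constant, at the cost that a very old event could be handled again after its identifier has been evicted.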
Important Implementation Details
Rate Limiting:
The rate limiter is implemented using the governor crate's DefaultKeyedRateLimiter, keyed by NodeIdentifier. It enforces per-node request limits specified at initialization time.
Thread Safety:
The services container is protected by a Mutex within an Arc, allowing safe concurrent access and cloning of SharedServices. The use of atomics enables lock-free timestamp updates.
Block Finalization Handling:
Finalization events are guarded by caching finalized block identifiers to prevent redundant processing. Upon finalization, multiple services (thread_sync, threads_tracking, load_balancing) are updated to reflect the new state.
Metrics and Monitoring:
Optional block production metrics can be passed in and are used by load balancing and tracking services.
Testing Utilities:
The test_start method provides an easy way to instantiate SharedServices with default testing parameters, useful for unit tests.
Interaction with Other System Components
RoutingService:
Facilitates message/event routing; integrated into the container and used extensively by the threads_tracking and load_balancing services.
ThreadsTrackingService:
Tracks the progress and status of blocks across threads; invoked during block finalization events.
ThreadSyncService:
Ensures thread states are synchronized during finalization.
LoadBalancingService:
Collects thread load metrics and balances processing load; reacts to block finalization to update its metrics.
CrossThreadRefDataRepository:
Manages persistent cross-thread reference data stored on disk or in memory, as configured.
Block and Thread Types:
Uses the AckiNackiBlock, BlockIdentifier, and ThreadIdentifier types for block and thread identification.
Rate Limiter:
Limits incoming request rates per node to protect system resources and prevent abuse.
Visual Diagram of SharedServices Structure
classDiagram
    class SharedServices {
        +metrics: Option<BlockProductionMetrics>
        +last_finalization_timestamp: AtomicU64
        +throttle()
        +on_block_finalized()
        +exec()
    }
    class Container {
        +threads_tracking: ThreadsTrackingService
        +thread_sync: ThreadSyncService
        +load_balancing: LoadBalancingService
        +router: RoutingService
        +cross_thread_ref_data_service: CrossThreadRefDataRepository
        +dirty_hack__appended_blocks: FixedSizeHashSet
        +dirty_hack__finalized_blocks: FixedSizeHashSet
        +dirty_hack__invalidated_blocks: FixedSizeHashSet
    }
    SharedServices "1" *-- "1" Container : contains
    SharedServices o-- DefaultKeyedRateLimiter : limiter
    Container --> ThreadsTrackingService
    Container --> ThreadSyncService
    Container --> LoadBalancingService
    Container --> RoutingService
    Container --> CrossThreadRefDataRepository
Testing
The module includes unit tests for the throttling functionality:
throttling_one_thread_no_limits
Verifies that when rate limiting is set to maximum (u32::MAX), no requests get throttled.
throttling_two_threads_gt_then_limit
Tests rate limiting behavior with two different nodes, ensuring that requests exceeding the limit are throttled for one node while the other node remains unaffected. It also checks that after a cooldown period, requests are allowed again.
Together, these tests cover the unthrottled case, the isolation of limits between nodes, and recovery after a cooldown period.