aggregated_thread_load.rs
Overview
This file defines the mechanisms to calculate and maintain an aggregated load metric over a sliding window for a particular thread within a concurrent or distributed system. It focuses primarily on tracking and updating load values associated with message queues and account activities across a sequence of blocks, enabling efficient load analysis and decision-making based on recent thread activity.
The core component is the AggregatedLoad structure, which maintains a sliding window of load snapshots and their associated account loads. This aggregated data supports operations such as obtaining the total load over the window, proposing optimized account routing bitmasks, and updating the load metrics when new blocks are appended. The file also includes a utility function snapshot_load to extract the current message queue length from a block state.
Detailed Descriptions
Type Alias
pub(super) type Load = usize;
Load is an alias for usize representing the load value measured in integer units (e.g., queue length count).
Struct: AggregatedLoad
pub(super) struct AggregatedLoad {
window: Vec<(Load, InThreadAccountsLoad)>,
cursor: usize,
aggregated_value: (Load, InThreadAccountsLoad),
is_filled: bool,
window_size: usize,
}
Purpose: Maintains a sliding window of load snapshots and aggregates their values for a specific thread.
Fields:
window: A vector of (Load, InThreadAccountsLoad) tuples, holding the load and the associated in-thread accounts load for each position in the window.
cursor: The current index in the window where the next load snapshot will be inserted.
aggregated_value: The current sum/aggregate of all loads and accounts load in the window.
is_filled: A boolean indicating if the window has been filled at least once (i.e., the sliding window is fully populated).
window_size: The fixed size of the sliding window.
Implementation: AggregatedLoad
new(window_size: usize) -> Self
Description: Creates a new AggregatedLoad instance with the specified sliding window size. Initializes the window with default zero loads.
Parameters:
window_size: The number of load snapshots to maintain in the sliding window.
Returns: A new AggregatedLoad object.
Example:
let agg_load = AggregatedLoad::new(10);
is_ready(&self) -> bool
Description: Checks whether the sliding window has been completely filled.
Returns: true if the window is filled; otherwise false.
Usage: Can be used to determine if aggregated statistics are reliable (i.e., based on a full window).
reset(&mut self)
Description: Resets the internal state of the sliding window, marking it as not filled and resetting the cursor to zero.
Usage: Useful when the aggregation needs to start fresh, e.g., after a significant event or reconfiguration.
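The readiness lifecycle described for new, is_ready, and reset can be illustrated with a minimal sketch. ReadinessSketch and its advance helper are hypothetical names introduced here; the sketch tracks only the cursor and the filled flag, leaves the stored loads out, and assumes the cursor is advanced once per appended block. It is not the actual implementation.

```rust
/// Hypothetical stand-in that tracks only the readiness state of the window.
/// The stored loads and the aggregate are deliberately left out.
struct ReadinessSketch {
    cursor: usize,
    is_filled: bool,
    window_size: usize,
}

impl ReadinessSketch {
    fn new(window_size: usize) -> Self {
        // Assumption: a zero-sized window is not meaningful, so reject it.
        assert!(window_size > 0, "a window needs at least one slot");
        Self { cursor: 0, is_filled: false, window_size }
    }

    /// True once the cursor has wrapped around at least once.
    fn is_ready(&self) -> bool {
        self.is_filled
    }

    /// Marks the window as not filled and moves the cursor back to zero.
    fn reset(&mut self) {
        self.is_filled = false;
        self.cursor = 0;
    }

    /// Assumed helper: models the cursor movement of a single append.
    fn advance(&mut self) {
        self.cursor += 1;
        if self.cursor >= self.window_size {
            self.cursor = 0;
            self.is_filled = true;
        }
    }
}
```

With a window of size 3, is_ready stays false for the first two advances and turns true on the third. After reset, another full pass over the window is needed before the aggregate is based on a full window again.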
load_value(&self) -> Load
Description: Returns the aggregated load value (sum of all loads in the sliding window).
Returns: Load (usize) representing the total load.
propose_new_bitmask(&self, current_bitmask: &Bitmask<AccountRouting>) -> Option<Bitmask<AccountRouting>>
Description: Uses the aggregated InThreadAccountsLoad data to propose an optimized account routing bitmask based on the current bitmask.
Parameters:
current_bitmask: A reference to the current Bitmask<AccountRouting>.
Returns: An Option<Bitmask<AccountRouting>>, which may contain a better routing bitmask if one is found.
Usage: Enables adaptive routing decisions to optimize thread load balancing.
Notes: Relies on the best_split method of InThreadAccountsLoad.
append_from<TOptimisticState>(&mut self, block: &AckiNackiBlock, block_state: Arc<TOptimisticState>, metrics: &Option<BlockProductionMetrics>) where TOptimisticState: OptimisticState
Description: Appends a new load snapshot from the provided block and block state, updating the sliding window and the aggregated values accordingly.
Parameters:
block: Reference to the current AckiNackiBlock whose load is being considered.
block_state: Shared reference (Arc) to the block's optimistic state implementing OptimisticState.
metrics: Optional metrics reporter to report queue sizes.
Implementation Details:
Advances the cursor in a circular manner.
Marks the window as filled once the cursor wraps around.
Retrieves the current message queue length via snapshot_load.
Updates the window entry at the cursor index with the new load and InThreadAccountsLoad for the block.
Adjusts the aggregate sums by adding the new values and subtracting the old ones from the replaced window slot.
Reports the queue length metric if metrics is provided.
Usage: Should be called whenever a new block is processed in order to update the load metrics.
Example:
aggregated_load.append_from(&block, block_state.clone(), &metrics);
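The update steps listed for append_from can be sketched roughly as follows. This is a simplified illustration under stated assumptions: AccountsLoadSketch is a hypothetical stand-in for InThreadAccountsLoad (modeled here as per-account counters), the signatures of add_in_place and sub_in_place are guessed, the load and accounts load are passed in directly instead of being derived from an AckiNackiBlock and its state, and metrics reporting is omitted.

```rust
use std::collections::HashMap;

type Load = usize;

/// Hypothetical stand-in for `InThreadAccountsLoad`: a counter per account id.
#[derive(Clone, Default, Debug, PartialEq)]
struct AccountsLoadSketch {
    per_account: HashMap<u64, usize>,
}

impl AccountsLoadSketch {
    /// Assumed signature: adds every counter of `other` into `self`.
    fn add_in_place(&mut self, other: &Self) {
        for (id, n) in &other.per_account {
            *self.per_account.entry(*id).or_insert(0) += *n;
        }
    }

    /// Assumed signature: subtracts every counter of `other` from `self`.
    fn sub_in_place(&mut self, other: &Self) {
        for (id, n) in &other.per_account {
            if let Some(total) = self.per_account.get_mut(id) {
                *total = total.saturating_sub(*n);
            }
        }
        // Drop accounts whose counter reached zero.
        self.per_account.retain(|_, total| *total > 0);
    }
}

/// Illustrative helper for building an accounts load from (id, count) pairs.
fn accounts(pairs: &[(u64, usize)]) -> AccountsLoadSketch {
    AccountsLoadSketch { per_account: pairs.iter().copied().collect() }
}

/// Simplified window holding (Load, AccountsLoadSketch) snapshots.
struct WindowSketch {
    window: Vec<(Load, AccountsLoadSketch)>,
    cursor: usize,
    aggregated_value: (Load, AccountsLoadSketch),
    is_filled: bool,
}

impl WindowSketch {
    fn new(window_size: usize) -> Self {
        assert!(window_size > 0, "a window needs at least one slot");
        Self {
            window: vec![(0, AccountsLoadSketch::default()); window_size],
            cursor: 0,
            aggregated_value: (0, AccountsLoadSketch::default()),
            is_filled: false,
        }
    }

    fn append(&mut self, load: Load, accounts_load: AccountsLoadSketch) {
        // Advance the cursor circularly; wrapping marks the window as filled.
        self.cursor += 1;
        if self.cursor >= self.window.len() {
            self.cursor = 0;
            self.is_filled = true;
        }
        // Add the new snapshot to the aggregate first, so the scalar sum
        // cannot underflow when the old value is subtracted.
        self.aggregated_value.0 += load;
        self.aggregated_value.1.add_in_place(&accounts_load);
        // Store the new snapshot and take out the one it replaces.
        let (old_load, old_accounts) =
            std::mem::replace(&mut self.window[self.cursor], (load, accounts_load));
        // Remove the replaced snapshot from the aggregate.
        self.aggregated_value.0 -= old_load;
        self.aggregated_value.1.sub_in_place(&old_accounts);
    }
}
```

Because each append touches only the replaced slot and the running aggregate, the cost per block does not depend on the window size.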
Function: snapshot_load<T>(block_state: Arc<T>) -> Load where T: OptimisticState
Description: Extracts the current load as the message queue length from a given optimistic block state.
Parameters:
block_state: Shared (Arc) reference to the optimistic state.
Returns: The integer load value representing the length of the internal message queue.
Usage: Used internally by AggregatedLoad to snapshot the current thread load.
Note: Delegates to the get_internal_message_queue_length() method from the OptimisticState trait.
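The delegation performed by snapshot_load might look like the sketch below. OptimisticStateSketch is a reduced, hypothetical version of the OptimisticState trait containing only the one method named above; its receiver (&self) and return type (usize) are assumptions, and MockState exists purely for illustration.

```rust
use std::sync::Arc;

type Load = usize;

/// Reduced, hypothetical version of the `OptimisticState` trait.
/// Receiver and return type are assumptions.
trait OptimisticStateSketch {
    fn get_internal_message_queue_length(&self) -> usize;
}

/// Illustrative state whose queue is a plain vector of messages.
struct MockState {
    queued_messages: Vec<String>,
}

impl OptimisticStateSketch for MockState {
    fn get_internal_message_queue_length(&self) -> usize {
        self.queued_messages.len()
    }
}

/// Takes a shared handle to the state and returns the queue length as the load.
fn snapshot_load<T>(block_state: Arc<T>) -> Load
where
    T: OptimisticStateSketch,
{
    block_state.get_internal_message_queue_length()
}
```

Passing an Arc by value means the caller clones the handle (a reference-count increment), not the state itself, which matches the block_state.clone() call in the append_from example.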
Interaction with Other Parts of the System
InThreadAccountsLoad: Used to track and aggregate the load associated with accounts in a thread. The aggregated load in the window is accumulated using this type, which supports methods like new_from, add_in_place, and sub_in_place.
Bitmask<AccountRouting>: Used in proposing improved routing bitmasks based on the aggregated account load.
OptimisticState: Represents the state of a block in an optimistic concurrency model. The block state provides the current message queue length and thread ID.
AckiNackiBlock: Represents a block used in the append operation to update load metrics.
BlockProductionMetrics: Optional metrics collector used to report queue sizes for monitoring or analysis purposes.
Sliding window logic: Maintains a fixed-size buffer of load snapshots to efficiently compute rolling aggregates without recomputing sums from scratch.
Visual Diagram
classDiagram
class AggregatedLoad {
-window: Vec<(Load, InThreadAccountsLoad)>
-cursor: usize
-aggregated_value: (Load, InThreadAccountsLoad)
-is_filled: bool
-window_size: usize
+new()
+is_ready()
+reset()
+load_value()
+propose_new_bitmask()
+append_from()
}
AggregatedLoad o-- "InThreadAccountsLoad" : uses
AggregatedLoad ..> Bitmask : proposes
AggregatedLoad ..> OptimisticState : interacts with
AggregatedLoad ..> AckiNackiBlock : consumes
AggregatedLoad ..> BlockProductionMetrics : reports to
This diagram shows the AggregatedLoad structure with its key properties and methods, and its dependencies and interactions with other types such as InThreadAccountsLoad, Bitmask, OptimisticState, AckiNackiBlock, and BlockProductionMetrics. The arrows indicate the usage or association relationships.