thread_state.rs
Overview
This file manages the state and lifecycle of external messages within a particular thread context. It encapsulates the queuing, caching, processing, and feedback mechanisms for external messages that interact with the system. The core component, ExternalMessagesThreadState, maintains an internal queue of external messages, enforces cache size limits, supports message insertion and removal, and reports metrics related to message handling.
The file leverages thread-safe structures (Arc, Mutex) to allow concurrent access and modification of the message queue. It also integrates with feedback mechanisms to handle cases where the queue overflows, sending notifications about discarded messages.
Main Structures and Their Roles
ExternalMessagesThreadStateConfig
A builder-configured structure used to initialize an ExternalMessagesThreadState instance. It provides typed and validated construction of the thread state with necessary dependencies and configuration parameters.
Fields:
report_metrics: Option<BlockProductionMetrics> - Optional metrics reporter for block production statistics.
thread_id: ThreadIdentifier - Identifier for the thread this state is associated with.
cache_size: usize - Maximum allowed size of the external message cache.
feedback_sender: InstrumentedSender<ExtMsgFeedbackList> - Channel sender to communicate feedback about external messages.
Usage:
Constructed using the builder pattern for type safety and optional parameters.
Converts into an ExternalMessagesThreadState instance via a From trait implementation.
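The configuration-to-state conversion described above can be sketched as follows. This is a minimal illustration, not the file's actual code: `Config`, `ThreadState`, `build_state`, the `u16` inside `ThreadIdentifier`, and the `String` message type are hypothetical stand-ins, and only the field names `thread_id`, `cache_size`, and `queue` come from the documentation.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the real thread identifier type.
#[derive(Debug, Clone, PartialEq)]
pub struct ThreadIdentifier(pub u16);

// Simplified configuration: settings only, no runtime state.
pub struct Config {
    pub thread_id: ThreadIdentifier,
    pub cache_size: usize,
}

// Simplified runtime state: the settings plus a shared, lockable queue.
pub struct ThreadState {
    pub queue: Arc<Mutex<VecDeque<String>>>,
    pub thread_id: ThreadIdentifier,
    pub cache_size: usize,
}

impl From<Config> for ThreadState {
    fn from(config: Config) -> Self {
        ThreadState {
            // The queue starts empty; the configuration carries no messages.
            queue: Arc::new(Mutex::new(VecDeque::new())),
            thread_id: config.thread_id,
            cache_size: config.cache_size,
        }
    }
}

/// Builds a state from plain values, the way a builder's final step might.
pub fn build_state(thread_id: u16, cache_size: usize) -> ThreadState {
    Config { thread_id: ThreadIdentifier(thread_id), cache_size }.into()
}
```

Keeping the configuration separate from the runtime state means the queue is created in exactly one place, the `From` implementation, however the configuration was assembled.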
ExternalMessagesThreadState
Represents the runtime state of external messages for a given thread. It encapsulates:
An internal queue (ExternalMessagesQueue) guarded by a Mutex for safe concurrent access.
Configuration parameters such as cache size and thread ID.
Optional metrics reporting integration.
Feedback sending capability for overflow or other message-related notifications.
Key Fields:
queue: Arc<Mutex<ExternalMessagesQueue>> - Thread-safe queue of external messages.
report_metrics: Option<BlockProductionMetrics> - Optional metrics reporter.
thread_id: ThreadIdentifier - Thread ID for identification.
cache_size: usize - Maximum cache size.
feedback_sender: InstrumentedSender<ExtMsgFeedbackList> - Feedback communication channel.
Methods and Functions
Builder Pattern
ExternalMessagesThreadState::builder() -> ExternalMessagesThreadStateBuilder
Returns a builder instance to construct an ExternalMessagesThreadState using configuration options.
Example:
let thread_state = ExternalMessagesThreadState::builder()
    .with_thread_id(thread_id)
    .with_cache_size(1000)
    .with_feedback_sender(feedback_sender)
    .build()?;
push_external_messages
Signature:
pub fn push_external_messages(&self, messages: &[WrappedMessage]) -> anyhow::Result<()>
Description:
Adds a batch of external messages to the internal queue, respecting the configured cache size limit. When the batch does not fit in the remaining capacity, the excess messages are not enqueued; overflow feedback is generated for them and sent via the feedback channel.
Parameters:
messages: Slice of WrappedMessage objects representing the external messages to enqueue.
Returns:
Result<(), anyhow::Error> indicating success or failure.
Behavior and Implementation Details:
Captures the current time (Utc::now()) to timestamp incoming messages.
Acquires a mutable lock on the queue and determines how many messages can be accepted.
Pushes as many messages as allowed; the remainder are treated as overflow.
Generates overflow feedback messages for dropped messages using create_queue_overflow_feedback.
Sends feedback via feedback_sender.
Reports metric updates about queue size if metric reporting is enabled.
Usage Example:
thread_state.push_external_messages(&incoming_messages)?;
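The push steps listed above can be sketched with standard-library types only. This is an illustration under stated assumptions, not the actual implementation: `BoundedState` is a hypothetical name, messages are plain `String` values, `std::sync::mpsc::Sender` stands in for `InstrumentedSender`, `SystemTime::now()` stands in for `Utc::now()`, and the feedback is simply the list of dropped messages. Releasing the lock before sending feedback is a choice made in this sketch; the source does not say when the real code releases it.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;

// Hypothetical simplified state: timestamped String messages and a
// plain channel that carries the dropped messages as feedback.
pub struct BoundedState {
    queue: Arc<Mutex<VecDeque<(SystemTime, String)>>>,
    cache_size: usize,
    feedback_sender: Sender<Vec<String>>,
}

impl BoundedState {
    pub fn new(cache_size: usize) -> (Self, Receiver<Vec<String>>) {
        let (feedback_sender, feedback_receiver) = channel();
        let state = BoundedState {
            queue: Arc::new(Mutex::new(VecDeque::new())),
            cache_size,
            feedback_sender,
        };
        (state, feedback_receiver)
    }

    pub fn push(&self, messages: &[String]) -> Result<(), String> {
        // One timestamp for the whole batch.
        let now = SystemTime::now();
        let overflow: Vec<String> = {
            let mut queue = self.queue.lock().map_err(|e| e.to_string())?;
            // saturating_sub avoids underflow if the queue is already over the limit.
            let free = self.cache_size.saturating_sub(queue.len());
            let accepted = free.min(messages.len());
            for message in &messages[..accepted] {
                queue.push_back((now, message.clone()));
            }
            // Everything past the accepted prefix is overflow.
            messages[accepted..].to_vec()
        }; // The lock guard is dropped here, before feedback is sent.
        if !overflow.is_empty() {
            self.feedback_sender.send(overflow).map_err(|e| e.to_string())?;
        }
        Ok(())
    }

    /// Current queue length, the value a metrics hook would report.
    pub fn len(&self) -> usize {
        self.queue.lock().map(|queue| queue.len()).unwrap_or(0)
    }
}
```

With a `cache_size` of 2, pushing three messages leaves two in the queue and delivers the third on the feedback channel.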
erase_processed
Signature:
pub fn erase_processed(&self, processed: &[Stamp]) -> anyhow::Result<()>
Description:
Removes external messages from the queue that have been processed, identified by their unique stamps.
Parameters:
processed: Slice of Stamp identifiers for messages to remove.
Returns:
Result<(), anyhow::Error> indicating success or failure.
Implementation Details:
Acquires a mutable lock on the queue to erase processed messages.
Updates metrics regarding queue size after removal.
Logs trace information for debugging.
Usage Example:
thread_state.erase_processed(&processed_stamps)?;
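Stamp-based removal can be sketched as below. The types are assumptions made for illustration: `Stamp` is modeled as a `u64`, messages as `String`, and `StampedQueue` with its `BTreeMap` storage is hypothetical, since the internals of `ExternalMessagesQueue` are not shown here. Unlike the documented method, the sketch returns the remaining queue size, which is the value a metrics hook would report.

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};

// Hypothetical stamp type: a plain integer standing in for `Stamp`.
pub type Stamp = u64;

pub struct StampedQueue {
    // A BTreeMap keeps the messages ordered by stamp.
    messages: Arc<Mutex<BTreeMap<Stamp, String>>>,
}

impl StampedQueue {
    pub fn new(items: Vec<(Stamp, String)>) -> Self {
        StampedQueue {
            messages: Arc::new(Mutex::new(items.into_iter().collect())),
        }
    }

    /// Removes every message whose stamp is listed in `processed` and
    /// returns how many messages remain. Unknown stamps are ignored.
    pub fn erase_processed(&self, processed: &[Stamp]) -> Result<usize, String> {
        let mut messages = self.messages.lock().map_err(|e| e.to_string())?;
        for stamp in processed {
            messages.remove(stamp);
        }
        Ok(messages.len())
    }
}
```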
get_remaining_external_messages
Signature:
pub fn get_remaining_external_messages(&self) -> anyhow::Result<HashMap<AccountAddress, VecDeque<(Stamp, Message)>>>
Description:
Retrieves a mapping of all currently unprocessed external messages grouped by their destination account address.
Returns:
Result<HashMap<AccountAddress, VecDeque<(Stamp, Message)>>, anyhow::Error> containing the grouped unprocessed messages.
Implementation Details:
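Acquires the queue lock (held in a guard named guarded) to safely access the messages. A Mutex grants exclusive access, so this blocks other readers and writers while the guard is held.
Returns an owned map of the unprocessed messages, as the signature indicates, so the caller holds no reference into the locked queue after the call returns.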
Usage Example:
let remaining_msgs = thread_state.get_remaining_external_messages()?;
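The grouped return shape can be illustrated with a small helper. This is a sketch under assumptions: `group_by_destination` is a hypothetical function, and `AccountAddress`, `Stamp`, and `Message` are modeled as simple aliases rather than the real types; how the actual queue stores its messages is not shown in this document.

```rust
use std::collections::{HashMap, VecDeque};

// Hypothetical simplified aliases for the real types.
pub type AccountAddress = String;
pub type Stamp = u64;
pub type Message = String;

/// Groups pending messages by destination address, preserving the
/// input order within each destination.
pub fn group_by_destination(
    pending: &[(Stamp, AccountAddress, Message)],
) -> HashMap<AccountAddress, VecDeque<(Stamp, Message)>> {
    let mut grouped: HashMap<AccountAddress, VecDeque<(Stamp, Message)>> = HashMap::new();
    for (stamp, address, message) in pending {
        grouped
            .entry(address.clone())
            .or_default()
            .push_back((*stamp, message.clone()));
    }
    grouped
}
```

A VecDeque per destination lets a consumer pop messages from the front in arrival order while newer messages are appended at the back.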
Important Implementation Details and Algorithms
Thread Safety:
ExternalMessagesThreadState uses an Arc<Mutex<ExternalMessagesQueue>> to ensure safe concurrent access and modification of the message queue across multiple threads.
Cache Size Management:
The push_external_messages method enforces a strict limit on the number of messages stored. It calculates the available capacity by subtracting the current queue size from the configured cache_size, then accepts only as many messages as fit, dropping the rest.
Overflow Feedback Generation:
Messages that cannot be enqueued due to cache overflow are wrapped into feedback messages indicating queue overflow. This is done via the create_queue_overflow_feedback function, which generates appropriate feedback for each dropped message and sends them collectively through the feedback_sender.
Metrics Integration:
The file optionally integrates with BlockProductionMetrics to report the size of the external messages queue after insertions and removals. This facilitates monitoring and diagnostics related to block production and message handling.
Timestamping:
Incoming messages are timestamped using the current UTC time at the moment they are pushed to the queue, aiding in time-based processing or expiration logic.
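The thread-safety point above can be demonstrated with the standard library alone. This sketch assumes a `VecDeque<String>` in place of `ExternalMessagesQueue`, and `push_from_threads` is a hypothetical helper written only for this illustration.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Spawns `producers` threads that each push `per_producer` messages into
/// one shared queue, then returns the final queue length.
pub fn push_from_threads(producers: usize, per_producer: usize) -> usize {
    let queue: Arc<Mutex<VecDeque<String>>> = Arc::new(Mutex::new(VecDeque::new()));

    let handles: Vec<_> = (0..producers)
        .map(|id| {
            // Each thread gets its own Arc handle to the same queue.
            let queue = Arc::clone(&queue);
            thread::spawn(move || {
                for n in 0..per_producer {
                    // The lock is held only for the duration of one push.
                    queue
                        .lock()
                        .expect("queue mutex poisoned")
                        .push_back(format!("producer {id}, message {n}"));
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().expect("producer thread panicked");
    }

    let len = queue.lock().expect("queue mutex poisoned").len();
    len
}
```

Because every push goes through the mutex, no message is lost regardless of how the threads interleave: four producers pushing 25 messages each always leave 100 messages in the queue.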
Interactions with Other Components
ExternalMessagesQueue:
The internal queue structure managing the storage, retrieval, and removal of external messages. This file wraps and controls access to the queue.
create_queue_overflow_feedback:
A helper function that constructs feedback messages when the queue overflows, ensuring dropped messages are properly acknowledged.
InstrumentedSender:
Used for sending feedback messages asynchronously, integrating with the telemetry or messaging system.
BlockProductionMetrics:
Optional metrics reporter used to track queue size changes for performance monitoring.
WrappedMessage, Stamp, Message:
Data types representing external messages and their unique identifiers involved in queue operations.
ThreadIdentifier:
Identifies the specific thread context for which this external messages state is maintained, allowing per-thread message state isolation.
Visual Diagram: Class Structure of ExternalMessagesThreadState
classDiagram
class ExternalMessagesThreadState {
-queue: Arc<Mutex<ExternalMessagesQueue>>
-report_metrics: Option<BlockProductionMetrics>
-thread_id: ThreadIdentifier
-cache_size: usize
-feedback_sender: InstrumentedSender<ExtMsgFeedbackList>
+builder()
+push_external_messages()
+erase_processed()
+get_remaining_external_messages()
}
class ExternalMessagesThreadStateConfig {
-report_metrics: Option<BlockProductionMetrics>
-thread_id: ThreadIdentifier
-cache_size: usize
-feedback_sender: InstrumentedSender<ExtMsgFeedbackList>
+build()
}
ExternalMessagesThreadStateConfig --> ExternalMessagesThreadState : builds >
This diagram displays the two main structs: the configuration builder (ExternalMessagesThreadStateConfig) and the runtime state (ExternalMessagesThreadState), with their primary fields and public methods, illustrating the builder relationship.