queue.rs
Overview
The queue.rs file defines and implements the ExternalMessagesQueue struct, which manages a collection of external messages associated with unique stamps. The queue supports operations such as adding new messages, removing processed messages, and retrieving unprocessed messages grouped by account address. This functionality is critical for systems that handle ordered processing of external messages, ensuring messages are tracked by a timestamped index and organized per destination account.
Structs and Types
ExternalMessagesQueue
A data structure that holds external messages in an ordered map indexed by a Stamp. Each entry associates a Stamp with a tuple containing the destination account address and the wrapped message.
Fields:
messages: BTreeMap<Stamp, (AccountAddress, WrappedMessage)>
Stores the messages keyed by their uniqueStamp. TheBTreeMapensures the messages are kept in sorted order by the stamp.last_index: u64
Tracks the last used index for stamps to assign the next sequential stamp index.
Derives:
Getters- Provides getter methods for the struct fields.Debug - Enables formatting the struct with debug output.
Public Methods
empty() -> ExternalMessagesQueue
Creates and returns a new empty ExternalMessagesQueue.
Usage:
let queue = ExternalMessagesQueue::empty();Returns:
An instance ofExternalMessagesQueuewith no messages andlast_indexset to zero.
erase_processed(&mut self, processed: &[Stamp])
Removes messages from the queue that have been processed, identified by their Stamps.
Parameters:
processed: Slice ofStampvalues representing messages that have already been handled.
Behavior:
Converts the slice into aBTreeSetfor efficient membership checking, then retains only those messages whose stamp is not in the set.Usage:
let processed_stamps = vec![stamp1, stamp2]; queue.erase_processed(&processed_stamps);
push_external_messages(&mut self, messages: &[WrappedMessage], timestamp: DateTime<Utc>)
Adds a batch of external messages to the queue, assigning sequential stamps with the provided timestamp.
Parameters:
messages: Slice ofWrappedMessageinstances to be added.timestamp: ADateTime<Utc>timestamp to associate with all new messages.
Behavior:
For each message:Increments the internal
last_index.Creates a
Stampwith the current index and the provided timestamp.Inserts the message into the
messagesmap with the associatedAccountAddressextracted from the message.
Usage:
queue.push_external_messages(&new_messages, Utc::now());
unprocessed_messages(&self) -> HashMap<AccountAddress, VecDeque<(Stamp, Message)>>
Retrieves all unprocessed messages grouped by their destination account addresses.
Returns:
AHashMapwhere:Keys are
AccountAddressvalues.Values are
VecDeques containing tuples of(Stamp, Message)in insertion order.
Behavior:
Iterates over all messages inmessages, grouping them by account address into queues preserving order.Usage:
let grouped_messages = queue.unprocessed_messages(); if let Some(queue) = grouped_messages.get(&account) { // process queue of messages }
Important Implementation Details
The
messagesfield uses aBTreeMapkeyed byStamp, which combines an index and timestamp to guarantee message ordering and uniqueness.last_indexis incremented on each insertion to maintain a unique and sequential stamp index.The
erase_processedmethod efficiently removes multiple processed messages by constructing aBTreeSetfor membership checking, leveraging the sorted properties of the collections.Message grouping in
unprocessed_messagesuses aHashMapmappingAccountAddresstoVecDeque, preserving insertion order for each account's messages, enabling FIFO processing per account.
Interactions with Other Parts of the System
The
ExternalMessagesQueuedepends on several external types:Stamp(fromexternal_messages::stamp) representing the unique identifier with index and timestamp.AccountAddress(fromtypes) identifying the destination account for each message.WrappedMessage(frommessage) encapsulating the actual message data.Message(fromtvm_block) which is the inner message type extracted fromWrappedMessage.
The queue is likely used in the context of message processing pipelines where external messages must be tracked, ordered, and dispatched to specific accounts.
Interaction with timestamping via
chrono::DateTime<Utc>ensures temporal ordering and tracking.
Visual Diagram
classDiagram
class ExternalMessagesQueue {
-messages: BTreeMap<Stamp, (AccountAddress, WrappedMessage)>
-last_index: u64
+empty()
+erase_processed(processed: &[Stamp])
+push_external_messages(messages: &[WrappedMessage], timestamp: DateTime<Utc>)
+unprocessed_messages() HashMap<AccountAddress, VecDeque<(Stamp, Message)>>
}
The diagram shows the ExternalMessagesQueue struct with its private fields and public methods, illustrating its core responsibility as a container and manager of timestamped external messages keyed by stamps and grouped by account addresses.