load_queue.rs
Overview
This file implements loading and reconstruction of a thread message queue's state from durable message storage. It defines an extension method on the ThreadMessageQueueState struct that initializes and populates the queue state by loading messages from a database (MessageDurableStorage) according to stored message ranges (MessagesRange). The process validates message continuity, fetches tail sequences beyond the compacted history, and organizes messages per account address.
Key Components
Constants
MAX_MESSAGES: usize = 100
Defines the batch size limit for fetching messages from the durable storage in one database query during state loading.
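A minimal sketch of how a batch limit like this can drive a paged read. The slice-backed fetch_batch and the helper load_in_batches are hypothetical stand-ins for illustration, not the real MessageDurableStorage API.

```rust
/// Batch size limit, mirroring the constant described above.
const MAX_MESSAGES: usize = 100;

/// Hypothetical stand-in for a durable-storage query: returns at most `limit`
/// rows starting at position `cursor`.
fn fetch_batch(rows: &[u64], cursor: usize, limit: usize) -> &[u64] {
    let start = cursor.min(rows.len());
    let end = start.saturating_add(limit).min(rows.len());
    &rows[start..end]
}

/// Reads the whole store in batches and returns the rows together with the
/// number of non-empty batches that were fetched.
fn load_in_batches(rows: &[u64]) -> (Vec<u64>, usize) {
    let mut out = Vec::new();
    let mut cursor = 0;
    let mut batches = 0;
    loop {
        let batch = fetch_batch(rows, cursor, MAX_MESSAGES);
        if batch.is_empty() {
            break; // nothing left to read
        }
        out.extend_from_slice(batch);
        cursor += batch.len();
        batches += 1;
    }
    (out, batches)
}
```

With 250 rows, this sketch issues three non-empty reads (100, 100, 50) followed by one empty read that ends the loop.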
Implementation for ThreadMessageQueueState
Function: load_state
pub fn load_state(
    db: &MessageDurableStorage,
    state_messages: &BTreeMap<
        AccountAddress,
        MessagesRange<MessageIdentifier, Arc<WrappedMessage>>,
    >,
) -> anyhow::Result<Self>
Purpose
Loads the state of a thread message queue from the durable storage, reconstructing the message ranges and tail sequences for each account address.
Parameters
db: &MessageDurableStorage
Reference to the durable storage backend used to retrieve persisted messages.
state_messages: &BTreeMap<AccountAddress, MessagesRange<MessageIdentifier, Arc<WrappedMessage>>>
A mapping from account addresses to message ranges representing compacted or partially loaded message histories.
Returns
anyhow::Result<Self>
Returns a ThreadMessageQueueState instance on success or an error if loading fails.
Behavior and Algorithm
Initializes an empty OrderSet to maintain the order of account addresses within the queue.
Sets a cursor to 0 initially (used for tracking progression in the queue).
Iterates over each (account_address, range) pair in state_messages.
For each account:
  Creates a MessagesRangeIterator to iterate over the compacted message range.
  Extracts the first message identifier from the iterator, which represents the start of the compacted history.
  Locates the database cursor corresponding to this message hash using db.get_rowid_by_hash.
  Loads messages from the database in batches of MAX_MESSAGES starting from the located cursor:
    For the initial batch, compares database messages with iterator messages to ensure continuity and no mismatch.
    Subsequent messages beyond the iterator are collected into a tail sequence (VecDeque).
  Continues loading additional batches until no more messages are retrieved.
  Updates the message range's tail sequence with the collected messages.
  Inserts the updated range into a new map new_state_messages.
  Inserts the account address into the OrderSet to preserve order.
Returns a new ThreadMessageQueueState instance with:
  messages set to new_state_messages.
  order_set reflecting the order of accounts.
  cursor initialized to zero.
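The per-account steps above can be sketched roughly as follows. Everything here is an assumption made for illustration: MockStorage, rowid_by_hash, next_batch, and load_tail are hypothetical names, message identifiers are plain strings, and the sketch returns an error where the documented function panics. It also keeps comparing against the compacted range across batches rather than only in the first one.

```rust
use std::collections::{BTreeMap, VecDeque};

const MAX_MESSAGES: usize = 100;

/// Hypothetical in-memory stand-in for the durable storage: each account maps
/// to its messages in insertion order, and a message's index plays the role
/// of its row id.
struct MockStorage {
    rows: BTreeMap<String, Vec<String>>,
}

impl MockStorage {
    /// Finds the position of `hash` in the account's log, if present.
    fn rowid_by_hash(&self, account: &str, hash: &str) -> Option<usize> {
        self.rows.get(account)?.iter().position(|h| h == hash)
    }

    /// Returns up to `limit` messages starting at `cursor` (inclusive).
    fn next_batch(&self, account: &str, cursor: usize, limit: usize) -> Vec<String> {
        self.rows
            .get(account)
            .map(|log| log.iter().skip(cursor).take(limit).cloned().collect())
            .unwrap_or_default()
    }
}

/// Checks the compacted range against storage and collects everything stored
/// after it as the tail sequence.
fn load_tail(
    db: &MockStorage,
    account: &str,
    compacted: &[String],
) -> Result<VecDeque<String>, String> {
    let first = compacted.first().ok_or("compacted range is empty")?;
    let mut cursor = db
        .rowid_by_hash(account, first)
        .ok_or_else(|| format!("start of range {} not found in storage", first))?;
    let mut expected = compacted.iter();
    let mut tail = VecDeque::new();
    loop {
        let batch = db.next_batch(account, cursor, MAX_MESSAGES);
        if batch.is_empty() {
            break; // no more messages for this account
        }
        cursor += batch.len();
        for stored in batch {
            match expected.next() {
                // Still inside the compacted range: storage must agree with it.
                Some(want) if *want != stored => {
                    return Err(format!("mismatch: expected {}, found {}", want, stored));
                }
                Some(_) => {}
                // Past the compacted range: this message belongs to the tail.
                None => tail.push_back(stored),
            }
        }
    }
    Ok(tail)
}
```

In this sketch, a compacted range of three messages starting at row 10 of a 250-row log yields a tail of 237 messages, read in batches of 100, 100, and 40.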
Panics
If the initial message range is empty.
If the start of the range is not found in the database.
If there is a mismatch between expected messages in the iterator and messages retrieved from the database.
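Because these conditions panic instead of returning Err, matching on the Result alone will not observe them. One possible way for a caller to contain such a panic at a boundary is std::panic::catch_unwind, sketched below under stated assumptions: first_or_panic is a hypothetical stand-in and not the real load_state, and the approach only works when the binary is built with unwinding panics (not panic = "abort").

```rust
use std::panic;

/// Hypothetical loader that panics on an empty range, standing in for the
/// documented panic conditions. It is not the real `load_state`.
fn first_or_panic(range: &[u32]) -> u32 {
    assert!(!range.is_empty(), "initial message range is empty");
    range[0]
}

/// Runs the loader behind `catch_unwind` and maps a panic to an error string.
fn first_guarded(range: &[u32]) -> Result<u32, String> {
    panic::catch_unwind(|| first_or_panic(range)).map_err(|payload| {
        // Panic payloads are usually `&'static str` or `String`.
        if let Some(msg) = payload.downcast_ref::<&str>() {
            msg.to_string()
        } else if let Some(msg) = payload.downcast_ref::<String>() {
            msg.clone()
        } else {
            "unknown panic".to_string()
        }
    })
}
```

The default panic hook still prints the panic message to stderr; catch_unwind only stops the unwinding at the call site.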
Usage Example
let db: MessageDurableStorage = ...; // Initialize storage
let state_messages: BTreeMap<AccountAddress, MessagesRange<MessageIdentifier, Arc<WrappedMessage>>> = ...; // Loaded or constructed message ranges
match ThreadMessageQueueState::load_state(&db, &state_messages) {
    Ok(queue_state) => {
        // Use the reconstructed ThreadMessageQueueState
    }
    Err(e) => {
        eprintln!("Failed to load thread message queue state: {:?}", e);
    }
}
Important Implementation Details
The method ensures message continuity by comparing messages fetched from the database with messages expected from the stored range iterator; any mismatch triggers a panic.
Messages beyond the stored ranges (tail sequences) are fetched iteratively in batches to avoid loading large amounts of data at once.
The Arc<WrappedMessage> wrapper is used to enable shared ownership and thread-safe references to message data.
The OrderSet maintains the order of account addresses, which might be important for queue processing logic (see OrderSet and ThreadMessageQueueState).
Uses BTreeMap for deterministic ordering of accounts.
The function returns an error in case database operations fail, leveraging the anyhow crate for error propagation.
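Two of the points above, deterministic BTreeMap ordering and shared ownership through Arc, can be illustrated with the standard library alone. The Message struct and both helper functions are hypothetical and exist only for this sketch; Message is not the real WrappedMessage.

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

/// Hypothetical stand-in for WrappedMessage.
#[derive(Debug)]
struct Message {
    body: String,
}

/// Inserts accounts in the given order and returns them in the order a
/// BTreeMap iterates them, which is sorted by key regardless of insertion.
fn ordered_accounts(accounts: &[&str]) -> Vec<String> {
    let map: BTreeMap<String, u32> = accounts
        .iter()
        .map(|account| (account.to_string(), 0))
        .collect();
    map.into_keys().collect()
}

/// Shares one message between two handles and reports the strong count.
fn share_message(body: &str) -> (Arc<Message>, Arc<Message>, usize) {
    let original = Arc::new(Message { body: body.to_string() });
    // Cloning the Arc copies the pointer, not the message itself.
    let shared = Arc::clone(&original);
    let count = Arc::strong_count(&original);
    (original, shared, count)
}
```

Both handles returned by share_message point at the same allocation, so a message held in a range and in a tail sequence would not be duplicated in memory.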
Interactions with Other Components
MessageDurableStorage: The persistent storage interface used to fetch message data and obtain row IDs by message hash.
MessagesRange and MessagesRangeIterator: Used to represent and iterate over ranges of messages per account.
MessageIdentifier: Represents unique identifiers for messages, used for matching and ordering.
WrappedMessage: A wrapper around message data containing deserialized message content.
OrderSet: Maintains the order of accounts in the message queue state.
ThreadMessageQueueState: The struct extended by this file to implement loading functionality.
This file is a core part of the mechanism that reconstructs in-memory message queue state from persistent storage, enabling the system to resume processing or querying message queues after restarts or crashes.
Mermaid Diagram
classDiagram
    class ThreadMessageQueueState {
        +load_state(db: MessageDurableStorage, state_messages: BTreeMap<AccountAddress, MessagesRange>) Result<Self>
        -messages: BTreeMap<AccountAddress, MessagesRange>
        -order_set: OrderSet
        -cursor: usize
    }
    class MessageDurableStorage {
        +get_rowid_by_hash(hash: String) Result<Option<i64>>
        +next_simple(account_hex: String, cursor: i64, limit: usize) Result<(Vec<Message>, bool)>
    }
    class MessagesRange {
        +set_tail_sequence(tail: VecDeque<(MessageIdentifier, Arc<WrappedMessage>)>)
        +tail_sequence() VecDeque<(MessageIdentifier, Arc<WrappedMessage>)>
    }
    class MessagesRangeIterator {
        +new(db: &MessageDurableStorage, range: MessagesRange)
        +next() Option<Result<(SomeType, MessageIdentifier)>>
    }
    class OrderSet {
        +new()
        +insert(account: AccountAddress)
    }
    ThreadMessageQueueState ..> MessageDurableStorage : uses
    ThreadMessageQueueState ..> MessagesRange : manages
    ThreadMessageQueueState ..> MessagesRangeIterator : iterates
    ThreadMessageQueueState ..> OrderSet : maintains order