load_queue.rs

Overview

This file implements functionality to load and reconstruct the state of a thread message queue from a durable message storage. It primarily defines an extension method on the ThreadMessageQueueState struct to initialize and populate the message queue state by loading messages from a database (MessageDurableStorage) based on stored message ranges (MessagesRange). This process involves validating the continuity of messages, fetching tail sequences beyond the compacted history, and organizing messages per account address.

Key Components

Constants

Implementation for ThreadMessageQueueState

Function: load_state

pub fn load_state(
    db: &MessageDurableStorage,
    state_messages: &BTreeMap<
        AccountAddress,
        MessagesRange<MessageIdentifier, Arc<WrappedMessage>>,
    >,
) -> anyhow::Result<Self>
Purpose

Loads the state of a thread message queue from the durable storage, reconstructing the message ranges and tail sequences for each account address.

Parameters
Returns
Behavior and Algorithm
  1. Initializes an empty OrderSet to maintain the order of account addresses within the queue.

  2. Sets a cursor to 0 initially (used for tracking progression in the queue).

  3. Iterates over each (account_address, range) pair in state_messages.

  4. For each account:

    • Creates a MessagesRangeIterator to iterate over the compacted message range.

    • Extracts the first message identifier from the iterator, which represents the start of the compacted history.

    • Locates the database cursor corresponding to this message hash using db.get_rowid_by_hash.

    • Loads messages from the database in batches of MAX_MESSAGES starting from the located cursor:

      • For the initial batch, compares database messages with iterator messages to ensure continuity and no mismatch.

      • Subsequent messages beyond the iterator are collected into a tail sequence (VecDeque).

    • Continues loading additional batches until no more messages are retrieved.

    • Updates the message range's tail sequence with the collected messages.

    • Inserts the updated range into a new map new_state_messages.

    • Inserts the account address into the OrderSet to preserve order.

  5. Returns a new ThreadMessageQueueState instance with:

    • messages set to new_state_messages.

    • order_set reflecting the order of accounts.

    • cursor initialized to zero.

Panics
Usage Example
let db: MessageDurableStorage = ...; // Initialize storage
let state_messages: BTreeMap<AccountAddress, MessagesRange<MessageIdentifier, Arc<WrappedMessage>>> = ...; // Loaded or constructed message ranges

match ThreadMessageQueueState::load_state(&db, &state_messages) {
    Ok(queue_state) => {
        // Use the reconstructed ThreadMessageQueueState
    }
    Err(e) => {
        eprintln!("Failed to load thread message queue state: {:?}", e);
    }
}

Important Implementation Details

Interactions with Other Components

This file is a core part of the mechanism that reconstructs in-memory message queue state from persistent storage, enabling the system to resume processing or querying message queues after restarts or crashes.

Mermaid Diagram

classDiagram
class ThreadMessageQueueState {
+load_state(db: MessageDurableStorage, state_messages: BTreeMap<AccountAddress, MessagesRange>) Result<Self>
-messages: BTreeMap<AccountAddress, MessagesRange>
-order_set: OrderSet
-cursor: usize
}
class MessageDurableStorage {
+get_rowid_by_hash(hash: String) Result<Option<i64>>
+next_simple(account_hex: String, cursor: i64, limit: usize) Result<(Vec<Message>, bool)>
}
class MessagesRange {
+set_tail_sequence(tail: VecDeque<(MessageIdentifier, Arc<WrappedMessage>)>)
+tail_sequence() VecDeque<(MessageIdentifier, Arc<WrappedMessage>)>
}
class MessagesRangeIterator {
+new(db: &MessageDurableStorage, range: MessagesRange)
+next() Option<Result<(SomeType, MessageIdentifier)>>
}
class OrderSet {
+new()
+insert(account: AccountAddress)
}
ThreadMessageQueueState ..> MessageDurableStorage : uses
ThreadMessageQueueState ..> MessagesRange : manages
ThreadMessageQueueState ..> MessagesRangeIterator : iterates
ThreadMessageQueueState ..> OrderSet : maintains order