from_maps.rs
Overview
This file defines the ThreadMessageQueueStateDiff struct and implements conversion logic to produce an updated ThreadMessageQueueState. It handles the application of incremental changes (diffs) to the thread message queue state, including adding and removing accounts, producing new messages, and consuming existing messages. The core functionality ensures that the state is kept consistent with respect to message ordering, message storage, and account presence.
This file interacts primarily with:
ThreadMessageQueueStateand related types (OrderSet,AccountInbox,MessagesRange) for representing the state of message queues.MessageDurableStoragefor persistent message storage and retrieval.Message-related types such as
MessageIdentifierandWrappedMessage.Iterator utilities like
MessagesRangeIteratorfor managing ranges of messages.Logging utilities (
tracing) for instrumentation and debugging.
ThreadMessageQueueStateDiff Struct
pub struct ThreadMessageQueueStateDiff {
initial_state: ThreadMessageQueueState,
consumed_messages: HashMap<AccountAddress, HashSet<MessageIdentifier>>,
removed_accounts: Vec<AccountAddress>,
added_accounts: std::collections::BTreeMap<AccountAddress, AccountInbox>,
produced_messages: HashMap<AccountAddress, Vec<(MessageIdentifier, Arc<WrappedMessage>)>>,
db: MessageDurableStorage,
}
Purpose
Encapsulates the incremental changes to be applied to a thread message queue state including:
initial_state: The starting state before applying the diff.consumed_messages: Messages identified as consumed (removed) per account.removed_accounts: Accounts to be removed from the state.added_accounts: New accounts and their inboxes to be added.produced_messages: New messages produced for different accounts.db: Durable storage interface used to retrieve or validate messages.
Builder Pattern
Uses typed_builder for ergonomic construction with visibility on builder methods and build method returning a Result<ThreadMessageQueueState>.
Conversion Implementation: From<ThreadMessageQueueStateDiff> for anyhow::Result<ThreadMessageQueueState>
This is the core method that applies the diff to the initial state to produce an updated state.
Parameters
val: ThreadMessageQueueStateDiff- the diff object containing changes.
Return Value
anyhow::Result<ThreadMessageQueueState>- the updated message queue state or an error if inconsistencies are detected.
Detailed Workflow
Initialize Working Copies:
Clonesmessages(aBTreeMap<AccountAddress, AccountInbox>) andorder_set(anOrderSet) from the initial state for mutation.Add Accounts:
Inserts new accounts fromadded_accountsintomessages. If the account is not already in the order set, it is inserted.Determine Intersection of Produced and Consumed Messages:
Builds a set of message IDs that appear both in produced and consumed messages to avoid processing them twice.Add Produced Messages:
For each account:Retain messages that are either not in the intersection or have a redirect header.
Extend the tail sequence of the existing inbox with new messages.
Update the compacted history range based on database lookups of message sequences.
Ensure order set is updated with new accounts if needed.
Remove Consumed Messages:
For each account:Retain only consumed messages not in the intersection.
Use
MessagesRangeIteratorto verify and update the message ranges.Remove account from
messagesandorder_setif empty after removal.
Remove Accounts:
Remove accounts listed inremoved_accountsfrommessagesandorder_set.Update Cursor:
Calculate a new cursor position based on the current size ofmessages.Construct New State:
Returns a newThreadMessageQueueStatewith updated messages, order set, and cursor.
Implementation Details and Assertions
Uses
tracingcrate for detailed instrumentation and logging spans.Includes assertions to detect "dirty state" conditions, like unexpected consumed messages or mismatched iterator sets.
Tail cloning and splitting logic ensures message sequences in memory align with persisted database state.
The method is careful about preserving message ordering and history compaction.
Usage Example
let diff = ThreadMessageQueueStateDiff::builder()
.with_initial_state(current_state)
.with_consumed_messages(consumed_map)
.with_removed_accounts(removed_vec)
.with_added_accounts(added_map)
.with_produced_messages(produced_map)
.with_db(storage_handle)
.build()?;
let updated_state: ThreadMessageQueueState = diff.into()?;
Test Module
Contains unit tests validating the behavior of ThreadMessageQueueStateDiff conversion:
test_add_account: Tests adding a new account to an empty state.
test_remove_account: Tests removing an account from the state.
test_consumed_messages: Tests removal of consumed messages from the account inbox.
test_produced_messages_out_of_order: (ignored test) Tests panic on out-of-order produced messages.
Additional commented-out test for DB-backed message production.
The tests use helper functions to create empty states and messages, illustrating typical usage patterns.
Interaction with Other Components
Message Storage: Uses
MessageDurableStoragefor persistent message retrieval and validation.Message Identifiers and Wrappers: Works with
MessageIdentifierandWrappedMessageto manage messages.Account Inbox and Message Ranges: Manipulates
AccountInboxandMessagesRangeto maintain message queues per account.Iterators: Uses
MessagesRangeIteratorfor consuming message sequences safely.ThreadMessageQueueState: The main state structure that this diff updates.
Tracing and Instrumentation: Uses
tracingfor detailed debugging and observability.
See related topics such as ThreadMessageQueueState, MessageDurableStorage, and MessagesRangeIterator for in-depth details on these components.
Mermaid Diagram: Structure of from_maps.rs
classDiagram
class ThreadMessageQueueStateDiff {
-initial_state: ThreadMessageQueueState
-consumed_messages: HashMap<AccountAddress, HashSet<MessageIdentifier>>
-removed_accounts: Vec<AccountAddress>
-added_accounts: BTreeMap<AccountAddress, AccountInbox>
-produced_messages: HashMap<AccountAddress, Vec<(MessageIdentifier, Arc<WrappedMessage>)>>
-db: MessageDurableStorage
+from() Result<ThreadMessageQueueState>
}
ThreadMessageQueueStateDiff ..> ThreadMessageQueueState : uses
ThreadMessageQueueStateDiff ..> MessageDurableStorage : uses
ThreadMessageQueueStateDiff ..> HashMap : uses
ThreadMessageQueueStateDiff ..> HashSet : uses
ThreadMessageQueueStateDiff ..> AccountAddress : uses
ThreadMessageQueueStateDiff ..> MessageIdentifier : uses
ThreadMessageQueueStateDiff ..> WrappedMessage : uses
ThreadMessageQueueStateDiff ..> AccountInbox : uses
class ThreadMessageQueueState {
+messages: BTreeMap<AccountAddress, AccountInbox>
+order_set: OrderSet
+cursor: usize
}
class MessagesRangeIterator {
+new()
+next_range()
+remaining()
}
ThreadMessageQueueState ..> AccountInbox
AccountInbox ..> MessagesRangeIterator
This diagram visualizes the main struct and its relationships with key types used in the file. The conversion method (from) applies the diff to produce a new ThreadMessageQueueState.