postprocessing.rs
Overview
This file defines the postprocess function, which performs critical state update operations after processing a block of transactions/messages within a threaded shard environment. It updates the optimistic state of accounts, message queues, and thread references, handles internal message routing between threads, manages account lifecycle events (such as unloading or redirecting accounts), and prepares data for cross-thread communication.
The postprocess function is a core part of the state transition logic that ensures consistency and correctness of the optimistic shard state in a multi-threaded blockchain or distributed ledger context.
Function: postprocess
pub fn postprocess(
mut initial_optimistic_state: OptimisticStateImpl,
consumed_internal_messages: HashMap<AccountAddress, HashSet<MessageIdentifier>>,
mut produced_internal_messages_to_the_current_thread: HashMap<AccountAddress, Vec<(MessageIdentifier, Arc<WrappedMessage>)>>,
accounts_that_changed_their_dapp_id: HashMap<AccountRouting, Option<WrappedAccount>>,
block_id: BlockIdentifier,
block_seq_no: BlockSeqNo,
mut new_state: OptimisticShardState,
mut produced_internal_messages_to_other_threads: HashMap<AccountRouting, Vec<(MessageIdentifier, Arc<WrappedMessage>)>>,
block_info: BlockInfo,
thread_id: ThreadIdentifier,
threads_table: ThreadsTable,
block_accounts: HashSet<AccountAddress>,
accounts_repo: AccountsRepository,
db: MessageDurableStorage,
#[cfg(feature = "monitor-accounts-number")] updated_accounts_number: u64,
) -> anyhow::Result<(OptimisticStateImpl, CrossThreadRefData)>
Purpose
The postprocess function finalizes the optimistic shard state after processing a set of messages and account changes for a given block and thread. It performs the following main tasks:
Synchronizes and sorts produced internal messages while removing those consumed.
Updates thread references to track the latest block processed by the thread.
Updates message queues to reflect consumed and produced messages.
Handles accounts that have changed their decentralized application (DApp) IDs, potentially removing some from the current thread's state.
Performs account unloading based on configured thresholds to reduce memory footprint.
Updates the optimistic state with new block metadata and caches.
Constructs cross-thread reference data for communication between threads.
Parameters
initial_optimistic_state: OptimisticStateImpl
The initial optimistic state containing messages, accounts, thread references, etc., before postprocessing.consumed_internal_messages: HashMap<AccountAddress, HashSet<MessageIdentifier>>
A map of addresses to sets of message identifiers that were consumed during block processing.produced_internal_messages_to_the_current_thread: HashMap<AccountAddress, Vec<(MessageIdentifier, Arc<WrappedMessage>)>>
Internal messages produced for the current thread during block processing.accounts_that_changed_their_dapp_id: HashMap<AccountRouting, Option<WrappedAccount>>
Accounts that changed their DApp ID, potentially affecting routing and threading.block_id: BlockIdentifier
Identifier of the current block being processed.block_seq_no: BlockSeqNo
Sequence number of the current block.new_state: OptimisticShardState
The optimistic shard state before applying the postprocessing changes.produced_internal_messages_to_other_threads: HashMap<AccountRouting, Vec<(MessageIdentifier, Arc<WrappedMessage>)>>
Internal messages produced for other threads.block_info: BlockInfo
Metadata about the current block.thread_id: ThreadIdentifier
The identifier of the current processing thread.threads_table: ThreadsTable
Table containing information about all threads.block_accounts: HashSet<AccountAddress>
Set of accounts involved in the current block.accounts_repo: AccountsRepository
Repository interface for account storage and retrieval.db: MessageDurableStorage
Durable storage interface for messages.updated_accounts_number: u64(conditional with feature flagmonitor-accounts-number)
Optional count of updated accounts used for monitoring.
Return Value
Returns a Result with a tuple containing:
OptimisticStateImpl— The updated optimistic state after postprocessing.CrossThreadRefData— Data required for cross-thread message and account communication.
Usage Example
let (updated_state, cross_thread_data) = postprocess(
initial_state,
consumed_messages,
produced_messages_current_thread,
changed_accounts,
block_id,
block_seq_no,
current_state,
produced_messages_other_threads,
block_info,
thread_id,
threads_table,
block_accounts,
accounts_repo,
db,
#[cfg(feature = "monitor-accounts-number")] updated_accounts_count,
)?;
Detailed Behavior and Implementation Notes
Sorting and Filtering of Produced Messages:
The function first separates produced internal messages that were already consumed, sorts them by their wrapped message for deterministic ordering, and re-combines them. This ensures message queues reflect accurate consumption states.Thread Reference Update:
Thethread_refs_stateis cloned and updated with the current thread's latest block and sequence number for tracking progress.Building Next Message Queue State:
Using a builder pattern fromThreadMessageQueueState, the function constructs the next message queue state incorporating new consumed and produced messages, and no account removals yet.Handling Accounts Changing DApp ID:
Accounts whose routing no longer belongs to the current optimistic state are removed from the thread’s state and added tooutbound_accountsfor cross-thread communication. The shard state is updated to insert redirects (stubs) for these accounts.Second Message Queue State Build:
The message queue state is rebuilt with accounts removed in the previous step, reflecting the updated account set.Caching and Unloading Accounts:
If unloading is configured (unload_after), accounts that have not changed recently are unloaded from memory by replacing their state with external references. Corresponding entries are cached for faster access. Accounts that are deleted from the shard state are reported to the accounts repository.Rebuilding Optimistic State:
The final optimistic state is rebuilt with updated fields including block identifiers, shard state, messages, thread references, changed and cached accounts, and optionally, the updated accounts number.Cross-Thread Reference Data Construction:
Prepares data indicating outbound messages and accounts to other threads, along with block and thread metadata.
Interaction with Other Parts of the System
Message and Account Types:
Utilizes types likeWrappedMessage,WrappedAccount,AccountAddress,AccountRouting, andMessageIdentifierto represent core blockchain entities.Repositories and Storage:
Reads and writes to persistent storage viaAccountsRepositoryandMessageDurableStorage.Optimistic State Management:
Works closely withOptimisticStateImplandOptimisticShardStateto ensure the internal shard state reflects the latest message processing results.Thread and Block Metadata:
UsesThreadIdentifier,BlockIdentifier,BlockSeqNo, andThreadsTableto manage multi-threaded execution context.Cross-Thread Communication:
ConstructsCrossThreadRefDatafor passing messages and account updates across thread boundaries.Message Queue State:
Builds and updatesThreadMessageQueueStatethat manages the lifecycle of messages within each thread.
Mermaid Diagram: Function Workflow
flowchart TD
A[Start: initial_optimistic_state & inputs]
B[Filter & sort produced_internal_messages_to_the_current_thread]
C[Sort produced_internal_messages_to_other_threads]
D[Update thread_refs_state with current block info]
E[Build next ThreadMessageQueueState with consumed & produced messages]
F[Handle accounts_that_changed_their_dapp_id]
G[Update shard state with redirects for outbound accounts]
H[Rebuild ThreadMessageQueueState with removed accounts]
I[Unload accounts based on unload_after policy]
J[Rebuild OptimisticStateImpl with updated data]
K[Construct CrossThreadRefData for outbound messages/accounts]
L[Return updated state and cross-thread data]
A --> B --> C --> D --> E --> F --> G --> H --> I --> J --> K --> L
This diagram illustrates the sequential processing steps within the postprocess function, showing the flow from initial state inputs through message handling, account updates, state rebuilding, and finally preparation of data for cross-thread communication.