message_storage.rs
Overview
This file implements the MessageDurableStorage struct, which provides durable storage capabilities for messages using a key-value store abstraction. The primary goal is to store, index, and retrieve serialized messages efficiently, associating messages with sequence numbers and account addresses. It supports batch operations for reading messages and indexes, and maintains an in-memory sequence tracker to facilitate incremental message writes.
The storage backend is abstracted via the KeyValueStore trait and can be backed by different implementations, including an in-memory store (MemStore) or Aerospike database. The module uses locking mechanisms to synchronize sequence number updates and provides optional compile-time feature flags to enable or disable database interactions.
MessageDurableStorage Struct
Purpose
Encapsulates storage and retrieval operations for messages with durability guarantees. It manages serialization, indexing, and sequence tracking, and delegates actual persistence to a generic key-value store.
Fields
Field | Type | Description |
|---|---|---|
|
| Shared reference to the underlying key-value store for persistence. |
|
| Prefix used as part of the key namespace in the storage to isolate message sets. |
|
| Thread-safe in-memory map tracking last used sequence number per destination address (hex string). |
Methods
new(store: impl KeyValueStore + 'static, set_prefix: &str) -> Self
Creates a new instance of MessageDurableStorage using a provided key-value store and a set prefix.
Parameters:
store: An implementation ofKeyValueStoretrait to handle data persistence.set_prefix: String slice used as a prefix in storage keys.
Returns: A new
MessageDurableStorageinstance.Usage Example:
let kv_store = MemStore::new();
let storage = MessageDurableStorage::new(kv_store, "msg_set");
mem() -> Self
Creates an in-memory MessageDurableStorage instance using MemStore as backend with an empty prefix.
Returns:
MessageDurableStorageinstance backed by memory store.
write_messages(messages: HashMap<AccountAddress, Vec<(MessageIdentifier, Arc<WrappedMessage>)>>) -> anyhow::Result<()>
Persists a batch of messages grouped by account address. Each message is serialized and stored with an associated auto-incremented sequence number. Additionally, an index mapping from the combination of address and sequence number to the message hash is created.
Parameters:
messages: A map where keys are account addresses and values are vectors of tuples containingMessageIdentifierand anArcwrappedWrappedMessage.
Returns:
Ok(())on success, or an error if storage operations fail.Implementation Details:
Uses feature flags (
messages_dbanddisable_db_for_messages) to conditionally skip DB operations.Sequence numbers per address are tracked in-memory and incremented on each message write.
Stores two records per message:
Keyed by message hash containing serialized message blob and sequence number.
Keyed by concatenation of address and sequence number containing the message hash (reverse index).
read_message(hash: &str) -> anyhow::Result<Option<(i64, WrappedMessage)>>
Retrieves a message and its sequence number by its hash.
Parameters:
hash: Hex string representing the message hash.
Returns:
Ok(Some((sequence_number, WrappedMessage)))if found.Ok(None)if message not found or DB features disabled.Errif deserialization or data integrity checks fail.
Usage Example:
if let Some((seq, message)) = storage.read_message("abc123")? {
// process message
}
get_rowid_by_hash(hash: &str) -> anyhow::Result<Option<i64>>
Retrieves only the sequence number (row ID) associated with a message hash.
Parameters:
hash: Message hash as hex string.
Returns: Optional sequence number or
Noneif not found.Note: Internally calls
read_messageand extracts the sequence number.
next_simple(dest: &str, start_cursor: i64, limit: usize) -> anyhow::Result<(Vec<WrappedMessage>, Option<i64>)>
Fetches a sequential batch of messages for a given destination address starting from a specified cursor (sequence number). It returns the messages in ascending order of sequence numbers along with an optional new cursor for subsequent reads.
Parameters:
dest: Destination address as hex string.start_cursor: The last known sequence number to start reading after.limit: Maximum number of messages to retrieve.
Returns:
Tuple containing:
Vector of
WrappedMessageinstances.Optional sequence number of the last message returned (or
Noneif none).
Implementation Details:
Computes the next sequence numbers range.
Uses batch index key lookups (
batch_get_hashes_by_rowid) to get hashes by sequence numbers.Uses batch message reads (
batch_read_messages) to fetch messages by hashes.Ensures returned messages form a monotonic sequence using
monotonic_prefix_len.Returns empty vector and
Noneif the sequence is broken.
Internal Helper Methods
message_key(&self, hash: &str) -> Key
Constructs a storage key for a message using the namespace, set prefix, and message hash.
index_key(&self, account: &str, seq: i64) -> Key
Constructs a storage key for the index entry mapping account and sequence number to message hash.
batch_read_messages(hashes: Vec<String>) -> anyhow::Result<Vec<(i64, WrappedMessage)>>
Performs a batch read of multiple messages given their hashes. Returns a vector of tuples (sequence_number, WrappedMessage).
Parameters:
hashes: Vector of message hash strings.
Returns: Vector of tuples containing sequence numbers and deserialized messages.
Error Handling: Returns an error if any record is missing expected bins or deserialization fails.
batch_get_hashes_by_rowid(dest: &str, seqs: Vec<i64>) -> anyhow::Result<Vec<String>>
Performs a batch read of message hashes by the destination address and a list of sequence numbers.
Parameters:
dest: Destination address as hex string.seqs: Vector of sequence numbers.
Returns: Vector of message hash strings corresponding to the given sequence numbers.
Error Handling: Returns errors on missing bins or failed retrievals.
Debug Methods (Available in Debug Builds)
db_reads(&self) -> usize: Returns the count of read operations performed by the underlying store.db_writes(&self) -> usize: Returns the count of write operations performed by the underlying store.
Utility Function
monotonic_prefix_len(xs: &[i64]) -> usize
Determines the length of the longest prefix of the input slice where the sequence numbers increase exactly by 1 at each step.
Parameters:
xs: Slice of 64-bit integers representing sequence numbers.
Returns: Length of the monotonic prefix.
Usage: Used in
next_simpleto ensure returned messages form a contiguous sequence without gaps.
Testing
The tests module contains unit tests for the monotonic_prefix_len function, covering various edge cases such as empty arrays, single elements, non-monotonic sequences, repeated elements, and sequences with multiple breaks.
Interaction with Other System Components
Depends on the
KeyValueStoretrait and its implementations likeMemStorefor underlying data persistence.Interacts with types such as
WrappedMessage,MessageIdentifier, andAccountAddressfor message metadata and identification.Uses external crates for serialization (
bincode), synchronization (parking_lot::Mutex), and Aerospike-specific key and bin utilities.Utilizes feature flags (
messages_db,disable_db_for_messages) to conditionally compile database-related code paths.Metrics integration via
AEROSPIKE_OBJECT_TYPE_INT_MESSAGESconstant to classify Aerospike operations.
Data Storage Model
Messages are stored keyed by their hash, with serialized content and sequence number.
Reverse index entries map from
<account_address>-<sequence_number>to message hash.Sequence numbers are auto-incremented timestamps or counters, tracked in-memory per destination.
Diagram: Class Structure of MessageDurableStorage
classDiagram
class MessageDurableStorage {
-store: Arc<KeyValueStore>
-set_prefix: String
-seq: Arc<Mutex<HashMap<String, i64>>>
+new(store, set_prefix)
+mem()
+write_messages(messages)
+read_message(hash)
+get_rowid_by_hash(hash)
+next_simple(dest, start_cursor, limit)
+db_reads()
+db_writes()
-message_key(hash)
-index_key(account, seq)
-batch_read_messages(hashes)
-batch_get_hashes_by_rowid(dest, seqs)
}
This diagram shows the main struct with its private fields and public methods, highlighting the encapsulated storage and retrieval functionalities. The private helper methods support internal key construction and batch operations.