message_storage.rs

Overview

This file implements the MessageDurableStorage struct, which provides durable storage capabilities for messages using a key-value store abstraction. The primary goal is to store, index, and retrieve serialized messages efficiently, associating messages with sequence numbers and account addresses. It supports batch operations for reading messages and indexes, and maintains an in-memory sequence tracker to facilitate incremental message writes.

The storage backend is abstracted via the KeyValueStore trait and can be backed by different implementations, including an in-memory store (MemStore) or Aerospike database. The module uses locking mechanisms to synchronize sequence number updates and provides optional compile-time feature flags to enable or disable database interactions.


MessageDurableStorage Struct

Purpose

Encapsulates storage and retrieval operations for messages with durability guarantees. It manages serialization, indexing, and sequence tracking, and delegates actual persistence to a generic key-value store.

Fields

Field

Type

Description

store

Arc<dyn KeyValueStore>

Shared reference to the underlying key-value store for persistence.

set_prefix

String

Prefix used as part of the key namespace in the storage to isolate message sets.

seq

Arc<Mutex<HashMap<String, i64>>>

Thread-safe in-memory map tracking last used sequence number per destination address (hex string).


Methods

new(store: impl KeyValueStore + 'static, set_prefix: &str) -> Self

Creates a new instance of MessageDurableStorage using a provided key-value store and a set prefix.

let kv_store = MemStore::new();
let storage = MessageDurableStorage::new(kv_store, "msg_set");

mem() -> Self

Creates an in-memory MessageDurableStorage instance using MemStore as backend with an empty prefix.


write_messages(messages: HashMap<AccountAddress, Vec<(MessageIdentifier, Arc<WrappedMessage>)>>) -> anyhow::Result<()>

Persists a batch of messages grouped by account address. Each message is serialized and stored with an associated auto-incremented sequence number. Additionally, an index mapping from the combination of address and sequence number to the message hash is created.


read_message(hash: &str) -> anyhow::Result<Option<(i64, WrappedMessage)>>

Retrieves a message and its sequence number by its hash.

if let Some((seq, message)) = storage.read_message("abc123")? {
    // process message
}

get_rowid_by_hash(hash: &str) -> anyhow::Result<Option<i64>>

Retrieves only the sequence number (row ID) associated with a message hash.


next_simple(dest: &str, start_cursor: i64, limit: usize) -> anyhow::Result<(Vec<WrappedMessage>, Option<i64>)>

Fetches a sequential batch of messages for a given destination address starting from a specified cursor (sequence number). It returns the messages in ascending order of sequence numbers along with an optional new cursor for subsequent reads.


Internal Helper Methods

message_key(&self, hash: &str) -> Key

Constructs a storage key for a message using the namespace, set prefix, and message hash.

index_key(&self, account: &str, seq: i64) -> Key

Constructs a storage key for the index entry mapping account and sequence number to message hash.

batch_read_messages(hashes: Vec<String>) -> anyhow::Result<Vec<(i64, WrappedMessage)>>

Performs a batch read of multiple messages given their hashes. Returns a vector of tuples (sequence_number, WrappedMessage).


batch_get_hashes_by_rowid(dest: &str, seqs: Vec<i64>) -> anyhow::Result<Vec<String>>

Performs a batch read of message hashes by the destination address and a list of sequence numbers.


Debug Methods (Available in Debug Builds)


Utility Function

monotonic_prefix_len(xs: &[i64]) -> usize

Determines the length of the longest prefix of the input slice where the sequence numbers increase exactly by 1 at each step.


Testing

The tests module contains unit tests for the monotonic_prefix_len function, covering various edge cases such as empty arrays, single elements, non-monotonic sequences, repeated elements, and sequences with multiple breaks.


Interaction with Other System Components


Data Storage Model


Diagram: Class Structure of MessageDurableStorage

classDiagram
class MessageDurableStorage {
-store: Arc<KeyValueStore>
-set_prefix: String
-seq: Arc<Mutex<HashMap<String, i64>>>
+new(store, set_prefix)
+mem()
+write_messages(messages)
+read_message(hash)
+get_rowid_by_hash(hash)
+next_simple(dest, start_cursor, limit)
+db_reads()
+db_writes()
-message_key(hash)
-index_key(account, seq)
-batch_read_messages(hashes)
-batch_get_hashes_by_rowid(dest, seqs)
}

This diagram shows the main struct with its private fields and public methods, highlighting the encapsulated storage and retrieval functionalities. The private helper methods support internal key construction and batch operations.