iterator.rs
Overview
This file provides an iterator abstraction for traversing a range of messages stored in a durable storage system. It defines an iterator over a MessagesRange, which may include both a compacted history segment stored in the database and a tail sequence cached in memory. The iterator fetches messages on demand from the storage backend and reports storage errors and range inconsistencies to the caller. It is generic over the message key type, the message type, and the durable storage implementation.
The file also encapsulates error handling related to loading messages or references from storage, and includes internal utilities to manage loading single messages or batches of messages from storage.
Main Components
IteratorError<StorageErr, MessageKey>
An enumeration representing possible errors during iteration:
LoadMessageError(StorageErr): Error occurred loading a message from storage.
LoadNextRefError(StorageErr): Error occurred loading the next message key reference.
BrokenRange { missing_end: MessageKey, actual_last: MessageKey }: The range is broken: the expected end key (missing_end) was not found, and the last key actually reached was actual_last.
Type Parameters
StorageErr: The error type returned by the storage implementation.
MessageKey: Type used as the key or identifier for messages.
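As an illustration of how such an enum might be declared and reported, here is a minimal sketch. The variant names and fields follow the description above; the derives and the Display implementation are assumptions added for the example and may differ from the actual source.

```rust
use std::fmt;

/// Sketch of the error enum described above. Variant names follow the
/// document; the derives and the Display impl are assumptions.
#[derive(Debug, Clone, PartialEq)]
pub enum IteratorError<StorageErr, MessageKey> {
    LoadMessageError(StorageErr),
    LoadNextRefError(StorageErr),
    BrokenRange { missing_end: MessageKey, actual_last: MessageKey },
}

impl<E: fmt::Debug, K: fmt::Debug> fmt::Display for IteratorError<E, K> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::LoadMessageError(e) => write!(f, "failed to load message: {e:?}"),
            Self::LoadNextRefError(e) => write!(f, "failed to load next reference: {e:?}"),
            Self::BrokenRange { missing_end, actual_last } => write!(
                f,
                "broken range: expected end {missing_end:?}, stopped at {actual_last:?}"
            ),
        }
    }
}
```

Keeping the two storage variants separate lets a caller tell whether a message body or a next-key lookup failed, while BrokenRange carries both keys needed to diagnose the gap.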
MessagesRangeIterator<'a, MessageKey, Message, Storage>
An iterator over a MessagesRange that fetches messages from a durable storage backend and an in-memory tail sequence.
Type Parameters and Constraints
'a: Lifetime parameter tied to the storage reference.
MessageKey: The type of keys identifying messages. Must implement Clone, PartialEq, and From in some cases.
Message: The message type. Must implement Clone.
Storage: The durable storage type implementing DurableStorageRead<MessageKey, Message>.
Fields
remaining: MessagesRange<MessageKey, Message>: The range of messages remaining to iterate over.
db: &'a Storage: Reference to the durable storage backend.
Methods
new(db: &'a Storage, range: MessagesRange<MessageKey, Message>) -> Self
Creates a new iterator instance over the given range using the provided storage reference.
Parameters:
db: Reference to the storage implementing DurableStorageRead.
range: The MessagesRange specifying the messages to iterate.
Returns:
A new MessagesRangeIterator.
remaining(&self) -> &MessagesRange<MessageKey, Message>
Returns a reference to the current remaining message range being iterated.
next_range(&mut self, limit: usize) -> Result<Vec<Message>, IteratorError<Storage::LoadError, MessageKey>>
Fetches the next batch of messages up to the specified limit from both the compacted history portion in storage and the tail sequence cached in memory.
Parameters:
limit: Maximum number of messages to fetch.
Returns:
Ok(Vec<Message>): Vector of messages fetched.
Err(IteratorError): An error occurred during loading.
Usage Example:
    let mut iterator = MessagesRangeIterator::new(&storage, message_range);
    let batch = iterator.next_range(10)?;
    for msg in batch {
        // process each message
    }
remaining_messages_from_db(&self) -> Result<Vec<Message>, Storage::LoadError>
Retrieves all remaining messages from the compacted history portion in the database without modifying the iterator state.
Returns:
Ok(Vec<Message>) containing all remaining messages in storage.
Err(Storage::LoadError) if loading fails.
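The batching behavior described for next_range (history first, then tail, never more than limit messages) can be sketched with plain queues. This is only an illustration under stated assumptions: ToyRange is a hypothetical stand-in, both portions are modeled as in-memory VecDeque values, and storage errors are left out entirely.

```rust
use std::collections::VecDeque;

/// Toy range: `history` stands in for messages still in storage, `tail` for
/// the in-memory tail sequence. Both are plain queues here (an assumption).
struct ToyRange {
    history: VecDeque<String>,
    tail: VecDeque<String>,
}

/// Take up to `limit` messages, draining history first and then the tail.
fn next_range(remaining: &mut ToyRange, limit: usize) -> Vec<String> {
    let from_history = limit.min(remaining.history.len());
    let mut batch: Vec<String> = remaining.history.drain(..from_history).collect();
    // Whatever budget is left after the history is spent on the tail.
    let from_tail = (limit - batch.len()).min(remaining.tail.len());
    batch.extend(remaining.tail.drain(..from_tail));
    batch
}
```

In this sketch a batch can span both portions, and an empty result means nothing is left; whether the real method signals exhaustion the same way is not stated in this document.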
Implementation of Iterator trait for MessagesRangeIterator
type Item = Result<(Message, MessageKey), IteratorError<Storage::LoadError, MessageKey>>
fn next(&mut self) -> Option<Self::Item>
Returns the next message and its key wrapped in a Result. It first attempts to read from the compacted history in storage. If that portion is exhausted, it continues with the tail sequence in memory.
Behavior:
If the compacted history exists:
Loads the next message from storage.
Updates the remaining compacted history range.
Else, pops the next message from the tail sequence.
Returns None if no messages remain.
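The two-phase behavior above can be pictured with a small, self-contained sketch. Everything in it is a stand-in: ToyStorage, ToyRange, and ToyIterator are hypothetical names, keys are assumed to be consecutive u64 values, and the tail is assumed to hold (message, key) pairs. The real iterator presumably asks storage for the next reference instead of incrementing a key.

```rust
use std::collections::{BTreeMap, VecDeque};
use std::ops::RangeInclusive;

// Hypothetical stand-ins: keys are u64, messages are Strings, and the
// "storage" is an in-memory map.
type Key = u64;
type Message = String;

struct ToyStorage(BTreeMap<Key, Message>);

struct ToyRange {
    history: Option<RangeInclusive<Key>>, // compacted history still in storage
    tail: VecDeque<(Message, Key)>,       // in-memory tail sequence
}

struct ToyIterator<'a> {
    db: &'a ToyStorage,
    remaining: ToyRange,
}

impl<'a> Iterator for ToyIterator<'a> {
    type Item = Result<(Message, Key), String>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(range) = self.remaining.history.take() {
            let (start, end) = range.into_inner();
            // Load the next message from storage. On failure this sketch
            // drops the rest of the history, so later calls move on to the tail.
            let Some(msg) = self.db.0.get(&start).cloned() else {
                return Some(Err(format!("missing message {start}")));
            };
            // Shrink the remaining history range; it stays None at the end.
            if start < end {
                self.remaining.history = Some(start + 1..=end);
            }
            return Some(Ok((msg, start)));
        }
        // History exhausted: continue with the in-memory tail.
        self.remaining.tail.pop_front().map(Ok)
    }
}
```

Because each item is a Result, a caller can collect into Result<Vec<_>, _> and stop at the first failure.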
Internal Module: consume
Contains helper functions to load messages and message ranges from storage, handling iteration logic over storage references.
Functions
from_storage(db: &Storage, msg_ref: MessageKey, end: MessageKey) -> Result<(Message, Option<RangeInclusive<MessageKey>>), IteratorError<Storage::LoadError, MessageKey>>
Loads a single message identified by msg_ref from storage and determines if there are more messages up to end.
Returns:
Ok((message, Some(next_range))): The message and the next remaining range.
Ok((message, None)): The message and no remaining range (end reached).
Err(IteratorError): On loading failure.
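A rough sketch of the single-message step, under several assumptions: storage is a BTreeMap with u64 keys, ToyError is a hypothetical stand-in for IteratorError, and the next key is found by an ordered map lookup that cannot fail (the real code would presumably map such a failure to LoadNextRefError). Whether the real single-message loader also reports a broken range is an assumption of this sketch.

```rust
use std::collections::BTreeMap;
use std::ops::RangeInclusive;

#[derive(Debug, PartialEq)]
enum ToyError {
    LoadMessage(String),
    BrokenRange { missing_end: u64, actual_last: u64 },
}

fn from_storage(
    db: &BTreeMap<u64, String>,
    msg_ref: u64,
    end: u64,
) -> Result<(String, Option<RangeInclusive<u64>>), ToyError> {
    // Load the message itself; a miss becomes a load error.
    let message = db
        .get(&msg_ref)
        .cloned()
        .ok_or_else(|| ToyError::LoadMessage(format!("no message {msg_ref}")))?;
    if msg_ref == end {
        return Ok((message, None)); // end reached, nothing remains
    }
    // Otherwise the remaining range starts at the next stored key.
    match db.range(msg_ref + 1..).next() {
        Some((next, _)) => Ok((message, Some(*next..=end))),
        None => Err(ToyError::BrokenRange { missing_end: end, actual_last: msg_ref }),
    }
}
```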
range_from_storage(db: &Storage, msg_ref: MessageKey, end: MessageKey, limit: usize) -> Result<(Vec<Message>, Option<RangeInclusive<MessageKey>>), IteratorError<Storage::LoadError, MessageKey>>
Loads a range of messages starting at msg_ref up to end, with an upper bound of limit messages.
Algorithm Details:
Fetches up to limit messages from storage.
Converts messages to keys and searches for end in the loaded batch.
If end is found before the last message, truncates and sets the next range accordingly.
If end is not found, attempts to retrieve the next key after the last loaded message; if none, signals broken range.
Returns:
Ok((messages, next_range)): Loaded messages and optionally next range to continue.
Err(IteratorError): On loading failure or broken range.
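The algorithm steps above can be sketched as follows. This is a toy model, not the actual implementation: storage is a BTreeMap, Msg is a hypothetical message type whose key is its id field, ToyError stands in for IteratorError, and the handling of limit == 0 and of an empty batch is an assumption made so the function is total.

```rust
use std::collections::BTreeMap;
use std::ops::RangeInclusive;

#[derive(Debug, PartialEq)]
enum ToyError {
    BrokenRange { missing_end: u64, actual_last: u64 },
}

/// Hypothetical message type whose key can be derived from the message.
#[derive(Debug, Clone, PartialEq)]
struct Msg {
    id: u64,
    body: &'static str,
}

fn range_from_storage(
    db: &BTreeMap<u64, Msg>,
    msg_ref: u64,
    end: u64,
    limit: usize,
) -> Result<(Vec<Msg>, Option<RangeInclusive<u64>>), ToyError> {
    if limit == 0 {
        return Ok((Vec::new(), Some(msg_ref..=end))); // nothing consumed
    }
    // 1. Fetch up to `limit` messages starting at `msg_ref`.
    let mut batch: Vec<Msg> = db.range(msg_ref..).take(limit).map(|(_, m)| m.clone()).collect();

    // 2. Convert messages to keys and look for `end` in the batch.
    if let Some(pos) = batch.iter().position(|m| m.id == end) {
        // 3. `end` found: drop anything after it; no range remains.
        batch.truncate(pos + 1);
        return Ok((batch, None));
    }

    // 4. `end` not found: continue after the last loaded message, if any.
    let Some(last) = batch.last().map(|m| m.id) else {
        return Err(ToyError::BrokenRange { missing_end: end, actual_last: msg_ref });
    };
    match db.range(last + 1..).next() {
        Some((next, _)) => Ok((batch, Some(*next..=end))),
        None => Err(ToyError::BrokenRange { missing_end: end, actual_last: last }),
    }
}
```

Returning the next range alongside the batch lets the caller resume exactly where the previous fetch stopped without rescanning.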
Interaction with Other Components
MessagesRange<MessageKey, Message>: The iterator operates over this struct which encapsulates a range of messages divided into a compacted history stored in durable storage and a tail sequence in memory.
DurableStorageRead<MessageKey, Message>: A trait implemented by the storage backend, providing operations such as loading messages, retrieving the next message key, and fetching remaining messages.
consume module: Internal helper functions used by the iterator to load messages or message ranges from storage in a consistent and error-resilient manner.
The iterator relies on these components to provide seamless iteration over potentially large datasets stored partially in persistent storage and partially in memory.
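To make the storage dependency more concrete, here is one possible shape for a read trait offering the three operations listed above, with an in-memory implementation. Only the associated LoadError type is named in this document; the trait name DurableStorageReadSketch, every method name, and the InMemory and NotFound types are hypothetical.

```rust
use std::collections::BTreeMap;
use std::ops::Bound::{Excluded, Unbounded};

/// Hypothetical shape of the read trait. Only the associated `LoadError`
/// type appears in this document; every method name is an assumption.
trait DurableStorageReadSketch<MessageKey, Message> {
    type LoadError;
    fn load_message(&self, key: &MessageKey) -> Result<Message, Self::LoadError>;
    fn next_key(&self, key: &MessageKey) -> Result<Option<MessageKey>, Self::LoadError>;
    fn remaining_messages(&self, from: &MessageKey) -> Result<Vec<Message>, Self::LoadError>;
}

/// In-memory stand-in for a durable backend.
struct InMemory(BTreeMap<u32, String>);

#[derive(Debug, PartialEq)]
struct NotFound(u32);

impl DurableStorageReadSketch<u32, String> for InMemory {
    type LoadError = NotFound;

    fn load_message(&self, key: &u32) -> Result<String, NotFound> {
        self.0.get(key).cloned().ok_or(NotFound(*key))
    }

    fn next_key(&self, key: &u32) -> Result<Option<u32>, NotFound> {
        // First stored key strictly greater than `key`, if there is one.
        Ok(self.0.range((Excluded(*key), Unbounded)).next().map(|(k, _)| *k))
    }

    fn remaining_messages(&self, from: &u32) -> Result<Vec<String>, NotFound> {
        Ok(self.0.range(*from..).map(|(_, m)| m.clone()).collect())
    }
}
```

An implementation like this is also a convenient test double for exercising iterator logic without a real database.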
Important Implementation Details
The iterator maintains an internal remaining field of type MessagesRange which tracks the current unconsumed portion of the message range.
When iterating, the iterator first attempts to fetch messages from the compacted history portion stored in durable storage. It uses the consume module functions to load messages individually or in batches.
After exhausting the compacted history, the iterator consumes messages from the in-memory tail_sequence queue.
Errors from the storage layer are wrapped in the IteratorError enum and returned to the caller.
The iterator state is updated after each fetch to reflect the consumed messages and the remaining range.
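Since every item is a Result, a caller decides how to react to a failure. One common pattern, sketched here with a hypothetical helper named collect_keys, is to stop at the first error using the ? operator. The function is generic over any iterator with the item shape described above, so it does not depend on the real types.

```rust
/// Collect keys until the first error. Works with any iterator whose items
/// are `Result<(Message, Key), Error>`, the shape described above.
fn collect_keys<I, M, K, E>(iter: I) -> Result<Vec<K>, E>
where
    I: Iterator<Item = Result<(M, K), E>>,
{
    let mut keys = Vec::new();
    for item in iter {
        let (_message, key) = item?; // the first error ends the walk here
        keys.push(key);
    }
    Ok(keys)
}
```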
Mermaid Diagram
classDiagram
class MessagesRangeIterator {
-remaining: MessagesRange
-db: &Storage
+new()
+remaining()
+next_range()
+remaining_messages_from_db()
+next()
}
class IteratorError {
<<enum>>
+LoadMessageError
+LoadNextRefError
+BrokenRange
}
class consume {
+from_storage()
+range_from_storage()
}
MessagesRangeIterator ..> MessagesRange : uses
MessagesRangeIterator ..> DurableStorageRead : interacts
MessagesRangeIterator ..> IteratorError : returns
MessagesRangeIterator ..> consume : calls
This diagram shows the core MessagesRangeIterator struct with its methods and relationships to the MessagesRange data structure, the DurableStorageRead trait, the IteratorError enum for error handling, and the internal consume module providing utility functions.