next.rs
Overview
This file defines the logic for constructing the next valid message range in a sequence of messages, handling both storage interactions and message consumption verification. It provides an error enumeration for build failures, a trait to abstract the range-building operation, and an implementation of that trait for any storage type that supports durable reads.
The primary focus is on updating a MessagesRange by appending new messages, verifying consumed messages against the existing range, and preparing the next range state for further processing or validation. This mechanism is crucial for maintaining consistent message sequencing and integrity in systems where messages are stored durably and processed incrementally.
Enumerations
BuildNextRangeError<Message, StorageErr, IteratorError>
This enum represents the possible errors that can occur during the process of building the next message range.
Variants:
StorageError(StorageErr): Indicates an error encountered while accessing the durable storage.
InvalidRangeOrConsumedMessage { next_consumed: Message, next_in_range: Message }: Occurs when the consumed message does not match the expected message from the range iterator, indicating a mismatch or data inconsistency.
RangeIteratorError(IteratorError): Represents errors raised by the underlying range iterator during traversal.
OutOfRange(Message): Triggered when a consumed message is not found within the expected range.
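The variant list above can be sketched as a Rust enum. This is a reconstruction from the descriptions in this document, not a copy of the source: the derives and the describe helper are assumptions added for illustration only.

```rust
use std::fmt;

// Sketch reconstructed from the variant list; the derives are assumptions.
#[derive(Debug, PartialEq)]
pub enum BuildNextRangeError<Message, StorageErr, IteratorError> {
    /// The durable storage could not be read.
    StorageError(StorageErr),
    /// The consumed message differs from the next message in the range.
    InvalidRangeOrConsumedMessage {
        next_consumed: Message,
        next_in_range: Message,
    },
    /// The range iterator itself failed during traversal.
    RangeIteratorError(IteratorError),
    /// The range ended before this consumed message was found.
    OutOfRange(Message),
}

// Hypothetical helper (not part of next.rs): renders each variant as text.
pub fn describe<M, S, I>(err: &BuildNextRangeError<M, S, I>) -> String
where
    M: fmt::Debug,
    S: fmt::Debug,
    I: fmt::Debug,
{
    match err {
        BuildNextRangeError::StorageError(e) => format!("storage error: {:?}", e),
        BuildNextRangeError::InvalidRangeOrConsumedMessage {
            next_consumed,
            next_in_range,
        } => format!("consumed {:?} but range yielded {:?}", next_consumed, next_in_range),
        BuildNextRangeError::RangeIteratorError(e) => format!("iterator error: {:?}", e),
        BuildNextRangeError::OutOfRange(m) => format!("message {:?} is out of range", m),
    }
}
```

Keeping the three type parameters separate lets callers match on the storage and iterator errors without boxing them.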
Usage example:
match storage.build_next_range(range, appended_messages, consumed_messages) {
    Ok(next_range) => { /* proceed with next range */ }
    Err(BuildNextRangeError::StorageError(e)) => { /* handle storage error */ }
    Err(BuildNextRangeError::InvalidRangeOrConsumedMessage { next_consumed, next_in_range }) => { /* handle mismatch */ }
    Err(e) => { /* handle other errors */ }
}
Traits
BuildNextRange<MessageKey, Message>
Defines an interface for building the next message range based on a current range, new appended messages, and consumed messages.
Associated Types:
Error: The error type returned if building the next range fails.
Required Methods:
fn build_next_range(&self, range: MessagesRange<MessageKey, Message>, appended_messages: &[(MessageKey, Message)], consumed_messages: &[Message]) -> Result<MessagesRange<MessageKey, Message>, Self::Error>;
Parameters:
range: The current message range, of type MessagesRange<MessageKey, Message>.
appended_messages: Slice of tuples containing message keys and messages that have been appended since the last range.
consumed_messages: Slice of messages that have been consumed and need verification against the range.
Returns:
Ok(MessagesRange<MessageKey, Message>): The updated range after appending and verification.
Err(Self::Error): An error indicating failure in building the next range.
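The trait shape described above can be sketched as follows. The MessagesRange struct here is a simplified stand-in with only a tail_sequence field (the real type lives in crate::range and may hold more), and AppendOnly is a hypothetical toy implementor that skips verification entirely; both are assumptions for illustration.

```rust
use std::convert::Infallible;

// Simplified stand-in for crate::range::MessagesRange (assumption: the
// real type has more fields than the tail sequence shown here).
#[derive(Debug, Clone, PartialEq)]
pub struct MessagesRange<MessageKey, Message> {
    pub tail_sequence: Vec<(MessageKey, Message)>,
}

// Trait signature as documented above.
pub trait BuildNextRange<MessageKey, Message> {
    type Error;

    fn build_next_range(
        &self,
        range: MessagesRange<MessageKey, Message>,
        appended_messages: &[(MessageKey, Message)],
        consumed_messages: &[Message],
    ) -> Result<MessagesRange<MessageKey, Message>, Self::Error>;
}

// Hypothetical toy implementor: appends and never fails.
pub struct AppendOnly;

impl<K: Clone, M: Clone> BuildNextRange<K, M> for AppendOnly {
    type Error = Infallible;

    fn build_next_range(
        &self,
        mut range: MessagesRange<K, M>,
        appended_messages: &[(K, M)],
        _consumed_messages: &[M],
    ) -> Result<MessagesRange<K, M>, Self::Error> {
        // Only the append step is modelled; consumed messages are ignored.
        range.tail_sequence.extend_from_slice(appended_messages);
        Ok(range)
    }
}
```

Because the range is taken by value and returned, an implementor can extend it in place without cloning the existing contents.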
Implementations
BuildNextRange for Storage (where Storage: DurableStorageRead<MessageKey, Message>)
This implementation enables any storage type that supports durable reads to build the next message range.
Type Constraints:
MessageKey: Must implement PartialEq and Clone, and must be convertible from Message (From<Message>).
Message: Must implement PartialEq and Clone.
Storage: Must implement DurableStorageRead<MessageKey, Message>.
Error Type:
BuildNextRangeError<Message, Storage::LoadError, IteratorError<Storage::LoadError, MessageKey>>
Method:
fn build_next_range(
    &self,
    mut range: MessagesRange<MessageKey, Message>,
    appended_messages: &[(MessageKey, Message)],
    consumed_messages: &[Message],
) -> Result<MessagesRange<MessageKey, Message>, Self::Error>
Functionality:
Extends the tail_sequence of the current range by appending new messages.
Creates a MessagesRangeIterator to iterate over the updated range.
Iterates through each consumed message:
Fetches the next message from the iterator.
Compares it with the consumed message.
Returns an InvalidRangeOrConsumedMessage error if they don't match.
If an expected message is missing, returns an OutOfRange error.
Retrieves the remaining range after consumption.
(Comments indicate plans for further message consumption and block extension logic.)
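The steps above can be sketched with an in-memory analog. This is not the implementation in next.rs: a plain Vec replaces both MessagesRange and the storage-backed MessagesRangeIterator, so storage and iterator errors cannot occur, and the names ConsumeError and build_next_range_in_memory are hypothetical.

```rust
// Hypothetical in-memory analog of the two verification errors.
#[derive(Debug, PartialEq)]
pub enum ConsumeError<Message> {
    InvalidRangeOrConsumedMessage {
        next_consumed: Message,
        next_in_range: Message,
    },
    OutOfRange(Message),
}

/// Appends `appended` to `range`, checks that `consumed` matches the
/// front of the result in order, and returns the unconsumed remainder.
pub fn build_next_range_in_memory<M: PartialEq + Clone>(
    mut range: Vec<M>,
    appended: &[M],
    consumed: &[M],
) -> Result<Vec<M>, ConsumeError<M>> {
    // Step 1: extend the tail with the newly appended messages.
    range.extend_from_slice(appended);

    // Step 2: walk the updated range in order.
    let mut iter = range.into_iter();

    // Step 3: each consumed message must equal the next one in the range.
    for next_consumed in consumed {
        match iter.next() {
            Some(next_in_range) if next_in_range == *next_consumed => {}
            Some(next_in_range) => {
                return Err(ConsumeError::InvalidRangeOrConsumedMessage {
                    next_consumed: next_consumed.clone(),
                    next_in_range,
                });
            }
            // The range ran out before this consumed message was seen.
            None => return Err(ConsumeError::OutOfRange(next_consumed.clone())),
        }
    }

    // Step 4: whatever the iterator has not yielded is the remaining range.
    Ok(iter.collect())
}
```

For example, with range [1, 2], appended [3], and consumed [1, 2], the sketch returns Ok with the remaining range [3]; consuming [1, 3] instead fails with InvalidRangeOrConsumedMessage because the range yields 2 where 3 was expected.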
Usage Example:
let next_range = storage.build_next_range(current_range, &new_appended_msgs, &consumed_msgs)?;
Important Implementation Details
The method relies on MessagesRangeIterator to verify consumed messages sequentially against the stored range, which enforces both message order and integrity.
The conversion from Message to MessageKey is essential for range key management and storage lookups.
Error handling distinguishes four cases: storage errors, iterator errors, range mismatches, and out-of-range conditions.
The TODO comments in the implementation suggest planned enhancements involving finalizing message blocks and candidate block management, which are likely tied to the broader message lifecycle.
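The From<Message> bound on MessageKey can be illustrated with a minimal sketch. The Message and MessageKey structs and the key_of helper below are hypothetical; this document does not show the concrete types or how next.rs invokes the conversion.

```rust
// Hypothetical concrete types; the real ones are generic parameters.
#[derive(Debug, Clone, PartialEq)]
pub struct Message {
    pub id: u64,
    pub payload: String,
}

#[derive(Debug, Clone, PartialEq)]
pub struct MessageKey(pub u64);

// The bound MessageKey: From<Message> is satisfied by an impl like this.
impl From<Message> for MessageKey {
    fn from(message: Message) -> Self {
        MessageKey(message.id)
    }
}

// Hypothetical helper: derives a key from a borrowed message. From takes
// its argument by value, so the message is cloned first, which is one
// reason a Clone bound on Message is useful.
pub fn key_of<K: From<M>, M: Clone>(message: &M) -> K {
    K::from(message.clone())
}
```

With these definitions, key_of applied to a message whose id is 42 yields MessageKey(42).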
Interaction with Other Modules
crate::iter::iterator: Utilizes IteratorError and MessagesRangeIterator for iterating over message ranges and handling iterator-specific errors.
crate::range::MessagesRange: The core data structure representing a range of messages that this file manipulates.
crate::storage::DurableStorageRead: Defines the required interface for the storage backend that supports reading messages durably, used here to fetch and validate message data.
This file acts as a bridge between message range management (MessagesRange), message iteration (MessagesRangeIterator), and durable storage access (DurableStorageRead), ensuring the consistency and correctness of message ranges as they evolve.
Diagram: Structure of next.rs
classDiagram
class BuildNextRangeError {
<<enum>>
StorageError
InvalidRangeOrConsumedMessage
RangeIteratorError
OutOfRange
}
class BuildNextRange {
<<trait>>
+build_next_range()
}
class Storage {
+build_next_range()
}
Storage ..> BuildNextRangeError : returns
BuildNextRange <|.. Storage
This diagram illustrates the relationship between the error enum, the trait BuildNextRange, and the implementation of the trait for the Storage type. Storage here is a generic parameter standing for any type that implements DurableStorageRead, not a concrete struct; it implements the BuildNextRange trait and can return errors defined in BuildNextRangeError.