writer_service.rs
Overview
writer_service.rs defines the MessageDBWriterService struct, a service responsible for asynchronously writing batches of messages to a durable storage backend. It manages incoming collections of messages keyed by account addresses, spawns dedicated threads to perform write operations per address in parallel, and optionally reports write latency metrics. This design enables efficient, non-blocking message persistence, isolating database I/O from the main application flow.
Structures and Methods
MessageDBWriterService
A cloneable service struct that encapsulates a multi-producer, single-consumer (mpsc) channel sender to enqueue message batches and a handler thread for consuming these batches and writing them to persistent storage.
Fields
sender: std::sync::mpsc::Sender<HashMap<AccountAddress, Vec<(MessageIdentifier, Arc<WrappedMessage>)>>>
Channel sender used to send message batches keyed by account address.handler: Arc<JoinHandle<anyhow::Result<()>>>
Shared ownership handle to the background thread running the message writing loop.
Methods
new
pub fn new(
message_db: MessageDurableStorage,
metrics: Option<BlockProductionMetrics>,
) -> anyhow::Result<Self>
Constructs and returns a new instance of MessageDBWriterService.
Parameters:
message_db: An instance ofMessageDurableStorageproviding the interface for persistent message writes.metrics: Optional metrics collector of typeBlockProductionMetricsto report write latency.
Returns:
anyhow::Result<Self>— On success, returns a newMessageDBWriterServiceinstance. On failure, returns an error detailing the cause.Functionality:
Creates an mpsc channel.
Spawns a dedicated thread named
"MessageDBWriterService"that continuously listens for incoming message batches.For each received batch, it iterates over each account address's messages and spawns independent threads (named
"MessageDBWriterService_thread_inner") to write messages concurrently.Handles thread panics and write errors gracefully, logging them with
tracing::error!.Reports elapsed write time in milliseconds via the optional metrics collector.
If the receiver channel closes or errors, logs an error indicating termination.
Usage Example:
let message_db = MessageDurableStorage::new(...);
let metrics = Some(BlockProductionMetrics::new());
let writer_service = MessageDBWriterService::new(message_db, metrics)?;
write
pub fn write(
&self,
messages: HashMap<AccountAddress, Vec<(MessageIdentifier, Arc<WrappedMessage>)>>,
) -> anyhow::Result<()>
Enqueues a batch of messages to be written asynchronously.
Parameters:
messages: A hashmap mappingAccountAddressto a vector of tuples, each containing aMessageIdentifierand a reference-countedWrappedMessage.
Returns:
anyhow::Result<()>— Returns an error if the handler thread has stopped or if sending on the channel fails.Functionality:
Verifies the background thread is still active.
Sends the message batch through the channel to the handler thread for processing.
Usage Example:
let mut batch = HashMap::new();
batch.insert(account_address, vec![(message_id, wrapped_message)]);
writer_service.write(batch)?;
Implementation Details
The service uses Rust's standard library multithreading primitives (
std::thread,std::sync::mpsc) to implement an asynchronous message writing mechanism.Each batch of messages is processed in a dedicated thread that listens on the receiving end of the channel.
For each account address in the batch, the service spawns a separate thread to handle writing messages for that address concurrently, improving throughput.
Thread panics and errors during write operations are caught and logged but do not crash the service.
Metrics collection is integrated for performance monitoring, capturing the time spent writing messages.
The design currently spawns a new thread per account address each time a batch is processed; a comment in the code notes potential future improvements to control the number of threads.
Interactions with Other Modules
Interacts with
MessageDurableStoragefrom thestoragemodule to perform the actual writes to durable storage.Utilizes message-related types
MessageIdentifierandWrappedMessagefrom themessagemodule, andAccountAddressfromtypes.Optionally integrates with
BlockProductionMetricsfrom thehelper::metricsmodule for performance tracking.Uses the
tracingcrate for structured logging of errors and panics.
Data Flow and Threading Workflow
flowchart TD
A[Caller] -->|Calls write()| B[MessageDBWriterService Sender]
B -->|Sends message batch| C[Background Handler Thread]
C -->|For each AccountAddress| D[Spawn Inner Thread]
D -->|Write messages| E[MessageDurableStorage]
E -->|Write result| D
D -->|Join threads| C
C -->|Report metrics| F[BlockProductionMetrics]
C -->|Log errors| G[tracing Logger]
This flowchart illustrates the asynchronous message writing process: a caller sends messages via write(), which queues them through the channel. The background handler thread receives batches and spawns inner threads to write messages per account address. Write results are joined, metrics reported, and errors logged accordingly.