writer_service.rs

Overview

writer_service.rs defines the MessageDBWriterService struct, a service that asynchronously writes batches of messages to a durable storage backend. It accepts batches of messages keyed by account address, spawns a dedicated thread per address so that writes run in parallel, and optionally reports write latency metrics. This design keeps message persistence non-blocking for callers and isolates database I/O from the main application flow.

Structures and Methods

MessageDBWriterService

A cloneable service struct that holds the sender half of a multi-producer, single-consumer (mpsc) channel, used to enqueue message batches, together with a handler thread that consumes these batches and writes them to persistent storage.
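The shape described above can be sketched with the standard library alone. This is a minimal illustration, not the actual definition: WriterSketch, its field names, the String-based stand-in types, and the sink channel used to observe results are all assumptions made for the example. Because JoinHandle is not Clone, the sketch shares it behind an Arc so the struct can derive Clone; the real service may handle this differently.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Sender};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

// Hypothetical stand-ins for the real AccountAddress and message types.
type AccountAddress = String;
type Batch = HashMap<AccountAddress, Vec<String>>;

/// Illustrative cloneable service: every clone shares one channel and one handler thread.
#[derive(Clone)]
pub struct WriterSketch {
    sender: Sender<Batch>,
    // JoinHandle is not Clone, so it is shared behind an Arc (an assumption of this sketch).
    _handler: Arc<JoinHandle<()>>,
}

impl WriterSketch {
    /// `sink` is a hypothetical observer that receives the message count of each batch.
    pub fn new(sink: Sender<usize>) -> std::io::Result<Self> {
        let (sender, receiver) = mpsc::channel::<Batch>();
        let handler = thread::Builder::new()
            .name("writer-sketch".to_string())
            .spawn(move || {
                // The loop ends once every Sender clone has been dropped.
                for batch in receiver {
                    let count: usize = batch.values().map(Vec::len).sum();
                    // A real handler would write to storage here.
                    let _ = sink.send(count);
                }
            })?;
        Ok(Self {
            sender,
            _handler: Arc::new(handler),
        })
    }

    /// Enqueues a batch and returns immediately; the handler thread does the work.
    pub fn write(&self, batch: Batch) -> Result<(), mpsc::SendError<Batch>> {
        self.sender.send(batch)
    }
}
```

Any clone of WriterSketch can call write; all batches arrive at the same handler thread in the order they were sent.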

Fields

Methods

new
pub fn new(
    message_db: MessageDurableStorage,
    metrics: Option<BlockProductionMetrics>,
) -> anyhow::Result<Self>

Constructs and returns a new instance of MessageDBWriterService.

let message_db = MessageDurableStorage::new(...);
let metrics = Some(BlockProductionMetrics::new());
let writer_service = MessageDBWriterService::new(message_db, metrics)?;

write
pub fn write(
    &self,
    messages: HashMap<AccountAddress, Vec<(MessageIdentifier, Arc<WrappedMessage>)>>,
) -> anyhow::Result<()>

Enqueues a batch of messages to be written asynchronously.

let mut batch = HashMap::new();
batch.insert(account_address, vec![(message_id, wrapped_message)]);
writer_service.write(batch)?;

Implementation Details

Interactions with Other Modules

Data Flow and Threading Workflow

flowchart TD
A[Caller] -->|"Calls write()"| B[MessageDBWriterService Sender]
B -->|Sends message batch| C[Background Handler Thread]
C -->|For each AccountAddress| D[Spawn Inner Thread]
D -->|Write messages| E[MessageDurableStorage]
E -->|Write result| D
D -->|Join threads| C
C -->|Report metrics| F[BlockProductionMetrics]
C -->|Log errors| G[tracing Logger]

This flowchart illustrates the asynchronous write path. A caller invokes write(), which queues the batch on the channel and returns. The background handler thread receives each batch and spawns one inner thread per account address to write that address's messages to MessageDurableStorage. The handler then joins the inner threads to collect their results, reports write latency when metrics are configured, and logs any errors.
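The per-address fan-out and join step can be sketched as follows. This is an illustration under stated assumptions rather than the service's actual code: write_to_storage is a hypothetical stand-in for the MessageDurableStorage call, the String-based types replace the real address and message types, and the returned Duration stands in for whatever latency value is passed to BlockProductionMetrics.

```rust
use std::collections::HashMap;
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical stand-ins for the real address and message types.
type Address = String;
type Message = String;

/// Hypothetical storage call: returns how many messages were written, or an error.
fn write_to_storage(address: &str, messages: &[Message]) -> Result<usize, String> {
    if address.is_empty() {
        return Err("empty account address".to_string());
    }
    Ok(messages.len())
}

/// Spawns one thread per address, joins them all, and returns
/// (messages written, failed addresses, elapsed wall-clock time).
fn write_batch(batch: HashMap<Address, Vec<Message>>) -> (usize, usize, Duration) {
    let started = Instant::now();

    // Fan out: each address gets its own writer thread.
    let handles: Vec<_> = batch
        .into_iter()
        .map(|(address, messages)| {
            thread::spawn(move || write_to_storage(&address, &messages))
        })
        .collect();

    // Fan in: join every thread and tally results.
    let mut written = 0;
    let mut failed = 0;
    for handle in handles {
        match handle.join() {
            Ok(Ok(count)) => written += count,
            Ok(Err(error)) => {
                failed += 1;
                eprintln!("write failed: {error}");
            }
            Err(_) => {
                failed += 1;
                eprintln!("writer thread panicked");
            }
        }
    }

    (written, failed, started.elapsed())
}
```

Measuring elapsed time around the whole join means the reported latency reflects the slowest address in the batch, which is one plausible reading of "write latency" here; the source does not say which interval the real service measures.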