save_service.rs
Overview
This file implements a long-running service responsible for saving the state of blockchain blocks asynchronously. It listens for updates to the BlockStateInner via a message receiver and triggers persistent storage operations when the state changes. The service ensures that the latest state is saved only when necessary, optimizing disk I/O and maintaining consistency of the block state data.
Functions
start_state_save_service
pub fn start_state_save_service(
state_receiver: InstrumentedReceiver<Arc<BlockStateInner>>,
) -> anyhow::Result<()>
Purpose
Starts the state save service, which continuously listens for new block state updates delivered through the provided InstrumentedReceiver. Upon receiving a new state, it checks whether the state has changed since the last save and if so, persists the updated state.
Parameters
state_receiver: AnInstrumentedReceiverwrapping aArc<BlockStateInner>. This receiver acts as the source of updated block states for the service. It uses asynchronous message passing with instrumentation for telemetry.
Return Value
Returns
anyhow::Result<()>:Ok(())if the service runs uninterrupted (though practically it runs indefinitely).An error (
anyhow::Error) in case receiving from the channel fails, which breaks the loop.
Usage Example
let receiver = ...; // acquire an InstrumentedReceiver<Arc<BlockStateInner>>
start_state_save_service(receiver)?;
This function is typically run in a dedicated thread or asynchronous task as part of the node's background services.
Behavior and Implementation Details
The function enters an infinite loop, blocking on
state_receiver.recv()waiting for a newBlockStateInnerinstance.Upon receiving a new state (
Ok(state)):It logs a trace message containing:
The block sequence number, accessed via a guarded read on the state (
state.guarded(|e| *e.block_seq_no())).The block identifier.
It acquires a write lock via
state.shared_access.write()to safely mutate the shared state.It compares the
last_saved_object_state_versionwith the currentobject_state_version.If versions differ, indicating a new state to save, it calls
state.save()?to persist the state.
If receiving the state fails (
Err(e)), the function returns an error usinganyhow::bail!, which terminates the service.
Important Implementation Notes
The use of
Arc<BlockStateInner>allows shared ownership and thread-safe concurrency for the block state across multiple consumers.The
Guardedutility is used for synchronized access to guarded data withinBlockStateInner.The service relies on version numbers (
last_saved_object_state_versionandobject_state_version) to avoid redundant saves.The service uses continuous blocking receive calls, making it suitable for being spawned in a dedicated thread or asynchronous context.
Key Structures and Concepts
BlockStateInner
Represents the internal state of a blockchain block.
Contains fields such as:
block_seq_no(): The sequence number of the block.block_identifier: A unique identifier for the block.last_saved_object_state_version: Tracks the last saved version of the object state.object_state_version: Tracks the current version of the object state.
Supports thread-safe shared access through
shared_accessand guarded reading/writing.Provides a
save()method which persists the state to durable storage.
InstrumentedReceiver
A telemetry-enhanced message receiver based on multi-producer, single-consumer (mpsc) channels.
Used for receiving updates on block states asynchronously.
Supports instrumentation useful for monitoring and logging.
Guarded
A utility type providing controlled access to shared mutable data with synchronization.
Used here to ensure safe access to internal fields of
BlockStateInner.
Interaction with Other System Components
The service consumes block state updates produced elsewhere in the system, likely from block processing or consensus components that generate new
BlockStateInnerinstances.After detecting changes in the block state, it invokes persistence mechanisms embedded in
BlockStateInner::save().The service logs trace information leveraging the telemetry and tracing infrastructure.
It depends on synchronization primitives to safely share and update state in a concurrent environment.
Mermaid Diagram
classDiagram
class save_service {
+start_state_save_service(state_receiver)
}
class BlockStateInner {
+block_seq_no()
+block_identifier
+last_saved_object_state_version
+object_state_version
+save()
+shared_access
}
class InstrumentedReceiver {
+recv()
}
class Guarded {
+guarded()
}
save_service --> InstrumentedReceiver : consumes
save_service --> BlockStateInner : processes and saves
BlockStateInner o-- Guarded : uses for access control
This diagram illustrates the relationship between the main function and the key types it interacts with in the file. The start_state_save_service function consumes data from InstrumentedReceiver, processes BlockStateInner instances, and uses Guarded for safe concurrent data access.