repository_impl.rs
Overview
This file implements the core repository layer for managing blockchain blocks, states, and related metadata in a multi-threaded environment. It provides persistent storage, caching, and state management for both finalized and optimistic (unfinalized) blockchain data. The repository manages block data, optimistic states, finalized blocks, accounts, and per-thread metadata, where a thread is an independent processing chain. It also handles synchronization, application of state snapshots, and interaction with external storage services and metrics systems.
Key functionalities include:
Loading and saving blocks and states from/to disk.
Managing finalized and optimistic states with caching and eviction.
Supporting multi-threaded blockchains with per-thread metadata and state management.
Handling block finalization and marking states as finalized.
Providing interfaces for accessing blocks and states for other system components.
Integrating with account repositories, block state repositories, and message durable storage.
Supporting cross-thread state references and block keeper sets updates.
The file interacts heavily with other components such as:
Accounts repository (AccountsRepository)
Block state repository (BlockStateRepository)
Message durable storage services (MessageDurableStorage and MessageDBWriterService)
Shared services for inter-component coordination (SharedServices)
Block keeper system (BlockKeeperSet and related structures)
Optimistic states (OptimisticStateImpl)
Various blockchain-specific types (BlockIdentifier, BlockSeqNo, ThreadIdentifier, etc.)
Entities and Structures
FinalizedBlockStorage
Manages a fixed-size per-thread in-memory cache of finalized blocks and their states.
Fields
per_thread_buffer_size: Maximum cache size per thread.
buffer: HashMap from ThreadIdentifier to FixedSizeHashMap of BlockIdentifier to (BlockState, Arc<Envelope<GoshBLS, AckiNackiBlock>>) tuples.
Key Methods
new(per_thread_buffer_size: usize) -> Self: Creates a new storage with the specified per-thread cache size.
store(block_state: BlockState, block: Envelope<GoshBLS, AckiNackiBlock>): Stores a finalized block and its state in the cache. It also caches the block for threads that spawn from it, according to the threads table.
find(block_identifier: &BlockIdentifier, hint: &[ThreadIdentifier]) -> Option<Arc<Envelope<GoshBLS, AckiNackiBlock>>>: Looks up a finalized block by its identifier, searching the hinted threads first.
buffer(): Returns a reference to the internal buffer.
Usage Example
let mut storage = FinalizedBlockStorage::new(100);
storage.store(block_state, block);
if let Some(cached_block) = storage.find(&block_id, &[thread_id]) {
// Use cached_block
}
BkSetUpdate
Represents updates to block keeper sets with sequence numbers and added nodes.
Fields
seq_no: Sequence number of the update.
current: Current block keeper set (optional).
future: Future block keeper set (optional).
added_nodes: Tuple of two sets of added NodeIdentifiers, for the current and future sets respectively.
Conversions
From ApiBkSet: Converts the API block keeper set representation to the internal representation.
Into ApiBkSet: Converts the internal representation back to the API format.
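The From/Into pair above can be sketched with std-only stand-ins. This is a minimal illustration, not the file's actual code: the shape of ApiBkSet (plain owned vectors) and of BlockKeeperSet (a list of node IDs) is hypothetical, and NodeIdentifier is modeled as a String. It shows the idiomatic pattern of implementing From in both directions, which provides the matching Into conversions automatically.

```rust
use std::collections::HashSet;
use std::sync::Arc;

// Hypothetical stand-ins for the real types.
type NodeIdentifier = String;

#[derive(Debug, Clone, PartialEq)]
pub struct BlockKeeperSet {
    pub nodes: Vec<NodeIdentifier>,
}

// Hypothetical API-facing shape: plain owned data, no Arcs.
#[derive(Debug, Clone, PartialEq)]
pub struct ApiBkSet {
    pub seq_no: u32,
    pub current: Option<Vec<NodeIdentifier>>,
    pub future: Option<Vec<NodeIdentifier>>,
    pub added_current: Vec<NodeIdentifier>,
    pub added_future: Vec<NodeIdentifier>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct BkSetUpdate {
    pub seq_no: u32,
    pub current: Option<Arc<BlockKeeperSet>>,
    pub future: Option<Arc<BlockKeeperSet>>,
    pub added_nodes: (HashSet<NodeIdentifier>, HashSet<NodeIdentifier>),
}

impl From<ApiBkSet> for BkSetUpdate {
    fn from(api: ApiBkSet) -> Self {
        Self {
            seq_no: api.seq_no,
            current: api.current.map(|nodes| Arc::new(BlockKeeperSet { nodes })),
            future: api.future.map(|nodes| Arc::new(BlockKeeperSet { nodes })),
            added_nodes: (
                api.added_current.into_iter().collect(),
                api.added_future.into_iter().collect(),
            ),
        }
    }
}

// Implementing From<BkSetUpdate> for ApiBkSet gives BkSetUpdate: Into<ApiBkSet> for free.
impl From<BkSetUpdate> for ApiBkSet {
    fn from(update: BkSetUpdate) -> Self {
        let mut added_current: Vec<_> = update.added_nodes.0.into_iter().collect();
        let mut added_future: Vec<_> = update.added_nodes.1.into_iter().collect();
        // Sort so the API output is deterministic (HashSet iteration order is not).
        added_current.sort();
        added_future.sort();
        Self {
            seq_no: update.seq_no,
            current: update.current.map(|set| set.nodes.clone()),
            future: update.future.map(|set| set.nodes.clone()),
            added_current,
            added_future,
        }
    }
}
```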
RepositoryImpl
Primary implementation of the Repository trait providing block and state persistence and caching.
Fields
data_dir: Root path for repository data storage.
zerostate_path: Optional path to the zerostate file.
metadatas: Thread metadata map wrapped in Arc<Mutex<>>.
saved_states: Map from thread to saved block sequence numbers and identifiers.
thread_last_finalized_state: Last finalized optimistic state per thread.
shared_services: Shared services instance for coordinating components.
nack_set_cache: Cache for tracking unresolved NACKs (negative acknowledgments).
block_state_repository: Repository for block states.
optimistic_state: Cache of optimistic states keyed by block identifier.
accounts: Accounts repository instance.
split_state: Flag indicating whether the state is split into external/internal parts.
metrics: Optional metrics collector.
message_db: Durable storage for messages.
message_storage_service: Service for writing messages to durable storage.
states_cache_size: Cache size limit for states.
finalized_blocks: Cache of finalized blocks.
bk_set_update_tx: Channel sender for block keeper set updates.
unfinalized_blocks: Cache of unfinalized blocks per thread.
last_message_for_acc: Map of last message identifiers per account.
Key Methods
Initialization and Construction
fn new(...) -> Self: Creates a new repository instance, initializes directories, loads metadata and zerostate, and prepares caches.
fn load_metadata(data_dir: &Path) -> anyhow::Result<HashMap<ThreadIdentifier, Arc<Mutex<Metadata<BlockIdentifier, BlockSeqNo>>>>>: Loads per-thread metadata from disk.
fn save_metadata(data_dir: &Path, metadatas: RepositoryMetadata) -> anyhow::Result<()>: Saves metadata to disk.
Block and State Access
fn get_finalized_block(&self, identifier: &BlockIdentifier) -> anyhow::Result<Option<Arc<Self::CandidateBlock>>>: Retrieves a finalized block by identifier.
fn get_block_from_repo_or_archive(...) -> anyhow::Result<Arc<Self::CandidateBlock>>: Retrieves a block from either the unfinalized cache or finalized storage.
fn get_optimistic_state(...) -> anyhow::Result<Option<Arc<Self::OptimisticState>>>: Loads the optimistic state for a given block and thread, from the cache, disk, or archive.
fn get_full_optimistic_state(...) -> anyhow::Result<Option<Arc<Self::OptimisticState>>>: Loads a fully expanded optimistic state, also loading external accounts when split state is enabled.
fn last_finalized_optimistic_state(&self, thread_id: &ThreadIdentifier) -> Option<Arc<Self::OptimisticState>>: Returns the last finalized optimistic state for a thread.
Block Finalization
fn mark_block_as_finalized(...) -> anyhow::Result<()>: Marks a block as finalized, updates caches, metadata, and metrics, and signals other services.
fn finalize_synced_block(&mut self, thread_snapshot: &ThreadSnapshot, state: &OptimisticStateImpl) -> anyhow::Result<()>: Finalizes a block received via synchronization.
State and Block Persistence
fn store_optimistic<T: Into<Arc<Self::OptimisticState>>>(&self, state: T) -> anyhow::Result<()>: Saves an optimistic state to the cache and disk, splitting out accounts if split_state is enabled.
fn save_block(data_dir: &Path, block: &Self::CandidateBlock) -> anyhow::Result<()>: Persists a candidate block to disk.
fn load_block(data_dir: &Path, block_id: &BlockIdentifier) -> anyhow::Result<Option<Self::CandidateBlock>>: Loads a block from disk.
fn erase_block(&self, block_id: &BlockIdentifier, thread_id: &ThreadIdentifier) -> anyhow::Result<()>: Removes block data and markers.
Metadata and Cache Management
fn init_thread(&mut self, thread_id: &ThreadIdentifier, parent_block_id: &BlockIdentifier) -> anyhow::Result<()>: Initializes repository metadata for a thread, ensuring the parent block is finalized.
fn clear_optimistic_states(&self, thread_id: &ThreadIdentifier) -> anyhow::Result<()>: Clears optimistic states older than the last finalized block for a thread.
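A minimal sketch of the eviction rule described for clear_optimistic_states, assuming each cached state records its thread and block sequence number. CachedState, the String block IDs, and the returned eviction count are hypothetical; the real method operates on the repository's own cache and returns anyhow::Result<()>. Keeping the state of the last finalized block itself (the `>=` comparison) is also an assumption, made because later blocks build on it.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for ThreadIdentifier and BlockIdentifier.
type ThreadId = u32;
type BlockId = String;

#[derive(Debug, Clone)]
pub struct CachedState {
    pub thread_id: ThreadId,
    pub seq_no: u32,
}

/// Drops cached states of `thread_id` whose seq_no is below the thread's last
/// finalized seq_no, leaving other threads untouched. Returns the number evicted.
pub fn clear_optimistic_states(
    cache: &mut HashMap<BlockId, CachedState>,
    thread_id: ThreadId,
    last_finalized_seq_no: u32,
) -> usize {
    let before = cache.len();
    cache.retain(|_, s| s.thread_id != thread_id || s.seq_no >= last_finalized_seq_no);
    before - cache.len()
}
```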
Utilities
fn load_from_file<T: for<'de> Deserialize<'de>>(file_path: &PathBuf) -> anyhow::Result<Option<T>>: Generic function to deserialize data from a file.
fn save_to_file<T: Serialize>(file_path: &PathBuf, data: &T, force_sync: bool) -> anyhow::Result<()>: Generic function to serialize and save data to a file atomically.
fn get_path(&self, path: &str, oid: String) -> PathBuf: Constructs the file path for a given subdirectory and object ID.
fn get_nodes_by_threads(&self) -> HashMap<ThreadIdentifier, Option<NodeIdentifier>>: Retrieves the last finalized producer node per thread.
fn sync_accounts_from_state(&mut self, shard_state: Arc<ShardStateUnsplit>) -> anyhow::Result<()>: Synchronizes accounts from the given shard state into the accounts repository.
Trait Implementation
Implements the Repository trait, which defines the blockchain repository interface, including:
Associated types such as Attestation, CandidateBlock, OptimisticState, etc.
Methods for loading blocks, states, and metadata, marking blocks as finalized, handling state snapshots, etc.
Metadata<TBlockIdentifier, TBlockSeqNo>
Holds per-thread metadata including the last finalized block identifier, sequence number, and optionally the last finalized producer node.
Fields
last_finalized_block_id: Identifier of the last finalized block.
last_finalized_block_seq_no: Sequence number of the last finalized block.
last_finalized_producer_id: Optional identifier of the node that produced the last finalized block.
WrappedExtMessage<TMessage>
Wrapper for external messages with index and expiration timestamp.
Fields
index: Message index.
message: The actual message.
timestamp: Expiry timestamp (current time + constant timeout).
Constructor
new(index: u32, message: TMessage) -> Self: Creates a new wrapped message whose timestamp is the current time plus the timeout.
ExtMessages<TMessage>
Queue wrapper holding multiple wrapped external messages.
Field
queue: Vector of WrappedExtMessage<TMessage>.
Default
Initializes an empty queue.
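A sketch of the two wrappers, assuming the expiry timestamp is stored as milliseconds since the Unix epoch. The EXT_MESSAGE_TIMEOUT value and the is_expired helper are hypothetical; the document only states that the timestamp is the current time plus a constant timeout. Default is implemented by hand because #[derive(Default)] would add a TMessage: Default bound that an empty queue does not need.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Hypothetical timeout value; the real constant and its unit are not documented here.
const EXT_MESSAGE_TIMEOUT: Duration = Duration::from_secs(60);

#[derive(Debug, Clone)]
pub struct WrappedExtMessage<TMessage> {
    pub index: u32,
    pub message: TMessage,
    /// Expiry time in milliseconds since the Unix epoch (unit assumed).
    pub timestamp: u128,
}

impl<TMessage> WrappedExtMessage<TMessage> {
    pub fn new(index: u32, message: TMessage) -> Self {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock is before the Unix epoch");
        Self { index, message, timestamp: (now + EXT_MESSAGE_TIMEOUT).as_millis() }
    }

    /// Hypothetical helper: true once `now_millis` has passed the expiry timestamp.
    pub fn is_expired(&self, now_millis: u128) -> bool {
        now_millis > self.timestamp
    }
}

#[derive(Debug, Clone)]
pub struct ExtMessages<TMessage> {
    pub queue: Vec<WrappedExtMessage<TMessage>>,
}

// Written by hand so that `TMessage` itself does not need to implement `Default`.
impl<TMessage> Default for ExtMessages<TMessage> {
    fn default() -> Self {
        Self { queue: Vec::new() }
    }
}
```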
ThreadSnapshot
Snapshot structure capturing all relevant thread state at a point in time for synchronization purposes.
Fields (Getters auto-derived)
optimistic_state: Serialized optimistic state bytes.
cross_thread_ref_data: Cross-thread reference data vector.
db_messages: Database message vectors.
finalized_block: Finalized block envelope.
bk_set, descendant_bk_set, future_bk_set, descendant_future_bk_set: Block keeper sets.
finalized_block_stats: Block statistics.
attestation_target: Attestation targets.
producer_selector: Producer selector data.
block_height: Height of the block.
prefinalization_proof: Envelope with attestation data.
ancestor_blocks_finalization_checkpoints: Finalization checkpoints for ancestor blocks.
finalizes_blocks: Set of block indices that finalize blocks.
parent_ancestor_blocks_finalization_checkpoints: Parent thread's finalization checkpoints.
Important Algorithms and Implementation Details
Optimistic State Loading and Caching
When requesting an optimistic state for a block, the repository first checks the in-memory cache.
If the state is not cached, the repository tries to load it from disk (load_from_file), or falls back to loading it from an archive of saved states.
Archive loading attempts to find a chain of blocks back to the nearest available state or zerostate, then applies the blocks in order to reconstruct the requested state.
The repository maintains a cache of optimistic states and clears outdated states to limit memory use (clear_optimistic_states).
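The cache, saved-state, and replay lookup order can be illustrated with a toy model in which a state is a running total and each block is a delta. ToyRepository, State, and the seq_no-keyed maps are hypothetical simplifications: the real repository keys states by BlockIdentifier, treats disk and archive as separate tiers, and applies full blocks. BTreeMap::range(..=target).next_back() finds the nearest saved state at or below the target.

```rust
use std::collections::{BTreeMap, HashMap};

// Toy stand-ins: a "state" is a running total and a "block" is a delta applied to it.
#[derive(Debug, Clone, PartialEq)]
pub struct State {
    pub seq_no: u32,
    pub total: i64,
}

pub struct ToyRepository {
    /// In-memory cache of states, keyed by block sequence number.
    pub cache: HashMap<u32, State>,
    /// States persisted on disk or in the archive, ordered by sequence number.
    pub saved_states: BTreeMap<u32, State>,
    /// Block deltas, ordered by sequence number (seq_no 0 is the zerostate).
    pub blocks: BTreeMap<u32, i64>,
}

impl ToyRepository {
    pub fn get_state(&mut self, target: u32) -> Option<State> {
        // 1. Cache hit: return immediately.
        if let Some(state) = self.cache.get(&target) {
            return Some(state.clone());
        }
        // 2. Nearest saved state at or below the target, else the zerostate.
        let mut state = self
            .saved_states
            .range(..=target)
            .next_back()
            .map(|(_, s)| s.clone())
            .unwrap_or(State { seq_no: 0, total: 0 });
        // 3. Replay every block after the base state, in order, up to the target.
        for seq_no in state.seq_no + 1..=target {
            let delta = *self.blocks.get(&seq_no)?; // missing block: cannot reconstruct
            state.total += delta;
            state.seq_no = seq_no;
        }
        // 4. Cache the reconstructed state for later lookups.
        self.cache.insert(target, state.clone());
        Some(state)
    }
}
```

If any block between the base state and the target is missing, the sketch returns None rather than a partially reconstructed state.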
Finalized Block Storage
Finalized blocks are cached per-thread with a fixed buffer size.
When storing a finalized block, the repository also caches it for threads that spawn from the block (via the threads table).
Lookup of finalized blocks uses thread hints for efficient search before scanning all threads.
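A std-only sketch of the per-thread bounded cache with hinted lookup. FixedSizeMap is a hypothetical stand-in for FixedSizeHashMap that uses FIFO eviction (the real eviction policy is not documented here), block and thread IDs are plain integers, and store takes the spawned thread IDs directly instead of reading a threads table.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

type ThreadId = u32;
type BlockId = u64;

/// Hypothetical bounded map: evicts the oldest inserted key once `capacity` is exceeded.
struct FixedSizeMap<V> {
    capacity: usize,
    order: VecDeque<BlockId>,
    entries: HashMap<BlockId, V>,
}

impl<V> FixedSizeMap<V> {
    fn new(capacity: usize) -> Self {
        Self { capacity, order: VecDeque::new(), entries: HashMap::new() }
    }

    fn insert(&mut self, key: BlockId, value: V) {
        // Only a brand-new key is tracked in the eviction order.
        if self.entries.insert(key, value).is_none() {
            self.order.push_back(key);
            if self.order.len() > self.capacity {
                if let Some(oldest) = self.order.pop_front() {
                    self.entries.remove(&oldest);
                }
            }
        }
    }
}

pub struct FinalizedBlockStorage<B> {
    per_thread_buffer_size: usize,
    buffer: HashMap<ThreadId, FixedSizeMap<Arc<B>>>,
}

impl<B> FinalizedBlockStorage<B> {
    pub fn new(per_thread_buffer_size: usize) -> Self {
        Self { per_thread_buffer_size, buffer: HashMap::new() }
    }

    /// Caches the block under its own thread and under every thread spawned from it.
    pub fn store(&mut self, block_id: BlockId, thread_id: ThreadId, spawned: &[ThreadId], block: B) {
        let block = Arc::new(block);
        let size = self.per_thread_buffer_size;
        for tid in std::iter::once(&thread_id).chain(spawned) {
            self.buffer
                .entry(*tid)
                .or_insert_with(|| FixedSizeMap::new(size))
                .insert(block_id, Arc::clone(&block));
        }
    }

    /// Checks the hinted threads first, then falls back to scanning every thread.
    pub fn find(&self, block_id: BlockId, hint: &[ThreadId]) -> Option<Arc<B>> {
        let in_thread = |tid: &ThreadId| -> Option<Arc<B>> {
            self.buffer.get(tid)?.entries.get(&block_id).cloned()
        };
        hint.iter()
            .find_map(&in_thread)
            .or_else(|| self.buffer.keys().find_map(&in_thread))
    }
}
```

Because the block is wrapped in an Arc once, caching it under several threads shares one allocation instead of copying the block.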
Block Finalization Process
Finalizing a block involves marking its state as finalized, updating per-thread metadata, and caching the finalized block.
Metrics are updated to reflect block finalization timing and block keeper set sizes.
The repository sends updates about block keeper set changes via a channel.
It synchronizes accounts from the final state into the accounts repository.
If the block shares state resources, the repository saves the state so that it can be shared through the state sync service.
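The bookkeeping part of finalization can be sketched as follows: update the per-thread metadata under a lock, release the lock, then notify listeners over a channel. Finalizer, the simplified Metadata and BkSetUpdate fields, and std::sync::mpsc in place of InstrumentedSender are all assumptions. The refusal to move the finalized pointer backwards is likewise an assumption added for illustration, not documented behavior. Metrics, account synchronization, and state sharing are omitted.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

type ThreadId = u32;
type BlockId = u64;

#[derive(Debug, Clone, PartialEq)]
pub struct Metadata {
    pub last_finalized_block_id: BlockId,
    pub last_finalized_block_seq_no: u32,
}

// Simplified, hypothetical payload; the real BkSetUpdate carries the sets themselves.
#[derive(Debug, Clone, PartialEq)]
pub struct BkSetUpdate {
    pub seq_no: u32,
    pub bk_set_size: usize,
}

pub struct Finalizer {
    metadatas: Arc<Mutex<HashMap<ThreadId, Metadata>>>,
    bk_set_update_tx: Sender<BkSetUpdate>,
}

impl Finalizer {
    pub fn new() -> (Self, Receiver<BkSetUpdate>) {
        let (tx, rx) = channel();
        (Self { metadatas: Arc::new(Mutex::new(HashMap::new())), bk_set_update_tx: tx }, rx)
    }

    pub fn mark_block_as_finalized(
        &self,
        thread_id: ThreadId,
        block_id: BlockId,
        seq_no: u32,
        bk_set_size: usize,
    ) -> Result<(), String> {
        {
            let mut metadatas = self.metadatas.lock().map_err(|e| e.to_string())?;
            let entry = metadatas.entry(thread_id).or_insert(Metadata {
                last_finalized_block_id: block_id,
                last_finalized_block_seq_no: seq_no,
            });
            // Assumed invariant: finality only moves forward within a thread.
            if seq_no < entry.last_finalized_block_seq_no {
                return Err(format!("block {seq_no} is older than the last finalized block"));
            }
            entry.last_finalized_block_id = block_id;
            entry.last_finalized_block_seq_no = seq_no;
        } // lock released here, before notifying other services
        self.bk_set_update_tx
            .send(BkSetUpdate { seq_no, bk_set_size })
            .map_err(|e| e.to_string())
    }
}
```

Sending only after the guard is dropped keeps a slow or blocked receiver from holding up other threads that need the metadata lock.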
State Splitting
If split_state is enabled, the optimistic state keeps external accounts separately.
During saving, external accounts are stored individually in the accounts repository.
When the full optimistic state is loaded, external accounts are loaded and merged into the shard state.
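A minimal sketch of the split-and-merge round trip, assuming accounts are a map from address to a toy Account and the accounts repository is another map. The is_external predicate is hypothetical, since the document does not say how accounts are classified as external, and SplitState, split, and load_full are illustrative names rather than the file's API.

```rust
use std::collections::HashMap;

type AccountAddress = String;

/// Toy account: just a balance.
#[derive(Debug, Clone, PartialEq)]
pub struct Account {
    pub balance: u64,
}

#[derive(Debug, Default)]
pub struct SplitState {
    /// Accounts kept inside the reduced shard state.
    pub internal: HashMap<AccountAddress, Account>,
    /// Addresses whose account bodies live in the accounts repository.
    pub external: Vec<AccountAddress>,
}

/// Moves accounts selected by `is_external` into `accounts_repo`, keeping only their addresses.
pub fn split(
    accounts: HashMap<AccountAddress, Account>,
    is_external: impl Fn(&AccountAddress, &Account) -> bool,
    accounts_repo: &mut HashMap<AccountAddress, Account>,
) -> SplitState {
    let mut state = SplitState::default();
    for (addr, acc) in accounts {
        if is_external(&addr, &acc) {
            state.external.push(addr.clone());
            accounts_repo.insert(addr, acc);
        } else {
            state.internal.insert(addr, acc);
        }
    }
    state
}

/// Rebuilds the full account map; returns None if an external account is missing.
pub fn load_full(
    state: &SplitState,
    accounts_repo: &HashMap<AccountAddress, Account>,
) -> Option<HashMap<AccountAddress, Account>> {
    let mut full = state.internal.clone();
    for addr in &state.external {
        full.insert(addr.clone(), accounts_repo.get(addr)?.clone());
    }
    Some(full)
}
```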
Persistence and Atomic File Operations
Data is serialized using bincode and saved to temporary files, which are then atomically renamed to the target paths.
Because the target file is replaced in a single rename, a reader never observes a partially written file, which prevents corruption from interrupted writes.
Directory structures are created as needed.
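The write-to-temp-then-rename pattern can be sketched with the standard library alone. This version writes raw bytes instead of bincode output, names the temp file by swapping the extension to .tmp (a hypothetical convention), and returns std::io::Result instead of anyhow::Result. save_bytes and load_bytes are illustrative names; load_bytes mirrors the documented load_from_file contract of returning Ok(None) for a missing file.

```rust
use std::fs::{self, File};
use std::io::{self, ErrorKind, Read, Write};
use std::path::Path;

/// Writes `bytes` to `path` atomically: write a sibling temp file, then rename it over the target.
pub fn save_bytes(path: &Path, bytes: &[u8], force_sync: bool) -> io::Result<()> {
    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent)?; // create the directory structure on demand
    }
    let tmp_path = path.with_extension("tmp"); // hypothetical temp-file naming
    {
        let mut file = File::create(&tmp_path)?;
        file.write_all(bytes)?;
        if force_sync {
            file.sync_all()?; // flush data to the device before the rename
        }
    } // file handle closed here
    // Renaming within one filesystem replaces the target in a single step.
    fs::rename(&tmp_path, path)
}

/// Returns Ok(None) when the file does not exist, Ok(Some(bytes)) otherwise.
pub fn load_bytes(path: &Path) -> io::Result<Option<Vec<u8>>> {
    match File::open(path) {
        Ok(mut file) => {
            let mut buf = Vec::new();
            file.read_to_end(&mut buf)?;
            Ok(Some(buf))
        }
        Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
        Err(e) => Err(e),
    }
}
```

The temp file is created next to the target because rename is only atomic within one filesystem. Full crash durability would also require syncing the parent directory after the rename, which this sketch does not do.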
Message Extraction for Database Storage
The extract_new_messages function compares the current messages with the last saved message per account.
Only messages newer than the last saved message are extracted for writing to the durable message storage.
This avoids duplication of messages in the database.
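A sketch of the deduplication step, assuming each account's messages arrive ordered from oldest to newest and message IDs are plain integers. The function shape and the fallback of treating every message as new when the last saved ID is not found in the list are assumptions, not the file's documented behavior.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for AccountAddress and MessageIdentifier.
type AccountAddress = String;
type MessageIdentifier = u64;

/// Returns, per account, only the messages that come after the last saved one,
/// and advances `last_message_for_acc` to the newest message of each account.
pub fn extract_new_messages(
    current: &HashMap<AccountAddress, Vec<MessageIdentifier>>,
    last_message_for_acc: &mut HashMap<AccountAddress, MessageIdentifier>,
) -> HashMap<AccountAddress, Vec<MessageIdentifier>> {
    let mut new_messages = HashMap::new();
    for (addr, messages) in current {
        // Skip everything up to and including the last saved message, if it is present.
        let start = match last_message_for_acc.get(addr) {
            Some(last) => messages.iter().position(|m| m == last).map_or(0, |i| i + 1),
            None => 0,
        };
        let fresh = &messages[start..];
        if let Some(newest) = fresh.last() {
            last_message_for_acc.insert(addr.clone(), *newest);
            new_messages.insert(addr.clone(), fresh.to_vec());
        }
    }
    new_messages
}
```

Accounts with nothing new are left out of the result entirely, so a repeated call on unchanged input produces an empty map.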
Interaction with Other System Components
AccountsRepository: Manages account data persistence and retrieval, including external accounts when state splitting is enabled.
BlockStateRepository: Manages metadata and markers related to block states (finalized, applied, verification status).
MessageDurableStorage and MessageDBWriterService: Store and manage blockchain messages persistently.
SharedServices: Coordinates cross-thread references, thread tracking, routing, and other shared services.
BlockKeeperSet and BlockKeeperSetChange: Represent sets of block keepers responsible for block production, with updates communicated via channels.
OptimisticStateImpl: Represents the mutable optimistic state of the blockchain used for processing unfinalized blocks.
ZeroState: Provides initial state for threads when no prior state exists.
Telemetry and Metrics: Collect performance and operation metrics, e.g., block production times, account numbers.
File Structure and Workflow Diagram
classDiagram
class RepositoryImpl {
-data_dir: PathBuf
-zerostate_path: Option<PathBuf>
-metadatas: RepositoryMetadata
-saved_states: Arc<Mutex<HashMap<ThreadIdentifier, BTreeMap<BlockSeqNo, BlockIdentifier>>>>
-thread_last_finalized_state: Arc<Mutex<HashMap<ThreadIdentifier, Arc<OptimisticStateImpl>>>>
-shared_services: SharedServices
-nack_set_cache: Arc<Mutex<FixedSizeHashSet<UInt256>>>
-block_state_repository: BlockStateRepository
-optimistic_state: Arc<Mutex<HashMap<BlockIdentifier, Arc<OptimisticStateImpl>>>>
-accounts: AccountsRepository
-split_state: bool
-metrics: Option<BlockProductionMetrics>
-message_db: MessageDurableStorage
-message_storage_service: MessageDBWriterService
-states_cache_size: usize
-finalized_blocks: Arc<Mutex<FinalizedBlockStorage>>
-bk_set_update_tx: InstrumentedSender<BkSetUpdate>
-unfinalized_blocks: Arc<Mutex<HashMap<ThreadIdentifier, UnfinalizedCandidateBlockCollection>>>
-last_message_for_acc: Arc<Mutex<HashMap<AccountAddress, MessageIdentifier>>>
+new()
+get_optimistic_state()
+store_optimistic()
+mark_block_as_finalized()
+get_finalized_block()
+load_metadata()
+save_metadata()
+init_thread()
+clear_optimistic_states()
+sync_accounts_from_state()
}
class FinalizedBlockStorage {
-per_thread_buffer_size: usize
-buffer: HashMap<ThreadIdentifier, FixedSizeHashMap<BlockIdentifier, (BlockState, Arc<Envelope<GoshBLS, AckiNackiBlock>>)>>
+new()
+store()
+find()
}
class BkSetUpdate {
-seq_no: u32
-current: Option<Arc<BlockKeeperSet>>
-future: Option<Arc<BlockKeeperSet>>
-added_nodes: (HashSet<NodeIdentifier>, HashSet<NodeIdentifier>)
}
RepositoryImpl --> FinalizedBlockStorage : uses
RepositoryImpl --> AccountsRepository : uses
RepositoryImpl --> BlockStateRepository : uses
RepositoryImpl --> MessageDBWriterService : uses
RepositoryImpl --> SharedServices : uses
RepositoryImpl --> OptimisticStateImpl : manages
RepositoryImpl --> BkSetUpdate : sends updates via channel
Key Functions and Methods
load_from_file<T>(file_path: &PathBuf) -> anyhow::Result<Option<T>>
Loads and deserializes data of type T from a given file path.
Parameters
file_path: Path to the file to load.
Returns
Ok(Some(data)) if the file exists and deserialization succeeds.
Ok(None) if the file does not exist.
Err if reading or deserialization fails.
Usage
let metadata: Option<HashMap<ThreadIdentifier, Metadata<BlockIdentifier, BlockSeqNo>>> = load_from_file(&path)?;
save_to_file<T>(file_path: &PathBuf, data: &T, force_sync: bool) -> anyhow::Result<()>
Serializes and saves data atomically to a file.
Parameters
file_path: Target file path.
data: Reference to the data to serialize.
force_sync: Whether to force a filesystem sync after the write.
Returns
Ok(()) on success.
Err on I/O or serialization failure.
Usage
save_to_file(&path, &metadata, true)?;
RepositoryImpl::new(...) -> Self
Constructs a new repository instance, initializes data directory, loads metadata and zerostate, sets up caches and services.
Parameters
data_dir: Root directory for repository data.
zerostate_path: Optional path to the zerostate.
states_cache_size: Cache size for states.
shared_services: Shared services instance.
nack_set_cache: Cache for unresolved NACKs.
split_state: Whether to split account states.
block_state_repository: Repository for block states.
metrics: Optional metrics collector.
accounts_repository: Repository for accounts.
message_db: Durable message database.
finalized_blocks: Cache of finalized blocks.
bk_set_update_tx: Channel for block keeper set update notifications.
Returns