repository_impl.rs

Overview

This file implements the core repository layer for managing blockchain blocks, states, and related metadata in a multi-threaded environment. It provides persistent storage, caching, and state management functionalities for finalized and optimistic (unfinalized) blockchain data structures. The repository manages block data, optimistic states, finalized blocks, accounts, and metadata related to threads (processing chains). It also facilitates synchronization, state snapshot application, and interaction with external storage services and metrics systems.

Key functionalities include:

The file interacts heavily with other components such as:


Entities and Structures

FinalizedBlockStorage

Manages a fixed-size per-thread in-memory cache of finalized blocks and their states.

let mut storage = FinalizedBlockStorage::new(100);
storage.store(block_state, block);
if let Some(cached_block) = storage.find(&block_id, &[thread_id]) {
    // Use cached_block
}

BkSetUpdate

Represents updates to block keeper sets with sequence numbers and added nodes.


RepositoryImpl

Primary implementation of the Repository trait providing block and state persistence and caching.


Metadata<TBlockIdentifier, TBlockSeqNo>

Holds per-thread metadata including the last finalized block identifier, sequence number, and optionally the last finalized producer node.


WrappedExtMessage<TMessage>

Wrapper for external messages with index and expiration timestamp.


ExtMessages<TMessage>

Queue wrapper holding multiple wrapped external messages.


ThreadSnapshot

Snapshot structure capturing all relevant thread state at a point in time for synchronization purposes.


Important Algorithms and Implementation Details

Optimistic State Loading and Caching

Finalized Block Storage

Block Finalization Process

State Splitting

Persistence and Atomic File Operations

Message Extraction for Database Storage


Interaction with Other System Components


File Structure and Workflow Diagram

classDiagram
class RepositoryImpl {
-data_dir: PathBuf
-zerostate_path: Option<PathBuf>
-metadatas: RepositoryMetadata
-saved_states: Arc<Mutex<HashMap<ThreadIdentifier, BTreeMap<BlockSeqNo, BlockIdentifier>>>>
-thread_last_finalized_state: Arc<Mutex<HashMap<ThreadIdentifier, Arc<OptimisticStateImpl>>>>
-shared_services: SharedServices
-nack_set_cache: Arc<Mutex<FixedSizeHashSet<UInt256>>>
-block_state_repository: BlockStateRepository
-optimistic_state: Arc<Mutex<HashMap<BlockIdentifier, Arc<OptimisticStateImpl>>>>
-accounts: AccountsRepository
-split_state: bool
-metrics: Option<BlockProductionMetrics>
-message_db: MessageDurableStorage
-message_storage_service: MessageDBWriterService
-states_cache_size: usize
-finalized_blocks: Arc<Mutex<FinalizedBlockStorage>>
-bk_set_update_tx: InstrumentedSender<BkSetUpdate>
-unfinalized_blocks: Arc<Mutex<HashMap<ThreadIdentifier, UnfinalizedCandidateBlockCollection>>>
-last_message_for_acc: Arc<Mutex<HashMap<AccountAddress, MessageIdentifier>>>
+new()
+get_optimistic_state()
+store_optimistic()
+mark_block_as_finalized()
+get_finalized_block()
+load_metadata()
+save_metadata()
+init_thread()
+clear_optimistic_states()
+sync_accounts_from_state()
}
class FinalizedBlockStorage {
-per_thread_buffer_size: usize
-buffer: HashMap<ThreadIdentifier, FixedSizeHashMap<BlockIdentifier, (BlockState, Arc<Envelope<GoshBLS, AckiNackiBlock>>)>>
+new()
+store()
+find()
}
class BkSetUpdate {
-seq_no: u32
-current: Option<Arc<BlockKeeperSet>>
-future: Option<Arc<BlockKeeperSet>>
-added_nodes: (HashSet<NodeIdentifier>, HashSet<NodeIdentifier>)
}
RepositoryImpl --> FinalizedBlockStorage : uses
RepositoryImpl --> AccountsRepository : uses
RepositoryImpl --> BlockStateRepository : uses
RepositoryImpl --> MessageDBWriterService : uses
RepositoryImpl --> SharedServices : uses
RepositoryImpl --> OptimisticStateImpl : manages
RepositoryImpl --> BkSetUpdate : sends updates via channel

Key Functions and Methods

load_from_file<T>(file_path: &PathBuf) -> anyhow::Result<Option<T>>

Loads and deserializes data of type T from a given file path.

let metadata: Option<HashMap<ThreadIdentifier, Metadata<BlockIdentifier, BlockSeqNo>>> = load_from_file(&path)?;

save_to_file<T>(file_path: &PathBuf, data: &T, force_sync: bool) -> anyhow::Result<()>

Serializes and saves data atomically to a file.

save_to_file(&path, &metadata, true)?;

RepositoryImpl::new(...) -> Self

Constructs a new repository instance, initializes data directory, loads metadata and zerostate, sets up caches and services.