sqlite_helper.rs
Overview
This file provides functionality for managing SQLite database operations related to archival storage of blockchain data structures such as blocks, accounts, messages, and transactions. It acts as a helper layer to facilitate concurrent insertion and management of these records into an SQLite database, using a background worker thread for asynchronous writes. The implementation includes database connection setup with optimized SQLite pragmas, database file rotation for archival, and transactional batch inserts for performance.
The file interacts closely with the domain-specific data structures: ArchBlock, ArchAccount, ArchMessage, ArchTransaction, and the DocumentsDb trait for storing various records in the database.
Key Structs and Types
SqliteHelperConfig
Configuration struct holding paths for data directory and database file.
Fields:
data_dir: PathBuf - Directory where database files are located.
db_file: PathBuf - SQLite database file name.
Methods:
new(data_dir: PathBuf, db_file: Option<PathBuf>) -> Self
Creates a new config from the provided data directory and an optional database file name. Defaults to "bm-archive.db" if no file name is provided.
Usage Example:
let config = SqliteHelperConfig::new(PathBuf::from("./data"), None);
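The defaulting behavior of the constructor can be sketched as follows. This is a minimal stand-in, not the file's actual code: the struct mirrors the two documented fields, and the db_path helper is hypothetical, added only to show one plausible way the two paths could be combined.

```rust
use std::path::PathBuf;

/// Stand-in for the configuration struct described above.
#[derive(Debug, Clone, PartialEq)]
pub struct SqliteHelperConfig {
    pub data_dir: PathBuf,
    pub db_file: PathBuf,
}

impl SqliteHelperConfig {
    /// Falls back to "bm-archive.db" when no file name is given.
    pub fn new(data_dir: PathBuf, db_file: Option<PathBuf>) -> Self {
        let db_file = db_file.unwrap_or_else(|| PathBuf::from("bm-archive.db"));
        Self { data_dir, db_file }
    }

    /// Hypothetical helper (an assumption, not documented in the source):
    /// joins the data directory and the file name into one path.
    pub fn db_path(&self) -> PathBuf {
        self.data_dir.join(&self.db_file)
    }
}
```

Passing Some(PathBuf::from("custom.db")) as the second argument would override the default file name.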
SqliteHelperContext
Holds the configuration and a thread-safe SQLite connection wrapped in a mutex for synchronized access.
Fields:
config: SqliteHelperConfig
conn: Arc<Mutex<rusqlite::Connection>> - Shared connection, reference-counted by Arc and guarded by Mutex.
DBFiles
Internal struct managing paths to key database files for rotation and archival.
Fields:
empty: PathBuf - Path to an empty schema database file used as a template.
next: PathBuf - Path to the next working database file during rotation.
work: PathBuf - Current working database file.
SqliteHelper
Primary struct managing the SQLite helper functionality.
Fields:
record_sender: Sender<DBStoredRecord> - Channel sender for asynchronously sending records to the worker thread.
config: SqliteHelperConfig
conn: Arc<Mutex<rusqlite::Connection>>
db_files: DBFiles
Key Methods:
from_config(config: SqliteHelperConfig) -> anyhow::Result<(Self, thread::JoinHandle<()>)>
Initializes a new SqliteHelper instance:
Creates an SQLite connection to the configured database file.
Spawns a background thread running put_records_worker to handle record insertion asynchronously.
Prepares database file paths for rotation.
create_connection(db_path: PathBuf) -> anyhow::Result<rusqlite::Connection>
Opens an SQLite connection with read-write flags and applies performance-related pragmas such as journal mode, synchronous level, and mmap size.
create_connection_ro(db_path: PathBuf) -> anyhow::Result<rusqlite::Connection>
Opens a read-only SQLite connection with the URI and no-mutex flags.
shutdown(&mut self) -> anyhow::Result<()>
Gracefully shuts down the helper by replacing the sender channel to stop the worker thread, then triggers a WAL checkpoint.
rotate_db_file(&mut self) -> anyhow::Result<thread::JoinHandle<()>>
Rotates the current database file:
Copies an empty database schema file to a "next" file.
Stops the current writer worker by replacing the sender.
Renames and archives the current DB file with timestamp suffix.
Renames the "next" file to the working DB file.
Creates a new connection and spawns a new writer worker thread.
Performs WAL checkpoint on the archived file for consistency.
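The file-level part of the rotation steps above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the file's implementation: DbFiles is a stand-in for the documented DBFiles struct, the archived path is passed in by the caller instead of being derived from a chrono timestamp, and the steps that stop the worker, reopen the connection, and checkpoint the WAL are left as comments.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Stand-in for the DBFiles struct described above.
pub struct DbFiles {
    pub empty: PathBuf,
    pub next: PathBuf,
    pub work: PathBuf,
}

/// Performs only the file moves of a rotation.
/// `archived` is the destination for the current working file; in the
/// documented code it carries a timestamp suffix (assumed to be computed
/// by the caller here).
pub fn rotate_files(files: &DbFiles, archived: &Path) -> io::Result<()> {
    // 1. Prepare the replacement from the empty-schema template.
    fs::copy(&files.empty, &files.next)?;

    // 2. (Not shown) stop the writer worker so nothing touches `work`.

    // 3. Archive the current working file.
    fs::rename(&files.work, archived)?;

    // 4. Promote the prepared file to be the new working file.
    fs::rename(&files.next, &files.work)?;

    // 5. (Not shown) open a new connection, spawn a new worker,
    //    and checkpoint the archived file.
    Ok(())
}
```

Copying the template before touching the working file keeps the window in which no working database exists as short as the two renames.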
Record Insertion Methods:
Each record type has a dedicated internal method that performs batched insertion in a SQLite transaction using prepared statements. These are called from the background worker thread.
store_accounts(context: &mut SqliteHelperContext, accounts: Vec<ArchAccount>) -> anyhow::Result<()>
Inserts or updates accounts in the accounts table.
store_block(context: &mut SqliteHelperContext, block: Box<ArchBlock>) -> anyhow::Result<()>
Inserts or updates a block in the blocks table. Supports conditional compilation for storing only event-related fields.
store_messages(context: &mut SqliteHelperContext, messages: Vec<ArchMessage>) -> anyhow::Result<()>
Inserts or updates messages in the messages table. Supports conditional compilation for simplified storage.
store_transactions(context: &mut SqliteHelperContext, transactions: Vec<ArchTransaction>) -> anyhow::Result<()>
Inserts transactions into the transactions table after converting them to a flattened representation.
Background Worker:
put_records_worker(receiver: Receiver<DBStoredRecord>, context: &mut SqliteHelperContext)
Runs in a dedicated thread, consumes records sent over the channel, and dispatches them to the appropriate store methods. Errors are logged; a block insertion failure causes a panic.
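The channel-and-worker pattern described above can be sketched with std::sync::mpsc. The names below are illustrative assumptions: Record stands in for DBStoredRecord with string payloads, and Stored merely counts items where the real worker would call the store methods.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Simplified stand-in for DBStoredRecord; payloads are plain strings.
pub enum Record {
    Block(String),
    Accounts(Vec<String>),
    Messages(Vec<String>),
    Transactions(Vec<String>),
}

/// Counts handled items; a placeholder for real database writes.
#[derive(Debug, Default, PartialEq)]
pub struct Stored {
    pub blocks: usize,
    pub accounts: usize,
    pub messages: usize,
    pub transactions: usize,
}

/// Consumes records until every sender has been dropped, then returns.
fn worker(receiver: Receiver<Record>) -> Stored {
    let mut stored = Stored::default();
    for record in receiver {
        // Dispatch on the record type, one arm per store method.
        match record {
            Record::Block(_) => stored.blocks += 1,
            Record::Accounts(items) => stored.accounts += items.len(),
            Record::Messages(items) => stored.messages += items.len(),
            Record::Transactions(items) => stored.transactions += items.len(),
        }
    }
    stored
}

/// Spawns the worker and returns the sending half plus its join handle.
pub fn spawn_worker() -> (Sender<Record>, thread::JoinHandle<Stored>) {
    let (tx, rx) = channel();
    let handle = thread::spawn(move || worker(rx));
    (tx, handle)
}
```

The loop ends once the last sender is dropped, which fits the documented shutdown approach of replacing the stored sender: the old sender goes out of scope and the worker drains the queue and exits.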
Trait Implementation: DocumentsDb for SqliteHelper
SqliteHelper implements the DocumentsDb trait for asynchronous insertion of blockchain data.
put_block(item: ArchBlock) -> anyhow::Result<()>
Sends a block wrapped in DBStoredRecord::Block to the worker thread.
put_accounts(items: Vec<ArchAccount>) -> anyhow::Result<()>
Sends accounts unless the store_events_only feature is enabled.
put_messages(items: Vec<ArchMessage>) -> anyhow::Result<()>
Sends messages for insertion.
put_transactions(items: Vec<ArchTransaction>) -> anyhow::Result<()>
Sends transactions unless the store_events_only feature is enabled.
has_delivery_problems() -> bool
Always returns false; no delivery problems are tracked here.
Utility Functions
unprefix_opt_u64str(value: Option<String>) -> Option<String>
Removes the first character from a string wrapped in Option.
unprefix_opt_u128str(value: Option<String>) -> Option<String>
Removes the first two characters from a string wrapped in Option.
append_suffix(path: &Path, suffix: &str) -> PathBuf
Appends a suffix to the filename component of a Path.
rename_with_suffixes(work: &Path, archived: &Path) -> std::io::Result<()>
Renames the main database file and the associated SQLite write-ahead log (WAL) and shared memory (SHM) files by appending suffixes for archival purposes.
print_sqlite_info(conn: &rusqlite::Connection) -> anyhow::Result<()>
Queries and prints SQLite database pragmas such as journal mode, synchronous level, WAL checkpoint status, page size, cache size, and schema version.
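The string and path helpers are small enough to sketch from their descriptions. The bodies below are an assumption about how such helpers might be written, based only on the documented signatures and behavior; drop_prefix is a hypothetical shared helper that does not appear in the source.

```rust
use std::path::{Path, PathBuf};

/// Hypothetical shared helper: drops the first `n` characters, if present.
fn drop_prefix(value: Option<String>, n: usize) -> Option<String> {
    value.map(|s| s.chars().skip(n).collect())
}

/// Removes the first character of the wrapped string.
pub fn unprefix_opt_u64str(value: Option<String>) -> Option<String> {
    drop_prefix(value, 1)
}

/// Removes the first two characters of the wrapped string.
pub fn unprefix_opt_u128str(value: Option<String>) -> Option<String> {
    drop_prefix(value, 2)
}

/// Appends `suffix` to the final component of `path`,
/// leaving the parent directory unchanged.
pub fn append_suffix(path: &Path, suffix: &str) -> PathBuf {
    let mut name = path
        .file_name()
        .map(|n| n.to_os_string())
        .unwrap_or_default();
    name.push(suffix);
    path.with_file_name(name)
}
```

Working on the file name as an OsString instead of calling set_extension keeps an existing extension such as ".db" intact when the suffix is appended.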
Implementation Details and Algorithms
Concurrency Model:
The file uses Rust's std::sync::mpsc channel for asynchronous communication between the producer (main thread) and the consumer (worker thread). Data records are sent to the worker thread, which batches inserts in transactions to improve SQLite performance.
Database Connection:
SQLite connections are wrapped in Arc<Mutex<>> to enable shared ownership and safe concurrent access control. The create_connection method applies pragmas to optimize for WAL mode (wal2), checkpointing, memory usage, and mmap to improve IO performance.
Database Rotation:
When rotating the database file, the current DB is renamed with a timestamp suffix for archival, and a fresh empty DB schema copy replaces the working DB. WAL and SHM files are also renamed accordingly to preserve consistency.
Batch Insertions:
Each record type uses prepared statements and transactions to batch insert multiple records efficiently. The code handles upserts where applicable (e.g., accounts and messages using ON CONFLICT DO UPDATE).
Error Handling:
Errors during record storage are logged with tracing::error. In the case of block storage failure, the thread panics as it is considered fatal.
Feature Flags:
The behavior for storing full records versus event-only data is controlled via the store_events_only feature flag, allowing reduced storage footprint when enabled.
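The feature-flag gating can be illustrated with the cfg! macro. This is a sketch under assumptions: only the feature name store_events_only comes from the source, while the function shape and the Vec used as a stand-in queue are hypothetical. The real code may use #[cfg(...)] attributes instead.

```rust
/// Forwards account items to a stand-in queue unless the crate is built
/// with the `store_events_only` feature. Returns whether items were queued.
pub fn put_accounts(queue: &mut Vec<String>, items: Vec<String>) -> bool {
    // cfg! evaluates to a compile-time boolean constant, so the
    // skipped branch is removed by the optimizer.
    if cfg!(feature = "store_events_only") {
        return false;
    }
    queue.extend(items);
    true
}
```

In a default build (feature disabled) the items are queued; with the feature enabled the call becomes a no-op, which is how the reduced storage footprint would be achieved on the sending side.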
Interaction with Other Parts of the System
Uses domain-specific structs:
ArchAccount
ArchBlock
ArchMessage
ArchTransaction
FlatTransaction (for flattened transaction representation on insert)
Implements the DocumentsDb trait, providing database persistence for archival data within the larger system.
Relies on the DBStoredRecord enum to differentiate record types sent via channels.
Uses external crates:
rusqlite for SQLite DB handling
parking_lot::Mutex for efficient locking
chrono for timestamping during DB rotation
Visual Diagram
flowchart TD
A[SqliteHelper] -->|uses| B[SqliteHelperConfig]
A -->|manages| C[DBFiles]
A -->|holds| D["record_sender(Channel Sender)"]
A -->|holds| E["conn(Arc<Mutex<Connection>>)"]
D -->|sends DBStoredRecord| F[put_records_worker Thread]
F -->|calls| G["store_block()"]
F -->|calls| H["store_accounts()"]
F -->|calls| I["store_messages()"]
F -->|calls| J["store_transactions()"]
A -->|implements| K[DocumentsDb Trait]
K -->|calls| D
subgraph DB Rotation
A --> L["rotate_db_file()"]
L --> M["rename_with_suffixes()"]
L -->|creates| E
L -->|spawns| F
end
Tests
Contains a unit test test_sqlite_features that:
Creates a temporary SQLite database with appropriate flags.
Sets journal_mode to wal2.
Creates a temporary table and inserts a record.
Queries and asserts the inserted data.
Prints SQLite info using print_sqlite_info.
In summary, this file provides a concurrent SQLite storage helper for blockchain archival data: records are queued to a single background worker, inserts are batched in transactions, and database files are rotated and archived with timestamp suffixes. It serves as the persistence layer for archival data in the system's storage architecture.