mod.rs
Overview
This file implements a block subscriber component responsible for receiving, processing, and storing blockchain block data. It listens for incoming messages over a network socket, deserializes the block data, persists it into a SQLite database, and notifies other system components about new blocks through event propagation.
The core functionality combines asynchronous networking with a dedicated worker thread that handles database operations and inter-thread communication. It integrates closely with the database layer for persisting blocks, the event system for broadcasting block arrivals, and the network transport layer for receiving data.
Key Components
Enum: WorkerCommand
Defines commands sent to the worker thread managing block data processing:
Data(Vec<u8>): Contains raw bytes of incoming block data to be processed.
RotateDb: Command to rotate the SQLite database file.
Shutdown: Command to gracefully shut down the worker, including closing DB resources.
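As an illustration of the three commands, the enum could look roughly like the sketch below. The variant names come from the list above; the derives and the `describe` helper are assumptions added for the example and are not taken from the module.

```rust
/// Commands accepted by the worker thread (variant names as listed above).
/// The derives are an assumption made for this sketch.
#[derive(Debug, Clone, PartialEq)]
pub enum WorkerCommand {
    /// Raw bytes of one incoming block message.
    Data(Vec<u8>),
    /// Ask the worker to rotate the SQLite database file.
    RotateDb,
    /// Ask the worker to close database resources and exit.
    Shutdown,
}

/// Hypothetical helper: a short label for a command, e.g. for log lines.
pub fn describe(cmd: &WorkerCommand) -> String {
    match cmd {
        WorkerCommand::Data(bytes) => format!("data ({} bytes)", bytes.len()),
        WorkerCommand::RotateDb => "rotate database".to_string(),
        WorkerCommand::Shutdown => "shutdown".to_string(),
    }
}
```

Because the `match` is exhaustive, adding a fourth variant later would produce a compile error wherever commands are handled, which is one reason an enum is a natural fit for a command channel.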
Struct: BlockSubscriber
The main struct encapsulating the block subscription functionality.
Fields
db_file: PathBuf
Path to the SQLite database file where blocks are stored.
socket_addr: SocketAddr
Network socket address to listen for incoming block data.
event_pub: Sender<Event>
Channel sender for publishing events (e.g., new blocks).
bp_data_tx: Sender<(String, Vec<String>)>
Channel sender for sending block producer (BP) related data.
Methods
new
pub fn new(
    db_file: PathBuf,
    socket_addr: SocketAddr,
    event_pub: Sender<Event>,
    bp_data_tx: Sender<(String, Vec<String>)>,
) -> Self
Constructs a new BlockSubscriber.
Parameters:
db_file: Path to the SQLite DB file.
socket_addr: Socket address to listen on.
event_pub: Event channel sender.
bp_data_tx: Channel sender for BP data.
Panics:
If the database file cannot be opened or created.
Usage Example:
let subscriber = BlockSubscriber::new(
    PathBuf::from("blockchain.db"),
    "127.0.0.1:12345".parse().unwrap(),
    event_sender,
    bp_sender,
);
run
pub async fn run(
    &self,
    app_state: Arc<AppState>,
    metrics: Option<Metrics>,
    cmd_tx: mpsc::Sender<WorkerCommand>,
    cmd_rx: mpsc::Receiver<WorkerCommand>,
) -> anyhow::Result<()>
Starts the block subscriber operation:
Spawns a listener to receive incoming network connections.
Spawns a dedicated blocking worker thread to process the data received.
Manages lifecycle and error handling of these concurrent tasks.
Parameters:
app_state: Shared application state.
metrics: Optional metrics collector.
cmd_tx: Command sender to the worker.
cmd_rx: Command receiver for the worker.
Returns:
Result indicating success or failure of the subscriber runtime.
Usage Example:
subscriber.run(app_state, Some(metrics), cmd_tx, cmd_rx).await?;
Function: worker
fn worker(
    _db_file: impl AsRef<Path>,
    rx: mpsc::Receiver<WorkerCommand>,
    event_pub: mpsc::Sender<Event>,
    bp_data_tx: mpsc::Sender<(String, Vec<String>)>,
    app_state: Arc<AppState>,
    metrics: Option<Metrics>,
) -> anyhow::Result<()>
The worker function runs in a dedicated thread:
Listens for WorkerCommands sent over rx.
On receiving Data, deserializes block envelopes, extracts metadata, and persists block data into SQLite via a helper.
Sends block producer info to bp_data_tx.
Updates application state timestamps and metrics.
Handles database rotation and shutdown commands.
Reports processing status via logs and event channel.
Parameters:
_db_file: Path to the database file (not used directly here).
rx: Receiver for commands.
event_pub: Event sender.
bp_data_tx: Sender for block producer data.
app_state: Shared application state.
metrics: Optional metrics collector.
Returns:
Result indicating worker success or failure.
Implementation Details:
Uses SqliteHelper from database::sqlite::sqlite_helper for SQLite management.
Maintains a transaction trace map and shard state cache.
Uses bincode for deserialization.
Updates atomic timestamp last_block_gen_utime in app_state.
Reports metrics using the bm metrics submodule.
On RotateDb command, invokes rotate_db_file() on the SQLite helper.
On Shutdown command, creates a checkpoint and prepares DB for shutdown.
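The command handling described above can be sketched with the standard library alone. This is a simplified model, not the module's code: `StubStore` is a hypothetical stand-in for the SQLite helper that only counts calls, and deserialization, events, and metrics are left out.

```rust
use std::sync::mpsc::Receiver;

/// Same three commands as in the module; redefined so the sketch stands alone.
pub enum WorkerCommand {
    Data(Vec<u8>),
    RotateDb,
    Shutdown,
}

/// Hypothetical stand-in for the SQLite helper: it records what was asked of it.
#[derive(Debug, Default, PartialEq)]
pub struct StubStore {
    pub stored: usize,
    pub rotations: usize,
    pub checkpointed: bool,
}

/// Drains commands until `Shutdown` arrives or every sender has been dropped.
pub fn worker_loop(rx: Receiver<WorkerCommand>) -> StubStore {
    let mut store = StubStore::default();
    for cmd in rx {
        match cmd {
            WorkerCommand::Data(bytes) => {
                // The real worker deserializes and persists the block;
                // this sketch only counts non-empty payloads.
                if !bytes.is_empty() {
                    store.stored += 1;
                }
            }
            WorkerCommand::RotateDb => store.rotations += 1,
            WorkerCommand::Shutdown => {
                // Models "create a checkpoint, then stop".
                store.checkpointed = true;
                break;
            }
        }
    }
    store
}
```

Anything still queued behind `Shutdown` is never processed in this sketch; whether the real worker drains the queue first is not stated in the text.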
Async Function: listener
async fn listener(socket_addr: SocketAddr, tx: mpsc::Sender<WorkerCommand>) -> anyhow::Result<()>
Connects to the remote block source via a MsQuicTransport:
Attempts to connect repeatedly until a connection is established, pausing between attempts.
Upon connection, receives messages asynchronously.
Each received message is sent to the worker thread as WorkerCommand::Data.
Logs connection status and errors.
Parameters:
socket_addr: Socket address to connect to.
tx: Command sender to the worker.
Returns:
Result indicating listener success or failure.
Implementation Details:
Uses transport_layer::msquic::MsQuicTransport for QUIC connection.
Generates self-signed network credentials for connection.
On message receipt, logs message size and latency before sending data.
On errors or disconnects, retries connection with a 1-second delay.
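The reconnect behaviour can be illustrated with a small generic helper. This is a sketch under assumptions: `connect_with_retry` is a hypothetical name, the connection attempt is passed in as a closure rather than going through MsQuicTransport, and the delay is a parameter (the text above mentions a 1-second delay).

```rust
use std::thread::sleep;
use std::time::Duration;

/// Calls `connect` until it succeeds, sleeping for `delay` after each failure.
/// Returns the connection together with the number of attempts that were made.
pub fn connect_with_retry<C, E, F>(mut connect: F, delay: Duration) -> (C, u32)
where
    F: FnMut() -> Result<C, E>,
    E: std::fmt::Display,
{
    let mut attempts = 0;
    loop {
        attempts += 1;
        match connect() {
            Ok(conn) => return (conn, attempts),
            Err(err) => {
                // Stand-in for the logging the listener performs on failure.
                eprintln!("connect attempt {} failed: {}; retrying", attempts, err);
                sleep(delay);
            }
        }
    }
}
```

A fixed delay keeps the sketch short. Switching to a growing delay would only require doubling `delay` (up to some cap) inside the `Err` branch.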
Function: connect
pub fn connect(db_file: impl AsRef<Path>) -> anyhow::Result<Connection>
Initializes the SQLite database connection:
Opens or creates the SQLite database file.
Configures SQLite pragmas for performance tuning:
Disables journal mode.
Sets synchronous to 0 (off).
Sets cache size.
Uses exclusive locking mode.
Stores temporary data in memory.
Creates the raw_blocks table if it does not exist.
Parameters:
db_file: Path to the SQLite database file.
Returns:
SQLite Connection wrapped in anyhow::Result.
Panics:
If the database cannot be opened.
Usage Example:
let conn = connect("blockchain.db")?;
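To make the pragma list concrete, the setup statements might look like the batch built below. This is a guess at the SQL, not a copy of it: `setup_sql` is a hypothetical helper, `journal_mode = OFF` is one reading of "disables journal mode", the cache size is a parameter because the text does not give the value, and the `raw_blocks` columns are invented for illustration.

```rust
/// Builds the setup SQL that would be run on a freshly opened connection.
/// `cache_size` is a parameter because the value used is not documented;
/// the `raw_blocks` column layout is hypothetical.
pub fn setup_sql(cache_size: i64) -> String {
    format!(
        "PRAGMA journal_mode = OFF;\n\
         PRAGMA synchronous = 0;\n\
         PRAGMA cache_size = {};\n\
         PRAGMA locking_mode = EXCLUSIVE;\n\
         PRAGMA temp_store = MEMORY;\n\
         CREATE TABLE IF NOT EXISTS raw_blocks (\n\
         id INTEGER PRIMARY KEY,\n\
         data BLOB NOT NULL\n\
         );",
        cache_size
    )
}
```

These settings trade durability for write throughput: with the journal and synchronous writes turned off, a crash or power loss in the middle of a write can leave the database file corrupted.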
Implementation Details and Algorithms
Data Flow:
The listener asynchronously receives raw block data over a QUIC connection, wraps it into WorkerCommand::Data, and sends it to the worker thread. The worker deserializes the block envelope, extracts relevant metadata (such as thread ID and block time), persists the block using the SqliteHelper, updates application state timestamps, and reports metrics.
Concurrency Model:
The listener runs asynchronously using tokio.
The worker runs in a dedicated blocking thread to prevent blocking the async runtime.
Communication between components is performed via mpsc channels, allowing safe cross-thread message passing.
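The channel layout in this concurrency model can be approximated with two plain threads. This is a sketch only: a `std::thread` stands in for the tokio listener task, and `run_pipeline` is a hypothetical name. It also suggests why both ends of the command channel might be created outside the subscriber: the sender can be cloned, one copy for the listener and one for whoever issues control commands.

```rust
use std::sync::mpsc;
use std::thread;

pub enum WorkerCommand {
    Data(Vec<u8>),
    RotateDb,
    Shutdown,
}

/// Runs a stand-in listener and a worker on separate threads and returns the
/// total number of payload bytes the worker saw before shutting down.
pub fn run_pipeline(messages: Vec<Vec<u8>>) -> usize {
    let (cmd_tx, cmd_rx) = mpsc::channel::<WorkerCommand>();

    // Worker thread: owns the receiving end of the command channel.
    let worker = thread::spawn(move || {
        let mut total = 0;
        for cmd in cmd_rx {
            match cmd {
                WorkerCommand::Data(bytes) => total += bytes.len(),
                WorkerCommand::RotateDb => {}
                WorkerCommand::Shutdown => break,
            }
        }
        total
    });

    // Listener stand-in: gets its own clone of the sender.
    let listener_tx = cmd_tx.clone();
    let listener = thread::spawn(move || {
        for msg in messages {
            if listener_tx.send(WorkerCommand::Data(msg)).is_err() {
                break; // the worker has gone away
            }
        }
    });

    listener.join().expect("listener thread panicked");
    // The original sender remains available for control commands.
    cmd_tx.send(WorkerCommand::Shutdown).expect("worker exited early");
    worker.join().expect("worker thread panicked")
}
```

Calling `run_pipeline(vec![vec![1, 2, 3], vec![4, 5]])` sends two data messages followed by a shutdown, and the worker reports five bytes in total.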
Database Management:
The SqliteHelper wrapper manages the SQLite connection and operations. The worker uses a mutex-guarded Arc reference to the helper for thread-safe access. Database rotation and shutdown commands allow managing the database lifecycle without disrupting data ingestion.
Error Handling:
Errors during deserialization, database operations, or channel communication are logged with tracing. The worker continues processing unless a critical unrecoverable error occurs.
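The policy of logging recoverable errors and stopping only on critical ones, together with the mutex-guarded `Arc`, can be sketched as follows. All names here (`ProcessError`, `Store`, `persist`, `process_all`) are hypothetical, and treating a poisoned mutex as the "critical" case is an assumption made for the example.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical error type separating recoverable from fatal failures.
#[derive(Debug, PartialEq)]
pub enum ProcessError {
    /// The payload could not be interpreted; safe to skip.
    Malformed(String),
    /// The store can no longer be used; processing must stop.
    StorageUnavailable,
}

/// Hypothetical stand-in for the database helper.
#[derive(Default)]
pub struct Store {
    pub rows: Vec<Vec<u8>>,
}

fn persist(store: &Arc<Mutex<Store>>, payload: &[u8]) -> Result<(), ProcessError> {
    if payload.is_empty() {
        return Err(ProcessError::Malformed("empty payload".into()));
    }
    // A poisoned mutex is treated as fatal in this sketch.
    let mut guard = store.lock().map_err(|_| ProcessError::StorageUnavailable)?;
    guard.rows.push(payload.to_vec());
    Ok(())
}

/// Processes payloads in order, skipping malformed ones and stopping on a
/// fatal error. Returns how many payloads were skipped.
pub fn process_all(
    store: &Arc<Mutex<Store>>,
    payloads: &[Vec<u8>],
) -> Result<usize, ProcessError> {
    let mut skipped = 0;
    for payload in payloads {
        match persist(store, payload) {
            Ok(()) => {}
            Err(ProcessError::Malformed(reason)) => {
                // Stand-in for a log line emitted via tracing.
                eprintln!("skipping payload: {}", reason);
                skipped += 1;
            }
            Err(fatal) => return Err(fatal),
        }
    }
    Ok(skipped)
}
```

Separating the two error classes in the type makes the "continue or stop" decision explicit at the single place where the loop matches on the result.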
Interaction with Other System Components
AppState (AppState):
Holds shared state such as the timestamp of the last block generation, updated atomically by the worker.
Metrics (Metrics):
Optionally collects and reports blockchain metrics, such as the last finalized sequence number per thread.
Event System (Event):
The worker publishes Event::NewBlock notifications to inform other subsystems of new block arrivals.
Database Layer (database::sqlite::sqlite_helper):
Provides SQLite management facilities, including connection configuration and block serialization integration.
Network Transport (transport_layer::msquic):
Provides the QUIC transport protocol implementation used for receiving block data.
Block and BLS Types (node::bls, node::types):
Represent cryptographic envelopes and block data structures deserialized and processed in the worker.
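For the AppState entry above, an atomically updated timestamp could be modelled as shown below. This is a reduction of the shared state to a single field under assumptions: the `AtomicU64` type, the method names, and the choice to keep the largest value seen (rather than simply overwriting) are all illustrative and not confirmed by the text.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical reduction of the shared application state to one field.
#[derive(Default)]
pub struct AppState {
    /// Generation time of the most recent block (integer type assumed).
    pub last_block_gen_utime: AtomicU64,
}

impl AppState {
    /// Records a block time, keeping the largest value seen so far so that
    /// an out-of-order block cannot move the timestamp backwards.
    pub fn record_block_time(&self, gen_utime: u64) {
        self.last_block_gen_utime.fetch_max(gen_utime, Ordering::Relaxed);
    }

    /// Reads the current value without taking a lock.
    pub fn last_block_time(&self) -> u64 {
        self.last_block_gen_utime.load(Ordering::Relaxed)
    }
}
```

Wrapped in an `Arc`, a value like this can be written by the worker thread and read by other components without a mutex, since both methods take `&self`.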
Structure Diagram
flowchart TD
BlockSubscriber["BlockSubscriber"]
Listener["listener()"]
Worker["worker()"]
EventPub["event_pub: Sender<Event>"]
BPDataTx["bp_data_tx: Sender<(String, Vec<String>)>"]
CmdTx["cmd_tx: Sender<WorkerCommand>"]
CmdRx["cmd_rx: Receiver<WorkerCommand>"]
SqliteHelper["SqliteHelper (SQLite DB)"]
AppState["AppState (Shared State)"]
Metrics["Metrics (Optional)"]
BlockSubscriber --> Listener
BlockSubscriber --> Worker
Listener -->|Sends WorkerCommand::Data| CmdTx
CmdRx --> Worker
Worker --> SqliteHelper
Worker --> EventPub
Worker --> BPDataTx
Worker --> AppState
Worker --> Metrics
This flowchart shows how the BlockSubscriber orchestrates the listener and worker components, with communication channels and dependencies on shared state, events, metrics, and database helpers.