mod.rs

Overview

This file implements a block subscriber component responsible for receiving, processing, and storing blockchain block data. It listens for incoming messages over a network socket, deserializes the block data, persists it into a SQLite database, and notifies other system components about new blocks through event propagation.

The core functionality combines asynchronous networking with a worker that runs on a dedicated thread and handles database operations and inter-thread communication. It integrates with the database layer to persist blocks, the event system to broadcast block arrivals, and the network transport layer to receive data.

Key Components

Enum: WorkerCommand

Defines the commands sent to the worker thread that processes block data. These include WorkerCommand::Data, which the listener sends for data it receives.
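
As a minimal sketch, the command enum might look like the following. Only the Data variant is named in this document (in the structure diagram); its payload type, the Shutdown variant, and the describe helper are assumptions made for illustration, not the actual definitions.

```rust
/// Hypothetical shape of the command enum; variant payloads are assumptions.
#[derive(Debug, PartialEq)]
enum WorkerCommand {
    /// Serialized block bytes received by the listener (payload type assumed).
    Data(Vec<u8>),
    /// Hypothetical variant asking the worker to stop.
    Shutdown,
}

/// Hypothetical helper: returns a short label for a command,
/// the kind of match a worker performs when dispatching.
fn describe(cmd: &WorkerCommand) -> String {
    match cmd {
        WorkerCommand::Data(bytes) => format!("data ({} bytes)", bytes.len()),
        WorkerCommand::Shutdown => "shutdown".to_string(),
    }
}
```

Modeling commands as an enum lets the worker handle every message kind in a single exhaustive match, so adding a variant forces each dispatch site to be updated.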

Struct: BlockSubscriber

The main struct encapsulating the block subscription functionality.

Fields

Methods

new
pub fn new(
    db_file: PathBuf,
    socket_addr: SocketAddr,
    event_pub: Sender<Event>,
    bp_data_tx: Sender<(String, Vec<String>)>,
) -> Self

Constructs a new BlockSubscriber.

let subscriber = BlockSubscriber::new(
    PathBuf::from("blockchain.db"),
    "127.0.0.1:12345".parse().unwrap(),
    event_sender,
    bp_sender,
);

run
pub async fn run(
    &self,
    app_state: Arc<AppState>,
    metrics: Option<Metrics>,
    cmd_tx: mpsc::Sender<WorkerCommand>,
    cmd_rx: mpsc::Receiver<WorkerCommand>,
) -> anyhow::Result<()>

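Starts the block subscriber. Per the structure diagram, it orchestrates listener() and worker(), which communicate through the cmd_tx and cmd_rx command channel: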

subscriber.run(app_state, Some(metrics), cmd_tx, cmd_rx).await?;

Function: worker

fn worker(
    _db_file: impl AsRef<Path>,
    rx: mpsc::Receiver<WorkerCommand>,
    event_pub: mpsc::Sender<Event>,
    bp_data_tx: mpsc::Sender<(String, Vec<String>)>,
    app_state: Arc<AppState>,
    metrics: Option<Metrics>,
) -> anyhow::Result<()>

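The worker function runs in a dedicated thread. It receives WorkerCommand messages from rx, persists block data to the SQLite database, and publishes to event_pub and bp_data_tx, with access to the shared AppState and the optional Metrics.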
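
The receive-and-publish loop of such a worker can be sketched with std::sync::mpsc alone. This is an illustration under stated assumptions, not the actual implementation: the WorkerCommand and Event definitions, the worker_loop name, and the stored counter that stands in for the database write are all hypothetical.

```rust
use std::sync::mpsc;

// Hypothetical stand-ins for the real WorkerCommand and Event types.
enum WorkerCommand {
    Data(Vec<u8>),
    Shutdown,
}

#[derive(Debug, PartialEq)]
enum Event {
    /// Carries the payload size here; the real event contents are not documented.
    NewBlock(usize),
}

/// Processes commands until Shutdown arrives or every sender is dropped.
/// Returns how many blocks were "stored" (a counter stands in for SQLite).
fn worker_loop(rx: mpsc::Receiver<WorkerCommand>, event_pub: mpsc::Sender<Event>) -> usize {
    let mut stored = 0;
    // recv() blocks until a command arrives and returns Err once all senders are gone.
    while let Ok(cmd) = rx.recv() {
        match cmd {
            WorkerCommand::Data(bytes) => {
                // A real worker would deserialize the block and persist it here.
                stored += 1;
                // A send error only means no one is listening for events; ignore it.
                let _ = event_pub.send(Event::NewBlock(bytes.len()));
            }
            WorkerCommand::Shutdown => break,
        }
    }
    stored
}
```

Running the loop on its own thread (for example via std::thread::spawn) keeps blocking database work off the asynchronous networking path.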


Async Function: listener

async fn listener(socket_addr: SocketAddr, tx: mpsc::Sender<WorkerCommand>) -> anyhow::Result<()>

Connects to the remote block source via an MsQuicTransport and forwards the data it receives to the worker as WorkerCommand::Data over tx.
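
The forwarding half of the listener can be sketched without the network layer. In this simplified, synchronous illustration a plain iterator of byte buffers replaces the MsQuicTransport stream; the forward_messages name, the Data payload type, and the stop-on-closed-channel behavior are assumptions rather than documented details.

```rust
use std::sync::mpsc;

// Hypothetical stand-in for the real command enum.
#[derive(Debug, PartialEq)]
enum WorkerCommand {
    Data(Vec<u8>),
}

/// Forwards each received message to the worker and returns the number forwarded.
/// `incoming` stands in for the messages read from the transport.
fn forward_messages<I>(incoming: I, tx: &mpsc::Sender<WorkerCommand>) -> usize
where
    I: IntoIterator<Item = Vec<u8>>,
{
    let mut forwarded = 0;
    for msg in incoming {
        // send() fails only when the receiver has been dropped,
        // meaning the worker has exited, so stop forwarding.
        if tx.send(WorkerCommand::Data(msg)).is_err() {
            break;
        }
        forwarded += 1;
    }
    forwarded
}
```

Treating a closed channel as the signal to stop gives the listener a simple way to shut down once the worker is gone.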


Function: connect

pub fn connect(db_file: impl AsRef<Path>) -> anyhow::Result<Connection>

Initializes the SQLite database connection:

let conn = connect("blockchain.db")?;

Implementation Details and Algorithms


Interaction with Other System Components


Structure Diagram

flowchart TD
BlockSubscriber["BlockSubscriber"]
Listener["listener()"]
Worker["worker()"]
EventPub["event_pub: Sender<Event>"]
BPDataTx["bp_data_tx: Sender<(String, Vec<String>)>"]
CmdTx["cmd_tx: Sender<WorkerCommand>"]
CmdRx["cmd_rx: Receiver<WorkerCommand>"]
SqliteHelper["SqliteHelper (SQLite DB)"]
AppState["AppState (Shared State)"]
Metrics["Metrics (Optional)"]
BlockSubscriber --> Listener
BlockSubscriber --> Worker
Listener -->|Sends WorkerCommand::Data| CmdTx
CmdRx --> Worker
Worker --> SqliteHelper
Worker --> EventPub
Worker --> BPDataTx
Worker --> AppState
Worker --> Metrics

This flowchart shows how the BlockSubscriber orchestrates the listener and worker components, with communication channels and dependencies on shared state, events, metrics, and database helpers.