executor.rs
Overview
This file contains the core execution logic responsible for initializing and running the block management system's main components. It orchestrates the setup of metrics, message routing, REST API server, block subscription, and event handling. The file provides the AppState struct to maintain shared application state and the asynchronous execute function which acts as the entry point for running the system with the provided configuration and communication channels. It also includes a utility function for resolving socket addresses.
Entities and Components
AppState Struct
Purpose: Holds shared application state and resources to be accessed concurrently by various components of the system.
Fields:
default_bp: SocketAddr
The default block producer's network address.message_router: Arc<MessageRouter>
A thread-safe reference to theMessageRouterinstance responsible for routing messages within the system.last_block_gen_utime: AtomicU64
An atomic timestamp of the last generated block in microseconds, enabling concurrent safe updates.
execute Function
pub async fn execute(
args: Args,
cmd_tx: mpsc::Sender<WorkerCommand>,
cmd_rx: mpsc::Receiver<WorkerCommand>,
) -> anyhow::Result<()>
Purpose:
Main asynchronous function to initialize system components, start services, and manage their lifecycles.Parameters:
args: Args
Command-line or configuration arguments containing essential runtime parameters such as REST API address and data source URL.cmd_tx: mpsc::Sender<WorkerCommand>
Sender channel for issuing commands to the block subscriber worker.cmd_rx: mpsc::Receiver<WorkerCommand>
Receiver channel for receiving commands from the block subscriber worker.
Returns:
AResultindicating success or failure (anyhow::Result<()>).Functionality:
Metrics Initialization:
Checks for a telemetry OTLP endpoint. If provided, sets up an OpenTelemetry meter provider and initializes metrics collection.Event Bus Setup:
Creates an MPSC channel for internal event communication (events::Event).Block Producer Data Channel:
Sets up a channel to transmit block producer information between threads.Message Router Setup:
Reads
BLOCK_MANAGER_APIenvironment variable for API binding address.Resolves socket addresses for both the stream source URL and default block producer (
DEFAULT_BP).Creates a
BPResolverImplinstance wrapped in a thread-safeMutexandArc.Starts the block producer listener on the resolver to handle updates asynchronously.
Loads optional owner wallet public key and signing keys for message routing security.
Instantiates the
MessageRouterwith the configured parameters.
REST API Server Launch:
Sets up a TCP listener on the provided REST API address (
args.rest_api).Creates a
salvo::Serverinstance that serves API routes with sharedAppState.
Block Subscriber Initialization:
Creates aBlockSubscriberinstance responsible for subscribing to block events from the blockchain or data stream source, passing event and block producer channels.Concurrent Execution and Lifecycle Management:
Usestokio::select!to concurrently run the REST API server and block subscriber tasks, monitoring for premature exits and returning errors if either terminates unexpectedly.
Usage Example:
// Assuming args and channels are prepared:
let result = executor::execute(args, cmd_tx.clone(), cmd_rx).await;
if let Err(e) = result {
eprintln!("Execution failed: {:?}", e);
}
parse_socket_address Function
fn parse_socket_address(hostname: &str, port: u16) -> std::io::Result<SocketAddr>
Purpose:
Utility function to resolve a hostname and port into a usableSocketAddr.Parameters:
hostname: &str— Hostname or IP address string to resolve.port: u16— Port number to combine with the hostname.
Returns:
Result<SocketAddr, std::io::Error>representing either a resolved socket address or an error if resolution fails.Details:
Uses Rust's standard library'sToSocketAddrstrait to perform DNS resolution and returns the first resolved address or an error if none are found.
Implementation Details and Algorithms
Concurrent State Management:
The use ofArcandMutexensures thread-safe sharing of mutable state (e.g.,BPResolverImpl) across asynchronous tasks.Event and Command Channels:
Multiple MPSC channels are used for decoupling and asynchronous communication:events::Eventchannel for internal event propagation.Block producer data channel to update block producer resolver with fresh IP addresses.
Command channels (
cmd_tx,cmd_rx) for controlling the block subscriber worker.
Telemetry Integration:
Conditional initialization of OpenTelemetry metrics based on environment variable presence allows flexible deployment without mandatory telemetry.Graceful Task Supervision:
tokio::select!allows running multiple asynchronous tasks concurrently while detecting if either task exits unexpectedly, enabling controlled shutdown or error propagation.
Interaction with Other Modules
block_subscriberModule:
Interacts via channels and command messages to subscribe and process blockchain events.bp_resolverModule:
Creates and managesBPResolverImplinstances to resolve block producer addresses dynamically.message_routerModule:
Sets up and configures message routing based on resolved block producer info and signing keys.rest_api_routesModule:
Provides the router for the REST API server, utilizing sharedAppState.metricsandtelemetry_utilsModules:
Used for initializing and managing telemetry and metrics collection.eventsModule:
Defines event types used for intra-system event communication.
Mermaid Diagram: File Structure and Workflow
flowchart TD
Execute["execute()"]
AppStateStruct["AppState Struct"]
ParseAddr["parse_socket_address()"]
Execute -->|Creates| AppStateStruct
Execute -->|Calls| ParseAddr
Execute -->|Initializes| Metrics["Metrics (optional)"]
Execute -->|Spawns| RestAPI["REST API Server"]
Execute -->|Spawns| BlockSub["Block Subscriber"]
Execute -->|Sets up| MsgRouter["MessageRouter"]
Execute -->|Uses| BPResolver["BPResolverImpl"]
Execute -->|Creates| EventBus["Event Bus (MPSC channel)"]
Execute -->|Creates| CmdChannels["Command Channels (MPSC)"]
RestAPI -->|Shares| AppStateStruct
BlockSub -->|Receives commands| CmdChannels
BlockSub -->|Publishes events| EventBus
MsgRouter -->|Uses| BPResolver
This diagram illustrates the primary components and their relationships during execution. The execute function is central, initializing shared state (AppState), setting up communication channels, and spawning the REST API server and block subscriber worker. The message router and block producer resolver play key roles in managing network addresses and message flow.