executor.rs
Overview
The executor.rs file defines the core logic for running the network layer's message processing and connection management within a distributed system. It primarily handles incoming and outgoing network messages, manages subscriptions, and oversees incoming network connections. This file orchestrates asynchronous tasks that listen for new connections and process subscription requests, ensuring efficient message dispatching and network communication. The main entry point is the asynchronous run function, which sets up and coordinates the pub-sub mechanisms and network transport.
Types and Structures
IncomingSender Enum
IncomingSender abstracts over two types of message senders for incoming network messages:
AsyncUnbounded(mpsc::UnboundedSender): A Tokio asynchronous unbounded channel sender.
SyncUnbounded(InstrumentedSender): A synchronous instrumented sender for telemetry-enabled message dispatch.
Methods
async fn send(&self, message: IncomingMessage) -> anyhow::Result<()>
Sends an IncomingMessage through the wrapped sender.
Parameters:
message: The IncomingMessage to be sent.
Returns:
Ok(()) if the message was successfully sent.
Err(anyhow::Error) if the send operation fails.
Usage Example:
    let incoming_sender = IncomingSender::AsyncUnbounded(some_sender);
    incoming_sender.send(message).await?;
This method matches on the enum variant and attempts to send the message. If the underlying channel is closed or the send fails, it returns an error wrapping a descriptive message.
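The variant dispatch described above can be sketched as follows. This is a minimal illustration, not the file's actual code. To stay free of external crates it makes several assumptions: std::sync::mpsc::Sender stands in for Tokio's mpsc::UnboundedSender, a small counting wrapper stands in for the real InstrumentedSender, send is synchronous, and the error type is a plain String rather than anyhow::Error. The IncomingMessage payload is also hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;

// Hypothetical stand-in for the real IncomingMessage type.
#[derive(Debug, PartialEq)]
pub struct IncomingMessage(pub String);

// Hypothetical stand-in for InstrumentedSender: counts successful sends.
pub struct InstrumentedSender {
    inner: mpsc::Sender<IncomingMessage>,
    sent: AtomicUsize,
}

impl InstrumentedSender {
    pub fn new(inner: mpsc::Sender<IncomingMessage>) -> Self {
        Self { inner, sent: AtomicUsize::new(0) }
    }

    pub fn send(
        &self,
        message: IncomingMessage,
    ) -> Result<(), mpsc::SendError<IncomingMessage>> {
        self.inner.send(message)?;
        // Only count messages that were actually handed to the channel.
        self.sent.fetch_add(1, Ordering::Relaxed);
        Ok(())
    }

    pub fn sent_count(&self) -> usize {
        self.sent.load(Ordering::Relaxed)
    }
}

// Same variant names as the documented enum; the payload types are stand-ins.
pub enum IncomingSender {
    AsyncUnbounded(mpsc::Sender<IncomingMessage>),
    SyncUnbounded(InstrumentedSender),
}

impl IncomingSender {
    // Matches on the variant and maps a closed-channel failure
    // to an error carrying a descriptive message.
    pub fn send(&self, message: IncomingMessage) -> Result<(), String> {
        let outcome = match self {
            IncomingSender::AsyncUnbounded(tx) => tx.send(message),
            IncomingSender::SyncUnbounded(tx) => tx.send(message),
        };
        outcome.map_err(|e| format!("failed to send incoming message: {}", e))
    }
}
```

Callers only see IncomingSender::send, so swapping the plain channel for the instrumented one does not change any call site.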
Functions
async fn run<Transport>(...) -> anyhow::Result<()>
This is the main asynchronous executor function responsible for running the network server tasks.
Generics:
Transport: A type implementing the NetTransport trait, representing the network transport layer.
Parameters:
Name | Type | Description
shutdown_rx | tokio::sync::watch::Receiver<bool> | Receives shutdown signals to gracefully terminate operations.
config_rx | tokio::sync::watch::Receiver<NetworkConfig> | Receives updates to network configuration.
transport | Transport | The network transport layer instance used for sending/receiving data.
is_proxy | bool | Indicates if the server operates as a proxy.
metrics | Option<NetMetrics> | Optional metrics collector for network statistics.
max_connections | usize | Maximum number of concurrent connections allowed.
subscribe_rx | tokio::sync::watch::Receiver<Vec<Vec<SocketAddr>>> | Receives subscription updates, each a list of socket addresses for pub-sub subscriptions.
outgoing_tx | broadcast::Sender<OutgoingMessage> | Broadcast sender to publish outgoing messages to network subscribers.
incoming_tx | IncomingSender | Sender that receives network messages to be processed by the pub-sub layer.
Returns:
Ok(()) on successful server run completion.
Err(anyhow::Error) if any task fails or errors out.
Functionality:
Logs the start of the server.
Creates a new PubSub instance wrapping the transport and proxy mode flag.
Sets up a channel to track closed connections.
Spawns two primary asynchronous tasks:
  listen_incoming_connections: Listens for new incoming network connections and handles incoming messages.
  handle_subscriptions: Manages subscription changes and message forwarding.
Uses tokio::select! to concurrently await completion of either task and logs the task result using trace_task_result.
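The "spawn two tasks, react to whichever finishes first" pattern in the steps above can be sketched without Tokio. This is an illustration under stated assumptions, not the file's implementation: OS threads and a std channel stand in for spawned Tokio tasks and tokio::select!, the function name run_until_first_exit is hypothetical, and errors are plain Strings instead of anyhow::Error.

```rust
use std::sync::mpsc;
use std::thread;

/// Runs two tasks on separate threads and returns the label and result of
/// whichever finishes first.
///
/// Unlike tokio::select!, the slower task is not cancelled here; its thread
/// simply keeps running detached and its report is ignored.
pub fn run_until_first_exit<A, B>(
    listener: A,
    subscriptions: B,
) -> (&'static str, Result<(), String>)
where
    A: FnOnce() -> Result<(), String> + Send + 'static,
    B: FnOnce() -> Result<(), String> + Send + 'static,
{
    // Each task reports (label, result) on this channel when it exits.
    let (done_tx, done_rx) = mpsc::channel();

    let listener_tx = done_tx.clone();
    thread::spawn(move || {
        let _ = listener_tx.send(("listen_incoming_connections", listener()));
    });

    thread::spawn(move || {
        let _ = done_tx.send(("handle_subscriptions", subscriptions()));
    });

    // Blocks until the first report arrives. Panics only if both tasks
    // panicked before reporting, which drops every sender.
    done_rx.recv().expect("both tasks exited without reporting")
}
```

The returned label plays the role that the task name plays when the real code logs the outcome with trace_task_result.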
Interactions:
Utilizes PubSub for managing message routing and subscriptions.
Communicates with listen_incoming_connections and handle_subscriptions modules for connection and subscription management.
Sends/receives messages through incoming_tx and outgoing_tx channels.
Tracks connection lifecycle via an internal connection_closed channel.
Usage Example:
    let result = run(
        shutdown_rx,
        config_rx,
        transport,
        false,
        Some(metrics),
        100,
        subscribe_rx,
        outgoing_tx,
        incoming_tx,
    ).await?;
Implementation Details and Algorithms
The file leverages Tokio's asynchronous runtime to spawn independent tasks for connection listening and subscription handling, enabling concurrent network operations.
The IncomingSender enum encapsulates two different channel types transparently, allowing flexibility in sending incoming messages either via asynchronous or telemetry-instrumented synchronous channels.
The run function employs tokio::select! to wait for either the connection listener or subscription handler task to complete or fail, ensuring the server responds promptly to errors or shutdown signals.
The connection_closed channel is used to monitor connection termination events, facilitating resource cleanup and metrics updates.
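One way a close-notification channel can drive cleanup is sketched below. Everything here is an assumption made for illustration: the ConnectionTracker type, its methods, the use of SocketAddr as the event payload, and the link to max_connections are all hypothetical, and a std channel stands in for the Tokio channel. The source only states that connection_closed is used to monitor termination events.

```rust
use std::collections::HashSet;
use std::net::SocketAddr;
use std::sync::mpsc;

// Hypothetical tracker: owns the receiving end of a "connection closed" channel.
pub struct ConnectionTracker {
    max_connections: usize,
    active: HashSet<SocketAddr>,
    closed_rx: mpsc::Receiver<SocketAddr>,
}

impl ConnectionTracker {
    /// Returns the tracker plus the sender that connection handlers would
    /// use to announce that their peer has disconnected.
    pub fn new(max_connections: usize) -> (Self, mpsc::Sender<SocketAddr>) {
        let (closed_tx, closed_rx) = mpsc::channel();
        let tracker = Self {
            max_connections,
            active: HashSet::new(),
            closed_rx,
        };
        (tracker, closed_tx)
    }

    /// Applies pending close events, then admits `peer` if a slot is free.
    pub fn try_accept(&mut self, peer: SocketAddr) -> bool {
        // Non-blocking drain: free the slots of connections that have ended.
        while let Ok(addr) = self.closed_rx.try_recv() {
            self.active.remove(&addr);
        }
        if self.active.len() >= self.max_connections {
            return false;
        }
        self.active.insert(peer);
        true
    }

    pub fn active_count(&self) -> usize {
        self.active.len()
    }
}
```

A metrics gauge, if present, could be updated at the same point where the set of active connections changes.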
Interactions with Other System Components
PubSub Module: The PubSub struct is central to managing subscriptions and message routing. It is cloned and passed to both connection listener and subscription handler tasks.
listen_incoming_connections: This module manages network socket acceptance and message reception, forwarding incoming messages through incoming_tx.
handle_subscriptions: This module processes subscription updates and forwards outgoing messages via outgoing_tx.
Transport Layer (NetTransport): Abstracts the underlying network transport mechanism used for data transmission.
Metrics (NetMetrics): Optional component to collect and report network-related statistics.
Channels: Uses Tokio's broadcast and mpsc channels for message dissemination and event signaling.
Mermaid Diagram
classDiagram
class IncomingSender {
+send()
}
class PubSub {
+clone()
}
class run {
+async fn
}
run --> IncomingSender : uses
run --> PubSub : creates/clones
run --> listen_incoming_connections : spawns task
run --> handle_subscriptions : spawns task
listen_incoming_connections ..> IncomingSender : sends messages
handle_subscriptions ..> IncomingSender : accesses
run --> metrics : optional
run --> transport : uses
This diagram illustrates the relationships between the main entities and functions in the file, highlighting run as the orchestrator that creates and manages PubSub, IncomingSender, and spawns asynchronous tasks for connection and subscription handling.