sender.rs

Overview

This file implements the DirectSender component, responsible for managing direct message sending to peers in a network. The DirectSender handles message dispatching, peer resolution, and peer sender lifecycle management, ensuring reliable delivery of network messages to targeted peers or addresses.

It operates by receiving messages with target receivers, resolving peer addresses when necessary, and forwarding messages to dedicated peer senders that manage connections and transmissions. The file also defines asynchronous tasks for peer address resolution and message forwarding coordination.

DirectSender Struct

The DirectSender struct is generic over PeerId and Transport types:

Fields

Methods

new

pub(crate) fn new(
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    network_config_rx: tokio::sync::watch::Receiver<NetworkConfig>,
    transport: Transport,
    metrics: Option<NetMetrics>,
    messages_rx: tokio::sync::mpsc::UnboundedReceiver<(DirectReceiver<PeerId>, NetMessage, Instant)>,
    outgoing_reply_tx: tokio::sync::broadcast::Sender<OutgoingMessage>,
    incoming_reply_tx: IncomingSender,
    peer_resolver_rx: tokio::sync::watch::Receiver<HashMap<PeerId, Vec<PeerData>>>,
) -> Self

Constructs a new DirectSender instance with the provided components.

run

pub(crate) async fn run(&mut self)

Main event loop that drives the DirectSender. It concurrently listens for:

The loop terminates on shutdown or channel closure.

Behavior:

dispatch_message

fn dispatch_message(
    &mut self,
    receiver: DirectReceiver<PeerId>,
    net_message: NetMessage,
    buffer_duration: Instant,
)

Dispatches a message to the intended receiver.

Parameters:

handle_peer_event

fn handle_peer_event(&mut self, event: PeerEvent<PeerId>)

Handles peer-related events:

try_send_to_peer

fn try_send_to_peer(
    &self,
    id: &PeerId,
    addr: SocketAddr,
    tx: &tokio::sync::mpsc::Sender<(NetMessage, Instant)>,
    message: NetMessage,
    buffer_duration: Instant,
)

Attempts to send a message to a peer sender channel non-blockingly.

start_peer_resolver

fn start_peer_resolver(&self, id: PeerId)

Spawns an asynchronous task to resolve the addresses of a peer with the given ID. The task interacts with:

start_peer_sender

fn start_peer_sender(
    &self,
    id: PeerId,
    addr: SocketAddr,
) -> tokio::sync::mpsc::Sender<(NetMessage, Instant)>

Starts a dedicated peer sender task for a specific peer address. This peer sender manages the actual network transport connection.

Asynchronous Functions

peer_resolver

async fn peer_resolver<PeerId>(
    id: PeerId,
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    peers_rx: tokio::sync::watch::Receiver<HashMap<PeerId, Vec<PeerData>>>,
    peer_events_tx: tokio::sync::mpsc::UnboundedSender<PeerEvent<PeerId>>,
    metrics: Option<NetMetrics>,
)

Asynchronous loop that waits for peer addresses to be resolved from the shared peer data mapping. On success, sends an AddrsResolved event. Retries resolution with backoff defined by RESOLVE_RETRY_TIMEOUT until shutdown or resolution success.

resolve_peer_addrs

async fn resolve_peer_addrs<PeerId>(
    peers_rx: &mut tokio::sync::watch::Receiver<HashMap<PeerId, Vec<PeerData>>>,
    id: PeerId,
    metrics: &Option<NetMetrics>,
) -> Vec<SocketAddr>

Helper function that attempts to fetch the addresses of a given peer ID from the latest resolved peer data. Retries with delay if no address is found.

Important Implementation Details and Algorithms

Interaction with Other System Components

Usage Example

// Assume necessary channels and components are created elsewhere:
// shutdown_rx, network_config_rx, transport, metrics, messages_rx, outgoing_reply_tx, incoming_reply_tx, peer_resolver_rx

let mut direct_sender = DirectSender::new(
    shutdown_rx,
    network_config_rx,
    transport,
    Some(metrics),
    messages_rx,
    outgoing_reply_tx,
    incoming_reply_tx,
    peer_resolver_rx,
);

// Run the direct sender event loop (typically in a dedicated Tokio task)
tokio::spawn(async move {
    direct_sender.run().await;
});

Mermaid Class Diagram

classDiagram
class DirectSender {
-shutdown_rx
-network_config_rx
-network_config
-transport
-metrics
-messages_rx
-outgoing_reply_tx
-incoming_reply_tx
-peer_resolver_rx
-unresolved_peers
-resolved_peers
-peer_events_tx
-peer_events_rx
+new()
+run()
-dispatch_message()
-handle_peer_event()
-try_send_to_peer()
-start_peer_resolver()
-start_peer_sender()
}
class PeerSender {
+new()
+run()
}
DirectSender --> PeerSender : spawns and communicates
DirectSender ..> NetTransport : uses
DirectSender ..> NetMessage : sends
DirectSender ..> PeerEvent : handles
DirectSender ..> NetworkConfig : reads
DirectSender ..> NetMetrics : reports metrics