sender.rs
Overview
This file implements the DirectSender component, responsible for managing direct message sending to peers in a network. The DirectSender handles message dispatching, peer resolution, and peer sender lifecycle management, ensuring reliable delivery of network messages to targeted peers or addresses.
It operates by receiving messages with target receivers, resolving peer addresses when necessary, and forwarding messages to dedicated peer senders that manage connections and transmissions. The file also defines asynchronous tasks for peer address resolution and message forwarding coordination.
DirectSender Struct
The DirectSender struct is generic over PeerId and Transport types:
PeerId: Identifier type for peers; constrained to implementDisplay,Hash, Eq,Clone,Send, and Sync.Transport: Network transport implementing theNetTransporttrait.
Fields
shutdown_rx: Watch receiver to signal shutdown.network_config_rx: Watch receiver for dynamic network configuration updates.network_config: Current network configuration snapshot.transport: Transport instance for network communication.metrics: Optional metrics collector for monitoring network operations.messages_rx: Unbounded receiver channel for incoming messages to dispatch. Messages are tuples of(DirectReceiver<PeerId>, NetMessage, Instant).outgoing_reply_tx: Broadcast sender for outgoing messages replies.incoming_reply_tx: Sender for incoming replies.peer_resolver_rx: Watch receiver providing updated mappings of peers to theirPeerData(addresses, etc.).unresolved_peers: Cache for messages targeting peers whose addresses are not yet resolved. MapsPeerIdto a vector of(NetMessage, Instant).resolved_peers: Cache for resolved peer addresses paired with their message senders. MapsPeerIdto a vector of(SocketAddr, Sender<(NetMessage, Instant)>).peer_events_tx: Unbounded sender for peer-related events (e.g., address resolution, sender shutdown).peer_events_rx: Unbounded receiver for peer-related events.
Methods
new
pub(crate) fn new(
shutdown_rx: tokio::sync::watch::Receiver<bool>,
network_config_rx: tokio::sync::watch::Receiver<NetworkConfig>,
transport: Transport,
metrics: Option<NetMetrics>,
messages_rx: tokio::sync::mpsc::UnboundedReceiver<(DirectReceiver<PeerId>, NetMessage, Instant)>,
outgoing_reply_tx: tokio::sync::broadcast::Sender<OutgoingMessage>,
incoming_reply_tx: IncomingSender,
peer_resolver_rx: tokio::sync::watch::Receiver<HashMap<PeerId, Vec<PeerData>>>,
) -> Self
Constructs a new DirectSender instance with the provided components.
Parameters:
shutdown_rx: Signals component shutdown.network_config_rx: Provides network configuration updates.transport: Network transport instance.metrics: Optional metrics collector.messages_rx: Receiver for inbound messages to dispatch.outgoing_reply_tx: Sender for outgoing replies.incoming_reply_tx: Sender for incoming replies.peer_resolver_rx: Provides current peer address mappings.
Returns: A new
DirectSenderinstance initialized with empty peer caches and event channels.
run
pub(crate) async fn run(&mut self)
Main event loop that drives the DirectSender. It concurrently listens for:
Shutdown signals.
Network configuration updates.
Incoming messages to dispatch.
Peer events (address resolution, sender shutdown).
The loop terminates on shutdown or channel closure.
Behavior:
On receiving a message, calls
dispatch_message.On receiving a peer event, calls
handle_peer_event.Updates internal network configuration on config changes.
dispatch_message
fn dispatch_message(
&mut self,
receiver: DirectReceiver<PeerId>,
net_message: NetMessage,
buffer_duration: Instant,
)
Dispatches a message to the intended receiver.
If the receiver is a peer ID:
If resolved peer addresses exist, attempts to send the message to all resolved addresses.
If unresolved, queues the message and triggers peer address resolution.
If the receiver is a socket address, forwards the message via the
outgoing_reply_txchannel.
Parameters:
receiver: Target receiver, either a peer ID or socket address.net_message: The network message to send.buffer_duration: Timestamp indicating when the message was buffered.
handle_peer_event
fn handle_peer_event(&mut self, event: PeerEvent<PeerId>)
Handles peer-related events:
AddrsResolved: When peer addresses are resolved, starts peer senders for each address, sends queued messages, and caches senders.SenderStopped: Cleans up peer sender references upon sender termination.
try_send_to_peer
fn try_send_to_peer(
&self,
id: &PeerId,
addr: SocketAddr,
tx: &tokio::sync::mpsc::Sender<(NetMessage, Instant)>,
message: NetMessage,
buffer_duration: Instant,
)
Attempts to send a message to a peer sender channel non-blockingly.
Logs errors if the sender channel is full or closed.
Reports metrics for delivery failures.
If sending fails, finishes the delivery phase metric with appropriate details.
start_peer_resolver
fn start_peer_resolver(&self, id: PeerId)
Spawns an asynchronous task to resolve the addresses of a peer with the given ID. The task interacts with:
Shutdown signals.
The peer resolver watch channel for address data.
Peer events channel to notify about resolution results.
start_peer_sender
fn start_peer_sender(
&self,
id: PeerId,
addr: SocketAddr,
) -> tokio::sync::mpsc::Sender<(NetMessage, Instant)>
Starts a dedicated peer sender task for a specific peer address. This peer sender manages the actual network transport connection.
Returns the sender channel for sending messages to the peer sender.
Spawns a critical task running the
PeerSenderinstance'srunmethod.
Asynchronous Functions
peer_resolver
async fn peer_resolver<PeerId>(
id: PeerId,
shutdown_rx: tokio::sync::watch::Receiver<bool>,
peers_rx: tokio::sync::watch::Receiver<HashMap<PeerId, Vec<PeerData>>>,
peer_events_tx: tokio::sync::mpsc::UnboundedSender<PeerEvent<PeerId>>,
metrics: Option<NetMetrics>,
)
Asynchronous loop that waits for peer addresses to be resolved from the shared peer data mapping. On success, sends an AddrsResolved event. Retries resolution with backoff defined by RESOLVE_RETRY_TIMEOUT until shutdown or resolution success.
Handles shutdown signals gracefully.
Reports resolution failures via metrics.
resolve_peer_addrs
async fn resolve_peer_addrs<PeerId>(
peers_rx: &mut tokio::sync::watch::Receiver<HashMap<PeerId, Vec<PeerData>>>,
id: PeerId,
metrics: &Option<NetMetrics>,
) -> Vec<SocketAddr>
Helper function that attempts to fetch the addresses of a given peer ID from the latest resolved peer data. Retries with delay if no address is found.
Returns a vector of socket addresses associated with the peer.
Important Implementation Details and Algorithms
Message Dispatching: The dispatching logic distinguishes between direct peer ID receivers and raw socket address receivers, maintaining separate caches for unresolved and resolved peers.
Peer Resolution: Uses asynchronous tasks to resolve peer addresses with retries and backoff, ensuring messages are buffered until resolution completes.
Concurrent Event Handling: The
runmethod usestokio::select!to efficiently handle multiple concurrent event sources (shutdown, config changes, message inputs, peer events).Backpressure Handling: When forwarding messages to peer sender channels, non-blocking
try_sendis used. If senders are lagging or closed, errors are logged and metrics updated.Metrics Integration: Various stages report errors and delivery phases to
NetMetrics, aiding in performance monitoring and diagnostics.Peer Sender Lifecycle: Each resolved address spawns a dedicated
PeerSendertask that manages connection and message transmission to that peer endpoint.
Interaction with Other System Components
NetTransportTrait: Abstracts the underlying network transport layer used byPeerSenderinstances to send messages.PeerSender: Manages actual sending of messages to individual peer addresses;DirectSenderorchestrates and communicates with these.NetworkConfig: Provides configuration parameters, including credentials, for message authentication and connection management.PeerData: Represents metadata about peers such as network addresses.PeerEvent: Events related to peer address resolution and sender lifecycle, used to coordinate internal state transitions.NetMessage: The network message type being transmitted.OutgoingMessage,IncomingSender: Used for handling replies and incoming message flow.spawn_critical_task: Utility for spawning tasks that are monitored for system criticality and metrics.Metrics (
NetMetrics): Optional component to monitor and report network performance and errors.
Usage Example
// Assume necessary channels and components are created elsewhere:
// shutdown_rx, network_config_rx, transport, metrics, messages_rx, outgoing_reply_tx, incoming_reply_tx, peer_resolver_rx
let mut direct_sender = DirectSender::new(
shutdown_rx,
network_config_rx,
transport,
Some(metrics),
messages_rx,
outgoing_reply_tx,
incoming_reply_tx,
peer_resolver_rx,
);
// Run the direct sender event loop (typically in a dedicated Tokio task)
tokio::spawn(async move {
direct_sender.run().await;
});
Mermaid Class Diagram
classDiagram
class DirectSender {
-shutdown_rx
-network_config_rx
-network_config
-transport
-metrics
-messages_rx
-outgoing_reply_tx
-incoming_reply_tx
-peer_resolver_rx
-unresolved_peers
-resolved_peers
-peer_events_tx
-peer_events_rx
+new()
+run()
-dispatch_message()
-handle_peer_event()
-try_send_to_peer()
-start_peer_resolver()
-start_peer_sender()
}
class PeerSender {
+new()
+run()
}
DirectSender --> PeerSender : spawns and communicates
DirectSender ..> NetTransport : uses
DirectSender ..> NetMessage : sends
DirectSender ..> PeerEvent : handles
DirectSender ..> NetworkConfig : reads
DirectSender ..> NetMetrics : reports metrics