peer_sender.rs
Overview
The peer_sender.rs file implements the PeerSender struct and its associated asynchronous logic for managing outgoing direct peer-to-peer network communication. It is responsible for establishing and maintaining a connection to a remote peer, sending messages over this connection, and processing received messages and transfer results. The file leverages an abstracted transport layer (NetTransport and NetConnection) to support different network transports and protocols.
The main responsibilities of this file include:
Establishing and reconnecting outgoing connections to a given peer address.
Sending NetMessage instances asynchronously to the connected peer.
Receiving incoming messages on the established connection.
Reporting the status of message transfers to metrics and event channels.
Handling shutdown signals for graceful termination of the sending loop.
This file interacts closely with modules related to network transport, message handling, metrics collection, and pub-sub connection management. It uses types such as NetMessage, NetTransport, ConnectionWrapper, and NetMetrics to coordinate the message delivery lifecycle.
Structs and Methods
PeerSender<PeerId, Transport>
The core struct representing a peer sender responsible for managing the connection and sending messages to a specific peer.
Type Parameters
PeerId: The type of the peer identifier. Must implement Display, Hash, Eq, Clone, Send, Sync, and 'static.
Transport: The network transport type implementing the NetTransport trait with an associated Connection type.
Fields
id: PeerId - Identifier of the peer.
addr: SocketAddr - Socket address of the peer.
connection: Option<Arc<ConnectionWrapper<Transport::Connection>>> - Optional active connection wrapper for the peer.
shutdown_rx: tokio::sync::watch::Receiver<bool> - Receiver to listen for shutdown signals.
transport: Transport - Network transport instance used to create connections.
metrics: Option<NetMetrics> - Optional metrics collector for reporting.
credential: NetCredential - Credentials used for authentication with the peer.
peer_events_tx: tokio::sync::mpsc::UnboundedSender<PeerEvent<PeerId>> - Channel for sending peer events such as sender stop notifications.
transfer_result_tx: tokio::sync::mpsc::Sender<(Result<usize, TransportError>, NetMessage, Instant)> - Channel sender for reporting transfer results.
transfer_result_rx: tokio::sync::mpsc::Receiver<(Result<usize, TransportError>, NetMessage, Instant)> - Channel receiver for receiving transfer results.
incoming_reply_tx: IncomingSender - Channel sender for forwarding incoming messages.
Methods
new
pub(crate) fn new(
id: PeerId,
addr: SocketAddr,
shutdown_rx: tokio::sync::watch::Receiver<bool>,
transport: Transport,
metrics: Option<NetMetrics>,
credential: NetCredential,
peer_events_tx: tokio::sync::mpsc::UnboundedSender<PeerEvent<PeerId>>,
incoming_reply_tx: IncomingSender,
) -> Self
Creates a new PeerSender instance with the provided parameters.
Parameters:
id: The peer identifier.
addr: The peer's socket address.
shutdown_rx: Receiver to listen for shutdown signals.
transport: The transport instance to use for network connections.
metrics: Optional metrics collector.
credential: Credentials to authenticate with the peer.
peer_events_tx: Sender channel for peer events.
incoming_reply_tx: Sender channel for incoming messages.
Returns: A new PeerSender instance with an uninitialized connection and transfer result channels.
run
pub(crate) async fn run(
&mut self,
mut outgoing_rx: tokio::sync::mpsc::Receiver<(NetMessage, Instant)>,
metrics: Option<NetMetrics>,
)
Runs the main asynchronous loop managing connection lifecycle, message sending, and receiving.
Parameters:
outgoing_rx: A receiver channel delivering outgoing NetMessage instances paired with their enqueue timestamps.
metrics: Optional metrics collector.
Behavior:
Continuously attempts to establish a connection to the peer.
Upon successful connection, listens concurrently for:
Shutdown signals.
Outgoing messages to send.
Incoming messages on the connection.
Transfer result notifications.
Handles connection errors by reconnecting; connection attempts are retried with Fibonacci backoff (see connect_to_peer).
Sends peer events when the sender stops.
Usage Example:
let mut peer_sender = PeerSender::new(...);
peer_sender.run(outgoing_rx, metrics).await;
connect_to_peer
async fn connect_to_peer(&self) -> anyhow::Result<ConnectionWrapper<Transport::Connection>>
Attempts to establish an outgoing connection to the peer's address, retrying with Fibonacci backoff on failure.
Returns: A wrapped connection (ConnectionWrapper) on success, or an error wrapped in anyhow::Result.
Implementation Details:
Uses self.transport.connect with the protocol ACKI_NACKI_DIRECT_PROTOCOL and credentials.
Wraps the connection in a ConnectionWrapper with the assigned connection role DirectReceiver.
Retries indefinitely with increasing delay up to 1 hour, reporting warnings on each failure.
Usage Example:
let connection = peer_sender.connect_to_peer().await?;
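The retry schedule described above can be sketched with the standard library alone. This is a minimal illustration, not the file's actual code: the source names tokio_retry::strategy::FibonacciBackoff, while the CappedFibonacci type below and its 1-second base delay are hypothetical stand-ins chosen to show how a Fibonacci sequence of delays grows and then flattens at a maximum.

```rust
use std::time::Duration;

/// Hypothetical stand-in for a Fibonacci backoff schedule.
/// Yields base * 1, 1, 2, 3, 5, 8, ... and never exceeds `max_delay`.
struct CappedFibonacci {
    current: Duration,
    next: Duration,
    max_delay: Duration,
}

impl CappedFibonacci {
    fn new(base: Duration, max_delay: Duration) -> Self {
        Self { current: base, next: base, max_delay }
    }
}

impl Iterator for CappedFibonacci {
    type Item = Duration;

    fn next(&mut self) -> Option<Duration> {
        // Clamp the delay handed to the caller, not the internal sequence.
        let delay = self.current.min(self.max_delay);
        // Advance the sequence; saturate instead of overflowing on long runs.
        let following = self.current.saturating_add(self.next);
        self.current = self.next;
        self.next = following;
        // The schedule is infinite, matching "retries indefinitely".
        Some(delay)
    }
}
```

A caller would sleep for each yielded delay between connection attempts; with a one-hour cap, the delays stop growing once the sequence passes 3600 seconds.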
start_transfer_message
fn start_transfer_message(&self, net_message: NetMessage, buffer_duration: Instant)
Initiates an asynchronous task to transfer a given message over the active connection.
Parameters:
net_message: The NetMessage instance to send.
buffer_duration: Timestamp marking when the message was buffered.
Behavior:
Checks if a connection exists; if not, returns immediately.
Spawns a Tokio task that:
Updates metrics for outgoing buffer and transfer phases.
Calls the transfer function to send the message data asynchronously.
Sends the transfer result back through the transfer_result_tx channel.
Usage Example:
peer_sender.start_transfer_message(message, Instant::now());
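The pattern of handing a transfer to a background task and reporting its outcome over a channel can be sketched as follows. This is a simplified model under stated assumptions: it swaps tokio::spawn and the Tokio mpsc channel for std::thread and std::sync::mpsc so that it needs no external crates, and the names start_transfer, transfer, and TransferReport are hypothetical. Unlike the documented method, the sketch returns the task handle so the caller can wait for it.

```rust
use std::sync::mpsc::Sender;
use std::thread::{self, JoinHandle};
use std::time::Instant;

/// Hypothetical report shape: (result, original payload, transfer start).
type TransferReport = (Result<usize, String>, Vec<u8>, Instant);

/// Hypothetical stand-in for the real transfer function:
/// reports the number of bytes "sent" and fails on an empty payload.
fn transfer(payload: &[u8]) -> Result<usize, String> {
    if payload.is_empty() {
        Err("empty payload".to_string())
    } else {
        Ok(payload.len())
    }
}

/// Starts a background transfer if a connection is present.
fn start_transfer(
    connection: Option<&str>,
    payload: Vec<u8>,
    result_tx: Sender<TransferReport>,
) -> Option<JoinHandle<()>> {
    // Without a connection there is nothing to send on; return immediately.
    if connection.is_none() {
        return None;
    }
    Some(thread::spawn(move || {
        let started = Instant::now();
        let outcome = transfer(&payload);
        // The receiver may already be gone during shutdown; that is not an error here.
        let _ = result_tx.send((outcome, payload, started));
    }))
}
```

Sending the original payload back with the result lets the receiving side log or account for the specific message without keeping a separate lookup table.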
handle_transfer_result
async fn handle_transfer_result(
&mut self,
transfer_result: (Result<usize, TransportError>, NetMessage, Instant)
) -> bool
Processes the result of a message transfer.
Parameters:
transfer_result: A tuple containing the transfer result (Ok(bytes_sent) or Err), the original message, and the transfer start instant.
Returns: true if the transfer succeeded and the connection remains usable; false otherwise.
Behavior:
On success, logs debug information and updates sent bytes metrics.
On failure, logs error details, reports metrics, and closes the connection.
Usage Example:
let continue_running = peer_sender.handle_transfer_result(result).await;
if !continue_running {
    // handle reconnect or shutdown
}
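The success and failure branches can be modeled in a few lines. The sketch below is illustrative only: Sender, Metrics, and this TransportError are hypothetical stand-ins for PeerSender, NetMetrics, and the real error type, and the connection is reduced to an Option<String>. It shows the contract described above, where a failed transfer drops the connection and returns false.

```rust
use std::time::Instant;

/// Hypothetical stand-in for the transport error type.
#[derive(Debug)]
struct TransportError(String);

/// Hypothetical stand-in for the metrics collector.
#[derive(Debug, Default)]
struct Metrics {
    sent_bytes: usize,
    transfer_errors: usize,
}

/// Hypothetical, reduced model of the sender state.
struct Sender {
    connection: Option<String>,
    metrics: Metrics,
}

impl Sender {
    /// Returns true when the connection is still usable after this result.
    fn handle_transfer_result(
        &mut self,
        result: Result<usize, TransportError>,
        started: Instant,
    ) -> bool {
        let elapsed = started.elapsed();
        match result {
            Ok(bytes_sent) => {
                // Success: account for the bytes and keep the connection.
                self.metrics.sent_bytes += bytes_sent;
                true
            }
            Err(TransportError(reason)) => {
                // Failure: record it and drop the connection so the caller reconnects.
                eprintln!("transfer failed after {elapsed:?}: {reason}");
                self.metrics.transfer_errors += 1;
                self.connection = None;
                false
            }
        }
    }
}
```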
Free Functions
receive_message
async fn receive_message<Connection: NetConnection>(
connection: Option<Arc<ConnectionWrapper<Connection>>>,
metrics: Option<NetMetrics>,
incoming_reply_tx: IncomingSender,
) -> bool
Receives and processes a message from the given connection.
Parameters:
connection: Optional wrapped connection from which to receive data.
metrics: Optional metrics collector.
incoming_reply_tx: Sender for forwarding deserialized incoming messages.
Returns: true if message receiving and forwarding succeeded; false if an error occurred or the connection is absent.
Behavior:
Awaits reception of raw data on the connection.
Attempts to deserialize the data into a NetMessage using bincode.
Updates metrics and logs the incoming message.
Forwards the deserialized message wrapped in IncomingMessage to incoming_reply_tx.
Returns false if deserialization fails or the consumer is disconnected.
Usage Example:
let ok = receive_message(connection, metrics, incoming_reply_tx).await;
if !ok {
    // handle failure
}
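The three ways this function can return false (no connection, failed deserialization, disconnected consumer) are easier to see in a reduced model. The sketch below is an assumption-laden illustration: it is synchronous, uses std::sync::mpsc instead of the real IncomingSender, takes the already-received bytes as input, and replaces bincode with a hypothetical decode helper in which the first byte is a message id.

```rust
use std::sync::mpsc::Sender;

/// Hypothetical stand-in for a deserialized network message.
#[derive(Debug, PartialEq)]
struct Message {
    id: u8,
    payload: Vec<u8>,
}

/// Hypothetical decoder standing in for bincode deserialization:
/// the first byte is the id, the rest is the payload; an empty frame is an error.
fn decode(raw: &[u8]) -> Result<Message, String> {
    match raw.split_first() {
        Some((id, rest)) => Ok(Message { id: *id, payload: rest.to_vec() }),
        None => Err("empty frame".to_string()),
    }
}

/// Returns true only if a message was decoded and handed to the consumer.
fn receive_message(raw: Option<&[u8]>, incoming_tx: &Sender<Message>) -> bool {
    // An absent connection is modeled here as absent input.
    let raw = match raw {
        Some(raw) => raw,
        None => return false,
    };
    let message = match decode(raw) {
        Ok(message) => message,
        Err(reason) => {
            eprintln!("failed to decode incoming message: {reason}");
            return false;
        }
    };
    // send fails only when the receiving side has been dropped.
    incoming_tx.send(message).is_ok()
}
```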
Important Implementation Details and Algorithms
Connection Management:
Uses a Fibonacci backoff strategy (via tokio_retry::strategy::FibonacciBackoff) to retry connection attempts after failures, with a maximum delay of 1 hour.
Wraps connections with ConnectionWrapper to maintain metadata such as remote host ID and connection role.
Connection roles are assigned as DirectReceiver for outgoing connections.
Asynchronous Concurrency:
The run method uses nested tokio::select! blocks to concurrently:
Listen for shutdown signals.
Handle outgoing message requests.
Receive incoming messages.
Process transfer results.
Message transfers are delegated to background tasks spawned via tokio::spawn to avoid blocking the main loop.
Metrics and Tracing:
Metrics are updated at various delivery phases (OutgoingBuffer, OutgoingTransfer, IncomingBuffer).
Transfer durations are measured using Instant timestamps.
The tracing crate is used extensively for logging debug, trace, warning, and error events with contextual information such as peer ID, message IDs, and host IDs.
Error Handling:
Transfer errors close the current connection and terminate the inner message-sending loop, which triggers a reconnect.
Deserialization failures and receive errors cause the receive loop to terminate and report errors.
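The interplay between the reconnecting outer loop and the inner event loop can be modeled synchronously. This is a rough sketch of the control flow only: the real run method is asynchronous and uses tokio::select!, whereas the Event and Exit enums and the iterator-driven run function here are hypothetical simplifications that need no async runtime.

```rust
/// Hypothetical events the inner loop reacts to.
enum Event {
    Shutdown,
    Outgoing,
    IncomingOk,
    IncomingFailed,
    TransferOk,
    TransferFailed,
}

/// How the modeled loop ended.
#[derive(Debug, PartialEq)]
enum Exit {
    Shutdown,
    Exhausted,
}

/// Returns how the loop ended and how many times it (re)connected.
fn run(events: impl IntoIterator<Item = Event>) -> (Exit, usize) {
    let mut events = events.into_iter();
    let mut connects = 0;
    loop {
        // Outer loop: (re)establish the connection.
        connects += 1;
        // Inner loop: serve events until the connection becomes unusable.
        loop {
            match events.next() {
                None => return (Exit::Exhausted, connects),
                Some(Event::Shutdown) => return (Exit::Shutdown, connects),
                // Healthy events keep the inner loop running.
                Some(Event::Outgoing) | Some(Event::IncomingOk) | Some(Event::TransferOk) => {}
                // A failed receive or transfer ends the inner loop and forces a reconnect.
                Some(Event::IncomingFailed) | Some(Event::TransferFailed) => break,
            }
        }
    }
}
```

In this model a single failed transfer yields two connection attempts, which mirrors the described behavior of closing the connection and reconnecting rather than stopping the sender.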
Interactions with Other Parts of the System
Transport Layer:
Uses NetTransport for connection establishment and NetConnection for message sending and receiving.
The transfer function from the transfer module handles actual message data transfer asynchronously.
Message Handling:
Works with NetMessage for representing network messages.
Receives raw message data and deserializes it for forwarding.
Sends outgoing messages received from an external channel.
Pub-Sub and Connection Management:
Utilizes ConnectionWrapper for connection metadata.
Uses PeerEvent for reporting sender lifecycle events over the peer events channel.
Uses IncomingSender for forwarding incoming messages to pub-sub components.
Metrics:
Reports metrics via NetMetrics for bytes sent/received, errors, and delivery phase timing.
Shutdown Coordination:
Listens for shutdown signals via a tokio::sync::watch::Receiver<bool>.
Sends PeerEvent::SenderStopped when stopping.
Visual Diagram
classDiagram
class PeerSender {
- id: PeerId
- addr: SocketAddr
- connection: Option<Arc<ConnectionWrapper>>
- shutdown_rx: watch::Receiver<bool>
- transport: Transport
- metrics: Option<NetMetrics>
- credential: NetCredential
- peer_events_tx: mpsc::UnboundedSender<PeerEvent>
- transfer_result_tx: mpsc::Sender<(Result<usize, TransportError>, NetMessage, Instant)>
- transfer_result_rx: mpsc::Receiver<(Result<usize, TransportError>, NetMessage, Instant)>
- incoming_reply_tx: IncomingSender
+ new(...)
+ run(outgoing_rx, metrics)
- connect_to_peer() : Result<ConnectionWrapper>
- start_transfer_message(net_message, buffer_duration)
- handle_transfer_result(transfer_result) : bool
}
class receive_message {
<<function>>
+ receive_message(connection, metrics, incoming_reply_tx) : bool
}
PeerSender --> ConnectionWrapper : manages
PeerSender --> NetTransport : uses
PeerSender --> NetMetrics : reports metrics
PeerSender --> PeerEvent : sends events
PeerSender --> NetMessage : sends messages
PeerSender --> IncomingSender : sends incoming messages
PeerSender --> transfer : calls async transfer
receive_message ..> ConnectionWrapper : receives from
receive_message ..> NetMetrics : reports metrics
receive_message ..> IncomingSender : forwards messages
This diagram depicts the main struct PeerSender with its key fields and methods, as well as its interaction with the receive_message function. The relationships illustrate how PeerSender manages connection wrappers, interacts with transport and metrics systems, and coordinates message sending and event reporting.