sender.rs
Overview
This file implements the core functionality for asynchronously sending messages over a network connection within a pub-sub communication system. It provides the sender async function that manages the lifecycle of outgoing messages from a broadcast channel, handling shutdown signals and connection closures gracefully. The file also defines an internal helper function send_message which performs the actual message transfer process, including metrics tracking and error handling.
The sender continuously listens for new outgoing messages and sends them through a wrapped network connection, ensuring proper interaction with shutdown signals and connection status events. Metrics related to message delivery phases and errors are collected when the optional NetMetrics instance is provided.
Public Functions
sender
pub async fn sender<Connection: NetConnection + 'static>(
    mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
    metrics: Option<NetMetrics>,
    connection: Arc<ConnectionWrapper<Connection>>,
    stop_tx: tokio::sync::watch::Sender<bool>,
    mut stop_rx: tokio::sync::watch::Receiver<bool>,
    mut outgoing_messages_rx: tokio::sync::broadcast::Receiver<OutgoingMessage>,
) -> anyhow::Result<()>
Description
The main asynchronous loop that manages sending outgoing messages to a peer over a network connection. It listens concurrently for:
Shutdown signals (shutdown_rx)
Stop signals (stop_rx)
Connection closure events
New outgoing messages from a broadcast channel (outgoing_messages_rx)
When a new message is received, it delegates to send_message for handling the actual transfer.
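The control flow described above can be sketched without an async runtime. This is a minimal model, not the file's implementation: it folds the four event sources into a single std::sync::mpsc channel instead of using tokio::select!, and the names Event, ExitReason, run_loop, and demo are hypothetical, introduced only for illustration.

```rust
use std::sync::mpsc::{channel, Receiver};

// Hypothetical event type: one variant per source the real loop waits on.
#[derive(Debug)]
enum Event {
    Shutdown,
    Stop,
    ConnectionClosed,
    Outgoing(String),
}

// Why the loop ended; the name and variants are illustrative only.
#[derive(Debug, PartialEq)]
enum ExitReason {
    Shutdown,
    Stop,
    ConnectionClosed,
    ChannelClosed,
}

// Drains events until a terminating one arrives. `send` stands in for
// `send_message` and is called once per outgoing message.
fn run_loop(events: Receiver<Event>, mut send: impl FnMut(String)) -> ExitReason {
    loop {
        match events.recv() {
            Ok(Event::Outgoing(msg)) => send(msg),
            Ok(Event::Shutdown) => return ExitReason::Shutdown,
            Ok(Event::Stop) => return ExitReason::Stop,
            Ok(Event::ConnectionClosed) => return ExitReason::ConnectionClosed,
            // Every sender was dropped, so nothing more can arrive.
            Err(_) => return ExitReason::ChannelClosed,
        }
    }
}

// Queues two messages, a shutdown, and one more message, then runs the loop.
fn demo() -> (ExitReason, Vec<String>) {
    let (tx, rx) = channel();
    let events = vec![
        Event::Outgoing("a".to_string()),
        Event::Outgoing("b".to_string()),
        Event::Shutdown,
        Event::Outgoing("c".to_string()),
    ];
    for event in events {
        tx.send(event).unwrap();
    }
    let mut sent = Vec::new();
    let reason = run_loop(rx, |msg| sent.push(msg));
    (reason, sent)
}
```

In this model, demo() returns ExitReason::Shutdown with only "a" and "b" handed to the send callback; the message queued after the shutdown event is never processed.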
Parameters
shutdown_rx: A Tokio watch receiver signaling global shutdown requests.
metrics: Optional metrics collector for network operations (NetMetrics).
connection: An Arc-wrapped ConnectionWrapper containing the network connection and related info.
stop_tx: A watch sender to signal stopping of sender tasks in case of errors.
stop_rx: A watch receiver to listen for local stop signals.
outgoing_messages_rx: A broadcast receiver from which outgoing messages are received.
Return Value
Returns anyhow::Result<()> indicating the success or failure of the sender loop. The loop ends gracefully on shutdown or connection close.
Usage Example
let sender_handle = tokio::spawn(sender(
    shutdown_rx,
    Some(metrics),
    connection,
    stop_tx,
    stop_rx,
    outgoing_messages_rx,
));
Internal Functions
send_message
async fn send_message<Connection: NetConnection + 'static>(
    metrics: Option<NetMetrics>,
    connection: Arc<ConnectionWrapper<Connection>>,
    mut outgoing: OutgoingMessage,
    stop_tx: tokio::sync::watch::Sender<bool>,
)
Description
Handles the processing and transmission of a single outgoing message. It updates metrics for delivery phases, modifies message ID metadata, and performs the network transfer. On failure, it reports errors and triggers stop signals.
Parameters
metrics: Optional metrics collector to track delivery phases and errors.
connection: Shared reference to the connection wrapper used for sending.
outgoing: The OutgoingMessage instance to be transferred.
stop_tx: Watch sender to communicate stop signals in case of transfer failure.
Return Value
This function returns () as it internally handles errors and triggers stop signals.
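The pattern of returning () while turning a failure into a stop signal might look like the sketch below. It rests on stated assumptions: an Arc<AtomicBool> stands in for the tokio watch sender so the example needs only the standard library, and fake_transfer and send_and_signal are hypothetical helpers, not functions from this crate.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Hypothetical stand-in for the real `transfer` call: fails on empty payloads.
fn fake_transfer(payload: &[u8]) -> Result<usize, String> {
    if payload.is_empty() {
        Err("empty payload".to_string())
    } else {
        Ok(payload.len())
    }
}

// Returns `()` like `send_message`: a failure is reported and converted
// into a stop signal rather than propagated to the caller.
fn send_and_signal(payload: &[u8], stop: &Arc<AtomicBool>, sent_bytes: &mut usize) {
    match fake_transfer(payload) {
        // Success: account for the bytes that went out.
        Ok(n) => *sent_bytes += n,
        // Failure: report it and raise the stop flag (assumed stand-in
        // for sending `true` on the watch channel).
        Err(err) => {
            eprintln!("transfer failed: {}", err);
            stop.store(true, Ordering::SeqCst);
        }
    }
}
```

Because the caller never sees a Result, the stop flag is the only way for the rest of the system to learn that the transfer failed, which is why the source pairs error reporting with a stop signal.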
Important Details
Updates the last_sender_is_proxy flag in the message based on local connection info.
Appends the remote host ID prefix to the message ID to maintain uniqueness.
Measures elapsed time for delivery phases:
    Outgoing Buffer (time spent waiting before transfer)
    Outgoing Transfer (time spent during network transfer)
On transfer error, logs detailed error information and signals a stop.
Messages are only sent if allowed by the connection's allow_sending method.
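The metadata and timing steps listed above can be illustrated with a simplified model. Everything here is an assumption made for the example: the Msg struct is a reduced stand-in for OutgoingMessage, and the ':' separator, the six-character prefix length, and the helper names stamp, buffer_time, and prepare are hypothetical.

```rust
use std::time::{Duration, Instant};

// Hypothetical, simplified message; the real `OutgoingMessage` has more fields.
struct Msg {
    id: String,
    last_sender_is_proxy: bool,
    queued_at: Instant,
}

// Marks the message with the local role and appends a prefix of the remote
// host ID to its ID. The separator and prefix length are assumptions.
fn stamp(msg: &mut Msg, local_is_proxy: bool, remote_host_id: &str, prefix_len: usize) {
    msg.last_sender_is_proxy = local_is_proxy;
    let prefix: String = remote_host_id.chars().take(prefix_len).collect();
    msg.id.push(':');
    msg.id.push_str(&prefix);
}

// Time the message spent waiting before transfer (the outgoing buffer phase).
fn buffer_time(msg: &Msg, now: Instant) -> Duration {
    now.saturating_duration_since(msg.queued_at)
}

// Gate modeled on the `allow_sending` check: a message that may not be sent
// is dropped before its metadata is touched.
fn prepare(mut msg: Msg, allowed: bool, local_is_proxy: bool, remote_host_id: &str) -> Option<Msg> {
    if !allowed {
        return None;
    }
    stamp(&mut msg, local_is_proxy, remote_host_id, 6);
    Some(msg)
}
```

The transfer phase would be timed the same way, with a second Instant captured just before the network call and compared against the time at which it completes.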
Implementation Details and Algorithms
The sender loop uses tokio::select! to concurrently listen for shutdown, stop, connection close, and new outgoing messages.
A broadcast channel receiver is used for outgoing messages, allowing multiple producers.
Lagged messages (when the receiver falls behind) are detected and logged, with metrics updated accordingly.
The send_message function uses the transfer async function to perform the low-level message sending over the network connection.
Metrics tracking uses delivery phases defined in the DeliveryPhase enum and send mode SendMode::Broadcast.
Messages are augmented with metadata to identify the last sender and the remote host prefix for traceability.
Transfer errors are logged with detailed context and trigger a stop signal, which stops the sender tasks.
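Lag detection can be illustrated with a small standard-library model of a bounded channel. This is a sketch of the general idea, loosely modeled on how tokio's broadcast receiver reports RecvError::Lagged(n) when it falls behind; it is not the code in this file. Ring, Recv, and the cursor-based recv method are hypothetical names.

```rust
use std::collections::VecDeque;

// Bounded queue that drops the oldest entry when full and numbers every
// message, so a slow reader can tell how many it missed.
struct Ring {
    capacity: usize,
    next_seq: u64, // sequence number assigned to the next pushed message
    items: VecDeque<(u64, String)>,
}

#[derive(Debug, PartialEq)]
enum Recv {
    Message(String),
    Lagged(u64), // number of messages the reader skipped
    Empty,
}

impl Ring {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be positive");
        Ring { capacity, next_seq: 0, items: VecDeque::new() }
    }

    fn push(&mut self, msg: &str) {
        if self.items.len() == self.capacity {
            self.items.pop_front(); // overwrite the oldest message
        }
        self.items.push_back((self.next_seq, msg.to_string()));
        self.next_seq += 1;
    }

    // `cursor` is the sequence number the reader expects next.
    fn recv(&self, cursor: &mut u64) -> Recv {
        let oldest = match self.items.front() {
            Some((seq, _)) => *seq,
            None => return Recv::Empty,
        };
        if *cursor < oldest {
            // The expected message was overwritten: report the gap once
            // and resume from the oldest message still retained.
            let missed = oldest - *cursor;
            *cursor = oldest;
            return Recv::Lagged(missed);
        }
        match self.items.get((*cursor - oldest) as usize) {
            Some((_, msg)) => {
                *cursor += 1;
                Recv::Message(msg.clone())
            }
            None => Recv::Empty,
        }
    }
}
```

With a capacity of 2 and four pushes before the first read, the reader first sees Lagged(2) and then the two retained messages. A loop like the sender's would log the gap and update a metric at that point rather than treat it as fatal.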
Interactions with Other Modules
Uses the NetConnection trait from the transport_layer crate for abstracted network connection operations.
Interacts with ConnectionWrapper and OutgoingMessage from crate::pub_sub::connection.
Calls the transfer function from crate::transfer to perform actual message sending.
Uses NetMetrics from crate::metrics for collecting network delivery and error statistics.
Employs the DeliveryPhase and SendMode enums from the crate for consistent metrics categorization.
Uses tokio::sync primitives (watch and broadcast) for async signalling and message passing.
Logging and error reporting use the tracing crate and a detailed error formatter from the crate.
Diagram: sender.rs Structure and Workflow
flowchart TD
A[Sender Loop] -->|Listens for shutdown_rx| B[Shutdown Signal]
A -->|Listens for stop_rx| C[Stop Signal]
A -->|Listens for connection close| D[Connection Close]
A -->|Receives OutgoingMessage| E[Outgoing Message Received]
E --> F[send_message Function]
F --> G[Check Allow Sending]
G -->|Allowed| H[Update Message Metadata]
H --> I[Start Delivery Phase Metrics]
I --> J[transfer Function - Send Message]
J --> K{Transfer Result}
K -->|Success| L[Finish Delivery Phase, Report Sent Bytes]
K -->|Error| M[Log Error, Report Error, Trigger Stop Signal]
This flowchart depicts the main sender async loop that concurrently waits for signals and messages, and the internal steps within send_message to process and send a message while tracking metrics and handling errors.
Key Types Referenced
NetConnection: Trait defining required network connection operations.
ConnectionWrapper: Wrapper around NetConnection including connection metadata (local_identity, remote_info, etc.).
OutgoingMessage: Struct representing a message to be sent, including metadata such as id, label, and timing info.
NetMetrics: Metrics collector for network delivery phases and error reporting.
DeliveryPhase: Enum defining message delivery phases (e.g., OutgoingBuffer, OutgoingTransfer).
SendMode: Enum defining sending modes (e.g., Broadcast).
For detailed information on these types and related patterns, see the relevant topics on Networking, Metrics Collection, and Pub-Sub Architecture.