sender.rs

Overview

This file implements asynchronous sending of messages over a network connection in a pub-sub communication system. It provides the async function sender, which takes outgoing messages from a broadcast channel and exits cleanly on a shutdown signal or when the connection closes. It also defines an internal helper, send_message, which performs the actual transfer, including metrics tracking and error handling.

The sender continuously waits for new outgoing messages and sends them through a wrapped network connection (ConnectionWrapper), while also watching for shutdown, stop, and connection-close events. Delivery-phase and error metrics are recorded only when the optional NetMetrics instance is provided.


Public Functions

sender

pub async fn sender<Connection: NetConnection + 'static>(
    mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
    metrics: Option<NetMetrics>,
    connection: Arc<ConnectionWrapper<Connection>>,
    stop_tx: tokio::sync::watch::Sender<bool>,
    mut stop_rx: tokio::sync::watch::Receiver<bool>,
    mut outgoing_messages_rx: tokio::sync::broadcast::Receiver<OutgoingMessage>,
) -> anyhow::Result<()>

Description

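The main asynchronous loop that sends outgoing messages to a peer over a network connection. It waits concurrently for four events: a shutdown signal (shutdown_rx), a stop signal (stop_rx), closure of the connection, and a new outgoing message (outgoing_messages_rx).

When a new message is received, the loop delegates to send_message, which performs the actual transfer.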
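
The dispatch logic of this loop can be illustrated with a small synchronous model. This is only a sketch using the standard library, not the tokio-based implementation: the Event and Exit enums, run_sender_model, and demo_sender_model are hypothetical names introduced for illustration, and the four event kinds are taken from the workflow diagram in this document. Treating an exhausted event source as a closed channel is an assumption.

```rust
/// Hypothetical event type standing in for the four things the loop waits on.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    Shutdown,
    Stop,
    ConnectionClosed,
    Outgoing(String),
}

/// Why the modelled loop ended (hypothetical, for illustration only).
#[derive(Debug, PartialEq)]
enum Exit {
    Shutdown,
    Stopped,
    ConnectionClosed,
    ChannelClosed,
}

/// Processes events in order, handing each outgoing message to `send`
/// (a stand-in for send_message) until a terminating event arrives.
fn run_sender_model<I, F>(events: I, mut send: F) -> Exit
where
    I: IntoIterator<Item = Event>,
    F: FnMut(String),
{
    for event in events {
        match event {
            Event::Shutdown => return Exit::Shutdown,
            Event::Stop => return Exit::Stopped,
            Event::ConnectionClosed => return Exit::ConnectionClosed,
            Event::Outgoing(message) => send(message),
        }
    }
    // Assumption: an exhausted event source is treated like a closed channel.
    Exit::ChannelClosed
}

/// Two messages are sent, then the shutdown event ends the loop,
/// so the trailing message is never processed.
fn demo_sender_model() -> (Vec<String>, Exit) {
    let mut sent = Vec::new();
    let exit = run_sender_model(
        vec![
            Event::Outgoing("a".to_string()),
            Event::Outgoing("b".to_string()),
            Event::Shutdown,
            Event::Outgoing("late".to_string()),
        ],
        |message| sent.push(message),
    );
    (sent, exit)
}
```

In an async setting the same match would typically sit inside a select-style construct that polls all sources at once; the model above only shows how each event kind maps to "send" or "exit".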

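Parameters

shutdown_rx: watch receiver carrying the shutdown signal.
metrics: optional NetMetrics; when None, no metrics are recorded.
connection: shared (Arc) ConnectionWrapper around the peer connection.
stop_tx: watch sender for the stop signal, which send_message triggers when a transfer fails.
stop_rx: watch receiver carrying the stop signal.
outgoing_messages_rx: broadcast receiver that yields each OutgoingMessage to send.
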
Return Value

Returns anyhow::Result<()> indicating the success or failure of the sender loop. The loop ends gracefully on shutdown or connection close.

Usage Example

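// Run the sender loop as a background task on the tokio runtime.
let sender_handle = tokio::spawn(sender(
    shutdown_rx,
    Some(metrics), // pass None to disable metrics collection
    connection,
    stop_tx,
    stop_rx,
    outgoing_messages_rx,
));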

Internal Functions

send_message

async fn send_message<Connection: NetConnection + 'static>(
    metrics: Option<NetMetrics>,
    connection: Arc<ConnectionWrapper<Connection>>,
    mut outgoing: OutgoingMessage,
    stop_tx: tokio::sync::watch::Sender<bool>,
)

Description

Handles the processing and transmission of a single outgoing message. It updates metrics for delivery phases, modifies message ID metadata, and performs the network transfer. On failure, it reports errors and triggers stop signals.

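Parameters

metrics: optional NetMetrics used to record delivery phases, sent bytes, and errors.
connection: shared (Arc) ConnectionWrapper over which the message is transferred.
outgoing: the OutgoingMessage to send; it is taken as mut because its message ID metadata is updated before the transfer.
stop_tx: watch sender used to trigger the stop signal when the transfer fails.
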
Return Value

This function returns (). Errors are not propagated to the caller: a failed transfer is logged, reported to metrics, and signalled through stop_tx.
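
The success and error paths of send_message can be sketched with standard-library types only. This is a simplified model, not the actual implementation: Metrics, Message, send_message_model, and demo_send_message_model are hypothetical stand-ins for NetMetrics, OutgoingMessage, and the real function; an AtomicBool replaces the stop_tx watch channel; the transfer is a caller-supplied closure; and the metadata update shown is invented purely for illustration. The allow-sending check from the diagram is left out because its details are not given here.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical stand-in for NetMetrics: two plain counters.
#[derive(Debug, Default)]
struct Metrics {
    sent_bytes: usize,
    errors: usize,
}

/// Hypothetical stand-in for OutgoingMessage.
struct Message {
    id: String,
    payload: Vec<u8>,
}

/// Models one send: update metadata, transfer, then record the outcome.
/// Returns nothing; a failure is reported and raises the stop flag instead.
fn send_message_model<T>(
    metrics: Option<&mut Metrics>,
    mut transfer: T,
    mut outgoing: Message,
    stop: &AtomicBool,
) where
    T: FnMut(&Message) -> Result<usize, String>,
{
    // Assumed metadata update: the real change to the message ID is not specified.
    outgoing.id.push_str(":sending");

    match transfer(&outgoing) {
        Ok(bytes) => {
            // Metrics are optional, so record only when an instance was provided.
            if let Some(m) = metrics {
                m.sent_bytes += bytes;
            }
        }
        Err(error) => {
            eprintln!("transfer of {} failed: {}", outgoing.id, error);
            if let Some(m) = metrics {
                m.errors += 1;
            }
            // Comparable in spirit to sending `true` on stop_tx.
            stop.store(true, Ordering::SeqCst);
        }
    }
}

/// One successful send of 5 bytes followed by one failed send.
fn demo_send_message_model() -> (Metrics, bool) {
    let mut metrics = Metrics::default();
    let stop = AtomicBool::new(false);

    let ok = Message { id: "m1".to_string(), payload: vec![0u8; 5] };
    send_message_model(Some(&mut metrics), |m: &Message| Ok(m.payload.len()), ok, &stop);

    let bad = Message { id: "m2".to_string(), payload: Vec::new() };
    send_message_model(
        Some(&mut metrics),
        |_: &Message| Err("connection reset".to_string()),
        bad,
        &stop,
    );

    (metrics, stop.load(Ordering::SeqCst))
}
```

Because the failure is turned into a stop signal rather than a return value, the caller does not need to inspect a result after each message; it only has to watch the stop flag.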

Important Details


Implementation Details and Algorithms


Interactions with Other Modules


Diagram: sender.rs Structure and Workflow

flowchart TD
A[Sender Loop] -->|Listens for shutdown_rx| B[Shutdown Signal]
A -->|Listens for stop_rx| C[Stop Signal]
A -->|Listens for connection close| D[Connection Close]
A -->|Receives OutgoingMessage| E[Outgoing Message Received]
E --> F[send_message Function]
F --> G[Check Allow Sending]
G -->|Allowed| H[Update Message Metadata]
H --> I[Start Delivery Phase Metrics]
I --> J[transfer Function - Send Message]
J --> K{Transfer Result}
K -->|Success| L[Finish Delivery Phase, Report Sent Bytes]
K -->|Error| M[Log Error, Report Error, Trigger Stop Signal]

This flowchart depicts the main sender async loop that concurrently waits for signals and messages, and the internal steps within send_message to process and send a message while tracking metrics and handling errors.


Key Types Referenced

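The types referenced in the signatures above include NetConnection, ConnectionWrapper, NetMetrics, and OutgoingMessage. For detailed information on these types and related patterns, see the relevant topics on Networking, Metrics Collection, and Pub-Sub Architecture.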