receiver.rs
Overview
This file defines the asynchronous message receiving logic of the network communication layer. It handles incoming messages on a network connection, deserializes them, collects metrics related to message transfer, and forwards them to a processing queue. The primary purpose is to continuously listen for incoming network packets, transform raw data into structured messages, and deliver them downstream while monitoring for errors and shutdown signals.
The file focuses on safe and efficient message reception, integrating with metrics and connection management components, and supports orderly shutdown through Tokio's watch channels.
Detailed Description of Components
Functions
pub async fn receiver<Connection>(...) -> anyhow::Result<()>
Purpose:
Runs the main receiver loop for a given network connection. It listens for shutdown signals or stop commands, and continuously receives messages via thereceive_messagefunction.Type Parameters:
Connection: A type implementing theNetConnectiontrait and'staticlifetime.
Parameters:
shutdown_rx: tokio::sync::watch::Receiver<bool>
A watch channel receiver signaling application-wide shutdown requests.metrics: Option<NetMetrics>
Optional network metrics collector to record statistics and errors.connection: Arc<ConnectionWrapper<Connection>>
Shared reference to the connection wrapper that manages the underlying network connection.receiver_stop_tx: tokio::sync::watch::Sender<bool>
Watch channel sender to notify when the receiver should stop.receiver_stop_rx: tokio::sync::watch::Receiver<bool>
Watch channel receiver to listen for stop signals specific to this receiver.incoming_tx: IncomingSender
Asynchronous channel sender used to forward received and deserialized messages downstream.
Returns:
anyhow::Result<()>
Result indicating success or failure of the receiver task.
Behavior:
Logs start and finish of the receiver loop with connection identity and peer info.
Uses
tokio::select!to concurrently listen for shutdown signals, stop signals, and incoming messages.Calls
receive_messagerepeatedly to handle incoming network data.Breaks loop cleanly upon shutdown or stop signals.
Usage Example:
let receiver_handle = tokio::spawn(receiver( shutdown_rx, Some(metrics), connection.clone(), receiver_stop_tx, receiver_stop_rx, incoming_tx, ));
async fn receive_message<Connection>(...)
Purpose:
Receives a single network message from the connection, deserializes it, updates metrics, and sends it into an internal incoming message channel.Type Parameters:
Connection: A type implementing theNetConnectiontrait and'staticlifetime.
Parameters:
metrics: Option<NetMetrics>
Optional metrics collector for tracking errors and message transfer statistics.connection: Arc<ConnectionWrapper<Connection>>
Shared reference to the connection wrapper managing the network connection.incoming_tx: IncomingSender
Channel sender to forward deserialized messages.receiver_stop_tx: tokio::sync::watch::Sender<bool>
Channel sender to signal stopping the receiver in case of fatal errors.
Behavior:
Awaits receiving raw data and transfer duration from the network connection.
Attempts to deserialize the raw data into a
NetMessageusingbincode.On deserialization failure, logs the error, reports metrics, and signals receiver stop.
Logs detailed debug info about the message, including broadcast status, message ID, peer info, and size.
Updates metrics on received bytes and delivery phases.
Wraps the message and connection info into an
IncomingMessagestruct.Attempts to send the incoming message into the
incoming_txchannel.If the channel is closed (consumer detached), finishes the delivery phase in metrics and signals receiver stop.
On any network receive error, logs the detailed error and reports metrics accordingly.
Stops the receiver loop on fatal connection errors.
Important Implementation Details
Asynchronous Message Processing:
Uses Tokio's async runtime to concurrently handle shutdown signals and incoming messages without blocking. Thetokio::select!macro allows reacting to multiple asynchronous events simultaneously.Error Handling and Metrics Integration:
Errors during deserialization or network receive operations trigger logging with detailed error messages and metrics reporting. This ensures operational visibility and facilitates troubleshooting of network issues.Graceful Shutdown Coordination:
Utilizes Tokio watch channels to receive shutdown commands and to signal stopping conditions internally. This design allows coordinated termination of receiver tasks.Message Deserialization:
Employsbincodefor efficient binary deserialization ofNetMessagestructs from raw bytes, ensuring compact network data representation.Delivery Phase Tracking:
Metrics track various delivery phases (DeliveryPhase::IncomingBuffer) to measure performance and latency from message reception to processing.Connection Information Propagation:
EachIncomingMessagecontains a clone of the connection's metadata for downstream components to contextualize messages with peer and network info.
Interaction with Other System Components
NetConnectionTrait:
Abstracts the underlying network connection. Provides therecv()method used to asynchronously receive raw data and transfer duration.ConnectionWrapper<Connection>:
Encapsulates the network connection along with metadata such as peer info, local identity, and remote address.NetMessageStruct:
Represents the structured network message format expected on the wire. This file deserializes raw data into this format.NetMetrics:
Responsible for collecting and reporting metrics related to network operations, errors, and message delivery phases.IncomingSenderandIncomingMessage:
The message passing channel and container used to forward received messages for further processing in thepub_subsubsystem.Logging/Tracing:
Usestracingto log detailed debug and error information with contextual tags for observability.
Diagram: Structure of receiver.rs
flowchart TD
A[receiver function] -->|calls| B[receive_message function]
B --> C{connection.connection.recv()}
C -->|Ok| D[Deserialize NetMessage]
D --> E[Send IncomingMessage via incoming_tx]
E -->|channel closed| F[Signal receiver_stop_tx]
C -->|Err| G[Log error and Signal receiver_stop_tx]
A -->|listens to| H[shutdown_rx & receiver_stop_rx]
H -->|signal received| I[Break loop & finish]
This flowchart illustrates the main workflow of the receiver loop, the message reception, deserialization, forwarding, and the coordination of shutdown signals.