channel.rs
Overview
This file defines two primary network message sender abstractions for a peer-to-peer communication system: NetDirectSender and NetBroadcastSender. These abstractions handle the serialization, delivery, and metrics reporting of messages sent either directly to specific peers or broadcast to multiple recipients. The file also includes error handling for message sending failures and utilities for encoding outgoing messages.
The functionality provided is critical for implementing reliable and efficient message delivery within a distributed network, ensuring that messages are correctly serialized, metrics are tracked, and delivery errors are managed gracefully.
Types and Structures
NetDirectSender<PeerId, Message>
A generic struct designed for sending messages directly to specified peers identified by PeerId. It manages an unbounded Tokio channel sender that queues outgoing messages for delivery.
Type Parameters:
Fields:
inner: A Tokio unbounded channel sender that sends tuples of(DirectReceiver<PeerId>, NetMessage, Instant).metrics: OptionalNetMetricsfor reporting message statistics.self_peer_id: The sender's own peer identifier.self_addr: The sender's own socket address._message_type: PhantomData to associate the struct with theMessagetype without storing it.
Usage:
Instantiate with
new()providing the underlying channel sender, metrics, peer ID, and address.Use send() to send a message directly to another peer or address.
Important Behavior:
Skips sending messages to self (either by peer ID or socket address).
Encodes messages using NetMessage::encode.
Reports metrics such as message size and delivery phases.
Logs delivery status with tracing.
Handles errors such as encoding failure or disconnected channels via
NetSendError.
NetBroadcastSender
A generic struct for broadcasting messages to multiple recipients via a Tokio broadcast channel.
Type Parameters:
Message: The message payload type, with requirements similar toNetDirectSender.
Fields:
inner: Tokio broadcast channel sender ofOutgoingMessage.metrics: OptionalNetMetricsfor reporting._message_type: PhantomData forMessage.
Usage:
Instantiate with
new()providing the broadcast sender and optional metrics.Use send() to broadcast a message to all subscribed receivers.
Behavior:
Encodes messages similarly to
NetDirectSender.Reports message size and delivery phases to metrics.
Logs broadcast start and forwards messages using the broadcast channel.
Handles the case when no receivers exist and logs a warning.
NetSendError
An enum representing possible errors encountered when sending messages.
Variants:
Disconnected(Message): Sending failed because the channel is closed.EncodeFailed(Message, anyhow::Error): Serialization of the message failed.
Methods:
message(&self) -> String: Returns a human-readable error description.
Trait Implementations:
Implements Debug,
Display, and std::error::Error with formatting based on the error message.
Functions
encode_outgoing<Message>(message: Message) -> Result<(Message, NetMessage, usize), NetSendError<Message>>
Purpose: Encodes a message into a
NetMessagefor network transmission.Parameters:
Returns:
Ok((original_message, encoded_net_message, uncompressed_size))on success.Err(NetSendError::EncodeFailed) if serialization fails.
Behavior:
Uses NetMessage::encode for serialization.
Logs an error via tracing::error! if encoding fails.
Methods
NetDirectSender::new(...) -> Self
Parameters:
inner: Tokio unbounded sender channel for outgoing messages.metrics: OptionalNetMetrics.self_peer_id: The peer ID of the current node.self_addr: The socket address of the current node.
Returns: New instance of
NetDirectSender.
NetDirectSender::send(&self, args: (DirectReceiver<PeerId>, Message)) -> Result<(), NetSendError<Message>>
Parameters:
args: Tuple of
(DirectReceiver<PeerId>, Message)whereDirectReceiverindicates the target peer or address.
Returns:
Ok(())if the message was successfully queued.Err(NetSendError) on failures such as encoding error or disconnected channel.
Important Implementation Details:
Skips sending if the receiver matches the sender itself.
Encodes the message using
encode_outgoing.Reports message size and delivery phases using
NetMetrics.Sends the message asynchronously through the inner channel.
Handles errors by logging and returning appropriate
NetSendError.
NetBroadcastSender::new(...) -> Self
Parameters:
inner: Tokio broadcast sender for outgoing messages.metrics: OptionalNetMetrics.
Returns: New instance of
NetBroadcastSender.
NetBroadcastSender::send(&self, message: Message) -> Result<usize, NetSendError<Message>>
Parameters:
message: The message to broadcast.
Returns:
Ok(usize): Number of receivers that received the broadcast.Err(NetSendError): If encoding fails.
Implementation Details:
Encodes the message via
encode_outgoing.Reports message size metrics.
Sends the encoded message wrapped in
OutgoingMessagethrough the broadcast channel.If no receivers exist, logs a warning and metrics event.
Returns the count of receivers that got the message.
Interactions with Other Components
DirectReceiver: Used to specify the target peer or address for direct message sending.NetMessage: Core message wrapper that handles encoding and metadata like IDs and labels.NetMetrics: Optional metrics collector used to report message sizes, delivery phases, and warnings.OutgoingMessage: Wrapper for messages when broadcasting, includes metadata about message delivery.MessageDelivery and SendMode: Enumerations used in metrics reporting to distinguish delivery types and modes.
tokio::sync::mpscandtokio::sync::broadcastChannels: Underlying async channels used for message queuing and broadcasting.
This file is thus a key part of the message dispatching subsystem, interfacing between message producers and the network transport layer, while integrating detailed metrics and error handling.
Mermaid Class Diagram
classDiagram
class NetDirectSender {
-inner: UnboundedSender<(DirectReceiver, NetMessage, Instant)>
-metrics: Option<NetMetrics>
-self_peer_id: PeerId
-self_addr: SocketAddr
-_message_type: PhantomData<Message>
+new(inner, metrics, self_peer_id, self_addr)
+send((DirectReceiver, Message)) Result<(), NetSendError>
}
class NetBroadcastSender {
-inner: broadcast::Sender<OutgoingMessage>
-metrics: Option<NetMetrics>
-_message_type: PhantomData<Message>
+new(inner, metrics)
+send(Message) Result<usize, NetSendError>
}
class NetSendError {
<<enumeration>>
+Disconnected(Message)
+EncodeFailed(Message, Error)
+message() String
}
NetDirectSender ..> DirectReceiver : uses
NetDirectSender ..> NetMessage : uses
NetDirectSender ..> NetMetrics : uses
NetBroadcastSender ..> OutgoingMessage : uses
NetBroadcastSender ..> NetMetrics : uses
NetSendError o-- Message : generic over