mod.rs
Overview
This file provides core utilities and entry points for managing direct message sending over a network within a peer-to-peer or distributed system context. It defines the DirectReceiver enum for identifying message recipients either by peer ID or socket address, and contains the asynchronous function run_direct_sender to drive the process of sending network messages directly to peers.
The file acts as a coordinator that integrates the lower-level sender implementations (from the sender and peer_sender modules) and handles the interaction between the network transport, peer data, configuration updates, and messaging channels.
Detailed Explanation of Components
Enum: DirectReceiver<PeerId>
A generic enum that represents the recipient of a direct network message.
Variants
Peer(PeerId): Identifies the receiver by its peer ID.
Addr(SocketAddr): Identifies the receiver by its socket address.
Type Constraints on PeerId
Must satisfy the following bounds:
Display + Hash + Eq + Clone + Send + Sync + 'static.
Implementations
From<PeerId> for DirectReceiver<PeerId>
Allows automatic conversion from a PeerId to DirectReceiver::Peer.
Display for DirectReceiver<PeerId>
Formats the DirectReceiver as a string by delegating to the string representation of the inner PeerId or SocketAddr.
Usage Example
let peer_id: PeerId = ...;
let receiver: DirectReceiver<PeerId> = peer_id.into(); // Automatically wraps in Peer variant
println!("Sending message to {}", receiver);
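For reference, the enum and the two trait implementations described above could look roughly like the following. This is a minimal sketch reconstructed from the description, not the file's actual source; the derive list and the exact bounds on the impl blocks are assumptions.

```rust
use std::fmt::{self, Display};
use std::net::SocketAddr;

/// Recipient of a direct message: a known peer ID or a raw socket address.
#[derive(Debug, Clone, PartialEq, Eq, Hash)] // derive list is an assumption
pub enum DirectReceiver<PeerId> {
    Peer(PeerId),
    Addr(SocketAddr),
}

/// Lets callers write `peer_id.into()` to get the `Peer` variant.
impl<PeerId> From<PeerId> for DirectReceiver<PeerId> {
    fn from(id: PeerId) -> Self {
        DirectReceiver::Peer(id)
    }
}

/// Prints whatever the inner value prints, with no extra decoration.
impl<PeerId: Display> Display for DirectReceiver<PeerId> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DirectReceiver::Peer(id) => write!(f, "{}", id),
            DirectReceiver::Addr(addr) => write!(f, "{}", addr),
        }
    }
}
```

With these definitions, a String peer ID converted via .into() prints as the ID itself, and an Addr variant prints as the socket address.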
Async Function: run_direct_sender
pub async fn run_direct_sender<Transport, PeerId>(
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    network_config_rx: tokio::sync::watch::Receiver<NetworkConfig>,
    transport: Transport,
    metrics: Option<NetMetrics>,
    messages_rx: tokio::sync::mpsc::UnboundedReceiver<(
        DirectReceiver<PeerId>,
        NetMessage,
        Instant,
    )>,
    outgoing_reply_tx: tokio::sync::broadcast::Sender<OutgoingMessage>,
    incoming_reply_tx: IncomingSender,
    peers_rx: tokio::sync::watch::Receiver<HashMap<PeerId, Vec<PeerData>>>,
)
where
    Transport: NetTransport + 'static,
    PeerId: Display + Hash + Eq + Clone + Send + Sync + 'static,
Purpose
This function acts as the main driver for running the direct message sender subsystem. It initializes a DirectSender instance with the provided components and awaits its run loop.
Parameters
shutdown_rx: A watch receiver signaling shutdown events.
network_config_rx: A watch receiver for updates to the network configuration.
transport: An implementation of the NetTransport trait responsible for actual network communication.
metrics: Optional network-related metrics for monitoring.
messages_rx: An unbounded mpsc receiver channel that receives tuples consisting of:
    DirectReceiver<PeerId>: The intended recipient.
    NetMessage: The network message to send.
    Instant: The timestamp when the message was created or queued.
outgoing_reply_tx: A broadcast sender channel to announce outgoing messages to other system components.
incoming_reply_tx: An incoming sender channel to handle replies or incoming messages.
peers_rx: A watch receiver providing the current mapping of peer IDs to their connection data.
Behavior
Creates a DirectSender instance with the passed dependencies.
Invokes the .run() async method of DirectSender, which controls message dispatch, sender lifecycle, and error handling.
Usage Context
Intended to be launched as a background task within the application runtime, managing outgoing direct network messages and integrating with the broader messaging and peer management system.
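The background-task pattern described here can be illustrated without pulling in the real types. The sketch below is a standard-library analogue, not the Tokio code itself: a thread stands in for the spawned task, std::sync::mpsc for the unbounded message channel, and an AtomicBool for the watch-based shutdown signal. The names Queued, run_sender_loop, and spawn_and_drain are hypothetical, and the recipient and payload types are simplified to String and Vec<u8>.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the (recipient, message, queued-at) tuple.
type Queued = (String, Vec<u8>, Instant);

/// Drains queued messages until shutdown is signalled or every sender is dropped.
/// Returns the recipients in the order they were handled.
fn run_sender_loop(shutdown: Arc<AtomicBool>, messages_rx: Receiver<Queued>) -> Vec<String> {
    let mut handled = Vec::new();
    while !shutdown.load(Ordering::Relaxed) {
        match messages_rx.recv_timeout(Duration::from_millis(50)) {
            Ok((recipient, _payload, _queued_at)) => {
                // A real sender would hand the payload to the transport here.
                handled.push(recipient);
            }
            // Nothing queued yet: loop around and re-check the shutdown flag.
            Err(RecvTimeoutError::Timeout) => continue,
            // Every sending half was dropped: nothing more can arrive.
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    handled
}

/// Launches the loop on a background thread, queues one message per recipient,
/// then closes the channel and waits for the loop to finish.
fn spawn_and_drain(recipients: &[&str]) -> Vec<String> {
    let shutdown = Arc::new(AtomicBool::new(false));
    let (tx, rx) = mpsc::channel::<Queued>();
    let worker = thread::spawn({
        let shutdown = Arc::clone(&shutdown);
        move || run_sender_loop(shutdown, rx)
    });
    for recipient in recipients {
        tx.send((recipient.to_string(), b"ping".to_vec(), Instant::now()))
            .expect("worker is still running");
    }
    drop(tx); // closing the channel lets the loop exit once it has drained
    worker.join().expect("worker panicked")
}
```

In the Tokio version the same shape would presumably be a spawned task awaiting run_direct_sender, with the caller keeping the sending halves of the channels.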
Enum: PeerEvent<PeerId>
Internal event enumeration representing notable peer-related events.
Variants
AddrsResolved(PeerId, Vec<SocketAddr>): Indicates that the socket addresses for a peer have been resolved.
SenderStopped(PeerId, SocketAddr): Indicates that the sender connection to a particular peer at a given address has stopped.
Role
Used internally by sender implementations to track and respond to changes in peer connectivity and lifecycle.
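To make the role of these events concrete, here is a small sketch of how a consumer might react to them. The enum follows the variants listed above; the ActiveSenders map and the apply_event function are hypothetical bookkeeping invented for illustration and are not taken from the sender modules.

```rust
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use std::net::SocketAddr;

/// Peer lifecycle events, mirroring the variants described above.
#[derive(Debug)]
enum PeerEvent<PeerId> {
    AddrsResolved(PeerId, Vec<SocketAddr>),
    SenderStopped(PeerId, SocketAddr),
}

/// Hypothetical bookkeeping: which addresses currently have a running sender.
type ActiveSenders<PeerId> = HashMap<PeerId, HashSet<SocketAddr>>;

/// Applies one event to the bookkeeping map (illustrative only).
fn apply_event<PeerId: Hash + Eq>(active: &mut ActiveSenders<PeerId>, event: PeerEvent<PeerId>) {
    match event {
        // New addresses: a real implementation would start a sender per address here.
        PeerEvent::AddrsResolved(id, addrs) => {
            active.entry(id).or_default().extend(addrs);
        }
        // A sender exited: forget its address, and the peer once no senders remain.
        PeerEvent::SenderStopped(id, addr) => {
            let now_empty = match active.get_mut(&id) {
                Some(addrs) => {
                    addrs.remove(&addr);
                    addrs.is_empty()
                }
                None => false,
            };
            if now_empty {
                active.remove(&id);
            }
        }
    }
}
```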
Function: peer_info
fn peer_info<PeerId>(id: &PeerId, addr: SocketAddr) -> String
Description
Constructs a formatted string combining a peer's identifier and socket address for logging or display purposes.
Parameters
id: Reference to a peer identifier.
addr: Socket address of the peer.
Returns
A String formatted as {id} ({addr}), e.g., "peer123 (192.168.1.5:8080)".
Usage Example
let info = peer_info(&peer_id, socket_addr);
println!("Connected to peer: {}", info);
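A function with this behavior can be written in a single line. The version below is a sketch consistent with the documented output format; the Display bound on PeerId is an assumption, since the signature shown above does not include its bounds.

```rust
use std::fmt::Display;
use std::net::SocketAddr;

/// Formats a peer as "{id} ({addr})" for log lines.
/// The `Display` bound is assumed; the documented signature elides it.
fn peer_info<PeerId: Display>(id: &PeerId, addr: SocketAddr) -> String {
    format!("{} ({})", id, addr)
}
```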
Important Implementation Details and Algorithms
The DirectReceiver abstraction enables flexible addressing of peers either by logical ID or network address, accommodating scenarios where peer ID resolution to an address may be pending or unavailable.
The run_direct_sender function leverages Tokio's asynchronous channels and watch receivers to dynamically respond to configuration changes, peer updates, and shutdown signals.
The actual message sending logic is encapsulated within the DirectSender type (defined in the sender module), which is responsible for managing sender lifecycles, connection retries, and metrics collection.
The retry timeout constant RESOLVE_RETRY_TIMEOUT (1 second) hints at a retry mechanism for resolving peer addresses, presumably used within the sender modules.
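One way the two addressing modes and the retry timeout could fit together is sketched below. It is speculative: the Resolution enum and resolve function are hypothetical, PeerData is reduced to a stand-in struct with a single addr field (the real type's fields are not shown in this file), and the way RESOLVE_RETRY_TIMEOUT is used here is a guess rather than the sender modules' actual logic.

```rust
use std::collections::HashMap;
use std::hash::Hash;
use std::net::SocketAddr;
use std::time::Duration;

/// Retry interval named in the text; how it is used here is an assumption.
const RESOLVE_RETRY_TIMEOUT: Duration = Duration::from_secs(1);

enum DirectReceiver<PeerId> {
    Peer(PeerId),
    Addr(SocketAddr),
}

/// Hypothetical stand-in for PeerData, reduced to the one field this sketch needs.
struct PeerData {
    addr: SocketAddr,
}

/// Outcome of one resolution attempt (hypothetical type).
#[derive(Debug, PartialEq)]
enum Resolution {
    Ready(Vec<SocketAddr>),
    RetryAfter(Duration),
}

/// Maps a receiver to concrete addresses using the latest peers snapshot.
fn resolve<PeerId: Hash + Eq>(
    receiver: &DirectReceiver<PeerId>,
    peers: &HashMap<PeerId, Vec<PeerData>>,
) -> Resolution {
    match receiver {
        // A raw address needs no lookup.
        DirectReceiver::Addr(addr) => Resolution::Ready(vec![*addr]),
        // A peer ID is usable only once the peers map has data for it.
        DirectReceiver::Peer(id) => match peers.get(id) {
            Some(data) if !data.is_empty() => {
                Resolution::Ready(data.iter().map(|d| d.addr).collect())
            }
            _ => Resolution::RetryAfter(RESOLVE_RETRY_TIMEOUT),
        },
    }
}
```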
Interaction with Other System Components
Sender Modules: This file imports the peer_sender and sender modules, which contain the detailed sender implementations. It acts as the integration point that launches the direct sender task.
Network Transport: Depends on the NetTransport trait from the transport_layer crate to abstract over the underlying network communication protocol.
Configuration and Metrics: Listens to runtime network configuration updates (NetworkConfig) and optionally reports metrics (NetMetrics).
Messaging System: Interacts with the pub-sub messaging framework via OutgoingMessage and IncomingSender channels to coordinate message dispatch and incoming responses.
Peer Management: Receives peer data updates from a watch channel that maps PeerId to PeerData, allowing dynamic adaptation to peer connectivity changes.
Mermaid Diagram: Structure of mod.rs
classDiagram
class DirectReceiver~PeerId~ {
<<enum>>
+Peer(PeerId)
+Addr(SocketAddr)
+from(PeerId)
+to_string()
}
class PeerEvent~PeerId~ {
<<enum>>
+AddrsResolved(PeerId, Vec<SocketAddr>)
+SenderStopped(PeerId, SocketAddr)
}
class run_direct_sender~Transport, PeerId~ {
<<async fn>>
+shutdown_rx: watch::Receiver<bool>
+network_config_rx: watch::Receiver<NetworkConfig>
+transport: Transport
+metrics: Option<NetMetrics>
+messages_rx: mpsc::UnboundedReceiver<(DirectReceiver<PeerId>, NetMessage, Instant)>
+outgoing_reply_tx: broadcast::Sender<OutgoingMessage>
+incoming_reply_tx: IncomingSender
+peers_rx: watch::Receiver<HashMap<PeerId, Vec<PeerData>>>
}
class peer_info~PeerId~ {
+fn(&PeerId, SocketAddr) -> String
}
run_direct_sender ..> DirectReceiver : uses
run_direct_sender ..> PeerEvent : may handle internally
run_direct_sender ..> peer_info : utility
This diagram visualizes the key enums and functions within the file and their relationships. The run_direct_sender function acts as the orchestrator, using DirectReceiver as the recipient abstraction, handling PeerEvent internally, and employing peer_info for formatting.