network.rs
Overview
The network.rs file implements core networking functionality for a distributed system by managing peer-to-peer communication using both broadcast and direct messaging patterns. It provides the BasicNetwork struct as the primary abstraction to configure, spawn, and orchestrate network-related tasks such as gossip protocol integration, pub/sub message handling, and direct message sending. The module ties together the transport layer, metrics collection, and gossip-driven subscriptions so that higher-level code can send and receive messages without handling those pieces directly.
This file is responsible for initializing and managing the lifecycle of network tasks, handling incoming/outgoing messages, managing peer subscriptions, and coordinating with other subsystems like chitchat (for gossip) and transport layers (NetTransport). It serves as a bridge between low-level transport mechanics and higher-level message passing abstractions (NetDirectSender, NetBroadcastSender).
Key Structs and Functions
PeerData
use std::net::SocketAddr;

#[derive(Clone, Debug, PartialEq)]
pub struct PeerData {
    pub peer_addr: SocketAddr,
    pub bk_api_socket: Option<SocketAddr>,
}
Purpose: Represents metadata about a network peer, including its primary socket address and an optional backup API socket.
Fields:
peer_addr: The main socket address of the peer.
bk_api_socket: An optional backup API socket address.
Usage: Used as the value type in peer maps to track connected peers and their relevant network addresses.
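To make the peer-map shape concrete, here is a small sketch that walks a map of the type published by BasicNetwork::start (HashMap<PeerId, Vec<PeerData>>) and collects every primary address. String stands in for PeerId, the addresses are made up, and the all_peer_addrs helper is hypothetical. It is not part of network.rs.

```rust
use std::collections::HashMap;
use std::net::SocketAddr;

// Mirrors the PeerData struct documented above.
#[derive(Clone, Debug, PartialEq)]
pub struct PeerData {
    pub peer_addr: SocketAddr,
    pub bk_api_socket: Option<SocketAddr>,
}

// Hypothetical helper: collects the primary address of every PeerData entry
// in a map shaped like the one published by BasicNetwork::start.
// String stands in for the generic PeerId type.
pub fn all_peer_addrs(peers: &HashMap<String, Vec<PeerData>>) -> Vec<SocketAddr> {
    let mut addrs: Vec<SocketAddr> = peers
        .values()
        .flat_map(|entries| entries.iter().map(|p| p.peer_addr))
        .collect();
    // HashMap iteration order is unspecified, so sort for a stable result.
    addrs.sort();
    addrs
}
```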
BasicNetwork<Transport>
pub struct BasicNetwork<Transport: NetTransport> {
    shutdown_tx: tokio::sync::watch::Sender<bool>,
    config_rx: tokio::sync::watch::Receiver<NetworkConfig>,
    transport: Transport,
}
Purpose: Main network abstraction providing setup and management of network messaging tasks.
Type Parameter:
Transport: Must implement NetTransport, representing the underlying network transport mechanism (e.g., TCP, UDP).
Fields:
shutdown_tx: Watch channel sender used to signal shutdown to spawned tasks.
config_rx: Watch channel receiver for dynamic network configuration updates.
transport: The transport layer instance used for sending and receiving messages.
BasicNetwork::new
pub fn new(
    shutdown_tx: tokio::sync::watch::Sender<bool>,
    config_rx: tokio::sync::watch::Receiver<NetworkConfig>,
    transport: Transport,
) -> Self
Purpose: Constructs a new BasicNetwork instance.
Parameters:
shutdown_tx: Channel used to notify tasks of shutdown events.
config_rx: Receiver providing network configuration updates.
transport: Transport layer instance.
Returns: A new BasicNetwork instance.
BasicNetwork::start
pub async fn start<PeerId, Message, ChannelMetrics>(
    &self,
    watch_gossip_config_rx: tokio::sync::watch::Receiver<WatchGossipConfig>,
    metrics: Option<NetMetrics>,
    channel_metrics: Option<ChannelMetrics>,
    self_peer_id: PeerId,
    self_addr: SocketAddr,
    is_proxy: bool,
    chitchat: ChitchatRef,
) -> anyhow::Result<(
    NetDirectSender<PeerId, Message>,
    NetBroadcastSender<Message>,
    InstrumentedReceiver<IncomingMessage>,
    tokio::sync::watch::Receiver<HashMap<PeerId, Vec<PeerData>>>,
)>
where
    Message: Debug + for<'de> serde::Deserialize<'de> + Serialize + Send + Sync + Clone + 'static,
    PeerId: Clone + Display + Send + Sync + Hash + Eq + FromStr<Err: Display> + 'static,
    ChannelMetrics: InstrumentedChannelMetrics + Send + Sync + 'static,
Purpose: Starts the network by spawning and initializing all necessary networking tasks.
Parameters:
watch_gossip_config_rx: Receiver for gossip configuration updates.
metrics: Optional network metrics collector.
channel_metrics: Optional metrics for instrumented channels.
self_peer_id: Identifier of the current node/peer.
self_addr: Socket address of the current node.
is_proxy: Flag indicating whether this node acts as a proxy.
chitchat: Reference to the gossip protocol instance.
Returns: A tuple containing:
NetDirectSender<PeerId, Message> for sending direct messages.
NetBroadcastSender<Message> for broadcasting messages.
InstrumentedReceiver<IncomingMessage> for receiving incoming messages.
tokio::sync::watch::Receiver<HashMap<PeerId, Vec<PeerData>>> for observing the current peers.
Functionality:
Initializes instrumented channels for incoming messages.
Creates broadcast and unbounded channels for outgoing messages.
Sets up subscription management via a watch channel.
Spawns tasks for managing gossip subscriptions, pub/sub connections, and direct message sending.
Integrates with metrics and handles configuration updates dynamically.
Usage Example:
let basic_network = BasicNetwork::new(shutdown_tx, config_rx, transport);
let (direct_sender, broadcast_sender, incoming_rx, peers_rx) = basic_network.start(
    watch_gossip_config_rx,
    Some(metrics),
    Some(channel_metrics),
    self_peer_id,
    self_addr,
    is_proxy,
    chitchat_ref,
).await?;
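The PeerId bounds on start mean a peer identifier must be hashable, comparable, thread-safe, and able to round-trip through text (Display out, FromStr in, with a displayable parse error). The sketch below shows one hypothetical type that satisfies them. NodeId and round_trip are illustrative names, not items from network.rs. The bounds are written with an explicit where clause, which is equivalent to the FromStr<Err: Display> shorthand used in the signature (that shorthand requires Rust 1.79 or newer).

```rust
use std::fmt::{self, Display};
use std::hash::Hash;
use std::str::FromStr;

// Hypothetical peer identifier, used only for illustration.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct NodeId(pub u64);

impl Display for NodeId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "node-{}", self.0)
    }
}

impl FromStr for NodeId {
    // ParseIntError implements Display, which satisfies FromStr<Err: Display>.
    type Err = std::num::ParseIntError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Accept both "node-42" and a bare "42".
        let digits = s.strip_prefix("node-").unwrap_or(s);
        digits.parse::<u64>().map(NodeId)
    }
}

// Same bounds as the PeerId parameter of BasicNetwork::start, written with a
// where clause instead of the associated-type-bound shorthand.
pub fn round_trip<P>(id: &P) -> Result<P, String>
where
    P: Clone + Display + Send + Sync + Hash + Eq + FromStr + 'static,
    <P as FromStr>::Err: Display,
{
    id.to_string().parse::<P>().map_err(|e| e.to_string())
}
```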
combine_subscribe
async fn combine_subscribe(
    mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
    mut network_config_rx: tokio::sync::watch::Receiver<NetworkConfig>,
    mut gossip_subscribe_rx: tokio::sync::watch::Receiver<Vec<Vec<SocketAddr>>>,
    subscribe_tx: tokio::sync::watch::Sender<Vec<Vec<SocketAddr>>>,
)
Purpose: Derives a single subscription list from multiple sources (network_config.subscribe, network_config.proxies, and gossip subscriptions) and publishes it via subscribe_tx.
Parameters:
shutdown_rx: Receiver that signals shutdown.
network_config_rx: Receiver for network configuration changes.
gossip_subscribe_rx: Receiver for gossip subscription updates.
subscribe_tx: Sender used to publish the combined subscription list.
Behavior:
Monitors changes to the network config and gossip subscriptions.
Prefers network_config.subscribe if it is non-empty.
Otherwise, falls back to network_config.proxies if it is non-empty.
Otherwise, uses the gossip subscriptions.
Runs a continuous async loop that updates the combined subscriptions and exits on shutdown.
Implementation Detail: Uses tokio::select! to await changes on multiple watch receivers concurrently.
Usage: Spawned internally by BasicNetwork::start to maintain subscription state for pub/sub.
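The selection rule in the Behavior list can be written as a pure function. This is a sketch under assumptions: the real NetworkConfig field types are not shown here, so SubscribeSources is a hypothetical stand-in in which subscribe is already grouped and proxies is a flat list published as a single group. In the actual task, a function like this would be re-evaluated whenever tokio::select! observes a change on network_config_rx or gossip_subscribe_rx, and its result sent through subscribe_tx.

```rust
use std::net::SocketAddr;

// Hypothetical stand-in for the two NetworkConfig fields combine_subscribe reads.
// Assumption: `subscribe` is already grouped and `proxies` is a flat list.
pub struct SubscribeSources {
    pub subscribe: Vec<Vec<SocketAddr>>,
    pub proxies: Vec<SocketAddr>,
}

// Priority selection: explicit subscriptions, then proxies, then gossip.
pub fn combine(config: &SubscribeSources, gossip: &[Vec<SocketAddr>]) -> Vec<Vec<SocketAddr>> {
    if !config.subscribe.is_empty() {
        config.subscribe.clone()
    } else if !config.proxies.is_empty() {
        // Assumption: the proxy list is published as a single subscription group.
        vec![config.proxies.clone()]
    } else {
        gossip.to_vec()
    }
}
```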
Implementation Details and Algorithms
The BasicNetwork::start method orchestrates multiple asynchronous tasks, leveraging Tokio's concurrency features. Messaging follows a pub/sub pattern: broadcast messages are sent via a Tokio broadcast channel, while direct messages use unbounded mpsc channels.
Network configuration and gossip subscriptions are watched dynamically using Tokio watch channels, so peer subscriptions can change in real time without restarting the network.
The combine_subscribe function selects a subscription list by priority, using the following preference order:
Explicit subscribe addresses from the network config.
Proxy addresses from the network config.
The gossip protocol's subscription addresses.
The tasks spawned in start are wrapped using spawn_critical_task, which reports failures of these tasks to telemetry.
The integration with chitchat allows the network to participate in a gossip protocol, facilitating peer discovery and state synchronization.
Instrumented channels (instrumented_channel) are used to collect metrics on message queues for operational visibility.
The file uses generic type parameters (PeerId, Message, ChannelMetrics) with trait bounds to allow flexibility in peer identity, message formats, and telemetry implementations.
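As an illustration of what an instrumented channel can measure, the following sketch wraps a std::sync::mpsc channel with a shared gauge of messages sent but not yet received. GaugedSender, GaugedReceiver, and gauged_channel are hypothetical names. The real instrumented_channel is not shown in this document and may track different metrics.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{self, Receiver, RecvError, SendError, Sender};
use std::sync::Arc;

// Hypothetical instrumented channel: a std mpsc channel plus a shared gauge
// counting messages that have been sent but not yet received.
pub struct GaugedSender<T> {
    inner: Sender<T>,
    depth: Arc<AtomicUsize>,
}

pub struct GaugedReceiver<T> {
    inner: Receiver<T>,
    depth: Arc<AtomicUsize>,
}

// Returns both channel ends plus a handle to the queue-depth gauge.
pub fn gauged_channel<T>() -> (GaugedSender<T>, GaugedReceiver<T>, Arc<AtomicUsize>) {
    let (tx, rx) = mpsc::channel();
    let depth = Arc::new(AtomicUsize::new(0));
    (
        GaugedSender { inner: tx, depth: Arc::clone(&depth) },
        GaugedReceiver { inner: rx, depth: Arc::clone(&depth) },
        depth,
    )
}

impl<T> GaugedSender<T> {
    pub fn send(&self, value: T) -> Result<(), SendError<T>> {
        // Count the message before it becomes visible to the receiver.
        self.depth.fetch_add(1, Ordering::Relaxed);
        self.inner.send(value).map_err(|e| {
            // The receiver is gone, so the message was never queued: roll back.
            self.depth.fetch_sub(1, Ordering::Relaxed);
            e
        })
    }
}

impl<T> GaugedReceiver<T> {
    pub fn recv(&self) -> Result<T, RecvError> {
        let value = self.inner.recv()?;
        self.depth.fetch_sub(1, Ordering::Relaxed);
        Ok(value)
    }
}
```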
Interaction with Other Components
Transport Layer (NetTransport): BasicNetwork depends on a concrete network transport implementation to send and receive low-level network packets.
Gossip Protocol (chitchat): Used to manage peer discovery and gossip subscriptions; integrated via watch_gossip and chitchat::ChitchatRef.
Messaging Channels: NetDirectSender and NetBroadcastSender provide high-level abstractions for sending direct and broadcast messages, respectively. IncomingSender and IncomingMessage handle incoming messages from the network.
Configuration (NetworkConfig): Network configuration changes are observed in real time via watch channels, affecting subscription lists and task behavior.
Metrics (NetMetrics, InstrumentedChannelMetrics): Metrics are collected and reported for network operations, message queues, and task health.
Pub/Sub System (pub_sub module): Responsible for managing pub/sub connections and message routing.
Direct Sender (direct_sender module): Manages the sending of direct, peer-to-peer messages.
Resolver Module (resolver): Provides functionality to watch gossip subscriptions and manage subscription strategies.
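The relationship between the peer map and direct sending can be sketched as follows: a loop drains (peer ID, message) pairs from an unbounded channel, resolves each peer through a HashMap<PeerId, Vec<PeerData>>, and separates routable messages from ones addressed to unknown peers. This is a hypothetical model using std::sync::mpsc with String IDs and messages. In particular, choosing the first PeerData entry is an assumption, not documented behavior of the direct_sender module.

```rust
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::mpsc;

// Mirrors the PeerData struct documented above.
#[derive(Clone, Debug, PartialEq)]
pub struct PeerData {
    pub peer_addr: SocketAddr,
    pub bk_api_socket: Option<SocketAddr>,
}

// Hypothetical direct-send loop: drains (peer_id, message) pairs from an
// unbounded std mpsc channel and resolves each peer through the peer map.
// Returns the (address, message) pairs that would go to the transport, plus
// the messages that could not be routed because the peer was unknown.
pub fn route_direct(
    rx: mpsc::Receiver<(String, String)>,
    peers: &HashMap<String, Vec<PeerData>>,
) -> (Vec<(SocketAddr, String)>, Vec<(String, String)>) {
    let mut routed = Vec::new();
    let mut unroutable = Vec::new();
    // Iteration ends once every sender has been dropped.
    for (peer_id, msg) in rx {
        // Assumption: the first PeerData entry is the preferred address.
        match peers.get(&peer_id).and_then(|entries| entries.first()) {
            Some(peer) => routed.push((peer.peer_addr, msg)),
            None => unroutable.push((peer_id, msg)),
        }
    }
    (routed, unroutable)
}
```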
Constants
BROADCAST_RETENTION_CAPACITY: Defines the retention capacity for broadcast messages (100 messages).
DEFAULT_MAX_CONNECTIONS: Default maximum number of allowed network connections (1000).
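To show what a retention capacity of 100 implies, here is a std-only model of a bounded broadcast buffer that keeps the newest messages and reports how many a lagging reader missed. RetentionBuffer is hypothetical. Tokio's broadcast channel has comparable semantics (a receiver that falls behind by more than the capacity gets RecvError::Lagged), but this is not its implementation.

```rust
use std::collections::VecDeque;

// Value taken from the constants list above.
pub const BROADCAST_RETENTION_CAPACITY: usize = 100;

// Std-only model of a bounded broadcast buffer: it keeps the most recent
// `capacity` messages, tagged with sequence numbers, and evicts the oldest
// message when a new one arrives while the buffer is full.
pub struct RetentionBuffer<T> {
    capacity: usize,
    next_seq: u64,
    items: VecDeque<(u64, T)>,
}

impl<T: Clone> RetentionBuffer<T> {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Self { capacity, next_seq: 0, items: VecDeque::with_capacity(capacity) }
    }

    pub fn push(&mut self, item: T) {
        if self.items.len() == self.capacity {
            self.items.pop_front(); // evict the oldest message
        }
        self.items.push_back((self.next_seq, item));
        self.next_seq += 1;
    }

    /// For a reader whose next expected sequence number is `cursor`, returns
    /// how many messages it missed and the messages it can still read.
    pub fn read_from(&self, cursor: u64) -> (u64, Vec<T>) {
        let oldest = self.items.front().map_or(self.next_seq, |(seq, _)| *seq);
        let missed = oldest.saturating_sub(cursor);
        let available = self
            .items
            .iter()
            .filter(|(seq, _)| *seq >= cursor)
            .map(|(_, item)| item.clone())
            .collect();
        (missed, available)
    }
}
```

With 150 messages pushed into a buffer of capacity 100, a reader that started at sequence 0 has lost the 50 oldest messages, while a caught-up reader misses nothing.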
Mermaid Diagram: Structure of BasicNetwork
classDiagram
class PeerData {
+peer_addr: SocketAddr
+bk_api_socket: Option~SocketAddr~
}
class BasicNetwork~Transport~ {
-shutdown_tx: watch::Sender~bool~
-config_rx: watch::Receiver~NetworkConfig~
-transport: Transport
+new(shutdown_tx, config_rx, transport)
+start(...)
}
BasicNetwork ..> NetTransport : uses
BasicNetwork ..> NetDirectSender : returns
BasicNetwork ..> NetBroadcastSender : returns
BasicNetwork ..> InstrumentedReceiver : returns
BasicNetwork ..> PeerData : peer info
BasicNetwork ..> ChitchatRef : interacts
This diagram illustrates the structural relationship between BasicNetwork, PeerData, and the related components it uses or returns. It highlights the core fields and methods of BasicNetwork and its dependency on the NetTransport trait through the Transport type parameter.
Summary of Workflow in BasicNetwork::start
flowchart TD
Start["start() Called"] --> InitChannels["Initialize Channels"]
InitChannels --> SpawnCombineSubscribe["Spawn combine_subscribe Task"]
SpawnCombineSubscribe --> SpawnGossip["Spawn Gossip Task"]
SpawnGossip --> SpawnPubSub["Spawn Pub/Sub Task"]
SpawnPubSub --> SpawnDirectSender["Spawn Direct Sender Task"]
SpawnDirectSender --> ReturnSenders["Return Senders and Receivers"]
classDef task fill:#f9f,stroke:#333,stroke-width:2px;
SpawnCombineSubscribe:::task
SpawnGossip:::task
SpawnPubSub:::task
SpawnDirectSender:::task
This flowchart depicts the sequence of steps executed during the start method, showing initialization, spawning of asynchronous tasks for subscription management, gossip, pub/sub messaging, and direct messaging, and finally returning the initialized message senders and receivers.
Notes on Error Handling and Logging
Errors in spawned tasks (e.g., the pub/sub task) are logged as warnings using the tracing crate.
Metrics are updated when critical tasks fail, enabling alerting or monitoring systems to detect issues.
The use of anyhow::Result for the start function provides flexible error propagation.
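The failure-reporting pattern described in this section can be sketched with std threads: a wrapper runs a fallible task, logs a warning when it returns an error, and increments a counter that a metrics exporter could read. spawn_critical is a hypothetical name. The documented spawn_critical_task works with Tokio tasks and the tracing crate, neither of which is used in this sketch.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

// Hypothetical thread-based analogue of a critical-task wrapper: runs a
// fallible task, prints a warning if it returns Err, and bumps a shared
// failure counter. Panics inside the task are not counted here.
pub fn spawn_critical<F>(name: &'static str, failures: Arc<AtomicUsize>, task: F) -> JoinHandle<()>
where
    F: FnOnce() -> Result<(), String> + Send + 'static,
{
    thread::spawn(move || {
        if let Err(err) = task() {
            // Stand-in for a tracing warning in the documented design.
            eprintln!("warning: critical task {name} failed: {err}");
            failures.fetch_add(1, Ordering::Relaxed);
        }
    })
}
```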
Relevant Topics
Networking and Peer-to-Peer Communication
Gossip Protocol Integration
Asynchronous Task Management
Metrics and Telemetry Instrumentation
Message Passing and Pub/Sub Systems
Rust Generics and Trait Bounds
Tokio Synchronization Primitives