network.rs

Overview

The network.rs file implements core networking functionality for a distributed system by managing peer-to-peer communication using both broadcast and direct messaging patterns. It provides the BasicNetwork struct as the primary abstraction to configure, spawn, and orchestrate network-related tasks such as gossip protocol integration, pub/sub message handling, and direct message sending. The module integrates with components like transport layers, metrics, and gossip subscriptions to enable scalable and resilient network communication.

This file is responsible for initializing and managing the lifecycle of network tasks, handling incoming/outgoing messages, managing peer subscriptions, and coordinating with other subsystems like chitchat (for gossip) and transport layers (NetTransport). It serves as a bridge between low-level transport mechanics and higher-level message passing abstractions (NetDirectSender, NetBroadcastSender).


Key Structs and Functions

PeerData

#[derive(Clone, Debug, PartialEq)]
pub struct PeerData {
    pub peer_addr: SocketAddr,
    pub bk_api_socket: Option<SocketAddr>,
}

BasicNetwork<Transport>

pub struct BasicNetwork<Transport: NetTransport> {
    shutdown_tx: tokio::sync::watch::Sender<bool>,
    config_rx: tokio::sync::watch::Receiver<NetworkConfig>,
    transport: Transport,
}

BasicNetwork::new

pub fn new(
    shutdown_tx: tokio::sync::watch::Sender<bool>,
    config_rx: tokio::sync::watch::Receiver<NetworkConfig>,
    transport: Transport,
) -> Self

BasicNetwork::start

pub async fn start<PeerId, Message, ChannelMetrics>(
    &self,
    watch_gossip_config_rx: tokio::sync::watch::Receiver<WatchGossipConfig>,
    metrics: Option<NetMetrics>,
    channel_metrics: Option<ChannelMetrics>,
    self_peer_id: PeerId,
    self_addr: SocketAddr,
    is_proxy: bool,
    chitchat: ChitchatRef,
) -> anyhow::Result<(
    NetDirectSender<PeerId, Message>,
    NetBroadcastSender<Message>,
    InstrumentedReceiver<IncomingMessage>,
    tokio::sync::watch::Receiver<HashMap<PeerId, Vec<PeerData>>>,
)>
where
    Message: Debug + for<'de> serde::Deserialize<'de> + Serialize + Send + Sync + Clone + 'static,
    PeerId: Clone + Display + Send + Sync + Hash + Eq + FromStr<Err: Display> + 'static,
    ChannelMetrics: InstrumentedChannelMetrics + Send + Sync + 'static,
let basic_network = BasicNetwork::new(shutdown_tx, config_rx, transport);
let (direct_sender, broadcast_sender, incoming_rx, peers_rx) = basic_network.start(
    watch_gossip_config_rx,
    Some(metrics),
    Some(channel_metrics),
    self_peer_id,
    self_addr,
    is_proxy,
    chitchat_ref,
).await?;

combine_subscribe

async fn combine_subscribe(
    mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
    mut network_config_rx: tokio::sync::watch::Receiver<NetworkConfig>,
    mut gossip_subscribe_rx: tokio::sync::watch::Receiver<Vec<Vec<SocketAddr>>>,
    subscribe_tx: tokio::sync::watch::Sender<Vec<Vec<SocketAddr>>>,
)

Implementation Details and Algorithms


Interaction with Other Components


Constants


Mermaid Diagram: Structure of BasicNetwork

classDiagram
class PeerData {
+peer_addr: SocketAddr
+bk_api_socket: Option<SocketAddr>
}
class BasicNetwork~Transport~ {
-shutdown_tx: watch::Sender<bool>
-config_rx: watch::Receiver<NetworkConfig>
-transport: Transport
+new(shutdown_tx, config_rx, transport)
+start(...)
}
BasicNetwork ..> NetTransport : uses
BasicNetwork ..> NetDirectSender : returns
BasicNetwork ..> NetBroadcastSender : returns
BasicNetwork ..> InstrumentedReceiver : returns
BasicNetwork ..> PeerData : peer info
BasicNetwork ..> chitchat::ChitchatRef : interacts

This diagram illustrates the structural relationship between BasicNetwork, PeerData, and related components it uses or returns. It highlights the core fields and methods of BasicNetwork and its dependency on the Transport trait.


Summary of Workflow in BasicNetwork::start

flowchart TD
Start["start() Called"] --> InitChannels["Initialize Channels"]
InitChannels --> SpawnCombineSubscribe["Spawn combine_subscribe Task"]
SpawnCombineSubscribe --> SpawnGossip["Spawn Gossip Task"]
SpawnGossip --> SpawnPubSub["Spawn Pub/Sub Task"]
SpawnPubSub --> SpawnDirectSender["Spawn Direct Sender Task"]
SpawnDirectSender --> ReturnSenders["Return Senders and Receivers"]
classDef task fill:#f9f,stroke:#333,stroke-width:2px;
SpawnCombineSubscribe:::task
SpawnGossip:::task
SpawnPubSub:::task
SpawnDirectSender:::task

This flowchart depicts the sequence of steps executed during the start method, showing initialization, spawning of asynchronous tasks for subscription management, gossip, pub/sub messaging, and direct messaging, and finally returning the initialized message senders and receivers.


Notes on Error Handling and Logging


Relevant Topics