watch.rs

Overview

This file monitors and manages gossip peers in a distributed network. It watches live gossip nodes, maintains subscriptions according to a configurable strategy, and tracks peer data against a set of trusted public keys. The main asynchronous function, watch_gossip, continuously refreshes the state of gossip peers, updates the subscription list, and publishes both through asynchronous channels. The file also handles peer verification and metric reporting for gossip communication.

Enums

SubscribeStrategy<PeerId>

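Defines the strategy used to subscribe to gossip peers. The class diagram at the end of this document lists two variants: Peer(PeerId) and Proxy(Vec<SocketAddr>).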

This enum governs how subscription addresses are selected and filtered.
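As a rough illustration, the enum could be declared along the following lines. The variant names and payload types are taken from the class diagram at the end of this document; the derives and the `describe` helper are assumptions added for the example and are not part of watch.rs.

```rust
use std::fmt::Display;
use std::net::SocketAddr;

/// Sketch of the strategy enum. Variant names follow the class diagram;
/// the derive list is an assumption.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SubscribeStrategy<PeerId> {
    /// Carries a peer ID.
    Peer(PeerId),
    /// Carries a list of socket addresses.
    Proxy(Vec<SocketAddr>),
}

/// Hypothetical helper (not from watch.rs): a short human-readable
/// summary of the strategy, e.g. for a log line.
pub fn describe<PeerId: Display>(strategy: &SubscribeStrategy<PeerId>) -> String {
    match strategy {
        SubscribeStrategy::Peer(id) => format!("peer {id}"),
        SubscribeStrategy::Proxy(addrs) => format!("proxy via {} address(es)", addrs.len()),
    }
}
```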

Structs

WatchGossipConfig

Configuration parameters for watching gossip peers:

| Field | Type | Description |
|---|---|---|
| max_nodes_with_same_id | usize | Maximum number of nodes allowed with the same peer ID. |
| trusted_pubkeys | HashSet<transport_layer::VerifyingKey> | Set of trusted public keys used for peer verification. |

This struct is clonable and comparable, allowing dynamic updating of the watch configuration.
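The sketch below shows why cloning and comparison are useful here: a new configuration can be compared with the previous one to decide whether anything changed. The field names and types come from the table above. The `VerifyingKey` alias is a stand-in for transport_layer::VerifyingKey, and the `is_trusted` helper is hypothetical; both are assumptions made so the example compiles on its own.

```rust
use std::collections::HashSet;

/// Stand-in for transport_layer::VerifyingKey (assumption: the real type
/// is hashable and comparable, as required by HashSet).
pub type VerifyingKey = [u8; 32];

/// Sketch of the configuration struct with the two documented fields.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WatchGossipConfig {
    pub max_nodes_with_same_id: usize,
    pub trusted_pubkeys: HashSet<VerifyingKey>,
}

impl WatchGossipConfig {
    /// Hypothetical helper (not from watch.rs): true if `pubkey` is in
    /// the trusted set.
    pub fn is_trusted(&self, pubkey: &VerifyingKey) -> bool {
        self.trusted_pubkeys.contains(pubkey)
    }
}
```

Because the struct derives PartialEq, `old != new` is enough to detect that a configuration update actually changed something.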

Functions

watch_gossip

pub async fn watch_gossip<PeerId>(
    mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
    mut config_rx: tokio::sync::watch::Receiver<WatchGossipConfig>,
    strategy: SubscribeStrategy<PeerId>,
    chitchat: ChitchatRef,
    subscribe_tx: tokio::sync::watch::Sender<Vec<Vec<SocketAddr>>>,
    peers_tx: tokio::sync::watch::Sender<HashMap<PeerId, Vec<PeerData>>>,
    metrics: Option<NetMetrics>,
)
where
    PeerId: Clone + Display + Send + Sync + Hash + Eq + FromStr<Err: Display> + 'static,

Purpose:
Continuously monitors live gossip nodes and updates subscription and peer data based on the configured strategy and trusted keys.

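Parameters (as declared in the signature above):

- shutdown_rx: watch receiver carrying a boolean shutdown flag.
- config_rx: watch receiver that delivers WatchGossipConfig updates.
- strategy: the SubscribeStrategy that governs how subscription addresses are selected.
- chitchat: ChitchatRef handle to the gossip state from which live nodes are read.
- subscribe_tx: watch sender that publishes the subscription list as Vec<Vec<SocketAddr>>.
- peers_tx: watch sender that publishes peer data grouped by peer ID.
- metrics: optional NetMetrics used for metric reporting.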

Behavior:

Usage Example:

// shutdown_channel and config_channel are assumed to be
// tokio::sync::watch::Sender handles created elsewhere.
let shutdown_rx = shutdown_channel.subscribe();
let config_rx = config_channel.subscribe();
let strategy = SubscribeStrategy::Peer(my_peer_id);
let chitchat = chitchat_ref.clone();
// Sender handles for the two output channels.
let subscribe_tx = subscribe_channel.clone();
let peers_tx = peers_channel.clone();
let metrics = Some(net_metrics);

// Run the watcher as a background task.
tokio::spawn(async move {
    watch_gossip(shutdown_rx, config_rx, strategy, chitchat, subscribe_tx, peers_tx, metrics).await;
});

refresh

fn refresh<PeerId>(
    strategy: &SubscribeStrategy<PeerId>,
    chitchat: &ChitchatRef,
    subscribe: &mut HashMap<Vec<SocketAddr>, HashSet<transport_layer::VerifyingKey>>,
    peers: &mut HashMap<(PeerId, SocketAddr), (PeerData, HashSet<transport_layer::VerifyingKey>)>,
    config: &WatchGossipConfig,
) -> usize
where
    PeerId: Clone + Display + FromStr<Err: Display> + Send + Sync + Hash + Eq + 'static,

Purpose:
Refreshes the internal state of subscriptions and peers based on the current gossip nodes and the subscription strategy.

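Parameters (as declared in the signature above):

- strategy: the subscription strategy in use.
- chitchat: ChitchatRef handle to the gossip state.
- subscribe: map from a subscription address list to the set of public keys associated with it; updated in place.
- peers: map from (PeerId, SocketAddr) to the peer's PeerData and its set of public keys; updated in place.
- config: the current WatchGossipConfig.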

Returns:
The count of live nodes processed.

Implementation Details:
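The internal maps that refresh updates do not have the same shape as the values sent on subscribe_tx and peers_tx. The following is a minimal sketch of one way to convert between them, based only on the types in the signatures above. The functions `group_peers` and `subscribe_list` are hypothetical names, `VerifyingKey` is a stand-in alias, and the way max_nodes_with_same_id is applied here is an assumption, not the confirmed behavior of watch.rs. The PeerData fields follow the class diagram at the end of this document.

```rust
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use std::net::SocketAddr;

/// Stand-in for transport_layer::VerifyingKey (assumption).
pub type VerifyingKey = [u8; 32];

/// Fields as listed in the class diagram.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerData {
    pub peer_addr: SocketAddr,
    pub bk_api_socket: SocketAddr,
}

/// Hypothetical: collapse the (PeerId, SocketAddr)-keyed map into the
/// PeerId-keyed shape sent on peers_tx, keeping at most
/// `max_nodes_with_same_id` nodes per peer ID. HashMap iteration order is
/// unspecified, so which nodes survive the cap is arbitrary in this sketch.
pub fn group_peers<PeerId>(
    peers: &HashMap<(PeerId, SocketAddr), (PeerData, HashSet<VerifyingKey>)>,
    max_nodes_with_same_id: usize,
) -> HashMap<PeerId, Vec<PeerData>>
where
    PeerId: Clone + Hash + Eq,
{
    let mut grouped: HashMap<PeerId, Vec<PeerData>> = HashMap::new();
    if max_nodes_with_same_id == 0 {
        return grouped;
    }
    for ((peer_id, _addr), (data, _keys)) in peers {
        let nodes = grouped.entry(peer_id.clone()).or_default();
        if nodes.len() < max_nodes_with_same_id {
            nodes.push(data.clone());
        }
    }
    grouped
}

/// Hypothetical: turn the keys of the subscribe map into the
/// Vec<Vec<SocketAddr>> shape sent on subscribe_tx. Sorting makes the
/// result deterministic, which keeps comparisons between refreshes stable.
pub fn subscribe_list(
    subscribe: &HashMap<Vec<SocketAddr>, HashSet<VerifyingKey>>,
) -> Vec<Vec<SocketAddr>> {
    let mut list: Vec<Vec<SocketAddr>> = subscribe.keys().cloned().collect();
    list.sort();
    list
}
```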

verify_pubkey_in

fn verify_pubkey_in<K, V, F>(
    map: &mut HashMap<K, V>,
    key: &K,
    pubkey: &transport_layer::VerifyingKey,
    is_trusted: bool,
    get_pubkeys: F,
)
where
    K: Eq + Hash,
    F: Fn(&mut V) -> &mut HashSet<transport_layer::VerifyingKey>,

Purpose:
Manages the presence of a public key inside a nested set within a map. Adds or removes trusted keys and removes entire map entries if no trusted keys remain.

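Parameters (as declared in the signature above):

- map: the map whose entry is updated.
- key: key of the entry to update.
- pubkey: the public key to add or remove.
- is_trusted: whether pubkey is currently trusted.
- get_pubkeys: accessor that returns the set of public keys stored inside a map value.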

Behavior:
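The purpose statement above can be sketched as follows. The signature matches the one shown earlier, except that `VerifyingKey` is a stand-in alias for transport_layer::VerifyingKey. The body is an assumption reconstructed from the description: in particular, this sketch does nothing when the key is not already in the map, which the source does not confirm.

```rust
use std::collections::{HashMap, HashSet};
use std::hash::Hash;

/// Stand-in for transport_layer::VerifyingKey (assumption).
pub type VerifyingKey = [u8; 32];

/// Sketch: add `pubkey` to the entry's key set when it is trusted, remove
/// it otherwise, and drop the whole entry once its key set is empty.
pub fn verify_pubkey_in<K, V, F>(
    map: &mut HashMap<K, V>,
    key: &K,
    pubkey: &VerifyingKey,
    is_trusted: bool,
    get_pubkeys: F,
) where
    K: Eq + Hash,
    F: Fn(&mut V) -> &mut HashSet<VerifyingKey>,
{
    let no_trusted_left = match map.get_mut(key) {
        Some(value) => {
            let pubkeys = get_pubkeys(value);
            if is_trusted {
                pubkeys.insert(*pubkey);
            } else {
                pubkeys.remove(pubkey);
            }
            pubkeys.is_empty()
        }
        // Assumption: an absent entry is left absent.
        None => return,
    };
    if no_trusted_left {
        map.remove(key);
    }
}
```

The accessor closure lets one function serve maps with different value types: for a map whose values are the key set itself it can be `|set| set`, and for values that pair data with a key set it can return the set from the pair.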

peer_subscribe_addrs

fn peer_subscribe_addrs(peer_addr: SocketAddr, proxies: &[SocketAddr]) -> Vec<SocketAddr>

Purpose:
Determines the list of addresses to subscribe to for a given peer.

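Parameters (as declared in the signature above):

- peer_addr: the peer's advertised address.
- proxies: the proxy addresses for the peer; may be empty.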

Returns:
If the proxies list is empty, returns a vector containing only the peer's advertised address. Otherwise, returns the proxies.
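The rule described above is small enough to write out directly. This is a sketch that follows the documented signature and return behavior; the actual body in watch.rs may differ in detail.

```rust
use std::net::SocketAddr;

/// Returns the peer's own address when no proxies are given,
/// otherwise returns the proxies unchanged.
pub fn peer_subscribe_addrs(peer_addr: SocketAddr, proxies: &[SocketAddr]) -> Vec<SocketAddr> {
    if proxies.is_empty() {
        vec![peer_addr]
    } else {
        proxies.to_vec()
    }
}
```

Note that when proxies are present the peer's own address is not included in the result, so the subscriber connects only through the proxies.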

Helper Functions for Debugging and Logging

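The class diagram at the end of this document names three such helpers: strategy_info, subscribe_info, and peers_info. They are used primarily for tracing logs within watch_gossip.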

Important Implementation Details and Algorithms

Interaction with Other System Components

Mermaid Class Diagram

classDiagram
class WatchGossipConfig {
+max_nodes_with_same_id: usize
+trusted_pubkeys: HashSet<VerifyingKey>
}
class SubscribeStrategy {
<<enum>>
+Peer(PeerId)
+Proxy(Vec<SocketAddr>)
}
class PeerData {
+peer_addr: SocketAddr
+bk_api_socket: SocketAddr
}
class watch_gossip {
+async fn watch_gossip()
}
class refresh {
+fn refresh()
}
class verify_pubkey_in {
+fn verify_pubkey_in()
}
class peer_subscribe_addrs {
+fn peer_subscribe_addrs()
}
class strategy_info {
+fn strategy_info()
}
class subscribe_info {
+fn subscribe_info()
}
class peers_info {
+fn peers_info()
}
watch_gossip --> SubscribeStrategy
watch_gossip --> WatchGossipConfig
watch_gossip --> ChitchatRef
watch_gossip --> NetMetrics
watch_gossip --> PeerData
watch_gossip --> refresh
refresh --> SubscribeStrategy
refresh --> ChitchatRef
refresh --> WatchGossipConfig
refresh --> PeerData
verify_pubkey_in --> HashMap
verify_pubkey_in --> VerifyingKey
peer_subscribe_addrs --> SocketAddr
strategy_info --> SubscribeStrategy
subscribe_info --> HashMap
peers_info --> HashMap

This diagram shows the primary structs, enum, and functions of the file and their relationships, highlighting how the main watcher function depends on configuration, strategy, and underlying data types to perform its task.