watch.rs
Overview
This file implements the functionality to monitor and manage gossip peers in a distributed network environment. It provides mechanisms to watch live gossip nodes, maintain subscriptions based on configurable strategies, and track peer data with respect to trusted public keys. The main asynchronous function watch_gossip continuously refreshes the state of gossip peers, updates subscription lists, and sends updates through asynchronous channels. This file handles peer verification, subscription management, and metric reporting related to gossip communication.
Enums
SubscribeStrategy<PeerId>
Defines the strategy used to subscribe to gossip peers:
Peer(PeerId): Subscribe directly to a specific peer identified byPeerId.Proxy(Vec<SocketAddr>): Subscribe through a list of proxy addresses.
This enum governs how subscription addresses are selected and filtered.
Structs
WatchGossipConfig
Configuration parameters for watching gossip peers:
Field | Type | Description |
|---|---|---|
|
| Maximum number of nodes allowed with the same peer ID. |
|
| Set of trusted public keys used for peer verification. |
This struct is clonable and comparable, allowing dynamic updating of the watch configuration.
Functions
watch_gossip
pub async fn watch_gossip<PeerId>(
mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
mut config_rx: tokio::sync::watch::Receiver<WatchGossipConfig>,
strategy: SubscribeStrategy<PeerId>,
chitchat: ChitchatRef,
subscribe_tx: tokio::sync::watch::Sender<Vec<Vec<SocketAddr>>>,
peers_tx: tokio::sync::watch::Sender<HashMap<PeerId, Vec<PeerData>>>,
metrics: Option<NetMetrics>,
)
where
PeerId: Clone + Display + Send + Sync + Hash + Eq + FromStr<Err: Display> + 'static,
Purpose:
Continuously monitors live gossip nodes and updates subscription and peer data based on the configured strategy and trusted keys.
Parameters:
shutdown_rx: Receiver to listen for shutdown signals.config_rx: Receiver for dynamic configuration changes (WatchGossipConfig).strategy: Subscription strategy determining how to select peers.chitchat: Reference to the gossip network state.subscribe_tx: Sender channel to propagate the list of subscription addresses.peers_tx: Sender channel to propagate the map of peers and their associated data.metrics: Optional network metrics for reporting peer statistics.
Behavior:
Uses a loop to refresh live nodes via the
refreshfunction.Updates
subscribe_txandpeers_txif changes occur.Reports metrics on the number of gossip peers and live nodes.
Reacts to changes in shutdown signal, configuration, or live nodes, breaking the loop on shutdown or errors.
Usage Example:
let shutdown_rx = shutdown_channel.subscribe();
let config_rx = config_channel.subscribe();
let strategy = SubscribeStrategy::Peer(my_peer_id);
let chitchat = chitchat_ref.clone();
let subscribe_tx = subscribe_channel.clone();
let peers_tx = peers_channel.clone();
let metrics = Some(net_metrics);
tokio::spawn(async move {
watch_gossip(shutdown_rx, config_rx, strategy, chitchat, subscribe_tx, peers_tx, metrics).await;
});
refresh
fn refresh<PeerId>(
strategy: &SubscribeStrategy<PeerId>,
chitchat: &ChitchatRef,
subscribe: &mut HashMap<Vec<SocketAddr>, HashSet<transport_layer::VerifyingKey>>,
peers: &mut HashMap<(PeerId, SocketAddr), (PeerData, HashSet<transport_layer::VerifyingKey>)>,
config: &WatchGossipConfig,
) -> usize
where
PeerId: Clone + Display + FromStr<Err: Display> + Send + Sync + Hash + Eq + 'static,
Purpose:
Refreshes the internal state of subscriptions and peers based on the current gossip nodes and the subscription strategy.
Parameters:
strategy: Strategy for subscribing to peers.chitchat: Reference to the gossip network state.subscribe: Mutable map tracking subscriptions keyed by socket address vectors and their associated verifying keys.peers: Mutable map tracking peer data keyed by(PeerId, SocketAddr)and their verifying keys.config: Current watch configuration.
Returns:
The count of live nodes processed.
Implementation Details:
Locks the
chitchatstate for reading live nodes.Filters and verifies peers based on the trusted public keys from the configuration.
Determines subscription addresses based on the strategy:
For
Peerstrategy, subscribes to addresses of peers different from self.For
Proxystrategy, subscribes through proxies if they match configured proxies.
Updates
subscribeandpeersmaps, adding or modifying entries accordingly.Uses
verify_pubkey_into maintain sets of trusted public keys in the maps.Counts live nodes processed and returns the count.
verify_pubkey_in
fn verify_pubkey_in<K, V, F>(
map: &mut HashMap<K, V>,
key: &K,
pubkey: &transport_layer::VerifyingKey,
is_trusted: bool,
get_pubkeys: F,
)
where
K: Eq + Hash,
F: Fn(&mut V) -> &mut HashSet<transport_layer::VerifyingKey>,
Purpose:
Manages the presence of a public key inside a nested set within a map. Adds or removes trusted keys and removes entire map entries if no trusted keys remain.
Parameters:
map: Mutable reference to a map keyed byK.key: Key in the map to update.pubkey: Public key to add or remove.is_trusted: Boolean indicating if the key is trusted.get_pubkeys: Function to extract the mutableHashSetof public keys from the map value.
Behavior:
If
is_trustedis true, inserts the public key.If
is_trustedis false, removes the public key and removes the entire entry if the set becomes empty.
peer_subscribe_addrs
fn peer_subscribe_addrs(peer_addr: SocketAddr, proxies: &[SocketAddr]) -> Vec<SocketAddr>
Purpose:
Determines the list of addresses to subscribe to for a given peer.
Parameters:
peer_addr: The advertised address of the peer.proxies: A list of proxy addresses associated with the peer.
Returns:
If the proxies list is empty, returns a vector containing only the peer's advertised address. Otherwise, returns the proxies.
Helper Functions for Debugging and Logging
strategy_info: Converts aSubscribeStrategyinto a concise string representation.subscribe_info: Produces a string containing all subscription address groups.peers_info: Produces a string summary of the peer IDs and addresses contained in the map.
These functions are used primarily for tracing logs within watch_gossip.
Important Implementation Details and Algorithms
Uses a watch channel mechanism (
tokio::sync::watch) for reactive updates to configuration, shutdown signals, and live nodes.Employs a hash map keyed by subscription addresses and peer identifiers for efficient lookups and updates.
Maintains sets of trusted public keys to allow dynamic trust verification and peer filtering.
Limits the number of nodes with the same peer ID according to configuration to avoid overload or redundancy.
Differentiates subscription logic based on whether the strategy is direct peer subscription or proxy-based.
Interaction with Other System Components
Chitchat (
ChitchatRef): The file depends on a shared, lockable reference to the gossip network state (live nodes and their states).Metrics (
NetMetrics): Optionally reports metrics on the number of gossip peers and live nodes.Peer Data (
PeerData): Uses and updates peer information such as addresses for the network layer.Transport Layer Verifying Keys: Uses cryptographically verified keys for trustworthy peer verification.
Tokio Channels: Utilizes asynchronous watch channels to receive shutdown signals, configuration updates, and send out subscription and peer data updates.
Resolver (
GossipPeer): Converts raw node states into structured peer information.
Mermaid Class Diagram
classDiagram
class WatchGossipConfig {
+max_nodes_with_same_id: usize
+trusted_pubkeys: HashSet<VerifyingKey>
}
class SubscribeStrategy {
<<enum>>
+Peer(PeerId)
+Proxy(Vec<SocketAddr>)
}
class PeerData {
+peer_addr: SocketAddr
+bk_api_socket: SocketAddr
}
class watch_gossip {
+async fn watch_gossip()
}
class refresh {
+fn refresh()
}
class verify_pubkey_in {
+fn verify_pubkey_in()
}
class peer_subscribe_addrs {
+fn peer_subscribe_addrs()
}
class strategy_info {
+fn strategy_info()
}
class subscribe_info {
+fn subscribe_info()
}
class peers_info {
+fn peers_info()
}
watch_gossip --> SubscribeStrategy
watch_gossip --> WatchGossipConfig
watch_gossip --> ChitchatRef
watch_gossip --> NetMetrics
watch_gossip --> PeerData
watch_gossip --> refresh
refresh --> SubscribeStrategy
refresh --> ChitchatRef
refresh --> WatchGossipConfig
refresh --> PeerData
verify_pubkey_in --> HashMap
verify_pubkey_in --> VerifyingKey
peer_subscribe_addrs --> SocketAddr
strategy_info --> SubscribeStrategy
subscribe_info --> HashMap
peers_info --> HashMap
This diagram shows the primary structs, enum, and functions of the file and their relationships, highlighting how the main watcher function depends on configuration, strategy, and underlying data types to perform its task.