bp_resolver.rs
Overview
The bp_resolver.rs file implements a concrete version of the BPResolver trait, responsible for resolving and retrieving the network addresses of "block producers" (BPs) based on thread identifiers within a distributed system. It serves as a bridge between the dynamic set of peers currently connected (received via a Tokio watch channel) and the persistent repository that maps threads to their corresponding BP node identifiers.
The main functionality centers around mapping thread IDs to their associated BP network addresses (SocketAddr), enabling other components in the system to route messages or requests to the correct BP nodes. This file is particularly concerned with keeping the BP resolution up-to-date as the set of peers changes over time.
Structs and Traits
BPResolverImpl
Purpose:
Implements theBPResolvertrait to resolve block producer node addresses for given thread identifiers using a combination of live peer data and stored thread-to-node mappings.Fields:
peers_rx: tokio::sync::watch::Receiver<HashMap<NodeIdentifier, Vec<PeerData>>>
A Tokio watch channel receiver that provides live updates of the current peers in the network. Each peer is mapped by itsNodeIdentifierto a vector ofPeerDataobjects.repository: Arc<Mutex<RepositoryImpl>>
A thread-safe reference-counted pointer to a mutex-guarded repository instance, which holds persistent mappings of threads to BP node identifiers.
Constructor:
pub fn new( peers_rx: tokio::sync::watch::Receiver<HashMap<NodeIdentifier, Vec<PeerData>>>, repository: Arc<Mutex<RepositoryImpl>>, ) -> SelfParameters:
peers_rx: Receiver channel providing live peer status.repository: Shared repository instance for thread-node mappings.
Returns: A new instance of
BPResolverImpl.
Usage Example:
let resolver = BPResolverImpl::new(peers_receiver, shared_repository);
Trait Implementation: BPResolver for BPResolverImpl
The trait BPResolver defines the contract for resolving block producer addresses, which BPResolverImpl fulfills.
Method: resolve
fn resolve(&mut self, thread_id: Option<String>) -> Vec<SocketAddr>
Description:
Resolves and returns a deduplicated list of network socket addresses (SocketAddr) corresponding to block producers that serve the specified thread. If no thread ID is provided, it resolves addresses for all threads.Parameters:
thread_id: Option<String>— An optional thread identifier as a string. IfNone, all threads are considered.
Returns:
A vector of
SocketAddrrepresenting the network addresses of the resolved block producers.
Detailed Behavior:
Locks the
repositoryto obtain the current mapping from threads to BP node IDs viaget_nodes_by_threads().Converts the optional string
thread_idinto an internal thread ID type (usingtry_into()).Accesses the current peer data snapshot via the watch channel receiver (
peers_rx).Iterates over all thread-to-node mappings, filtering by the target thread if specified.
For each relevant BP node ID, retrieves the associated peer data and extracts the first peer's network address.
Replaces the port in the address with a constant default port (
DEFAULT_NODE_URL_PORT).Collects these addresses into a vector, removes duplicates, and returns it.
Important Notes:
The method uses the
parking_lot::Mutexfor efficient locking.The TODO comment indicates that the thread list might change at runtime, suggesting a future enhancement to use a shared service or dynamic subscription.
Uses
tracing::debugfor logging the current thread-to-node mapping, useful for debugging or monitoring.
Usage Example:
let mut resolver = BPResolverImpl::new(peers_rx, repository); let bp_addresses = resolver.resolve(Some("thread-42".to_string())); for addr in bp_addresses { println!("BP address: {}", addr); }
Key Dependencies and Interactions
message_router::bp_resolver::BPResolver
The trait that defines the resolver interface, implemented byBPResolverImpl.message_router::DEFAULT_NODE_URL_PORT
The default network port to assign to resolved peer addresses.network::network::PeerData
Represents metadata about a network peer, including its socket address.crate::node::NodeIdentifier
Abstract identifier type for nodes in the network.crate::repository::repository_impl::RepositoryImpl
Persistent storage managing thread-to-node mappings, accessed via a mutex to ensure thread safety.Tokio's watch channel
Provides real-time updates of peer data, enabling the resolver to reflect the current network state.
Implementation Details and Algorithms
Locking and Concurrency:
Usesparking_lot::Mutexfor efficient and deadlock-resistant locking of the repository. The peer data is accessed via Tokio's asynchronous watch channel, which allows non-blocking reads of the latest peer information.Data Filtering and Mapping:
Theresolvemethod filters the thread-to-node map based on the optional thread ID. It maps node identifiers to their peer data, then extracts the first peer's address, ensuring a consistent way to pick a network endpoint for each BP.Address Port Adjustment:
The port of each resolved peer address is forcibly set to a default constant, ensuring uniformity in network communication ports regardless of the original peer data.Deduplication:
The final list of addresses is deduplicated to avoid redundant network calls or message sending.
Interaction with Other System Components
Peers Management:
BPResolverImpllistens to updates about the current set of peers via thepeers_rxwatch receiver, which is likely managed by a network or peer discovery module.Repository Access:
It reads persistent mappings from theRepositoryImpl, which is responsible for maintaining the association between threads and their block producers. This repository is shared across components and is protected by a mutex for safe concurrent access.Message Routing:
Other components, such as message routers or network clients, invokeresolveto obtain target addresses for sending messages to block producers serving specific threads.Logging:
Utilizes thetracingcrate for debug-level logging of internal state, aiding in observability.
Visual Diagram
classDiagram
class BPResolver {
<<trait>>
+resolve(thread_id: Option<String>) Vec<SocketAddr>
}
class BPResolverImpl {
-peers_rx: watch::Receiver<HashMap<NodeIdentifier, Vec<PeerData>>>
-repository: Arc<Mutex<RepositoryImpl>>
+new(peers_rx, repository)
+resolve(thread_id: Option<String>) Vec<SocketAddr>
}
class RepositoryImpl {
+get_nodes_by_threads() HashMap<ThreadId, Option<NodeIdentifier>>
}
BPResolverImpl ..|> BPResolver
BPResolverImpl --> RepositoryImpl : uses (via mutex)
BPResolverImpl --> "watch::Receiver" : listens to peer updates
This diagram illustrates the relationship between BPResolverImpl, the BPResolver trait it implements, its dependency on the RepositoryImpl, and its consumption of peer data through a Tokio watch channel receiver. The key method resolve is highlighted within the trait and the implementation.