bp_resolver.rs
Overview
This file implements the BPResolverImpl struct, which provides functionality for managing and resolving Breakpoint (BP) socket addresses associated with different thread identifiers. It maintains a thread-safe mapping between thread IDs and their corresponding lists of socket addresses, supports updating this mapping dynamically, and offers a resolution mechanism to obtain the relevant socket addresses for a given thread ID. The component is designed to listen for incoming updates asynchronously and integrates with a message routing system via the BPResolver trait.
Detailed Components
BPResolverImpl Struct
Purpose:
Stores and manages the mapping from thread identifiers (String) to lists of BP socket addresses (Vec<SocketAddr>). Provides methods to update this mapping and to resolve addresses by thread ID.
Fields:
| Field Name | Type | Description |
| --- | --- | --- |
| default_bp | SocketAddr | Default BP address used when no thread-specific addresses exist. |
| map_thread_addr | Arc<RwLock<HashMap<String, Vec<SocketAddr>>>> | Thread-safe shared map of thread IDs to their BP address lists. |
Methods of BPResolverImpl
new(default_bp: SocketAddr) -> Self
Description:
Constructs a new BPResolverImpl instance with the specified default BP address and an empty thread-to-address mapping.
Parameters:
default_bp: The default socket address used if no specific address is found for a thread.
Returns:
A new instance of BPResolverImpl.
Usage Example:
let default_address = "127.0.0.1:1234".parse().unwrap();
// `mut` because upsert and resolve both take `&mut self`.
let mut resolver = BPResolverImpl::new(default_address);
upsert(&mut self, thread_id: String, bp_list: Vec<String>) -> anyhow::Result<()>
Description:
Inserts or updates the mapping entry for a given thread ID with a list of BP addresses. The input strings are parsed into SocketAddr instances; when a string omits the port, a predefined default port is used.
Parameters:
thread_id: The identifier of the thread whose BP addresses are being updated.
bp_list: A vector of string representations of socket addresses.
Returns:
Ok(()) if the insertion or update succeeds.
Err if parsing any of the addresses fails.
Implementation Details:
Uses try_parse_socket_addr to convert address strings into SocketAddr objects, applying a default port when necessary. Only updates the internal map if the new list differs from the existing one, to prevent unnecessary writes.
Usage Example:
resolver.upsert("thread-123".to_string(), vec!["192.168.1.1".to_string(), "192.168.1.2:9000".to_string()])?;
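The default-port fallback described above can be sketched with the standard library alone. This is a minimal illustration, not the actual try_parse_socket_addr: the helper names parse_with_default_port and parse_bp_list are hypothetical, the port value 4556 is a placeholder because the real DEFAULT_BP_PORT is not stated here, and only IP literals are handled (whether the real function also accepts hostnames is unknown).

```rust
use std::net::{IpAddr, SocketAddr};

/// Placeholder value; the real `DEFAULT_BP_PORT` is not given in this document.
const ASSUMED_DEFAULT_BP_PORT: u16 = 4556;

/// Parses "ip:port" as-is, or a bare IP address combined with `default_port`.
/// Hypothetical stand-in for `network::try_parse_socket_addr`.
fn parse_with_default_port(input: &str, default_port: u16) -> Result<SocketAddr, String> {
    let trimmed = input.trim();
    // First try the full "ip:port" form (this also covers "[v6]:port").
    if let Ok(addr) = trimmed.parse::<SocketAddr>() {
        return Ok(addr);
    }
    // Otherwise treat the input as a bare IP address and attach the default port.
    trimmed
        .parse::<IpAddr>()
        .map(|ip| SocketAddr::new(ip, default_port))
        .map_err(|e| format!("invalid BP address '{}': {}", trimmed, e))
}

/// Converts a whole list, failing on the first address that does not parse.
fn parse_bp_list(bp_list: &[String], default_port: u16) -> Result<Vec<SocketAddr>, String> {
    bp_list
        .iter()
        .map(|s| parse_with_default_port(s, default_port))
        .collect()
}
```

Collecting into a Result means a single malformed entry rejects the whole list, which matches the documented behavior of returning Err when parsing any address fails.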
start_listener(resolver: Arc<Mutex<BPResolverImpl>>, bp_data_rx: mpsc::Receiver<(String, Vec<String>)>) -> anyhow::Result<()>
Description:
Starts a dedicated thread that listens for incoming BP data updates via a multi-producer, single-consumer (mpsc) channel. Each received message contains a thread ID and its corresponding list of BP addresses to update.
Parameters:
resolver: A thread-safe, mutex-protected shared instance of BPResolverImpl.
bp_data_rx: Receiver channel for incoming BP update messages (thread ID and BP address list).
Returns:
Ok(()) if the listener thread is successfully spawned.
Err if spawning the thread fails.
Implementation Details:
The listener runs in a background thread named "BP update handler". Upon receiving a message, it attempts to update the resolver's map via upsert. Errors during the update are logged using tracing::error. Debug logs are emitted for each received update.
Usage Example:
let (tx, rx) = mpsc::channel();
let resolver = Arc::new(Mutex::new(BPResolverImpl::new(default_address)));
BPResolverImpl::start_listener(resolver.clone(), rx)?;
// Send updates through tx
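The listener pattern described above might look roughly like the following std-only sketch. ListenerSketch is a hypothetical stand-in for BPResolverImpl that stores raw strings, errors go to stderr instead of tracing::error, and the function returns the JoinHandle (the documented start_listener returns anyhow::Result<()>) so that a caller can wait for the thread to finish.

```rust
use std::collections::HashMap;
use std::io;
use std::sync::{mpsc, Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for `BPResolverImpl`; keeps raw strings for brevity.
#[derive(Default)]
struct ListenerSketch {
    map: HashMap<String, Vec<String>>,
}

impl ListenerSketch {
    /// Simplified upsert: rejects empty address strings, otherwise stores the list.
    fn upsert(&mut self, thread_id: String, bp_list: Vec<String>) -> Result<(), String> {
        if bp_list.iter().any(|s| s.is_empty()) {
            return Err(format!("empty BP address for thread '{}'", thread_id));
        }
        self.map.insert(thread_id, bp_list);
        Ok(())
    }
}

/// Spawns a named thread that applies every update received on `rx`.
/// The loop ends once all senders have been dropped.
fn spawn_update_listener(
    resolver: Arc<Mutex<ListenerSketch>>,
    rx: mpsc::Receiver<(String, Vec<String>)>,
) -> io::Result<JoinHandle<()>> {
    thread::Builder::new()
        .name("BP update handler".to_string())
        .spawn(move || {
            for (thread_id, bp_list) in rx {
                // The mutex guard is a temporary, so the lock is held for one update only.
                let result = resolver.lock().unwrap().upsert(thread_id, bp_list);
                if let Err(err) = result {
                    // The original logs via `tracing::error`; stderr keeps this std-only.
                    eprintln!("BP update failed: {}", err);
                }
            }
        })
}
```

Using thread::Builder rather than thread::spawn is what makes the spawn fallible and lets the thread carry a name, which is consistent with the documented Err return when spawning fails.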
impl BPResolver for BPResolverImpl
Trait Implemented:
BPResolver from the message_router::bp_resolver module.
Method:
resolve(&mut self, thread_id: Option<String>) -> Vec<SocketAddr>
Description:
Resolves the list of BP socket addresses for a given optional thread ID. If the thread ID is None or no mapping exists for the given ID, returns a vector containing only the default BP address.
Parameters:
thread_id: An optional thread identifier for which to resolve BP addresses.
Returns:
A vector of SocketAddr instances corresponding to the thread ID, or the default BP address if none is found.
Usage Example:
let addresses = resolver.resolve(Some("thread-123".to_string()));
for addr in addresses {
    println!("BP Address: {}", addr);
}
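The fallback rule in resolve can be illustrated with a small sketch. ResolveSketch is a hypothetical stand-in that mirrors the two documented fields but uses std::sync::RwLock instead of parking_lot and takes &self instead of &mut self. It assumes that a mapped but empty list is returned as-is, which the description above does not specify.

```rust
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for `BPResolverImpl` (std `RwLock`, not `parking_lot`).
struct ResolveSketch {
    default_bp: SocketAddr,
    map_thread_addr: Arc<RwLock<HashMap<String, Vec<SocketAddr>>>>,
}

impl ResolveSketch {
    /// Returns the addresses mapped to `thread_id`, or a one-element vector with
    /// the default address when the ID is `None` or has no entry.
    fn resolve(&self, thread_id: Option<String>) -> Vec<SocketAddr> {
        // A shared read lock is enough because nothing is modified here.
        let map = self.map_thread_addr.read().unwrap();
        let resolved = thread_id
            .and_then(|id| map.get(&id).cloned())
            .unwrap_or_else(|| vec![self.default_bp]);
        resolved
    }
}
```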
Important Implementation Details and Algorithms
Thread Safety:
Uses Arc for atomic reference counting and RwLock from the parking_lot crate for efficient read-write locking of the internal map. This allows concurrent reads and exclusive writes to the map of thread IDs to addresses.
Address Parsing:
The upsert method uses try_parse_socket_addr to convert string representations of IP addresses into SocketAddr objects, falling back to a default port (DEFAULT_BP_PORT). This ensures consistent address formatting and validation.
Update Optimization:
The upsert method only writes to the map if the new list of addresses differs from the existing one, reducing unnecessary locking and writes.
Asynchronous Updates:
The start_listener method enables asynchronous updates to the resolver mapping by spawning a thread that listens on an mpsc channel. This design separates update reception from other logic, improving responsiveness and concurrency.
Error Handling and Logging:
Uses the anyhow crate for error propagation and the tracing crate for structured logging of debug and error events.
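As an illustration of the update optimization noted above, the compare-before-write step could be sketched as follows. The function name upsert_if_changed and its boolean return value are hypothetical, and std::sync::RwLock stands in for the parking_lot lock so the example has no external dependencies.

```rust
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::RwLock;

/// Replaces the entry for `thread_id` only when the list changed.
/// Returns `true` if the map was written. Hypothetical helper, not the real `upsert`.
fn upsert_if_changed(
    map: &RwLock<HashMap<String, Vec<SocketAddr>>>,
    thread_id: String,
    new_list: Vec<SocketAddr>,
) -> bool {
    // Compare under the shared read lock; the guard is dropped at the end of this block.
    let unchanged = {
        let guard = map.read().unwrap();
        let same = guard.get(&thread_id) == Some(&new_list);
        same
    };
    if unchanged {
        return false;
    }
    // Only a real change takes the exclusive write lock.
    map.write().unwrap().insert(thread_id, new_list);
    true
}
```

Because the read lock is released before the write lock is taken, another writer could in principle change the entry in between. With a single listener thread performing all writes, as described above, that window should not matter.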
Interaction with Other System Components
message_router::bp_resolver::BPResolver Trait:
Provides the interface for resolving BP addresses. BPResolverImpl implements this trait, allowing it to be used wherever the BPResolver abstraction is required.
network::try_parse_socket_addr Function:
Used for parsing string IP addresses into SocketAddr instances. This function is crucial for maintaining address validity and handling default ports.
mpsc::Receiver from std::sync::mpsc:
Receives asynchronous BP update messages, enabling external components to push updates to the resolver.
Synchronization Primitives (Arc, Mutex, RwLock):
Ensure safe concurrent access to shared data structures.
Logging via tracing:
Integrates with system-wide logging for debugging and error reporting.
Visual Diagram
classDiagram
class BPResolverImpl {
-default_bp: SocketAddr
-map_thread_addr: Arc<RwLock<HashMap<String, Vec<SocketAddr>>>>
+new(default_bp: SocketAddr)
+upsert(thread_id: String, bp_list: Vec<String>) Result
+start_listener(resolver: Arc<Mutex<BPResolverImpl>>, bp_data_rx: mpsc::Receiver)
+resolve(thread_id: Option<String>) Vec<SocketAddr>
}
BPResolverImpl ..|> BPResolver
This diagram shows the core structure of BPResolverImpl, its fields, and public methods, as well as its implementation of the BPResolver trait.