mod.rs
Overview
This file implements the core logic for a publish-subscribe (pub-sub) system over a generic network transport. It manages network connections, subscription scheduling, and message flow between publishers and subscribers. The primary struct, PubSub, orchestrates connection lifecycle management, including establishing, tracking, and removing connections, as well as handling subscriptions and disconnections based on trust credentials.
The file also provides utility functions for monitoring asynchronous tasks related to network connections, ensuring robustness through error and panic reporting.
Modules
config: Configuration definitions related to certificates and keys.
connection: Connection management utilities, including ConnectionWrapper.
executor: Contains the run function and the IncomingSender type.
receiver: Implements message receiving logic.
sender: Implements message sending logic.
server: Server-side networking logic.
subscribe: Subscription management utilities.
Main Entities
Struct PubSub<Transport>
A generic pub-sub manager parameterized over a network transport implementing NetTransport. It maintains:
transport: The underlying network transport instance.
is_proxy: Boolean flag indicating if this instance acts as a proxy.
inner: Thread-safe, shared inner state containing connection and task information.
Usage example:
let pubsub = PubSub::new(my_transport, false);
let open_conn_count = pubsub.open_connections();
Struct PubSubInner<Connection>
Holds internal mutable state:
next_connection_id: Counter for assigning unique connection IDs.
connections: Map of connection IDs to ConnectionWrapper instances.
connections_by_remote_addr: Map of remote socket addresses to connections.
tasks: Map of connection IDs to Tokio task handles supervising connection lifecycles.
Implementations
PubSubInner<Connection>
generate_connection_id(&mut self) -> u64
Generates a unique connection ID using wrapping addition to avoid zero.
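One possible reading of this behavior, assuming zero is treated as a reserved (invalid) ID, is sketched below. The IdGen type is a hypothetical stand-in for PubSubInner, and the zero-skipping step is an assumption rather than a confirmed detail of the implementation.

```rust
/// Hypothetical stand-in for the counter held by PubSubInner.
struct IdGen {
    next_connection_id: u64,
}

impl IdGen {
    /// Advances the counter with wrapping arithmetic so overflow cannot panic,
    /// then skips zero (assumed here to be reserved as "no connection").
    fn generate_connection_id(&mut self) -> u64 {
        self.next_connection_id = self.next_connection_id.wrapping_add(1);
        if self.next_connection_id == 0 {
            self.next_connection_id = 1;
        }
        self.next_connection_id
    }
}
```

With this scheme, IDs are unique until the 64-bit counter wraps, which is not a practical concern for connection counts.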
PubSub<Transport>
new(transport: Transport, is_proxy: bool) -> Self
Constructs a new PubSub instance with an empty connection set.
open_connections(&self) -> usize
Returns the current number of open connections.
schedule_subscriptions(&self, subscribe: &Vec<Vec<SocketAddr>>) -> (Vec<Vec<SocketAddr>>, Vec<Arc<ConnectionWrapper<Transport::Connection>>>)
Compares currently subscribed connections with a target subscription list and returns:
New subscriptions to initiate.
Existing connections to terminate.
Internally uses the diff function to compute the differences.
disconnect_untrusted(&self, credential: &NetCredential) -> impl Future<Output = ()>
Disconnects all connections that fail verification against the provided credentials.
subscribe_to_publisher(...) -> anyhow::Result<()>
Attempts to connect to one of the given publisher addresses and, on success, adds a connection handler. Uses ALPN protocols that differ based on proxy mode.
add_connection_handler(...) -> anyhow::Result<()>
Adds a new connection, spawns a supervisor task to handle message sending/receiving, and updates internal state maps.
remove_connection(&self, conn: &ConnectionInfo)
Removes a connection and its associated task from internal maps and logs the disconnection event with role-specific details.
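The map bookkeeping described for add_connection_handler and remove_connection can be pictured with a small std-only sketch. Registry, Conn, add, remove, and lookup are hypothetical names chosen for illustration. The sketch uses std::sync::RwLock in place of parking_lot::RwLock and leaves out the supervisor tasks and the tasks map, so it only shows how the two connection maps might be kept in step.

```rust
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for ConnectionWrapper.
#[derive(Debug)]
struct Conn {
    id: u64,
    remote: SocketAddr,
}

/// Simplified inner state: the Tokio task map is omitted.
#[derive(Default)]
struct Inner {
    next_connection_id: u64,
    connections: HashMap<u64, Arc<Conn>>,
    connections_by_remote_addr: HashMap<SocketAddr, Arc<Conn>>,
}

/// Hypothetical stand-in for PubSub, holding only the shared state.
struct Registry {
    inner: RwLock<Inner>,
}

impl Registry {
    fn new() -> Self {
        Self { inner: RwLock::new(Inner::default()) }
    }

    /// Counts open connections under a read lock.
    fn open_connections(&self) -> usize {
        self.inner.read().unwrap().connections.len()
    }

    /// Registers a connection in both maps under a single write lock.
    fn add(&self, remote: SocketAddr) -> Arc<Conn> {
        let mut guard = self.inner.write().unwrap();
        let inner = &mut *guard;
        inner.next_connection_id = inner.next_connection_id.wrapping_add(1);
        let conn = Arc::new(Conn { id: inner.next_connection_id, remote });
        inner.connections.insert(conn.id, Arc::clone(&conn));
        inner.connections_by_remote_addr.insert(remote, Arc::clone(&conn));
        conn
    }

    /// Removes a connection from both maps under a single write lock.
    fn remove(&self, conn: &Conn) {
        let mut guard = self.inner.write().unwrap();
        guard.connections.remove(&conn.id);
        guard.connections_by_remote_addr.remove(&conn.remote);
    }

    /// Looks up a connection by remote address.
    fn lookup(&self, remote: &SocketAddr) -> Option<Arc<Conn>> {
        self.inner.read().unwrap().connections_by_remote_addr.get(remote).cloned()
    }
}
```

Updating both maps while holding one write lock keeps readers from observing a connection that is present in one map but missing from the other.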
Utility Functions
trace_task_result(result: Result<anyhow::Result<()>, JoinError>, name: &str) -> Result<anyhow::Result<()>, JoinError>
Logs the outcome of a task, reporting success, failure, or panic with detailed error messages.
spawn_critical_task(name: &'static str, task: impl Future<Output = ()> + Send + 'static, metrics: Option<NetMetrics>)
Spawns a Tokio task for critical background work and monitors its lifecycle.
monitor_critical_task(name: &'static str, task: JoinHandle<()>, metrics: Option<NetMetrics>)
Monitors a spawned Tokio task, logging panics and reporting errors to metrics if provided.
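The three outcomes these utilities distinguish (success, failure, panic) can be illustrated without Tokio. The sketch below swaps in std::thread, whose join result has a similar nested shape: the outer Err carries a panic payload, much as JoinError does for a panicked Tokio task. TaskOutcome, classify, and run_and_classify are hypothetical names, and String stands in for anyhow::Error.

```rust
use std::any::Any;
use std::thread;

/// Hypothetical outcome categories matching the three cases described above.
#[derive(Debug, PartialEq)]
enum TaskOutcome {
    Success,
    Failed(String),
    Panicked(String),
}

/// Classifies a joined task result: the outer layer reports panics,
/// the inner layer reports ordinary errors returned by the task.
fn classify(result: &Result<Result<(), String>, Box<dyn Any + Send>>) -> TaskOutcome {
    match result {
        Ok(Ok(())) => TaskOutcome::Success,
        Ok(Err(e)) => TaskOutcome::Failed(e.clone()),
        Err(payload) => {
            // Panic payloads are usually &'static str or String.
            let msg = payload
                .downcast_ref::<&str>()
                .map(|s| s.to_string())
                .or_else(|| payload.downcast_ref::<String>().cloned())
                .unwrap_or_else(|| "unknown panic payload".to_string());
            TaskOutcome::Panicked(msg)
        }
    }
}

/// Runs a task on its own thread, logs the outcome, and returns it.
fn run_and_classify<F>(name: &str, task: F) -> TaskOutcome
where
    F: FnOnce() -> Result<(), String> + Send + 'static,
{
    let outcome = classify(&thread::spawn(task).join());
    eprintln!("task {name}: {outcome:?}");
    outcome
}
```

In a Tokio setting the same match would be written against Result<anyhow::Result<()>, JoinError>, with the panic case detected through the JoinError rather than a raw payload.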
Internal Helper Function: diff
fn diff<Addr: Hash + Eq + Clone, Conn: Clone>(
original: impl Iterator<Item = (Addr, Conn)>,
target: &Vec<Vec<Addr>>,
) -> (Vec<Vec<Addr>>, Vec<Conn>)
Compares two collections of addresses and connections:
original: Iterator of current subscribed addresses and their connection wrappers.
target: Desired subscription groups (each group is a vector of addresses).
Returns a tuple:
Vec<Vec<Addr>>: Subscription groups that should be newly included.
Vec<Conn>: Connections that should be excluded (disconnected).
Algorithm Highlights:
Marks existing connections to preserve if their addresses appear in target.
Detects fully new address groups to include.
Identifies obsolete connections to exclude.
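The three steps above can be sketched as follows. This is a minimal reconstruction from the description, not the module's actual code: the name diff_sketch is hypothetical, the target is taken as a slice rather than &Vec, and a group is assumed to count as "fully new" when none of its addresses has an existing connection.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Sketch of the described algorithm (assumed semantics, see the lead-in).
fn diff_sketch<Addr: Hash + Eq + Clone, Conn: Clone>(
    original: impl Iterator<Item = (Addr, Conn)>,
    target: &[Vec<Addr>],
) -> (Vec<Vec<Addr>>, Vec<Conn>) {
    // Index current connections by address, each with a "keep" flag.
    let mut current: HashMap<Addr, (Conn, bool)> =
        original.map(|(addr, conn)| (addr, (conn, false))).collect();

    let mut include = Vec::new();
    for group in target {
        // Step 1: mark every existing connection whose address is in this group.
        let mut covered = false;
        for addr in group {
            if let Some((_, keep)) = current.get_mut(addr) {
                *keep = true;
                covered = true;
            }
        }
        // Step 2: a group with no existing connection is a new subscription.
        if !covered {
            include.push(group.clone());
        }
    }

    // Step 3: connections never marked are obsolete.
    // HashMap iteration order is unspecified, so the order of `exclude` is too.
    let exclude: Vec<Conn> = current
        .into_values()
        .filter(|(_, keep)| !*keep)
        .map(|(conn, _)| conn)
        .collect();

    (include, exclude)
}
```

For example, with current connections on addresses a, b, and c and target groups [a, x] and [y, z], this sketch keeps a, reports [y, z] as a new group, and marks the connections for b and c for disconnection.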
Interaction with Other Components
Uses NetTransport, NetConnection, and NetCredential traits from the transport layer for abstracted network I/O.
Utilizes ConnectionWrapper for enriched connection metadata and management.
Coordinates message flow with IncomingSender and Tokio channels (broadcast::Sender, mpsc::Sender).
Reports metrics via NetMetrics.
Employs ALPN protocols defined as constants for pub-sub handshake compatibility.
Interacts with the connection module's connection_supervisor for connection lifecycle management.
Visual Diagram: PubSub Structure and Workflow
classDiagram
class PubSub {
+transport: Transport
+is_proxy: bool
+open_connections()
+schedule_subscriptions()
+disconnect_untrusted()
+subscribe_to_publisher()
+add_connection_handler()
+remove_connection()
}
class PubSubInner {
-next_connection_id: u64
-connections: HashMap
-connections_by_remote_addr: HashMap
-tasks: HashMap
+generate_connection_id()
}
class ConnectionWrapper {
<<from connection module>>
}
class ConnectionSupervisor {
<<tokio task>>
}
PubSub "1" *-- "1" PubSubInner : inner
PubSubInner "1" o-- "*" ConnectionWrapper : connections
PubSubInner "1" o-- "*" ConnectionSupervisor : tasks
Test Coverage
The diff function is tested with three cases verifying inclusion and exclusion logic for address groups and connections.
Notes on Implementation
Uses parking_lot::RwLock for efficient concurrent read-write synchronization.
Connection IDs wrap around to avoid zero, ensuring valid unique identifiers.
Handles connection roles (Publisher, Subscriber, DirectReceiver) with role-specific behaviors and logging.
Employs tokio::spawn for asynchronous task management with detailed tracing for error handling.
Uses lint-suppression attributes (#[allow(clippy::too_many_arguments)]) where function signatures require multiple parameters due to networking context.
References
For details on network transport traits, see NetTransport.
Message passing and subscription protocols are defined in pub_sub::connection.
Metrics reporting and tracing are covered in metrics.
Certificate and credential handling can be found in config.