mod.rs

Overview

This file implements the core logic for a publish-subscribe (pub-sub) system over a generic network transport. It manages network connections, subscription scheduling, and message flow between publishers and subscribers. The primary struct, PubSub, orchestrates connection lifecycle management, including establishing, tracking, and removing connections, as well as handling subscriptions and disconnections based on trust credentials.

The file also provides utility functions for monitoring asynchronous tasks related to network connections, ensuring robustness through error and panic reporting.


Modules


Main Entities

Struct PubSub<Transport>

A generic pub-sub manager parameterized over a network transport implementing NetTransport. It maintains:

Usage example:

let pubsub = PubSub::new(my_transport, false);
let open_conn_count = pubsub.open_connections();

Struct PubSubInner<Connection>

Holds internal mutable state:


Implementations

PubSubInner<Connection>

PubSub<Transport>


Utility Functions

trace_task_result(result: Result<anyhow::Result<()>, JoinError>, name: &str) -> Result<anyhow::Result<()>, JoinError>

Logs the outcome of a task, reporting success, failure, or panic with detailed error messages.

spawn_critical_task(name: &'static str, task: impl Future<Output = ()> + Send + 'static, metrics: Option<NetMetrics>)

Spawns a Tokio task for critical background work and monitors its lifecycle.

monitor_critical_task(name: &'static str, task: JoinHandle<()>, metrics: Option<NetMetrics>)

Monitors a spawned Tokio task, logging panics and reporting errors to metrics if provided.


Internal Helper Function: diff

fn diff<Addr: Hash + Eq + Clone, Conn: Clone>(
    original: impl Iterator<Item = (Addr, Conn)>,
    target: &Vec<Vec<Addr>>,
) -> (Vec<Vec<Addr>>, Vec<Conn>)

Compares two collections of addresses and connections:

Returns a tuple:

Algorithm Highlights:


Interaction with Other Components


Visual Diagram: PubSub Structure and Workflow

classDiagram
class PubSub {
+transport: Transport
+is_proxy: bool
+open_connections()
+schedule_subscriptions()
+disconnect_untrusted()
+subscribe_to_publisher()
+add_connection_handler()
+remove_connection()
}
class PubSubInner {
-next_connection_id: u64
-connections: HashMap
-connections_by_remote_addr: HashMap
-tasks: HashMap
+generate_connection_id()
}
class ConnectionWrapper {
<<from connection module>>
}
class ConnectionSupervisor {
<<tokio task>>
}
PubSub "1" *-- "1" PubSubInner : inner
PubSubInner "1" o-- "*" ConnectionWrapper : connections
PubSubInner "1" o-- "*" ConnectionSupervisor : tasks

Test Coverage


Notes on Implementation


References