subscribe.rs

Overview

This file implements the core logic for managing subscriptions to publishers in a networked environment. It continuously monitors changes in subscription requests, maintains active connections to publishers, and handles reconnection attempts upon disconnections or failures. The file uses asynchronous Rust features provided by the Tokio runtime to concurrently manage multiple subscription tasks, connection lifecycle events, and shutdown signals.

The main functionality is encapsulated in the handle_subscriptions asynchronous function, which orchestrates subscription lifecycle management, including scheduling new subscriptions, closing obsolete ones, and retrying failed connections. Utility functions support formatting and logging of subscription-related information.


Functions and Methods

join_addrs(addrs: &[SocketAddr]) -> String

Concatenates a slice of socket addresses into a comma-separated string.

task_addrs(addrs: &HashMap<tokio::task::Id, Vec<SocketAddr>>, id: tokio::task::Id) -> String

Fetches and joins the socket addresses associated with a specific Tokio task ID.

handle_subscriptions<Transport: NetTransport + 'static>(...) -> anyhow::Result<()>

The primary asynchronous function managing subscription connections to publishers.

diff_info(total: usize, include: usize, exclude: usize) -> String

Generates a formatted string describing subscription count differences for logging purposes.


Implementation Details and Algorithms


Interaction with Other System Components


Diagram: Subscription Management Workflow

flowchart TD
A[Start handle_subscriptions] --> B{Check shutdown signal}
B -- Yes --> Z[Exit function]
B -- No --> C[Get current subscription list]
C --> D[Filter out recently closed addresses]
D --> E[Schedule subscriptions via PubSub]
E --> F{Subscriptions to close?}
F -- Yes --> G[Close obsolete connections]
F -- No --> H[Spawn tasks to connect to new publishers]
H --> I[Monitor task completions]
I --> J{All tasks done?}
J -- No --> I
J -- Yes --> K[Calculate sleep duration]
K --> L[Wait for events]
L -->|Shutdown| B
L -->|Subscription change| C
L -->|Timeout| C
L -->|Connection closed| D

This file implements critical runtime logic for dynamic subscription management in a networked pub-sub environment, coordinating connection tasks, reacting to state changes, and maintaining system health through metrics and retry policies.

For related details on network transport abstraction and pub-sub connection protocols, see NetTransport and PubSub. For configuration structures, refer to NetworkConfig. Metrics reporting is detailed under NetMetrics.