subscribe.rs
Overview
This file implements the core logic for managing subscriptions to publishers in a networked environment. It continuously monitors changes in subscription requests, maintains active connections to publishers, and handles reconnection attempts upon disconnections or failures. The file uses asynchronous Rust features provided by the Tokio runtime to concurrently manage multiple subscription tasks, connection lifecycle events, and shutdown signals.
The main functionality is encapsulated in the handle_subscriptions asynchronous function, which orchestrates subscription lifecycle management, including scheduling new subscriptions, closing obsolete ones, and retrying failed connections. Utility functions support formatting and logging of subscription-related information.
Functions and Methods
join_addrs(addrs: &[SocketAddr]) -> String
Concatenates a slice of socket addresses into a comma-separated string.
Parameters:
addrs: Slice ofSocketAddrrepresenting network addresses.
Returns: A
Stringcontaining all addresses joined by commas.Usage Example:
let addrs = vec![SocketAddr::from(([127,0,0,1], 8080)), SocketAddr::from(([192,168,1,1], 3030))]; let joined = join_addrs(&addrs); // joined == "127.0.0.1:8080,192.168.1.1:3030"
task_addrs(addrs: &HashMap<tokio::task::Id, Vec<SocketAddr>>, id: tokio::task::Id) -> String
Fetches and joins the socket addresses associated with a specific Tokio task ID.
Parameters:
addrs: A map from Tokio task IDs to vectors ofSocketAddr.id: The Tokio task ID for which to retrieve addresses.
Returns: A comma-separated string of addresses related to the task ID, or an empty string if none exist.
Usage: Used in logging to associate errors or panics with the relevant network addresses.
handle_subscriptions<Transport: NetTransport + 'static>(...) -> anyhow::Result<()>
The primary asynchronous function managing subscription connections to publishers.
Parameters:
shutdown_rx: Receiver watching for shutdown signals (bool).network_config_rx: Receiver with updates to network configuration (NetworkConfig).pub_sub: Instance ofPubSubmanaging pub-sub logic over theTransport.metrics: Optional network metrics collector (NetMetrics).subscribe_rx: Receiver watching for subscription target address lists (Vec<Vec<SocketAddr>>).incoming_messages: Sender for incoming messages (IncomingSender).outgoing_messages: Broadcast sender for outgoing messages (broadcast::Sender<OutgoingMessage>).connection_closed_tx: Sender for notifying closed connections (mpsc::Sender<Arc<ConnectionInfo>>).connection_closed_rx: Receiver for closed connection notifications (mpsc::Receiver<Arc<ConnectionInfo>>).
Returns:
anyhow::Result<()>indicating success or an error in subscription management.Functionality:
Starts an infinite loop that monitors subscription lists and connection closures.
Maintains a map of recently closed addresses to avoid immediate resubscription.
Calls
pub_sub.schedule_subscriptionsto determine which connections to open or close.Spawns asynchronous tasks to connect to publishers, tracking tasks with
JoinSet.Handles task completions, logging errors, incrementing success counters, and reporting metrics.
Implements retry logic with adaptive sleep durations based on subscription success and recent closures.
Waits for events: shutdown signals, changes in subscription lists, timeouts, or connection closures.
Cleans up inactive connections gracefully.
Usage Example:
// Typical usage inside a Tokio runtime context: handle_subscriptions( shutdown_rx, network_config_rx, pub_sub, Some(metrics), subscribe_rx, incoming_messages, outgoing_messages, connection_closed_tx, connection_closed_rx, ).await?;Important Details:
The function filters out addresses recently closed within 200 milliseconds to prevent rapid reconnects.
Uses
tokio::select!to concurrently await multiple asynchronous events.Connection tasks are spawned via
JoinSet, allowing collection of their results and task IDs.Metrics are reported for subscriber counts and error situations.
The
reasonstring is used for tracing subscription loop events and changes.
diff_info(total: usize, include: usize, exclude: usize) -> String
Generates a formatted string describing subscription count differences for logging purposes.
Parameters:
total: Total number of subscriptions.include: Number of subscriptions added.exclude: Number of subscriptions removed.
Returns: A formatted string indicating counts and changes (e.g., "10 (+2 -1)").
Usage: Used for tracing subscription updates to provide concise state diffs.
Implementation Details and Algorithms
Subscription Scheduling and Management:
The function relies onpub_sub.schedule_subscriptionsto obtain two lists:should_be_subscribed: Connections that need to be established.should_be_unsubscribed: Connections to be closed.
Connection Lifecycle:
Connections scheduled for unsubscription are closed asynchronously with a close code0.Task Management:
For each new subscription, a connection task is spawned usingJoinSet. The tasks concurrently attempt to establish connections to publishers. Each task's completion is monitored:On success, increments a counter.
On failure, logs errors and reports metrics.
On panic, logs error and reports metrics.
Retry Logic:
Adaptive delays control retry timing:Short delay (100 ms) after failed subscriptions.
Slightly longer delay (200 ms) if recently closed connections exist.
Otherwise, a very long sleep to minimize CPU usage.
Event Handling:
Usestokio::select!to react to:Shutdown signals.
Changes in the subscription list.
Timeouts for retries.
Notifications of closed subscription connections.
Connection Filtering:
The file maintains alast_closed_addrsmap to temporarily exclude recently closed addresses from subscription attempts, preventing rapid cycling.
Interaction with Other System Components
PubSubModule:
The file heavily interacts with thePubSubabstraction, which provides methods likeschedule_subscriptionsandsubscribe_to_publisher. These handle the pub-sub protocol-specific connection management.Network Configuration (
NetworkConfig):
Receives updated network credentials and configuration dynamically to authenticate or configure connections.Metrics (
NetMetrics):
Optional collection and reporting of subscriber counts and errors to support monitoring and observability.Connection Management (
ConnectionInfo):
Receives notifications about connection closures to update subscription state.Message Passing:
Uses Tokio channels for incoming messages (IncomingSender), outgoing broadcast messages (broadcast::Sender), and connection closure notifications (mpscchannels).Transport Layer (
NetTransport):
Generic over a transport type implementing theNetTransporttrait, allowing flexibility in underlying network protocols.
Diagram: Subscription Management Workflow
flowchart TD
A[Start handle_subscriptions] --> B{Check shutdown signal}
B -- Yes --> Z[Exit function]
B -- No --> C[Get current subscription list]
C --> D[Filter out recently closed addresses]
D --> E[Schedule subscriptions via PubSub]
E --> F{Subscriptions to close?}
F -- Yes --> G[Close obsolete connections]
F -- No --> H[Spawn tasks to connect to new publishers]
H --> I[Monitor task completions]
I --> J{All tasks done?}
J -- No --> I
J -- Yes --> K[Calculate sleep duration]
K --> L[Wait for events]
L -->|Shutdown| B
L -->|Subscription change| C
L -->|Timeout| C
L -->|Connection closed| D
Description:
The loop begins by checking if a shutdown has been requested.
It fetches the current subscription address list and filters out recently closed connections.
The file schedules subscriptions and unsubscriptions accordingly.
Connections to new publishers are established via spawned asynchronous tasks.
Task results are monitored for success or failure.
The system waits for relevant events such as subscription changes, connection closures, or timeouts before recalculating the subscription state.
This file implements critical runtime logic for dynamic subscription management in a networked pub-sub environment, coordinating connection tasks, reacting to state changes, and maintaining system health through metrics and retry policies.
For related details on network transport abstraction and pub-sub connection protocols, see NetTransport and PubSub. For configuration structures, refer to NetworkConfig. Metrics reporting is detailed under NetMetrics.