server.rs
Overview
This file implements the core server-side functionality responsible for listening for and handling incoming network connections. It defines asynchronous tasks that manage the lifecycle of network listeners, accept incoming connection requests, enforce maximum connection limits, and handle accepted connections by integrating them into the system's publish-subscribe mechanism.
The code leverages an abstract transport layer (NetTransport) to create network listeners and connections, supporting multiple protocols identified by ALPN (Application-Layer Protocol Negotiation) strings. It also integrates with metrics reporting and connection lifecycle management, including signaling connection closures.
Key responsibilities include:
Establishing and maintaining a network listener based on dynamic configuration.
Accepting incoming connection requests asynchronously.
Creating connection handlers with appropriate roles and remote host identification.
Enforcing maximum concurrent connection limits.
Functions
listen_incoming_connections
pub async fn listen_incoming_connections<Transport>(
mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
mut network_config_rx: tokio::sync::watch::Receiver<NetworkConfig>,
pub_sub: PubSub<Transport>,
metrics: Option<NetMetrics>,
max_connections: usize,
incoming_tx: IncomingSender,
outgoing_messages: broadcast::Sender<OutgoingMessage>,
connection_closed_tx: mpsc::Sender<Arc<ConnectionInfo>>,
) -> anyhow::Result<()>
where
Transport: NetTransport + 'static,
Purpose
This asynchronous function continuously listens for incoming connections on a network interface defined by NetworkConfig. It dynamically adapts to configuration changes and shutdown signals, and forwards accepted connections for handling.
Parameters
shutdown_rx: A watch channel receiver signaling server shutdown. When it signalstrueor is closed, the listener terminates.network_config_rx: A watch channel receiver providing the current network configuration, including the bind address and credentials. On changes, the listener restarts with updated settings.pub_sub: The publish-subscribe system instance for managing connections.metrics: Optional metrics collector for reporting errors or relevant network events.max_connections: Maximum allowed concurrent open connections.incoming_tx: Sender channel to forward incoming messages to the executor.outgoing_messages: Broadcast channel for outgoing messages to connections.connection_closed_tx: Sender channel to notify when a connection closes.
Return Value
Returns
anyhow::Result<()>indicating success or failure of the listening task.
Behavior
Reads initial bind address and credentials from
network_config_rx.Creates a listener supporting three protocols:
ACKI_NACKI_SUBSCRIPTION_FROM_PROXY_PROTOCOLACKI_NACKI_SUBSCRIPTION_FROM_NODE_PROTOCOLACKI_NACKI_DIRECT_PROTOCOL
Enters an inner loop accepting incoming requests or reacting to:
Configuration changes: rebinds listener if bind or credentials changed.
Shutdown signals: terminates the listener loop.
For each accepted request:
Checks if the number of open connections is below
max_connections.Spawns a new task
handle_incoming_connectionto process the request.If max connections reached, logs an error and reports metrics.
Usage Example
let shutdown_rx = ...; // watch::Receiver<bool>
let network_config_rx = ...; // watch::Receiver<NetworkConfig>
let pub_sub = ...; // PubSub instance
let metrics = Some(...); // Optional metrics collector
let max_connections = 100;
let incoming_tx = ...;
let outgoing_messages = ...;
let connection_closed_tx = ...;
listen_incoming_connections(
shutdown_rx,
network_config_rx,
pub_sub,
metrics,
max_connections,
incoming_tx,
outgoing_messages,
connection_closed_tx,
).await?;
handle_incoming_connection
pub async fn handle_incoming_connection<Transport: NetTransport>(
shutdown_rx: tokio::sync::watch::Receiver<bool>,
pub_sub: PubSub<Transport>,
metrics: Option<NetMetrics>,
incoming_tx: IncomingSender,
outgoing_messages: broadcast::Sender<OutgoingMessage>,
connection_closed_tx: mpsc::Sender<Arc<ConnectionInfo>>,
incoming_request: impl NetIncomingRequest<Connection = Transport::Connection>,
)
Purpose
Handles a single incoming connection request by accepting the connection, determining its role and protocol, then registering it with the publish-subscribe system.
Parameters
shutdown_rx: Shutdown signal watcher.pub_sub: Publish-subscribe instance.metrics: Optional metrics collector.incoming_tx: Channel to forward incoming messages.outgoing_messages: Broadcast channel for outgoing messages.connection_closed_tx: Channel to notify connection closure.incoming_request: The incoming connection request to be accepted.
Behavior
Attempts to accept the incoming connection asynchronously.
Logs and reports failure metrics if acceptance fails.
Determines the connection's ALPN protocol to classify its role:
If protocol is
ACKI_NACKI_SUBSCRIPTION_FROM_PROXY_PROTOCOL, role isPublisherwithremote_is_proxy= true.If protocol is
ACKI_NACKI_SUBSCRIPTION_FROM_NODE_PROTOCOL, role isPublisherwithremote_is_proxy= false.Otherwise, role is
DirectReceiver.
Extracts remote host ID from the connection.
Invokes
pub_sub.add_connection_handlerto integrate the connection into the system.Logs errors and reports metrics if adding the connection fails.
Usage Example
handle_incoming_connection(
shutdown_rx.clone(),
pub_sub.clone(),
metrics.clone(),
incoming_tx.clone(),
outgoing_messages.clone(),
connection_closed_tx.clone(),
incoming_request,
).await;
Important Implementation Details
Dynamic Configuration Handling: The listener reacts to changes in network configuration (
NetworkConfig) via a watch channel. When binding address or credentials change, it cleanly disconnects untrusted connections and restarts the listener with new settings.Protocol Negotiation: The system uses ALPN (Application-Layer Protocol Negotiation) to identify connection roles and protocols. This enables differentiated handling for proxy subscriptions, node subscriptions, and direct connections.
Concurrency Limits: The listener enforces a maximum number of open connections (
max_connections). If this limit is reached, new connection requests are rejected, and an error is reported.Asynchronous Task Spawning: Each accepted connection is handled in a separate Tokio task to allow concurrent processing without blocking the listener.
Metrics Integration: Metrics are reported on errors such as failure to accept connections, exceeding max connections, and errors adding connections.
Interaction with Other Components
Transport Layer (
NetTransport,NetListener,NetIncomingRequest): The file depends on the transport layer abstraction for creating network listeners and accepting connections, enabling pluggable transport protocols.Publish-Subscribe System (
PubSub): Connections are registered and managed via thePubSubcomponent, which handles message delivery and connection lifecycle.Metrics (
NetMetrics): Optional component used to collect and report network-related metrics and errors.Connection Management (
ConnectionInfo,ConnectionRole,OutgoingMessage): Used to identify, classify, and manage outgoing messages and connection states.Channels (
tokio::sync::watch,broadcast,mpsc): Utilized for signaling shutdown, configuration changes, message passing, and connection closure notifications.
Diagram: Structure and Workflow of server.rs
flowchart TD
A[listen_incoming_connections] --> B[Create Listener with Config]
B --> C[Wait for Incoming Requests / Config Changes / Shutdown]
C -->|Incoming Request| D{Open Connections < Max?}
D -->|Yes| E[Spawn handle_incoming_connection]
D -->|No| F[Report Max Connections Error]
C -->|Config Changed| G[Restart Listener]
C -->|Shutdown| H[Exit]
E --> I[Accept Incoming Request]
I --> J{Accept Success?}
J -->|Yes| K[Determine Connection Role]
K --> L[Add Connection Handler to PubSub]
J -->|No| M[Report Accept Failure]
style A fill:#f9f,stroke:#333,stroke-width:2px
style E fill:#bbf,stroke:#333,stroke-width:2px
style I fill:#bbf,stroke:#333,stroke-width:2px
This diagram illustrates the main workflow where listen_incoming_connections creates a listener and manages incoming requests, configuration updates, and shutdown signals. Accepted requests lead to spawning handle_incoming_connection, which processes each connection individually.
References
NetTransportand related traitsPubSubmoduleNetworkConfigstructureNetMetricsmetrics collectionConnectionInfoandConnectionRole