server.rs

Overview

This file implements the core server-side functionality responsible for listening for and handling incoming network connections. It defines asynchronous tasks that manage the lifecycle of network listeners, accept incoming connection requests, enforce maximum connection limits, and handle accepted connections by integrating them into the system's publish-subscribe mechanism.

The code leverages an abstract transport layer (NetTransport) to create network listeners and connections, supporting multiple protocols identified by ALPN (Application-Layer Protocol Negotiation) strings. It also integrates with metrics reporting and connection lifecycle management, including signaling connection closures.

Key responsibilities include:

Functions

listen_incoming_connections

pub async fn listen_incoming_connections<Transport>(
    mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
    mut network_config_rx: tokio::sync::watch::Receiver<NetworkConfig>,
    pub_sub: PubSub<Transport>,
    metrics: Option<NetMetrics>,
    max_connections: usize,
    incoming_tx: IncomingSender,
    outgoing_messages: broadcast::Sender<OutgoingMessage>,
    connection_closed_tx: mpsc::Sender<Arc<ConnectionInfo>>,
) -> anyhow::Result<()>
where
    Transport: NetTransport + 'static,

Purpose

This asynchronous function continuously listens for incoming connections on a network interface defined by NetworkConfig. It dynamically adapts to configuration changes and shutdown signals, and forwards accepted connections for handling.

Parameters

Return Value

Behavior

Usage Example

let shutdown_rx = ...; // watch::Receiver<bool>
let network_config_rx = ...; // watch::Receiver<NetworkConfig>
let pub_sub = ...; // PubSub instance
let metrics = Some(...); // Optional metrics collector
let max_connections = 100;
let incoming_tx = ...;
let outgoing_messages = ...;
let connection_closed_tx = ...;

listen_incoming_connections(
    shutdown_rx,
    network_config_rx,
    pub_sub,
    metrics,
    max_connections,
    incoming_tx,
    outgoing_messages,
    connection_closed_tx,
).await?;

handle_incoming_connection

pub async fn handle_incoming_connection<Transport: NetTransport>(
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    pub_sub: PubSub<Transport>,
    metrics: Option<NetMetrics>,
    incoming_tx: IncomingSender,
    outgoing_messages: broadcast::Sender<OutgoingMessage>,
    connection_closed_tx: mpsc::Sender<Arc<ConnectionInfo>>,
    incoming_request: impl NetIncomingRequest<Connection = Transport::Connection>,
)

Purpose

Handles a single incoming connection request by accepting the connection, determining its role and protocol, then registering it with the publish-subscribe system.

Parameters

Behavior

Usage Example

handle_incoming_connection(
    shutdown_rx.clone(),
    pub_sub.clone(),
    metrics.clone(),
    incoming_tx.clone(),
    outgoing_messages.clone(),
    connection_closed_tx.clone(),
    incoming_request,
).await;

Important Implementation Details

Interaction with Other Components

Diagram: Structure and Workflow of server.rs

flowchart TD
A[listen_incoming_connections] --> B[Create Listener with Config]
B --> C[Wait for Incoming Requests / Config Changes / Shutdown]
C -->|Incoming Request| D{Open Connections < Max?}
D -->|Yes| E[Spawn handle_incoming_connection]
D -->|No| F[Report Max Connections Error]
C -->|Config Changed| G[Restart Listener]
C -->|Shutdown| H[Exit]
E --> I[Accept Incoming Request]
I --> J{Accept Success?}
J -->|Yes| K[Determine Connection Role]
K --> L[Add Connection Handler to PubSub]
J -->|No| M[Report Accept Failure]
style A fill:#f9f,stroke:#333,stroke-width:2px
style E fill:#bbf,stroke:#333,stroke-width:2px
style I fill:#bbf,stroke:#333,stroke-width:2px

This diagram illustrates the main workflow where listen_incoming_connections creates a listener and manages incoming requests, configuration updates, and shutdown signals. Accepted requests lead to spawning handle_incoming_connection, which processes each connection individually.


References