connection.rs
Overview
This file provides core abstractions and functionality to manage network connections within a distributed system. It encapsulates connection metadata, roles, message delivery semantics, and supervises asynchronous sending and receiving of network messages over transport-layer connections. It interfaces with cryptographic identity verification and certificate handling, message encoding/decoding, and metrics reporting for network operations. The file primarily bridges the transport layer connections (NetConnection and NetTransport) with the higher-level publish-subscribe messaging system implemented via PubSub.
Enumerations
ConnectionRole
Defines the role of a network connection in the message exchange topology.
Variants:
Subscriber: The connection receives messages from a publisher.
Publisher: The connection sends messages to subscribers.
DirectReceiver: The connection receives messages from a direct sender, without broadcast semantics.
Methods:
is_publisher() -> bool: Returns true if the role is Publisher.
is_subscriber() -> bool: Returns true if the role is Subscriber.
Usage example:
let role = ConnectionRole::Publisher;
if role.is_publisher() {
    // perform publisher-specific logic
}
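The enumeration and its two predicates described above can be sketched as follows. This is a minimal reconstruction from the description, not the file's actual definition; the derives are assumptions.

```rust
/// Sketch of the role enum as described above. The derives are assumed;
/// the real definition may carry different attributes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionRole {
    Subscriber,
    Publisher,
    DirectReceiver,
}

impl ConnectionRole {
    /// True only for the `Publisher` variant.
    pub fn is_publisher(&self) -> bool {
        matches!(self, ConnectionRole::Publisher)
    }

    /// True only for the `Subscriber` variant.
    pub fn is_subscriber(&self) -> bool {
        matches!(self, ConnectionRole::Subscriber)
    }
}
```

Note that DirectReceiver answers false to both predicates, so callers that need to distinguish it must match on the enum directly.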
Structures
ConnectionInfo
Holds metadata about a network connection, including identity, addressing, role, and cryptographic credentials.
Fields:
id: u64: Unique identifier for the connection.
local_is_proxy: bool: Indicates if the local endpoint is a proxy.
role: ConnectionRole: Role of the connection (publisher, subscriber, etc.).
remote_addr: SocketAddr: Remote socket address.
remote_host_id: String: Full remote host identifier.
remote_host_id_prefix: String: Prefix of the remote host ID used for optimized logging or comparisons.
remote_is_proxy: bool: Indicates if the remote endpoint is a proxy.
remote_cert_hash: CertHash: Cryptographic hash of the remote's certificate.
remote_cert_pubkeys: Vec<VerifyingKey>: Public keys extracted from the remote's certificate.
Methods:
remote_info(&self) -> String: Returns a formatted string describing the remote address and inferred peer role.
is_publisher(&self) -> bool: Checks if the connection role is Publisher.
is_subscriber(&self) -> bool: Checks if the connection role is Subscriber.
is_broadcast(&self) -> bool: True if the connection role involves broadcast communication (Subscriber or Publisher).
send_mode(&self) -> SendMode: Determines whether sending should use broadcast or direct mode based on role.
Example usage:
let info = connection.info.clone();
if info.is_broadcast() {
    let mode = info.send_mode();
    // use mode for message sending logic
}
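The relationship between role, is_broadcast and send_mode can be illustrated with a reduced sketch. The variant names of SendMode (Broadcast, Direct) and the RoleOnlyInfo struct are assumptions made for illustration; the source names SendMode but does not list its variants.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionRole {
    Subscriber,
    Publisher,
    DirectReceiver,
}

/// Assumed shape: the variant names are hypothetical.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SendMode {
    Broadcast,
    Direct,
}

/// Hypothetical stand-in for ConnectionInfo that keeps only the role field.
pub struct RoleOnlyInfo {
    pub role: ConnectionRole,
}

impl RoleOnlyInfo {
    /// Broadcast roles are Subscriber and Publisher, as described above.
    pub fn is_broadcast(&self) -> bool {
        matches!(
            self.role,
            ConnectionRole::Subscriber | ConnectionRole::Publisher
        )
    }

    /// Broadcast roles map to broadcast mode, every other role to direct mode.
    pub fn send_mode(&self) -> SendMode {
        if self.is_broadcast() {
            SendMode::Broadcast
        } else {
            SendMode::Direct
        }
    }
}
```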
ConnectionWrapper<Connection: NetConnection>
Generic wrapper struct pairing a concrete network connection with its associated immutable connection metadata.
Fields:
info: Arc<ConnectionInfo>: Shared immutable connection metadata.
connection: Connection: The actual network connection implementing NetConnection.
Methods:
new(...) -> anyhow::Result<Self>: Constructs a new wrapper instance by extracting certificate information and other metadata from the connection.
allow_sending(&self, outgoing: &OutgoingMessage) -> bool: Determines if an outgoing message should be sent over this connection based on delivery semantics and connection properties.
Parameters for new:
id: Unique connection ID.
local_is_proxy: Local proxy flag.
remote_addr: Optional remote socket address; if None, fetched from connection.
remote_host_id: Remote host identifier string.
remote_is_proxy: Remote proxy flag.
connection: The actual transport connection.
role: Role of the connection (ConnectionRole).
Example creation:
let wrapper = ConnectionWrapper::new(
    1,                     // id
    false,                 // local_is_proxy
    Some(remote_socket),   // remote_addr
    remote_host_id_string, // remote_host_id
    false,                 // remote_is_proxy
    connection,
    ConnectionRole::Publisher,
)?;
Functions
connection_remote_host_id(connection: &impl NetConnection) -> String
Utility function returning the remote host identifier string from a given network connection.
connection_supervisor<Transport: NetTransport + 'static>
Asynchronous function supervising the lifecycle of a connection. It manages concurrent sending and receiving tasks, coordinates shutdown signals, and reports connection closure.
Parameters:
shutdown_rx: Watch receiver to listen for shutdown signals.
pub_sub: Reference to the PubSub system managing connections.
metrics: Optional network metrics collector.
connection: Shared ConnectionWrapper instance.
incoming_messages_tx: Optional channel sender to forward incoming messages.
outgoing_messages_rx: Optional broadcast receiver for outgoing messages.
connection_closed_tx: Channel sender to notify when connection closes.
Behavior:
Spawns separate asynchronous tasks for sending and receiving messages using sender::sender and receiver::receiver.
Uses tokio::select! to wait for either sender or receiver task to finish.
Reports errors and task completion via the helper trace_connection_task_result.
Removes the connection from the PubSub registry upon completion.
Sends closure notification on connection_closed_tx.
This function acts as the main event loop for handling a network connection, bridging transport layer logic with application-layer message handling.
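The "wait for whichever task finishes first" pattern can be sketched without an async runtime. The following is a standard-library analogue using threads and a channel, not the tokio::select! based code in the file; the names supervise, Finished and TaskResult are hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

/// Identifies which of the two tasks reported first.
#[derive(Debug, PartialEq, Eq)]
pub enum Finished {
    Sender,
    Receiver,
}

/// Hypothetical task result type used only in this sketch.
pub type TaskResult = Result<(), String>;

/// Runs both tasks concurrently and returns as soon as either one finishes.
/// The other thread is left detached here; a real supervisor would signal it
/// to stop, remove the connection from the registry and notify listeners.
pub fn supervise<S, R>(sender_task: S, receiver_task: R) -> (Finished, TaskResult)
where
    S: FnOnce() -> TaskResult + Send + 'static,
    R: FnOnce() -> TaskResult + Send + 'static,
{
    let (done_tx, done_rx) = mpsc::channel();
    let sender_done = done_tx.clone();

    thread::spawn(move || {
        // A send error only means the supervisor has already returned.
        let _ = sender_done.send((Finished::Sender, sender_task()));
    });
    thread::spawn(move || {
        let _ = done_tx.send((Finished::Receiver, receiver_task()));
    });

    // Blocks until the first report arrives. recv() fails only if both
    // threads panicked before sending anything.
    done_rx.recv().expect("both tasks panicked before reporting")
}
```

The single channel plays the role that tokio::select! plays in the async version: the supervisor blocks on one wait point and reacts to whichever side completes first.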
trace_connection_task_result
Helper function to log results of sender or receiver tasks and update metrics if applicable.
Parameters:
result: Result of task execution, potentially containing errors or panics.
name: Task name (e.g., "Sender", "Receiver").
connection_info: Metadata about the connection.
metrics: Optional metrics collector.
Logs informational, error, or critical messages based on task outcome and increments error counters in metrics.
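The three outcomes (clean finish, task error, panic) can be sketched as a match over a nested result. The sketch below uses the result type of std::thread::JoinHandle::join as a stand-in for the async join result; Outcome, ErrorCounters and trace_task_result are hypothetical names, and the println!/eprintln! calls stand in for whatever logging the file actually uses.

```rust
use std::any::Any;

/// How a supervised task ended.
#[derive(Debug, PartialEq, Eq)]
pub enum Outcome {
    Finished,
    Failed(String),
    Panicked,
}

/// Hypothetical stand-in for the error counters kept by the metrics collector.
#[derive(Default)]
pub struct ErrorCounters {
    pub errors: u64,
}

/// Logs the outcome at a severity matching it and counts failures and panics.
pub fn trace_task_result(
    result: Result<Result<(), String>, Box<dyn Any + Send>>,
    name: &str,
    remote: &str,
    metrics: Option<&mut ErrorCounters>,
) -> Outcome {
    let outcome = match result {
        Ok(Ok(())) => {
            println!("INFO {name} for {remote} finished");
            Outcome::Finished
        }
        Ok(Err(error)) => {
            eprintln!("ERROR {name} for {remote} failed: {error}");
            Outcome::Failed(error)
        }
        Err(_) => {
            eprintln!("CRITICAL {name} for {remote} panicked");
            Outcome::Panicked
        }
    };
    // Only unsuccessful outcomes touch the error counter.
    if outcome != Outcome::Finished {
        if let Some(counters) = metrics {
            counters.errors += 1;
        }
    }
    outcome
}
```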
Message Structures
IncomingMessage
Represents a message received from the network, including metadata and timing information.
Fields:
connection_info: Arc<ConnectionInfo>: Source connection metadata.
message: NetMessage: The raw network message.
duration_after_transfer: Instant: Timestamp indicating when the message completed transfer.
Methods:
finish<Message>(&self, metrics: &Option<NetMetrics>) -> Option<(Message, SocketAddr)>: Decodes and deserializes the network message into a strongly typed application message. Reports metrics on delivery phases including decoding time and message delivery duration. Returns Some((message, source_address)) on success or None on failure.
Type parameter:
Message: Generic message type implementing serde::Deserialize and Debug.
Example usage:
if let Some((msg, src_addr)) = incoming_message.finish::<MyMessageType>(&metrics) {
    // process the deserialized message
}
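The decode-and-time flow of finish can be sketched with the standard library alone. In this sketch the decoder is passed in as a closure instead of going through serde, and Incoming, Timings and their fields are hypothetical stand-ins for IncomingMessage and NetMetrics.

```rust
use std::net::SocketAddr;
use std::time::{Duration, Instant};

/// Hypothetical reduced form of an incoming message.
pub struct Incoming {
    pub remote_addr: SocketAddr,
    pub payload: Vec<u8>,
    /// When the transfer of the payload completed.
    pub received_at: Instant,
}

/// Hypothetical stand-in for the metrics collector.
#[derive(Default)]
pub struct Timings {
    pub decode: Option<Duration>,
    pub since_transfer: Option<Duration>,
    pub failures: u64,
}

impl Incoming {
    /// Decodes the payload, recording how long decoding took and how much time
    /// passed since the transfer completed. Returns None if decoding fails.
    pub fn finish<M, E>(
        &self,
        decode: impl FnOnce(&[u8]) -> Result<M, E>,
        timings: &mut Timings,
    ) -> Option<(M, SocketAddr)> {
        let started = Instant::now();
        match decode(&self.payload) {
            Ok(message) => {
                timings.decode = Some(started.elapsed());
                timings.since_transfer = Some(self.received_at.elapsed());
                Some((message, self.remote_addr))
            }
            Err(_) => {
                timings.failures += 1;
                None
            }
        }
    }
}
```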
MessageDelivery
Enumeration describing how an outgoing message should be delivered.
Variants:
Broadcast: Send to all subscribers (broadcast).
BroadcastExcluding(Arc<ConnectionInfo>): Broadcast excluding a specific connection.
Addr(SocketAddr): Send directly to a specific socket address.
OutgoingMessage
Represents a message to be sent over the network, including delivery semantics and timing.
Fields:
delivery: MessageDelivery: Delivery mode for the message.
message: NetMessage: The serialized network message.
duration_before_transfer: Instant: Timestamp marking when message sending began.
Implementation Details and Algorithms
Certificate and Public Key Handling: When wrapping a connection, the remote's certificate is extracted and parsed to retrieve public keys used for cryptographic verification (get_pubkeys_from_cert_der). The certificate hash is also computed and stored for identity verification.
Connection Role and Send Mode: The role of a connection (Publisher, Subscriber, DirectReceiver) determines whether messages are sent in broadcast mode or direct mode. This affects routing and filtering logic in message sending.
Message Sending Authorization: The allow_sending method implements filtering logic for outgoing messages:
If both sender and receiver are proxies and the message was last sent by a proxy, sending is disallowed to avoid proxy loops.
Broadcast messages are sent only over connections whose role is Publisher.
Broadcast with exclusion skips sending to the excluded connection.
Addressed messages are sent only to the matching socket address.
Connection Supervision: Uses tokio asynchronous primitives to concurrently run sender and receiver tasks, handling orderly shutdown by sending stop signals and notifying the system when a connection closes. Uses tokio::select! to await completion of either task.
Metrics Integration: Throughout message processing and connection supervision, metrics are collected and reported via NetMetrics, including error rates, message delivery durations, and task failures.
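The filtering rules listed under Message Sending Authorization can be sketched as a single function. This is a reduced illustration under several assumptions: the excluded connection is identified by its id instead of an Arc<ConnectionInfo>, the exclusion variant also requires the Publisher role, "sender and receiver are proxies" is read as the local and remote proxy flags, and the last_sender_is_proxy field is a hypothetical name.

```rust
use std::net::SocketAddr;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Role {
    Subscriber,
    Publisher,
    DirectReceiver,
}

/// Hypothetical reduced form of ConnectionInfo.
pub struct Info {
    pub id: u64,
    pub role: Role,
    pub local_is_proxy: bool,
    pub remote_is_proxy: bool,
    pub remote_addr: SocketAddr,
}

/// Delivery modes; the excluded connection is identified by id in this sketch.
pub enum Delivery {
    Broadcast,
    BroadcastExcluding(u64),
    Addr(SocketAddr),
}

/// Hypothetical reduced form of OutgoingMessage.
pub struct Outgoing {
    pub delivery: Delivery,
    pub last_sender_is_proxy: bool,
}

/// Returns true if `outgoing` may be sent over the connection described by `info`.
pub fn allow_sending(info: &Info, outgoing: &Outgoing) -> bool {
    // Proxy-to-proxy forwarding of a message that just came from a proxy
    // would loop, so it is rejected before looking at the delivery mode.
    if info.local_is_proxy && info.remote_is_proxy && outgoing.last_sender_is_proxy {
        return false;
    }
    match &outgoing.delivery {
        Delivery::Broadcast => info.role == Role::Publisher,
        Delivery::BroadcastExcluding(excluded_id) => {
            info.role == Role::Publisher && info.id != *excluded_id
        }
        Delivery::Addr(addr) => info.remote_addr == *addr,
    }
}
```

Checking the proxy rule first keeps the delivery match free of proxy concerns, so each arm reads as one of the listed rules.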
Interaction with Other Modules
Interfaces with transport_layer for low-level connection and certificate management: NetConnection, NetTransport, CertHash, and certificate public key extraction.
Uses pub_sub module for subscription management, sending (sender), and receiving (receiver) message tasks.
Utilizes message types NetMessage and delivery phases (DeliveryPhase) from the message and DeliveryPhase modules, respectively.
Integrates with metrics module for performance and error tracking.
Employs SendMode to determine message transmission semantics.
Uses cryptographic verification keys from ed25519_dalek.
Mermaid Diagram: Class Structure
classDiagram
class ConnectionRole {
+is_publisher()
+is_subscriber()
}
class ConnectionInfo {
+id: u64
+local_is_proxy: bool
+role: ConnectionRole
+remote_addr: SocketAddr
+remote_host_id: String
+remote_host_id_prefix: String
+remote_is_proxy: bool
+remote_cert_hash: CertHash
+remote_cert_pubkeys: Vec<VerifyingKey>
+remote_info()
+is_publisher()
+is_subscriber()
+is_broadcast()
+send_mode()
}
class ConnectionWrapper~Connection~ {
+info: Arc<ConnectionInfo>
+connection: Connection
+new()
+allow_sending()
}
class IncomingMessage {
+connection_info: Arc<ConnectionInfo>
+message: NetMessage
+duration_after_transfer: Instant
+finish()
}
class OutgoingMessage {
+delivery: MessageDelivery
+message: NetMessage
+duration_before_transfer: Instant
}
class MessageDelivery {
}
ConnectionWrapper --> ConnectionInfo : uses
IncomingMessage --> ConnectionInfo : references
OutgoingMessage --> MessageDelivery : uses
ConnectionInfo --> ConnectionRole : has a