channel.rs
Overview
This file implements an in-memory, message-passing transport layer abstraction called ChannelTransport. It simulates network communication between nodes identified by SocketAddr (network socket addresses) using asynchronous channels instead of actual network sockets. The transport facilitates sending and receiving serialized ChitchatMessage instances, tracking statistics, and managing simulated network links (including their removal and restoration). The transport supports optional MTU (Maximum Transmission Unit) size enforcement and is designed to work asynchronously with Tokio's async runtime.
The main components are:
ChannelTransport: The transport manager that maintains send channels and link state.
InProcessSocket: Represents an endpoint socket bound to a local address, allowing send/receive of messages.
Statistics: Tracks message and byte counts for the transport.
This file interacts closely with:
The ChitchatMessage serialization/deserialization logic (crate::serialize).
The Transport and Socket traits from crate::transport.
Tokio's asynchronous channels and synchronization primitives.
The overall system's messaging and network simulation layer.
Entities and Their Details
Statistics
A simple struct to track total bytes and message counts sent via the channel transport.
Fields:
num_bytes_total: u64 – Total number of bytes sent.
num_messages_total: u64 – Total number of messages sent.
Methods:
record_message_len(message_num_bytes: usize): Updates statistics by adding the size of a newly sent message.
Usage:
    let mut stats = Statistics::default();
    stats.record_message_len(128);
    println!("Total bytes sent: {}", stats.num_bytes_total);
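To make the counter semantics concrete, here is a minimal standalone sketch of a struct with the fields and method listed above. The derives and the exact update logic (one message counted per call, length added to the byte total) are assumptions inferred from the description, not a copy of the source.

```rust
/// Stand-in for the `Statistics` struct described above.
/// The derive list is an assumption made for this sketch.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct Statistics {
    pub num_bytes_total: u64,
    pub num_messages_total: u64,
}

impl Statistics {
    /// Adds one message of `message_num_bytes` bytes to the running totals.
    pub fn record_message_len(&mut self, message_num_bytes: usize) {
        self.num_bytes_total += message_num_bytes as u64;
        self.num_messages_total += 1;
    }
}
```

Under these assumptions, recording a 128-byte and a 72-byte message leaves 200 bytes and 2 messages in the totals.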
ChannelTransportInner
An internal state container used within a mutex to manage:
send_channels: Map from SocketAddr to Tokio Sender channels for outgoing messages.
statistics: A Statistics instance for message tracking.
removed_links: Tracks pairs of addresses with disabled links (simulated network partitions).
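The removed_links bookkeeping can be illustrated with a small std-only sketch. It assumes the map has the shape HashMap<SocketAddr, HashSet<SocketAddr>> shown in the diagram further down; LinkTable and is_link_active are hypothetical names introduced only for this example.

```rust
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;

/// Hypothetical helper; in the file described here this state lives
/// inside `ChannelTransportInner`.
#[derive(Default)]
pub struct LinkTable {
    removed_links: HashMap<SocketAddr, HashSet<SocketAddr>>,
}

impl LinkTable {
    /// Records the partition in both directions.
    pub fn remove_link(&mut self, a: SocketAddr, b: SocketAddr) {
        self.removed_links.entry(a).or_default().insert(b);
        self.removed_links.entry(b).or_default().insert(a);
    }

    /// Heals the partition in both directions.
    pub fn add_link(&mut self, a: SocketAddr, b: SocketAddr) {
        if let Some(peers) = self.removed_links.get_mut(&a) {
            peers.remove(&b);
        }
        if let Some(peers) = self.removed_links.get_mut(&b) {
            peers.remove(&a);
        }
    }

    /// A link is active unless it has been recorded as removed.
    pub fn is_link_active(&self, from: SocketAddr, to: SocketAddr) -> bool {
        !self
            .removed_links
            .get(&from)
            .map_or(false, |peers| peers.contains(&to))
    }
}
```

A per-source set keeps the lookup on the send path to one map access and one set access.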
ChannelTransport
The main transport struct providing asynchronous message passing between SocketAddrs. It holds an Arc<Mutex<ChannelTransportInner>> for shared mutable state and an optional mtu_opt to enforce message size limits.
Fields:
inner: Arc<Mutex<ChannelTransportInner>>
mtu_opt: Option<usize> – Optional MTU size to limit message payloads.
Constructors:
with_mtu(mtu: usize) -> Self: Creates a new instance enforcing the given MTU.
Methods:
max_datagram_payload_size(&self) -> usize
Returns the configured MTU or falls back to MAX_UDP_DATAGRAM_PAYLOAD_SIZE.
open(&self, listen_addr: SocketAddr) -> Result<Box<dyn Socket>>
Opens a new logical socket bound to listen_addr. Internally creates an mpsc channel for incoming messages and registers it in send_channels. Returns an InProcessSocket instance.
Errors: Returns an error if the address is already taken.
statistics(&self) -> Statistics
Returns a snapshot of the current message and byte statistics.
add_link(&self, from_addr: SocketAddr, to_addr: SocketAddr)
Removes any simulated network partition between the two addresses, allowing messages to flow again.
remove_link(&self, from_addr: SocketAddr, to_addr: SocketAddr)
Simulates a network partition by recording the link removal for both directions.
send(&self, from_addr: SocketAddr, to_addr: SocketAddr, message: ChitchatMessage) -> Result<()>
Sends a message from from_addr to to_addr if the link is active. Serializes and deserializes the message to mimic real network serialization overhead. Checks MTU constraints and updates statistics.
close(&self, addr: SocketAddr)
Removes the send channel associated with addr, effectively closing that socket.
Usage Example:
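    let transport = ChannelTransport::with_mtu(1024);
    // `socket` must be mutable: `Socket::send` takes `&mut self`.
    let mut socket = transport.open("127.0.0.1:8080".parse().unwrap()).await?;
    // `message` is a `ChitchatMessage` constructed elsewhere.
    socket.send("127.0.0.1:8081".parse().unwrap(), message).await?;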
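The open/close registration described above can be sketched without Tokio. The example below swaps in std::sync::mpsc::sync_channel for the Tokio mpsc channel and a raw byte vector for ChitchatMessage; Registry, Payload, sender_for, and the capacity of 16 are hypothetical choices for illustration only.

```rust
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

/// (sender address, raw bytes): a stand-in for `(SocketAddr, ChitchatMessage)`.
pub type Payload = (SocketAddr, Vec<u8>);

/// Hypothetical registry mirroring the role of `send_channels`.
#[derive(Default)]
pub struct Registry {
    send_channels: HashMap<SocketAddr, SyncSender<Payload>>,
}

impl Registry {
    /// Binds `listen_addr`, failing if the address is already taken.
    pub fn open(&mut self, listen_addr: SocketAddr) -> Result<Receiver<Payload>, String> {
        if self.send_channels.contains_key(&listen_addr) {
            return Err(format!("address already in use: {}", listen_addr));
        }
        // Bounded channel; the capacity is an arbitrary value for this sketch.
        let (tx, rx) = sync_channel(16);
        self.send_channels.insert(listen_addr, tx);
        Ok(rx)
    }

    /// Looks up the sending half registered for `addr`, if any.
    pub fn sender_for(&self, addr: SocketAddr) -> Option<&SyncSender<Payload>> {
        self.send_channels.get(&addr)
    }

    /// Unregisters `addr`, so the address can be bound again.
    pub fn close(&mut self, addr: SocketAddr) {
        self.send_channels.remove(&addr);
    }
}
```

The receiving half is handed to the caller while the sending half stays in the map, which is what lets any other endpoint look up a destination by address.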
InProcessSocket
Represents a socket bound to a local address within ChannelTransport.
Fields:
listen_addr: SocketAddr – The bound address.
broker: ChannelTransport – Reference to the transport for sending messages.
message_rx: Receiver<(SocketAddr, ChitchatMessage)> – Receiver channel for incoming messages.
Trait Implementation:
Socket
async fn send(&mut self, to_addr: SocketAddr, message: ChitchatMessage) -> Result<()>
Sends a message via ChannelTransport from this socket's address.
async fn recv(&mut self) -> Result<(SocketAddr, ChitchatMessage)>
Waits asynchronously for a message to arrive on this socket's receiver channel.
Destructor:
On drop, calls close on the transport to unregister this socket.
Usage Example:
    let mut socket = transport.open(addr).await?;
    socket.send(other_addr, message).await?;
    let (from_addr, msg) = socket.recv().await?;
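The unregister-on-drop behaviour can be sketched as follows. This is a simplified std-only model: Broker and BoundSocket are hypothetical stand-ins for ChannelTransport and InProcessSocket, and the shared state is reduced to a set of bound addresses guarded by std::sync::Mutex.

```rust
use std::collections::HashSet;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the transport: it only tracks bound addresses.
#[derive(Clone, Default)]
pub struct Broker {
    bound: Arc<Mutex<HashSet<SocketAddr>>>,
}

impl Broker {
    /// Binds `listen_addr`; returns `None` if the address is already bound.
    pub fn open(&self, listen_addr: SocketAddr) -> Option<BoundSocket> {
        // The lock guard is released at the end of this statement.
        let newly_bound = self.bound.lock().unwrap().insert(listen_addr);
        if newly_bound {
            Some(BoundSocket { listen_addr, broker: self.clone() })
        } else {
            None
        }
    }

    /// Unregisters `addr`.
    pub fn close(&self, addr: SocketAddr) {
        self.bound.lock().unwrap().remove(&addr);
    }

    pub fn is_bound(&self, addr: SocketAddr) -> bool {
        self.bound.lock().unwrap().contains(&addr)
    }
}

/// Hypothetical stand-in for the socket: it keeps a handle to its broker.
pub struct BoundSocket {
    listen_addr: SocketAddr,
    broker: Broker,
}

impl Drop for BoundSocket {
    /// Releases the address automatically when the socket goes out of scope.
    fn drop(&mut self) {
        self.broker.close(self.listen_addr);
    }
}
```

Tying the cleanup to Drop means an address is released even when the owning task exits early through an error path.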
Important Implementation Details
Message Serialization Roundtrip:
Before sending, ChitchatMessage is serialized and deserialized to simulate real network serialization/deserialization. This is done by:
    fn serialize_deserialize_chitchat_message(message: ChitchatMessage) -> ChitchatMessage {
        let buf = message.serialize_to_vec();
        assert_eq!(buf.len(), message.serialized_len());
        let mut read_cursor: &[u8] = &buf[..];
        let message_ser_deser = ChitchatMessage::deserialize(&mut read_cursor).unwrap();
        assert_eq!(message, message_ser_deser);
        assert!(read_cursor.is_empty());
        message
    }
MTU Enforcement:
The transport can be configured with an MTU (Maximum Transmission Unit). Messages exceeding the MTU cause the send operation to fail with an error.
Simulated Network Partitions:
The removed_links map tracks pairs of addresses where communication is disabled. When sending, if the link is removed, the message is silently dropped.
Channel Saturation Handling:
Sending to a full mpsc channel uses try_send and silently drops messages if the channel is saturated, avoiding backpressure.
Concurrency and Synchronization:
Shared state is protected by parking_lot::Mutex inside an Arc to allow concurrent access to the transport from multiple tasks.
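The MTU check and the drop-on-saturation policy described above can be combined in one short sketch. It uses std::sync::mpsc::SyncSender::try_send as a std-only analogue of the Tokio call; send_datagram and SendOutcome are hypothetical names, and treating a disconnected receiver as a silent drop is an assumption of this sketch, not something the description confirms.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

/// What happened to a payload that passed the MTU check (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
pub enum SendOutcome {
    Delivered,
    DroppedChannelFull,
    DroppedReceiverGone,
}

/// Rejects oversized payloads with an error, then attempts a non-blocking send.
pub fn send_datagram(
    tx: &SyncSender<Vec<u8>>,
    payload: Vec<u8>,
    mtu: usize,
) -> Result<SendOutcome, String> {
    if payload.len() > mtu {
        return Err(format!(
            "payload of {} bytes exceeds MTU of {} bytes",
            payload.len(),
            mtu
        ));
    }
    match tx.try_send(payload) {
        Ok(()) => Ok(SendOutcome::Delivered),
        // A full channel is not an error: the datagram is simply lost.
        Err(TrySendError::Full(_)) => Ok(SendOutcome::DroppedChannelFull),
        // Assumption of this sketch: a closed receiver also counts as a drop.
        Err(TrySendError::Disconnected(_)) => Ok(SendOutcome::DroppedReceiverGone),
    }
}
```

Dropping instead of awaiting capacity keeps the sender from stalling on a slow receiver, which resembles how an unreliable datagram network behaves under load.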
Interaction with Other System Components
Implements the Transport trait, used by higher-level network abstractions or protocols requiring asynchronous message transport.
Implements the Socket trait for individual socket endpoints.
Uses ChitchatMessage from the message serialization module for message content.
Uses Tokio's asynchronous channels (mpsc) for message queueing.
Integrates tracing via tracing::debug for observability.
Enforces message size limits based on the MAX_UDP_DATAGRAM_PAYLOAD_SIZE constant.
Visual Diagram
classDiagram
class ChannelTransport {
+inner: Arc<Mutex<ChannelTransportInner>>
+mtu_opt: Option<usize>
+with_mtu()
+max_datagram_payload_size()
+open()
+statistics()
+add_link()
+remove_link()
+send()
+close()
}
class ChannelTransportInner {
+send_channels: HashMap<SocketAddr, Sender<(SocketAddr, ChitchatMessage)>>
+statistics: Statistics
+removed_links: HashMap<SocketAddr, HashSet<SocketAddr>>
}
class Statistics {
+num_bytes_total: u64
+num_messages_total: u64
+record_message_len()
}
class InProcessSocket {
+listen_addr: SocketAddr
+broker: ChannelTransport
+message_rx: Receiver<(SocketAddr, ChitchatMessage)>
+send()
+recv()
}
ChannelTransport "1" *-- "1" ChannelTransportInner : contains
ChannelTransportInner --> "many" Sender : holds
ChannelTransportInner --> Statistics : tracks
ChannelTransport o-- InProcessSocket : creates
InProcessSocket --> ChannelTransport : uses
This diagram illustrates the relationship between the core structs: ChannelTransport manages an inner state ChannelTransportInner, which tracks sending channels and statistics. It creates InProcessSocket instances that depend on the transport for sending and receiving messages.
Summary of Key Function Workflows
Opening a socket (ChannelTransport::open):
Creates an mpsc channel, registers it in send_channels, and returns an InProcessSocket bound to the address.
Sending a message (ChannelTransport::send):
Serializes and deserializes the message, checks MTU, updates statistics, checks if the link is active, and tries to send via the appropriate channel.
Receiving a message (InProcessSocket::recv):
Awaits on the mpsc receiver for incoming messages, providing the sender address and the message.
Closing a socket (InProcessSocket::drop):
Automatically unregisters the socket's send channel from the transport.
References to Related Topics
For message serialization and deserialization details, see serialize::Serializable and serialize::Deserializable.
The Transport and Socket traits implemented here are detailed in transport::Transport and transport::Socket.
Tokio asynchronous programming and channel usage are covered under tokio::sync::mpsc.
For network addressing and socket concepts, see the standard library's std::net::SocketAddr.