utils.rs
Overview
This file provides utility wrappers around the core Transport and Socket traits to simulate network conditions such as message delay and message drop (loss). These wrappers enable the testing and simulation of unreliable network behaviors by introducing probabilistic delays or drops to message sending operations. The utilities are implemented as decorator-like structures that wrap existing transports and sockets, enhancing their behavior without modifying underlying implementations.
The key abstractions are:
TransportWithDelay: A transport that delays sending messages by a random time sampled from a given distribution.
TransportWithMessageDrop: A transport that probabilistically drops outgoing messages.
Extensions to the Transport trait, provided through the TransportExt trait, that make these wrappers easy to apply.
By applying these wrappers, developers can test how their distributed protocols handle network latency and packet loss scenarios.
Detailed Explanation of Components
Traits and Type Aliases
DelayMillisDist
pub trait DelayMillisDist: Distribution<f32> + Send + Sync + Clone + 'static {}
This is a marker trait that constrains the distribution types used for delay sampling.
It requires the distribution to produce f32 samples, be thread-safe (Send + Sync), cloneable, and 'static.
This trait is implemented automatically, through a blanket implementation, for all types satisfying these bounds.
Used to enforce type safety on delay distributions passed to delay wrappers.
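The marker-trait pattern described above can be sketched without any external crate. This is a minimal illustration, not the file's actual code: Dist is a hypothetical stand-in for rand's Distribution trait, and DelayDist, Constant, and sample_delay are names invented for the example.

```rust
/// Hypothetical stand-in for `rand`'s `Distribution<T>` trait (assumption).
pub trait Dist<T> {
    fn sample(&self, state: &mut u64) -> T;
}

/// Marker trait: it has no methods and only bundles a set of bounds.
pub trait DelayDist: Dist<f32> + Send + Sync + Clone + 'static {}

/// Blanket implementation: every type meeting the bounds is a `DelayDist`.
impl<D> DelayDist for D where D: Dist<f32> + Send + Sync + Clone + 'static {}

/// Hypothetical distribution that always yields the same value.
#[derive(Clone)]
pub struct Constant(pub f32);

impl Dist<f32> for Constant {
    fn sample(&self, _state: &mut u64) -> f32 {
        self.0
    }
}

/// Accepts any delay distribution through the single marker bound.
pub fn sample_delay<D: DelayDist>(dist: &D, state: &mut u64) -> f32 {
    dist.sample(state)
}
```

Because of the blanket implementation, Constant never names DelayDist yet can be passed to sample_delay. The long bound list is written once, on the marker trait, instead of on every generic function.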
Structs and Implementations
TransportWithDelay<D>
struct TransportWithDelay<D: Distribution<f32> + Send + Sync + 'static> {
    delay_secs: D,
    transport: Box<dyn Transport>,
}
Wraps an inner Transport and adds artificial delay to outgoing messages.
delay_secs: A generic distribution used to sample the delay duration in seconds.
transport: The underlying transport being wrapped.
Implementation of Transport for TransportWithDelay<D>
max_datagram_payload_size: Delegates to the inner transport.
open(listen_addr): Opens a socket from the inner transport and wraps it in a SocketWithDelay, which applies the delay on sends.
Usage Example
let base_transport: Box<dyn Transport> = ...; // Some transport
let delay_distribution = rand::distributions::Uniform::new(0.0, 0.5);
let delayed_transport = TransportWithDelay {
    delay_secs: delay_distribution,
    transport: base_transport,
};
SocketWithDelay<D>
struct SocketWithDelay<D: Distribution<f32> + Send + Sync + 'static> {
    delay_secs: D,
    socket: Arc<RwLock<Box<dyn Socket>>>,
    rng: SmallRng,
}
Wraps an inner socket to add artificial delay on outgoing messages.
delay_secs: Distribution for delay sampling.
socket: Thread-safe reference to the inner socket.
rng: Pseudorandom number generator for sampling delays.
Implementation of Socket for SocketWithDelay<D>
send(to, message): Samples a delay duration, then spawns a task that sleeps for that duration before forwarding the message to the inner socket. The caller does not wait for the delay to elapse.
recv(): Delegates directly to the inner socket's receive method.
Important Implementation Detail
The send delay is implemented with tokio::task::spawn: each send spawns a task that sleeps and then forwards the message, so the caller is not held up and multiple delayed sends can proceed concurrently.
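The spawn-then-sleep design can be illustrated with a std-only sketch. It swaps in std::thread::spawn and a Mutex for Tokio tasks and the RwLock, and uses a fixed delay rather than a sampled one. ChannelSocket and DelayedSocket are hypothetical names, and none of this is the file's actual implementation.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical inner socket: forwards messages into a channel.
pub struct ChannelSocket {
    tx: Sender<String>,
}

impl ChannelSocket {
    pub fn send(&mut self, message: String) {
        // Ignore the error raised when the receiver has been dropped.
        let _ = self.tx.send(message);
    }
}

/// Wrapper that delays every send by a fixed duration.
pub struct DelayedSocket {
    delay: Duration,
    // Shared so that each background thread can reach the inner socket.
    socket: Arc<Mutex<ChannelSocket>>,
}

impl DelayedSocket {
    pub fn new(delay: Duration) -> (DelayedSocket, Receiver<String>) {
        let (tx, rx) = channel();
        let socket = Arc::new(Mutex::new(ChannelSocket { tx }));
        (DelayedSocket { delay, socket }, rx)
    }

    /// Returns immediately; a background thread sleeps, then forwards.
    pub fn send(&self, message: String) {
        let socket = Arc::clone(&self.socket);
        let delay = self.delay;
        thread::spawn(move || {
            thread::sleep(delay);
            socket.lock().unwrap().send(message);
        });
    }
}
```

In this sketch each send runs independently, so messages with different delays may reach the inner socket out of order, and the caller never sees an error from the inner send. Whether the real wrapper behaves the same way depends on how its spawned task handles the result.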
TransportWithMessageDrop
struct TransportWithMessageDrop {
    drop_probability: Bernoulli,
    transport: Box<dyn Transport>,
}
Wraps a transport to probabilistically drop outgoing messages.
drop_probability: A Bernoulli distribution indicating the probability of dropping a message.
transport: The underlying transport.
Implementation of Transport for TransportWithMessageDrop
max_datagram_payload_size: Delegates to the inner transport.
open(listen_addr): Opens an inner socket and wraps it in SocketWithMessageDrop.
SocketWithMessageDrop
struct SocketWithMessageDrop {
    drop_probability: Bernoulli,
    socket: Box<dyn Socket>,
    rng: SmallRng,
}
Wraps a socket to probabilistically drop outgoing messages.
drop_probability: Bernoulli distribution for the drop decision.
socket: Inner socket to send messages through.
rng: RNG to sample drop decisions.
Implementation of Socket for SocketWithMessageDrop
send(to, message): Samples the drop distribution. If the message is dropped, the send returns successfully without forwarding; otherwise, it forwards the message to the inner socket.
recv(): Delegates directly to the inner socket.
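The drop decision can be sketched as follows, under stated assumptions: a small xorshift generator stands in for SmallRng, comparing a uniform sample against a probability stands in for sampling a Bernoulli distribution, and a Vec stands in for the inner socket. XorShift64 and DroppingSocket are hypothetical names.

```rust
/// Tiny xorshift64 generator, a stand-in for `SmallRng` (assumption).
/// The seed must be non-zero.
pub struct XorShift64(pub u64);

impl XorShift64 {
    /// Returns a pseudorandom value in [0, 1).
    pub fn next_f64(&mut self) -> f64 {
        let mut x = self.0;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.0 = x;
        // Keep the top 53 bits so the result is a uniform double below 1.
        (x >> 11) as f64 / (1u64 << 53) as f64
    }
}

/// Wrapper that silently drops a fraction of outgoing messages.
pub struct DroppingSocket {
    drop_probability: f64,
    rng: XorShift64,
    /// Stand-in for the inner socket: records what was forwarded.
    pub forwarded: Vec<String>,
}

impl DroppingSocket {
    pub fn new(drop_probability: f64, seed: u64) -> Self {
        DroppingSocket { drop_probability, rng: XorShift64(seed), forwarded: Vec::new() }
    }

    /// Reports success whether the message was dropped or forwarded.
    pub fn send(&mut self, message: String) -> Result<(), String> {
        let should_drop = self.rng.next_f64() < self.drop_probability;
        if should_drop {
            return Ok(()); // Dropped: the caller cannot tell the difference.
        }
        self.forwarded.push(message);
        Ok(())
    }
}
```

Returning Ok for a dropped message mirrors a lossy datagram network, where the sender gets no signal that a packet was lost. With a fixed seed, as in this sketch, the same messages are dropped on every run.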
Trait TransportExt
pub trait TransportExt {
    fn drop_message(self, drop_probability: f64) -> Box<dyn Transport>;
    fn delay<D: DelayMillisDist>(self, delay_proba: D) -> Box<dyn Transport>;
}
Provides extension methods to create transport wrappers easily.
drop_message(drop_probability): Wraps the transport in a TransportWithMessageDrop.
delay(delay_proba): Wraps the transport in a TransportWithDelay.
Implementation for All Transports
Any type implementing Transport automatically gains these extension methods.
These methods box the original transport and return a new wrapped transport.
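The extension-trait pattern can be sketched with a simplified, hypothetical Transport trait whose only method reports the layers a message would pass through. WithDrop, WithDelay, BaseTransport, and the max_delay_ms parameter are invented for the example. The impl of Transport for Box<dyn Transport> is an assumption that makes chaining work here; the source does not say whether the real crate provides one.

```rust
/// Hypothetical, simplified transport used only for this sketch.
pub trait Transport: 'static {
    fn describe(&self) -> String;
}

pub struct BaseTransport;

impl Transport for BaseTransport {
    fn describe(&self) -> String {
        "base".to_string()
    }
}

struct WithDrop {
    drop_probability: f64,
    inner: Box<dyn Transport>,
}

impl Transport for WithDrop {
    fn describe(&self) -> String {
        format!("drop({}, {})", self.drop_probability, self.inner.describe())
    }
}

struct WithDelay {
    max_delay_ms: u64,
    inner: Box<dyn Transport>,
}

impl Transport for WithDelay {
    fn describe(&self) -> String {
        format!("delay({}ms, {})", self.max_delay_ms, self.inner.describe())
    }
}

// Assumption: boxed transports are transports too, so calls can be chained.
impl Transport for Box<dyn Transport> {
    fn describe(&self) -> String {
        (**self).describe()
    }
}

pub trait TransportExt {
    fn drop_message(self, drop_probability: f64) -> Box<dyn Transport>;
    fn delay(self, max_delay_ms: u64) -> Box<dyn Transport>;
}

// Blanket implementation: every `Transport` gains both methods.
impl<T: Transport> TransportExt for T {
    fn drop_message(self, drop_probability: f64) -> Box<dyn Transport> {
        Box::new(WithDrop { drop_probability, inner: Box::new(self) })
    }

    fn delay(self, max_delay_ms: u64) -> Box<dyn Transport> {
        Box::new(WithDelay { max_delay_ms, inner: Box::new(self) })
    }
}
```

Calling BaseTransport.drop_message(0.1).delay(200) yields a transport whose describe() returns "delay(200ms, drop(0.1, base))". The outermost wrapper is the one applied last.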
Usage Example
let transport: Box<dyn Transport> = ...;
let unreliable_transport = transport
    .drop_message(0.1) // 10% drop probability
    .delay(rand::distributions::Uniform::new(0.0, 0.2)); // Delay between 0 and 0.2 seconds
Important Implementation Details
The use of asynchronous traits (async_trait) allows non-blocking operations for socket open, send, and receive.
The delay is implemented by spawning a new asynchronous task for each send call, which sleeps before forwarding the message. This design avoids blocking the caller and allows multiple delayed sends to proceed concurrently.
The drop logic is implemented synchronously inside the send method by sampling the Bernoulli distribution before deciding whether to forward the message.
SocketWithDelay holds its inner socket as Arc<RwLock<Box<dyn Socket>>> so that spawned tasks can safely share and mutate it; SocketWithMessageDrop owns its inner socket directly as Box<dyn Socket>.
Random number generation uses SmallRng seeded from the thread RNG, which is fast. Because the seed is not fixed, the sequence of delays and drops differs from run to run.
Interactions with Other Parts of the System
Relies on the Transport and Socket traits for network communication abstractions.
Uses the ChitchatMessage type as the message payload, tying into the messaging protocol layer.
Integrates with Tokio's async runtime for asynchronous task spawning and timers.
The wrappers are designed to be composable, allowing layered behaviors such as delay and drop to be combined.
Can be applied to any transport implementation, facilitating testing of fault tolerance and latency handling in the network stack or distributed protocols.
Diagram: Flowchart of Main Functions and Their Relationships
flowchart TD
TransportExt -->|drop_message| TransportWithMessageDrop
TransportExt -->|delay| TransportWithDelay
TransportWithMessageDrop --> Transport
TransportWithDelay --> Transport
TransportWithMessageDrop -->|open| SocketWithMessageDrop
TransportWithDelay -->|open| SocketWithDelay
SocketWithMessageDrop --> Socket
SocketWithDelay --> Socket
SocketWithMessageDrop -->|send| DropDecision
DropDecision -->|drop| SendDropped
DropDecision -->|no drop| ForwardSend
SocketWithDelay -->|send| DelaySample
DelaySample -->|spawn task| DelayWait
DelayWait --> ForwardSend
ForwardSend --> InnerSocketSend
InnerSocketSend --> Socket
TransportExt provides extension methods to create the wrappers.
TransportWithMessageDrop and TransportWithDelay wrap transports and provide wrapped sockets on open.
SocketWithMessageDrop decides whether to drop or forward messages.
SocketWithDelay samples delays and schedules sends asynchronously.
Both ultimately delegate to the inner socket's send method.