transport_test.rs
Overview
This file contains asynchronous integration tests focused on evaluating network transport implementations, specifically the MsQuicTransport variant of the NetTransport trait. It orchestrates multiple nodes and optional proxies across data centers, simulating a gossip-based communication network. The tests verify message broadcasting and direct messaging functionality, as well as the reliability and performance of the transport layer under multi-threaded conditions.
The primary test function, test_msquic_transport, initializes and exercises the MsQuicTransport implementation by invoking the generic test_transport function. test_transport configures nodes and proxies, sets up gossip seeds, and runs multiple iterations to print network statistics, thereby validating the transport's behavior in a realistic multi-node environment.
The file also defines the NodeTest struct, which manages a node instance along with its associated producer and consumer threads. These threads simulate message production (broadcasting) and consumption (handling incoming messages and sending acknowledgments), respectively.
Classes and Structures
NodeTest
A generic struct parameterized by a type implementing NetTransport. It encapsulates a test node and manages two internal threads: one for producing broadcast messages and one for consuming incoming messages.
Fields
node: Node<Transport>
The actual node instance under test, parameterized by the transport layer._producer_task: std::thread::JoinHandle<anyhow::Result<()>>
Handle to the spawned thread responsible for producing and broadcasting messages._consumer_task: std::thread::JoinHandle<anyhow::Result<()>>
Handle to the spawned thread that consumes incoming messages and handles acknowledgments.
Methods
async fn start(config: NodeConfig, transport: Transport, node_count: usize) -> anyhow::Result<Self>
Starts a new NodeTest instance by creating a node with the provided configuration and transport implementation. It also launches the producer and consumer threads.
Parameters:
config: NodeConfig— Configuration for the node including addresses and gossip seeds.transport: Transport— The network transport implementation.node_count: usize— Total number of nodes in the test network.
Returns:
ANodeTestinstance wrapped in aResult.Usage example:
let node_test = NodeTest::start(config, transport.clone(), total_node_count).await?;
fn run_producer(state: Arc<NodeState>, chitchat: ChitchatRef, node_count: usize, node_id: String, broadcast_tx: NetBroadcastSender<Message>) -> anyhow::Result<()>
Continuously produces and broadcasts Message::Data messages after all nodes are live. It generates random data payloads and tracks unconfirmed messages in the node state.
Parameters:
state: Arc<NodeState>— Shared node state for tracking message statuses.chitchat: ChitchatRef— Reference to the gossip protocol handler for monitoring live nodes.node_count: usize— Number of nodes in the network.node_id: String— Identifier of the producing node.broadcast_tx: NetBroadcastSender<Message>— Channel for broadcasting messages.
Returns:
Resultindicating success or failure of the producer thread.Implementation details:
Waits until all nodes are live before starting message production.
Generates 150 KB random byte vectors as message payloads.
Maintains an
unconfirmedmap to track acknowledgments required from other nodes.Sleeps 300 ms between message broadcasts to throttle sending rate.
fn run_consumer(state: Arc<NodeState>, direct_tx: NetDirectSender<String, Message>, incoming_rx: InstrumentedReceiver<IncomingMessage>, _peers_rx: tokio::sync::watch::Receiver<HashMap<String, Vec<PeerData>>>) -> anyhow::Result<()>
Consumes incoming messages, updating internal counters and sending acknowledgments for received data messages.
Parameters:
state: Arc<NodeState>— Shared node state for tracking received and acknowledged messages.direct_tx: NetDirectSender<String, Message>— Channel to send direct messages back to peers.incoming_rx: InstrumentedReceiver<IncomingMessage>— Receiver for incoming network messages._peers_rx: watch::Receiver<HashMap<String, Vec<PeerData>>>— Watch receiver for peer updates (unused in this implementation).
Returns:
Resultindicating success or failure of the consumer thread.Implementation details:
Processes messages in a loop until the channel is closed.
For
Message::Data, incrementsdata_receivedand sends anAck.For
Message::Ack, decrements the count for the corresponding message ID inunconfirmedand removes it when all acks are received.
Functions
#[tokio::test(flavor = "multi_thread", worker_threads = 8)] #[ignore] async fn test_msquic_transport()
An asynchronous test function which invokes test_transport with an instance of MsQuicTransport. Marked to be ignored by default to prevent automatic execution in normal test runs.
Purpose:
Validate theMsQuicTransportimplementation under multi-threaded load.Usage:
Executed viacargo test -- --ignoredor explicitly invoked in test suites.
async fn test_transport(transport: impl NetTransport + 'static)
Sets up a test environment with multiple nodes and proxies distributed across data centers, initializes gossip seeds, and runs a timed loop to print network statistics.
Parameters:
transport: impl NetTransport + 'static— A generic transport layer instance to be tested.
Implementation details:
Defines two data centers (
dc_count = 2), each with two nodes (dc_node_count = 2), and no proxies (dc_proxy_count = 0).Selects certain nodes as gossip seeds to bootstrap the gossip network.
Creates and starts
NodeTestinstances and proxies for each configured entity.Runs a 6-second loop with 1-second intervals to print node and proxy statistics.
Interactions:
This function interacts with:The
NodeTeststruct to spawn nodes with producer and consumer threads.The
Proxystruct to simulate proxy nodes (though none are configured here).Utility functions such as
gossip_addrandnode_addrfor network addressing.Logging initialization via
init_logs.
Important Implementation Details and Algorithms
Producer-Consumer Pattern with Threads:
EachNodeTestruns two dedicated OS threads: one producing messages and broadcasting them, and one consuming incoming messages and sending acknowledgments. This pattern ensures continuous message flow and response handling in parallel.Use of Atomic Counters and Mutexes:
Message state tracking employs atomic counters (data_sent,data_received,ack_sent,ack_received) and a mutex-guarded hashmap (unconfirmed) to synchronize message confirmation states safely across threads.Gossip Protocol Integration:
The start of message production depends on all nodes being recognized as live via thechitchatgossip component, ensuring the network is fully connected before stress testing the transport.Random Payload Generation:
Broadcast messages carry large (150,000 bytes) random data payloads to simulate realistic network traffic and test transport layer robustness.Timeouts and Delays:
Initial delay (sleepfor 3 seconds) before starting production and intervals between messages (300 ms) regulate the test pace for stability.Transport Abstraction:
The test infrastructure is designed to accept any transport implementingNetTransport, allowing flexible testing of different transport mechanisms by swapping the implementation passed totest_transport.
Interaction with Other System Components
Node and Proxy Entities:
UsesNodeandProxystructs (from the same crate) to represent network participants. Each node uses the configured transport to communicate.Channels and Messaging:
EmploysNetBroadcastSenderandNetDirectSenderchannels for message broadcasting and direct messaging respectively.Gossip Module:
References theChitchatReffor live node detection, integrating gossip protocol functionality for network membership awareness.Telemetry and Instrumentation:
TheInstrumentedReceiverwraps incoming message receivers for telemetry, enabling monitoring of message flow during tests.Addressing Utilities:
Uses helper functionsgossip_addrandnode_addrto generate consistent network addresses for nodes and proxies.Logging:
Initializes logging withinit_logsto capture runtime information during test execution.
Visual Diagram
flowchart TD
A[test_msquic_transport] -->|calls| B[test_transport]
B --> C[Setup gossip seeds]
B --> D["Create proxies (optional)"]
B --> E[Create nodes with NodeTest::start]
E --> F["Spawn producer thread (run_producer)"]
E --> G["Spawn consumer thread (run_consumer)"]
F -->|broadcasts| H[NetBroadcastSender<Message>]
G -->|consumes| I[InstrumentedReceiver<IncomingMessage>]
G -->|sends ack| J[NetDirectSender<String, Message>]
F --> K[NodeState.unconfirmed & counters]
G --> K
B --> L[Print stats loop]
The diagram illustrates the test flow starting from the test_msquic_transport function, through the setup and creation of nodes and proxies, spawning of producer and consumer threads within each node, and the interaction between message channels and node states.
Usage Example of NodeTest::start
let config = NodeConfig::new(
node_addr,
gossip_addr,
"node_0_0".to_string(),
gossip_seeds.clone(),
);
let node_test = NodeTest::start(config, transport.clone(), total_node_count).await?;
This snippet creates and starts a new test node with the specified configuration and transport, ready to produce and consume test messages.