mod.rs
Overview
This file defines core functionality for a network communication proxy and node system focused on gossip protocol-based peer-to-peer messaging. It provides abstractions to start and manage nodes and proxies that communicate over a network transport layer, handle configuration updates, multiplex incoming and outgoing messages, and maintain node state and statistics. The file integrates with components responsible for gossip-based membership and messaging, network transport, and Pub/Sub mechanisms.
Key responsibilities include:
Managing node and proxy lifecycle and configuration.
Providing message multiplexing between incoming and outgoing channels.
Integrating gossip membership protocols.
Tracking network message statistics and node state.
Establishing network addresses for gossip and node communication.
Functions
make_addr(group: usize, index: usize) -> SocketAddr
Generates a localhost socket address based on a group and index. The port number is computed as 11000 + group * 10 + index.
Usage:
let addr = make_addr(1, 0); // returns SocketAddr for 127.0.0.1:11010
node_addr(group: usize) -> SocketAddr
Returns the socket address for a node in the given group. Calls make_addr with index 0.
gossip_addr(group: usize) -> SocketAddr
Returns the socket address for gossip communication in the given group. Calls make_addr with index 1.
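The three address helpers above can be sketched as follows. This is a minimal reconstruction from the stated port formula, not the module's actual source; the BASE_PORT constant and the overflow check are assumptions added for illustration.

```rust
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

/// Base port from the formula described above (the constant name is hypothetical).
const BASE_PORT: usize = 11000;

/// Builds 127.0.0.1:(11000 + group * 10 + index).
/// Panics if the computed port does not fit in a u16 (assumed behavior for this sketch).
pub fn make_addr(group: usize, index: usize) -> SocketAddr {
    let port = BASE_PORT + group * 10 + index;
    assert!(port <= u16::MAX as usize, "computed port must fit in u16");
    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port as u16)
}

/// Node address for a group: index 0.
pub fn node_addr(group: usize) -> SocketAddr {
    make_addr(group, 0)
}

/// Gossip address for a group: index 1.
pub fn gossip_addr(group: usize) -> SocketAddr {
    make_addr(group, 1)
}
```

Because each group is allotted ten consecutive ports, index values 0 through 9 stay inside their own group's range; a larger index would land in the next group's range.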
init_logs()
Initializes logging for the module using tracing_subscriber. Logs are written to a file named test.log located relative to the Cargo project directory. This function ensures that logging is initialized only once using a OnceCell.
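The once-only guarantee can be illustrated with a small sketch. It substitutes the standard library's std::sync::OnceLock (Rust 1.70 or later) for the OnceCell mentioned above and replaces the tracing_subscriber setup with a placeholder that only counts invocations, so install_subscriber, SETUP_RUNS, and setup_runs are hypothetical names.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::OnceLock;

/// Guards the one-time setup (stand-in for the OnceCell in the real module).
static LOG_INIT: OnceLock<()> = OnceLock::new();

/// Counts how often the setup body actually ran (illustration only).
static SETUP_RUNS: AtomicUsize = AtomicUsize::new(0);

/// Placeholder for the real subscriber installation, which would open
/// test.log and register a tracing subscriber.
fn install_subscriber() {
    SETUP_RUNS.fetch_add(1, Ordering::SeqCst);
}

/// Safe to call from every test: only the first call runs the setup.
pub fn init_logs() {
    LOG_INIT.get_or_init(install_subscriber);
}

/// Number of times the setup body has run so far.
pub fn setup_runs() -> usize {
    SETUP_RUNS.load(Ordering::SeqCst)
}
```

Guarding the setup this way matters in test suites, where many tests call init_logs() concurrently and installing a global subscriber twice would fail.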
Structs and Enums
NoChannelMetrics
A no-op implementation of the InstrumentedChannelMetrics trait. It ignores channel metrics reporting.
Proxy<Transport>
Represents a proxy node that manages gossip communication and message multiplexing over a transport.
Fields:
shutdown_tx: Watch channel sender to signal shutdown.
config_tx: Watch channel sender for node configuration updates.
watch_gossip_config_tx: Watch channel sender for gossip watch configuration.
transport: The network transport instance.
chitchat: Reference to the gossip membership state.
chitchat_handle: Handle for managing gossip protocol lifecycle.
gossip_rest_handle: Async task handle for gossip REST API.
Methods:
async fn start(config: NodeConfig, transport: Transport) -> anyhow::Result<Self>
Starts the proxy with the provided node configuration and transport. Spawns multiple asynchronous tasks to handle:
Configuration splitting between network and gossip.
Running the gossip protocol.
Multiplexing messages between incoming and outgoing channels.
Running the Pub/Sub message system.
Parameters:
config: Initial node configuration.
transport: Network transport instance implementing NetTransport.
Returns: A Proxy instance wrapped in a Result.
async fn message_multiplexor(incoming_messages: UnboundedReceiver<IncomingMessage>, outgoing_messages: broadcast::Sender<OutgoingMessage>) -> anyhow::Result<()>
Continuously listens for incoming messages and forwards them as broadcast messages excluding the original sender connection. Logs forwarding events.
fn print_stat(&self)
Prints the count of live nodes currently tracked by the gossip membership.
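The forwarding rule of message_multiplexor can be sketched with standard-library channels. This is a synchronous simplification under stated assumptions: std::sync::mpsc stands in for the tokio unbounded and broadcast channels, and the fields of IncomingMessage and OutgoingMessage shown here (connection_id, exclude_connection, payload) are hypothetical, since the real types are not described in this document.

```rust
use std::sync::mpsc::{Receiver, Sender};

/// Hypothetical stand-in for the real IncomingMessage type.
#[derive(Debug, Clone, PartialEq)]
pub struct IncomingMessage {
    pub connection_id: u64,
    pub payload: Vec<u8>,
}

/// Hypothetical stand-in for the real OutgoingMessage type.
#[derive(Debug, Clone, PartialEq)]
pub struct OutgoingMessage {
    /// Connection that must not receive this broadcast (the original sender).
    pub exclude_connection: Option<u64>,
    pub payload: Vec<u8>,
}

/// Forwards every incoming message as a broadcast that skips its sender.
/// Returns the number of forwarded messages once the incoming side closes.
pub fn message_multiplexor(
    incoming: Receiver<IncomingMessage>,
    outgoing: Sender<OutgoingMessage>,
) -> usize {
    let mut forwarded = 0;
    // recv() returns Err once every sender has been dropped, ending the loop.
    while let Ok(msg) = incoming.recv() {
        let out = OutgoingMessage {
            exclude_connection: Some(msg.connection_id),
            payload: msg.payload,
        };
        // Stop if nobody is listening for broadcasts any more.
        if outgoing.send(out).is_err() {
            break;
        }
        forwarded += 1;
    }
    forwarded
}
```

Tagging each broadcast with the connection to skip is one way to avoid echoing a message back to the peer that sent it.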
Drop Implementation
Sends a shutdown signal when the proxy is dropped.
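A shutdown-on-drop pattern of this kind might look like the sketch below. It uses a std::sync::mpsc channel in place of the tokio watch channel, and ProxySketch is a hypothetical reduced type that keeps only the shutdown_tx field.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Reduced stand-in for Proxy: only the shutdown sender is modeled.
pub struct ProxySketch {
    shutdown_tx: Sender<bool>,
}

impl ProxySketch {
    /// Returns the sketch together with the receiver that background
    /// tasks would poll for the shutdown signal.
    pub fn new() -> (Self, Receiver<bool>) {
        let (shutdown_tx, shutdown_rx) = channel();
        (Self { shutdown_tx }, shutdown_rx)
    }
}

impl Drop for ProxySketch {
    fn drop(&mut self) {
        // Ignore the error: if the receiver is gone, nothing is left to stop.
        let _ = self.shutdown_tx.send(true);
    }
}
```

Tying the signal to Drop means spawned tasks are told to stop even when the owner goes out of scope early, for example after a failed test assertion.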
NodeConfig
Configuration for a node including network and gossip settings.
Fields:
node_id: Unique identifier of the node.
network: Network configuration (NetworkConfig).
advertise_addr: Socket address the node advertises.
gossip: Gossip protocol configuration (GossipConfig).
Methods:
fn new(node_addr: SocketAddr, gossip_addr: SocketAddr, node_id: String, gossip_seeds: Vec<SocketAddr>) -> Self
Creates a new NodeConfig with default security files and provided addresses and gossip seeds.
Node<Transport>
Represents a running node participating in the network and gossip protocol.
Fields:
shutdown_tx: Watch channel sender to signal shutdown.
config_tx: Watch channel sender for node configuration updates.
watch_gossip_config_tx: Watch channel sender for gossip watch configuration.
transport: The network transport instance.
chitchat: Reference to gossip membership state.
state: Shared state of the node (NodeState).
chitchat_handle: Handle managing gossip protocol lifecycle.
gossip_rest_handle: Async task handle for gossip REST API.
Methods:
async fn start(config: NodeConfig, transport: Transport) -> anyhow::Result<(Self, NodeChannels)>
Starts the node with the given configuration and transport. Internally:
Sets up watch channels for configuration and shutdown signals.
Launches the gossip protocol.
Initializes the network layer (BasicNetwork).
Creates channels for direct and broadcast messaging as well as incoming messages.
Maintains node state statistics.
Returns: The node instance and a NodeChannels struct encapsulating communication channels.
fn print_stat(&self, node_count: usize)
Prints detailed statistics about unconfirmed messages, data sent/received, acknowledgments sent/received, and live nodes count.
Drop Implementation
Sends a shutdown signal when the node is dropped.
NodeState
Tracks statistics and state related to message transmission and acknowledgements.
Fields:
data_sent: Number of data messages sent.
data_received: Number of data messages received.
ack_sent: Number of acknowledgements sent.
ack_received: Number of acknowledgements received.
unconfirmed: Mutex-protected map tracking unconfirmed messages keyed by message ID.
Default Implementation
Initializes counters to zero and the unconfirmed map to empty.
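A state type with these fields could be sketched as below. The concrete types are assumptions: the counters are shown as AtomicUsize, the map value is shown as the send time, and record_sent and record_ack are hypothetical helpers that are not part of the documented API.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::time::Instant;

pub struct NodeState {
    pub data_sent: AtomicUsize,
    pub data_received: AtomicUsize,
    pub ack_sent: AtomicUsize,
    pub ack_received: AtomicUsize,
    /// Message ID -> time the message was sent (value type is an assumption).
    pub unconfirmed: Mutex<HashMap<u64, Instant>>,
}

impl Default for NodeState {
    /// All counters start at zero and the unconfirmed map starts empty.
    fn default() -> Self {
        Self {
            data_sent: AtomicUsize::new(0),
            data_received: AtomicUsize::new(0),
            ack_sent: AtomicUsize::new(0),
            ack_received: AtomicUsize::new(0),
            unconfirmed: Mutex::new(HashMap::new()),
        }
    }
}

impl NodeState {
    /// Hypothetical helper: records an outgoing data message as unconfirmed.
    pub fn record_sent(&self, id: u64) {
        self.data_sent.fetch_add(1, Ordering::Relaxed);
        self.unconfirmed.lock().unwrap().insert(id, Instant::now());
    }

    /// Hypothetical helper: counts an ack and returns true if it matched
    /// a message that was still unconfirmed.
    pub fn record_ack(&self, id: u64) -> bool {
        self.ack_received.fetch_add(1, Ordering::Relaxed);
        self.unconfirmed.lock().unwrap().remove(&id).is_some()
    }
}
```

Atomics keep the plain counters lock-free, while the map needs a mutex because inserting and removing entries cannot be done atomically.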
Message
An enum representing messages exchanged between nodes.
Variants:
Data(String, u64, Vec<u8>): Data message with sender ID, sequence ID, and payload bytes.
Ack(u64): Acknowledgement for a message ID.
Trait Implementations:
Serialize and Deserialize for network serialization.
Debug for formatted debug output.
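The enum and its acknowledgement flow can be illustrated with a short sketch. The serde derives are left out to keep the example dependency-free, reply_for is a hypothetical helper, and the sketch assumes that the sequence ID carried by Data is the same ID that Ack refers to.

```rust
/// Mirrors the variants listed above. The real type also implements
/// Serialize and Deserialize, omitted here to avoid external crates.
#[derive(Debug, Clone, PartialEq)]
pub enum Message {
    /// Sender ID, sequence ID, payload bytes.
    Data(String, u64, Vec<u8>),
    /// ID of the message being acknowledged.
    Ack(u64),
}

/// Hypothetical helper: the reply a receiver might send for a message.
/// Data is answered with an Ack for its sequence ID; an Ack needs no reply.
pub fn reply_for(msg: &Message) -> Option<Message> {
    match msg {
        Message::Data(_sender, id, _payload) => Some(Message::Ack(*id)),
        Message::Ack(_) => None,
    }
}
```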
NodeChannels
Holds communication channels for a node:
direct_tx: Sender for direct unicast messages.
broadcast_tx: Sender for broadcast messages.
incoming_rx: Receiver for incoming messages with instrumentation.
peers_rx: Watch receiver for peer data updates.
Async Functions
async fn split_config(shutdown_rx: watch::Receiver<bool>, config_rx: watch::Receiver<NodeConfig>, network_config_tx: watch::Sender<NetworkConfig>, gossip_config_tx: watch::Sender<GossipConfig>)
Listens for changes on the node configuration watch channel and splits updates into separate network and gossip configuration watch channels. Terminates on shutdown signal.
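The splitting step can be sketched synchronously with standard-library channels. This is a simplification, not the async implementation: std::sync::mpsc replaces the tokio watch channels, closing the configuration channel plays the role of the shutdown signal, and the fields of the three config structs (bind, seeds) are hypothetical placeholders.

```rust
use std::sync::mpsc::{Receiver, Sender};

/// Hypothetical, reduced versions of the real configuration types.
#[derive(Debug, Clone, PartialEq)]
pub struct NetworkConfig {
    pub bind: String,
}

#[derive(Debug, Clone, PartialEq)]
pub struct GossipConfig {
    pub seeds: Vec<String>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct NodeConfig {
    pub network: NetworkConfig,
    pub gossip: GossipConfig,
}

/// Forwards the network and gossip parts of each update to their own channel.
/// Returns when the config channel closes (standing in for shutdown) or when
/// either consumer has gone away.
pub fn split_config(
    config_rx: Receiver<NodeConfig>,
    network_tx: Sender<NetworkConfig>,
    gossip_tx: Sender<GossipConfig>,
) {
    while let Ok(config) = config_rx.recv() {
        let delivered = network_tx.send(config.network).is_ok()
            && gossip_tx.send(config.gossip).is_ok();
        if !delivered {
            break;
        }
    }
}
```

Splitting a single update stream this way lets the network layer and the gossip layer each watch only the part of the configuration they care about.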
Implementation Details and Algorithms
The proxy and node utilize tokio::sync::watch and mpsc channels for asynchronous communication and configuration updates.
Gossip membership and messaging leverage chitchat and gossip crates, integrating gossip configuration dynamically.
Message multiplexing in the proxy forwards incoming messages to broadcast channels excluding the sender connection to avoid echo.
Node state tracks unconfirmed message counts and message statistics using atomic counters and mutexes for thread safety.
Logging is initialized once and directed to a file for test scenarios.
Network addresses are systematically generated to avoid conflicts, facilitating testing with groups of nodes.
Interactions with Other System Components
Integrates with chitchat for gossip membership management and network discovery.
Uses gossip crate for gossip protocol execution and REST API exposure.
Relies on transport_layer::NetTransport trait for abstracted transport mechanisms.
Employs crate::pub_sub modules for message publication and subscription mechanics.
Coordinates configuration updates with NetworkConfig and GossipConfig structures.
Instrumentation and metrics use types from telemetry_utils for channel monitoring.
Mermaid Diagram
classDiagram
class Proxy {
+shutdown_tx
+config_tx
+watch_gossip_config_tx
+transport
+chitchat
+chitchat_handle
+gossip_rest_handle
+start()
+message_multiplexor()
+print_stat()
}
class Node {
+shutdown_tx
+config_tx
+watch_gossip_config_tx
+transport
+chitchat
+state
+chitchat_handle
+gossip_rest_handle
+start()
+print_stat()
}
class NodeState {
+data_sent
+data_received
+ack_sent
+ack_received
+unconfirmed
}
class NodeConfig {
+node_id
+network
+advertise_addr
+gossip
+new()
}
class Message {
<<enum>>
+Data
+Ack
}
class NodeChannels {
+direct_tx
+broadcast_tx
+incoming_rx
+peers_rx
}
Proxy --> Transport
Node --> Transport
Node *-- NodeState
Node o-- NodeChannels
Proxy ..> NodeConfig
Node ..> NodeConfig