message.rs
Overview
This file defines the NetMessage struct and associated methods for encapsulating, serializing, compressing, transmitting, decompressing, and deserializing network messages. The primary purpose is to provide a robust and efficient mechanism for packaging arbitrary data types into transferable messages with metadata, optimized for network communication scenarios where message size and serialization/deserialization performance are critical.
The file implements features such as:
Compression of large serialized messages using
zstd.Tracking of timing metrics for serialization, compression, deserialization, and decompression.
Unique message identification via timestamps.
Delivery duration measurement to monitor network latency or processing delays.
The NetMessage type uses Arc<Vec<u8>> to allow shared ownership of the message payload bytes, facilitating efficient cloning and concurrency.
NetMessage Struct
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NetMessage {
pub delivery_start_timestamp_ms: u64,
pub id: String,
pub label: String,
pub compressed: bool,
pub data: Arc<Vec<u8>>,
pub last_sender_is_proxy: bool,
#[serde(skip)]
pub received_at: u64,
}
Fields:
delivery_start_timestamp_ms: u64
Timestamp in milliseconds since UNIX epoch when the message delivery started. Used for measuring latency.id: String
Unique identifier for the message, based on nanosecond precision system time.label: String
A debug string label describing the message type or contents, generated from theDebugimplementation of the original message.compressed: bool
Indicates whether thedatapayload is compressed.data: Arc<Vec<u8>>
The serialized byte payload of the message, optionally compressed. Wrapped inArcfor shared ownership.last_sender_is_proxy: bool
Flag indicating if the last sender was a proxy; usage depends on application logic.received_at: u64
Timestamp indicating when the message was received. Not serialized/deserialized (#[serde(skip)]).
Constants
MAX_UNCOMPRESSED_SIZE: usize = 1000
Threshold size in bytes above which message payloads will be compressed.
Methods
transfer_size(msg: &NetMessage) -> u64
Calculates the approximate serialized size in bytes of the given NetMessage instance.
Parameters:
msg: Reference to aNetMessage.
Returns:
u64representing the serialized size in bytes.
Details:
Usesbincode::serialized_sizeto get the size. If serialization fails, a fallback estimation is computed based on lengths of components.Usage Example:
let size = NetMessage::transfer_size(&net_message); println!("Message size: {}", size);
delivery_duration_ms(&self) -> Result<u64, String>
Computes the elapsed time in milliseconds since the message delivery was started.
Returns:
Ok(u64)with elapsed milliseconds if the current time is synchronized with the delivery start time.Err(String)if the system clock appears out of sync beyond an acceptable 5 ms difference.
Usage Example:
match net_message.delivery_duration_ms() { Ok(duration) => println!("Delivery took {} ms", duration), Err(e) => eprintln!("Error: {}", e), }
encode<Message: Debug + Serialize>(message: &Message) -> anyhow::Result<(Self, usize)>
Encodes and optionally compresses a given message into a NetMessage.
Parameters:
message: Reference to a message implementingDebugandSerialize.
Returns:
Ok((NetMessage, usize))tuple containing the encoded message and the uncompressed size.Errif serialization or compression fails.
Implementation Details:
Serializes the input message using
bincode.Compresses the serialized bytes with
zstdif size exceedsMAX_UNCOMPRESSED_SIZE.Generates a unique ID based on system time at nanosecond precision.
Logs warnings if serialization or compression takes longer than thresholds (50 ms total or 10 ms compression).
Wraps the resulting bytes in an
Arc<Vec<u8>>.
Usage Example:
let my_message = MyStruct { ... }; let (net_msg, original_size) = NetMessage::encode(&my_message)?;
decode<Message: DeserializeOwned>(&self) -> anyhow::Result<(Message, usize, usize)>
Decodes and decompresses a NetMessage back into the original message type.
Returns:
Ok((Message, usize, usize))tuple containing:The decoded message instance.
Decompression time in milliseconds.
Deserialization time in milliseconds.
Errif decompression or deserialization fails.
Implementation Details:
If the message is compressed, decompresses data using
zstd.Deserializes the byte payload using
bincode.Logs warnings if decompression + deserialization time exceeds 100 ms.
Usage Example:
let (original_message, decompress_time, deserialize_time) = net_msg.decode::<MyStruct>()?;
Important Implementation Details
Compression Thresholding: Compression is only applied to serialized data exceeding
MAX_UNCOMPRESSED_SIZEbytes to balance CPU overhead and bandwidth savings.Time Measurement: Uses
Instantfor measuring serialization, compression, deserialization, and decompression duration with millisecond granularity to monitor performance.Unique Message ID: Generated using nanosecond precision from system time to ensure uniqueness in distributed systems.
Error Handling: Utilizes
anyhowcrate for error propagation with detailed messages including the message label for traceability.Thread Safety: Uses
Arc<Vec<u8>>for sharing the message payload safely across threads or async tasks without unnecessary copies.Metrics Logging: Conditional
tracing::warn!calls provide runtime warnings on excessive processing time, indicating potential performance bottlenecks.
Interaction with Other System Components
Expects any message type to implement
SerializeandDeserializeOwnedto participate in encoding and decoding.Utilizes
bincodefor efficient binary serialization andzstdfor compression, integrating with third-party crates for these functionalities.Uses
telemetry_utils::now_msto obtain current time in milliseconds, indicating interaction with telemetry or timing utilities.The resulting
NetMessageinstances can be sent over network channels, message queues, or other IPC mechanisms.The
last_sender_is_proxyflag suggests integration with proxy or relay components in the messaging system.
Mermaid Class Diagram
classDiagram
class NetMessage {
+u64 delivery_start_timestamp_ms
+String id
+String label
+bool compressed
+Arc<Vec<u8>> data
+bool last_sender_is_proxy
+u64 received_at
+transfer_size(msg: &NetMessage) uint64
+delivery_duration_ms() Result<uint64, String>
+encode<Message: Debug + Serialize>(message: &Message) Result<(NetMessage, usize), anyhow::Error>
+decode<Message: DeserializeOwned>() Result<(Message, usize, usize), anyhow::Error>
}
This diagram illustrates the NetMessage struct with its fields and primary methods, emphasizing the structure and functionality encapsulated within this file.