metrics.rs
Overview
This file implements the NetMetrics struct, a telemetry collector for network activity. It uses the opentelemetry crate to define counters, an up-down counter, gauges, histograms, and observable gauges that track network message flows, buffer states, delivery phases, transfer durations, errors, and warnings.
The primary purpose of this file is to provide detailed insight into network performance and health by recording metrics such as message counts, sizes, delivery durations, buffer occupancy, and error rates. These metrics are critical for diagnostics, monitoring, and performance tuning of the network subsystem.
Structs and Data Structures
NetMetrics
A cloneable structure encapsulating numerous telemetry instruments related to network operations.
Fields
Counters for counting discrete events:
incoming_message: Counts incoming network messages.
outgoing_message: Counts outgoing network messages.
sent_to_outgoing_buffer_bytes: Number of bytes sent to the outgoing buffer.
sent_bytes: Total bytes sent.
received_bytes: Total bytes received.
errors: Counts error events.
warns: Counts warning events.
outgoing_transfer_error: Counts errors during outgoing transfers.
Histograms for measuring distributions of durations and sizes:
incoming_buffer_duration: Time spent in the incoming buffer.
incoming_message_delivery_duration: Duration of message delivery.
outgoing_buffer_duration: Time spent in the outgoing buffer.
outgoing_transfer_duration: Duration of outgoing transfer.
transfer_after_ser: Time elapsed after serialization during transfer.
receive_before_deser: Time from reception to deserialization.
original_message_size: Sizes of original (uncompressed) messages.
compressed_message_size: Sizes of compressed messages.
Gauges for instantaneous values:
subscriber_count: Number of subscribers.
gossip_peers: Number of gossip peers (nodes only).
gossip_live_nodes: Number of live gossip nodes plus proxies.
UpDownCounter:
outgoing_buffer_counter: Tracks the number of messages currently in the outgoing buffer.
Observable Gauges (prefixed with underscore):
_incoming_buffer_size: Observes the size of the incoming buffer.
_outgoing_buffer_size: Observes the size of the outgoing buffer.
_network_incoming_transfer_inflight: Observes the count of inflight incoming transfers.
_outgoing_transfer_inflight: Observes the count of inflight outgoing transfers.
state: Shared, thread-safe mutable state for tracking counts of buffers and transfers in various delivery phases.
Usage Example
// Obtain a Meter from the globally registered meter provider
let meter = opentelemetry::global::meter("network");
let net_metrics = NetMetrics::new(&meter);
// Report an incoming message delivery duration
net_metrics.report_incoming_message_delivery_duration(100, "request");
// Start an outgoing buffer phase with 3 messages
net_metrics.start_delivery_phase(DeliveryPhase::OutgoingBuffer, 3, "response", SendMode::Broadcast);
// Finish the outgoing buffer phase after 120ms
net_metrics.finish_delivery_phase(
    DeliveryPhase::OutgoingBuffer,
    3,
    "response",
    SendMode::Broadcast,
    std::time::Duration::from_millis(120),
);
NetworkState
A private struct holding counts of messages/transfers in various delivery phases:
incoming_buffer_count
incoming_transfer_count
outgoing_buffer_count
outgoing_transfer_count
This state is wrapped in a thread-safe mutex (parking_lot::Mutex) and shared using an Arc to enable concurrent updates and observations.
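The sharing pattern described above can be sketched with the standard library alone. This is a minimal illustration, not the file's actual code: it assumes std::sync::Mutex in place of parking_lot::Mutex, and make_observer is a hypothetical helper standing in for an observable-gauge callback.

```rust
use std::sync::{Arc, Mutex};

/// Counts of messages/transfers currently in each delivery phase.
#[derive(Debug, Default)]
struct NetworkState {
    incoming_buffer_count: usize,
    incoming_transfer_count: usize,
    outgoing_buffer_count: usize,
    outgoing_transfer_count: usize,
}

/// Builds a reader closure that stands in for an observable-gauge callback.
/// `select` picks which field the callback reports (hypothetical helper).
fn make_observer(
    state: Arc<Mutex<NetworkState>>,
    select: fn(&NetworkState) -> usize,
) -> impl Fn() -> u64 {
    move || {
        // Hold the lock only long enough to copy one integer out.
        let guard = state.lock().expect("state mutex poisoned");
        select(&guard) as u64
    }
}
```

Each callback owns its own Arc clone, so reporting methods can keep mutating the same NetworkState while the callbacks read a consistent snapshot of one field at a time.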
Functions and Methods
NetMetrics::new(meter: &Meter) -> Self
Initializes a new NetMetrics instance with all metrics instruments registered on the provided Meter.
Sets up observable gauges with callbacks that read from the shared NetworkState.
Defines histogram boundaries for durations (in milliseconds) and sizes (in bytes), using gen_boundaries to generate ranges.
Instantiates counters, gauges, histograms, and observable gauges with meaningful metric names reflecting network telemetry.
update_delivery_phase_counter(&self, phase: DeliveryPhase, delta: isize)
Updates the count of messages/transfers in the specified delivery phase by an increment or decrement (delta). This modifies the internal NetworkState.
Parameters
phase: The delivery phase enum variant (IncomingBuffer, IncomingTransfer, OutgoingTransfer, OutgoingBuffer).
delta: Signed integer representing the count change (positive to increment, negative to decrement).
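A signed delta applied to an unsigned count can be sketched as follows. The enum and struct here are local stand-ins for the real types, apply_delta is a hypothetical name, and the saturating behavior at zero is an assumption: the text does not say how the real method handles a decrement below zero.

```rust
/// Local stand-in for the crate's DeliveryPhase enum.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum DeliveryPhase {
    IncomingBuffer,
    IncomingTransfer,
    OutgoingTransfer,
    OutgoingBuffer,
}

/// Local stand-in for the private NetworkState struct.
#[derive(Debug, Default, PartialEq, Eq)]
struct NetworkState {
    incoming_buffer_count: usize,
    incoming_transfer_count: usize,
    outgoing_buffer_count: usize,
    outgoing_transfer_count: usize,
}

impl NetworkState {
    /// Applies `delta` to the count for `phase` (hypothetical method name).
    /// Saturating arithmetic is an assumption: it clamps at 0 and usize::MAX
    /// instead of panicking or wrapping.
    fn apply_delta(&mut self, phase: DeliveryPhase, delta: isize) {
        let slot = match phase {
            DeliveryPhase::IncomingBuffer => &mut self.incoming_buffer_count,
            DeliveryPhase::IncomingTransfer => &mut self.incoming_transfer_count,
            DeliveryPhase::OutgoingTransfer => &mut self.outgoing_transfer_count,
            DeliveryPhase::OutgoingBuffer => &mut self.outgoing_buffer_count,
        };
        *slot = slot.saturating_add_signed(delta);
    }
}
```

Clamping keeps a mismatched finish call from turning a gauge reading into a huge wrapped value, at the cost of hiding the mismatch; a real implementation might prefer to log it.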
report_incoming_message_delivery_duration(&self, value: u64, msg_type: &str)
Records the duration of processing an incoming message delivery.
Uses an out-of-bounds guard macro to prevent recording invalid values.
Tags the measurement with the msg_type.
report_outgoing_transfer_error(&self, msg_type: &str, send_mode: SendMode, error: TransportError)
Increments the counter for outgoing transfer errors, tagged with message type, send mode, and error kind.
report_subscribers_count(&self, value: usize)
Records the current number of subscribers as a gauge.
report_transfer_after_ser(&self, value: u128)
Records the elapsed time after serialization during transfer, with bounds checking.
report_receive_before_deser(&self, value: u128)
Records the elapsed time from reception to deserialization, with bounds checking.
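The u128 parameters of the two methods above match the return type of Duration::as_millis, while the histograms hold u64 values, so a narrowing step with a bounds check is needed somewhere. The sketch below shows one way to do it. The function names and the 60-second limit are hypothetical, and treating the value as milliseconds is an assumption; the real code delegates the check to the out_of_bounds_guard! macro, whose exact form is not shown here.

```rust
use std::convert::TryFrom;
use std::time::Duration;

/// Hypothetical upper bound in milliseconds; the real limit is not stated.
const MAX_DURATION_MS: u64 = 60_000;

/// Returns the value to record, or `None` when the sample should be dropped.
fn checked_millis(value: u128) -> Option<u64> {
    // Reject anything that does not fit in u64 before comparing to the bound.
    let millis = u64::try_from(value).ok()?;
    if millis <= MAX_DURATION_MS {
        Some(millis)
    } else {
        None
    }
}

/// Convenience wrapper for callers that hold a `Duration`.
fn elapsed_millis(elapsed: Duration) -> Option<u64> {
    checked_millis(elapsed.as_millis())
}
```

A caller would record the histogram sample only when the result is Some, so an implausible duration never reaches the exporter.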
report_message_size(&self, orig_size: usize, compressed_size: usize, msg_type: &str)
Records sizes of original and compressed messages with bounds checking (max 10 million bytes).
Logs a warning if sizes are out of bounds.
report_gossip_peers(&self, peers: usize, live_nodes_total: u64)
Records the number of gossip peers (nodes only) and live nodes including proxies.
report_sent_to_outgoing_buffer_bytes(&self, bytes: u64, msg_type: &str, send_mode: SendMode)
Accumulates the number of bytes sent to the outgoing buffer, tagged by message type and send mode.
report_sent_bytes(&self, bytes: usize, msg_type: &str, send_mode: SendMode)
Accumulates the total number of bytes sent.
report_received_bytes(&self, bytes: usize, msg_type: &str, send_mode: SendMode)
Accumulates the total number of bytes received.
report_warn(&self, kind: impl Into<Cow<'static, str>>)
Increments the warning counter for a specified kind.
report_error(&self, kind: impl Into<Cow<'static, str>>)
Increments the error counter for a specified kind.
start_delivery_phase(&self, phase: DeliveryPhase, msg_count: usize, msg_type: &str, send_mode: SendMode)
Marks the start of a specified delivery phase:
Updates internal counters.
If the phase is OutgoingBuffer, increments the outgoing_buffer_counter.
finish_delivery_phase(&self, phase: DeliveryPhase, msg_count: usize, msg_type: &str, send_mode: SendMode, duration: Duration)
Marks the completion of a delivery phase:
Updates internal counters by decrementing message counts.
Records durations in histograms with bounds checking.
Updates message counters for incoming and outgoing phases accordingly.
Specific handling per phase for metrics updates.
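The pairing of start_delivery_phase and finish_delivery_phase can be modeled for a single phase without any telemetry dependency. OutgoingBufferModel below is a hypothetical stand-in: in_buffer plays the role of the outgoing buffer count, and durations_ms plays the role of the outgoing_buffer_duration histogram. Recording one sample per finish call (rather than one per message) is an assumption.

```rust
use std::convert::TryFrom;
use std::time::Duration;

/// Std-only model of the OutgoingBuffer phase (hypothetical type).
#[derive(Debug, Default)]
struct OutgoingBufferModel {
    /// Messages currently waiting in the outgoing buffer.
    in_buffer: usize,
    /// Recorded buffer durations in milliseconds.
    durations_ms: Vec<u64>,
}

impl OutgoingBufferModel {
    /// Start of the phase: `msg_count` messages enter the buffer.
    fn start(&mut self, msg_count: usize) {
        self.in_buffer = self.in_buffer.saturating_add(msg_count);
    }

    /// End of the phase: the same messages leave, and the time they spent
    /// in the buffer is recorded if it fits in a u64.
    fn finish(&mut self, msg_count: usize, duration: Duration) {
        self.in_buffer = self.in_buffer.saturating_sub(msg_count);
        if let Ok(ms) = u64::try_from(duration.as_millis()) {
            self.durations_ms.push(ms);
        }
    }
}
```

Calling start(3) followed by finish(3, Duration::from_millis(120)) mirrors the usage example earlier in this document: the count returns to its previous value and one 120 ms sample is recorded.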
Helper Functions
msg_type_attr(msg_type: &str) -> KeyValue
Returns a KeyValue attribute for the message type.
send_mode_attr(send_mode: SendMode) -> KeyValue
Returns a KeyValue attribute indicating whether the send mode is broadcast.
transfer_err_attr(error: TransportError) -> KeyValue
Returns a KeyValue attribute representing the kind of transport error.
attrs(msg_type: &str, send_mode: SendMode) -> [KeyValue; 2]
Returns an array of attributes combining message type and send mode.
gen_boundaries(low: u32, high: u32, step: u32) -> Vec<f64>
Generates a vector of floating point boundary values from low to high with increments of step. Used for histogram bucket boundaries.
to_label_kind<S: Into<Cow<'static, str>>>(input: S) -> String
Sanitizes a string to be suitable as a label by replacing invalid characters with underscores and converting to lowercase.
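A possible shape for to_label_kind is sketched below. The signature follows the one listed above, but the body is a guess: the text does not define which characters count as invalid, so this version assumes that only ASCII letters and digits are valid.

```rust
use std::borrow::Cow;

/// Lowercases the input and replaces every character that is not an ASCII
/// letter or digit with an underscore. The "ASCII alphanumeric" rule is an
/// assumption, not taken from the source file.
fn to_label_kind<S: Into<Cow<'static, str>>>(input: S) -> String {
    let input: Cow<'static, str> = input.into();
    input
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() {
                c.to_ascii_lowercase()
            } else {
                '_'
            }
        })
        .collect()
}
```

The Into<Cow<'static, str>> bound lets callers pass either a string literal or an owned String, for example to_label_kind("Connection Reset") or to_label_kind(String::from("TLS-Handshake/Timeout")).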
Implementation Details
Thread-safety: Internal mutable state for buffer and transfer counts is protected with a parking_lot::Mutex wrapped in an Arc, allowing concurrent access from metric callbacks and reporting methods.
Metric Boundaries: Histogram boundaries are carefully constructed to capture typical network operation latencies and message sizes. Duration boundaries are non-uniform to provide finer granularity for lower latencies and coarser granularity for higher latencies.
Out-of-Bounds Guards: The out_of_bounds_guard! macro is used to prevent recording metrics with values outside expected ranges, preventing metric pollution and potential issues in visualization.
Observable Gauges: These are registered with callbacks that read the current internal state, providing real-time observations of buffer sizes and in-flight transfers.
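Non-uniform duration boundaries can be built by concatenating several uniform ranges. The sketch below assumes that gen_boundaries includes the high endpoint, which the text does not confirm, and the specific ranges in duration_boundaries_ms (a hypothetical function) are illustrative values, not the ones used in the file.

```rust
/// Generates boundaries from `low` to `high` in increments of `step`.
/// Including `high` itself is an assumption.
fn gen_boundaries(low: u32, high: u32, step: u32) -> Vec<f64> {
    assert!(step > 0, "step must be positive");
    (low..=high).step_by(step as usize).map(f64::from).collect()
}

/// Illustrative millisecond boundaries: fine steps for short durations,
/// coarser steps for long ones. The numbers are hypothetical.
fn duration_boundaries_ms() -> Vec<f64> {
    let mut bounds = gen_boundaries(0, 90, 10); // 10 ms steps below 100 ms
    bounds.extend(gen_boundaries(100, 900, 100)); // 100 ms steps below 1 s
    bounds.extend(gen_boundaries(1000, 5000, 1000)); // 1 s steps up to 5 s
    bounds
}
```

Each range starts just above where the previous one ends, so the combined vector stays strictly increasing, which histogram bucket boundaries are generally expected to be.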
Interaction with Other Modules
Uses TransportError, DeliveryPhase, and SendMode enums from other parts of the system (crate::transfer and the crate root). These types represent network transfer errors, stages of message delivery, and modes of sending messages, respectively.
Uses the opentelemetry::metrics module to register and record metrics, integrating with the overall telemetry and observability infrastructure.
Uses the telemetry_utils::out_of_bounds_guard macro to ensure metric values are within acceptable ranges.
This file is integral to the network telemetry pipeline, providing both cumulative counts and real-time state for network message processing, enabling higher-level monitoring and alerting systems to react to network health and performance.
Mermaid Diagram
classDiagram
class NetMetrics {
-incoming_message: Counter<u64>
-incoming_buffer_duration: Histogram<u64>
-incoming_message_delivery_duration: Histogram<u64>
-outgoing_message: Counter<u64>
-outgoing_buffer_counter: UpDownCounter<i64>
-outgoing_buffer_duration: Histogram<u64>
-outgoing_transfer_duration: Histogram<u64>
-outgoing_transfer_error: Counter<u64>
-subscriber_count: Gauge<u64>
-transfer_after_ser: Histogram<u64>
-receive_before_deser: Histogram<u64>
-original_message_size: Histogram<u64>
-compressed_message_size: Histogram<u64>
-gossip_peers: Gauge<u64>
-gossip_live_nodes: Gauge<u64>
-sent_to_outgoing_buffer_bytes: Counter<u64>
-sent_bytes: Counter<u64>
-received_bytes: Counter<u64>
-errors: Counter<u64>
-warns: Counter<u64>
-_incoming_buffer_size: ObservableGauge<u64>
-_outgoing_buffer_size: ObservableGauge<u64>
-_network_incoming_transfer_inflight: ObservableGauge<u64>
-_outgoing_transfer_inflight: ObservableGauge<u64>
-state: Arc<Mutex<NetworkState>>
+new(meter: &Meter) NetMetrics
+report_incoming_message_delivery_duration(value: u64, msg_type: &str)
+report_outgoing_transfer_error(msg_type: &str, send_mode: SendMode, error: TransportError)
+report_subscribers_count(value: usize)
+report_transfer_after_ser(value: u128)
+report_receive_before_deser(value: u128)
+report_message_size(orig_size: usize, compressed_size: usize, msg_type: &str)
+report_gossip_peers(peers: usize, live_nodes_total: u64)
+report_sent_to_outgoing_buffer_bytes(bytes: u64, msg_type: &str, send_mode: SendMode)
+report_sent_bytes(bytes: usize, msg_type: &str, send_mode: SendMode)
+report_received_bytes(bytes: usize, msg_type: &str, send_mode: SendMode)
+report_warn(kind: Cow<'static, str>)
+report_error(kind: Cow<'static, str>)
+start_delivery_phase(phase: DeliveryPhase, msg_count: usize, msg_type: &str, send_mode: SendMode)
+finish_delivery_phase(phase: DeliveryPhase, msg_count: usize, msg_type: &str, send_mode: SendMode, duration: Duration)
}
class NetworkState {
-incoming_buffer_count: usize
-incoming_transfer_count: usize
-outgoing_buffer_count: usize
-outgoing_transfer_count: usize
}
NetMetrics *-- NetworkState