delta.rs
Overview
This file defines the Delta abstraction used for synchronizing state updates between nodes in a distributed system. A Delta encapsulates a set of state changes related to nodes, expressed as sequences of operations that update nodes' key-value data and metadata. It supports efficient serialization and deserialization of these operations into a compressed stream for network transmission.
The core purpose of the Delta type and its related components is to represent incremental updates to node states, including key-value mutations and version tracking, and to provide mechanisms for encoding these updates into a compact binary format that can be sent to other nodes.
Key Components
Delta
A Delta is a collection of NodeDelta instances representing updates for multiple nodes. It primarily holds:
node_deltas: Vec<NodeDelta> - The list of node-specific deltas.
serialized_len: usize - Cached length of the serialized delta payload.
Methods
default() -> Self
Creates an empty Delta with an initial serialized length of 1.
get_operations() -> impl Iterator<Item = DeltaOpRef<'_>>
Returns an iterator over all operations (DeltaOpRef) in the delta, flattening all node deltas into a stream of operations in this order:
Node operation (node metadata)
KeyValue operations (one per key-value mutation)
Optional SetMaxVersion operation (max version tracking for the node)
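The flattening order described above can be sketched as follows. This is a minimal illustration rather than the file's actual code: the node identifier is modeled as a String instead of ChitchatId, KeyValueMutation is reduced to three assumed fields, and the free functions node_operations and get_operations are hypothetical stand-ins for the method on Delta.

```rust
// Simplified stand-ins for the real types; the field types are assumptions.
type Version = u64;

#[derive(Debug, Clone, PartialEq)]
struct KeyValueMutation {
    key: String,
    value: String,
    version: Version,
}

#[derive(Debug, Clone, PartialEq)]
struct NodeDelta {
    chitchat_id: String, // the real field is a ChitchatId
    last_gc_version: Version,
    from_version_excluded: Version,
    key_values: Vec<KeyValueMutation>,
    max_version: Option<Version>,
}

#[derive(Debug, PartialEq)]
enum DeltaOpRef<'a> {
    Node {
        chitchat_id: &'a str,
        last_gc_version: Version,
        from_version_excluded: Version,
    },
    KeyValue(&'a KeyValueMutation),
    SetMaxVersion(Version),
}

/// Flattens one node delta into: Node, then each KeyValue, then an optional SetMaxVersion.
fn node_operations(node: &NodeDelta) -> impl Iterator<Item = DeltaOpRef<'_>> {
    let node_op = DeltaOpRef::Node {
        chitchat_id: node.chitchat_id.as_str(),
        last_gc_version: node.last_gc_version,
        from_version_excluded: node.from_version_excluded,
    };
    std::iter::once(node_op)
        .chain(node.key_values.iter().map(DeltaOpRef::KeyValue))
        // An Option is an iterator of zero or one item, so a missing max_version adds nothing.
        .chain(node.max_version.map(DeltaOpRef::SetMaxVersion))
}

/// Flattens every node delta in order into a single stream of operations.
fn get_operations(node_deltas: &[NodeDelta]) -> impl Iterator<Item = DeltaOpRef<'_>> {
    node_deltas.iter().flat_map(|node| node_operations(node))
}
```

Because the operations borrow from the node deltas, nothing is cloned while the stream is produced, which matches the stated purpose of DeltaOpRef.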
Serialization / Deserialization
Implements the Serializable and Deserializable traits to convert the delta into/from a compressed stream of operations using CompressedStreamWriter and the DeltaOp enum.
Usage Example
let mut delta = Delta::default();
delta.add_node(chitchat_id, last_gc_version, from_version);
delta.add_kv(&chitchat_id, "key", "value", version, false);
let mut buf = Vec::new();
delta.serialize(&mut buf);
NodeDelta
Represents the set of changes related to a single node.
Fields
chitchat_id: ChitchatId - Unique identifier of the node.
from_version_excluded: Version - Lower bound (exclusive) version from which this delta applies.
last_gc_version: Version - Version indicating the last garbage collection state.
key_values: Vec<KeyValueMutation> - List of key-value mutations applied to this node.
max_version: Option<Version> - Optional maximum version included in this delta.
Details
from_version_excluded and last_gc_version define conditions for applying this delta, ensuring it applies to a consistent state.
The delta is structured so that all versions in (from_version_excluded..max_version] are present except for keys deleted before or at last_gc_version.
See prepare_apply_delta(..) for rules on applying these versions in the system.
Usage Example
let node_delta = NodeDelta {
    chitchat_id,
    last_gc_version,
    from_version_excluded,
    key_values: vec![...],
    max_version: Some(max_version),
};
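The half-open range (from_version_excluded..max_version] mentioned in the details can be made concrete with a small check. This is only a sketch: Version is assumed to be a u64, in_delta_range is a hypothetical helper, and the actual rules for applying a delta live in prepare_apply_delta(..), which is not reproduced here.

```rust
type Version = u64; // assumption: the real Version type is numeric

/// Returns true if `version` lies in `(from_version_excluded, max_version]`:
/// strictly above the excluded lower bound and at most the inclusive upper bound.
fn in_delta_range(version: Version, from_version_excluded: Version, max_version: Version) -> bool {
    from_version_excluded < version && version <= max_version
}
```

For a delta with from_version_excluded = 2 and max_version = 5, versions 3, 4 and 5 fall inside the range, while 2 and 6 do not.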
DeltaOp and DeltaOpRef
Enumerations representing atomic operations within a delta.
DeltaOp owns its data.
DeltaOpRef holds references for serialization efficiency.
Variants
Node - Contains node metadata (chitchat_id, last_gc_version, from_version_excluded).
KeyValue - Encapsulates a KeyValueMutation.
SetMaxVersion - Sets the maximum version for the node.
Serialization
Tagged with DeltaOpTag (u8) to indicate operation type.
Implements the Serializable and Deserializable traits for encoding/decoding operations.
DeltaOpTag
An enum with explicit u8 discriminants to tag operation types during serialization:
Node = 0
KeyValue = 1
SetMaxVersion = 2
It implements TryFrom<u8> for decoding tags and From<DeltaOpTag> for u8 for encoding.
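A tag enum with these discriminants and conversions could look like the sketch below. The discriminant values come from the list above; the String error type and the error message are assumptions made for illustration, and the real implementation may report errors differently.

```rust
use std::convert::TryFrom;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
enum DeltaOpTag {
    Node = 0,
    KeyValue = 1,
    SetMaxVersion = 2,
}

impl TryFrom<u8> for DeltaOpTag {
    // Assumption: a plain String error keeps the sketch dependency-free.
    type Error = String;

    /// Decodes a tag byte, rejecting any value that is not a known tag.
    fn try_from(tag_byte: u8) -> Result<Self, Self::Error> {
        match tag_byte {
            0 => Ok(DeltaOpTag::Node),
            1 => Ok(DeltaOpTag::KeyValue),
            2 => Ok(DeltaOpTag::SetMaxVersion),
            other => Err(format!("unknown delta op tag: {other}")),
        }
    }
}

impl From<DeltaOpTag> for u8 {
    /// Encodes a tag as its explicit discriminant.
    fn from(tag: DeltaOpTag) -> u8 {
        tag as u8
    }
}
```

Decoding is fallible because a byte read from the network may not correspond to any tag, whereas encoding a valid tag can never fail.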
DeltaBuilder
A helper struct to incrementally build a Delta from a stream of DeltaOp operations:
Fields
existing_nodes: HashSet<ChitchatId> - Tracks nodes already added.
delta: Delta - The delta under construction.
current_node_delta: Option<NodeDelta> - The node delta currently being populated.
Methods
apply_op(&mut self, op: DeltaOp) -> anyhow::Result<()>
Applies an operation to the builder, enforcing correct ordering (e.g., key-values must follow a node operation).
flush(&mut self)
Completes the current node delta and appends it to the delta.
finish(self, len: usize) -> Delta
Finalizes the delta building, flushes pending state, and sets the serialized length.
DeltaSerializer
A utility struct to serialize parts of a delta while respecting a maximum transmission unit (MTU) size.
Fields
mtu: usize - Maximum allowed size for the serialized payload.
delta_builder: DeltaBuilder - Builder to accumulate operations.
compressed_stream_writer: CompressedStreamWriter - Handles compressed serialization.
Constants
BLOCK_THRESHOLD: u16 = 16_384 - Threshold size for compression blocks.
Methods
with_mtu(mtu: usize) -> Self
Creates a new serializer with the specified MTU.
try_add_node(chitchat_id: ChitchatId, last_gc_version: Version, from_version: Version) -> bool
Attempts to add a node operation; returns false if it would exceed MTU.
try_add_kv(key: &str, versioned_value: VersionedValue) -> bool
Attempts to add a key-value mutation operation; returns false if it would exceed MTU.
try_set_max_version(max_version: Version) -> bool
Attempts to add a max version operation; returns false if it would exceed MTU.
finish(self) -> Delta
Finalizes serialization and returns the built Delta.
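The try_add pattern, where an operation is accepted only if the payload still fits in the MTU, can be illustrated with an uncompressed stand-in. Everything in this sketch is hypothetical: MtuBoundedWriter, the length-prefixed key-value layout, and the one-byte end marker are assumptions chosen to keep the example small. The real serializer writes through CompressedStreamWriter and relies on a pessimistic upper bound, whereas the bound here happens to be exact because nothing is compressed.

```rust
const KEY_VALUE_TAG: u8 = 1; // matches the KeyValue tag value listed for DeltaOpTag
const END_MARKER_LEN: usize = 1; // assumption: one trailing byte closes the stream

/// Hypothetical uncompressed writer that never produces more than `mtu` bytes.
struct MtuBoundedWriter {
    mtu: usize,
    buf: Vec<u8>,
}

impl MtuBoundedWriter {
    fn with_mtu(mtu: usize) -> Self {
        MtuBoundedWriter { mtu, buf: Vec::new() }
    }

    /// Appends a key-value record only if the finished payload would still fit.
    /// Returns false and leaves the buffer untouched otherwise.
    fn try_add_kv(&mut self, key: &str, value: &str) -> bool {
        if key.len() > u16::MAX as usize || value.len() > u16::MAX as usize {
            return false;
        }
        // Size is computed before writing: tag + two u16 lengths + bytes + end marker.
        let record_len = 1 + 2 + key.len() + 2 + value.len();
        if self.buf.len() + record_len + END_MARKER_LEN > self.mtu {
            return false;
        }
        self.buf.push(KEY_VALUE_TAG);
        self.buf.extend_from_slice(&(key.len() as u16).to_le_bytes());
        self.buf.extend_from_slice(key.as_bytes());
        self.buf.extend_from_slice(&(value.len() as u16).to_le_bytes());
        self.buf.extend_from_slice(value.as_bytes());
        true
    }

    /// Writes the end marker and returns the payload.
    fn finish(mut self) -> Vec<u8> {
        self.buf.push(0u8);
        self.buf
    }
}
```

A caller would typically stop adding records at the first false and send what has been accumulated, leaving the remaining updates for a later delta.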
Important Implementation Details and Algorithms
The Delta is serialized as a compressed stream of operations using CompressedStreamWriter. The operations are serialized in a specific order per node: first a Node op, then zero or more KeyValue ops, and optionally a SetMaxVersion op.
The DeltaBuilder ensures operations are applied in the correct sequence for each node, enforcing that key-value mutations come after a node operation and that versions increase monotonically.
The DeltaSerializer manages serialization within a size constraint (MTU), stopping when adding a new operation would exceed the MTU. It uses pessimistic upper bounds on serialized length before actually serializing to avoid overshooting the MTU.
The from_version_excluded and last_gc_version fields in NodeDelta express the delta's applicability range relative to the node's state versions, enabling efficient incremental synchronization.
Operations are tagged with a single byte (DeltaOpTag) to identify their types during serialization/deserialization.
Interaction with Other Parts of the System
Serialization and deserialization rely on the Serializable and Deserializable traits from the serialize module. This file depends heavily on these traits for encoding/decoding operations.
The ChitchatId, Version, VersionedValue, and mutation types (KeyValueMutation, DeletionStatusMutation) come from the types module and define the types of data mutated in deltas.
The compressed stream format and compression logic are provided by CompressedStreamWriter from the serialize module. This file uses it to efficiently encode delta operations.
The Delta structure and serialization format enable nodes to exchange state updates incrementally in the distributed system's synchronization protocol.
Test code uses utilities from the tokio crate and the types module's DeletionStatus to verify correctness of delta serialization logic.
Data Structure Diagram
classDiagram
class Delta {
-node_deltas: Vec~NodeDelta~
-serialized_len: usize
+get_operations()
+serialize()
+deserialize()
}
class NodeDelta {
+chitchat_id: ChitchatId
+last_gc_version: Version
+from_version_excluded: Version
+key_values: Vec~KeyValueMutation~
+max_version: Option~Version~
}
class DeltaOp {
<<enum>>
+Node
+KeyValue
+SetMaxVersion
}
class DeltaOpRef {
<<enum>>
+Node
+KeyValue
+SetMaxVersion
}
class DeltaBuilder {
-existing_nodes: HashSet~ChitchatId~
-delta: Delta
-current_node_delta: Option~NodeDelta~
+apply_op()
+flush()
+finish()
}
class DeltaSerializer {
-mtu: usize
-delta_builder: DeltaBuilder
-compressed_stream_writer: CompressedStreamWriter
+try_add_node()
+try_add_kv()
+try_set_max_version()
+finish()
}
Delta "1" *-- "*" NodeDelta : contains
DeltaBuilder "1" o-- "1" Delta : builds
DeltaBuilder "1" *-- "0..1" NodeDelta : current node
DeltaSerializer "1" o-- "1" DeltaBuilder : uses
DeltaSerializer "1" o-- "1" CompressedStreamWriter : uses
DeltaOp <|-- DeltaOpRef : reference variant
Detailed Descriptions of Functions and Methods
Delta
fn get_operations(&self) -> impl Iterator<Item = DeltaOpRef<'_>>
Returns an iterator over all operations in the delta, by iterating through each NodeDelta in node_deltas and flattening into a stream of operations:
Node operation describing the node metadata.
KeyValue operations for each key-value mutation in the node.
Optional SetMaxVersion operation if the node delta has max_version.
fn serialize(&self, buf: &mut Vec<u8>)
Serializes the delta into a compressed stream of operations using CompressedStreamWriter with a block threshold of 16_384 bytes. It appends all operations to the compressed stream and writes the final compressed payload to buf.
fn serialized_len(&self) -> usize
Returns the cached serialized length of the delta payload.
fn deserialize(buf: &mut &[u8]) -> anyhow::Result<Self>
Deserializes a delta from a compressed stream of operations. It uses deserialize_stream to get a vector of DeltaOps, applies them to a DeltaBuilder to reconstruct the delta structure, and returns the finished Delta.
DeltaBuilder
fn apply_op(&mut self, op: DeltaOp) -> anyhow::Result<()>
Applies a single operation to the builder:
On Node op: flushes any current node delta, ensures node uniqueness, and starts a new current_node_delta.
On KeyValue op: appends the mutation to the current node delta, verifying monotonically increasing versions.
On SetMaxVersion op: sets max_version on the current node delta.
Returns an error if operations are out of order or violate constraints.
fn flush(&mut self)
Finalizes and stores the current node delta into the builder's delta, clearing current_node_delta.
fn finish(mut self, len: usize) -> Delta
Flushes any pending node delta, sets the serialized length, and returns the built delta.
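The builder's state machine can be sketched as below. This is an approximation under stated assumptions, not the file's implementation: node identifiers are Strings rather than ChitchatId, key-value mutations are plain tuples, errors are Strings instead of anyhow errors, and "monotonically increasing" is interpreted as strictly increasing relative to the previous mutation of the same node.

```rust
use std::collections::HashSet;

type Version = u64; // assumption: numeric version

#[derive(Debug, Clone, PartialEq)]
enum DeltaOp {
    Node { chitchat_id: String, last_gc_version: Version, from_version_excluded: Version },
    KeyValue { key: String, value: String, version: Version },
    SetMaxVersion(Version),
}

#[derive(Debug, Clone, PartialEq)]
struct NodeDelta {
    chitchat_id: String,
    last_gc_version: Version,
    from_version_excluded: Version,
    key_values: Vec<(String, String, Version)>,
    max_version: Option<Version>,
}

#[derive(Debug, Default, PartialEq)]
struct Delta {
    node_deltas: Vec<NodeDelta>,
    serialized_len: usize,
}

#[derive(Default)]
struct DeltaBuilder {
    existing_nodes: HashSet<String>,
    delta: Delta,
    current_node_delta: Option<NodeDelta>,
}

impl DeltaBuilder {
    fn apply_op(&mut self, op: DeltaOp) -> Result<(), String> {
        match op {
            DeltaOp::Node { chitchat_id, last_gc_version, from_version_excluded } => {
                // A new node op closes the previous node delta, if any.
                self.flush();
                if !self.existing_nodes.insert(chitchat_id.clone()) {
                    return Err(format!("node {chitchat_id} appears twice"));
                }
                self.current_node_delta = Some(NodeDelta {
                    chitchat_id,
                    last_gc_version,
                    from_version_excluded,
                    key_values: Vec::new(),
                    max_version: None,
                });
            }
            DeltaOp::KeyValue { key, value, version } => {
                let node = self
                    .current_node_delta
                    .as_mut()
                    .ok_or_else(|| "key-value op before any node op".to_string())?;
                // Assumption: versions must strictly increase within one node delta.
                if let Some((_, _, last_version)) = node.key_values.last() {
                    if version <= *last_version {
                        return Err(format!("version {version} is not increasing"));
                    }
                }
                node.key_values.push((key, value, version));
            }
            DeltaOp::SetMaxVersion(max_version) => {
                let node = self
                    .current_node_delta
                    .as_mut()
                    .ok_or_else(|| "set-max-version op before any node op".to_string())?;
                node.max_version = Some(max_version);
            }
        }
        Ok(())
    }

    /// Moves the node delta under construction, if any, into the delta.
    fn flush(&mut self) {
        if let Some(node_delta) = self.current_node_delta.take() {
            self.delta.node_deltas.push(node_delta);
        }
    }

    /// Flushes pending state, records the serialized length, and returns the delta.
    fn finish(mut self, len: usize) -> Delta {
        self.flush();
        self.delta.serialized_len = len;
        self.delta
    }
}
```

Keeping the node under construction in an Option makes the ordering rule easy to enforce: any KeyValue or SetMaxVersion operation that arrives while it is None is rejected.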
DeltaSerializer
fn with_mtu(mtu: usize) -> Self
Creates a new serializer instance with the given MTU size. It configures the compression block threshold as the minimum of the constant BLOCK_THRESHOLD and the MTU.
fn try_add_node(&mut self, chitchat_id: ChitchatId, last_gc_version: Version, from_version: Version) -> bool
Attempts to add a new node operation. Returns false if adding this operation would exceed the MTU.
fn try_add_kv(&mut self, key: &str, versioned_value: VersionedValue) -> bool
Attempts to add a key-value mutation operation. Returns false if adding would exceed MTU.
fn try_set_max_version(&mut self, max_version: Version) -> bool
Attempts to add a max version operation. Returns false if adding would exceed MTU.
fn finish(self) -> Delta
Completes serialization, finalizes the compressed stream, and returns the constructed Delta.
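The block threshold selection described for with_mtu amounts to a clamped minimum, since the MTU is a usize while BLOCK_THRESHOLD is a u16. The helper name block_threshold_for_mtu is hypothetical, and clamping an oversized MTU to u16::MAX before taking the minimum is an assumption about how the two types are reconciled.

```rust
use std::convert::TryFrom;

const BLOCK_THRESHOLD: u16 = 16_384;

/// Picks the compression block threshold: the smaller of BLOCK_THRESHOLD and the MTU.
fn block_threshold_for_mtu(mtu: usize) -> u16 {
    // An MTU too large for a u16 is clamped first, so the minimum is always BLOCK_THRESHOLD.
    u16::try_from(mtu).unwrap_or(u16::MAX).min(BLOCK_THRESHOLD)
}
```

With a 1,400-byte MTU the threshold is 1,400; with any MTU of 16,384 bytes or more it stays at 16,384.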
Types Used From Other Modules
ChitchatId: Unique identifier for nodes.
Version: Numeric version type used for state versioning.
VersionedValue: Struct holding a value, its version, and deletion status.
KeyValueMutation / KeyValueMutationRef: Represent mutations to key-value pairs.
DeletionStatusMutation: Enum indicating whether a key is set or deleted.
CompressedStreamWriter: Utility for writing compressed sequences of serialized objects.
For details on these types and serialization traits, refer to the ChitchatId, Version, VersionedValue, and Serialization pages.
Testing
The file contains comprehensive tests to validate:
Serialization and deserialization of empty and populated deltas.
Behavior of the DeltaSerializer respecting MTU limits.
Correct ordering and version constraints of operations.
Proper handling of failure cases (e.g., exceeding MTU, invalid operation sequences).
Correct round-trip encoding of DeltaOpTag.
These tests use the tokio crate and utilities for creating test ChitchatId instances and VersionedValues with specific deletion statuses.
Summary of Workflow
Building a Delta:
Start with an empty Delta.
Add NodeDeltas for each node.
Add key-value mutations and optional max version per node.
Serialization:
Convert the Delta into a sequence of DeltaOp operations.
Use CompressedStreamWriter to serialize these operations into a compressed stream.
Transmission:
Send compressed delta payload over the network (respecting MTU).
Deserialization:
Receive compressed stream.
Decompress and deserialize into DeltaOps.
Use DeltaBuilder to reconstruct the Delta structure.