aerospike.rs
Overview
This file provides an implementation of a key-value store interface for Aerospike, a distributed NoSQL database. It defines the AerospikeStore struct, which manages interaction with the Aerospike cluster, including reading, writing, and batch operations on records. The implementation includes robust write retry logic, read/write metrics collection, and optional debug statistics for database operations. The store supports splitting large values via the SplitValueStore wrapper, facilitating storage of large data blobs with a configurable chunk size.
Constants
DEFAULT_AEROSPIKE_MESSAGE_CACHE_MAX_ENTRIES: usize
Defines the maximum number of Aerospike records to cache in an LRU cache, set to 10,000.BIN_HASH,
BIN_BLOB,BIN_SEQ: &str
String constants representing Aerospike bin names used for storing hash, binary blob, and sequence number respectively.NAMESPACE: &str
Aerospike namespace used by the store, set to "node".WRITE_RETRY_MILLIS: u64
Duration in milliseconds to wait before retrying a failed Aerospike write (set to 5 ms).
Structs and Traits
AerospikeStore
A cloneable struct encapsulating an Aerospike client and associated policies for reads, writes, and batch operations. It optionally holds metrics for block production and debug statistics.
Fields
client: Arc<Client>
Thread-safe reference-counted pointer to the Aerospike client.rpolicy: ReadPolicy
Read policy configuration for Aerospike queries.wpolicy: WritePolicy
Write policy configuration for Aerospike writes.bpolicy: BatchPolicy
Policy configuration for batch reads.metrics: Option<BlockProductionMetrics>
Optional metrics collector to report Aerospike operation timings and errors.stats: Arc<Stats>(debug only)
Debug statistics for counting read and write operations.
Methods
new(socket_address: String, metrics: Option<BlockProductionMetrics>) -> anyhow::Result<SplitValueStore<AerospikeStore>>
Creates a new AerospikeStore instance connected to the provided Aerospike cluster socket address. It initializes default client, read, write, and batch policies, sets up metrics if provided, and wraps the store in a SplitValueStore with a chunk size of 1MB - 16KB (1,048,576 - 16,384 bytes).
Parameters
socket_address— Aerospike server address string (e.g.,"127.0.0.1:3000").metrics— Optional metrics collector for block production.
Returns
Ok(SplitValueStore) — On successful connection.
Err(anyhow::Error) — If the Aerospike client fails to connect.
Usage Example
let store = AerospikeStore::new("127.0.0.1:3000".to_string(), None)?;
put_until_success(&self, key: &Key, bins: &[Bin], object_type: &'static str)
Attempts to write the given bins to Aerospike under the specified key repeatedly until the operation succeeds. On failure, logs the error, reports metrics, and sleeps for a predefined interval before retrying.
Parameters
key— Aerospike key to write to.bins— Slice of Aerospike bins representing the data.object_type— Label for metric reporting identifying the type of object being written.
Implementation Details
Uses a loop to retry writes indefinitely.
On the first successful write, reports the elapsed time via metrics.
On each failure, logs an error and reports a write error metric.
Sleeps for
WRITE_RETRY_MILLISmilliseconds between retries.
Trait Implementation: KeyValueStore for AerospikeStore
Implements the KeyValueStore trait to provide Aerospike-based persistent storage operations.
Methods
get(&self, key: &Key, values: &Bins, label: &'static str) -> anyhow::Result<Option<ValueMap>>
Retrieves a record from Aerospike by key and requested bins.
Reports read metrics and increments debug read counters if enabled.
Returns
Ok(Some(ValueMap))if the record exists.Returns
Ok(None)if the key is not found.Returns an error on other Aerospike failures.
Parameters
key— Aerospike key for the record.values— Bins to retrieve.label— Label for metric reporting.
put(&self, key: &Key, bins: &[Bin], until_success: bool, label: &'static str) -> anyhow::Result<()>
Writes bins to Aerospike under the specified key.
If
until_successis true, usesput_until_successto retry until successful.Otherwise, attempts a single write and reports metrics or errors accordingly.
Increments debug write counters if enabled.
Parameters
key— Aerospike key.bins— Data bins to write.until_success— Flag to retry until success.label— Label for metrics.
batch_get(&self, gets: Vec<(Key, Bins)>, _label: &'static str) -> anyhow::Result<Vec<Option<ValueMap>>>
Performs batch retrieval of multiple records.
Converts key/bin pairs to Aerospike
BatchReadobjects.Reports read metrics and increments debug counters.
Returns a vector of optional
ValueMaps corresponding to each requested key.
Parameters
gets— Vector of tuples(Key, Bins)specifying records and bins to fetch._label— Metric label (unused in this method).
Debug Methods (only compiled with debug assertions)
db_reads(&self) -> usize
Returns the number of database read operations performed.db_writes(&self) -> usize
Returns the number of database write operations performed.
Stats (Debug Only)
A struct to track the number of Aerospike database reads and writes using atomic counters. Used for debugging and testing purposes.
Fields
db_reads: AtomicUsize— Count of read operations.db_writes: AtomicUsize— Count of write operations.
Methods
new() -> Self
Creates a new stats counter initialized to zero.inc_reads(&self, val: usize)
Increments the read count byval.inc_writes(&self)
Increments the write count by one.db_reads(&self) -> usize
Returns the current read count.db_writes(&self) -> usize
Returns the current write count.Defaulttrait implemented to callnew().
Important Implementation Details
Write Retry Logic:
Theput_until_successmethod ensures that transient Aerospike write failures do not cause data loss by retrying indefinitely with a small backoff.Metrics Integration:
OptionalBlockProductionMetricsintegration allows tracking Aerospike read/write latencies and error rates, which is crucial for performance monitoring and alerting.Batch Operations:
Supports batch reads using Aerospike'sbatch_getAPI for efficient retrieval of multiple keys.Debug Counters:
Conditional compilation (#[cfg(debug_assertions)]) adds atomic counters for reads and writes, aiding in test verification and debugging.SplitValueStore Wrapping:
TheAerospikeStoreis wrapped in aSplitValueStorethat handles splitting large values into smaller chunks to fit Aerospike's record size limits (roughly 1MB per record minus header overhead).
Interaction with Other System Components
Aerospike Client Library:
Uses theaerospikecrate for direct communication with Aerospike servers, managing keys, bins, policies, and error handling.SplitValueStoreWrapper:
The store is wrapped bySplitValueStore, which manages value chunking and reassembly. This enables storage of large blobs by splitting them into smaller Aerospike records. See SplitValueStore for details.Metrics (
BlockProductionMetrics):
Collects and reports Aerospike operation metrics. See BlockProductionMetrics for metric definitions and reporting mechanisms.KeyValueStore Trait:
Implements the genericKeyValueStoretrait used by the storage layer of the application. See KeyValueStore Trait for interface details.
Mermaid Diagram: AerospikeStore Structure and Relationships
classDiagram
class AerospikeStore {
-client: Arc<Client>
-rpolicy: ReadPolicy
-wpolicy: WritePolicy
-bpolicy: BatchPolicy
-metrics: Option<BlockProductionMetrics>
+new()
+put_until_success()
+get()
+put()
+batch_get()
}
class SplitValueStore {
+new()
}
class Stats {
-db_reads: AtomicUsize
-db_writes: AtomicUsize
+inc_reads()
+inc_writes()
+db_reads()
+db_writes()
}
AerospikeStore o-- Client : uses
AerospikeStore o-- BlockProductionMetrics : optionally uses
AerospikeStore o-- Stats : debug only
AerospikeStore --> SplitValueStore : wrapped by
This diagram illustrates the core components and their relationships within the file. The AerospikeStore relies on the Aerospike Client for database operations, optionally reports metrics, and tracks debug statistics if enabled. It is wrapped by SplitValueStore to handle large value chunking.