mod.rs
Overview
This file defines abstractions and interfaces for handling binary large objects (blobs) within a distributed or local storage system. It provides traits to encapsulate blob data sources and services that support sharing and retrieving these blobs asynchronously with callback-based completion handlers. The file also includes an implementation of the Blob trait for all compatible types implementing std::io::Read, and it contains comprehensive unit tests demonstrating usage patterns for blob sharing and loading.
The module exposes the submodule external_fileshares_based, which likely contains an implementation of the blob synchronization service based on external file share protocols.
Detailed Explanations
Types
ResourceId
pub type ResourceId = String;
Aliases
ResourceIdtoString.Represents a unique identifier for a resource or blob within the system.
Used as a key for sharing and retrieving blobs.
Traits
Blob
pub trait Blob {
fn into_read(self) -> anyhow::Result<impl std::io::Read + Send + Sync + 'static>;
}
Represents a data source that can be converted into a readable stream.
The
into_readmethod consumes the blob instance and returns aReadtrait object wrapped in ananyhow::Result.The returned reader must be
Send,Sync, and have a'staticlifetime, enabling safe concurrent and asynchronous usage.
Parameters:
self: The blob instance to convert into a reader.
Returns:
anyhow::Resultwrapping a reader implementingstd::io::Read + Send + Sync + 'static.
Usage Example:
let blob_instance: impl Blob = ...;
let reader = blob_instance.into_read()?;
BlobSyncService
pub trait BlobSyncService {
fn share_blob<Callback>(
&mut self,
resource_id: ResourceId,
blob: impl Blob,
on_complete: Callback,
) -> anyhow::Result<()>
where
Callback: FnOnce(anyhow::Result<()>) + Send + Sync + 'static;
#[allow(clippy::too_many_arguments)]
fn load_blob<SuccessCallback, ErrorCallback>(
&mut self,
resource_id: ResourceId,
known_external_blob_share_services: Vec<url::Url>,
max_tries: u8,
retry_download_timeout: Option<std::time::Duration>,
deadline: Option<std::time::Instant>,
on_success: SuccessCallback,
on_error: ErrorCallback,
) -> anyhow::Result<()>
where
SuccessCallback: FnOnce(&mut dyn std::io::Read) + Send + Sync + 'static,
ErrorCallback: FnOnce(anyhow::Error) + Send + Sync + 'static;
}
Defines an interface for services managing blob synchronization and sharing.
Supports asynchronous sharing and loading of blobs identified by
ResourceId.
share_blob
Shares a blob identified by
resource_id.Takes ownership of the blob to be shared.
The
on_completecallback is invoked once sharing completes or fails.
Parameters:
resource_id: Unique identifier for the blob.blob: The blob data source implementing theBlobtrait.on_complete: Callback invoked with the result of the sharing operation.
Returns:
anyhow::Result<()>indicating whether the share operation was successfully initiated.
load_blob
Loads a blob by
resource_id, potentially from multiple external blob share services.Supports retry logic, timeout, and deadline constraints.
Accepts two callbacks:
on_successto receive the blob as a readable stream, andon_errorfor error handling.
Parameters:
resource_id: Unique identifier for the blob to load.known_external_blob_share_services: List of external service URLs to try for loading the blob.max_tries: Maximum number of retry attempts.retry_download_timeout: Optional delay between retries.deadline: Optional instant after which the load attempt should be aborted.on_success: Callback invoked with a mutableReadstream on successful load.on_error: Callback invoked with an error if loading fails.
Returns:
anyhow::Result<()>indicating whether the load operation was successfully initiated.
Trait Implementations
Blob for all std::io::Read types
impl<T> Blob for T
where
T: std::io::Read + Send + Sync + 'static,
{
fn into_read(self) -> anyhow::Result<impl std::io::Read + Send + Sync + 'static> {
Ok(self)
}
}
Automatically implements the
Blobtrait for any type that implementsstd::io::Readand isSend,Sync, and'static.The
into_readmethod simply returnsself, making it convenient to use any compatible reader as a blob.
Testing and Example Usage
The tests module contains an example use case and tests demonstrating the usage of Blob and BlobSyncService.
ExampleUsecase Struct
Holds a thread-safe vector of
i32values wrapped withArc<Mutex<>>.Implements
Blobby converting the vector of integers into a byte stream.
struct ExampleUsecase {
data: std::sync::Arc<std::sync::Mutex<std::vec::Vec<i32>>>,
}
impl ExampleUsecase {
pub fn new() -> Self {
let data = std::sync::Arc::new(std::sync::Mutex::new(vec![1, 2, 3]));
Self { data }
}
}
impl Blob for ExampleUsecase {
fn into_read(self) -> anyhow::Result<impl std::io::Read + Send + Sync + 'static> {
let Ok(guarded) = self.data.lock() else {
anyhow::bail!("failed to aquire lock");
};
let data: Vec<i32> = guarded.clone();
let cursor = std::io::Cursor::new(
data.into_iter().flat_map(|e| e.to_be_bytes()).collect::<Vec<u8>>(),
);
Ok(cursor)
}
}
Converts internal data into a big-endian byte stream using a
Cursor.
example_usecase Function
Demonstrates how to use a
BlobSyncServiceimplementation withExampleUsecase.
fn example_usecase<T>(mut service: T)
where
T: BlobSyncService,
{
let tmp_dir = tempfile::tempdir().unwrap();
std::env::set_current_dir(&tmp_dir).unwrap();
let some_resource_id = "some-resource-id".to_string();
// Share blob
{
let (tx, rx) = std::sync::mpsc::channel();
let some_blob_source = ExampleUsecase::new();
const MARKER: u32 = 1;
service
.share_blob(some_resource_id.clone(), some_blob_source, move |_e| {
tx.send(MARKER).unwrap();
})
.expect("ok");
let data = rx.recv_timeout(std::time::Duration::from_secs(1)).expect("Must be fast");
assert!(data == MARKER);
}
// Load blob and verify content
{
let (tx, rx) = std::sync::mpsc::channel();
let tx_ok = tx.clone();
let tx_fail = tx;
const MARKER_SUCCESS: u32 = 1;
const MARKER_FAILED: u32 = 2;
service
.load_blob(
some_resource_id,
vec![],
1,
None,
None,
move |e| {
let mut buffer = Vec::new();
e.read_to_end(&mut buffer).expect("read to end on a blob");
let data: Vec<_> = buffer
.chunks(4)
.map(|e| i32::from_be_bytes(<[u8; 4]>::try_from(e).unwrap()))
.collect();
tx_ok.send((MARKER_SUCCESS, data)).unwrap();
},
move |_e| {
tx_fail.send((MARKER_FAILED, vec![])).unwrap();
},
)
.expect("ok");
let (marker, data) = rx.recv_timeout(std::time::Duration::from_secs(1)).expect("Must finish fast");
assert!(marker == MARKER_SUCCESS);
assert!(data == [1i32, 2i32, 3i32])
}
}
The function tests sharing and loading a blob using the service interface.
Utilizes channels for synchronization and verification.
Confirms correct data transmission and retrieval.
Unit Test: ensure_basic_flow_for_external_fileshares_based_service
Instantiates an external fileshares based blob sync service from the submodule
external_fileshares_based.Executes the
example_usecasetest using this service.
#[test]
fn ensure_basic_flow_for_external_fileshares_based_service() {
let service = external_fileshares_based::ExternalFileSharesBased::builder()
.local_storage_share_base_path("./tmp".into())
.build()
.start(None)
.expect("should be able to start");
example_usecase(service.interface());
}
Validates the integration of the blob sync service implementation with the provided test use case.
Implementation Details and Algorithms
The file uses asynchronous callback patterns for sharing and loading blobs to avoid blocking operations.
Implements retries, deadlines, and timeouts in
load_blobto handle unreliable network conditions or remote services.The
Blobtrait abstraction allows flexible blob data sources, including in-memory buffers, files, or complex custom data holders.
Interactions with Other Modules
The file declares a public submodule
external_fileshares_based, which contains one concrete implementation of theBlobSyncServiceinterface, presumably managing blob synchronization via external file shares.The test suite depends on the
external_fileshares_basedservice to validate the behavior of the blob sync abstractions.Uses external crates such as
anyhowfor error handling,urlfor handling service URLs, and standard concurrency primitives likeArc,Mutex, and channels for thread-safe communication in tests.
Mermaid Diagram: Structure of mod.rs
classDiagram
class Blob {
+into_read()
}
class BlobSyncService {
+share_blob()
+load_blob()
}
class ExampleUsecase {
-data: Arc<Mutex<Vec<i32>>>
+new()
+into_read()
}
BlobSyncService <|.. external_fileshares_based::ExternalFileSharesBased
ExampleUsecase ..|> Blob
Blobtrait defines the contract for readable blob sources.BlobSyncServicetrait defines async sharing/loading interface.ExampleUsecaseis a test struct implementingBlob.external_fileshares_based::ExternalFileSharesBasedimplementsBlobSyncService(implied by usage).