mod.rs
Overview
This file implements a service for synchronizing binary large objects (blobs) using external file shares. It provides functionality to share blobs and load blobs from external services while managing these operations asynchronously through an internal service loop. The service leverages an instrumented messaging channel for communication and supports metrics integration to monitor blob synchronization commands.
The main abstraction is the ServiceInterface which implements the BlobSyncService trait, exposing methods to share and load blobs. The service is started via the ExternalFileSharesBased struct, which encapsulates the base path for local storage of shared files.
Structs and Their Responsibilities
ExternalFileSharesBased
Purpose: Configuration holder that defines the local base path for storing shared blobs.
Fields:
local_storage_share_base_path: PathBuf - Filesystem path where blobs are locally stored/shared.
Methods:
start(metrics: Option<BlockProductionMetrics>) -> anyhow::Result<Service>
Spawns the service's internal thread and initializes the messaging channel used for blob synchronization commands.
Parameters:
metrics: Optional metrics collector for monitoring command channel operations.
Returns:
Service instance managing the inner loop thread and interface.
Usage Example:
let config = ExternalFileSharesBased::builder()
    .local_storage_share_base_path("/path/to/storage".into())
    .build();
let service = config.start(Some(metrics_collector))?;
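The description of start() can be illustrated with a minimal sketch built only on the standard library. It assumes a plain std::sync::mpsc channel in place of the instrumented channel, omits the metrics parameter, and returns std::io::Result instead of anyhow::Result. The Command::Ping variant and the start_and_query helper are hypothetical and exist only to make the sketch observable.

```rust
use std::path::PathBuf;
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

// Hypothetical stand-in for service_inner_loop::Command.
enum Command {
    // Asks the loop to report the base path it was started with.
    Ping(Sender<PathBuf>),
}

struct ExternalFileSharesBased {
    local_storage_share_base_path: PathBuf,
}

struct Service {
    inner_loop: JoinHandle<()>,
    interface: Sender<Command>,
}

impl ExternalFileSharesBased {
    // Creates the command channel, then moves the receiver and the
    // configured base path into a dedicated thread.
    fn start(self) -> std::io::Result<Service> {
        let (control, commands) = channel::<Command>();
        let base_path = self.local_storage_share_base_path;
        let inner_loop = thread::Builder::new()
            .name("blob-sync-inner-loop".to_string())
            .spawn(move || {
                // recv() fails once every Sender is dropped, which ends the loop.
                while let Ok(command) = commands.recv() {
                    match command {
                        Command::Ping(reply) => {
                            let _ = reply.send(base_path.clone());
                        }
                    }
                }
            })?;
        Ok(Service { inner_loop, interface: control })
    }
}

// Starts the sketch service, queries the loop once, then shuts it down.
fn start_and_query(path: &str) -> PathBuf {
    let service = ExternalFileSharesBased {
        local_storage_share_base_path: PathBuf::from(path),
    }
    .start()
    .expect("failed to spawn the inner loop thread");

    let (reply_tx, reply_rx) = channel();
    service
        .interface
        .send(Command::Ping(reply_tx))
        .expect("inner loop is running");
    let answer = reply_rx.recv().expect("inner loop replied");

    let Service { inner_loop, interface } = service;
    drop(interface); // closes the channel so the loop can exit
    inner_loop.join().expect("inner loop panicked");
    answer
}
```

The sketch moves the configuration into the thread rather than sharing it, which keeps the loop free of locks; whether the real module does the same is not stated in this document.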
Service
Purpose: Represents the running blob synchronization service, managing the internal thread and interface.
Fields:
inner_loop: std::thread::JoinHandle<()> - Handle to the spawned thread running the service inner loop.
interface: ServiceInterface - Interface for sending commands to the inner loop.
Methods:
interface(&self) -> ServiceInterface
Returns a clone of the interface to interact with the service.
join(self)
Shuts down the service by dropping the interface and joining the internal thread.
Usage Example:
let service_interface = service.interface();
// Use service_interface to share/load blobs
drop(service_interface); // release the clone before joining
service.join();
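The relationship between interface() and join() is easier to see in a small model. The sketch below assumes the inner loop exits once every sender has been dropped, which is how a std::sync::mpsc receiver behaves; the real instrumented channel may differ. The u32 command type, the spawn constructor, and the returned command count are hypothetical simplifications.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

#[derive(Clone)]
struct ServiceInterface {
    // u32 is a placeholder for the real command type.
    control: Sender<u32>,
}

struct Service {
    // The documented handle is JoinHandle<()>; the sketch returns a count
    // so that the result can be checked.
    inner_loop: JoinHandle<u32>,
    interface: ServiceInterface,
}

impl Service {
    fn spawn() -> Service {
        let (control, commands) = channel::<u32>();
        let inner_loop = thread::spawn(move || {
            let mut handled = 0;
            // Iteration ends when the last Sender clone is dropped.
            for _command in commands {
                handled += 1;
            }
            handled
        });
        Service { inner_loop, interface: ServiceInterface { control } }
    }

    fn interface(&self) -> ServiceInterface {
        self.interface.clone()
    }

    // Consumes the service: drops its own sender, then waits for the loop.
    fn join(self) -> u32 {
        drop(self.interface);
        self.inner_loop.join().expect("inner loop panicked")
    }
}

fn run_three_commands() -> u32 {
    let service = Service::spawn();
    let handle = service.interface();
    for n in 0..3 {
        handle.control.send(n).expect("inner loop is running");
    }
    // Without this drop the clone keeps the channel open and join() blocks.
    drop(handle);
    service.join()
}
```

Commands already queued are still delivered after the senders are dropped, so in this model join() returns only after the backlog has been processed.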
ServiceInterface
Purpose: Provides an API to send commands to the internal service loop, implementing the BlobSyncService trait.
Fields:
control: InstrumentedSender<service_inner_loop::Command> - Instrumented channel sender for command dispatch.
Traits Implemented:
Clone - Allows cloning the interface for concurrent use.
BlobSyncService - Defines the core methods to share and load blobs.
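To give a rough idea of what an instrumented, cloneable sender could look like, here is a sketch that wraps std::sync::mpsc::Sender with a shared counter. CountingSender and counting_channel are hypothetical names; the actual InstrumentedSender API in telemetry_utils::mpsc is not shown in this document and may work differently.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{channel, Receiver, SendError, Sender};
use std::sync::Arc;

// Hypothetical wrapper: a sender that counts successful sends.
struct CountingSender<T> {
    inner: Sender<T>,
    sent: Arc<AtomicU64>,
}

// Implemented by hand so that T itself does not have to be Clone.
impl<T> Clone for CountingSender<T> {
    fn clone(&self) -> Self {
        CountingSender {
            inner: self.inner.clone(),
            sent: Arc::clone(&self.sent),
        }
    }
}

impl<T> CountingSender<T> {
    // Forwards the value and increments the counter only on success.
    fn send(&self, value: T) -> Result<(), SendError<T>> {
        self.inner.send(value)?;
        self.sent.fetch_add(1, Ordering::Relaxed);
        Ok(())
    }

    // Total number of successful sends across all clones.
    fn sent(&self) -> u64 {
        self.sent.load(Ordering::Relaxed)
    }
}

fn counting_channel<T>() -> (CountingSender<T>, Receiver<T>) {
    let (inner, receiver) = channel();
    let sender = CountingSender { inner, sent: Arc::new(AtomicU64::new(0)) };
    (sender, receiver)
}

fn count_sends() -> (u64, u64) {
    let (first, receiver) = counting_channel::<&'static str>();
    let second = first.clone();
    first.send("share").unwrap();
    second.send("load").unwrap();
    drop(receiver);
    // The receiver is gone, so this send fails and is not counted.
    assert!(second.send("late").is_err());
    (first.sent(), second.sent())
}
```

Because the counter lives behind an Arc, every clone reports the same total, which is the property a metrics collector would need.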
BlobSyncService Trait Implementation
ServiceInterface implements the following methods:
share_blob
fn share_blob<Callback>(
&mut self,
resource_id: ResourceId,
blob: impl Blob,
on_complete: Callback,
) -> anyhow::Result<()>
where
Callback: FnOnce(anyhow::Result<()>) + Send + Sync + 'static,
Purpose: Shares a blob identified by resource_id via the service, notifying completion via a callback.
Parameters:
resource_id: Identifier for the resource/blob.
blob: Blob data to be shared, convertible into a Read trait object.
on_complete: Callback executed once sharing completes, receiving a Result indicating success or failure.
Behavior:
Converts the blob to a readable stream and sends a Share command to the inner service loop channel. Errors in sending commands or blob conversion are propagated.
Return:
Ok(()) if the command was enqueued successfully.
Err if sending the command or blob conversion fails.
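The behavior described for share_blob can be sketched as follows. This is an approximation under several assumptions: String replaces anyhow::Error, ResourceId is a String alias, the Blob trait is reduced to a hypothetical into_read method, and a plain mpsc channel replaces the instrumented one. The handle_one function is a stand-in for the receiving side and is not part of the documented API.

```rust
use std::io::{Cursor, Read};
use std::sync::mpsc::{channel, Receiver, Sender};

type ResourceId = String; // placeholder for the real identifier type
type ShareCallback = Box<dyn FnOnce(Result<(), String>) + Send>;

enum Command {
    Share(ResourceId, Box<dyn Read + Send>, ShareCallback),
}

// Hypothetical reduction of the Blob abstraction to one conversion method.
trait Blob {
    fn into_read(self) -> Result<Box<dyn Read + Send>, String>;
}

impl Blob for Vec<u8> {
    fn into_read(self) -> Result<Box<dyn Read + Send>, String> {
        Ok(Box::new(Cursor::new(self)))
    }
}

struct ServiceInterface {
    control: Sender<Command>,
}

impl ServiceInterface {
    // Converts the blob, boxes the callback, and enqueues a Share command.
    // Both a failed conversion and a closed channel surface as Err.
    fn share_blob<F>(
        &mut self,
        resource_id: ResourceId,
        blob: impl Blob,
        on_complete: F,
    ) -> Result<(), String>
    where
        F: FnOnce(Result<(), String>) + Send + 'static,
    {
        let reader = blob.into_read()?;
        self.control
            .send(Command::Share(resource_id, reader, Box::new(on_complete)))
            .map_err(|_| "inner loop is not running".to_string())
    }
}

// Stand-in for the receiving side: drains the stream and reports the outcome.
fn handle_one(commands: &Receiver<Command>) -> usize {
    match commands.recv().expect("a command was queued") {
        Command::Share(_id, mut reader, on_complete) => {
            let mut bytes = Vec::new();
            let outcome = reader.read_to_end(&mut bytes);
            on_complete(outcome.map(|_| ()).map_err(|e| e.to_string()));
            bytes.len()
        }
    }
}

fn share_example() -> (usize, bool) {
    let (control, commands) = channel();
    let mut interface = ServiceInterface { control };
    let (done_tx, done_rx) = channel();
    interface
        .share_blob("res-1".to_string(), vec![1u8, 2, 3], move |result| {
            let _ = done_tx.send(result.is_ok());
        })
        .expect("command was enqueued");
    let bytes_read = handle_one(&commands);
    (bytes_read, done_rx.recv().expect("callback ran"))
}
```

Note that an Ok(()) return only means the command was queued; the outcome of the sharing itself arrives later through on_complete.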
load_blob
fn load_blob<SuccessCallback, ErrorCallback>(
&mut self,
resource_id: ResourceId,
known_external_blob_share_services: Vec<url::Url>,
max_tries: u8,
retry_download_timeout: Option<std::time::Duration>,
deadline: Option<std::time::Instant>,
on_success: SuccessCallback,
on_error: ErrorCallback,
) -> anyhow::Result<()>
where
SuccessCallback: FnOnce(&mut dyn std::io::Read) + Send + Sync + 'static,
ErrorCallback: FnOnce(anyhow::Error) + Send + Sync + 'static,
Purpose: Attempts to load a blob from a set of external blob share services, retrying as specified.
Parameters:
resource_id: Identifier of the blob to load.
known_external_blob_share_services: List of URLs representing external blob share services.
max_tries: Maximum number of retry attempts.
retry_download_timeout: Optional delay between retries.
deadline: Optional time by which the load operation should complete.
on_success: Callback invoked with a readable blob stream upon successful load.
on_error: Callback invoked if loading fails.
Behavior:
Constructs a DownloadOptions struct encapsulating retry and timeout parameters, then sends a Load command to the internal loop.
Return:
Ok(()) if the command was successfully enqueued.
Err if command sending fails.
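A sketch of how load_blob might bundle its parameters and callbacks is shown below. The field names of DownloadOptions are assumed to mirror the method parameters, since the real struct is not listed here. URLs are plain strings, String replaces anyhow::Error, and the code after the enqueue step merely imitates an inner loop so that exactly one callback fires.

```rust
use std::io::{Cursor, Read};
use std::sync::mpsc::{channel, Sender};
use std::time::{Duration, Instant};

// Assumed layout; the real DownloadOptions fields are not documented here.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
struct DownloadOptions {
    max_tries: u8,
    retry_download_timeout: Option<Duration>,
    deadline: Option<Instant>,
}

type OnSuccess = Box<dyn FnOnce(&mut dyn Read) + Send>;
type OnError = Box<dyn FnOnce(String) + Send>;

enum Command {
    Load(String, Vec<String>, DownloadOptions, OnSuccess, OnError),
}

// Bundles the retry parameters and enqueues a Load command.
#[allow(clippy::too_many_arguments)]
fn load_blob(
    control: &Sender<Command>,
    resource_id: &str,
    known_services: Vec<String>,
    max_tries: u8,
    retry_download_timeout: Option<Duration>,
    deadline: Option<Instant>,
    on_success: impl FnOnce(&mut dyn Read) + Send + 'static,
    on_error: impl FnOnce(String) + Send + 'static,
) -> Result<(), String> {
    let options = DownloadOptions { max_tries, retry_download_timeout, deadline };
    control
        .send(Command::Load(
            resource_id.to_string(),
            known_services,
            options,
            Box::new(on_success),
            Box::new(on_error),
        ))
        .map_err(|_| "inner loop is not running".to_string())
}

fn load_example(services: Vec<String>) -> Result<Vec<u8>, String> {
    let (control, commands) = channel();
    let (result_tx, result_rx) = channel::<Result<Vec<u8>, String>>();
    let ok_tx = result_tx.clone();
    load_blob(
        &control,
        "res-1",
        services,
        3,
        Some(Duration::from_millis(50)),
        None,
        move |reader: &mut dyn Read| {
            let mut buffer = Vec::new();
            let _ = reader.read_to_end(&mut buffer);
            let _ = ok_tx.send(Ok(buffer));
        },
        move |error| {
            let _ = result_tx.send(Err(error));
        },
    )?;

    // Imitation of the inner loop: succeed only if there is a service to try.
    let Command::Load(_id, urls, options, on_success, on_error) =
        commands.recv().expect("a command was queued");
    if urls.is_empty() || options.max_tries == 0 {
        on_error("no service to try".to_string());
    } else {
        on_success(&mut Cursor::new(b"payload".to_vec()));
    }
    result_rx.recv().expect("one callback ran")
}
```

Splitting success and error into two FnOnce callbacks means exactly one of them is consumed per command, and the other is simply dropped.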
Internal Command Loop
The service runs an internal thread (inner_loop) which receives commands via an instrumented channel. These commands are defined in the service_inner_loop module and include:
Command::Share(ResourceId, Box<dyn Read>, Box<FnOnce>)
Command::Load(ResourceId, Vec<url::Url>, DownloadOptions, Box<FnOnce>, Box<FnOnce>)
The inner loop handles these commands asynchronously, performing the actual file I/O and network operations necessary to share or download blobs. This design decouples the interface from the blocking operations and allows the service to process multiple commands efficiently.
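The dispatch pattern described above can be modelled with a short sketch. It is not the contents of service_inner_loop: an in-memory HashMap stands in for the file I/O and network operations, the Load variant drops the URL list and DownloadOptions for brevity, and errors are plain strings.

```rust
use std::collections::HashMap;
use std::io::{Cursor, Read};
use std::sync::mpsc::{channel, Receiver};
use std::thread;

// Simplified command set; the documented Load variant carries more fields.
enum Command {
    Share(String, Box<dyn Read + Send>, Box<dyn FnOnce(Result<(), String>) + Send>),
    Load(String, Box<dyn FnOnce(&mut dyn Read) + Send>, Box<dyn FnOnce(String) + Send>),
}

// Processes commands one at a time until every sender has been dropped.
fn inner_loop(commands: Receiver<Command>) {
    // Stand-in for the local storage directory and remote services.
    let mut store: HashMap<String, Vec<u8>> = HashMap::new();
    for command in commands {
        match command {
            Command::Share(id, mut reader, on_complete) => {
                let mut bytes = Vec::new();
                match reader.read_to_end(&mut bytes) {
                    Ok(_) => {
                        store.insert(id, bytes);
                        on_complete(Ok(()));
                    }
                    Err(error) => on_complete(Err(error.to_string())),
                }
            }
            Command::Load(id, on_success, on_error) => match store.get(&id) {
                Some(bytes) => on_success(&mut Cursor::new(bytes.clone())),
                None => on_error(format!("unknown resource {}", id)),
            },
        }
    }
}

fn round_trip() -> (Vec<u8>, String) {
    let (control, commands) = channel();
    let worker = thread::spawn(move || inner_loop(commands));
    let (found_tx, found_rx) = channel::<Vec<u8>>();
    let (missing_tx, missing_rx) = channel::<String>();

    control
        .send(Command::Share(
            "a".to_string(),
            Box::new(Cursor::new(vec![7u8, 8, 9])),
            Box::new(|_| {}),
        ))
        .unwrap();
    control
        .send(Command::Load(
            "a".to_string(),
            Box::new(move |reader: &mut dyn Read| {
                let mut buffer = Vec::new();
                let _ = reader.read_to_end(&mut buffer);
                let _ = found_tx.send(buffer);
            }),
            Box::new(|_| {}),
        ))
        .unwrap();
    control
        .send(Command::Load(
            "missing".to_string(),
            Box::new(|_: &mut dyn Read| {}),
            Box::new(move |error| {
                let _ = missing_tx.send(error);
            }),
        ))
        .unwrap();

    drop(control); // lets the loop finish once the queue is drained
    worker.join().unwrap();
    (found_rx.recv().unwrap(), missing_rx.recv().unwrap())
}
```

In this model commands are handled strictly in order on a single thread, so a Load queued after a Share for the same resource always sees the stored data. Whether the real loop offers the same ordering guarantee is not stated here.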
Interaction with Other Modules
Blob and ResourceId: Core abstractions representing binary data and their identifiers.
BlobSyncService trait: Defines the interface implemented by ServiceInterface.
service_inner_loop module: Contains the internal command processing logic and command definitions.
download_blob and share_blob modules: Presumably contain implementations related to downloading and sharing blobs, invoked by the inner loop.
telemetry_utils::mpsc: Provides instrumented messaging channels used for inter-thread communication, integrating metrics.
helper::metrics::BlockProductionMetrics: Optional metrics used to monitor service performance.
Important Implementation Details
The use of instrumented_channel allows metrics collection on command send/receive rates, which is crucial for monitoring performance in high-throughput scenarios.
The TypedBuilder macro simplifies constructing the ExternalFileSharesBased configuration with type-safe builders.
The ServiceInterface implements Clone to allow multiple handles to the control channel, enabling concurrent command submissions.
Asynchronous processing is achieved by spawning a dedicated thread for the service's inner loop, isolating blocking I/O operations from the caller's thread.
Callback functions for success and error handling enable flexible, asynchronous response management for both sharing and loading operations.
The retry logic in load_blob is configurable via parameters like max_tries, retry_download_timeout, and deadline, allowing fine-tuned control over network resilience.
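How max_tries, retry_download_timeout, and deadline might interact is sketched below. The exact semantics of the real download code are not given in this document, so the sketch makes explicit assumptions: max_tries counts total attempts, the timeout is treated as a pause between attempts, and the deadline is checked before each attempt. The function name and error messages are hypothetical.

```rust
use std::thread;
use std::time::{Duration, Instant};

// Retries `attempt` up to `max_tries` times, pausing between attempts and
// giving up early once the optional deadline has passed.
fn download_with_retries<T, F>(
    max_tries: u8,
    retry_delay: Option<Duration>,
    deadline: Option<Instant>,
    mut attempt: F,
) -> Result<T, String>
where
    F: FnMut(u8) -> Result<T, String>,
{
    let mut last_error = "no attempt was made".to_string();
    for try_index in 0..max_tries {
        if let Some(deadline) = deadline {
            if Instant::now() >= deadline {
                return Err(format!(
                    "deadline reached after {} tries: {}",
                    try_index, last_error
                ));
            }
        }
        match attempt(try_index) {
            Ok(value) => return Ok(value),
            Err(error) => last_error = error,
        }
        // Pause before the next attempt, but not after the final one.
        if try_index + 1 < max_tries {
            if let Some(delay) = retry_delay {
                thread::sleep(delay);
            }
        }
    }
    Err(format!("gave up after {} tries: {}", max_tries, last_error))
}
```

A caller would pass a closure that performs one download attempt against one of the known services; the attempt index can be used to rotate through them.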
Diagram: Service Structure and Interaction Flow
classDiagram
class ExternalFileSharesBased {
+local_storage_share_base_path: PathBuf
+start()
}
class Service {
+inner_loop: JoinHandle
+interface: ServiceInterface
+interface()
+join()
}
class ServiceInterface {
+control: InstrumentedSender
+share_blob()
+load_blob()
}
ExternalFileSharesBased --> Service : starts
Service --> ServiceInterface : provides interface
ServiceInterface ..> service_inner_loop::Command : sends commands
This diagram illustrates the relationships between the main structs: ExternalFileSharesBased starts the Service, which holds an internal thread and provides a ServiceInterface for command submission. The ServiceInterface sends commands to the internal loop defined in service_inner_loop.
Usage Workflow
Initialization:
Create an ExternalFileSharesBased instance with the local storage path.
Service Start:
Call .start(), optionally passing a metrics collector, to launch the service thread and obtain a Service instance.
Command Submission:
Use the ServiceInterface obtained from Service.interface() to call:
share_blob for sharing blobs.
load_blob for loading blobs from external services.
Service Shutdown:
Call Service.join() to gracefully shut down the internal thread after dropping interfaces.