service_inner_loop.rs
Overview
This file implements the core loop of a blob sharing and downloading service that operates on files identified by ResourceId. It provides asynchronous mechanisms for sharing local blobs (files) and loading blobs from external sources, handling retries and timeouts as specified. The service runs in a scoped thread environment, processing commands received through a channel and spawning worker threads to perform share or load operations concurrently.
The main functionality centers around the service_inner_loop function, which listens for Command messages to either share a blob by storing it locally or to download a blob from known external URLs with retry and timeout logic. This file interacts with the submodules download_blob and share_blob that encapsulate the actual file transfer logic.
Types and Data Structures
ShareCallback
type ShareCallback = Box<dyn FnOnce(anyhow::Result<()>) + Send + Sync + 'static>;
Description: A one-time callback called upon completion of a share operation. It receives a
Resultindicating success or failure.Usage: Passed along in the
Command::Sharevariant. Called with the result ofshare_blob.
LoadSuccessCallback
type LoadSuccessCallback = Box<dyn FnOnce(&mut dyn std::io::Read) + Send + Sync + 'static>;
Description: Callback invoked when a blob is successfully loaded and opened for reading.
Usage: Used in
Command::Loadto provide the loaded blob stream for further processing.
LoadErrCallback
type LoadErrCallback = Box<dyn FnOnce(anyhow::Error) + Send + Sync + 'static>;
Description: Callback invoked when loading fails, providing an error describing the failure.
Usage: Used in
Command::Loadwhen download or file opening fails.
KnownExternalFileshares
type KnownExternalFileshares = Vec<url::Url>;
Description: A vector of base URLs representing known external shares from which the blob can be downloaded.
Usage: Used in
Command::Loadto attempt downloading blobs from external sources.
DownloadOptions
pub struct DownloadOptions {
pub max_tries: u8,
pub retry_timeout: Option<std::time::Duration>,
pub deadline: Option<std::time::Instant>,
}
Fields:
max_tries: Maximum number of download attempts.retry_timeout: Optional timeout between retries.deadline: Optional absolute deadline for the entire download operation.
Purpose: Controls the retry and timeout behavior in downloading blobs.
Usage: Passed to
download_blobduring load operations.
Command
pub(super) enum Command {
Share(ResourceId, Box<dyn std::io::Read + Send + Sync + 'static>, ShareCallback),
Load(
ResourceId,
KnownExternalFileshares,
DownloadOptions,
LoadSuccessCallback,
LoadErrCallback,
),
}
Variants:
Share: Request to share a local blob identified byResourceId. Contains a readable stream and a callback to signal completion.Load: Request to load a blob for the givenResourceIdfrom external sources, with download options and success/error callbacks.
Usage: Commands are sent through the
InstrumentedReceiverchannel toservice_inner_loop.
Functions
service_inner_loop
pub(super) fn service_inner_loop(
local_storage_share_base_path: PathBuf,
control: InstrumentedReceiver<Command>,
)
Parameters:
local_storage_share_base_path: The base directory path under which blobs are saved or loaded locally.control: A receiver channel for incomingCommandmessages controlling the service.
Behavior:
Runs an infinite loop receiving commands until the channel disconnects.
For
Sharecommands:Spawns a scoped thread named
"share-<resource_id>"to invokeshare_blob.Calls the provided completion callback with the result.
For
Loadcommands:Spawns a scoped thread named
"load-<resource_id>".Shuffles the list of known external URLs, appending the
resource_idto each.Calls
download_blobwith retry and timeout parameters.If download succeeds, opens the local file and calls the success callback with the file reader.
On failures at any stage, invokes the error callback.
Implementation Details:
Uses scoped threads (
std::thread::scope) to ensure proper thread lifecycle management.Uses
Box::leakto extend the lifetime of the stream reference passed toshare_blob.Uses
tracing::tracefor detailed logging of operations.Relies on
rand::seq::SliceRandomto randomize the download URL order, increasing chances of successful retrieval.
Example Usage:
let options = DownloadOptions { max_tries: 3, retry_timeout: Some(std::time::Duration::from_secs(5)), deadline: None, }; control_sender.send(Command::Load( resource_id, vec![url::Url::parse("https://example.com/shares/").unwrap()], options, Box::new(|reader| { // Process the blob stream }), Box::new(|error| { eprintln!("Failed to load blob: {:?}", error); }), ));
Interaction with Other Modules
download_blobmodule:The
service_inner_loopcallsdownload_blobto attempt downloading blobs from external URLs.download_blobhandles retries, timeouts, and actual HTTP or file retrieval mechanisms.
share_blobmodule:Used by the
Sharecommand handler to write blobs to local storage.
ResourceIdtype:Serves as a unique identifier for blobs. It is used as a path component when saving or loading blobs.
telemetry_utils::mpsc::InstrumentedReceiver:The channel abstraction for receiving commands, potentially with instrumentation for metrics or logging.
Important Implementation Algorithms and Details
Randomized URL Selection:
When loading a blob, the list of known external shares is shuffled before attempting downloads. This randomization helps distribute load and increases the chance of successful downloads if some URLs are unavailable or slow.
Scoped Thread Spawning:
Threads spawned for sharing or loading blobs are scoped, ensuring they do not outlive the parent thread scope, helping manage concurrency safely.
Efficient Stream Handling:
The share stream is leaked (
Box::leak) to obtain a'staticlifetime required byshare_blob, enabling asynchronous processing without ownership conflicts.
Retry and Timeout Management:
Download attempts respect
max_tries,retry_timeout, anddeadlinespecified inDownloadOptions, allowing robust download behavior under unreliable network conditions.
Workflow Diagram
flowchart TD
A[Start service_inner_loop] --> B{Receive Command}
B --> |Share| C[Spawn share thread]
B --> |Load| D[Spawn load thread]
C --> E[Construct share path]
E --> F[Call share_blob]
F --> G[Invoke ShareCallback]
G --> B
D --> H[Shuffle URLs]
H --> I[Construct local file path]
I --> J[Call download_blob with retry & timeout]
J --> K{Download success?}
K --> |Yes| L[Open downloaded file]
L --> M[Invoke LoadSuccessCallback]
K --> |No| N[Invoke LoadErrCallback]
M --> B
N --> B
This flowchart illustrates the main control flow of the service_inner_loop function handling Share and Load commands, showing the steps and callbacks involved in each process.