external_fileshares_based.rs
Overview
This file implements a state synchronization mechanism for a distributed system based on external file shares. It provides functionality to save the application state for sharing and to load state snapshots from multiple external storage URLs, including URLs advertised by nodes discovered via a gossip protocol. The core component is the ExternalFileSharesBased struct. It integrates with a blob synchronization service to download state snapshots from configured static storages and from dynamic sources discovered through gossip.
The file relies on concurrent task execution and synchronization primitives to manage state loading in the background. It also applies configurable download retries and timeouts and reports errors during state synchronization.
Structs and Implementations
ExternalFileSharesBased
A struct that implements the StateSyncService trait to handle state synchronization using external file shares.
Fields
static_storages: Vec<Url>
A list of static URLs of external storage services from which state snapshots can be downloaded.
max_download_tries: u8
Maximum number of retry attempts for downloading a state snapshot.
retry_download_timeout: Duration
Time interval between retry attempts.
download_deadline_timeout: Duration
Overall deadline for completing a download.
blob_sync: ServiceInterface
Interface to the blob synchronization service responsible for downloading blobs (state snapshots).
file_saving_service: FileSavingService
Service responsible for saving state snapshots to local storage.
state_load_thread: Arc<Mutex<Option<JoinHandle<()>>>>
Handle of the background state loading task. It is wrapped in a mutex behind an atomically reference-counted pointer (Arc) so that it can be shared and updated across threads.
chitchat: ChitchatRef
Reference to the gossip protocol state used to discover other nodes and their advertised storage URLs.
bk_set_rx: tokio::sync::watch::Receiver<ApiBkSet>
Receiver channel for updates to the API backend set, used to determine active nodes.
Methods
new(blob_sync: ServiceInterface, file_saving_service: FileSavingService, chitchat: ChitchatRef, bk_set_rx: tokio::sync::watch::Receiver<ApiBkSet>) -> Self
Creates a new instance of ExternalFileSharesBased with default retry and timeout configurations. The static storage list is initially empty.
Usage example:
let service = ExternalFileSharesBased::new(blob_sync, file_saving_service, chitchat, bk_set_rx);
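The document does not state which default values new uses. The following sketch shows only the shape of the constructor. It assumes hypothetical defaults: 3 tries, a 2-second retry interval, and a 120-second deadline. It also uses String in place of Url so that it compiles with the standard library alone. DownloadSettings is an illustrative name and is not part of the source.

```rust
use std::time::Duration;

// Hypothetical stand-in for the download-related fields of ExternalFileSharesBased.
// `String` replaces `Url`; the default values below are illustrative assumptions.
#[derive(Debug, Clone)]
pub struct DownloadSettings {
    pub static_storages: Vec<String>,
    pub max_download_tries: u8,
    pub retry_download_timeout: Duration,
    pub download_deadline_timeout: Duration,
}

impl DownloadSettings {
    // Mirrors the documented behaviour of `new`: retry and timeout fields get
    // defaults, and the static storage list starts empty.
    pub fn new() -> Self {
        Self {
            static_storages: Vec::new(),
            max_download_tries: 3,                               // hypothetical default
            retry_download_timeout: Duration::from_secs(2),      // hypothetical default
            download_deadline_timeout: Duration::from_secs(120), // hypothetical default
        }
    }
}
```

Because the static list starts empty, downloads initially depend entirely on storages discovered through gossip until static URLs are added.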
Private Function: get_node_id_and_download_url_from_gossip
Parses a chitchat::NodeState entry to extract the node identifier and the URL advertised for API access.
Parameters:
entry: &chitchat::NodeState - A gossip node state entry.
Returns:
Option<(NodeIdentifier, &str)> - The node ID and advertised URL if both are present and valid.
Details:
It relies on two specific gossip keys: "node_id" for the node ID and GOSSIP_API_ADVERTISE_ADDR_KEY for the API URL.
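The sketch below shows the extraction logic under three assumptions. A flat key/value map stands in for chitchat::NodeState. GOSSIP_API_ADVERTISE_ADDR_KEY has a hypothetical value. "Non-empty" serves as the validity check, because the real parsing rules for NodeIdentifier are not documented here.

```rust
use std::collections::HashMap;

// Hypothetical key value; the real constant's value is not shown in this document.
pub const GOSSIP_API_ADVERTISE_ADDR_KEY: &str = "api_advertise_addr";

// Stand-in for chitchat::NodeState: a flat key/value map of gossip entries.
pub type NodeState = HashMap<String, String>;

// Stand-in for NodeIdentifier.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeIdentifier(pub String);

// Returns the node ID and advertised API URL only when both keys are present
// and non-empty; otherwise returns None so the caller skips the entry.
pub fn get_node_id_and_download_url_from_gossip(
    entry: &NodeState,
) -> Option<(NodeIdentifier, &str)> {
    let node_id = entry.get("node_id").filter(|v| !v.is_empty())?;
    let url = entry
        .get(GOSSIP_API_ADVERTISE_ADDR_KEY)
        .filter(|v| !v.is_empty())?;
    Some((NodeIdentifier(node_id.clone()), url.as_str()))
}
```

Returning Option lets the caller use filter_map over all gossip entries, so nodes that have not yet advertised an API address are silently ignored.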
Trait Implementation: StateSyncService for ExternalFileSharesBased
This implementation allows the struct to participate in state synchronization workflows.
Associated Types
Repository = RepositoryImpl
Specifies the repository type used for storing and retrieving state.
Methods
save_state_for_sharing(&self, state: Arc<OptimisticStateImpl>) -> anyhow::Result<()>
Saves the current optimistic state to persistent storage for sharing with other nodes.
Parameters:
state - Shared reference to the optimistic state snapshot.
Returns:
anyhow::Result<()> indicating success or failure.
Implementation:
Constructs a file name based on the state's block identifier and delegates saving to file_saving_service.
Usage example:
service.save_state_for_sharing(state)?;
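The sketch below shows the save path under two assumptions. A hypothetical SaveObject trait takes the place of FileSavingService. The file name is the hex-encoded block identifier, because the actual naming scheme is not documented. The sketch returns the chosen file name instead of () so that the naming is observable.

```rust
use std::cell::RefCell;
use std::collections::HashMap;

// Hypothetical stand-in for FileSavingService: persists named byte blobs.
pub trait SaveObject {
    fn save_object(&self, name: &str, bytes: &[u8]) -> Result<(), String>;
}

// In-memory implementation used only for illustration.
#[derive(Default)]
pub struct InMemoryFiles {
    pub files: RefCell<HashMap<String, Vec<u8>>>,
}

impl SaveObject for InMemoryFiles {
    fn save_object(&self, name: &str, bytes: &[u8]) -> Result<(), String> {
        self.files.borrow_mut().insert(name.to_string(), bytes.to_vec());
        Ok(())
    }
}

// Derives the file name from the block identifier (hex-encoded here, an
// assumption) and delegates persistence to the saving service.
pub fn save_state_for_sharing<S: SaveObject>(
    service: &S,
    block_id: &[u8],
    state_bytes: &[u8],
) -> Result<String, String> {
    let file_name: String = block_id.iter().map(|b| format!("{b:02x}")).collect();
    service.save_object(&file_name, state_bytes)?;
    Ok(file_name)
}
```

Naming the file after the block identifier lets a peer that knows the block ID request the matching snapshot without any additional index.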
reset_sync(&self)
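Resets the synchronization process by dropping the stored handle of the background state loading thread, if any.
Usage:
Called to abandon any ongoing state loading task. Dropping a JoinHandle, whether from std::thread or tokio, detaches the task rather than terminating it. A detached loader therefore keeps running until it finishes or observes SHUTDOWN_FLAG.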
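The handle bookkeeping behind reset_sync and the "already running" check in add_load_state_task can be sketched with std::thread alone. The names start_if_idle and LoadThreadSlot are hypothetical. The sketch treats any stored handle as "running", which may be stricter than the real check.

```rust
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

// Shared slot for the background loader, matching the documented field type.
pub type LoadThreadSlot = Arc<Mutex<Option<JoinHandle<()>>>>;

// Registers a loader only if no handle is stored, mirroring the
// "skip if already running" check in add_load_state_task.
pub fn start_if_idle(slot: &LoadThreadSlot, work: impl FnOnce() + Send + 'static) -> bool {
    let mut guard = slot.lock().unwrap();
    if guard.is_some() {
        return false;
    }
    *guard = Some(thread::spawn(work));
    true
}

// reset_sync-style helper: takes the handle out of the slot and drops it.
// Dropping a std JoinHandle detaches the thread; it does not stop it.
pub fn reset_sync(slot: &LoadThreadSlot) {
    let handle = slot.lock().unwrap().take();
    drop(handle);
}
```

Holding the lock while checking and storing the handle prevents two concurrent callers from both starting a loader.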
add_load_state_task(&mut self, resource_address: BTreeMap<ThreadIdentifier, BlockIdentifier>, repository: RepositoryImpl, output: InstrumentedSender<anyhow::Result<BTreeMap<ThreadIdentifier, BlockIdentifier>>>) -> anyhow::Result<()>
Starts asynchronous tasks to load state snapshots for the specified resource addresses from external sources.
Parameters:
resource_address: Mapping of thread identifiers to block identifiers representing the states to load.
repository: Repository instance to which loaded state snapshots are applied.
output: Channel sender used to emit the success or error result of the loading operation.
Returns:
anyhow::Result<()> indicating whether the task was scheduled successfully.
Workflow:
Checks whether a state loading thread is already running; if so, skips scheduling.
Collects the current active nodes from the API backend set and merges their advertised storage URLs with the static storages.
For each (thread_id, block_id) pair:
  Initiates a blob download via blob_sync.load_blob from the aggregated external storage URLs.
  On successful download, reads the data into a buffer and sets the state snapshot in the repository.
  On failure, reports error metrics and sends the error through the output channel.
Spawns a monitoring thread that polls periodically to check whether all requested states have been loaded.
When all states are loaded, sends a success result containing the original resource address mapping.
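The fan-out and the polling monitor described in the workflow can be sketched with standard-library threads. The sketch makes several substitutions:

- u32 stands in for ThreadIdentifier and BlockIdentifier.
- A short sleep simulates the download.
- A local flag replaces the global SHUTDOWN_FLAG.
- std::sync::mpsc replaces InstrumentedSender.

It omits the failure path. load_all is a hypothetical name.

```rust
use std::collections::{BTreeMap, BTreeSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for BTreeMap<ThreadIdentifier, BlockIdentifier>.
pub type Addr = BTreeMap<u32, u32>;

pub fn load_all(resource_address: Addr, shutdown: Arc<AtomicBool>) -> Receiver<Result<Addr, String>> {
    let (tx, rx) = mpsc::channel();
    let loaded: Arc<Mutex<BTreeSet<u32>>> = Arc::new(Mutex::new(BTreeSet::new()));

    // One loader per requested thread; the blob download is only simulated.
    for &thread_id in resource_address.keys() {
        let loaded = Arc::clone(&loaded);
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(5)); // stands in for load_blob + set_state
            loaded.lock().unwrap().insert(thread_id);
        });
    }

    // Monitor: poll until every requested state is loaded or shutdown is requested.
    thread::spawn(move || loop {
        if shutdown.load(Ordering::Relaxed) {
            return; // dropping `tx` closes the channel without a result
        }
        let done = loaded.lock().unwrap().len() == resource_address.len();
        if done {
            let _ = tx.send(Ok(resource_address));
            return;
        }
        thread::sleep(Duration::from_millis(10));
    });
    rx
}
```

Polling a shared set keeps the loaders independent of each other, at the cost of up to one poll interval of latency before the success result is sent.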
Concurrency:
Uses Arc<Mutex<...>> wrappers to safely share mutable state across threads.
Retry and timeout:
Respects the configured retry counts and timeouts when downloading blobs.
Usage example:
service.add_load_state_task(resource_address, repository, output_sender)?;
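The interplay of max_download_tries, retry_download_timeout, and download_deadline_timeout can be sketched as a synchronous retry loop. The document does not say whether blob_sync.load_blob applies these settings in exactly this way. download_with_retries is a hypothetical helper.

```rust
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical retry helper: at most `max_tries` attempts, `retry_delay`
// between attempts, and an overall `deadline` checked before each attempt.
pub fn download_with_retries<T, E>(
    max_tries: u8,
    retry_delay: Duration,
    deadline: Duration,
    mut attempt: impl FnMut() -> Result<T, E>,
) -> Result<T, String>
where
    E: std::fmt::Display,
{
    let started = Instant::now();
    let mut last_error = String::from("no attempts made");
    for try_no in 1..=max_tries {
        if started.elapsed() >= deadline {
            return Err(format!("deadline exceeded after {} tries: {last_error}", try_no - 1));
        }
        match attempt() {
            Ok(value) => return Ok(value),
            Err(e) => last_error = e.to_string(),
        }
        // Only wait if another attempt will follow.
        if try_no < max_tries {
            thread::sleep(retry_delay);
        }
    }
    Err(format!("gave up after {max_tries} tries: {last_error}"))
}
```

Because the deadline is checked before each attempt, the total time spent stays bounded even when max_download_tries is large.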
Important Implementation Details
Blob download sources:
The download URLs are collected from a combination of static storages and dynamic nodes discovered via gossip. Each gossip entry must contain the node ID and an advertised API URL. The advertised URLs are normalized by appending the "v2/storage/" path segments.
State snapshot storage:
Local saving of state snapshots uses the FileSavingService abstraction, which handles object persistence.
Metrics reporting:
add_load_state_task updates metrics for state requests and error occurrences through the metrics interface obtained from the repository.
Thread lifecycle management:
The handle of the background loading thread is stored in a mutex-protected Option<JoinHandle<()>>. This allows safe status checking and lets reset_sync clear the handle.
Shutdown handling:
The background thread periodically checks a global SHUTDOWN_FLAG so that it can terminate cleanly on application shutdown.
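The URL aggregation can be sketched with plain strings. The sketch assumes that static storages come first and that duplicates are dropped; the document specifies neither. storage_url and collect_download_urls are hypothetical names.

```rust
// String-based stand-in for Url: appends the documented "v2/storage/" path.
pub fn storage_url(advertised: &str) -> String {
    let base = advertised.trim_end_matches('/');
    format!("{base}/v2/storage/")
}

// Merges static storages with gossip-advertised storages, skipping duplicates.
// Placing static storages first is an assumption.
pub fn collect_download_urls(static_storages: &[String], advertised: &[&str]) -> Vec<String> {
    let mut urls: Vec<String> = static_storages.to_vec();
    for addr in advertised {
        let url = storage_url(addr);
        if !urls.contains(&url) {
            urls.push(url);
        }
    }
    urls
}
```

The real code may use Url::join from the url crate instead of string concatenation. In that case a base URL with a non-empty path and no trailing slash has its last path segment replaced rather than extended, so the trailing slash on the base matters.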
Interactions with Other Parts of the System
blob_sync::ServiceInterface
Used to download blob objects (state snapshots) from external storages.
FileSavingService
Used to save state snapshots to local persistent storage.
ChitchatRef
Provides access to gossip protocol state for discovering nodes and their advertised storage URLs.
ApiBkSet
Represents the API backend set, used to identify active nodes.
RepositoryImpl and OptimisticStateImpl
Used to apply loaded state snapshots for state synchronization.
InstrumentedSender
Used for emitting results and errors from asynchronous state loading tasks.
SHUTDOWN_FLAG
Global flag monitored to gracefully stop running threads during shutdown.
Mermaid Diagram: ExternalFileSharesBased Structure and Workflow
classDiagram
class ExternalFileSharesBased {
+static_storages: Vec<Url>
+max_download_tries: u8
+retry_download_timeout: Duration
+download_deadline_timeout: Duration
-blob_sync: ServiceInterface
-file_saving_service: FileSavingService
-state_load_thread: Arc<Mutex<Option<JoinHandle<()>>>>
-chitchat: ChitchatRef
-bk_set_rx: tokio::sync::watch::Receiver<ApiBkSet>
+new()
+save_state_for_sharing()
+reset_sync()
+add_load_state_task()
}
class RepositoryImpl {
+get_metrics()
+set_state_from_snapshot()
}
class FileSavingService {
+save_object()
}
class ServiceInterface {
+load_blob()
}
ExternalFileSharesBased --> ServiceInterface : uses
ExternalFileSharesBased --> FileSavingService : uses
ExternalFileSharesBased --> ChitchatRef : uses
ExternalFileSharesBased --> RepositoryImpl : interacts with
ExternalFileSharesBased --> StateLoadThread : manages
note for ExternalFileSharesBased "Downloads state snapshots\nfrom static and dynamic URLs\nand applies them to repository"
References to Related Topics
State Synchronization - Concepts and mechanisms for synchronizing distributed state across nodes.
Blob Synchronization Service - Service responsible for downloading and managing blob data.
Gossip Protocol - Distributed node state sharing and discovery mechanism.
Repository Pattern - Abstraction for data storage and retrieval.
Concurrency and Threading - Techniques for safe concurrent execution and synchronization.