bk_set_watcher.rs
Overview
This file implements an asynchronous background task that continuously monitors and updates a set of "Bk" entities, represented by the ApiBkSet structure. The main functionality involves querying configured backend addresses (bk_addrs) for their current ApiBkSet data, selecting the most up-to-date set based on a sequence number (seq_no), and broadcasting changes in trusted public keys to other system components using Tokio watch channels.
The watcher ensures that the trusted Bk sets and their associated public keys remain synchronized with potentially multiple backend sources over time, updating only when newer data is available.
Detailed Explanation
Function: get_bk_set_url
fn get_bk_set_url(addr: SocketAddr) -> String
Purpose: Constructs the HTTP URL string to fetch the Bk set update from a given backend address.
Parameters:
addr: ASocketAddrrepresenting the backend server's IP and port.
Returns: A
StringURL in the format"http://{addr}/v2/bk_set_update".Usage: Used internally to build requests for querying backend Bk sets.
Async Function: run
pub async fn run(
config_rx: tokio::sync::watch::Receiver<ProxyConfig>,
bk_set_tx: tokio::sync::watch::Sender<HashSet<transport_layer::VerifyingKey>>,
client: reqwest::Client,
bk_set_watch_interval_sec: u64,
)
Purpose: Main watcher loop that periodically queries backend servers to update the trusted Bk set and its associated public keys.
Parameters:
config_rx: Tokio watch channel receiver that provides the currentProxyConfigcontaining backend addresses (bk_addrs) to query.bk_set_tx: Tokio watch channel sender that broadcasts updated sets of trusted public keys (HashSet<VerifyingKey>).client:reqwest::Clientused to perform HTTP GET requests to backend servers.bk_set_watch_interval_sec: Interval in seconds between polling cycles.
Returns: None (runs indefinitely).
Behavior:
Initializes an empty
ApiBkSetand an empty set of trusted public keys.Enters an infinite loop performing these steps:
Clones the current
ProxyConfigto get updated backend addresses.Spawns asynchronous tasks to query each backend address concurrently using
reqwest.Uses
join_allto await all backend responses.Selects the "winner" Bk set with the highest sequence number (
seq_no) usingget_winner_bk_set.If a newer
ApiBkSetis found, updates the internal state.Extracts owner public keys from both current and future Bk sets, and compares with the previous trusted keys.
If the trusted keys have changed, broadcasts the new set through
bk_set_tx.Sleeps for the configured interval before repeating.
Important Implementation Details:
Uses Tokio's asynchronous runtime features such as
tokio::spawnandtokio::time::sleep.Error handling logs errors for failed requests or task joins but continues operation.
The function
bk_owner_pubkeysafely attempts to parse public keys from Bk entries.
Usage Example:
This function is expected to be spawned as a background task within the system's async runtime, passing appropriate communication channels and HTTP client instances.
Function: get_winner_bk_set
fn get_winner_bk_set(current: Option<ApiBkSet>, candidate: ApiBkSet) -> Option<ApiBkSet>
Purpose: Compares two
ApiBkSetinstances and returns the one with the higher sequence number (seq_no).Parameters:
current: An optional currentApiBkSet.candidate: A candidateApiBkSetto compare against the current.
Returns:
Some(ApiBkSet)that has the greaterseq_no.If no current is present, returns the candidate.
Usage: Used internally within the watcher loop to determine the most up-to-date backend response among multiple concurrent queries.
Example:
let current = Some(ApiBkSet { seq_no: 3, .. }); let candidate = ApiBkSet { seq_no: 5, .. }; let winner = get_winner_bk_set(current, candidate); assert_eq!(winner.unwrap().seq_no, 5);
Tests
Contains unit tests for
get_winner_bk_setverifying correct selection behavior for various sequence numbers.Uses Rust's built-in testing framework with assertions to ensure expected functionality.
Important Implementation Details and Algorithms
Concurrency and Parallel Requests: The watcher performs parallel HTTP GET requests to all backend addresses using
tokio::spawnandfutures::future::join_all. This approach maximizes responsiveness and avoids blocking on any single slow backend.State Management: The watcher keeps track of the current Bk set and the trusted public keys. It only updates and broadcasts changes if the newly fetched Bk set has a strictly higher sequence number to avoid regressions.
Public Key Extraction: Owner public keys are extracted from both current and future Bk sets using a helper function that decodes raw bytes into the
VerifyingKeytype. This ensures the trusted keys reflect all potentially valid owners.Error Handling: All network or deserialization errors are logged via the
tracingcrate, but do not terminate the watcher loop, ensuring resilience.
Interaction with Other System Components
Configuration Integration: Reads backend addresses and other parameters from a shared
ProxyConfigreceived via a Tokio watch channel (config_rx). This allows dynamic reconfiguration without restarting the watcher.Trusted Public Keys Broadcast: Sends updated sets of trusted Bk owner public keys through a Tokio watch channel (
bk_set_tx). Other components dependent on trusted keys subscribe to this channel to update their access control or verification logic accordingly.HTTP Backend Interface: Sends HTTP requests to backend services exposing the
/v2/bk_set_updateendpoint. These services return JSON-serializedApiBkSetdata used to update local state.Transport Layer: Uses types and utilities from the
transport_layercrate to handle cryptographic public keys and formatting (VerifyingKey,pubkeys_info).Logging: Uses the
tracingcrate for structured logging of errors, state changes, and informational messages related to Bk set updates.
For related concurrency patterns and asynchronous communication, see Tokio Asynchronous Rust and for cryptographic key handling, refer to Transport Layer Cryptography.
Data Structures
ApiBkSet: Represents a set of Bk entities with:
seq_no: u64— sequence number denoting version.current: Vec<ApiBk>— current active Bk entries.future: Vec<ApiBk>— future (upcoming) Bk entries.
ApiBk: Represents a single Bk entity, containing an owner public key among other fields.
VerifyingKey: Cryptographic public key type used as identity for Bk owners.
Mermaid Diagram: Flowchart of Main Functions and Workflow
flowchart TD
Start["Start run()"]
GetConfig["Read ProxyConfig (bk_addrs)"]
SpawnTasks["Spawn async tasks for each bk_addr"]
WaitResults["Await all bk_set responses"]
SelectWinner["Select ApiBkSet with max seq_no"]
CompareSeqNo["Compare winner.seq_no with current.seq_no"]
UpdateBkSet["Update internal ApiBkSet if newer"]
ExtractKeys["Extract owner pubkeys from current & future"]
CompareKeys["Compare new keys with previous keys"]
Broadcast["Broadcast new keys if changed"]
Sleep["Sleep for configured interval"]
LoopBack["Repeat loop"]
Start --> GetConfig --> SpawnTasks --> WaitResults --> SelectWinner
SelectWinner --> CompareSeqNo --> UpdateBkSet --> ExtractKeys --> CompareKeys
CompareKeys -->|Keys changed| Broadcast --> Sleep --> LoopBack
CompareKeys -->|No change| Sleep --> LoopBack
This flowchart illustrates the continuous loop inside the run function that updates the Bk set and trusted keys by querying backend servers, processing responses, and broadcasting changes to other system components.