process_ext_messages.rs
Overview
This file defines functionality for processing external messages forwarded to Block Producer (BP) nodes within the system. It primarily exposes an asynchronous function run which accepts a JSON array of node requests and a shared MessageRouter instance, then forwards these requests to resolved BP addresses. The responses from these BPs are collected, processed, and returned to the caller. The forwarding mechanism includes token injection for authentication or tracking, error handling for request or response failures, and detailed telemetry logging.
The file also contains a helper function construct_url to build the HTTP endpoint URLs used for forwarding requests.
Detailed Explanation
Imports and Constants
Uses standard Rust libraries such as
HashMap,SocketAddr,Arc, andDuration.Employs
serde_jsonfor JSON manipulation.Uses
tracingfor telemetry and logging.References internal crates and modules such as
base64_id_decode,MessageRouter, and default configuration constants.Lazy static variables
NODE_URL_PATHandROUTER_URL_PATHare initialized from environment variables or default values for URL path configuration.
run Function
pub async fn run(
node_requests: serde_json::Value,
message_router: Arc<MessageRouter>,
) -> anyhow::Result<serde_json::Value>
Purpose
Processes an array of external messages (node_requests) by forwarding them to Block Producer nodes resolved via the MessageRouter. It returns a JSON response from one of the BPs or an error message.
Parameters
node_requests: Aserde_json::Valueexpected to be an array of JSON objects representing individual node requests. Each object is expected to have at least an"id"and optionally a"thread_id".message_router: AnArc<MessageRouter>providing thread-safe access to the message routing logic, including BP address resolution and token issuance.
Return Value
Returns a
serde_json::Valuewrapped in ananyhow::Result.On success, this is the JSON response from a BP node.
On failure, this is a JSON error object conforming to the
ExtMsgResponsestructure.
Usage Example
let node_requests = serde_json::json!([
{"id": "abc123", "thread_id": "thread_1", /* other fields */},
{"id": "def456", "thread_id": "thread_1", /* other fields */}
]);
let response = process_ext_messages::run(node_requests, message_router.clone()).await?;
println!("BP response: {:?}", response);
Function Workflow and Implementation Details
Input Validation:
Checks if
node_requestsis an array; if not, logs an error and returns aBAD_REQUESTerror JSON.
Thread ID Extraction:
Extracts a
thread_idfrom the first element ofnode_requests, falling back to a default 64-character zero string if missing.
ID Decoding:
Uses
base64_id_decodeto decode the"id"field of each request, building aHashMapassociating the original base64 ID with its decoded form.
Logging:
Logs the received external messages by their
"id"values.
BP Resolution:
Uses a mutex-locked resolver in
message_router.bp_resolverto obtain a list of BP socket addresses associated with thethread_id.If no BPs are found, returns an
INTERNAL_ERROR.
Token Injection:
Obtains a fresh token from
message_router.issue_token()and injects it into each request object under the"ext_message_token"field.
HTTP Client Setup:
Constructs a
reqwest::Clientwith a timeout defined byDEFAULT_BK_API_TIMEOUT.
Request Forwarding Loop:
For each BP address resolved:
Constructs the forwarding URL using
construct_url.Sends a POST request with the modified requests JSON.
Sets a custom header
"X-EXT-MSG-SENT"with the current timestamp in milliseconds.Awaits the response asynchronously.
Response Handling:
On successful response:
Parses the response body as JSON.
Injects a fresh token into the response JSON.
Returns the response JSON immediately.
On failure (HTTP request failure or JSON parsing error):
Logs the error.
Builds an error response JSON with detailed error data including the recipient IP, the decoded ID, and the thread ID.
Continues to the next BP in the list.
Return Value:
If all BP requests fail, returns the last constructed error JSON.
Otherwise, returns the first successful BP response.
construct_url Function
fn construct_url(host: SocketAddr) -> String
Purpose
Generates the full URL string to which external messages will be forwarded for a given BP socket address.
Parameters
host: ASocketAddrcontaining the IP address and port of the BP node.
Return Value
Returns a
Stringrepresenting the full URL, composed of a protocol prefix, the IP, port, and a path suffix.
Implementation Detail
Uses a formatted string with constant protocol
DEFAULT_NODE_URL_PROTO, the IP and port fromhost, and the path from the lazily loadedNODE_URL_PATHvariable.
Example
let addr = "192.168.1.10:8080".parse().unwrap();
let url = construct_url(addr);
// Example output: "http://192.168.1.10:8080/api/node"
Important Implementation Details and Algorithms
Thread-safe BP Resolution: Accessing
message_router.bp_resolveris protected by a mutex lock to safely resolve BP nodes for the given thread. This prevents race conditions in concurrent environments.Token Management: Tokens are issued both when forwarding requests and when returning responses to ensure that messages are authenticated or tracked appropriately within the system.
Robust Error Handling: The function handles various failure modes including bad input formats, inability to resolve BPs, HTTP request failures, and JSON parsing failures. Each failure generates structured error responses with detailed diagnostic information.
Request Augmentation: Each request JSON object is augmented with an
"ext_message_token"prior to forwarding, allowing downstream BP nodes to verify or use this token.Use of Async/Await: The forwarding requests use asynchronous HTTP requests with
reqwestto avoid blocking operations and support concurrent handling in the wider system.
Interaction with Other System Components
MessageRouter: Central to the file’s operation,
MessageRouterprovides BP resolution and token issuance. It is passed intorunas anArcto allow shared concurrent access.BP Nodes: The system forwards external messages to BP nodes resolved by the
MessageRouter. These nodes are expected to expose HTTP APIs at predictable URLs constructed byconstruct_url.Base64 ID Decoding: The utility function
base64_id_decodeis used to decode request IDs for validation and error reporting.Telemetry and Logging: Uses the
tracingcrate to log informational, trace, and error messages for observability.HTTP Server Response Types: Structures such as
http_server::ExtMsgResponseandhttp_server::ExtMsgErrorDataare used to format error responses according to the system’s messaging protocols.
Mermaid Diagram: Flowchart of run function workflow
flowchart TD
A["Start: run(node_requests, message_router)"] --> B{Is node_requests an array?}
B -- No --> C[Log error: BAD_REQUEST]
C --> D[Return error JSON: BAD_REQUEST]
B -- Yes --> E[Extract thread_id from first request or use default]
E --> F[Decode IDs from requests]
F --> G[Resolve BP addresses via message_router.bp_resolver]
G --> H{Any BPs resolved?}
H -- No --> I["Return error JSON: INTERNAL_ERROR (no BPs)"]
H -- Yes --> J[Inject ext_message_token into each request]
J --> K[Create HTTP client with timeout]
K --> L[For each BP address]
L --> M[Construct forwarding URL]
M --> N[Send POST request with requests JSON and timestamp header]
N --> O{Response received?}
O -- Yes --> P[Parse response body as JSON]
P --> Q{Parsing successful?}
Q -- Yes --> R[Inject ext_message_token into response]
R --> S[Return response JSON]
Q -- No --> T[Log JSON parse error]
T --> U[Create error JSON with details]
U --> L
O -- No --> V[Log request failure]
V --> W[Create error JSON with details]
W --> L
This diagram illustrates the primary decision points and operations within the asynchronous run function, showing message validation, BP resolution, request forwarding, and error handling loops.