process_ext_messages.rs

Overview

This file defines functionality for processing external messages forwarded to Block Producer (BP) nodes within the system. It primarily exposes an asynchronous function run which accepts a JSON array of node requests and a shared MessageRouter instance, then forwards these requests to resolved BP addresses. The responses from these BPs are collected, processed, and returned to the caller. The forwarding mechanism includes token injection for authentication or tracking, error handling for request or response failures, and detailed telemetry logging.

The file also contains a helper function construct_url to build the HTTP endpoint URLs used for forwarding requests.


Detailed Explanation

Imports and Constants


run Function

pub async fn run(
    node_requests: serde_json::Value,
    message_router: Arc<MessageRouter>,
) -> anyhow::Result<serde_json::Value>

Purpose

Processes an array of external messages (node_requests) by forwarding them to Block Producer nodes resolved via the MessageRouter. It returns a JSON response from one of the BPs or an error message.

Parameters

Return Value

Usage Example

let node_requests = serde_json::json!([
    {"id": "abc123", "thread_id": "thread_1", /* other fields */},
    {"id": "def456", "thread_id": "thread_1", /* other fields */}
]);

let response = process_ext_messages::run(node_requests, message_router.clone()).await?;
println!("BP response: {:?}", response);

Function Workflow and Implementation Details

  1. Input Validation:

    • Checks if node_requests is an array; if not, logs an error and returns a BAD_REQUEST error JSON.

  2. Thread ID Extraction:

    • Extracts a thread_id from the first element of node_requests, falling back to a default 64-character zero string if missing.

  3. ID Decoding:

    • Uses base64_id_decode to decode the "id" field of each request, building a HashMap associating the original base64 ID with its decoded form.

  4. Logging:

    • Logs the received external messages by their "id" values.

  5. BP Resolution:

    • Uses a mutex-locked resolver in message_router.bp_resolver to obtain a list of BP socket addresses associated with the thread_id.

    • If no BPs are found, returns an INTERNAL_ERROR.

  6. Token Injection:

    • Obtains a fresh token from message_router.issue_token() and injects it into each request object under the "ext_message_token" field.

  7. HTTP Client Setup:

    • Constructs a reqwest::Client with a timeout defined by DEFAULT_BK_API_TIMEOUT.

  8. Request Forwarding Loop:

    • For each BP address resolved:

      • Constructs the forwarding URL using construct_url.

      • Sends a POST request with the modified requests JSON.

      • Sets a custom header "X-EXT-MSG-SENT" with the current timestamp in milliseconds.

      • Awaits the response asynchronously.

  9. Response Handling:

    • On successful response:

      • Parses the response body as JSON.

      • Injects a fresh token into the response JSON.

      • Returns the response JSON immediately.

    • On failure (HTTP request failure or JSON parsing error):

      • Logs the error.

      • Builds an error response JSON with detailed error data including the recipient IP, the decoded ID, and the thread ID.

      • Continues to the next BP in the list.

  10. Return Value:

    • If all BP requests fail, returns the last constructed error JSON.

    • Otherwise, returns the first successful BP response.


construct_url Function

fn construct_url(host: SocketAddr) -> String

Purpose

Generates the full URL string to which external messages will be forwarded for a given BP socket address.

Parameters

Return Value

Implementation Detail

Example

let addr = "192.168.1.10:8080".parse().unwrap();
let url = construct_url(addr);
// Example output: "http://192.168.1.10:8080/api/node"

Important Implementation Details and Algorithms


Interaction with Other System Components


Mermaid Diagram: Flowchart of run function workflow

flowchart TD
A["Start: run(node_requests, message_router)"] --> B{Is node_requests an array?}
B -- No --> C[Log error: BAD_REQUEST]
C --> D[Return error JSON: BAD_REQUEST]
B -- Yes --> E[Extract thread_id from first request or use default]
E --> F[Decode IDs from requests]
F --> G[Resolve BP addresses via message_router.bp_resolver]
G --> H{Any BPs resolved?}
H -- No --> I["Return error JSON: INTERNAL_ERROR (no BPs)"]
H -- Yes --> J[Inject ext_message_token into each request]
J --> K[Create HTTP client with timeout]
K --> L[For each BP address]
L --> M[Construct forwarding URL]
M --> N[Send POST request with requests JSON and timestamp header]
N --> O{Response received?}
O -- Yes --> P[Parse response body as JSON]
P --> Q{Parsing successful?}
Q -- Yes --> R[Inject ext_message_token into response]
R --> S[Return response JSON]
Q -- No --> T[Log JSON parse error]
T --> U[Create error JSON with details]
U --> L
O -- No --> V[Log request failure]
V --> W[Create error JSON with details]
W --> L

This diagram illustrates the primary decision points and operations within the asynchronous run function, showing message validation, BP resolution, request forwarding, and error handling loops.