dispatcher.rs
Overview
This file implements a message dispatching mechanism for routing network messages within a multi-threaded environment, where messages are associated with specific threads identified by a ThreadIdentifier. The Dispatcher struct manages routing information and directs incoming messages to the appropriate thread-specific channels for processing, distinguishing between regular node messages and those related to authority (consensus or control) protocols.
The dispatcher handles various types of network messages (NetworkMessage) by evaluating their thread association and routing them accordingly. It supports dynamic route addition and checks for existing routes before dispatching messages.
Types and Enumerations
Payload
type Payload = (NetworkMessage, SocketAddr);
Description: A tuple consisting of a network message and the socket address of its origin.
Usage: Represents the unit of data dispatched through the routing system.
DispatchError
pub enum DispatchError {
NoRoute(ThreadIdentifier, Payload),
DestinationClosed(ThreadIdentifier, Payload),
}
Variants:
NoRoute(ThreadIdentifier, Payload): Indicates that no route exists for the message's thread.DestinationClosed(ThreadIdentifier, Payload): Indicates that the destination channel for the thread is closed, and the message could not be sent.
Usage: Returned as an error type from the dispatching process to signal routing failures.
Structs
Dispatcher
pub struct Dispatcher {
routes: HashMap<ThreadIdentifier, (XInstrumentedSender<Payload>, XInstrumentedSender<Payload>)>,
}
Fields:
routes: A hash map that associates eachThreadIdentifierwith a pair of instrumented senders:The first sender is for regular node messages.
The second sender is for authority-related messages.
Purpose: Maintains routing information and provides methods to add routes and dispatch messages based on their thread association.
Implementation Details
Uses
XInstrumentedSender<Payload>from thetelemetry_utilscrate, which wraps sending channels with instrumentation capabilities, likely for monitoring or tracing purposes.The two senders per thread separate authority protocol messages from other node messages, reflecting distinct processing paths.
Trait Implementations
Default for Dispatcher
impl Default for Dispatcher {
fn default() -> Self {
Self::new()
}
}
Provides a default constructor delegating to
Dispatcher::new().
Methods
new
pub fn new() -> Self
Description: Constructs a new
Dispatcherwith an empty routing table.Returns: A new
Dispatcherinstance.
has_route
pub fn has_route(&mut self, thread_identifier: &ThreadIdentifier) -> bool
Parameters:
thread_identifier: Reference to the thread ID to check.
Description: Checks if a route exists for the specified thread.
Returns:
trueif a route exists,falseotherwise.Usage Example:
if dispatcher.has_route(&thread_id) {
// Route exists, safe to dispatch messages
}
add_route
pub fn add_route(
&mut self,
thread_identifier: ThreadIdentifier,
node: XInstrumentedSender<Payload>,
authority: XInstrumentedSender<Payload>,
)
Parameters:
thread_identifier: The thread ID to associate with the route.node: Sender for node-related messages.authority: Sender for authority-related messages.
Description: Adds or updates a routing entry for a thread, associating it with two senders.
Usage Example:
dispatcher.add_route(thread_id, node_sender, authority_sender);
dispatch
pub fn dispatch(&self, message: Payload) -> anyhow::Result<(), DispatchError>
Parameters:
message: The message payload (network message and source socket address) to be dispatched.
Description:
Determines the thread ID and whether the message pertains to an authority protocol or regular node processing by matching on the message variant.
Ignores
StartSynchronizationcommands as they are for local use only.If a route exists for the thread, it sends the message to the appropriate sender (authority or node).
Returns
Ok(())if successful.Returns a
DispatchErrorif no route exists or the destination channel is closed.
Return:
Ok(())on success.Err(DispatchError)on failure.
Implementation Detail:
Uses pattern matching on
NetworkMessagevariants to extract routing information.Logs warnings when messages are received for unexpected threads.
Uses the
WrappedItemtype fromtelemetry_utilsto wrap the message with a label for instrumentation.
Usage Example:
match dispatcher.dispatch(message) {
Ok(_) => println!("Message dispatched successfully"),
Err(DispatchError::NoRoute(thread, _)) => eprintln!("No route for thread {:?}", thread),
Err(DispatchError::DestinationClosed(thread, _)) => eprintln!("Channel closed for thread {:?}", thread),
}
Important Implementation Notes
The dispatcher maintains separate channels for node and authority messages per thread, reflecting the different handling requirements for these categories.
The routing key is derived from the message content, often from embedded thread IDs or from authority protocol routing logic (
authority_switch::routing::route).The
dispatchmethod explicitly ignores theStartSynchronizationinner command, indicating that some messages are meant for local intra-thread signaling rather than network dispatch.Error handling differentiates between missing routes and closed destination channels, allowing upstream components to react accordingly.
The file uses
tracingmacros for debug and warning logs to facilitate runtime monitoring of dispatch operations.
Interactions with Other Components
NetworkMessageandCommand: The dispatcher relies on these enums to determine routing and message type. Their variants are used extensively in thedispatchmethod's match statement.ThreadIdentifier: Acts as the routing key for messages, linking them to the appropriate processing thread.XInstrumentedSenderandWrappedItem(fromtelemetry_utils): Provide instrumented message channels used for sending dispatched messages, enabling telemetry and tracing.authority_switch::routing: Contains routing logic specific to authority protocol messages, invoked to get the thread ID for such messages.
Diagram: Dispatcher Structure and Message Flow
flowchart TD
A["Incoming Payload (NetworkMessage + SocketAddr)"]
B{Message Type?}
C[Authority Switch Protocol]
D[Other Network Messages]
E[Extract ThreadIdentifier]
F{Route Exists for Thread?}
G[Send via Authority Sender]
H[Send via Node Sender]
I[Return Ok]
J[Return DispatchError::NoRoute]
K[Return DispatchError::DestinationClosed]
A --> B
B -->|AuthoritySwitchProtocol| C
B -->|Others| D
C --> E
D --> E
E --> F
F -->|Yes| G
F -->|No| J
G --> I
G -->|Send Error| K
H --> I
H -->|Send Error| K
E -->|Is Authority| G
E -->|Is Node| H
The flowchart illustrates how an incoming message is examined to determine its type, extract the routing thread identifier, and then dispatched through the appropriate sender channel if a route exists. Errors in routing or closed destinations lead to specific error returns.
This file is central to the message routing subsystem, enabling thread-specific message handling with clear separation between authority and node message flows, while incorporating instrumentation hooks for monitoring. For thread and message type definitions, see ThreadIdentifier and NetworkMessage. For details on authority protocol routing logic, see authority_switch::routing.