service.rs
Overview
This file implements RoutingService, the central router and dispatcher for network messages and external messages. It manages the lifecycle of message-handling threads, routes incoming messages to the appropriate handlers, and processes feedback for external messages. Message forwarding, feedback handling, and the main routing loop each run on their own thread and communicate over channels.
The core responsibilities include:
Receiving and routing network messages and external messages.
Managing the creation and coordination of node threads responsible for specific blockchain threads.
Maintaining a dispatcher to route messages to the appropriate thread handlers.
Handling feedback for external messages to prevent duplicates and signal errors.
Providing an interface for starting and joining message-processing threads.
Key Types and Constants
MAX_POISONED_QUEUE_SIZE: usize = 10000
Maximum capacity of the poisoned queue, which temporarily holds messages that cannot be dispatched immediately.
FeedbackMessage
Alias for (NetworkMessage, Option<oneshot::Sender<ExtMsgFeedback>>), representing an external network message optionally paired with a feedback sender.
FeedbackRegistry
HashMap keyed by message hash strings, mapping to oneshot::Sender to track outstanding feedback requests.
PoisonedQueue
A specialized queue type (PoisonedQueue<(NetworkMessage, SocketAddr)>) that temporarily stores messages that failed routing.
Node
Type alias for NodeImpl<ExternalFileSharesBased, rand::prelude::SmallRng>, representing the node handler instances.
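To illustrate how a capacity-limited holding queue of this kind can behave, here is a minimal sketch. It is not the project's PoisonedQueue: the name BoundedQueue, the drop-oldest eviction policy, and the drain_ready helper are all assumptions made for illustration.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for a poisoned queue: a FIFO buffer with a hard capacity.
/// Assumption: when the buffer is full, the oldest entry is evicted to make room.
pub struct BoundedQueue<T> {
    items: VecDeque<T>,
    capacity: usize,
}

impl<T> BoundedQueue<T> {
    pub fn new(capacity: usize) -> Self {
        Self { items: VecDeque::new(), capacity }
    }

    /// Stores an item and returns whatever had to be discarded, if anything.
    pub fn push(&mut self, item: T) -> Option<T> {
        if self.capacity == 0 {
            return Some(item); // nothing can be stored at all
        }
        let evicted = if self.items.len() == self.capacity {
            self.items.pop_front()
        } else {
            None
        };
        self.items.push_back(item);
        evicted
    }

    pub fn len(&self) -> usize {
        self.items.len()
    }

    pub fn is_empty(&self) -> bool {
        self.items.is_empty()
    }

    /// Removes and returns every item accepted by `ready`, keeping the rest in order.
    pub fn drain_ready<F>(&mut self, mut ready: F) -> Vec<T>
    where
        F: FnMut(&T) -> bool,
    {
        let mut out = Vec::new();
        let mut kept = VecDeque::with_capacity(self.items.len());
        for item in self.items.drain(..) {
            if ready(&item) {
                out.push(item);
            } else {
                kept.push_back(item);
            }
        }
        self.items = kept;
        out
    }
}
```

A fixed bound such as MAX_POISONED_QUEUE_SIZE keeps memory use predictable when messages keep arriving for a thread that never starts.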
Enumerations
Command
Defines commands sent to the routing service's control loop.
Variants:
ExtMessage(NetworkMessage)
Represents an external network message to be routed.
Route(NetworkMessage, SocketAddr)
Routes a network message originating from the specified socket address.
StartThread((ThreadIdentifier, BlockIdentifier))
Starts a new thread handler with the given thread ID and its parent block ID.
JoinThread(ThreadIdentifier)
Joins an existing thread identified by the thread ID.
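The following sketch shows how a control loop might consume such commands from a channel. The variant names come from the description above; the type aliases (u32, u64, String), the use of std::sync::mpsc instead of the instrumented channels, and the functions run_control_loop and demo are simplifying assumptions.

```rust
use std::net::SocketAddr;
use std::sync::mpsc::{channel, Receiver};

// Simplified placeholders (assumptions) for the real project types.
type ThreadIdentifier = u32;
type BlockIdentifier = u64;
type NetworkMessage = String;

#[derive(Debug)]
pub enum Command {
    ExtMessage(NetworkMessage),
    Route(NetworkMessage, SocketAddr),
    StartThread((ThreadIdentifier, BlockIdentifier)),
    JoinThread(ThreadIdentifier),
}

/// Consumes commands until every sender is dropped and records what it saw.
/// A real loop would start threads and dispatch messages instead of logging.
pub fn run_control_loop(control: Receiver<Command>) -> Vec<String> {
    let mut log = Vec::new();
    for command in control {
        let line = match command {
            Command::ExtMessage(msg) => format!("ext message: {}", msg),
            Command::Route(msg, from) => format!("route {} from {}", msg, from),
            Command::StartThread((thread_id, parent)) => {
                format!("start thread {} at block {}", thread_id, parent)
            }
            Command::JoinThread(thread_id) => format!("join thread {}", thread_id),
        };
        log.push(line);
    }
    log
}

/// Sends a few commands and runs the loop to completion.
pub fn demo() -> Vec<String> {
    let (tx, rx) = channel();
    let peer: SocketAddr = "127.0.0.1:8080".parse().expect("valid address");
    tx.send(Command::StartThread((1, 42))).unwrap();
    tx.send(Command::Route("block".to_string(), peer)).unwrap();
    tx.send(Command::JoinThread(1)).unwrap();
    drop(tx); // closing the channel ends the loop
    run_control_loop(rx)
}
```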
Structs
RoutingService
Main service struct managing routing and feedback channels:
Fields
cmd_sender: InstrumentedSender<Command>
Sender channel for routing commands.
feedback_sender: InstrumentedSender<ExtMsgFeedbackList>
Sender channel for external message feedback lists.
Methods
new(...) -> (RoutingService, InstrumentedReceiver<Command>, JoinHandle<()>, JoinHandle<()>)
Creates a new RoutingService instance and spawns two forwarding threads: one for network messages and one for external messages.
Parameters:
inbound_network_receiver: InstrumentedReceiver<IncomingMessage> - Incoming network messages.
inbound_ext_messages_receiver: InstrumentedReceiver<FeedbackMessage> - Incoming external messages with feedback.
metrics: Option<BlockProductionMetrics> - Optional metrics for block production.
net_metrics: Option<NetMetrics> - Optional network metrics.
Returns:
A tuple containing:
The RoutingService instance.
Receiver for commands (InstrumentedReceiver<Command>).
Join handles for the two forwarding threads.
Usage example:
let (routing_service, cmd_receiver, net_thread, ext_thread) = RoutingService::new(
    inbound_network_receiver,
    inbound_ext_messages_receiver,
    Some(metrics),
    Some(net_metrics),
);
start<F>(channel: (RoutingService, InstrumentedReceiver<Command>), metrics: Option<BlockProductionMetrics>, node_factory: F) -> (Self, JoinHandle<()>)
Starts the main routing service loop that listens for commands and manages node threads.
Parameters:
channel - Tuple with the RoutingService and command receiver.
metrics - Optional metrics.
node_factory - Factory function to create node instances. Must implement the specified signature for node creation.
Returns:
The RoutingService instance and a join handle for the main routing thread.
join_thread(&mut self, thread_id: ThreadIdentifier)
Sends a command to join an existing thread.
stub() -> (Self, InstrumentedReceiver<Command>) (test only)
Creates a stub RoutingService with no real message forwarding, useful for testing.
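The shape of new, which returns the service, a command receiver, and two join handles, can be approximated with standard-library channels. This is only a sketch under simplifying assumptions: messages are plain strings, metrics and the feedback channel are omitted, and the Command variants carry less data than the real ones.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

// Reduced command set (assumption): payloads are plain strings here.
#[derive(Debug, PartialEq)]
pub enum Command {
    Route(String),
    ExtMessage(String),
}

pub struct RoutingService {
    pub cmd_sender: Sender<Command>,
}

impl RoutingService {
    /// Spawns one forwarding thread per inbound source. Each thread wraps what it
    /// receives into a command and pushes it onto the shared command channel.
    pub fn new(
        inbound_network: Receiver<String>,
        inbound_ext: Receiver<String>,
    ) -> (RoutingService, Receiver<Command>, JoinHandle<()>, JoinHandle<()>) {
        let (cmd_sender, cmd_receiver) = channel();

        let net_tx = cmd_sender.clone();
        let net_thread = thread::spawn(move || {
            for message in inbound_network {
                if net_tx.send(Command::Route(message)).is_err() {
                    break; // the command receiver is gone
                }
            }
        });

        let ext_tx = cmd_sender.clone();
        let ext_thread = thread::spawn(move || {
            for message in inbound_ext {
                if ext_tx.send(Command::ExtMessage(message)).is_err() {
                    break;
                }
            }
        });

        (RoutingService { cmd_sender }, cmd_receiver, net_thread, ext_thread)
    }
}

/// Feeds one message through each forwarding thread and collects the commands.
pub fn demo() -> Vec<Command> {
    let (net_tx, net_rx) = channel();
    let (ext_tx, ext_rx) = channel();
    let (service, cmd_receiver, net_thread, ext_thread) = RoutingService::new(net_rx, ext_rx);

    net_tx.send("block".to_string()).unwrap();
    drop(net_tx); // closing the source lets the forwarding thread finish
    net_thread.join().unwrap();

    ext_tx.send("transfer".to_string()).unwrap();
    drop(ext_tx);
    ext_thread.join().unwrap();

    drop(service); // drop the last sender so the receiver iterator terminates
    cmd_receiver.iter().collect()
}
```

Returning the join handles to the caller, as the documented signature does, leaves the decision of when to wait for the forwarding threads to the code that owns the service.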
Internal Functions (Private)
create_node_thread<F>(...) -> anyhow::Result<Node>
Creates a new node thread instance, establishes the required channels, and adds routes to the dispatcher.
Parameters:
dispatcher: &mut Dispatcher - Reference to the dispatcher.
feedback_sender: InstrumentedSender<ExtMsgFeedbackList> - Feedback sender channel.
thread_identifier: ThreadIdentifier - ID of the thread to create.
parent_block_id: Option<BlockIdentifier> - Optional parent block ID.
node_factory: &mut F - Node factory function.
ext_message_receiver: Receiver<WrappedMessage> - Receiver for external wrapped messages.
metrics: Option<BlockProductionMetrics> - Optional metrics.
Returns:
Created node instance or error.
route(dispatcher: &Dispatcher, message: (NetworkMessage, SocketAddr), poisoned_queue: &mut PoisonedQueue)
Attempts to dispatch a network message via the dispatcher. If dispatch fails because no route exists, the message is stored in the poisoned queue for a later retry.
inner_main_loop<F>(control: InstrumentedReceiver<Command>, feedback_sender: InstrumentedSender<ExtMsgFeedbackList>, mut dispatcher: Dispatcher, mut node_factory: F, metrics: Option<BlockProductionMetrics>) -> anyhow::Result<()>
The main control loop of the routing service, which processes commands, manages nodes, routes messages, and handles thread lifecycle.
inner_network_messages_forwarding_loop(inbound_network: InstrumentedReceiver<IncomingMessage>, cmd_sender: InstrumentedSender<Command>, net_metrics: Option<NetMetrics>) -> anyhow::Result<()>
Thread loop that receives network messages, extracts them, and forwards routing commands.
inner_external_messages_forwarding_loop(inbound_ext_messages: InstrumentedReceiver<FeedbackMessage>, feedback_receiver: InstrumentedReceiver<ExtMsgFeedbackList>, cmd_sender: InstrumentedSender<Command>) -> anyhow::Result<()>
Thread loop that receives external messages with feedback, manages the feedback registry to detect duplicates, and forwards valid messages.
inner_feedback_loop(feedback_receiver: InstrumentedReceiver<ExtMsgFeedbackList>, feedback_registry: Arc<Mutex<FeedbackRegistry>>) -> anyhow::Result<()>
Dedicated loop that uses the feedback registry to send feedback to the original requesters.
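The interplay between the external-message loop and the feedback loop can be sketched as two operations on a shared registry. Everything below is an assumption-laden illustration: feedback is a plain string, std::sync::mpsc channels stand in for oneshot channels, and register and resolve are hypothetical helper names.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};

// Assumptions: string feedback, mpsc senders used as one-shot responders.
type Feedback = String;
type FeedbackRegistry = HashMap<String, Sender<Feedback>>;

/// Records the sender under the message hash. Returns true when the message is new
/// and should be forwarded. A duplicate is answered immediately and returns false.
pub fn register(
    registry: &Arc<Mutex<FeedbackRegistry>>,
    hash: &str,
    sender: Sender<Feedback>,
) -> bool {
    let mut guard = registry.lock().expect("registry lock poisoned");
    if guard.contains_key(hash) {
        let _ = sender.send("duplicate message".to_string());
        return false;
    }
    guard.insert(hash.to_string(), sender);
    true
}

/// Delivers feedback for a hash and forgets the entry.
/// Returns true when a requester was registered and still listening.
pub fn resolve(registry: &Arc<Mutex<FeedbackRegistry>>, hash: &str, feedback: Feedback) -> bool {
    let sender = registry.lock().expect("registry lock poisoned").remove(hash);
    match sender {
        Some(tx) => tx.send(feedback).is_ok(),
        None => false,
    }
}

/// Registers the same hash twice, then resolves it once.
pub fn demo() -> (bool, bool, Feedback, Feedback) {
    let registry = Arc::new(Mutex::new(FeedbackRegistry::new()));
    let (first_tx, first_rx) = channel();
    let (dup_tx, dup_rx) = channel();

    let accepted = register(&registry, "abc", first_tx);
    let duplicate_accepted = register(&registry, "abc", dup_tx);
    resolve(&registry, "abc", "processed".to_string());

    (
        accepted,
        duplicate_accepted,
        first_rx.recv().unwrap(),
        dup_rx.recv().unwrap(),
    )
}
```

Removing the entry on resolution means a later message with the same hash is treated as new again. Whether the real service behaves that way is not stated in this document.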
Implementation Details and Algorithms
Dispatcher-based Routing:
The service uses a Dispatcher instance that maps ThreadIdentifiers to message sender channels. Incoming network messages are dispatched by thread ID. If dispatch fails because no route exists, the message is temporarily stored in a PoisonedQueue to be retried later.
Thread Lifecycle Management:
RoutingService listens for commands to start or join threads. When starting a thread, it creates message channels, adds routes to the dispatcher, spawns a new node thread using the provided factory, and retains poisoned messages for potential re-dispatch.
Feedback Handling for External Messages:
External messages carry optional feedback channels. The service maintains a thread-safe feedback_registry tracking message hashes and associated feedback senders. Duplicate messages are rejected with appropriate feedback errors. Feedback is sent back asynchronously via a dedicated feedback loop.
Multithreaded Design:
The routing service runs several threads:
Network message forwarding thread.
External messages forwarding thread.
Feedback handling thread.
Main routing thread that manages node threads and dispatching.
Shutdown Coordination:
The loops periodically check a global SHUTDOWN_FLAG to exit gracefully on shutdown.
Metrics Instrumentation:
Channels are instrumented with optional metrics to monitor throughput and performance.
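The dispatcher-plus-poisoned-queue behaviour described under Dispatcher-based Routing can be sketched as follows. The method names add_route, dispatch, and has_route match the diagram in this document, but their signatures, the simplified message type, the plain VecDeque used as the queue, and the retry helper are assumptions. Unlike the description above, this sketch also parks a message when the target's receiver has been dropped.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::mpsc::{channel, Receiver, Sender};

type ThreadIdentifier = u32;
type Message = (ThreadIdentifier, String); // (target thread, payload): an assumption

#[derive(Default)]
pub struct Dispatcher {
    routes: HashMap<ThreadIdentifier, Sender<String>>,
}

impl Dispatcher {
    pub fn add_route(&mut self, thread_id: ThreadIdentifier, sender: Sender<String>) {
        self.routes.insert(thread_id, sender);
    }

    pub fn has_route(&self, thread_id: &ThreadIdentifier) -> bool {
        self.routes.contains_key(thread_id)
    }

    /// Hands the payload to the target thread, or returns the message on failure.
    pub fn dispatch(&self, message: Message) -> Result<(), Message> {
        let (thread_id, payload) = message;
        match self.routes.get(&thread_id) {
            Some(sender) => sender.send(payload).map_err(|err| (thread_id, err.0)),
            None => Err((thread_id, payload)),
        }
    }
}

/// Dispatches a message and parks it in the poisoned queue if it cannot be delivered.
pub fn route(dispatcher: &Dispatcher, message: Message, poisoned: &mut VecDeque<Message>) {
    if let Err(undelivered) = dispatcher.dispatch(message) {
        poisoned.push_back(undelivered);
    }
}

/// Re-dispatches parked messages. Those that still fail go back into the queue.
pub fn retry(dispatcher: &Dispatcher, poisoned: &mut VecDeque<Message>) {
    for message in std::mem::take(poisoned) {
        route(dispatcher, message, poisoned);
    }
}

/// Routes a message before its thread exists, then adds the route and retries.
pub fn demo() -> (usize, usize, Vec<String>) {
    let mut dispatcher = Dispatcher::default();
    let mut poisoned = VecDeque::new();

    route(&dispatcher, (7, "early".to_string()), &mut poisoned);
    let parked_before = poisoned.len();

    let (tx, rx): (Sender<String>, Receiver<String>) = channel();
    dispatcher.add_route(7, tx);
    retry(&dispatcher, &mut poisoned);
    route(&dispatcher, (7, "late".to_string()), &mut poisoned);

    (parked_before, poisoned.len(), rx.try_iter().collect())
}
```

Retrying right after a route is added preserves arrival order for the new thread: parked messages are delivered before anything routed afterwards.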
Interaction with Other Components
Dispatcher (in dispatcher module)
Used for routing messages to appropriate thread handlers.
Node (in node module)
Represents processing nodes spawned per thread for message execution.
ExternalFileSharesBased (in syncservice)
Used as a generic type parameter for constructing Node instances.
WrappedMessage
Represents wrapped external messages passed to nodes.
Metrics
Integrates with BlockProductionMetrics and NetMetrics for instrumentation.
SHUTDOWN_FLAG
Global flag used to coordinate graceful shutdown across threads.
PoisonedQueue (in poisoned_queue module)
Temporarily stores messages that cannot currently be dispatched.
Telemetry Utils
Uses instrumented_channel and extensions to instrument channels for metrics and tracing.
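One common way for a channel-driven loop to honour a shutdown flag such as SHUTDOWN_FLAG is to poll with a receive timeout, so the flag is checked even when no messages arrive. The sketch below assumes this polling approach, a 10 ms timeout, u32 messages, and a flag passed by reference instead of a global. None of these details are confirmed by the source.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Forwards messages until the shutdown flag is set or either channel closes.
/// Returns the number of forwarded messages.
pub fn forwarding_loop(
    inbound: Receiver<u32>,
    outbound: Sender<u32>,
    shutdown: &AtomicBool,
) -> usize {
    let mut forwarded = 0;
    while !shutdown.load(Ordering::SeqCst) {
        // A bounded wait keeps the loop responsive to the flag when idle.
        match inbound.recv_timeout(Duration::from_millis(10)) {
            Ok(message) => {
                if outbound.send(message).is_err() {
                    break; // the consumer is gone
                }
                forwarded += 1;
            }
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    forwarded
}

/// Runs the loop on a worker thread and stops it through the flag.
pub fn demo() -> usize {
    let shutdown = Arc::new(AtomicBool::new(false));
    let (in_tx, in_rx) = channel();
    let (out_tx, out_rx) = channel();

    let flag = Arc::clone(&shutdown);
    let worker = thread::spawn(move || forwarding_loop(in_rx, out_tx, &flag));

    for n in 0..3 {
        in_tx.send(n).expect("worker is alive");
    }
    for _ in 0..3 {
        out_rx.recv().expect("worker forwards every message");
    }

    // The inbound sender stays alive, so only the flag can end the loop.
    shutdown.store(true, Ordering::SeqCst);
    let forwarded = worker.join().expect("worker panicked");
    drop(in_tx);
    forwarded
}
```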
Visual Diagram
classDiagram
class RoutingService {
+cmd_sender: InstrumentedSender<Command>
+feedback_sender: InstrumentedSender<ExtMsgFeedbackList>
+new()
+start()
+join_thread()
-create_node_thread()
-route()
-inner_main_loop()
-inner_network_messages_forwarding_loop()
-inner_external_messages_forwarding_loop()
-inner_feedback_loop()
}
class Dispatcher {
+add_route()
+dispatch()
+has_route()
}
class Node {
+execute()
}
RoutingService --> Dispatcher : uses
RoutingService --> Node : manages nodes
RoutingService --> "InstrumentedSender<Command>" : sends commands
RoutingService --> "InstrumentedSender<ExtMsgFeedbackList>" : sends feedback
Usage Example
// Set up inbound receivers (e.g., from the network stack)
let inbound_network_receiver = ...; // InstrumentedReceiver<IncomingMessage>
let inbound_ext_messages_receiver = ...; // InstrumentedReceiver<FeedbackMessage>
let metrics = Some(BlockProductionMetrics::new());
let net_metrics = Some(NetMetrics::new());

// Create routing service and forwarding threads
let (routing_service, cmd_receiver, network_thread, ext_thread) = RoutingService::new(
    inbound_network_receiver,
    inbound_ext_messages_receiver,
    metrics.clone(),
    net_metrics.clone(),
);

// Define node factory function
let node_factory = |parent_block: Option<BlockIdentifier>,
                    thread_id: &ThreadIdentifier,
                    incoming_rx,
                    authority_rx,
                    incoming_tx,
                    authority_tx,
                    feedback_tx,
                    ext_rx| {
    // Create and return node instance
    Node::new(...)
};

// Start main routing service loop
let (service, main_thread) = RoutingService::start((routing_service, cmd_receiver), metrics, node_factory);

// Start handling threads and messages...
Trait Implementation
The RoutingService implements the Subscriber trait from threads_tracking_service:
handle_start_thread(...)
Sends a StartThread command to start message handling for a new thread.
handle_stop_thread(...)
No-op: thread stops are handled implicitly when the thread exits.
This integration allows the routing service to react to thread lifecycle events reported by the thread-tracking infrastructure.
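A subscriber of this kind can be sketched as a thin adapter that turns lifecycle callbacks into commands. The trait below is a hypothetical reconstruction: the parameter lists behind the elided (...) are not given in this document, so the arguments shown, the integer type aliases, and the RoutingHandle struct are assumptions.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

// Simplified placeholders (assumptions) for the real identifier types.
type ThreadIdentifier = u32;
type BlockIdentifier = u64;

#[derive(Debug, PartialEq)]
pub enum Command {
    StartThread((ThreadIdentifier, BlockIdentifier)),
}

/// Hypothetical shape of the Subscriber trait. The real signatures may differ.
pub trait Subscriber {
    fn handle_start_thread(&mut self, parent_block: &BlockIdentifier, thread_id: &ThreadIdentifier);
    fn handle_stop_thread(&mut self, last_block: &BlockIdentifier, thread_id: &ThreadIdentifier);
}

/// Stand-in for the routing service: it only owns the command sender.
pub struct RoutingHandle {
    cmd_sender: Sender<Command>,
}

impl Subscriber for RoutingHandle {
    fn handle_start_thread(&mut self, parent_block: &BlockIdentifier, thread_id: &ThreadIdentifier) {
        // Translate the lifecycle event into a command for the main routing loop.
        let _ = self
            .cmd_sender
            .send(Command::StartThread((*thread_id, *parent_block)));
    }

    fn handle_stop_thread(&mut self, _last_block: &BlockIdentifier, _thread_id: &ThreadIdentifier) {
        // Intentionally empty: a stopping thread cleans up by exiting.
    }
}

/// Fires one start and one stop event and collects the resulting commands.
pub fn demo() -> Vec<Command> {
    let (cmd_sender, cmd_receiver): (Sender<Command>, Receiver<Command>) = channel();
    let mut handle = RoutingHandle { cmd_sender };
    handle.handle_start_thread(&100, &3);
    handle.handle_stop_thread(&101, &3);
    drop(handle); // close the channel so collection terminates
    cmd_receiver.iter().collect()
}
```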
Please refer to the topics related to Network Message Routing, Thread Management, External Message Feedback, and Dispatcher Pattern for further context on how this service fits within the overall system.