WebSocket Event Subscription
Overview
The **WebSocket Event Subscription** module provides real-time subscription management for blockchain events by leveraging WebSocket client connections, a subscription registry, and event dispatching mechanisms. Its primary purpose is to enable clients to subscribe to blockchain transaction events related to specific addresses and receive immediate updates without polling.
This module manages many client subscriptions over WebSocket connections and delivers each blockchain transaction only to the clients subscribed to an affected address. It hides the details of connection handling, subscription lifecycles, and event broadcasting from the rest of the API.
Core Concepts and Purpose
Connection Handling: Manages individual WebSocket client connections, processing subscription requests, maintaining active subscriptions, and sending event notifications.
Subscription Registry: Maintains a centralized mapping of subscribed addresses to connected clients and their subscriptions, enabling efficient broadcasting of transaction events.
Event Dispatching: Upon receiving blockchain transaction or block events from upstream sources (e.g., node indexers), the module identifies affected addresses and routes notifications to subscribed clients.
Topic-Based Subscription: Supports subscription to specific topics such as transaction events (`txs`), allowing extensibility for additional event types.
Key Functionalities and Workflow
1. WebSocket Connection Lifecycle and Message Handling
Each incoming WebSocket connection from a client is wrapped within a `ConnectionHandler` instance. This handler manages the connection's lifecycle, including:
Client Identification: Assigns a unique `clientId` (UUID) to each connection.
Ping/Pong Heartbeat: Implements a heartbeat mechanism to detect dropped connections using WebSocket ping/pong frames and timeouts.
Message Processing: Listens for JSON messages from the client representing subscription commands (`subscribe`, `unsubscribe`, `ping`).
Subscription Management: Routes subscription requests to appropriate handlers based on the topic (currently supports `txs` for transaction subscriptions).
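The heartbeat item above can be modeled without real sockets or timers. The sketch below is a hypothetical `HeartbeatMonitor` (not a class from this module). It assumes the owner calls `tick()` once per ping interval and `onPong()` whenever a pong frame arrives; the real handler would send the ping frame or close the socket depending on the returned action.

```typescript
// Hypothetical, timer-free model of a ping/pong heartbeat.
// Assumption: the owner calls tick() once per ping interval.
type HeartbeatAction = 'ping' | 'terminate'

class HeartbeatMonitor {
  // True while the peer has answered the most recent ping.
  private alive = true

  // Call when a pong frame is received from the client.
  onPong(): void {
    this.alive = true
  }

  // Call once per interval. Returns what the owner should do next.
  tick(): HeartbeatAction {
    if (!this.alive) return 'terminate' // no pong since the last ping
    this.alive = false // expect a pong before the next tick
    return 'ping'
  }
}
```

Keeping the decision logic separate from the timer makes the timeout behavior easy to unit test; the interval length itself is a deployment choice and is not specified here.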
Example snippet illustrating subscription routing:
this.routes = {
  txs: {
    subscribe: (subscriptionId, data) => this.handleSubscribeTxs(subscriptionId, data),
    unsubscribe: (subscriptionId, data) => this.handleUnsubscribeTxs(subscriptionId, data),
  },
}
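To show how an incoming frame might reach such a routes table, here is a minimal sketch. The message shape follows the sequence diagram in this document; the `dispatch` and `own` functions and the type names are assumptions made for illustration, and handling of the `ping` command is left out.

```typescript
// Request shape as shown in the sequence diagram of this document.
// Type and function names here are illustrative assumptions.
interface TxsData {
  topic: string
  addresses: string[]
}

interface RequestMessage {
  method: string
  subscriptionId: string
  data: TxsData
}

type Handler = (subscriptionId: string, data: TxsData) => void
type Routes = Record<string, Record<string, Handler>>

// Look up only own properties so keys like "constructor" never match.
function own<T>(table: Record<string, T>, key: string): T | undefined {
  return Object.prototype.hasOwnProperty.call(table, key) ? table[key] : undefined
}

// Parse one raw frame and invoke the matching route.
// Returns false for malformed input or an unknown topic/method.
function dispatch(routes: Routes, raw: string): boolean {
  let parsed: unknown
  try {
    parsed = JSON.parse(raw)
  } catch {
    return false
  }
  if (typeof parsed !== 'object' || parsed === null) return false

  const { method, subscriptionId, data } = parsed as Partial<RequestMessage>
  if (typeof method !== 'string' || typeof subscriptionId !== 'string') return false
  if (typeof data !== 'object' || data === null) return false
  if (typeof data.topic !== 'string' || !Array.isArray(data.addresses)) return false

  const topicRoutes = own(routes, data.topic)
  const handler = topicRoutes ? own(topicRoutes, method) : undefined
  if (!handler) return false

  handler(subscriptionId, data)
  return true
}
```

Because topic and method names arrive from untrusted clients, the lookup is restricted to own properties of the table. Adding a new topic then only requires adding another key to the routes object.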
2. Subscription Registration and Deregistration
When a client subscribes to transaction events for specific addresses, the `ConnectionHandler` adds the subscription to its internal set and delegates the registration to the centralized `Registry`. Similarly, unsubscription requests remove the subscription and update the registry.
This ensures:
Each subscription is uniquely tracked by a combination of `clientId` and `subscriptionId`.
The registry maintains efficient lookup mappings from addresses to client connections.
Snippet illustrating subscription registration:
this.subscriptionIds.add(subscriptionId)
this.registry.subscribe(this.clientId, subscriptionId, this, data.addresses)
this.client.subscribeAddresses(this.registry.getAddresses())
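The unsubscribe and disconnect paths mirror the registration steps. The sketch below is not the module's `ConnectionHandler`: `ConnectionSketch`, `RegistryLike`, and `UpstreamClient` are hypothetical names, while the method names follow the snippet above and the sequence diagram. It assumes that an unsubscribe request ends the whole subscription and that an empty address list means "all addresses" for that subscription.

```typescript
// Stand-ins for the collaborators. The interface and class names are
// hypothetical; the method names follow the snippet and sequence diagram.
interface RegistryLike {
  subscribe(clientId: string, subscriptionId: string, connection: unknown, addresses: string[]): void
  unsubscribe(clientId: string, subscriptionId: string, addresses: string[]): void
  getAddresses(): string[]
}

interface UpstreamClient {
  subscribeAddresses(addresses: string[]): void
}

class ConnectionSketch {
  private readonly clientId: string
  private readonly registry: RegistryLike
  private readonly client: UpstreamClient
  private readonly subscriptionIds = new Set<string>()

  constructor(clientId: string, registry: RegistryLike, client: UpstreamClient) {
    this.clientId = clientId
    this.registry = registry
    this.client = client
  }

  subscribeTxs(subscriptionId: string, addresses: string[]): void {
    this.subscriptionIds.add(subscriptionId)
    this.registry.subscribe(this.clientId, subscriptionId, this, addresses)
    this.client.subscribeAddresses(this.registry.getAddresses())
  }

  // Assumption: an unsubscribe request ends the whole subscription.
  unsubscribeTxs(subscriptionId: string, addresses: string[]): void {
    if (!this.subscriptionIds.delete(subscriptionId)) return // unknown id
    this.registry.unsubscribe(this.clientId, subscriptionId, addresses)
    this.client.subscribeAddresses(this.registry.getAddresses())
  }

  // On disconnect, drop every subscription this connection still owns.
  // Assumption: an empty address list means "all addresses".
  close(): void {
    this.subscriptionIds.forEach((id) => this.registry.unsubscribe(this.clientId, id, []))
    this.subscriptionIds.clear()
    this.client.subscribeAddresses(this.registry.getAddresses())
  }
}
```

Each path ends by pushing the registry's full address set upstream, so the upstream client always tracks the union of what all connected clients currently want.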
3. Centralized Subscription Registry
The `Registry` class acts as the core data structure managing all active subscriptions:
Clients Map: Maps a combined subscription key (`clientId:subscriptionId`) to a set of subscribed addresses.
Addresses Map: Maps each address to a map of subscription keys and their respective `ConnectionHandler` instances.
This dual mapping allows:
Efficient subscription and unsubscription operations.
Quick identification of all clients interested in a transaction involving a given address.
Example data structures:
private clients: Record<string, Set<string>> = {}
private addresses: Record<string, Map<string, ConnectionHandler>> = {}
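A minimal sketch of how the two mappings could be kept in sync follows. `RegistrySketch`, `Publisher`, and `subscribersOf` are hypothetical names, and the rule that an empty address list removes the whole subscription is an assumption. The sketch uses `Map` instead of `Record` so that arbitrary address strings cannot collide with object prototype keys; the fields shown above are `Record`s.

```typescript
// Stand-in for ConnectionHandler; only publish() matters to the registry.
interface Publisher {
  publish(subscriptionId: string, address: string, data: unknown): void
}

class RegistrySketch {
  // key "clientId:subscriptionId" -> addresses owned by that subscription
  private readonly clients = new Map<string, Set<string>>()
  // address -> (key -> connection to notify)
  private readonly addresses = new Map<string, Map<string, Publisher>>()

  static toId(clientId: string, subscriptionId: string): string {
    return `${clientId}:${subscriptionId}`
  }

  subscribe(clientId: string, subscriptionId: string, connection: Publisher, addresses: string[]): void {
    const id = RegistrySketch.toId(clientId, subscriptionId)
    let owned = this.clients.get(id)
    if (!owned) {
      owned = new Set<string>()
      this.clients.set(id, owned)
    }
    for (const address of addresses) {
      owned.add(address)
      let subs = this.addresses.get(address)
      if (!subs) {
        subs = new Map<string, Publisher>()
        this.addresses.set(address, subs)
      }
      subs.set(id, connection)
    }
  }

  // Assumption: an empty address list removes the whole subscription.
  unsubscribe(clientId: string, subscriptionId: string, addresses: string[]): void {
    const id = RegistrySketch.toId(clientId, subscriptionId)
    const owned = this.clients.get(id)
    if (!owned) return
    const targets = addresses.length > 0 ? addresses : Array.from(owned)
    for (const address of targets) {
      owned.delete(address)
      const subs = this.addresses.get(address)
      if (!subs) continue
      subs.delete(id)
      if (subs.size === 0) this.addresses.delete(address) // last watcher left
    }
    if (owned.size === 0) this.clients.delete(id)
  }

  // Every address with at least one subscriber.
  getAddresses(): string[] {
    return Array.from(this.addresses.keys())
  }

  // Subscription keys watching one address (inspection helper).
  subscribersOf(address: string): string[] {
    const subs = this.addresses.get(address)
    return subs ? Array.from(subs.keys()) : []
  }
}
```

The clients map makes removal cheap because it lists exactly which address entries to touch, while the addresses map makes lookup at publish time a single key access.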
4. Event Handling and Broadcasting
When new blockchain blocks or transactions are received (from upstream WebSocket clients connected to blockchain indexers), the `Registry` processes these events:
Extracts addresses involved in the transaction using a configured `TransactionHandler`.
For each address, publishes the transaction data to all subscribed clients' connections.
Example publishing logic:
private publishTransaction(addresses: string[], tx: unknown) {
  addresses.forEach((address) => {
    if (!this.addresses[address]) return
    for (const [id, connection] of this.addresses[address].entries()) {
      const { subscriptionId } = Registry.fromId(id)
      connection.publish(subscriptionId, address, tx)
    }
  })
}
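The two steps (address extraction, then per-address delivery) can be sketched end to end as below. The transaction shape `SketchTx`, the `transactionHandler` body, the `fanOut` function, and this `fromId` implementation are assumptions for illustration; real chains define their own payloads and handlers, and the module's own `Registry.fromId` may differ.

```typescript
// Stand-in for ConnectionHandler; only publish() matters here.
interface Publisher {
  publish(subscriptionId: string, address: string, data: unknown): void
}

// Hypothetical transaction shape; each chain defines its own payload.
interface SketchTx {
  txid: string
  from: string
  to: string
}

// Hypothetical handler: lists every distinct address a transaction touches.
function transactionHandler(tx: SketchTx): { addresses: string[]; tx: SketchTx } {
  return { addresses: Array.from(new Set([tx.from, tx.to])), tx }
}

// Split a "clientId:subscriptionId" key on the first colon.
// Assumption: clientId is a UUID and so contains no colon itself.
function fromId(id: string): { clientId: string; subscriptionId: string } {
  const i = id.indexOf(':')
  if (i < 0) throw new Error(`malformed subscription key: ${id}`)
  return { clientId: id.slice(0, i), subscriptionId: id.slice(i + 1) }
}

// Deliver one transaction to every subscription of every affected address.
// Returns the number of notifications sent.
function fanOut(subscribers: Map<string, Map<string, Publisher>>, tx: SketchTx): number {
  const { addresses, tx: payload } = transactionHandler(tx)
  let sent = 0
  for (const address of addresses) {
    const bySubscription = subscribers.get(address)
    if (!bySubscription) continue // nobody is watching this address
    bySubscription.forEach((connection, id) => {
      connection.publish(fromId(id).subscriptionId, address, payload)
      sent += 1
    })
  }
  return sent
}
```

As in the publishing logic above, a subscription that watches both the sender and the receiver is notified once per matching address, so clients may want to deduplicate by transaction id.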
Interactions with Other System Components
Upstream Blockchain Indexer WebSocket Client: The module interacts with a WebSocket client connected to blockchain indexers or nodes to receive real-time block and transaction data.
Transaction and Block Handlers: The `Registry` is instantiated with custom `transactionHandler` and optional `blockHandler` functions to parse incoming blockchain events into address sets and transaction payloads.
WebSocket Clients (API Layer): External clients connect to the API WebSocket endpoints, managed individually by `ConnectionHandler` instances.
Metrics and Logging: The `ConnectionHandler` integrates with Prometheus for metrics (e.g., active WebSocket count) and uses a structured logger for error and connection event tracing.
Design Patterns and Approaches
Observer Pattern: The subscription registry acts as a subject, notifying observers (`ConnectionHandler` instances) when relevant events occur.
Pub-Sub Model: Clients subscribe to topics (addresses), and the system publishes events only to those subscribers.
Separation of Concerns: Connection management (`ConnectionHandler`) is separated from subscription state management (`Registry`), enhancing modularity and maintainability.
Robustness: Heartbeat mechanism and error handling ensure stale or broken connections are cleaned up promptly.
Code References Summary
ConnectionHandler (node/coinstacks/common/api/src/websocket.ts): Manages per-client WebSocket connections, subscription commands, and event publishing.
Registry (node/coinstacks/common/api/src/registry.ts): Maintains centralized subscription mappings and dispatches transaction events to subscribers.
Solana WebSocket Client (node/coinstacks/solana/api/src/websocket.ts): Example of a blockchain-specific WebSocket client integrating with this system, handling blockchain event messages and forwarding them through the registry.
EVM and UTXO Services: While not directly part of WebSocket subscription, these services generate and handle transaction data consumed by the event subscription system.
Mermaid Sequence Diagram: WebSocket Event Subscription Flow
sequenceDiagram
participant Client as WebSocket Client
participant Conn as ConnectionHandler
participant Reg as Registry
participant Indexer as Blockchain Indexer WS Client
Client->>Conn: Send {"method":"subscribe","subscriptionId":"sub1","data":{"topic":"txs","addresses":["addr1"]}}
Conn->>Reg: subscribe(clientId, "sub1", Conn, ["addr1"])
Reg-->>Conn: (updates internal maps)
Conn->>Indexer: subscribeAddresses(registry.getAddresses())
Indexer->>Reg: New Transaction Event (tx involving addr1)
Reg->>Reg: Extract addresses from tx
Reg->>Conn: publish("sub1", "addr1", tx)
Conn->>Client: Send tx event message
Client->>Conn: Send {"method":"unsubscribe","subscriptionId":"sub1","data":{"topic":"txs","addresses":["addr1"]}}
Conn->>Reg: unsubscribe(clientId, "sub1", ["addr1"])
Reg-->>Conn: (updates internal maps)
Conn->>Indexer: subscribeAddresses(registry.getAddresses())
Client->>Conn: Disconnect
Conn->>Reg: unsubscribe all subscriptions for clientId
Reg-->>Conn: (cleans up)
This documentation captures the core purpose, architecture, and workflow of the WebSocket Event Subscription module, highlighting its role in providing scalable, real-time blockchain event delivery to subscribed clients.