websocket.ts
Overview
The [websocket.ts](/projects/291/69055) file implements a robust **ConnectionHandler** class that manages individual WebSocket client connections within a real-time blockchain event subscription system. Its primary purpose is to handle client subscription requests, maintain connection health via heartbeats, and deliver blockchain transaction event updates to subscribed clients efficiently.
This file acts as the per-connection interface between external WebSocket clients and the centralized **Registry** that manages global subscription state. It supports topic-based subscriptions (currently only `'txs'` for transaction events) and integrates with other components including:
Registry: Manages the mapping of subscriptions to blockchain addresses.WebsocketClient: An underlying client that subscribes to blockchain data feeds.Prometheus: For metrics tracking (e.g., active websocket count).Logger: Structured logging for observability.
ConnectionHandler uses a ping/pong heartbeat mechanism to detect stale connections, and it cleanly manages subscription lifecycles to ensure accurate and efficient event delivery.
Detailed Breakdown
Interfaces and Types
RequestPayload
Represents the structure of incoming client messages to manage subscriptions or send ping commands.
interface RequestPayload { subscriptionId: string method: 'subscribe' | 'unsubscribe' | 'ping' data?: TxsTopicData }ErrorResponse
Standardized error message format sent to clients when requests fail validation or processing.
interface ErrorResponse { subscriptionId: string type: 'error' message: string }Topics
Defines valid subscription topics. Currently only supports
'txs'for transaction events.type Topics = 'txs'TxsTopicData
Payload data specific to the
'txs'topic, containing a list of blockchain addresses to subscribe to.interface TxsTopicData { topic: 'txs' addresses: Array<string> }MessageResponse
Structure of event messages pushed to clients upon relevant blockchain events.
interface MessageResponse { address: string data: unknown subscriptionId: string }Methods
Defines the signature for subscription and unsubscription handlers per topic.
interface Methods { subscribe: (subscriptionId: string, data?: any) => void unsubscribe: (subscriptionId: string, data?: any) => void }
Class: ConnectionHandler
Manages a single WebSocket connection, including subscription handling, message processing, heartbeat maintenance, and event publishing.
Properties
Property | Type | Description |
|---|---|---|
`clientId` | `string` | Unique UUID identifier for this connection instance. |
`websocket` | `WebSocket` | The underlying WebSocket connection object. |
`registry` | `Registry` | Central subscription registry for managing subscriptions. |
`client` | `WebsocketClient` | Client managing blockchain data feed subscriptions. |
`prometheus` | `Prometheus` | Metrics collector for monitoring WebSocket connections. |
`logger` | `Logger` | Logger scoped to this connection's namespace. |
`routes` | `Record` | Maps topics to their subscription/unsubscription handlers. |
`pingIntervalMs` | `number` | Interval in milliseconds for sending ping frames (default 10s). |
`pingTimeout` | `NodeJS.Timeout \ | undefined` |
`subscriptionIds` | `Set` | Tracks active subscription IDs for this connection. |
Constructor (private)
private constructor(
websocket: WebSocket,
registry: Registry,
client: WebsocketClient,
prometheus: Prometheus,
logger: Logger
)
Initializes the connection, assigning a
clientId.Sets up heartbeat ping/pong mechanism.
Defines routing for subscription methods based on topic.
Binds WebSocket event handlers:
onerror: Logs error and closes connection.onclose: Cleans up subscriptions and decrements metrics.onmessage: Processes incoming subscription messages.ping/pong: Manages heartbeat.
Static Method: start
static start(
websocket: WebSocket,
registry: Registry,
client: WebsocketClient,
prometheus: Prometheus,
logger: Logger
): void
Factory method to instantiate a new
ConnectionHandlerfor each incoming WebSocket connection.Usage example:
ConnectionHandler.start(ws, registryInstance, wsClient, prometheusInstance, loggerInstance);
Private Methods
heartbeat()
Maintains connection health by resetting the ping timeout whenever a pong frame is received.
If a pong is not received within
pingIntervalMs + 1000, terminates the WebSocket connection.
sendError(message: string, subscriptionId: string): void
Sends a standardized error response to the client.
Parameters:
message: Describes the error.subscriptionId: ID of the subscription related to the error.
onMessage(event: WebSocket.MessageEvent): void
Processes incoming JSON messages from the client.
Parses the message as
RequestPayload.Supports methods:
'ping': Responds with'pong'(fallback for browsers lacking ping/pong support).'subscribe'/'unsubscribe': Routes to the appropriate topic handler.
Validates presence of topic in message.
Sends errors if method or topic is invalid or missing.
close(interval: NodeJS.Timeout): void
Handles cleanup on WebSocket closure or error.
Clears heartbeat timeout and ping interval.
Unsubscribes all active subscriptions from the registry.
Clears internal subscription tracking.
Updates the
WebsocketClientwith current global subscription addresses.
handleSubscribeTxs(subscriptionId: string, data?: TxsTopicData): void
Handles subscription requests to the `'txs'` topic.
Validates presence of
subscriptionIdand at least one address.Adds subscription ID to internal set.
Registers the subscription with the registry.
Updates the
WebsocketClientwith current addresses.
Example usage snippet:
handleSubscribeTxs('sub1', { topic: 'txs', addresses: ['addr1', 'addr2'] });
handleUnsubscribeTxs(subscriptionId: string, data?: TxsTopicData): void
Handles unsubscription requests from the `'txs'` topic.
If
subscriptionIdis provided, removes it from internal tracking and unsubscribes addresses.If no
subscriptionIdprovided, unsubscribes all subscriptions for this client.Updates the
WebsocketClientwith current addresses.
publish(subscriptionId: string, address: string, data: unknown): void
Sends an event message to the client for a particular subscription.
Constructs a
MessageResponsewith the address, data, and subscription ID.Sends the serialized JSON message over the WebSocket connection.
Important Implementation Details
Heartbeat Mechanism: Regular ping frames are sent every 10 seconds to ensure the client is alive. A pong response resets a timeout; failure to receive pong results in connection termination to free resources.
Subscription Routing: Uses a topic-to-method mapping (
routes) to delegate subscription and unsubscription logic for different event types, enabling extensibility.Subscription Tracking: Stores active subscription IDs locally to ensure accurate cleanup on disconnect and to coordinate with the central registry.
Integration with Registry and Client: Delegates the actual subscription bookkeeping to the
Registryand updates the underlyingWebsocketClientwith the aggregate set of subscribed addresses for efficient upstream subscription management.Error Handling: Comprehensive validation and error responses guard against malformed client requests, ensuring robustness.
Interaction with Other System Components
Registry
The
ConnectionHandlercallsregistry.subscribe(...)andregistry.unsubscribe(...)to register and deregister subscriptions globally. The registry maintains mappings from addresses to client subscriptions and is responsible for broadcasting incoming blockchain events to all relevant clients.WebsocketClient
Acts as the interface to the upstream blockchain data sources. When subscriptions change,
ConnectionHandlerupdates theWebsocketClientwith the current addresses of interest viasubscribeAddresses().Prometheus
Tracks the number of active WebSocket connections using an increment/decrement counter.
Logger
Provides detailed logs scoped to each connection, aiding in monitoring and debugging connection lifecycle and message processing.
Usage Example
import WebSocket from 'ws'
import { ConnectionHandler } from './websocket'
import { Registry } from './registry'
import { WebsocketClient } from '@shapeshiftoss/websocket'
import { Prometheus } from './prometheus'
import { Logger } from '@shapeshiftoss/logger'
// Assume these instances are initialized appropriately
const registry = new Registry(...)
const wsClient = new WebsocketClient(...)
const prometheus = new Prometheus(...)
const logger = new Logger({ namespace: ['app'] })
// WebSocket server connection handler
const wss = new WebSocket.Server({ port: 8080 })
wss.on('connection', (ws) => {
ConnectionHandler.start(ws, registry, wsClient, prometheus, logger)
})
Mermaid Class Diagram
classDiagram
class ConnectionHandler {
+clientId: string
-websocket: WebSocket
-registry: Registry
-client: WebsocketClient
-prometheus: Prometheus
-logger: Logger
-routes: Record<Topics, Methods>
-pingIntervalMs: number
-pingTimeout?: NodeJS.Timeout
-subscriptionIds: Set~string~
+static start(websocket: WebSocket, registry: Registry, client: WebsocketClient, prometheus: Prometheus, logger: Logger): void
-constructor(websocket: WebSocket, registry: Registry, client: WebsocketClient, prometheus: Prometheus, logger: Logger)
-heartbeat(): void
-sendError(message: string, subscriptionId: string): void
-onMessage(event: WebSocket.MessageEvent): void
-close(interval: NodeJS.Timeout): void
-handleSubscribeTxs(subscriptionId: string, data?: TxsTopicData): void
-handleUnsubscribeTxs(subscriptionId: string, data?: TxsTopicData): void
+publish(subscriptionId: string, address: string, data: unknown): void
}
Summary
The [websocket.ts](/projects/291/69055) file is a critical component enabling scalable, real-time blockchain event subscriptions over WebSocket connections. The **ConnectionHandler** class encapsulates all per-client connection logic, including message parsing, subscription lifecycle management, heartbeat monitoring, and event dispatching. It works closely with the centralized **Registry** and underlying **WebsocketClient** to ensure efficient and accurate delivery of transaction event updates to subscribed clients.
Its design emphasizes:
Clear separation of concerns between connection state and global subscription management.
Robustness via heartbeat and error handling.
Extensibility through topic-based routing.
Integration with observability tools for monitoring active connections.
This organized approach ensures that clients receive timely, relevant blockchain data with minimal overhead and maximum reliability.