registry.ts
Overview
The **registry.ts** file implements a centralized **Registry** class designed to manage WebSocket client subscriptions to blockchain address events. It tracks which connected clients have subscribed to updates for specific blockchain addresses and forwards new transaction data relevant to those addresses directly to the subscribed clients.
This registry serves as a crucial component in a real-time blockchain event subscription system, enabling efficient and scalable delivery of transaction notifications over WebSocket connections. It maintains bidirectional mappings between clients and addresses and integrates with customizable handlers to process incoming block and transaction messages.
Detailed Explanation
Type Definitions
AddressFormatter
type AddressFormatter = (address: string) => string
A function type used to normalize addresses (e.g., converting to lowercase) for consistent matching within the registry.
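As a minimal sketch of what such a formatter could look like: `lowercaseFormatter` mirrors the documented default, while `trimmedLowercaseFormatter` is a hypothetical variant that is not part of registry.ts.

```typescript
type AddressFormatter = (address: string) => string

// Mirrors the documented default behavior: lowercase the address.
const lowercaseFormatter: AddressFormatter = (address) => address.toLowerCase()

// Hypothetical variant (an assumption, not from registry.ts): also trim whitespace.
const trimmedLowercaseFormatter: AddressFormatter = (address) => address.trim().toLowerCase()

// Both inputs normalize to the same key, so they would match the same registry entry.
const normalizedA = lowercaseFormatter('0xAbC123')
const normalizedB = trimmedLowercaseFormatter('  0xABC123 ')
```

Whatever formatter is chosen, it has to be applied on both the subscribe path and the publish path, otherwise lookups will miss.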
BlockHandler<T, T2>
type BlockHandler<T = any, T2 = Array<unknown>> = (block: T) => Promise<{ txs: T2 }>
An asynchronous function that processes a block message and returns an object containing a list of transactions (`txs`). This handler is optional in the Registry and can be used to handle new block events.
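A block handler might be sketched as follows. The `RawBlock` shape is purely an assumption for illustration; real payloads depend on the upstream node or indexer.

```typescript
type BlockHandler<T = any, T2 = Array<unknown>> = (block: T) => Promise<{ txs: T2 }>

// Hypothetical upstream block shape (assumption, not defined in registry.ts).
interface RawBlock {
  height: number
  transactions: Array<{ txid: string }>
}

// Extracts the transaction list from the block and returns it under `txs`.
const blockHandler: BlockHandler<RawBlock, Array<{ txid: string }>> = async (block) => {
  return { txs: block.transactions }
}
```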
TransactionHandler<T, T2>
type TransactionHandler<T = any, T2 = unknown> = (tx: T) => Promise<{ addresses: Array<string>; tx: T2 }>
An asynchronous function that processes a transaction message and returns an object containing:
`addresses`: an array of blockchain addresses involved in the transaction.
`tx`: a transaction payload in a standardized format.
This handler is mandatory and essential for extracting addresses to notify subscribers.
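To illustrate the contract, here is a sketch of a transaction handler. The `RawTx` and `NormalizedTx` shapes are hypothetical; an actual handler would parse whatever the upstream source emits.

```typescript
type TransactionHandler<T = any, T2 = unknown> = (tx: T) => Promise<{ addresses: Array<string>; tx: T2 }>

// Hypothetical raw and normalized shapes (assumptions for illustration only).
interface RawTx {
  hash: string
  from: string
  to?: string
  value: string
}
interface NormalizedTx {
  txid: string
  value: string
}

const transactionHandler: TransactionHandler<RawTx, NormalizedTx> = async (raw) => {
  // Collect every address involved; `to` may be absent in this hypothetical shape.
  const involved = [raw.from, raw.to].filter((a): a is string => typeof a === 'string')
  return {
    // De-duplicate so a self-transfer lists the address once.
    addresses: Array.from(new Set(involved)),
    tx: { txid: raw.hash, value: raw.value },
  }
}
```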
Internal Type Guard
isTxWithAddresses(tx: unknown): tx is { addresses: Array<string>; tx: unknown }
A type guard function that checks whether an object has the structure `{ addresses: string[]; tx: unknown }`. It is used internally to distinguish raw transactions from those already associated with addresses.
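The body of the guard is not shown in this document. One plausible way to write it, offered only as a sketch under that assumption, is:

```typescript
function isTxWithAddresses(tx: unknown): tx is { addresses: Array<string>; tx: unknown } {
  // Reject primitives and null before inspecting properties.
  if (typeof tx !== 'object' || tx === null) return false
  const candidate = tx as Record<string, unknown>
  const list = candidate.addresses
  // Require a string array under `addresses` and the presence of a `tx` property.
  return Array.isArray(list) && list.every((a) => typeof a === 'string') && 'tx' in candidate
}

const alreadyResolved = isTxWithAddresses({ addresses: ['addr1'], tx: { txid: 'abc' } })
const rawOnly = isTxWithAddresses({ txid: 'abc' })
```

The actual guard in registry.ts may be looser or stricter than this (for example, it may not check each element's type).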
Interface: RegistryArgs
interface RegistryArgs {
  addressFormatter?: AddressFormatter
  blockHandler?: BlockHandler
  transactionHandler: TransactionHandler
}
The constructor argument interface for the `Registry` class, allowing injection of:
`addressFormatter`: optional function to normalize addresses.
`blockHandler`: optional asynchronous function to handle block messages.
`transactionHandler`: required asynchronous function to handle transaction messages.
Class: Registry
class Registry
Purpose
Manages WebSocket client subscriptions by tracking which clients have subscribed to which blockchain addresses and dispatching new transaction data to those clients based on their subscriptions.
Properties
clients: Record<string, Set<string>>
Maps a combined subscription key (`clientId:subscriptionId`) to the set of addresses that subscription covers.
addresses: Record<string, Map<string, ConnectionHandler>>
Maps each blockchain address to a `Map` of subscription IDs and their corresponding WebSocket connection handlers.
handleBlock?: BlockHandler
Optional block handling function injected during construction.
handleTransaction: TransactionHandler
Mandatory transaction handling function injected during construction.
formatAddress: AddressFormatter
Function to normalize addresses. Defaults to lowercasing.
logger: Logger
Logger instance scoped to the registry for error and event logging.
Constructor
constructor(args: RegistryArgs)
Initializes the registry with an optional `blockHandler`, a required `transactionHandler`, and an optional `addressFormatter`.
Sets the default address formatter to lowercase if none is provided.
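The defaulting described above could look roughly like this. `RegistrySketch` is a stand-in class written for illustration, with simplified non-generic handler types; it is not the real `Registry`.

```typescript
type AddressFormatter = (address: string) => string
type BlockHandler = (block: unknown) => Promise<{ txs: Array<unknown> }>
type TransactionHandler = (tx: unknown) => Promise<{ addresses: Array<string>; tx: unknown }>

interface RegistryArgs {
  addressFormatter?: AddressFormatter
  blockHandler?: BlockHandler
  transactionHandler: TransactionHandler
}

// Illustrative stand-in (assumption): only the constructor wiring is sketched.
class RegistrySketch {
  readonly handleBlock?: BlockHandler
  readonly handleTransaction: TransactionHandler
  readonly formatAddress: AddressFormatter

  constructor(args: RegistryArgs) {
    this.handleBlock = args.blockHandler
    this.handleTransaction = args.transactionHandler
    // Fall back to lowercasing when no formatter is injected.
    this.formatAddress = args.addressFormatter ?? ((address: string) => address.toLowerCase())
  }
}

const sketch = new RegistrySketch({
  transactionHandler: async (tx) => ({ addresses: [], tx }),
})
```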
Static Methods
toId(clientId: string, subscriptionId: string): string
Combines `clientId` and `subscriptionId` into a single string key of the form `"clientId:subscriptionId"`.
fromId(id: string): { clientId: string; subscriptionId: string }
Parses the combined ID string back into `clientId` and `subscriptionId`.
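A sketch of the key round trip, written as free functions rather than static methods. Splitting on the first colon is an assumption made here; it presumes `clientId` never contains `:` (for example, a UUID), and the real implementation may parse differently.

```typescript
const toId = (clientId: string, subscriptionId: string): string => `${clientId}:${subscriptionId}`

const fromId = (id: string): { clientId: string; subscriptionId: string } => {
  // Split on the first colon only, so a subscriptionId containing ':' survives the round trip.
  const separator = id.indexOf(':')
  if (separator === -1) throw new Error(`malformed subscription key: ${id}`)
  return { clientId: id.slice(0, separator), subscriptionId: id.slice(separator + 1) }
}

const key = toId('client-123', 'sub-abc')
const parsed = fromId(key)
```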
Public Methods
getAddresses(): Array<string>
Returns a list of all addresses currently tracked in the registry (i.e., addresses with at least one subscriber).
Usage Example:
const addresses = registry.getAddresses()
console.log(addresses)
subscribe(clientId: string, subscriptionId: string, connection: ConnectionHandler, addresses: Array<string>): void
Registers a subscription for a client's WebSocket connection to a list of blockchain addresses.
Parameters:
`clientId`: Unique identifier for the client.
`subscriptionId`: Unique identifier for the subscription (can represent a specific subscription context or filter).
`connection`: The WebSocket connection handler for the client.
`addresses`: Array of blockchain addresses to subscribe to.
**Behavior:**
Normalizes each address.
Updates the `clients` map to associate the combined ID with the addresses.
Updates the `addresses` map to associate each address with the connection.
Creates new entries if needed.
**Usage Example:**
registry.subscribe('client-123', 'sub-abc', connectionHandler, ['addr1', 'addr2'])
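The bookkeeping described under Behavior can be sketched with two plain maps. `Publisher` is a hypothetical stand-in for `ConnectionHandler`, and the names `clientsById` and `handlersByAddress` are chosen here for illustration.

```typescript
// Hypothetical stand-in for ConnectionHandler; only `publish` is assumed.
interface Publisher {
  publish(subscriptionId: string, address: string, data: unknown): void
}

const clientsById: Record<string, Set<string>> = {}
const handlersByAddress: Record<string, Map<string, Publisher>> = {}
const formatAddress = (address: string): string => address.toLowerCase()

function subscribe(clientId: string, subscriptionId: string, connection: Publisher, addresses: Array<string>): void {
  const id = `${clientId}:${subscriptionId}`
  // Forward map: subscription key -> addresses (created on first use).
  const subscribed = clientsById[id] ?? new Set<string>()
  clientsById[id] = subscribed
  for (const raw of addresses) {
    const address = formatAddress(raw)
    subscribed.add(address)
    // Reverse map: address -> (subscription key -> connection).
    const handlers = handlersByAddress[address] ?? new Map<string, Publisher>()
    handlersByAddress[address] = handlers
    handlers.set(id, connection)
  }
}

const demoConnection: Publisher = { publish: () => {} }
subscribe('client-123', 'sub-abc', demoConnection, ['Addr1', 'ADDR2'])
```

Keeping both directions means a publish only needs a lookup by address, and an unsubscribe only needs a lookup by subscription key.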
unsubscribe(clientId: string, subscriptionId: string, addresses: Array<string>): void
Removes a client's subscription from the specified addresses, or from all of its addresses if the address list is empty.
Parameters:
`clientId`: Unique client identifier.
`subscriptionId`: Subscription identifier.
`addresses`: Addresses to unsubscribe from. If empty, unsubscribes from all addresses associated with this subscription.
**Behavior:**
Normalizes addresses.
Removes addresses from the client's subscription set.
Removes clients from address maps.
Cleans up empty entries to avoid memory leaks.
**Usage Example:**
// Unsubscribe from specific addresses
registry.unsubscribe('client-123', 'sub-abc', ['addr1'])
// Unsubscribe from all addresses for this subscription
registry.unsubscribe('client-123', 'sub-abc', [])
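The removal and cleanup steps might be sketched as below, starting from a pre-populated state. The map names and the `unknown` connection values are illustrative assumptions, not the actual fields of `Registry`.

```typescript
const formatAddress = (address: string): string => address.toLowerCase()

// Pre-populated state: one subscription covering two addresses.
const clientsById: Record<string, Set<string>> = {
  'client-123:sub-abc': new Set<string>(['addr1', 'addr2']),
}
const handlersByAddress: Record<string, Map<string, unknown>> = {
  addr1: new Map<string, unknown>([['client-123:sub-abc', {}]]),
  addr2: new Map<string, unknown>([['client-123:sub-abc', {}]]),
}

function unsubscribe(clientId: string, subscriptionId: string, addresses: Array<string>): void {
  const id = `${clientId}:${subscriptionId}`
  const subscribed = clientsById[id]
  if (!subscribed) return
  // An empty list means "everything this subscription covers".
  const targets = addresses.length > 0 ? addresses.map(formatAddress) : Array.from(subscribed)
  for (const address of targets) {
    subscribed.delete(address)
    const handlers = handlersByAddress[address]
    if (!handlers) continue
    handlers.delete(id)
    // Drop empty address entries so the map does not grow without bound.
    if (handlers.size === 0) delete handlersByAddress[address]
  }
  // Drop the subscription entry once it covers no addresses.
  if (subscribed.size === 0) delete clientsById[id]
}
```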
onBlock(msg: unknown): Promise<void>
Processes an incoming blockchain block message.
Uses the optional `blockHandler` to extract transaction data.
For each transaction, publishes relevant transactions to subscribed clients.
If transactions do not contain addresses, falls back to processing as individual transactions.
**Returns:** Promise resolving when processing is complete.
**Usage Example:**
await registry.onBlock(blockMessage)
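The fallback path can be sketched as follows. Everything here is a simplified stand-in: the handlers are hypothetical, `publishTransaction` only records calls, and errors are pushed to an array where the real class would use its logger.

```typescript
type TxWithAddresses = { addresses: Array<string>; tx: unknown }

const isTxWithAddresses = (tx: unknown): tx is TxWithAddresses =>
  typeof tx === 'object' && tx !== null && 'tx' in tx && Array.isArray((tx as { addresses?: unknown }).addresses)

const published: Array<TxWithAddresses> = []
const loggedErrors: Array<unknown> = []
const publishTransaction = (addresses: Array<string>, tx: unknown): void => {
  published.push({ addresses, tx })
}

// Hypothetical handlers (assumptions for illustration).
const handleBlock = async (block: unknown) => ({ txs: (block as { txs: Array<unknown> }).txs })
const handleTransaction = async (raw: unknown): Promise<TxWithAddresses> => {
  const { from, to } = raw as { from: string; to: string }
  return { addresses: [from, to], tx: raw }
}

async function onBlock(msg: unknown): Promise<void> {
  try {
    const { txs } = await handleBlock(msg)
    for (const tx of txs) {
      if (isTxWithAddresses(tx)) {
        // The block handler already resolved addresses: publish directly.
        publishTransaction(tx.addresses, tx.tx)
      } else {
        // Fall back to the transaction handler to resolve addresses.
        const resolved = await handleTransaction(tx)
        publishTransaction(resolved.addresses, resolved.tx)
      }
    }
  } catch (err) {
    // Record the failure instead of rethrowing, so one bad block does not stop processing.
    loggedErrors.push(err)
  }
}
```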
onTransaction(msg: unknown): Promise<void>
Processes an incoming blockchain transaction message.
Uses `transactionHandler` to extract addresses and transaction payload.
Publishes the transaction to all subscribed clients for those addresses.
**Returns:** Promise resolving when processing is complete.
**Usage Example:**
await registry.onTransaction(transactionMessage)
Private Methods
publishTransaction(addresses: string[], tx: unknown): void
Sends a transaction payload to all clients subscribed to any of the provided addresses.
Normalizes each address.
For each address, looks up all subscribed connections.
Calls the `publish` method on each connection handler with the subscription ID, address, and transaction payload.
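The fan-out described above can be sketched like this. `Publisher` is a hypothetical stand-in for `ConnectionHandler`, and the recorder connection exists only to make the delivery visible.

```typescript
// Hypothetical stand-in for ConnectionHandler; only `publish` is assumed.
interface Publisher {
  publish(subscriptionId: string, address: string, data: unknown): void
}

const delivered: Array<string> = []
const recorder: Publisher = {
  publish: (subscriptionId, address) => {
    delivered.push(`${subscriptionId}@${address}`)
  },
}

const handlersByAddress: Record<string, Map<string, Publisher>> = {
  addr1: new Map<string, Publisher>([['client-123:sub-abc', recorder]]),
}

function publishTransaction(addresses: Array<string>, tx: unknown): void {
  for (const raw of addresses) {
    const address = raw.toLowerCase()
    const handlers = handlersByAddress[address]
    // Addresses nobody subscribed to are skipped.
    if (!handlers) continue
    handlers.forEach((connection, id) => {
      // Recover the subscriptionId from the combined "clientId:subscriptionId" key.
      const subscriptionId = id.slice(id.indexOf(':') + 1)
      connection.publish(subscriptionId, address, tx)
    })
  }
}

publishTransaction(['ADDR1', 'addr9'], { txid: 'abc' })
```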
Important Implementation Details
Bidirectional Mapping for Efficiency:
The registry maintains two maps:
`clients` map from subscription keys (`clientId:subscriptionId`) to subscribed addresses.
`addresses` map from each address to a map of subscription keys and their WebSocket connection handlers.
This design enables quick subscription/unsubscription updates and efficient event dispatching.
Address Normalization:
All addresses are normalized using a formatter (default lowercase) to avoid mismatches due to case sensitivity or formatting differences.
Robust Subscription Management:
Unsubscription logic cleans up empty client and address entries to prevent stale data.
Error Handling and Logging:
Errors in processing blocks or transactions are caught and logged without crashing the system.
Asynchronous Processing:
Block and transaction handlers are asynchronous, allowing integration with external APIs or databases if needed.
Flexible Handlers:
The registry relies on injected custom handlers for processing raw blockchain data, enabling blockchain-agnostic usage.
Interaction with Other System Components
ConnectionHandler (from './websocket'):
Represents individual WebSocket client connections. The Registry stores references to these handlers to send messages to subscribed clients.
Upstream Blockchain WebSocket or Event Source:
The Registry receives raw block and transaction messages from blockchain nodes or indexers via WebSocket clients external to this module.
Custom Transaction and Block Handlers:
These handlers parse raw blockchain messages into address lists and transaction payloads compatible with the subscription model.
Logging System:
Integrates with a structured logger for observability.
Usage Example
import { Registry } from './registry'
import { ConnectionHandler } from './websocket'

const transactionHandler = async (tx: any) => {
  // Extract addresses and transaction data from raw tx
  return {
    addresses: tx.addresses,
    tx,
  }
}

const registry = new Registry({
  transactionHandler,
})

// WebSocket connection from a client
const connection = new ConnectionHandler()

// Client subscribes to addresses
registry.subscribe('client-uuid', 'sub-1', connection, ['addr1', 'addr2'])

// Incoming transaction from blockchain node
await registry.onTransaction(incomingTxMessage)
Visual Diagram
classDiagram
  class Registry {
    -clients: Record<string, Set<string>>
    -addresses: Record<string, Map<string, ConnectionHandler>>
    -handleBlock?: BlockHandler
    -handleTransaction: TransactionHandler
    -formatAddress: AddressFormatter
    -logger: Logger
    +constructor(args: RegistryArgs)
    +subscribe(clientId: string, subscriptionId: string, connection: ConnectionHandler, addresses: string[]): void
    +unsubscribe(clientId: string, subscriptionId: string, addresses: string[]): void
    +getAddresses(): string[]
    +onBlock(msg: unknown): Promise<void>
    +onTransaction(msg: unknown): Promise<void>
    -publishTransaction(addresses: string[], tx: unknown): void
    +static toId(clientId: string, subscriptionId: string): string
    +static fromId(id: string): { clientId: string; subscriptionId: string }
  }
  class ConnectionHandler {
    <<imported>>
  }
  Registry "1" -- "*" ConnectionHandler : manages
Summary
The **registry.ts** file implements the `Registry` class, a core component responsible for:
Tracking WebSocket client subscriptions on a per-address basis.
Managing subscription and unsubscription requests.
Processing incoming blockchain block and transaction events.
Publishing relevant transaction data only to subscribed clients.
It achieves this through efficient bidirectional maps, address normalization, and pluggable handlers, enabling real-time and scalable blockchain event delivery over WebSocket connections. The Registry integrates tightly with WebSocket connection handlers and blockchain event sources to form a robust subscription delivery system.