websocket.go
Overview
The [websocket.go](/projects/291/69287) file implements a WebSocket client for interacting with a CometBFT (formerly Tendermint) blockchain node via its WebSocket RPC interface. It establishes and manages a persistent WebSocket connection to subscribe to blockchain events such as new blocks and transactions, processes these events, and provides extensible hooks for handling them.
This client integrates tightly with the application's block service and event processing logic, enabling real-time event-driven workflows based on blockchain state changes. It also handles reconnection logic, subscription management, and buffering of transactions whose corresponding block data is not yet available.
Detailed Explanation
Types and Constants
Constants
writeWait(15s): Maximum duration to wait for a write to the WebSocket.readWait(15s): Maximum duration to wait for a read from the WebSocket.pingPeriod(13.5s): Interval to send ping messages, set to 90% ofreadWait.resetTimeout(30s): Timeout duration after which the WebSocket client will reset subscriptions to avoid stale connections.
Type Aliases
TxHandlerFunc: Function signature for processing a transaction event.func(tx types.EventDataTx, block *BlockResponse) (interface{}, []string, error)tx: The transaction event data.block: The associated block information.Returns:
interface{}: Data to be published.[]string: Addresses (topics) to publish to.error: Error if any occurred.
BlockEventHandlerFunc: Function signature for processing block-related events.func(eventCache map[string]interface{}, blockHeader types.Header, blockEvents []ABCIEvent, eventIndex int) (interface{}, []string, error)eventCache: A map to cache event data across multiple calls.blockHeader: The block header.blockEvents: List of ABCI events for the block.eventIndex: Current event index in processing.Returns:
Data, addresses, and error analogous to
TxHandlerFunc.
NewBlockHandlerFunc: Function signature for handling new block events.func(newBlock types.EventDataNewBlock, blockEvents []ABCIEvent)newBlock: The new block event.blockEvents: Associated ABCI events.
Struct: WSClient
The core WebSocket client struct that encapsulates the connection, subscriptions, and event handling logic.
type WSClient struct {
*websocket.Registry // Embeds a Pub/Sub registry for event broadcasting
blockService *BlockService // Service managing block data storage and retrieval
client *cometbft.WSClient // Underlying CometBFT WebSocket client
encoding *params.EncodingConfig // Encoding configuration for transaction/block data
errChan chan<- error // Channel for reporting errors
m sync.RWMutex // Mutex protecting shared state
t *time.Timer // Timer for resetting subscriptions
txHandler TxHandlerFunc // Transaction event handler callback
blockEventHandler BlockEventHandlerFunc // Block event handler callback
newBlockHandlers []NewBlockHandlerFunc // List of new block event handlers
unhandledTxs map[int][]types.EventDataTx // Buffered TXs awaiting block data
}
Key Fields:
Registry: Enables publishing data to subscribed clients.blockService: Used to fetch block info for transactions.client: Manages WebSocket connection and subscriptions.txHandler,blockEventHandler,newBlockHandlers: Allow users to register custom callbacks for event processing.unhandledTxs: Buffers transactions received before their block data is available, keyed by block height.
Functions and Methods
Constructor
NewWebsocketClient(conf Config, blockService *BlockService, errChan chan<- error) (*WSClient, error)
Creates and initializes a new `WSClient`.
Parameters:
conf: Configuration containing the WebSocket URL and encoding parameters.blockService: Reference to the block service managing block data.errChan: Channel to report errors asynchronously.
Returns:
Pointer to a
WSClient.Error if creation or connection setup fails.
Behavior:
Parses the WebSocket URL.
Instantiates a CometBFT WebSocket client.
Sets default connection parameters (dialer, timeouts, reconnect attempts).
Registers a default new block handler (
ws.handleNewBlock).Sets up reconnect logic with automatic resubscription.
Usage example:
errChan := make(chan error) wsClient, err := NewWebsocketClient(config, blockService, errChan) if err != nil { log.Fatalf("Failed to create WebSocket client: %v", err) }
Connection Management
(ws *WSClient) Start() error
Starts the WebSocket client, initiates subscriptions, and begins listening for events asynchronously.
Returns: Error on failure to start or subscribe.
Usage:
if err := wsClient.Start(); err != nil { log.Fatalf("WebSocket client start error: %v", err) }
(ws *WSClient) Stop()
Stops the WebSocket client gracefully.
Note: Errors during stop are logged but do not propagate.
Event Handler Registration
(ws *WSClient) TxHandler(fn TxHandlerFunc)
Registers a callback for handling transaction events.
(ws *WSClient) BlockEventHandler(fn BlockEventHandlerFunc)
Registers a callback for processing block-related events.
(ws *WSClient) NewBlockHandler(fn NewBlockHandlerFunc)
Adds a new block event handler to the list.
Accessors
(ws *WSClient) EncodingConfig() params.EncodingConfig
Returns the encoding configuration used by the client.
Subscription Management
(ws *WSClient) subscribe() error
Subscribes the WebSocket client to transaction and new block events.
Implementation Detail:
Subscribes to two CometBFT event queries:
types.EventQueryTx.String()(transaction events)types.EventQueryNewBlock.String()(new block events)
Starts a reset timer (
resetTimeout) that triggers a subscription reset to maintain connection health.
(ws *WSClient) reset()
Resets the WebSocket subscriptions by unsubscribing from all and resubscribing.
Called:
When the reset timer expires.
When a subscription error code
-32000(client too slow) is encountered.
Event Listening and Processing
(ws *WSClient) listen()
Main goroutine that reads from the WebSocket client's response channel.
Workflow:
Listens for incoming event responses.
Handles errors and automatic resubscription on specific error codes.
Unmarshals event data into CometBFT types.
Dispatches events to the appropriate handlers:
Transaction events →
handleTxNew block events → calls all registered new block handlers concurrently.
Resets the subscription reset timer on valid events.
(ws *WSClient) handleTx(tx types.EventDataTx)
Processes individual transaction events.
Implementation Details:
Logs transaction hash (SHA256).
Checks if block data for the transaction's height is available.
If not, queues the transaction in
unhandledTxs.
If block data is available and
txHandleris registered:Calls
txHandlerto process the transaction.Publishes returned data to specified addresses.
(ws *WSClient) handleNewBlock(newBlock types.EventDataNewBlock, blockEvents []ABCIEvent)
Processes new block events.
Steps:
Creates a
BlockResponseobject with block height, hash, and timestamp.Stores the block via
blockService.If a
blockEventHandleris registered, processes each block event with it concurrently, publishing results.Processes any buffered transactions for the block height by invoking
handleTx.Removes buffered transactions for that block from
unhandledTxs.
Important Implementation Details and Algorithms
Subscription Reset Timer:
Uses a
time.Timer(ws.t) initialized at subscription time.The timer triggers
reset()every 30 seconds to unsubscribe and resubscribe, ensuring the connection remains active and avoids stale subscriptions.
Concurrency:
Event handlers are called in new goroutines to avoid blocking the main listen loop.
Mutex
mprotects concurrent access to the mapunhandledTxs.
Reconnection Handling:
On reconnect, subscriptions are re-established automatically.
Unhandled transaction buffer is cleared to avoid stale state.
Event Publishing:
The embedded
websocket.Registryprovides a publish-subscribe pattern that allows event handler callbacks to publish processed data to interested clients or components based on addresses/topics.
Interaction with Other System Parts
BlockService:
Provides block data storage and retrieval.
Critical for associating transactions with their corresponding blocks.
CometBFT JSON RPC Client:
Underlying WebSocket client from the CometBFT library used to connect and subscribe to blockchain events.
EncodingConfig:
Defines how blockchain data (transactions, blocks) are encoded/decoded, ensuring compatibility with the blockchain's serialization format.
Websocket Registry:
Provides a pub-sub mechanism for distributing processed events to other components or external clients.
Event Handlers (User-Provided):
Allow integration with business logic such as indexing, analytics, or notification systems.
Usage Example
// Create error channel
errChan := make(chan error)
// Instantiate block service (assumed implementation)
blockService := NewBlockService()
// Load configuration with WS URL and encoding
conf := Config{
WSURL: "ws://localhost:26657",
Encoding: encodingConfig,
}
// Create WebSocket client
wsClient, err := NewWebsocketClient(conf, blockService, errChan)
if err != nil {
log.Fatal(err)
}
// Register transaction handler
wsClient.TxHandler(func(tx types.EventDataTx, block *BlockResponse) (interface{}, []string, error) {
// Process transaction
return tx, []string{"tx_topic"}, nil
})
// Register block event handler
wsClient.BlockEventHandler(func(eventCache map[string]interface{}, header types.Header, events []ABCIEvent, index int) (interface{}, []string, error) {
// Process block event
return nil, nil, nil
})
// Start the client
if err := wsClient.Start(); err != nil {
log.Fatal(err)
}
// Handle errors asynchronously
go func() {
for err := range errChan {
log.Println("WebSocket error:", err)
}
}()
Mermaid Diagram: WSClient Structure
classDiagram
class WSClient {
+Registry
-blockService: *BlockService
-client: *cometbft.WSClient
-encoding: *params.EncodingConfig
-errChan: chan<- error
-m: sync.RWMutex
-t: *time.Timer
-txHandler: TxHandlerFunc
-blockEventHandler: BlockEventHandlerFunc
-newBlockHandlers: []NewBlockHandlerFunc
-unhandledTxs: map[int][]types.EventDataTx
+Start() error
+Stop()
+TxHandler(fn TxHandlerFunc)
+BlockEventHandler(fn BlockEventHandlerFunc)
+NewBlockHandler(fn NewBlockHandlerFunc)
+EncodingConfig() params.EncodingConfig
-subscribe() error
-reset()
-listen()
-handleTx(tx types.EventDataTx)
-handleNewBlock(newBlock types.EventDataNewBlock, blockEvents []ABCIEvent)
}
Summary
The [websocket.go](/projects/291/69287) file provides a robust, extensible WebSocket client for subscribing to and processing real-time blockchain events from a CometBFT node. It manages connection lifecycle, subscription renewals, and event dispatch, enabling developers to build responsive blockchain applications with event-driven architectures. The design emphasizes concurrency, fault tolerance, and integration with broader application services such as block storage and event publication.