websocket.go


Overview

The [websocket.go](/projects/291/69287) file implements a WebSocket client for interacting with a CometBFT (formerly Tendermint) blockchain node via its WebSocket RPC interface. It establishes and manages a persistent WebSocket connection to subscribe to blockchain events such as new blocks and transactions, processes these events, and provides extensible hooks for handling them.

This client integrates tightly with the application's block service and event processing logic, enabling real-time event-driven workflows based on blockchain state changes. It also handles reconnection logic, subscription management, and buffering of transactions whose corresponding block data is not yet available.


Detailed Explanation

Types and Constants


Struct: WSClient

The core WebSocket client struct that encapsulates the connection, subscriptions, and event handling logic.

type WSClient struct {
    *websocket.Registry               // Embeds a Pub/Sub registry for event broadcasting
    blockService      *BlockService   // Service managing block data storage and retrieval
    client            *cometbft.WSClient // Underlying CometBFT WebSocket client
    encoding          *params.EncodingConfig // Encoding configuration for transaction/block data
    errChan           chan<- error    // Channel for reporting errors
    m                 sync.RWMutex    // Mutex protecting shared state
    t                 *time.Timer     // Timer for resetting subscriptions
    txHandler         TxHandlerFunc   // Transaction event handler callback
    blockEventHandler BlockEventHandlerFunc // Block event handler callback
    newBlockHandlers  []NewBlockHandlerFunc  // List of new block event handlers
    unhandledTxs      map[int][]types.EventDataTx // Buffered TXs awaiting block data
}

Functions and Methods

Constructor

NewWebsocketClient(conf Config, blockService *BlockService, errChan chan<- error) (*WSClient, error)

Creates and initializes a new `WSClient`.


Connection Management

(ws *WSClient) Start() error

Starts the WebSocket client, initiates subscriptions, and begins listening for events asynchronously.

(ws *WSClient) Stop()

Stops the WebSocket client gracefully.


Event Handler Registration

(ws *WSClient) TxHandler(fn TxHandlerFunc)

Registers a callback for handling transaction events.

(ws *WSClient) BlockEventHandler(fn BlockEventHandlerFunc)

Registers a callback for processing block-related events.

(ws *WSClient) NewBlockHandler(fn NewBlockHandlerFunc)

Adds a new block event handler to the list.


Accessors

(ws *WSClient) EncodingConfig() params.EncodingConfig

Returns the encoding configuration used by the client.


Subscription Management

(ws *WSClient) subscribe() error

Subscribes the WebSocket client to transaction and new block events.

(ws *WSClient) reset()

Resets the WebSocket subscriptions by unsubscribing from all and resubscribing.


Event Listening and Processing

(ws *WSClient) listen()

Main goroutine that reads from the WebSocket client's response channel.

(ws *WSClient) handleTx(tx types.EventDataTx)

Processes individual transaction events.

(ws *WSClient) handleNewBlock(newBlock types.EventDataNewBlock, blockEvents []ABCIEvent)

Processes new block events.


Important Implementation Details and Algorithms


Interaction with Other System Parts


Usage Example

// Create error channel
errChan := make(chan error)

// Instantiate block service (assumed implementation)
blockService := NewBlockService()

// Load configuration with WS URL and encoding
conf := Config{
    WSURL:    "ws://localhost:26657",
    Encoding: encodingConfig,
}

// Create WebSocket client
wsClient, err := NewWebsocketClient(conf, blockService, errChan)
if err != nil {
    log.Fatal(err)
}

// Register transaction handler
wsClient.TxHandler(func(tx types.EventDataTx, block *BlockResponse) (interface{}, []string, error) {
    // Process transaction
    return tx, []string{"tx_topic"}, nil
})

// Register block event handler
wsClient.BlockEventHandler(func(eventCache map[string]interface{}, header types.Header, events []ABCIEvent, index int) (interface{}, []string, error) {
    // Process block event
    return nil, nil, nil
})

// Start the client
if err := wsClient.Start(); err != nil {
    log.Fatal(err)
}

// Handle errors asynchronously
go func() {
    for err := range errChan {
        log.Println("WebSocket error:", err)
    }
}()

Mermaid Diagram: WSClient Structure

classDiagram
    class WSClient {
        +Registry
        -blockService: *BlockService
        -client: *cometbft.WSClient
        -encoding: *params.EncodingConfig
        -errChan: chan<- error
        -m: sync.RWMutex
        -t: *time.Timer
        -txHandler: TxHandlerFunc
        -blockEventHandler: BlockEventHandlerFunc
        -newBlockHandlers: []NewBlockHandlerFunc
        -unhandledTxs: map[int][]types.EventDataTx
        +Start() error
        +Stop()
        +TxHandler(fn TxHandlerFunc)
        +BlockEventHandler(fn BlockEventHandlerFunc)
        +NewBlockHandler(fn NewBlockHandlerFunc)
        +EncodingConfig() params.EncodingConfig
        -subscribe() error
        -reset()
        -listen()
        -handleTx(tx types.EventDataTx)
        -handleNewBlock(newBlock types.EventDataNewBlock, blockEvents []ABCIEvent)
    }

Summary

The [websocket.go](/projects/291/69287) file provides a robust, extensible WebSocket client for subscribing to and processing real-time blockchain events from a CometBFT node. It manages connection lifecycle, subscription renewals, and event dispatch, enabling developers to build responsive blockchain applications with event-driven architectures. The design emphasizes concurrency, fault tolerance, and integration with broader application services such as block storage and event publication.