websocket.go
Overview
The [websocket.go](/projects/291/69273) file provides the implementation of a WebSocket connection handler tailored for the **Unchained API server**. It manages individual WebSocket client connections, processes incoming messages (such as subscription requests), maintains connection health through heartbeat mechanisms (ping/pong), and facilitates asynchronous message delivery from server-side event handlers to connected clients.
Key features include:
Managing the lifecycle of WebSocket connections (start, stop, cleanup).
Handling client subscriptions to topics/addresses.
Sending and receiving JSON-formatted messages conforming to defined payload structures.
Implementing WebSocket protocol heartbeat (ping/pong) to ensure connection liveness.
Integration with a subscription manager (`Registrar`) and a connection manager (`Manager`) for orchestrating multiple clients and subscriptions.
This file is critical for enabling real-time, event-driven communication between clients and the Unchained API via WebSockets.
Detailed Explanation
Types and Constants
Constants
| Constant | Description |
|---|---|
| `writeWait` | Maximum duration for writing a message (15 seconds) |
| `readWait` | Maximum duration to wait for a new message (15 seconds) |
| `pingPeriod` | Interval between ping messages (90% of `readWait`, i.e. 13.5 seconds) |
| `maxMessageSize` | Maximum size of an inbound message payload (1024 bytes) |
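The relationship between these values can be sketched as a Go constant block. This is an illustration derived from the table above; the exact expressions used in websocket.go are an assumption.

```go
package main

import (
	"fmt"
	"time"
)

const (
	writeWait      = 15 * time.Second    // maximum time allowed to write a message
	readWait       = 15 * time.Second    // maximum time to wait for the next inbound message
	pingPeriod     = (readWait * 9) / 10 // 90% of readWait, so a ping goes out before the read deadline expires
	maxMessageSize = 1024                // maximum inbound payload size in bytes
)

func main() {
	fmt.Println(writeWait, readWait, pingPeriod, maxMessageSize)
	// prints: 15s 15s 13.5s 1024
}
```

Keeping `pingPeriod` shorter than `readWait` leaves the client 1.5 seconds to answer each ping before the read deadline would expire.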
Structs
RequestPayload
Represents the structure of a JSON message received from the client.
type RequestPayload struct {
	Data struct {
		Topic     string   `json:"topic"`
		Addresses []string `json:"addresses"`
	} `json:"data"`
	Method         string `json:"method"`
	SubscriptionID string `json:"subscriptionId"`
}
Fields:
`Data.Topic`: Topic the client subscribes to (currently unused).
`Data.Addresses`: List of addresses to subscribe to or unsubscribe from.
`Method`: The type of request, e.g., `"subscribe"`, `"unsubscribe"`, `"ping"`.
`SubscriptionID`: Unique ID for the subscription request.
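As an illustration of the wire format, the following sketch decodes a subscribe request into `RequestPayload`. The helper `parseRequest` and the sample values (`"sub-1"`, `"txs"`, `"addr1"`) are hypothetical and not taken from websocket.go.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RequestPayload is copied from the definition above.
type RequestPayload struct {
	Data struct {
		Topic     string   `json:"topic"`
		Addresses []string `json:"addresses"`
	} `json:"data"`
	Method         string `json:"method"`
	SubscriptionID string `json:"subscriptionId"`
}

// parseRequest (hypothetical helper) decodes one inbound text frame.
func parseRequest(raw []byte) (RequestPayload, error) {
	var r RequestPayload
	err := json.Unmarshal(raw, &r)
	return r, err
}

func main() {
	raw := []byte(`{"subscriptionId":"sub-1","method":"subscribe","data":{"topic":"txs","addresses":["addr1","addr2"]}}`)
	r, err := parseRequest(raw)
	if err != nil {
		fmt.Println("invalid request:", err)
		return
	}
	fmt.Println(r.Method, r.SubscriptionID, r.Data.Addresses)
	// prints: subscribe sub-1 [addr1 addr2]
}
```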
ErrorResponse
Represents the JSON structure for sending error messages to clients.
type ErrorResponse struct {
	Message        string `json:"message"`
	SubscriptionID string `json:"subscriptionId"`
	Type           string `json:"type"` // Always set to "error"
}
Fields:
`Message`: Describes the error.
`SubscriptionID`: The related subscription ID (if any).
`Type`: Identifies the message type, always `"error"`.
MessageResponse
Represents the JSON structure for sending data messages to clients.
type MessageResponse struct {
	Address        string      `json:"address"`
	Data           interface{} `json:"data"`
	SubscriptionID string      `json:"subscriptionId"`
}
Fields:
`Address`: The address the message relates to.
`Data`: Payload data (can be any type).
`SubscriptionID`: The subscription this message belongs to.
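Because only `ErrorResponse` carries a `type` field, a client can tell the two response shapes apart by checking it. The sketch below shows one possible client-side approach; the `envelope` type, the `classify` helper, and the sample frames are hypothetical and not part of websocket.go.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope (hypothetical) holds the union of fields from ErrorResponse and MessageResponse.
type envelope struct {
	Type           string          `json:"type"`
	Message        string          `json:"message"`
	Address        string          `json:"address"`
	Data           json.RawMessage `json:"data"` // kept raw so the caller can decode it later
	SubscriptionID string          `json:"subscriptionId"`
}

// classify reports "error" for an ErrorResponse and "message" for anything else.
func classify(raw []byte) (string, envelope, error) {
	var e envelope
	if err := json.Unmarshal(raw, &e); err != nil {
		return "", e, err
	}
	if e.Type == "error" {
		return "error", e, nil
	}
	return "message", e, nil
}

func main() {
	frames := []string{
		`{"message":"method not implemented","subscriptionId":"sub-1","type":"error"}`,
		`{"address":"addr1","data":{"txid":"abc"},"subscriptionId":"sub-1"}`,
	}
	for _, f := range frames {
		kind, e, err := classify([]byte(f))
		if err != nil {
			fmt.Println("decode failed:", err)
			continue
		}
		fmt.Println(kind, e.SubscriptionID)
	}
}
```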
Connection
Represents a single WebSocket client connection.
type Connection struct {
	clientID        string
	conn            *websocket.Conn
	doneChan        chan interface{}
	handler         Registrar
	manager         *Manager
	msgChan         chan []byte
	subscriptionIDs map[string]struct{}
	ticker          *time.Ticker
	m               sync.Mutex
}
Fields:
`clientID`: Unique UUID identifying the client connection.
`conn`: Underlying Gorilla WebSocket connection.
`doneChan`: Signals connection termination.
`handler`: Interface for subscription management (`Registrar`).
`manager`: Reference to the connection manager (`Manager`) managing all connections.
`msgChan`: Channel for outbound messages to the client.
`subscriptionIDs`: Set of subscription IDs active on this connection.
`ticker`: Ticker used to trigger periodic ping messages.
`m`: Mutex to synchronize writes to the WebSocket.
Functions and Methods
NewConnection
func NewConnection(conn *websocket.Conn, handler Registrar, manager *Manager) *Connection
Purpose: Creates a new `Connection` instance, initializes its fields, and registers it with the `Manager`.
Parameters:
`conn`: The established Gorilla WebSocket connection.
`handler`: Subscription handler implementing the `Registrar` interface.
`manager`: The manager responsible for all connections.
Returns: Pointer to a newly created `Connection`.
Usage Example:
newConn := NewConnection(wsConn, subscriptionHandler, connectionManager)
(*Connection) Start
func (c *Connection) Start()
Purpose: Starts the connection lifecycle. Sets up read/write deadlines, ping/pong handlers, and launches goroutines for reading, writing, pinging, and cleanup.
Details:
Configures WebSocket read limits and deadlines.
Sets a ping handler to respond with pong messages.
Sets a pong handler to reset read deadlines.
Starts a ticker to send ping messages periodically.
Starts goroutines:
`read()`: Reads incoming messages from the client.
`write()`: Writes outbound messages to the client.
Anonymous function: Sends pings periodically.
`cleanup()`: Cleans up resources on connection close.
Usage: Called immediately after creating a new connection to begin processing.
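The periodic ping sender described above can be sketched with the standard library alone. In this illustration `pingLoop` is a hypothetical helper, the `ping` callback stands in for writing a ping control frame, and the 10 ms ticker stands in for `pingPeriod`; the real anonymous function in `Start()` may be structured differently.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// pingLoop calls ping on every tick until done is closed or ping fails.
// It returns the number of pings sent successfully.
func pingLoop(ticks <-chan time.Time, done <-chan struct{}, ping func() error) int {
	sent := 0
	for {
		select {
		case <-done:
			return sent
		case <-ticks:
			if err := ping(); err != nil {
				return sent
			}
			sent++
		}
	}
}

func main() {
	ticker := time.NewTicker(10 * time.Millisecond) // stands in for pingPeriod
	defer ticker.Stop()

	done := make(chan struct{})
	calls := 0
	ping := func() error {
		calls++
		if calls == 3 {
			return errors.New("peer gone") // simulate a failed control-frame write
		}
		return nil
	}
	fmt.Println("pings sent:", pingLoop(ticker.C, done, ping))
	// prints: pings sent: 2
}
```

Selecting on both the ticker and a done channel lets the loop exit promptly when the connection is torn down instead of waiting for the next tick.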
(*Connection) Stop
func (c *Connection) Stop()
Purpose: Stops the connection. Unsubscribes all active subscriptions and unregisters the connection from the manager.
Details:
Iterates over `subscriptionIDs` and calls `handler.Unsubscribe` for each.
Removes the connection from the manager's registry if still registered.
Logs the closure.
Usage: Called when the connection must be gracefully terminated (e.g., client disconnect, error).
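The unsubscribe-all step can be illustrated as a loop over the ID set. This sketch uses a simplified, hypothetical `unsubscriber` interface with a two-argument `Unsubscribe`; the `Registrar` method described in this document takes additional parameters.

```go
package main

import "fmt"

// unsubscriber is a hypothetical, simplified subset of Registrar.
type unsubscriber interface {
	Unsubscribe(clientID, subscriptionID string)
}

// recorder is a test double that remembers every call it receives.
type recorder struct{ calls []string }

func (r *recorder) Unsubscribe(clientID, subscriptionID string) {
	r.calls = append(r.calls, clientID+"/"+subscriptionID)
}

// stopAll unsubscribes every tracked ID and empties the set.
func stopAll(clientID string, ids map[string]struct{}, h unsubscriber) {
	for id := range ids {
		h.Unsubscribe(clientID, id)
		delete(ids, id) // deleting the current key during range is safe in Go
	}
}

func main() {
	ids := map[string]struct{}{"sub-1": {}, "sub-2": {}}
	rec := &recorder{}
	stopAll("client-1", ids, rec)
	fmt.Println(len(rec.calls), len(ids))
	// prints: 2 0
}
```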
(*Connection) cleanup
func (c *Connection) cleanup()
Purpose: Waits for a termination signal on `doneChan`, then stops the ping ticker, sends a close message, closes the WebSocket, and closes the message channel.
Usage: Runs as a goroutine started within `Start()`.
(*Connection) send
func (c *Connection) send(messageType int, data []byte) error
Purpose: Thread-safe method to write a message to the WebSocket connection.
Parameters:
`messageType`: WebSocket message type (e.g., `websocket.TextMessage`).
`data`: The message payload as bytes.
Returns: Any error encountered during write.
Implementation Detail: Uses a mutex to ensure serialized writes.
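Gorilla WebSocket connections support only one concurrent writer, which is why writes from the write, ping, and error paths need to be serialized. The pattern can be sketched as follows; `safeWriter` is a hypothetical type in which a `bytes.Buffer` stands in for the WebSocket connection.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// safeWriter serializes writes to an underlying buffer, which stands in
// for the WebSocket connection in this sketch.
type safeWriter struct {
	m   sync.Mutex
	buf bytes.Buffer
}

// send writes data while holding the mutex, so concurrent callers never interleave.
func (w *safeWriter) send(data []byte) error {
	w.m.Lock()
	defer w.m.Unlock()
	_, err := w.buf.Write(data)
	return err
}

// size returns the number of bytes written so far.
func (w *safeWriter) size() int {
	w.m.Lock()
	defer w.m.Unlock()
	return w.buf.Len()
}

func main() {
	var w safeWriter
	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = w.send([]byte("msg\n"))
		}()
	}
	wg.Wait()
	fmt.Println(w.size()) // 50 writes of 4 bytes each: prints 200
}
```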
(*Connection) read
func (c *Connection) read()
Purpose: Continuously reads messages from the client, unmarshals them, and performs actions based on the `Method` field.
Behavior:
On `"ping"`: Responds with `"pong"`.
On `"subscribe"`: Registers the subscription via `handler.Subscribe` and tracks the subscription ID.
On `"unsubscribe"`: Unsubscribes by ID, or unsubscribes all if no ID is specified.
On unknown methods: Sends an error response.
On read error: Stops the connection.
On unmarshal failure: Sends an error response.
Usage: Runs as a goroutine.
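The method dispatch described above can be sketched as a `switch` over the request method. The `request` type, the `dispatch` helper, and the returned strings are hypothetical simplifications; the real `read()` also calls the `Registrar` and writes responses to the socket.

```go
package main

import "fmt"

// request is a trimmed-down stand-in for RequestPayload.
type request struct {
	Method         string
	SubscriptionID string
}

// dispatch updates the set of tracked subscription IDs and returns a short
// description of the action taken, or an error for unknown methods.
func dispatch(r request, ids map[string]struct{}) (string, error) {
	switch r.Method {
	case "ping":
		return "pong", nil
	case "subscribe":
		ids[r.SubscriptionID] = struct{}{}
		return "subscribed " + r.SubscriptionID, nil
	case "unsubscribe":
		if r.SubscriptionID == "" {
			// No ID given: drop every tracked subscription.
			for id := range ids {
				delete(ids, id)
			}
			return "unsubscribed all", nil
		}
		delete(ids, r.SubscriptionID)
		return "unsubscribed " + r.SubscriptionID, nil
	default:
		return "", fmt.Errorf("%s method not implemented", r.Method)
	}
}

func main() {
	ids := map[string]struct{}{}
	for _, r := range []request{
		{Method: "subscribe", SubscriptionID: "a"},
		{Method: "ping"},
		{Method: "unsubscribe"},
		{Method: "bogus"},
	} {
		out, err := dispatch(r, ids)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(out)
	}
}
```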
(*Connection) write
func (c *Connection) write()
Purpose: Listens for messages on `msgChan` and writes them to the WebSocket connection.
Details: Sets write deadlines and logs errors on failure.
Usage: Runs as a goroutine.
(*Connection) writeError
func (c *Connection) writeError(message string, subscriptionID string)
Purpose: Sends a JSON-formatted error message to the client.
Parameters:
`message`: Error description.
`subscriptionID`: Associated subscription ID, if any.
Details: Marshals an `ErrorResponse` and sends it with a write deadline.
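The marshalling step can be sketched as follows. `buildError` is a hypothetical helper and the error text is illustrative; the real `writeError` also sets the write deadline and sends the bytes over the connection.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ErrorResponse is copied from the definition earlier in this document.
type ErrorResponse struct {
	Message        string `json:"message"`
	SubscriptionID string `json:"subscriptionId"`
	Type           string `json:"type"`
}

// buildError (hypothetical helper) marshals an ErrorResponse with Type fixed to "error".
func buildError(message, subscriptionID string) ([]byte, error) {
	return json.Marshal(ErrorResponse{
		Message:        message,
		SubscriptionID: subscriptionID,
		Type:           "error",
	})
}

func main() {
	b, err := buildError("failed to parse message", "sub-1")
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(string(b))
	// prints: {"message":"failed to parse message","subscriptionId":"sub-1","type":"error"}
}
```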
Important Implementation Details and Algorithms
Heartbeat Management: Uses WebSocket ping and pong control messages to detect dead connections. The client must respond to pings with pongs within the read deadline interval, or the connection is closed.
Subscription Tracking: Maintains a map of active subscription IDs per connection to allow targeted unsubscribe operations and cleanup on disconnect.
Concurrency:
Uses goroutines for separate reading, writing, pinging, and cleanup processes to avoid blocking.
Synchronizes writes with a mutex to prevent concurrent write conflicts.
Error Handling: A read error stops the connection, an unmarshal failure or unknown method is reported to the client as an error response, and write failures are logged.
Integration with `Registrar` and `Manager`: Delegates subscription logic and connection management to external components, promoting separation of concerns.
Interaction with Other System Components
`Registrar` Interface:
The `handler` field implements this interface, which is responsible for managing subscription logic:
`Subscribe(clientID, subscriptionID, addresses, msgChan)`
`Unsubscribe(clientID, subscriptionID, addresses, msgChan)`
`Connection` delegates subscription control to this handler.
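To make the delegation concrete, the sketch below defines an interface with the two methods listed above and a small in-memory implementation that delivers data to subscribed channels. The parameter types, the absence of return values, `memRegistrar`, and `publish` are all assumptions made for illustration; the actual `Registrar` definition may differ, and this version is not safe for concurrent use.

```go
package main

import "fmt"

// Registrar mirrors the two methods listed above; parameter types are assumed.
type Registrar interface {
	Subscribe(clientID, subscriptionID string, addresses []string, msgChan chan<- []byte)
	Unsubscribe(clientID, subscriptionID string, addresses []string, msgChan chan<- []byte)
}

// memRegistrar is a hypothetical in-memory implementation keyed by address.
type memRegistrar struct {
	subs map[string]map[string]chan<- []byte // address -> "clientID/subscriptionID" -> channel
}

// Compile-time check that the sketch satisfies the interface.
var _ Registrar = (*memRegistrar)(nil)

func newMemRegistrar() *memRegistrar {
	return &memRegistrar{subs: make(map[string]map[string]chan<- []byte)}
}

func (r *memRegistrar) Subscribe(clientID, subscriptionID string, addresses []string, msgChan chan<- []byte) {
	key := clientID + "/" + subscriptionID
	for _, a := range addresses {
		if r.subs[a] == nil {
			r.subs[a] = make(map[string]chan<- []byte)
		}
		r.subs[a][key] = msgChan
	}
}

func (r *memRegistrar) Unsubscribe(clientID, subscriptionID string, addresses []string, msgChan chan<- []byte) {
	key := clientID + "/" + subscriptionID
	for _, a := range addresses {
		delete(r.subs[a], key) // deleting from a nil map is a no-op
	}
}

// publish sends data to every channel subscribed to address and returns the count.
func (r *memRegistrar) publish(address string, data []byte) int {
	n := 0
	for _, ch := range r.subs[address] {
		ch <- data
		n++
	}
	return n
}

func main() {
	reg := newMemRegistrar()
	msgChan := make(chan []byte, 1) // buffered so publish does not block in this demo
	reg.Subscribe("client-1", "sub-1", []string{"addr1"}, msgChan)
	fmt.Println(reg.publish("addr1", []byte(`{"txid":"abc"}`))) // 1
	fmt.Println(string(<-msgChan))                              // {"txid":"abc"}
	reg.Unsubscribe("client-1", "sub-1", []string{"addr1"}, msgChan)
	fmt.Println(reg.publish("addr1", []byte(`{}`))) // 0
}
```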
`Manager` Type:
The connection manager maintains all active connections:
`register` channel to add new connections.
`unregister` channel to remove connections.
Maintains a map of active connections.
`Connection` registers/unregisters itself with this manager.
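One common way to combine register and unregister channels with a connection map is a single goroutine that owns the map. The sketch below illustrates that pattern only; the `manager`, `conn`, and `count` names are hypothetical, and the real `Manager` may be implemented differently.

```go
package main

import "fmt"

// conn is a minimal stand-in for Connection.
type conn struct{ id string }

type manager struct {
	register   chan *conn
	unregister chan *conn
	count      chan chan int // hypothetical query channel, used here for inspection only
	conns      map[string]*conn
}

func newManager() *manager {
	return &manager{
		register:   make(chan *conn),
		unregister: make(chan *conn),
		count:      make(chan chan int),
		conns:      make(map[string]*conn),
	}
}

// run owns the conns map; all access goes through channels, so no mutex is needed.
func (m *manager) run(done <-chan struct{}) {
	for {
		select {
		case c := <-m.register:
			m.conns[c.id] = c
		case c := <-m.unregister:
			delete(m.conns, c.id)
		case reply := <-m.count:
			reply <- len(m.conns)
		case <-done:
			return
		}
	}
}

// size asks the run loop how many connections are currently registered.
func (m *manager) size() int {
	reply := make(chan int)
	m.count <- reply
	return <-reply
}

func main() {
	m := newManager()
	done := make(chan struct{})
	go m.run(done)
	defer close(done)

	c := &conn{id: "client-1"}
	m.register <- c
	fmt.Println(m.size()) // 1
	m.unregister <- c
	fmt.Println(m.size()) // 0
}
```

Because the channels are unbuffered, each send returns only after the run loop has received the value, and the loop finishes handling it before it selects again, so the size query observes the update.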
Logging:
Utilizes internal logging (`log.WithoutFields()`) for debug and error messages.
This file is a core part of the WebSocket communication layer, enabling client subscription management, message handling, and connection lifecycle management within the Unchained API server.
Visual Diagram
classDiagram
class Connection {
-clientID: string
-conn: *websocket.Conn
-doneChan: chan interface{}
-handler: Registrar
-manager: *Manager
-msgChan: chan []byte
-subscriptionIDs: map[string]struct{}
-ticker: *time.Ticker
-m: sync.Mutex
+Start()
+Stop()
-read()
-write()
-send(messageType int, data []byte) error
-writeError(message string, subscriptionID string)
-cleanup()
}
class RequestPayload {
+Data: struct { Topic string; Addresses []string }
+Method: string
+SubscriptionID: string
}
class ErrorResponse {
+Message: string
+SubscriptionID: string
+Type: string
}
class MessageResponse {
+Address: string
+Data: interface{}
+SubscriptionID: string
}
Connection --> Registrar : handler (interface)
Connection --> Manager : manager (manages connections)
Usage Example
// Assume wsConn is an established *websocket.Conn,
// subscriptionHandler implements Registrar,
// and connectionManager manages all connections.
conn := websocket.NewConnection(wsConn, subscriptionHandler, connectionManager)
conn.Start()
// When done or on error
conn.Stop()
Summary
The [websocket.go](/projects/291/69273) file encapsulates the management of WebSocket connections to the Unchained API server. It handles real-time client subscriptions, message exchange, and connection health monitoring using dedicated goroutines per connection, and relies on external subscription and connection management components (`Registrar` and `Manager`) for broader system integration.