registry.go
Overview
The [registry.go](/projects/291/69101) file defines a subscription registry that manages client subscriptions to address channels in a WebSocket context. Clients can subscribe to or unsubscribe from messages published on specific addresses, and the registry routes each published message to every subscribed client.
This file provides:
A `Registrar` interface defining subscription management methods.
A `Registry` struct implementing the `Registrar` interface that maintains internal mappings from clients to addresses and vice versa.
Methods to subscribe clients to addresses, unsubscribe them, and publish messages to all subscribers of given addresses.
The registry supports multiple clients subscribing to multiple addresses, with each client maintaining one or more subscriptions identified by subscription IDs. Messages are sent asynchronously to clients over dedicated channels.
Detailed Description
Interfaces and Types
Registrar Interface
type Registrar interface {
	Subscribe(clientID string, subscriptionID string, addrs []string, msg chan<- []byte)
	Unsubscribe(clientID string, subscriptionID string, addrs []string, msg chan<- []byte)
	Publish(addrs []string, data interface{})
}
Purpose: Defines the contract for subscription management.
Methods:
`Subscribe`: Registers a client subscription to one or more addresses.
`Unsubscribe`: Removes a client's subscription from one or more addresses, or from all addresses if none are specified.
`Publish`: Sends a message to all clients subscribed to the provided addresses.
Structs
Registry
type Registry struct {
	clients   map[string]map[string]struct{}
	addresses map[string]map[string]chan<- []byte
}
Purpose: Implements the `Registrar` interface to manage client subscriptions and message delivery.
Fields:
`clients`: Maps a composite client-subscription ID (`clientID:subscriptionID`) to a set of subscribed addresses.
`addresses`: Maps an address to a map of client-subscription IDs and their corresponding message channels.
**Note:** Composite IDs uniquely identify subscriptions per client.
Constructor
NewRegistry() *Registry
Creates and initializes a new `Registry` instance.
func NewRegistry() *Registry {
	return &Registry{
		clients:   make(map[string]map[string]struct{}),
		addresses: make(map[string]map[string]chan<- []byte),
	}
}
Utility Functions
toID(clientID string, subscriptionID string) string
Combines `clientID` and `subscriptionID` into a single string identifier.
Parameters:
`clientID`: The unique identifier of the client.
`subscriptionID`: The unique identifier of the subscription.
Returns: A string formatted as `"clientID:subscriptionID"`.
fromID(id string) (string, string)
Splits a composite ID back into `clientID` and `subscriptionID`.
Parameters:
`id`: The composite ID string.
Returns: The two values `(clientID, subscriptionID)`.
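The two helpers can be sketched as follows. This is a minimal illustration based only on the `"clientID:subscriptionID"` format described above; the function bodies, and the choice to split at the first colon, are assumptions rather than the file's actual code.

```go
package main

import (
	"fmt"
	"strings"
)

// toID joins a client ID and a subscription ID with a colon.
func toID(clientID, subscriptionID string) string {
	return clientID + ":" + subscriptionID
}

// fromID splits a composite ID at the first colon (an assumption).
// If the ID contains no colon, the whole string is returned as the
// client ID and the subscription ID is empty.
func fromID(id string) (string, string) {
	parts := strings.SplitN(id, ":", 2)
	if len(parts) < 2 {
		return parts[0], ""
	}
	return parts[0], parts[1]
}

func main() {
	id := toID("client123", "sub1")
	clientID, subscriptionID := fromID(id)
	fmt.Println(id, clientID, subscriptionID)
}
```

With this approach a client ID that itself contains a colon would not round-trip, so the scheme relies on client IDs being colon-free.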
Methods
Subscribe(clientID string, subscriptionID string, addrs []string, msgChan chan<- []byte)
Registers a client subscription to multiple addresses and associates a message channel for delivering messages.
Parameters:
`clientID`: Unique client identifier.
`subscriptionID`: Unique subscription identifier for the client.
`addrs`: List of address strings to subscribe to.
`msgChan`: Channel through which messages will be sent to the client.
Behavior:
Creates a composite ID from `clientID` and `subscriptionID`.
Ensures internal maps are initialized.
Adds each address to the client's subscription and associates the `msgChan` with each address for this subscription.
Usage Example:
msgChan := make(chan []byte)
registry.Subscribe("client123", "sub1", []string{"addr1", "addr2"}, msgChan)
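The behavior listed above could be implemented roughly as below. This is a sketch reconstructed from the description and the documented field types, not the file's actual method body; it also omits any locking, since the documented struct shows no mutex.

```go
package main

import "fmt"

// Registry mirrors the two documented fields.
type Registry struct {
	clients   map[string]map[string]struct{}
	addresses map[string]map[string]chan<- []byte
}

func NewRegistry() *Registry {
	return &Registry{
		clients:   make(map[string]map[string]struct{}),
		addresses: make(map[string]map[string]chan<- []byte),
	}
}

// Subscribe records each address under the composite ID and stores the
// channel under each address (assumed logic).
func (r *Registry) Subscribe(clientID, subscriptionID string, addrs []string, msgChan chan<- []byte) {
	id := clientID + ":" + subscriptionID
	if r.clients[id] == nil {
		r.clients[id] = make(map[string]struct{})
	}
	for _, addr := range addrs {
		r.clients[id][addr] = struct{}{}
		if r.addresses[addr] == nil {
			r.addresses[addr] = make(map[string]chan<- []byte)
		}
		r.addresses[addr][id] = msgChan
	}
}

func main() {
	r := NewRegistry()
	msgChan := make(chan []byte, 1)
	r.Subscribe("client123", "sub1", []string{"addr1", "addr2"}, msgChan)
	// Number of addresses held by the subscription, and number of
	// subscribers registered on addr1.
	fmt.Println(len(r.clients["client123:sub1"]), len(r.addresses["addr1"]))
}
```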
Unsubscribe(clientID string, subscriptionID string, addrs []string, msgChan chan<- []byte)
Removes a client's subscription from the specified addresses, or from all addresses if none are provided.
Parameters:
`clientID`: Unique client identifier.
`subscriptionID`: Unique subscription identifier.
`addrs`: List of addresses to unsubscribe from. If empty, unsubscribe from all.
`msgChan`: Message channel previously associated (not used inside the method, but required by the interface).
Behavior:
Computes composite ID.
If no addresses specified, removes the client subscription completely.
Otherwise, removes only the specified addresses.
Cleans up internal maps when no clients remain subscribed to an address.
Usage Example:
// Unsubscribe from specific addresses
registry.Unsubscribe("client123", "sub1", []string{"addr1"}, msgChan)
// Unsubscribe from all addresses (complete removal)
registry.Unsubscribe("client123", "sub1", []string{}, msgChan)
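One way to realize both the partial and the complete removal, including the cleanup of empty address entries, is sketched below. The method body is an assumption derived from the behavior list; the maps are seeded directly so the example stands alone.

```go
package main

import "fmt"

type Registry struct {
	clients   map[string]map[string]struct{}
	addresses map[string]map[string]chan<- []byte
}

// Unsubscribe removes the given addresses, or every address held by the
// subscription when addrs is empty (assumed logic). The channel argument
// is ignored, matching the note that it is only required by the interface.
func (r *Registry) Unsubscribe(clientID, subscriptionID string, addrs []string, _ chan<- []byte) {
	id := clientID + ":" + subscriptionID
	if len(addrs) == 0 {
		for addr := range r.clients[id] {
			addrs = append(addrs, addr)
		}
	}
	for _, addr := range addrs {
		delete(r.clients[id], addr)
		delete(r.addresses[addr], id)
		if len(r.addresses[addr]) == 0 {
			delete(r.addresses, addr) // no subscribers left on this address
		}
	}
	if len(r.clients[id]) == 0 {
		delete(r.clients, id) // subscription holds no addresses anymore
	}
}

func main() {
	var ch chan<- []byte = make(chan []byte, 1)
	r := &Registry{
		clients: map[string]map[string]struct{}{
			"client123:sub1": {"addr1": {}, "addr2": {}},
		},
		addresses: map[string]map[string]chan<- []byte{
			"addr1": {"client123:sub1": ch},
			"addr2": {"client123:sub1": ch},
		},
	}
	r.Unsubscribe("client123", "sub1", []string{"addr1"}, ch)
	fmt.Println(len(r.clients), len(r.addresses)) // one subscription, one address left
	r.Unsubscribe("client123", "sub1", nil, ch)
	fmt.Println(len(r.clients), len(r.addresses)) // both maps are now empty
}
```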
Publish(addrs []string, data interface{})
Sends a message to all clients subscribed to any of the given addresses.
Parameters:
`addrs`: List of addresses for which messages should be published.
`data`: Arbitrary data payload to send.
Behavior:
For each address, iterates over subscribed clients.
Marshals a `MessageResponse` struct (assumed to be defined elsewhere) containing the address, data, and subscription ID.
Sends the marshaled message to the subscriber's message channel.
Logs errors if marshaling fails.
Usage Example:
registry.Publish([]string{"addr1"}, map[string]string{"msg": "Hello subscribers"})
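The publish path might look like the sketch below. `MessageResponse` is not defined in this file, so the struct here, including its field names and JSON tags, is a hypothetical stand-in. The standard `log` package stands in for the file's `logger`, and the plain blocking send is likewise an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"strings"
)

// MessageResponse is a hypothetical stand-in for the real struct.
type MessageResponse struct {
	Address        string      `json:"address"`
	Data           interface{} `json:"data"`
	SubscriptionID string      `json:"subscriptionId"`
}

type Registry struct {
	addresses map[string]map[string]chan<- []byte
}

// Publish marshals one message per subscriber of each address and sends
// it on that subscriber's channel (assumed logic).
func (r *Registry) Publish(addrs []string, data interface{}) {
	for _, addr := range addrs {
		for id, msgChan := range r.addresses[addr] {
			// Recover the subscription ID from the composite key.
			subscriptionID := id[strings.Index(id, ":")+1:]
			payload, err := json.Marshal(MessageResponse{
				Address:        addr,
				Data:           data,
				SubscriptionID: subscriptionID,
			})
			if err != nil {
				log.Printf("marshal failed for %s: %v", addr, err)
				continue
			}
			msgChan <- payload // blocks until there is buffer room or a receiver
		}
	}
}

func main() {
	ch := make(chan []byte, 1)
	r := &Registry{addresses: map[string]map[string]chan<- []byte{
		"addr1": {"client123:sub1": ch},
	}}
	r.Publish([]string{"addr1"}, map[string]string{"msg": "Hello subscribers"})
	fmt.Println(string(<-ch))
}
```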
Important Implementation Details
Composite IDs: The use of `clientID:subscriptionID` strings as keys allows multiple subscriptions per client, distinguished by subscription IDs.
Bidirectional Maps: The `clients` map records which addresses a client subscription is interested in. The `addresses` map records which client subscriptions are subscribed to a given address and holds their message channels.
Channel-based Messaging: Messages are handed to clients through send-only channels (`chan<- []byte`), which decouples publishing from each client's own write loop. Whether a send blocks depends on the channel's buffering and on how the send is performed.
Cleanup: Addresses are removed from the registry if no clients remain subscribed, preventing stale entries.
JSON Marshaling: Messages are serialized using `encoding/json` before being sent over the channels, assuming a specific `MessageResponse` struct format.
Logging: The code uses a `logger` (not defined in this file) for debugging and error reporting.
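A send on a Go channel blocks when the channel is unbuffered or its buffer is full. If delivery must never block the publisher, one common pattern is a `select` with a `default` case. The helper below, `trySend`, is a hypothetical name used to illustrate that pattern; it is not taken from `registry.go`.

```go
package main

import "fmt"

// trySend attempts to send payload on msgChan without blocking.
// It reports false when the message had to be dropped because the
// channel could not accept it immediately.
func trySend(msgChan chan<- []byte, payload []byte) bool {
	select {
	case msgChan <- payload:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan []byte, 1)
	fmt.Println(trySend(ch, []byte("first")))  // true: the buffer has room
	fmt.Println(trySend(ch, []byte("second"))) // false: the buffer is full
}
```

The trade-off is that a slow client loses messages instead of stalling every other subscriber. Whether that is acceptable depends on the application.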
Interaction with Other System Components
MessageResponse Struct: The code references `MessageResponse` for message formatting, which should be defined in another part of the system.
Logger: Uses a `logger` instance for debug/error logs; this implies integration with a centralized logging facility.
WebSocket Layer: This registry likely acts as a middle layer between WebSocket connection handlers and message producers, managing subscriptions and delivering relevant messages.
Visual Diagram
classDiagram
class Registrar {
<<interface>>
+Subscribe(clientID, subscriptionID string, addrs []string, msg chan<- []byte)
+Unsubscribe(clientID, subscriptionID string, addrs []string, msg chan<- []byte)
+Publish(addrs []string, data interface{})
}
class Registry {
-clients map[string]map[string]struct{}
-addresses map[string]map[string]chan<- []byte
+Subscribe(clientID, subscriptionID string, addrs []string, msg chan<- []byte)
+Unsubscribe(clientID, subscriptionID string, addrs []string, msg chan<- []byte)
+Publish(addrs []string, data interface{})
}
Registrar <|.. Registry
Summary
The [registry.go](/projects/291/69101) file defines a subscription registry that manages client subscriptions and message delivery for address channels in a WebSocket context. It uses composite IDs to identify subscriptions uniquely and maintains bidirectional mappings so that subscribers can be added, removed, and notified by direct map lookup. Messages are serialized to JSON and delivered to each subscriber over its own channel.
This file is a foundational component for handling pub-sub style message distribution in WebSocket-based applications within the system.