affiliateFees.go
Overview
The [affiliateFees.go](/projects/291/69279) file implements an **Affiliate Fee Indexer** for the Thorchain blockchain ecosystem. Its primary purpose is to **index and aggregate outbound affiliate fee transactions** by scanning blockchain events both historically and in real-time. Affiliate fees in Thorchain are paid to specific affiliate addresses when transactions occur, and this module tracks all such payments to enable transparent reporting and auditing.
Key features include:
Real-time indexing: Listens for new blocks via WebSocket, processing affiliate fee events as they appear.
Historic synchronization: Efficiently searches through past blocks using concurrent paginated queries to index all historic affiliate fee transactions.
Thread-safe storage: Maintains an internal collection of affiliate fee records, safely updated concurrently.
Concurrency and scalability: Uses worker pools and goroutines to optimize block fetching and event processing.
This indexer is a specialized component within the Thorchain-specific API layer, providing authoritative affiliate fee data that can be consumed by other services or APIs.
Detailed Documentation
Constants
| Name | Description |
|---|---|
| `blockWorkers` | Number of concurrent workers fetching blocks during historic sync (10). |
| | Number of results per page when querying blocks (50). |
| `resultWorkers` | Number of workers processing fetched block results concurrently (100). |
Types
AffiliateFeeIndexer
type AffiliateFeeIndexer struct {
	AffiliateAddresses []string             // List of affiliate addresses to track
	AffiliateFees      []*AffiliateFee      // Collected affiliate fee records
	httpClients        []*cosmos.HTTPClient // HTTP clients for querying blockchain nodes
	mu                 sync.Mutex           // Mutex for thread-safe access to AffiliateFees
}
Purpose: Manages indexing processes, stores affiliate fee records, and handles concurrency.
Concurrency: Protects the `AffiliateFees` slice via the mutex `mu` to avoid race conditions.
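The mutex-plus-slice layout can be illustrated with a minimal sketch. The `feeStore` and `feeRecord` types and the `add` and `snapshot` helpers are hypothetical stand-ins introduced here for illustration; they are not taken from `affiliateFees.go`.

```go
package main

import (
	"fmt"
	"sync"
)

// feeRecord is a hypothetical, trimmed-down stand-in for AffiliateFee.
type feeRecord struct {
	Address string
	Amount  string
}

// feeStore mirrors the "slice guarded by a mutex" layout described above.
type feeStore struct {
	mu   sync.Mutex
	fees []*feeRecord
}

// add appends a record while holding the lock.
func (s *feeStore) add(f *feeRecord) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.fees = append(s.fees, f)
}

// snapshot returns a copy of the slice so callers can iterate
// without holding the lock.
func (s *feeStore) snapshot() []*feeRecord {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := make([]*feeRecord, len(s.fees))
	copy(out, s.fees)
	return out
}

func main() {
	store := &feeStore{}
	var wg sync.WaitGroup
	for w := 0; w < 100; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			store.add(&feeRecord{Address: fmt.Sprintf("addr-%d", id), Amount: "1"})
		}(w)
	}
	wg.Wait()
	fmt.Println(len(store.snapshot())) // number of stored records
}
```

Returning a copy from `snapshot` is one possible design choice: readers never touch the slice that writers are appending to.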
AffiliateFee
type AffiliateFee struct {
	Amount      string // Amount of asset paid as affiliate fee (string to preserve precision)
	Asset       string // Asset symbol or denomination (e.g., "THOR.RUNE")
	BlockHeight int64  // Height of the block containing the transaction
	BlockHash   string // Hash of the block
	Timestamp   int64  // Unix timestamp of the block time
	Address     string // Affiliate address receiving the fee
	TxID        string // Transaction ID from which the affiliate fee originated
}
Represents a single outbound affiliate fee payment.
Stores all relevant metadata required for reporting or auditing.
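Because `Amount` is kept as a string, consumers that aggregate fees need arbitrary-precision arithmetic rather than `float64`. The sketch below assumes amounts are base-10 integer strings (an assumption, not confirmed by the source); `sumAmounts` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"math/big"
)

// sumAmounts adds amount strings without losing precision.
// Assumption: every amount is a base-10 integer string.
func sumAmounts(amounts []string) (*big.Int, error) {
	total := new(big.Int)
	for _, a := range amounts {
		v, ok := new(big.Int).SetString(a, 10)
		if !ok {
			return nil, fmt.Errorf("invalid amount %q", a)
		}
		total.Add(total, v)
	}
	return total, nil
}

func main() {
	// 9007199254740993 cannot be represented exactly as a float64.
	total, err := sumAmounts([]string{"9007199254740993", "7"})
	if err != nil {
		panic(err)
	}
	fmt.Println(total.String()) // prints 9007199254741000
}
```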
Functions and Methods
NewAffiliateFeeIndexer
func NewAffiliateFeeIndexer(httpClients []*cosmos.HTTPClient, wsClient *cosmos.WSClient) *AffiliateFeeIndexer
Description: Constructor that creates a new `AffiliateFeeIndexer` instance.
Parameters:
`httpClients []*cosmos.HTTPClient`: Slice of HTTP clients used to query blockchain data.
`wsClient *cosmos.WSClient`: WebSocket client for subscribing to new block events.
Returns: Pointer to an initialized `AffiliateFeeIndexer`.
Functionality:
Initializes the indexer with a fixed list of known affiliate addresses.
Sets up a WebSocket block event handler that invokes `processAffiliateFees` on every new block.
Usage Example:
httpClients := []*cosmos.HTTPClient{client1, client2}
wsClient := cosmos.NewWSClient(...)
indexer := NewAffiliateFeeIndexer(httpClients, wsClient)
Sync
func (i *AffiliateFeeIndexer) Sync() error
Description: Performs a historic synchronization of affiliate fees by querying past blocks.
Returns: `error` if any HTTP request or processing step fails.
Functionality:
For each affiliate address and HTTP client:
Executes paginated searches for blocks containing outbound transactions to that affiliate address.
Uses concurrency (`errgroup.Group`) to parallelize searches.
Calculates total pages and creates a channel (`pageCh`) to manage pagination.
Launches workers to fetch blocks and process block results concurrently.
Logs the duration of the indexing process.
Concurrency:
Uses goroutines and worker pools to fetch and handle block data efficiently.
Usage Notes:
Should be called during startup or periodically to backfill and update affiliate fee records.
Example:
if err := indexer.Sync(); err != nil {
	log.Fatalf("Sync failed: %v", err)
}
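The page bookkeeping that `Sync` performs can be sketched as follows. This is an illustration under assumptions, not the file's actual code: the constant name `pageSize`, the helpers `totalPages` and `newPageCh`, and the 1-based page numbering are all hypothetical.

```go
package main

import "fmt"

// pageSize matches the page size of 50 given in the constants table;
// the constant's name here is hypothetical.
const pageSize = 50

// totalPages returns how many pages are needed to cover totalCount results
// (ceiling division).
func totalPages(totalCount, perPage int) int {
	if totalCount <= 0 || perPage <= 0 {
		return 0
	}
	return (totalCount + perPage - 1) / perPage
}

// newPageCh returns a closed, buffered channel pre-filled with the page
// numbers 1..pages, ready to be drained by a pool of workers.
func newPageCh(pages int) <-chan int {
	ch := make(chan int, pages)
	for p := 1; p <= pages; p++ {
		ch <- p
	}
	close(ch)
	return ch
}

func main() {
	pages := totalPages(120, pageSize) // 120 matching blocks need 3 pages
	for p := range newPageCh(pages) {
		fmt.Println("page", p)
	}
}
```

Pre-filling and closing the channel up front means workers can simply `range` over it and exit when the pages run out.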
fetchBlocks
func (i *AffiliateFeeIndexer) fetchBlocks(
	httpClient *cosmos.HTTPClient,
	affiliateAddress string,
	pageCh <-chan int,
	resultCh chan<- *coretypes.ResultBlockSearch)
Description: Worker pool function that fetches, page by page, the blocks matching the outbound affiliate address query.
Parameters:
`httpClient`: HTTP client used to perform block search queries.
`affiliateAddress`: The affiliate address used to filter outbound transactions.
`pageCh`: Channel of page numbers to fetch.
`resultCh`: Channel on which fetched block search results are sent.
Concurrency: Spawns `blockWorkers` goroutines to process pages concurrently.
Error Handling: Panics on HTTP errors to indicate unrecoverable fetch failures.
Implementation Detail:
Closes `resultCh` after all pages are fetched.
Internal Use: Invoked by the `Sync()` method.
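A worker pool of this shape can be sketched with the standard library alone. The version below is an illustration, not the file's implementation: `fetchPages` and its `search` callback are hypothetical, results are plain strings instead of `*coretypes.ResultBlockSearch`, and errors are reported on a channel rather than by panicking, which is a deliberate substitution.

```go
package main

import (
	"fmt"
	"sync"
)

// fetchPages starts `workers` goroutines that drain pageCh and call search
// for each page. The result channel is closed once every worker has exited.
// Each worker reports at most one error and then stops.
func fetchPages(workers int, pageCh <-chan int, search func(page int) (string, error)) (<-chan string, <-chan error) {
	resultCh := make(chan string)
	errCh := make(chan error, workers) // buffered: a failing worker never blocks
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for page := range pageCh {
				res, err := search(page)
				if err != nil {
					errCh <- err
					return
				}
				resultCh <- res
			}
		}()
	}
	// Close the output channels only after all workers are done.
	go func() {
		wg.Wait()
		close(resultCh)
		close(errCh)
	}()
	return resultCh, errCh
}

func main() {
	pageCh := make(chan int, 5)
	for p := 1; p <= 5; p++ {
		pageCh <- p
	}
	close(pageCh)

	results, errs := fetchPages(2, pageCh, func(page int) (string, error) {
		return fmt.Sprintf("results for page %d", page), nil
	})
	count := 0
	for range results {
		count++
	}
	if err := <-errs; err != nil {
		fmt.Println("fetch failed:", err)
		return
	}
	fmt.Println(count) // number of pages fetched
}
```

Closing the result channel from a single goroutine after `wg.Wait()` avoids the double-close that would occur if each worker tried to close it.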
handleBlocks
func (i *AffiliateFeeIndexer) handleBlocks(
	httpClient *cosmos.HTTPClient,
	affiliateAddress string,
	resultCh <-chan *coretypes.ResultBlockSearch) *sync.WaitGroup
Description: Processes block search results by fetching detailed block results and parsing affiliate fee events.
Parameters:
`httpClient`: HTTP client used to get block results.
`affiliateAddress`: Address used to filter affiliate fee events.
`resultCh`: Channel receiving block search results.
Returns: A `WaitGroup` that can be waited on to ensure processing has completed.
Concurrency: Launches `resultWorkers` goroutines that:
Iterate over the blocks in each search result.
Fetch block results and events.
Call `processAffiliateFees` to extract and store affiliate fees.
Error Handling: Panics on errors fetching block results.
Internal Use: Part of the `Sync()` workflow.
processAffiliateFees
func (i *AffiliateFeeIndexer) processAffiliateFees(
	block thorchain.Block,
	blockEvents []cosmos.ABCIEvent,
	affiliateAddresses []string)
Description: Parses block events to identify outbound affiliate fee transactions and adds them to the indexer's store.
Parameters:
`block`: Block data interface providing height, hash, and timestamp.
`blockEvents`: Raw ABCI events from the block.
`affiliateAddresses`: List of affiliate addresses to match against.
Functionality:
Parses typed Thorchain events from raw events.
For each `EventOutbound` event, extracts the transaction ID, recipient address, amount, and asset.
Checks whether the recipient is in the affiliate address list.
If matched, creates an `AffiliateFee` record and appends it to the thread-safe `AffiliateFees` slice.
Concurrency: Uses the mutex `mu` to protect concurrent writes.
Error Handling: Panics if event parsing fails.
Example Workflow:
// Called on new block events or during historic sync
indexer.processAffiliateFees(block, events, indexer.AffiliateAddresses)
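The matching step can be sketched in isolation. The `outboundEvent` and `feeRecord` types and the `matchAffiliateFees` function are hypothetical simplifications; the real code works on `thorchain.EventOutbound` values whose exact fields are not shown in this document.

```go
package main

import "fmt"

// outboundEvent is a hypothetical, simplified view of a parsed outbound event.
type outboundEvent struct {
	TxID   string
	To     string
	Amount string
	Asset  string
}

// feeRecord is a trimmed-down stand-in for AffiliateFee.
type feeRecord struct {
	Address     string
	Amount      string
	Asset       string
	TxID        string
	BlockHeight int64
}

// matchAffiliateFees returns one record per outbound event whose recipient
// appears in affiliateAddresses.
func matchAffiliateFees(height int64, events []outboundEvent, affiliateAddresses []string) []feeRecord {
	known := make(map[string]struct{}, len(affiliateAddresses))
	for _, addr := range affiliateAddresses {
		known[addr] = struct{}{}
	}
	var fees []feeRecord
	for _, ev := range events {
		if _, ok := known[ev.To]; !ok {
			continue // recipient is not a tracked affiliate
		}
		fees = append(fees, feeRecord{
			Address:     ev.To,
			Amount:      ev.Amount,
			Asset:       ev.Asset,
			TxID:        ev.TxID,
			BlockHeight: height,
		})
	}
	return fees
}

func main() {
	// Placeholder addresses and transaction IDs, for illustration only.
	events := []outboundEvent{
		{TxID: "TX1", To: "thor1affiliate", Amount: "1500", Asset: "THOR.RUNE"},
		{TxID: "TX2", To: "thor1someoneelse", Amount: "99", Asset: "THOR.RUNE"},
	}
	for _, f := range matchAffiliateFees(42, events, []string{"thor1affiliate"}) {
		fmt.Printf("%s received %s %s in tx %s at height %d\n",
			f.Address, f.Amount, f.Asset, f.TxID, f.BlockHeight)
	}
}
```

Building a set from the address list keeps each lookup constant-time, which matters little for a handful of affiliates but costs nothing.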
Implementation Details and Algorithms
Concurrency Model:
Uses goroutines and worker pools for both block fetching (`fetchBlocks`) and block processing (`handleBlocks`).
Channels coordinate work distribution and result collection.
Mutex locks ensure safe concurrent writes to the shared affiliate fee slice.
Pagination & Block Searching:
Uses the `BlockSearch` API with the query filter `"outbound.to='<affiliateAddress>'"` to find relevant blocks.
Calculates the number of pages from the total result count and the page size.
Distributes page fetches among workers for parallel processing.
Event Parsing:
Employs `thorchain.ParseBlockEvents` to convert raw ABCI events into typed events.
Focuses on the `EventOutbound` type to extract outbound affiliate fee details.
Real-time vs Historic:
On instantiation, registers a WebSocket block handler for real-time indexing.
The `Sync()` method backfills historic data by querying block history.
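As a small illustration of the block search filter, the sketch below formats the query string for each tracked address. `outboundQuery` is a hypothetical helper, and the addresses are placeholders rather than real affiliate addresses.

```go
package main

import "fmt"

// outboundQuery builds the block search filter for one affiliate address,
// following the "outbound.to='<affiliateAddress>'" form quoted in the text.
func outboundQuery(affiliateAddress string) string {
	return fmt.Sprintf("outbound.to='%s'", affiliateAddress)
}

func main() {
	// Placeholder addresses, for illustration only.
	for _, addr := range []string{"thor1exampleaffiliate", "thor1anotheraffiliate"} {
		fmt.Println(outboundQuery(addr))
	}
}
```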
Interaction with Other System Components
HTTP Clients (`cosmos.HTTPClient`):
Used to query the blockchain REST API for block searches and block results.
Multiple clients allow load balancing and redundancy.
WebSocket Client (`cosmos.WSClient`):
Subscribes to new block events, triggering real-time processing.
Thorchain Package:
Provides types such as `thorchain.Block` and `thorchain.EventOutbound`, plus event parsing utilities.
Used to interpret blockchain event data.
Logging:
Uses a `logger` (assumed global) to log indexing progress and errors.
API Layer:
The indexed affiliate fees can be exposed through API endpoints for querying affiliate revenue.
Usage Example
httpClients := []*cosmos.HTTPClient{client1, client2}
wsClient := cosmos.NewWSClient("ws://thorchain-node:26657/websocket")
indexer := NewAffiliateFeeIndexer(httpClients, wsClient)
// Sync historic affiliate fee data
if err := indexer.Sync(); err != nil {
	log.Fatalf("Failed to sync affiliate fees: %v", err)
}
// indexer listens and processes new blocks automatically
// Access affiliate fees (the new-block handler may append concurrently,
// so this unsynchronized read is for illustration only):
for _, fee := range indexer.AffiliateFees {
	fmt.Printf("Affiliate %s received %s %s at block %d\n",
		fee.Address, fee.Amount, fee.Asset, fee.BlockHeight)
}
Visual Diagram: AffiliateFeeIndexer Workflow
flowchart TD
Start[Start Indexer / On New Block] --> CheckNewBlock{New Block Received?}
CheckNewBlock -- Yes --> FetchBlockEvents[Fetch Block Events]
FetchBlockEvents --> ParseEvents[Parse Block Events]
ParseEvents --> IdentifyAffiliateFees{Outbound To Affiliate?}
IdentifyAffiliateFees -- Yes --> CreateFeeRecord[Create Affiliate Fee Record]
CreateFeeRecord --> StoreRecord[Store Record Thread-Safely]
IdentifyAffiliateFees -- No --> EndProcess[End Processing]
StoreRecord --> EndProcess
StartHistoricSync[Start Historic Sync] --> ForEachAffiliate[For Each Affiliate Address]
ForEachAffiliate --> PaginatedBlockSearch[Paginate Blocks with Outbound Tx to Affiliate]
PaginatedBlockSearch --> FetchBlockResults[Fetch Block Results]
FetchBlockResults --> ParseEventsHistoric[Parse Events in Blocks]
ParseEventsHistoric --> IdentifyAffiliateFeesHistoric{Outbound To Affiliate?}
IdentifyAffiliateFeesHistoric -- Yes --> CreateFeeRecordHistoric[Create Fee Record]
CreateFeeRecordHistoric --> StoreRecord
IdentifyAffiliateFeesHistoric -- No --> Continue
Continue --> EndHistoricSync[End Historic Sync]
EndProcess -.-> WaitForNextBlock[Wait for Next Block]
WaitForNextBlock --> CheckNewBlock
Summary
The [affiliateFees.go](/projects/291/69279) file provides a robust, concurrent indexing mechanism for tracking and recording outbound affiliate fee transactions on Thorchain. It balances real-time event listening with historic backfill capabilities, ensuring comprehensive and accurate affiliate fee data aggregation. This module is critical for supporting transparency and accountability in affiliate revenue reporting within the Thorchain ecosystem.