batch_processor.go

Обзор

Файл batch_processor.go содержит реализацию компонента BatchProcessor — обработчика пакетной обработки игровых действий в децентрализованной игровой системе на базе блокчейнов. Основная задача BatchProcessor — аккумулировать игровые действия (GameActionEvent) в батчи (пакеты), валидировать их, согласовывать через Symbiotic Relay, а затем отправлять результаты на соответствующие блокчейн-контракты.

BatchProcessor обеспечивает:

Таким образом, файл реализует ядро обработки игровых событий, обеспечивая масштабируемость, надежность и межцепочечное взаимодействие.


Основные структуры и типы

BatchProcessor

Основной класс, управляющий сбором, обработкой и отправкой игровых действий.

Поля:


GameActionEvent

Представляет отдельное игровое действие.


BatchResult

Результат обработки батча.


BatchProcessorConfig

Конфигурация для создания BatchProcessor.


BatchIndex

Структура для индексирования батча для эффективной обработки.


Основные методы и функции

NewBatchProcessor

Создаёт и возвращает новый экземпляр BatchProcessor.

bp := NewBatchProcessor(gameEngine, relayClient, gameContracts, config)

Start(ctx context.Context) error

Запускает обработку батчей — запускает горутины для приёма и обработки действий и результатов.

AddAction(action GameActionEvent) error

Добавляет новое действие в очередь обработки. Возвращает ошибку, если очередь переполнена.

processBatches(ctx context.Context)

Основной цикл обработки: принимает действия из канала, добавляет в батч, запускает обработку при заполнении или по таймауту.

addActionToBatch(ctx context.Context, action GameActionEvent)

Добавляет действие в текущий батч с фильтрацией дубликатов. При достижении максимального размера запускает обработку батча.

processBatch(ctx context.Context)

Запускает асинхронную обработку текущего батча: копирует батч, очищает текущий, вызывает processBatchAsync.

processBatchAsync(ctx, batchID string, batch []GameActionEvent)

Асинхронно обрабатывает батч:

  1. Индексирует батч (по игрокам, типам, цепочкам, mana flow).

  2. Валидирует через GameEngine.

  3. Отправляет на Symbiotic Relay для консенсуса.

  4. Подписывает и отправляет на соответствующие блокчейн-контракты.

  5. Формирует результат обработки и отправляет в канал batchResults.

indexBatch(batch []GameActionEvent) *BatchIndex

Индексирует батч для оптимизации обработки (группировка по игрокам, типам, цепочкам, расчет потока маны).

validateBatch(ctx, batch []GameActionEvent) ([]GameActionEvent, map[common.Address]PlayerState, error)

Валидирует батч через GameEngine, группируя по chainID, получает текущие состояния игроков из блокчейна, обновляет состояния после валидации.

getCurrentPlayerStates(ctx, chainID int64, actions []contracts.GameManagerGameAction) (map[common.Address]PlayerState, error)

Извлекает актуальные состояния игроков из контрактов цепочки. При отсутствии данных создаёт дефолтное состояние.

submitToRelay(ctx, batchID string, actions []GameActionEvent, playerStates map[common.Address]PlayerState) ([]byte, error)

Отправляет батч в Symbiotic Relay для достижения кворума подписи. Возвращает подпись кворума.

submitToChain(ctx, batchID string, actions []GameActionEvent, playerStates map[common.Address]PlayerState, quorumSig []byte) error

Отправляет батч на каждый соответствующий блокчейн, вызывая локальную функцию submitChainBatch.

submitChainBatch(ctx, chainID int64, batchID string, actions []GameActionEvent, playerStates map[common.Address]PlayerState, quorumSig []byte) error

Готовит данные и (условно) отправляет транзакцию в контракт GameManager на указанной цепочке.


Обработка результатов


Повторная обработка


Кросс-чейн обработка


Синхронизация состояний игроков


Вспомогательные методы


Взаимодействие с другими компонентами системы

BatchProcessor выступает связующим звеном, обеспечивая надёжное и согласованное выполнение игровых действий в распределённой мультичейн среде.


Пример использования

config := BatchProcessorConfig{
    MaxBatchSize: 50,
    BatchTimeout: 5 * time.Second,
    MaxRetries:   3,
    RetryDelay:   2 * time.Second,
    ValidatorKey: validatorPrivKeyBytes,
    NetworkID:    "mainnet",
}

bp := NewBatchProcessor(gameEngine, relayClient, gameContracts, config)
ctx := context.Background()

err := bp.Start(ctx)
if err != nil {
    log.Fatalf("Failed to start batch processor: %v", err)
}

// Добавляем действие в очередь
actionEvent := GameActionEvent{
    ChainID: 1,
    ActionID: someActionID,
    Action: someGameAction,
    Timestamp: time.Now(),
}

err = bp.AddAction(actionEvent)
if err != nil {
    log.Printf("Failed to enqueue action: %v", err)
}

Визуальная диаграмма структуры файла

classDiagram
    class BatchProcessor {
        -gameEngine *GameEngine
        -relayClient *SymbioticClient
        -gameContracts map[int64]*GameContracts
        -maxBatchSize int
        -batchTimeout time.Duration
        -maxRetries int
        -retryDelay time.Duration
        -currentBatch []GameActionEvent
        -batchTimer *time.Timer
        -playerStates map[common.Address]PlayerState
        -actionQueue chan GameActionEvent
        -batchResults chan BatchResult
        -validatorKey []byte
        -networkID string
        +Start(ctx context.Context) error
        +AddAction(action GameActionEvent) error
        +GetBatchResults() <-chan BatchResult
        +GetQueueSize() int
        +GetCurrentBatchSize() int
        -processBatches(ctx context.Context)
        -addActionToBatch(ctx context.Context, action GameActionEvent)
        -processBatch(ctx context.Context)
        -processBatchAsync(ctx context.Context, batchID string, batch []GameActionEvent)
        -validateBatch(ctx context.Context, batch []GameActionEvent) ([]GameActionEvent, map[common.Address]PlayerState, error)
        -submitToRelay(ctx context.Context, batchID string, actions []GameActionEvent, playerStates map[common.Address]PlayerState) ([]byte, error)
        -submitToChain(ctx context.Context, batchID string, actions []GameActionEvent, playerStates map[common.Address]PlayerState, quorumSig []byte) error
        -handleBatchResults(ctx context.Context)
        -processBatchResult(ctx context.Context, result BatchResult)
        -handleFailedBatch(ctx context.Context, result BatchResult)
        +ProcessCrossChainBatch(ctx context.Context, batchID string, actions []CrossChainGameAction, sourceChain int64, targetChain int64) error
        +SynchronizePlayerStateAcrossChains(ctx context.Context, player common.Address, sourceChain int64, targetChain int64) error
    }

    class GameActionEvent {
        +ChainID int64
        +ActionID [32]byte
        +Action GameManagerGameAction
        +Timestamp time.Time
        +RetryCount int
    }

    class BatchResult {
        +BatchID string
        +Actions []GameActionEvent
        +PlayerStates map[common.Address]PlayerState
        +Success bool
        +Error error
        +Timestamp time.Time
        +QuorumSig []byte
    }

    class BatchIndex {
        +PlayerActions map[common.Address][]GameActionEvent
        +ActionTypeCount map[uint8]int
        +ChainActions map[int64][]GameActionEvent
        +TotalManaFlow map[common.Address]*big.Int
    }

    BatchProcessor "1" o-- "*" GameActionEvent : currentBatch
    BatchProcessor "1" o-- "1" BatchResult : batchResults
    BatchProcessor "1" o-- "1" BatchIndex : indexBatch

Важные детали реализации и алгоритмы


Заключение

batch_processor.go — ядро пакетной обработки игровых действий для мультичейн среды с использованием консенсусного механизма relay и бизнес-логики GameEngine. Он обеспечивает надежное, масштабируемое и устойчивое выполнение игровых операций, поддерживает кросс-чейн взаимодействие и синхронизацию состояний.

Документация и диаграмма помогают понять архитектуру, основные методы и важные процессы, реализованные в данном файле.