batch_processor.go
Обзор
Файл batch_processor.go содержит реализацию компонента BatchProcessor — обработчика пакетной обработки игровых действий в децентрализованной игровой системе на базе блокчейнов. Основная задача BatchProcessor — аккумулировать игровые действия (GameActionEvent) в батчи (пакеты), валидировать их, согласовывать через Symbiotic Relay, а затем отправлять результаты на соответствующие блокчейн-контракты.
BatchProcessor обеспечивает:
Группировку действий в батчи с ограничением по размеру и таймаутом.
Асинхронную валидацию и индексацию действий.
Отправку подтверждённых батчей в кросс-чейн среду (несколько блокчейнов).
Механизмы повторных попыток при ошибках.
Синхронизацию состояния игроков между цепочками.
Интеграцию с GameEngine для бизнес-логики и с external relay для консенсуса.
Таким образом, файл реализует ядро обработки игровых событий, обеспечивая масштабируемость, надежность и межцепочечное взаимодействие.
Основные структуры и типы
BatchProcessor
Основной класс, управляющий сбором, обработкой и отправкой игровых действий.
Поля:
gameEngine *GameEngine— ссылка на движок игры для валидации.relayClient *v1.SymbioticClient— клиент Symbiotic Relay для консенсуса.gameContracts map[int64]*contracts.GameContracts — карта контрактов для разных цепочек (chainID).
Конфигурация батчинга:
maxBatchSize,batchTimeout,maxRetries,retryDelay.Текущее состояние батча:
currentBatch,batchTimer,playerStates.Каналы для очереди действий и результатов:
actionQueue,batchResults.Ключ валидатора и идентификатор сети:
validatorKey,networkID.
GameActionEvent
Представляет отдельное игровое действие.
ChainID int64— идентификатор цепочки.ActionID [32]byte— уникальный идентификатор действия.Action contracts.GameManagerGameAction — данные действия.
Timestamp time.Time— время создания.RetryCount int— количество попыток обработки.
BatchResult
Результат обработки батча.
BatchID string— идентификатор батча.Actions []GameActionEvent— действия в батче.PlayerStates map[common.Address]PlayerState— обновлённые состояния игроков.Success bool— статус успешности.Error error— ошибка, если была.Timestamp time.Time— время обработки.QuorumSig []byte— подпись кворума консенсуса.
BatchProcessorConfig
Конфигурация для создания BatchProcessor.
Параметры размера, таймаутов, повторов, ключ валидатора и сеть.
BatchIndex
Структура для индексирования батча для эффективной обработки.
PlayerActions map[common.Address][]GameActionEvent— действия по игрокам.ActionTypeCount map[uint8]int— количество по типам действий.ChainActions map[int64][]GameActionEvent— действия по цепочкам.TotalManaFlow map[common.Address]*big.Int— итоговый поток маны по игрокам.
Основные методы и функции
NewBatchProcessor
Создаёт и возвращает новый экземпляр BatchProcessor.
bp := NewBatchProcessor(gameEngine, relayClient, gameContracts, config)
Start(ctx context.Context) error
Запускает обработку батчей — запускает горутины для приёма и обработки действий и результатов.
AddAction(action GameActionEvent) error
Добавляет новое действие в очередь обработки. Возвращает ошибку, если очередь переполнена.
processBatches(ctx context.Context)
Основной цикл обработки: принимает действия из канала, добавляет в батч, запускает обработку при заполнении или по таймауту.
addActionToBatch(ctx context.Context, action GameActionEvent)
Добавляет действие в текущий батч с фильтрацией дубликатов. При достижении максимального размера запускает обработку батча.
processBatch(ctx context.Context)
Запускает асинхронную обработку текущего батча: копирует батч, очищает текущий, вызывает processBatchAsync.
processBatchAsync(ctx, batchID string, batch []GameActionEvent)
Асинхронно обрабатывает батч:
Индексирует батч (по игрокам, типам, цепочкам, mana flow).
Валидирует через GameEngine.
Отправляет на Symbiotic Relay для консенсуса.
Подписывает и отправляет на соответствующие блокчейн-контракты.
Формирует результат обработки и отправляет в канал
batchResults.
indexBatch(batch []GameActionEvent) *BatchIndex
Индексирует батч для оптимизации обработки (группировка по игрокам, типам, цепочкам, расчет потока маны).
validateBatch(ctx, batch []GameActionEvent) ([]GameActionEvent, map[common.Address]PlayerState, error)
Валидирует батч через GameEngine, группируя по chainID, получает текущие состояния игроков из блокчейна, обновляет состояния после валидации.
getCurrentPlayerStates(ctx, chainID int64, actions []contracts.GameManagerGameAction) (map[common.Address]PlayerState, error)
Извлекает актуальные состояния игроков из контрактов цепочки. При отсутствии данных создаёт дефолтное состояние.
submitToRelay(ctx, batchID string, actions []GameActionEvent, playerStates map[common.Address]PlayerState) ([]byte, error)
Отправляет батч в Symbiotic Relay для достижения кворума подписи. Возвращает подпись кворума.
submitToChain(ctx, batchID string, actions []GameActionEvent, playerStates map[common.Address]PlayerState, quorumSig []byte) error
Отправляет батч на каждый соответствующий блокчейн, вызывая локальную функцию submitChainBatch.
submitChainBatch(ctx, chainID int64, batchID string, actions []GameActionEvent, playerStates map[common.Address]PlayerState, quorumSig []byte) error
Готовит данные и (условно) отправляет транзакцию в контракт GameManager на указанной цепочке.
Обработка результатов
handleBatchResults(ctx context.Context)— цикл обработки результатов из каналаbatchResults.processBatchResult(ctx, result BatchResult)— логирует и обновляет локальный кеш состояний или инициирует повтор.
Повторная обработка
handleFailedBatch(ctx, result BatchResult)— повторяет отдельные действия из неуспешного батча с учётом ограничений по количеству попыток.
Кросс-чейн обработка
ProcessCrossChainBatch(ctx, batchID string, actions []CrossChainGameAction, sourceChain int64, targetChain int64) error— обрабатывает батч действий, которые затрагивают несколько цепей.Использует отдельные методы для получения состояний, отправки на Relay и Chain с учётом кросс-чейн логики.
Синхронизация состояний игроков
SynchronizePlayerStateAcrossChains(ctx, player common.Address, sourceChain int64, targetChain int64) error— синхронизирует состояние игрока между двумя цепочками, вызывая соответствующие методы GameEngine и отправляя результаты в цепи.
Вспомогательные методы
generateBatchID() string— генерирует уникальный ID батча.createBatchData(...) []byte— формирует данные для подписи батча.signBatchData(data []byte) ([]byte, error)— подписывает данные валидаторским приватным ключом.GetBatchResults() <-chan BatchResult— возвращает канал с результатами.GetQueueSize() int— возвращает размер очереди ожидания действий.GetCurrentBatchSize() int— возвращает размер текущего набранного батча.
Взаимодействие с другими компонентами системы
GameEngine — используется для валидации игровых действий и синхронизации состояний.
Symbiotic Relay — сервис консенсуса, обеспечивающий кворумные подписи для батчей.
GameContracts — обертка над смарт-контрактами конкретных цепочек, через которые происходит чтение и запись состояния игроков.
Ethereum и криптография — для подписания и хеширования данных применяется библиотека
go-ethereum/crypto.
BatchProcessor выступает связующим звеном, обеспечивая надёжное и согласованное выполнение игровых действий в распределённой мультичейн среде.
Пример использования
config := BatchProcessorConfig{
MaxBatchSize: 50,
BatchTimeout: 5 * time.Second,
MaxRetries: 3,
RetryDelay: 2 * time.Second,
ValidatorKey: validatorPrivKeyBytes,
NetworkID: "mainnet",
}
bp := NewBatchProcessor(gameEngine, relayClient, gameContracts, config)
ctx := context.Background()
err := bp.Start(ctx)
if err != nil {
log.Fatalf("Failed to start batch processor: %v", err)
}
// Добавляем действие в очередь
actionEvent := GameActionEvent{
ChainID: 1,
ActionID: someActionID,
Action: someGameAction,
Timestamp: time.Now(),
}
err = bp.AddAction(actionEvent)
if err != nil {
log.Printf("Failed to enqueue action: %v", err)
}
Визуальная диаграмма структуры файла
classDiagram
class BatchProcessor {
-gameEngine *GameEngine
-relayClient *SymbioticClient
-gameContracts map[int64]*GameContracts
-maxBatchSize int
-batchTimeout time.Duration
-maxRetries int
-retryDelay time.Duration
-currentBatch []GameActionEvent
-batchTimer *time.Timer
-playerStates map[common.Address]PlayerState
-actionQueue chan GameActionEvent
-batchResults chan BatchResult
-validatorKey []byte
-networkID string
+Start(ctx context.Context) error
+AddAction(action GameActionEvent) error
+GetBatchResults() <-chan BatchResult
+GetQueueSize() int
+GetCurrentBatchSize() int
-processBatches(ctx context.Context)
-addActionToBatch(ctx context.Context, action GameActionEvent)
-processBatch(ctx context.Context)
-processBatchAsync(ctx context.Context, batchID string, batch []GameActionEvent)
-validateBatch(ctx context.Context, batch []GameActionEvent) ([]GameActionEvent, map[common.Address]PlayerState, error)
-submitToRelay(ctx context.Context, batchID string, actions []GameActionEvent, playerStates map[common.Address]PlayerState) ([]byte, error)
-submitToChain(ctx context.Context, batchID string, actions []GameActionEvent, playerStates map[common.Address]PlayerState, quorumSig []byte) error
-handleBatchResults(ctx context.Context)
-processBatchResult(ctx context.Context, result BatchResult)
-handleFailedBatch(ctx context.Context, result BatchResult)
+ProcessCrossChainBatch(ctx context.Context, batchID string, actions []CrossChainGameAction, sourceChain int64, targetChain int64) error
+SynchronizePlayerStateAcrossChains(ctx context.Context, player common.Address, sourceChain int64, targetChain int64) error
}
class GameActionEvent {
+ChainID int64
+ActionID [32]byte
+Action GameManagerGameAction
+Timestamp time.Time
+RetryCount int
}
class BatchResult {
+BatchID string
+Actions []GameActionEvent
+PlayerStates map[common.Address]PlayerState
+Success bool
+Error error
+Timestamp time.Time
+QuorumSig []byte
}
class BatchIndex {
+PlayerActions map[common.Address][]GameActionEvent
+ActionTypeCount map[uint8]int
+ChainActions map[int64][]GameActionEvent
+TotalManaFlow map[common.Address]*big.Int
}
BatchProcessor "1" o-- "*" GameActionEvent : currentBatch
BatchProcessor "1" o-- "1" BatchResult : batchResults
BatchProcessor "1" o-- "1" BatchIndex : indexBatch
Важные детали реализации и алгоритмы
Фильтрация дубликатов — при добавлении действия в батч проверяется уникальность по ActionID.
Индексация батча — агрегируются данные по игрокам, типам действий и цепочкам для оптимизации логики.
Валидация через GameEngine — действия проверяются по бизнес-логике с учётом текущих состояний игроков.
Подпись батчей — каждый батч подписывается приватным ключом валидатора для обеспечения доверия.
Повторные попытки — неуспешные действия повторно добавляются с задержкой, ограниченной количеством попыток.
Кросс-чейн поддержка — отдельные методы обрабатывают действия, охватывающие несколько цепочек, с синхронизацией состояний.
Асинхронная обработка — батчи обрабатываются в горутинах для повышения пропускной способности.
Заключение
batch_processor.go — ядро пакетной обработки игровых действий для мультичейн среды с использованием консенсусного механизма relay и бизнес-логики GameEngine. Он обеспечивает надежное, масштабируемое и устойчивое выполнение игровых операций, поддерживает кросс-чейн взаимодействие и синхронизацию состояний.
Документация и диаграмма помогают понять архитектуру, основные методы и важные процессы, реализованные в данном файле.