DefaultMetricsCollector.java

Overview

`DefaultMetricsCollector` is a concrete implementation of the `DevConsoleMetricsCollector` interface designed to collect and manage Kafka consumer metrics when the Kafka Dev Console is enabled. It acts as a metrics repository and coordinator, storing metadata about the Kafka consumer group, tracking the last processed Kafka record, and managing commit offset information asynchronously. This class enables the Dev Console to display real-time Kafka consumer metrics by maintaining internal state updated from the running Kafka consumer.

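Key responsibilities include storing the consumer group metadata, tracking the last processed record, and coordinating the asynchronous collection of committed offsets.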

This class interacts primarily with the Kafka `Consumer` API to retrieve metadata and committed offset information, and with the `ProcessingResult` data from the consumer processing pipeline.


Class: DefaultMetricsCollector

public class DefaultMetricsCollector implements DevConsoleMetricsCollector

Description

`DefaultMetricsCollector` collects Kafka consumer metrics for the Dev Console. It holds and updates internal state that reflects the Kafka consumer's group metadata, last processed record position, and committed offsets. The class is designed to be used from more than one thread: `groupMetadata` and `lastRecord` are `volatile` fields, and commit offset collection is coordinated through an `AtomicBoolean` request flag and an `AtomicReference<CountDownLatch>`.

Fields

| Field | Type | Description |
| --- | --- | --- |
| `private static final LOG` | `Logger` | Logger instance for logging debug information and exceptions. |
| `private final threadId` | `String` | Identifier for the thread this collector is associated with. |
| `private volatile groupMetadata` | `GroupMetadata` | Holds the Kafka consumer group metadata. |
| `private volatile lastRecord` | `KafkaTopicPosition` | Tracks the last processed Kafka record's topic, partition, offset, and leader epoch. |
| `private final commitRecords` | `List<KafkaTopicPosition>` | List of commit offset records collected from Kafka. |
| `private final commitRecordsRequested` | `AtomicBoolean` | Flag indicating whether commit records collection has been requested. |
| `private final latch` | `AtomicReference<CountDownLatch>` | Latch used to coordinate waiting for commit records to be fetched asynchronously. |

Constructor

public DefaultMetricsCollector(String threadId)

Methods

void storeMetadata(Consumer<?, ?> consumer)

DefaultMetricsCollector collector = new DefaultMetricsCollector("thread-1");
collector.storeMetadata(kafkaConsumer);
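The source does not show the body of `storeMetadata`, so the following is only a minimal sketch of how group metadata could be copied into the collector's `volatile` field. It assumes the four attributes listed for `GroupMetadata` in the class diagram. `ConsumerGroupInfo` is a hypothetical JDK-only stand-in for whatever the Kafka consumer exposes (in the Kafka client, `Consumer#groupMetadata()` returns a `ConsumerGroupMetadata` with comparable accessors), and the nested `GroupMetadata` record is a stand-in for the Dev Console type.

```java
import java.util.Optional;

/** Sketch only: every nested type is a hypothetical stand-in, not the real Kafka or Dev Console class. */
final class StoreMetadataSketch {

    /** Stand-in for the group information a Kafka consumer exposes. */
    interface ConsumerGroupInfo {
        String groupId();
        int generationId();
        String memberId();
        Optional<String> groupInstanceId();
    }

    /** Stand-in for GroupMetadata, with the four attributes from the class diagram. */
    record GroupMetadata(String groupId, String groupInstanceId, String memberId, int generationId) {}

    // volatile: written by the consumer thread, read by whichever thread serves the Dev Console
    private volatile GroupMetadata groupMetadata;

    void storeMetadata(ConsumerGroupInfo info) {
        // Build an immutable snapshot first, then publish it with a single volatile write.
        groupMetadata = new GroupMetadata(
                info.groupId(),
                info.groupInstanceId().orElse(null), // only present for static group members
                info.memberId(),
                info.generationId());
    }

    GroupMetadata getGroupMetadata() {
        return groupMetadata;
    }
}
```

Publishing a fresh immutable object on every call means a reader sees either the previous snapshot or the new one, never a partially updated one.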

void storeLastRecord(ProcessingResult result)

ProcessingResult lastResult = ...; // obtained from consumer processing pipeline
collector.storeLastRecord(lastResult);
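As a rough illustration of what `storeLastRecord` might do, the sketch below maps a processing result onto a position object and publishes it through a `volatile` field. The nested `ProcessingResult` and `KafkaTopicPosition` types are hypothetical stand-ins shaped after the class diagram. The diagram shows no leader epoch getter on `ProcessingResult`, so the `UNKNOWN_LEADER_EPOCH` placeholder and the null check are assumptions, not behaviour confirmed by the source.

```java
/** Sketch only: the nested types are hypothetical stand-ins shaped after the class diagram. */
final class StoreLastRecordSketch {

    /** Stand-in for ProcessingResult, limited to the three getters shown in the class diagram. */
    interface ProcessingResult {
        String getTopic();
        int getPartition();
        long getOffset();
    }

    /** Stand-in for KafkaTopicPosition. */
    record KafkaTopicPosition(String topic, int partition, long offset, int leaderEpoch) {}

    /** Assumed placeholder for "leader epoch not known"; the real class may obtain the epoch elsewhere. */
    static final int UNKNOWN_LEADER_EPOCH = -1;

    private volatile KafkaTopicPosition lastRecord;

    void storeLastRecord(ProcessingResult result) {
        if (result == null) {
            return; // assumption: keep the previous position when there is nothing new to record
        }
        // One volatile write publishes topic, partition, and offset together.
        lastRecord = new KafkaTopicPosition(
                result.getTopic(), result.getPartition(), result.getOffset(), UNKNOWN_LEADER_EPOCH);
    }

    KafkaTopicPosition getLastRecord() {
        return lastRecord;
    }
}
```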

void collectCommitMetrics(Consumer<?, ?> consumer)


GroupMetadata getGroupMetadata()


KafkaTopicPosition getLastRecord()


String getThreadId()


List<KafkaTopicPosition> getCommitRecords()


CountDownLatch fetchCommitRecords()

CountDownLatch latch = collector.fetchCommitRecords();
// Wait for the consumer thread to collect commit records
latch.await();
// Now safe to call getCommitRecords() to retrieve updated commits
List<KafkaTopicPosition> commits = collector.getCommitRecords();
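The request-and-wait pattern in the usage above can be modelled with JDK classes alone. The sketch below is one plausible arrangement of the `commitRecordsRequested` flag and the `latch` reference, not the actual implementation. `Position` is a hypothetical stand-in for `KafkaTopicPosition`, and the `Supplier` parameter replaces the Kafka `Consumer` so that the example has no external dependencies. It also assumes a single requesting thread at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Sketch only: a JDK-only model of the request/collect handshake, not the real class. */
final class CommitLatchSketch {

    /** Hypothetical stand-in for KafkaTopicPosition. */
    record Position(String topic, int partition, long offset) {}

    private final List<Position> commitRecords = new ArrayList<>();
    private final AtomicBoolean commitRecordsRequested = new AtomicBoolean(false);
    private final AtomicReference<CountDownLatch> latch =
            new AtomicReference<>(new CountDownLatch(0));

    /** Called by the requesting thread: ask for fresh data and receive a latch to wait on. */
    CountDownLatch fetchCommitRecords() {
        CountDownLatch fresh = new CountDownLatch(1);
        latch.set(fresh);                 // install the latch before raising the flag
        commitRecordsRequested.set(true);
        return fresh;
    }

    /** Called by the consumer thread on each loop pass; a no-op unless a request is pending. */
    void collectCommitMetrics(Supplier<List<Position>> committedOffsets) {
        if (!commitRecordsRequested.compareAndSet(true, false)) {
            return;
        }
        CountDownLatch pending = latch.get();
        try {
            List<Position> fetched = committedOffsets.get();
            synchronized (commitRecords) {
                commitRecords.clear();
                commitRecords.addAll(fetched);
            }
        } finally {
            pending.countDown();          // release the waiter even if the lookup throws
        }
    }

    /** Returns an immutable copy so callers never observe a half-updated list. */
    List<Position> getCommitRecords() {
        synchronized (commitRecords) {
            return List.copyOf(commitRecords);
        }
    }
}
```

In this arrangement the consumer thread is the only one that performs the offset lookup, so the requesting thread never touches the consumer directly. A caller that cannot afford to wait indefinitely may prefer `latch.await(timeout, unit)` over the unbounded `await()`.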

Important Implementation Details and Algorithms


Interaction with Other System Components


Class Diagram

classDiagram
    class DefaultMetricsCollector {
        -String threadId
        -volatile GroupMetadata groupMetadata
        -volatile KafkaTopicPosition lastRecord
        -List~KafkaTopicPosition~ commitRecords
        -AtomicBoolean commitRecordsRequested
        -AtomicReference~CountDownLatch~ latch
        +DefaultMetricsCollector(String threadId)
        +void storeMetadata(Consumer<?, ?> consumer)
        +void storeLastRecord(ProcessingResult result)
        +void collectCommitMetrics(Consumer<?, ?> consumer)
        +GroupMetadata getGroupMetadata()
        +KafkaTopicPosition getLastRecord()
        +String getThreadId()
        +List~KafkaTopicPosition~ getCommitRecords()
        +CountDownLatch fetchCommitRecords()
    }

    %% Related classes (not implemented here)
    class GroupMetadata {
        +String groupId
        +String groupInstanceId
        +String memberId
        +int generationId
    }

    class KafkaTopicPosition {
        +String topic
        +int partition
        +long offset
        +int leaderEpoch
    }

    class ProcessingResult {
        +String getTopic()
        +int getPartition()
        +long getOffset()
    }

    DefaultMetricsCollector ..> GroupMetadata : uses
    DefaultMetricsCollector ..> KafkaTopicPosition : uses
    DefaultMetricsCollector ..> ProcessingResult : uses

Summary

`DefaultMetricsCollector` collects, stores, and exposes Kafka consumer group metadata, the position of the last processed record, and committed offsets for the Dev Console. It relies on thread-safe constructs and a latch-based handshake: another thread requests commit metrics and waits on the returned latch, while the consumer thread performs the collection. This keeps metrics fetching from blocking Kafka consumer operations and lets the Dev Console show current consumer state in a multi-threaded environment.