DefaultMetricsCollector.java
Overview
`DefaultMetricsCollector` is a concrete implementation of the `DevConsoleMetricsCollector` interface designed to collect and manage Kafka consumer metrics when the Kafka Dev Console is enabled. It acts as a metrics repository and coordinator, storing metadata about the Kafka consumer group, tracking the last processed Kafka record, and managing commit offset information asynchronously. This class enables the Dev Console to display real-time Kafka consumer metrics by maintaining internal state updated from the running Kafka consumer.
Key responsibilities include:
Storing Kafka consumer group metadata.
Tracking the last processed record's topic, partition, and offset.
Collecting and storing committed offsets for topic partitions.
Coordinating asynchronous fetching of commit records via a
CountDownLatch.Providing thread-safe access to collected metrics for use by the Dev Console UI.
This class interacts primarily with the Kafka `Consumer` API to retrieve metadata and committed offset information, and with the `ProcessingResult` data from the consumer processing pipeline.
Class: DefaultMetricsCollector
public class DefaultMetricsCollector implements DevConsoleMetricsCollector
Description
The `DefaultMetricsCollector` class collects Kafka consumer metrics for the Dev Console. It holds and updates internal state that reflects the Kafka consumer's group metadata, last processed record position, and committed offsets. The class is thread-safe and uses atomic references and synchronization primitives to coordinate data collection.
Fields
Field | Type | Description |
|---|---|---|
`private static final LOG` | `Logger` | Logger instance for logging debug information and exceptions. |
`private final threadId` | `String` | Identifier for the thread this collector is associated with. |
`private volatile groupMetadata` | `GroupMetadata` | Holds the Kafka consumer group metadata. |
`private volatile lastRecord` | `KafkaTopicPosition` | Tracks the last processed Kafka record's topic, partition, offset, and leader epoch. |
`private final commitRecords` | `List` | List of commit offset records collected from Kafka. |
`private final commitRecordsRequested` | `AtomicBoolean` | Flag indicating if commit records collection has been requested. |
`private final latch` | `AtomicReference` | Latch used to coordinate waiting for commit records to be fetched asynchronously. |
Constructor
public DefaultMetricsCollector(String threadId)
Parameters:
threadId- a unique identifier representing the thread that this metrics collector instance is associated with.
Description:
Initializes the collector with the specified thread ID.
Methods
void storeMetadata(Consumer consumer)
Description:
Extracts and stores Kafka consumer group metadata from the provided KafkaConsumerinstance.Parameters:
consumer- the Kafka consumer from which to retrieve the group metadata.
Usage Example:
DefaultMetricsCollector collector = new DefaultMetricsCollector("thread-1");
collector.storeMetadata(kafkaConsumer);
Details:
RetrievesConsumerGroupMetadatafrom the Kafka consumer and stores it internally as aGroupMetadataobject.
void storeLastRecord(ProcessingResult result)
Description:
Stores information about the last record processed by the Kafka consumer.Parameters:
result- aProcessingResultobject representing the last processed Kafka record.
Usage Example:
ProcessingResult lastResult = ...; // obtained from consumer processing pipeline
collector.storeLastRecord(lastResult);
Details:
Updates the internallastRecordreference with the topic, partition, offset, and sets leader epoch to zero (default as not provided byProcessingResult).
void collectCommitMetrics(Consumer consumer)
Description:
If commit records collection has been requested (commitRecordsRequestedflag is true), fetches the latest committed offsets from the Kafka broker for the assigned partitions, updates internal commit records, and releases the waiting latch.Parameters:
consumer- the Kafka consumer used to fetch committed offsets.
Details:
Usesconsumer.committed(consumer.assignment())to get committed offsets. UpdatescommitRecordslist atomically. If an exception occurs during fetching, it is logged at debug level but otherwise ignored to avoid disruption.Usage Context:
Intended to be called periodically or from the consumer thread to update commit offset metrics asynchronously.
GroupMetadata getGroupMetadata()
Description:
Returns the stored Kafka consumer group metadata.Returns:
GroupMetadatainstance ornullif not yet set.
KafkaTopicPosition getLastRecord()
Description:
Returns the stored position of the last processed Kafka record.Returns:
KafkaTopicPositioninstance ornullif not yet set.
String getThreadId()
Description:
Returns the thread identifier associated with this metrics collector.Returns:
Stringrepresenting the thread ID.
List getCommitRecords()
Description:
Returns an unmodifiable snapshot of the commit offset records collected.Returns:
Unmodifiable
List<KafkaTopicPosition>representing committed offsets for topic partitions.
CountDownLatch fetchCommitRecords()
Description:
Initiates a request to fetch commit records asynchronously by setting a latch and the request flag. Returns aCountDownLatchthat the caller can wait on until commit records are collected.Returns:
CountDownLatchwhich will count down once commit records are available.
Usage Example:
CountDownLatch latch = collector.fetchCommitRecords();
// Wait for the consumer thread to collect commit records
latch.await();
// Now safe to call getCommitRecords() to retrieve updated commits
List<KafkaTopicPosition> commits = collector.getCommitRecords();
Details:
This mechanism allows synchronization between the thread requesting commit metrics and the consumer thread performing the actual fetch from Kafka brokers.
Important Implementation Details and Algorithms
Uses volatile variables (
groupMetadata,lastRecord) to ensure visibility of updates across threads.Uses atomic variables (
AtomicBooleanandAtomicReference<CountDownLatch>) to manage synchronization and state flags safely.Maintains an internal
List<KafkaTopicPosition>to store commit offset information, which is cleared and rebuilt upon each collection.The asynchronous commit metrics collection pattern with
CountDownLatchallows the caller thread to wait for the consumer thread to complete fetching commit information without blocking Kafka consumer operations.Logging is done with SLF4J at debug level for exceptions during commit offset fetching, ensuring no disruption in metric collection due to transient errors.
Interaction with Other System Components
Kafka Consumer:
TheDefaultMetricsCollectorinteracts closely with the KafkaConsumerinterface to gather metadata and committed offset information.Processing Pipeline:
ReceivesProcessingResultinstances from the Kafka consumer processing pipeline to update the last processed record position.Dev Console UI:
Provides metrics data (group metadata, last record, commit offsets) for display in the Kafka Dev Console, facilitating monitoring and debugging.Concurrency Model:
Designed to be used in multi-threaded environments where a dedicated consumer thread updates metrics, while other threads may request and wait for metric snapshots.
Class Diagram
classDiagram
class DefaultMetricsCollector {
-String threadId
-volatile GroupMetadata groupMetadata
-volatile KafkaTopicPosition lastRecord
-List~KafkaTopicPosition~ commitRecords
-AtomicBoolean commitRecordsRequested
-AtomicReference~CountDownLatch~ latch
+DefaultMetricsCollector(String threadId)
+void storeMetadata(Consumer<?, ?> consumer)
+void storeLastRecord(ProcessingResult result)
+void collectCommitMetrics(Consumer<?, ?> consumer)
+GroupMetadata getGroupMetadata()
+KafkaTopicPosition getLastRecord()
+String getThreadId()
+List~KafkaTopicPosition~ getCommitRecords()
+CountDownLatch fetchCommitRecords()
}
%% Related classes (not implemented here)
class GroupMetadata {
+String groupId
+String groupInstanceId
+String memberId
+int generationId
}
class KafkaTopicPosition {
+String topic
+int partition
+long offset
+int leaderEpoch
}
class ProcessingResult {
+String getTopic()
+int getPartition()
+long getOffset()
}
DefaultMetricsCollector ..> GroupMetadata : uses
DefaultMetricsCollector ..> KafkaTopicPosition : uses
DefaultMetricsCollector ..> ProcessingResult : uses
Summary
`DefaultMetricsCollector.java` is a vital component within the Kafka consumer Dev Console infrastructure that efficiently collects, stores, and exposes Kafka consumer group metadata, last processed record information, and commit offset metrics. It uses thread-safe constructs and an asynchronous latch-based mechanism to synchronize metrics fetching without blocking Kafka consumer operations, enabling real-time monitoring and debugging in a multi-threaded environment.