KafkaDevConsole.java
Overview
`KafkaDevConsole.java` is a developer console component integrated into the Apache Camel framework that provides real-time diagnostics and monitoring for Kafka consumers configured within Camel routes. Its primary purpose is to expose detailed runtime metrics about Kafka consumer fetch tasks, including thread states, last processed records, consumer group metadata, and optionally committed offset information directly fetched from Kafka brokers.
The class extends `AbstractDevConsole` and is annotated with `@DevConsole`, making it discoverable and usable within Camel's developer console infrastructure. This enables Camel users and operators to query Kafka consumer health and operational statistics either as plain text or JSON-formatted responses, facilitating easier debugging, health monitoring, and operational insight.
Detailed Class and Method Documentation
Class: KafkaDevConsole
Package: `org.apache.camel.component.kafka`
Extends: `AbstractDevConsole`
Annotations: `@DevConsole(name = "kafka", displayName = "Kafka", description = "Apache Kafka")`
Description
The `KafkaDevConsole` class provides two primary output formats for monitoring Kafka consumers:
- Textual output (`doCallText`): human-readable console text representation.
- JSON output (`doCallJson`): structured JSON representation useful for integrations or UI rendering.
It supports an option to include committed offsets (`"committed"`) that triggers synchronous fetching of committed offsets from Kafka brokers, enhancing the depth of monitoring.
Constants
| Name | Type | Description |
|---|---|---|
| `COMMITTED_TIMEOUT` | `long` | Timeout in milliseconds (10,000 ms) for fetching committed offsets from Kafka brokers |
| `COMMITTED` | `String` | Option key that includes committed offsets in the output (`"committed"`) |
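As a rough illustration of how a `"committed"` option of this kind might be interpreted, here is a minimal sketch. The class name `CommittedOptionSketch` and the method `isCommittedEnabled` are hypothetical and not part of Camel; the parsing rule (case-insensitive `"true"`) is an assumption rather than the component's confirmed behavior.

```java
import java.util.Map;

class CommittedOptionSketch {
    // Mirrors the documented option key; everything else here is illustrative.
    static final String COMMITTED = "committed";

    /** Returns true only when the option is present and parses as "true" (assumed rule). */
    static boolean isCommittedEnabled(Map<String, Object> options) {
        Object value = options == null ? null : options.get(COMMITTED);
        return value != null && Boolean.parseBoolean(value.toString().trim());
    }

    public static void main(String[] args) {
        System.out.println(isCommittedEnabled(Map.of("committed", "true")));
        System.out.println(isCommittedEnabled(Map.of()));
    }
}
```

Treating a missing or malformed value as "disabled" keeps the expensive broker round trip opt-in.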
Constructor
KafkaDevConsole()
Description:
Default constructor initializing the dev console with identifiers and display names specific to Kafka.
Usage example:
KafkaDevConsole console = new KafkaDevConsole();
Methods
protected String doCallText(Map<String, Object> options)
Description:
Generates a human-readable string describing the state and metrics of Kafka consumers in all Camel routes. Includes details such as thread IDs, worker states, last errors if any, consumer group metadata, last processed record information, and optionally committed offsets.
Parameters:
- `options`: a map of options; recognizes `"committed"` (string `"true"`/`"false"`) to enable fetching committed offsets.
Returns:
A formatted string representing the Kafka consumer metrics.
Usage example:
Map<String, Object> options = Map.of("committed", "true");
// doCallText is protected: this call compiles only from a subclass or from the same package.
String metricsText = kafkaDevConsole.doCallText(options);
System.out.println(metricsText);
Key Implementation Notes:
- Iterates over all routes and checks whether the route's consumer is an instance of `KafkaConsumer`.
- For each fetch task (`KafkaFetchRecords`), extracts metrics from its `DevConsoleMetricsCollector`.
- If `"committed"` is enabled, fetches committed offsets via a synchronous call with a timeout.
- Uses `StringBuilder` for efficient string concatenation.
- Logs the time taken to fetch committed offsets for debugging.
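The text-rendering approach in the notes above can be sketched as follows. This is a simplified stand-in, not the component's code: the `Worker` record, the `render` method, and the output labels are all hypothetical, chosen only to show the `StringBuilder` pattern with optional fields.

```java
import java.util.List;

class TextReportSketch {
    // Hypothetical stand-in for the per-worker data the console reads from its collector.
    record Worker(long threadId, String state, String lastError, String lastTopic, long lastOffset) {}

    /** Builds one text block per route, appending optional fields only when present. */
    static String render(String routeId, String uri, List<Worker> workers) {
        StringBuilder sb = new StringBuilder();
        sb.append("Route Id: ").append(routeId).append('\n');
        sb.append("From: ").append(uri).append('\n');
        for (Worker w : workers) {
            sb.append("    Worker Thread: ").append(w.threadId()).append('\n');
            sb.append("    Worker State: ").append(w.state()).append('\n');
            if (w.lastError() != null) {
                sb.append("    Worker Last Error: ").append(w.lastError()).append('\n');
            }
            if (w.lastTopic() != null) {
                // Last-record details are printed only once a record has been seen.
                sb.append("    Last Topic: ").append(w.lastTopic()).append('\n');
                sb.append("    Last Offset: ").append(w.lastOffset()).append('\n');
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(render("route1", "kafka:orders",
                List.of(new Worker(12L, "RUNNING", null, "orders", 42L))));
    }
}
```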
protected Map<String, Object> doCallJson(Map<String, Object> options)
Description:
Produces a JSON object representing Kafka consumer metrics similar to `doCallText` but structured for programmatic consumption.
Parameters:
- `options`: a map of options; recognizes `"committed"` (string `"true"`/`"false"`) to enable including committed offsets data.
Returns:
A `Map<String, Object>` containing JSON-serializable structures reflecting the consumer metrics.
Usage example:
Map<String, Object> options = Map.of("committed", "false");
Map<String, Object> jsonMetrics = kafkaDevConsole.doCallJson(options);
// Can be converted or serialized to JSON string for UI or API response
Key Implementation Notes:
- Constructs a root `JsonObject` with a key `"kafkaConsumers"` holding an array of consumer route objects.
- Each consumer route object contains route ID, endpoint URI, and a list of worker details.
- Worker details include thread info, state, errors, group metadata, last record data, and optionally committed offsets.
- Committed offsets are fetched with the same mechanism as in `doCallText`.
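The nesting described above (root object, consumer routes, workers) can be sketched with plain Java collections. This sketch deliberately uses `LinkedHashMap` and `List` in place of Camel's `JsonObject`, and every key name except `"kafkaConsumers"` is an assumption made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class JsonReportSketch {
    /** One worker entry; the error key is added only when an error exists. */
    static Map<String, Object> worker(long threadId, String state, String lastError) {
        Map<String, Object> w = new LinkedHashMap<>();
        w.put("threadId", threadId);   // hypothetical key name
        w.put("state", state);         // hypothetical key name
        if (lastError != null) {
            w.put("lastError", lastError);
        }
        return w;
    }

    /** One consumer route entry holding its workers. */
    static Map<String, Object> consumer(String routeId, String uri, List<Map<String, Object>> workers) {
        Map<String, Object> c = new LinkedHashMap<>();
        c.put("routeId", routeId);     // hypothetical key name
        c.put("uri", uri);             // hypothetical key name
        c.put("workers", workers);     // hypothetical key name
        return c;
    }

    /** Root object; "kafkaConsumers" is the key named in the documentation. */
    static Map<String, Object> root(List<Map<String, Object>> consumers) {
        Map<String, Object> root = new LinkedHashMap<>();
        root.put("kafkaConsumers", consumers);
        return root;
    }

    public static void main(String[] args) {
        System.out.println(root(List.of(
                consumer("route1", "kafka:orders", List.of(worker(12L, "RUNNING", null))))));
    }
}
```

Omitting absent fields, rather than emitting nulls, keeps the payload compact for UI clients.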
private static List<DefaultMetricsCollector.KafkaTopicPosition> fetchCommitOffsets(KafkaConsumer kc, DevConsoleMetricsCollector collector)
Description:
Helper method that triggers the asynchronous fetch of committed offsets from Kafka brokers and waits synchronously for the response or a timeout.
Parameters:
- `kc`: the `KafkaConsumer` instance whose committed offsets are being fetched.
- `collector`: the `DevConsoleMetricsCollector` associated with the fetch task, responsible for managing commit records.
Returns:
A list of `KafkaTopicPosition` objects representing committed offsets per topic partition, or `null` if a timeout or failure occurs.
Usage scenario:
Internally used by the console methods when the committed offsets option is enabled.
Implementation details:
- Uses `CountDownLatch` to wait for commit records.
- The wait is bounded by the smaller of the configured poll timeout and `COMMITTED_TIMEOUT` (10 seconds).
- Catches all exceptions and fails silently so that a failed fetch does not break the metrics display.
- Logs the duration of the commit fetch operation for performance monitoring.
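The wait-with-cap behavior described above can be sketched in plain Java. This is a simplified model under stated assumptions: the nested `Collector` class, its `complete` method, and the `requestFetch` callback are hypothetical stand-ins for the real collector and consumer, and positions are represented as strings rather than `KafkaTopicPosition` objects.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CommitFetchSketch {
    static final long COMMITTED_TIMEOUT = 10_000L; // 10-second cap, as documented

    /** Hypothetical collector: stores the result and releases the waiting caller. */
    static final class Collector {
        private final AtomicReference<List<String>> result = new AtomicReference<>();
        private final CountDownLatch latch = new CountDownLatch(1);

        void complete(List<String> positions) {
            result.set(positions);
            latch.countDown();
        }
    }

    /** Triggers the fetch, then waits at most min(pollTimeoutMs, COMMITTED_TIMEOUT). */
    static List<String> fetchCommitOffsets(Collector collector, Runnable requestFetch, long pollTimeoutMs) {
        long timeout = Math.min(pollTimeoutMs, COMMITTED_TIMEOUT);
        try {
            requestFetch.run();
            if (collector.latch.await(timeout, TimeUnit.MILLISECONDS)) {
                return collector.result.get();
            }
        } catch (Exception e) {
            // Fail silently so the rest of the report still renders.
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
        }
        return null; // timeout or failure
    }

    public static void main(String[] args) {
        Collector c = new Collector();
        Runnable fetch = () -> new Thread(() -> c.complete(List.of("orders-0@42"))).start();
        System.out.println(fetchCommitOffsets(c, fetch, 5_000L));
    }
}
```

Returning `null` on both timeout and failure means callers need only one check before rendering the committed-offsets section.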
Important Implementation Details and Algorithms
Synchronous fetch of committed offsets:
The committed offsets from Kafka consumer groups are fetched synchronously by waiting on a `CountDownLatch` that the `DevConsoleMetricsCollector` controls. This keeps the displayed commit positions current, at the cost of blocking the console call for up to 10 seconds; the timeout prevents an indefinite wait.
Use of Java 16+ Pattern Matching:
The code uses pattern matching in `instanceof` checks, such as `if (route.getConsumer() instanceof KafkaConsumer kc) { ... }`. This removes the explicit cast and keeps `kc` in scope only where the check holds.
Health State Awareness:
The console fetches and displays health state information via `TaskHealthState`. If a fetch task is not ready, it shows the last error message, helping diagnose consumer thread issues.
Extensive Metadata Exposure:
The console exposes consumer group metadata such as group ID, instance ID, member ID, and generation ID, which are crucial for debugging consumer group coordination problems.
Dual output formats:
Supporting both plain text and JSON output allows integration flexibility, including CLI inspection and embedded UI or API clients.
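To make the pattern-matching and health-state points concrete, here is a small self-contained sketch. All types in it (`Consumer`, `KafkaLikeConsumer`, `TimerLikeConsumer`, `HealthState`) are hypothetical miniatures invented for the example; they only mimic the shape of the Camel types named above.

```java
import java.util.ArrayList;
import java.util.List;

class RouteScanSketch {
    // Hypothetical minimal types; the real Camel interfaces carry far more.
    interface Consumer {}
    record HealthState(boolean ready, String lastError) {}
    record KafkaLikeConsumer(String groupId, HealthState health) implements Consumer {}
    record TimerLikeConsumer(long period) implements Consumer {}

    /** Reports only Kafka-like consumers, showing the last error when one is not ready. */
    static List<String> describe(List<Consumer> consumers) {
        List<String> lines = new ArrayList<>();
        for (Consumer c : consumers) {
            // Java 16+ pattern matching: test and bind in one step, no explicit cast.
            if (c instanceof KafkaLikeConsumer kc) {
                HealthState h = kc.health();
                lines.add(h.ready()
                        ? kc.groupId() + ": ready"
                        : kc.groupId() + ": not ready (" + h.lastError() + ")");
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        describe(List.of(
                new KafkaLikeConsumer("orders", new HealthState(true, null)),
                new TimerLikeConsumer(1000L),
                new KafkaLikeConsumer("billing", new HealthState(false, "broker unreachable"))))
                .forEach(System.out::println);
    }
}
```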
Interactions with Other System Components
Camel Routes:
`KafkaDevConsole` queries all routes in the Camel context to find those that use `KafkaConsumer`. It relies on the route's consumer implementation to gather metrics.
KafkaConsumer and KafkaFetchRecords:
These classes represent the consumer and its individual fetch task threads. The console accesses task-level details and metrics through these.
DevConsoleMetricsCollector / DefaultMetricsCollector:
Metrics collectors attached to fetch tasks provide thread IDs, last consumed records, and commit offset info.
TaskHealthState:
Used to determine the readiness and error states of fetch tasks, integrated into the console's output to reflect consumer health.
Camel Dev Console Infrastructure:
By extending `AbstractDevConsole` and using the `@DevConsole` annotation, this class integrates into Camel's developer console system, making it accessible via CLI commands or UI.
Usage Examples
Example 1: Fetch Kafka consumer metrics as text with committed offsets
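// Assumes the console is attached to a started CamelContext before the call; otherwise there are no routes to inspect.
KafkaDevConsole console = new KafkaDevConsole();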
Map<String, Object> options = new HashMap<>();
options.put("committed", "true");
String metricsText = console.doCallText(options);
System.out.println(metricsText);
Example 2: Fetch Kafka consumer metrics as JSON without committed offsets
KafkaDevConsole console = new KafkaDevConsole();
Map<String, Object> options = new HashMap<>();
options.put("committed", "false");
Map<String, Object> metricsJson = console.doCallJson(options);
// Serialize metricsJson to JSON string for API or UI usage
Mermaid Class Diagram
classDiagram
class KafkaDevConsole {
<<extends AbstractDevConsole>>
- static final Logger LOG
- static final long COMMITTED_TIMEOUT
- static final String COMMITTED
+ KafkaDevConsole()
# String doCallText(Map<String,Object> options)
# Map<String,Object> doCallJson(Map<String,Object> options)
- static List<KafkaTopicPosition> fetchCommitOffsets(KafkaConsumer kc, DevConsoleMetricsCollector collector)
}
KafkaDevConsole --|> AbstractDevConsole
Summary
`KafkaDevConsole.java` is an observability tool within Apache Camel's Kafka component that exposes Kafka consumer internals. It reports thread-level metrics, error diagnostics, and offset commit information to support monitoring and troubleshooting. It builds on Camel's dev console framework and offers both text and JSON output.