Consumer Health Monitoring
Purpose
Consumer Health Monitoring covers how the operational readiness and error states of Kafka consumers are observed and verified within Camel routes. Unlike general consumer message processing or fetch task management, this subtopic focuses on providing actionable health status information. It enables early detection of connectivity issues, consumer failures, and partition assignment problems, which supports reliable deployment and operational observability of Kafka consumers.
Functionality
This subtopic implements health checks and metrics reporting that monitor the internal state of Kafka consumer fetch tasks and their interactions with Kafka brokers. Key workflows include:
Health Check Execution: The KafkaConsumerHealthCheck class performs readiness checks by querying the health state of all fetch task threads managed by the KafkaConsumer. It marks the consumer as UP only if all tasks report readiness; otherwise, it reports DOWN with detailed error messages and contextual metadata.
Health State Aggregation: Each fetch task reports a TaskHealthState that encapsulates readiness, last error encountered, client ID, group ID, bootstrap servers, and other relevant connection details. The health check iterates over these states to assess overall consumer health.
Dev Console Integration: The KafkaDevConsole provides a runtime view and JSON API exposing metrics such as thread states, last processed record offsets, consumer group metadata, and optionally committed offsets fetched synchronously from Kafka brokers. This allows operators to inspect consumer behavior and troubleshoot issues interactively.
Commit Offset Reporting: When enabled, the dev console fetches and displays committed offsets for each partition, helping verify that offset commits are occurring as expected and that consumer progress is tracked correctly.
Together, these features give Camel users and administrators real-time insight into Kafka consumer health beyond simple message processing status.
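The aggregation rule described above (UP only when every fetch task is ready, otherwise DOWN with the failing task's context) can be sketched in isolation. This is a minimal, self-contained model and not the component's actual classes: `TaskState`, `HealthResult`, and `check` are hypothetical names introduced for illustration, and the detail keys simply mirror the ones used elsewhere in this section.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ConsumerHealthSketch {

    // Hypothetical stand-in for the per-task health state (not the real TaskHealthState).
    record TaskState(boolean ready, String clientId, String groupId,
                     String bootstrapServers, String lastError) {}

    // Hypothetical aggregated result: status is "UP" or "DOWN".
    record HealthResult(String status, String message, Map<String, String> details) {}

    // Returns UP only if every task is ready; otherwise DOWN with the
    // context of the first task that is not ready.
    static HealthResult check(List<TaskState> states) {
        for (TaskState s : states) {
            if (!s.ready()) {
                Map<String, String> details = new LinkedHashMap<>();
                details.put("bootstrap.servers", s.bootstrapServers());
                details.put("client.id", s.clientId());
                details.put("group.id", s.groupId());
                return new HealthResult("DOWN", s.lastError(), details);
            }
        }
        return new HealthResult("UP", null, Map.of());
    }

    public static void main(String[] args) {
        List<TaskState> states = List.of(
                new TaskState(true, "client-1", "orders", "localhost:9092", null),
                new TaskState(false, "client-2", "orders", "localhost:9092",
                        "Disconnected from broker"));
        HealthResult result = check(states);
        System.out.println(result.status() + ": " + result.message());
    }
}
```

In this sketch an empty task list yields UP, because no task reports a failure; whether the real health check treats a consumer with no tasks the same way is not covered here.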
Integration with the Parent Topic and Other Subtopics
Consumer Health Monitoring complements the broader **Kafka Message Consumption** topic by adding observability layers that do not interfere with message fetching or offset commit logic but instead monitor those processes. It integrates closely with:
Fetch Task Management: Health checks query the state of individual fetch tasks (KafkaFetchRecords) to determine consumer readiness.
Pause and Resume: While pause/resume controls consumer activity, health monitoring reflects the impact of these controls on consumer state, providing visibility into whether the consumer is operational or intentionally paused.
Offset Commit Management: The dev console can report committed offsets, linking health insights with commit manager activities.
This subtopic introduces new diagnostic capabilities not covered elsewhere, focusing on health status reporting and operational metrics rather than message flow or commit control.
Diagram
flowchart TD
A[Start Health Check] --> B[Retrieve KafkaConsumer Fetch Tasks]
B --> C{All Tasks Ready?}
C -- No --> D[Report DOWN State]
D --> E[Include Last Error & Metadata]
C -- Yes --> F[Report UP State]
E --> G[Provide Health Status to Camel Health API]
F --> G
G --> H[Dev Console Queries Metrics]
H --> I[Fetch Thread States & Last Records]
I --> J{Committed Offsets Enabled?}
J -- Yes --> K[Fetch Committed Offsets from Kafka]
J -- No --> L[Skip Commit Fetch]
K --> M[Display Metrics & Health in UI/JSON]
L --> M
Code Snippets Illustrating Key Interactions
Health Check Invocation and Status Reporting
// Collect the health state reported by each fetch task.
List<TaskHealthState> healthStates = kafkaConsumer.healthStates();
for (TaskHealthState healthState : healthStates) {
    if (!healthState.isReady()) {
        // The first task that is not ready marks the whole consumer as DOWN.
        builder.down()
                .message(healthState.buildStateMessage())
                .error(healthState.getLastError())
                .detail("bootstrap.servers", healthState.getBootstrapServers())
                .detail("client.id", healthState.getClientId())
                .detail("group.id", healthState.getGroupId())
                .detail("route.id", routeId)
                .detail("topic", kafkaConsumer.getEndpoint().getConfiguration().getTopic());
        return;
    }
}
// Every task reported ready.
builder.up();
Dev Console Metrics Collection per Consumer Thread
for (KafkaFetchRecords task : kafkaConsumer.tasks()) {
    DevConsoleMetricsCollector metrics = task.getMetricsCollector();
    sb.append("Thread: ").append(metrics.getThreadId())
            .append(", State: ").append(task.getState());
    // Only tasks that are not ready contribute an error message.
    if (!task.healthState().isReady()) {
        sb.append(", Last Error: ").append(task.healthState().buildStateMessage());
    }
    // Committed offsets are fetched from the broker only when requested.
    if (committedOffsetsRequested) {
        List<KafkaTopicPosition> offsets = fetchCommitOffsets(kafkaConsumer, metrics);
        // Append committed offset info to output
    }
}
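To make the shape of the per-thread output more concrete, the following is a rough sketch of how one thread's metrics could be assembled into a JSON-style map, with the committed offsets included only on request. It is an illustration under stated assumptions: `ThreadSnapshot`, `CommittedOffset`, `describe`, and the map keys are hypothetical, and the committed offsets are passed in as plain data rather than fetched from a broker.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DevConsoleReportSketch {

    // Hypothetical snapshot of one consumer thread's metrics.
    record ThreadSnapshot(String threadId, String state, String lastTopic,
                          int lastPartition, long lastOffset) {}

    // Hypothetical committed-offset entry for a single partition.
    record CommittedOffset(String topic, int partition, long offset) {}

    // Builds an ordered map for one thread; the "committed" key is only
    // present when committed offsets were requested.
    static Map<String, Object> describe(ThreadSnapshot snapshot,
                                        boolean includeCommitted,
                                        List<CommittedOffset> committed) {
        Map<String, Object> entry = new LinkedHashMap<>();
        entry.put("threadId", snapshot.threadId());
        entry.put("state", snapshot.state());
        entry.put("lastTopic", snapshot.lastTopic());
        entry.put("lastPartition", snapshot.lastPartition());
        entry.put("lastOffset", snapshot.lastOffset());
        if (includeCommitted) {
            List<Map<String, Object>> offsets = new ArrayList<>();
            for (CommittedOffset c : committed) {
                Map<String, Object> o = new LinkedHashMap<>();
                o.put("topic", c.topic());
                o.put("partition", c.partition());
                o.put("offset", c.offset());
                offsets.add(o);
            }
            entry.put("committed", offsets);
        }
        return entry;
    }

    public static void main(String[] args) {
        ThreadSnapshot snapshot = new ThreadSnapshot("thread-1", "RUNNING", "orders", 0, 41L);
        List<CommittedOffset> committed = List.of(new CommittedOffset("orders", 0, 42L));
        System.out.println(describe(snapshot, true, committed));
    }
}
```

Keeping the committed offsets behind a flag mirrors the text above: the extra broker round trip is paid only when an operator asks for it.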
Together, the health check and the dev console give developers and operators the visibility they need to keep Kafka consumers healthy within Camel routes.