Consumer Health Monitoring

Purpose

Consumer Health Monitoring reports the operational readiness and error states of Kafka consumers within Camel routes. Unlike general consumer message processing or fetch task management, this subtopic focuses on actionable health status information. It surfaces connectivity issues, consumer failures, and partition assignment problems early, which supports reliable deployment and operational observability of Kafka consumers.

Functionality

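This subtopic implements health checks and metrics reporting that monitor the internal state of Kafka consumer fetch tasks and their interactions with Kafka brokers. Key workflows, as reflected in the diagram and snippets in this section, include:

- Health check evaluation: the health state of each fetch task is inspected. The first task that is not ready produces a DOWN result carrying the state message, the last error, and connection metadata (bootstrap servers, client ID, group ID, route ID, topic). If every task is ready, the result is UP.
- Dev console metrics: for each consumer thread, the console reports the thread ID, the task state, and, when the task is not ready, its last error.
- Committed offsets: when requested, committed offsets are fetched from Kafka and added to the console output.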

Together, these features give Camel users and administrators insight into Kafka consumer health beyond simple message processing status.

Integration with the Parent Topic and Other Subtopics

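Consumer Health Monitoring complements the broader **Kafka Message Consumption** topic by adding observability layers that monitor message fetching and offset commits without interfering with them. It integrates closely with the consumer fetch tasks, whose health state and metrics collectors it reads, and with the Camel Health API and the dev console, through which it publishes its results.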

This subtopic introduces new diagnostic capabilities not covered elsewhere, focusing on health status reporting and operational metrics rather than message flow or commit control.

Diagram

flowchart TD
    A[Start Health Check] --> B[Retrieve KafkaConsumer Fetch Tasks]
    B --> C{All Tasks Ready?}
    C -- No --> D[Report DOWN State]
    D --> E[Include Last Error & Metadata]
    C -- Yes --> F[Report UP State]
    E --> G[Provide Health Status to Camel Health API]
    F --> G
    G --> H[Dev Console Queries Metrics]
    H --> I[Fetch Thread States & Last Records]
    I --> J{Committed Offsets Enabled?}
    J -- Yes --> K[Fetch Committed Offsets from Kafka]
    J -- No --> L[Skip Commit Fetch]
    K --> M[Display Metrics & Health in UI/JSON]
    L --> M

Code Snippets Illustrating Key Interactions

Health Check Invocation and Status Reporting

// One health state per consumer fetch task.
List<TaskHealthState> healthStates = kafkaConsumer.healthStates();

for (TaskHealthState healthState : healthStates) {
    if (!healthState.isReady()) {
        // The first task that is not ready marks the whole check as DOWN;
        // the details identify the affected client, group, route, and topic.
        builder.down()
               .message(healthState.buildStateMessage())
               .error(healthState.getLastError())
               .detail("bootstrap.servers", healthState.getBootstrapServers())
               .detail("client.id", healthState.getClientId())
               .detail("group.id", healthState.getGroupId())
               .detail("route.id", routeId)
               .detail("topic", kafkaConsumer.getEndpoint().getConfiguration().getTopic());
        return;
    }
}
// Every task reported ready.
builder.up();
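
The short-circuit logic in the snippet above can be tried without a Kafka broker or a Camel context. The following is a minimal sketch under stated assumptions, not Camel's implementation: `HealthSketch`, `TaskState`, `HealthResult`, and `evaluate` are hypothetical stand-ins for the health check body, `TaskHealthState`, and the result assembled by the builder.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class HealthSketch {

    // Hypothetical stand-in for TaskHealthState; not a Camel class.
    record TaskState(boolean ready, String clientId, String groupId, String stateMessage) {}

    // Hypothetical stand-in for the result assembled by the health check builder.
    record HealthResult(boolean up, String message, Map<String, Object> details) {}

    // Returns DOWN with the details of the first task that is not ready, otherwise UP.
    static HealthResult evaluate(List<TaskState> tasks, String routeId, String topic) {
        for (TaskState task : tasks) {
            if (!task.ready()) {
                Map<String, Object> details = new LinkedHashMap<>();
                details.put("client.id", task.clientId());
                details.put("group.id", task.groupId());
                details.put("route.id", routeId);
                details.put("topic", topic);
                return new HealthResult(false, task.stateMessage(), details);
            }
        }
        return new HealthResult(true, null, Map.of());
    }

    public static void main(String[] args) {
        List<TaskState> tasks = List.of(
                new TaskState(true, "client-0", "orders", "ready"),
                new TaskState(false, "client-1", "orders", "broker unreachable"));
        HealthResult result = evaluate(tasks, "route1", "orders-topic");
        System.out.println((result.up() ? "UP" : "DOWN") + " " + result.details());
    }
}
```

Because the loop returns on the first task that is not ready, later failing tasks do not appear in the result. That keeps the DOWN report short, at the cost of hiding additional failures until the first one is resolved.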

Dev Console Metrics Collection per Consumer Thread

for (KafkaFetchRecords task : kafkaConsumer.tasks()) {
    // Each fetch task exposes its own metrics collector.
    DevConsoleMetricsCollector metrics = task.getMetricsCollector();
    sb.append("Thread: ").append(metrics.getThreadId())
      .append(", State: ").append(task.getState());

    // Read the health state once and reuse it.
    TaskHealthState state = task.healthState();
    if (!state.isReady()) {
        sb.append(", Last Error: ").append(state.buildStateMessage());
    }

    // Committed offsets are fetched from Kafka only when requested.
    if (committedOffsetsRequested) {
        List<KafkaTopicPosition> offsets = fetchCommitOffsets(kafkaConsumer, metrics);
        // Append committed offset info to output
    }
}
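
The per-thread loop and the committed-offsets toggle can also be sketched in isolation. This is an illustrative example only: `DevConsoleSketch`, `ThreadInfo`, `render`, and the `offsetLookup` function are hypothetical names, and the lookup merely stands in for `fetchCommitOffsets`, which queries Kafka in the snippet above.

```java
import java.util.List;
import java.util.function.Function;

class DevConsoleSketch {

    // Hypothetical per-thread snapshot; stands in for a fetch task and its metrics collector.
    record ThreadInfo(String threadId, String state, boolean ready, String lastError) {}

    // Builds one text line per consumer thread. The offset lookup is invoked
    // only when committed offsets were requested.
    static String render(List<ThreadInfo> threads, boolean committedOffsetsRequested,
                         Function<ThreadInfo, List<String>> offsetLookup) {
        StringBuilder sb = new StringBuilder();
        for (ThreadInfo t : threads) {
            sb.append("Thread: ").append(t.threadId())
              .append(", State: ").append(t.state());
            if (!t.ready()) {
                sb.append(", Last Error: ").append(t.lastError());
            }
            if (committedOffsetsRequested) {
                sb.append(", Committed: ").append(offsetLookup.apply(t));
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<ThreadInfo> threads = List.of(
                new ThreadInfo("t-0", "RUNNING", true, null),
                new ThreadInfo("t-1", "PAUSED", false, "poll timeout"));
        // Fake lookup returning a fixed "topic-partition=offset" entry.
        System.out.print(render(threads, true, t -> List.of("orders-0=42")));
    }
}
```

Passing the lookup as a function keeps the broker call out of the rendering code, so the output format can be checked with a fake lookup and the lookup is skipped entirely when offsets are not requested.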

Together, the health check and the dev console give developers and operators the information they need to keep Kafka consumers in Camel routes healthy.