KafkaConsumer.java


Overview

`KafkaConsumer.java` defines the `KafkaConsumer` class, a core component of Apache Camel's Kafka integration that manages Kafka message consumption within Camel routes. Extending Camel's `DefaultConsumer`, it orchestrates the lifecycle and operation of multiple Kafka consumer tasks (`KafkaFetchRecords`), enabling parallel and scalable consumption of Kafka topics or topic patterns.

**Key responsibilities:**

This class acts as the bridge between Camel's routing and processing infrastructure and Kafka's consumer API, ensuring robust, fault-tolerant, and manageable Kafka consumption.


Class: KafkaConsumer

Declaration

public class KafkaConsumer extends DefaultConsumer
    implements ResumeAware<ResumeStrategy>, HealthCheckAware, ConsumerListenerAware<KafkaConsumerListener>, Suspendable

Purpose

Manages one or more Kafka consumer tasks (`KafkaFetchRecords`) to consume messages from Kafka topics. It controls the consumer lifecycle, threading, configuration, error handling integration, pause/resume functionality, and health monitoring.

Package

`org.apache.camel.component.kafka`


Fields Summary

| Field Name | Type | Description |
| --- | --- | --- |
| `executor` | `ExecutorService` | Thread pool executor managing fetch tasks. |
| `endpoint` | `KafkaEndpoint` | Reference to the parent Kafka endpoint configuration. |
| `consumerHealthCheck` | `KafkaConsumerHealthCheck` | Health check instance for this consumer. |
| `healthCheckRepository` | `HealthCheckRepository` | Registry for health checks within the Camel context. |
| `tasks` | `List<KafkaFetchRecords>` | List of running Kafka fetch tasks (worker threads). |
| `stopOffsetRepo` | `boolean` | Flag indicating whether the offset repository should be stopped. |
| `resumeStrategy` | `ResumeStrategy` | Strategy to resume consumption after failures. |
| `consumerListener` | `KafkaConsumerListener` | Listener that receives callbacks for consumption and error events. |


Constructor

KafkaConsumer(KafkaEndpoint endpoint, Processor processor)

KafkaConsumer consumer = new KafkaConsumer(kafkaEndpoint, processor);

Methods

Configuration & Property Access

Properties getProps()

Properties kafkaProps = consumer.getProps();

Lifecycle Management

void doStart() throws Exception

// doStart() is a lifecycle hook; it runs when the consumer is started
consumer.start();

void doStop() throws Exception

// doStop() is a lifecycle hook; it runs when the consumer is stopped
consumer.stop();
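The relationship between the public `start()`/`stop()` calls and the `doStart()`/`doStop()` hooks can be illustrated with a template-method sketch. This is a minimal, stdlib-only illustration and not Camel's actual base class: `SketchService`, `SketchConsumer`, and `LifecycleSketch` are hypothetical names, and the double-start guard is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a service base class (not Camel's real API).
abstract class SketchService {
    private boolean started;

    // Public entry point: ignores repeated calls, then delegates to the hook.
    public final void start() {
        if (started) {
            return;
        }
        try {
            doStart();
        } catch (Exception e) {
            throw new IllegalStateException("start failed", e);
        }
        started = true;
    }

    // Public entry point: only stops a service that is currently started.
    public final void stop() {
        if (!started) {
            return;
        }
        try {
            doStop();
        } catch (Exception e) {
            throw new IllegalStateException("stop failed", e);
        }
        started = false;
    }

    public boolean isStarted() {
        return started;
    }

    // Hooks that subclasses implement; callers never invoke these directly.
    protected abstract void doStart() throws Exception;

    protected abstract void doStop() throws Exception;
}

// Hypothetical consumer that records which hooks ran.
class SketchConsumer extends SketchService {
    final List<String> events = new ArrayList<>();

    @Override
    protected void doStart() {
        events.add("doStart");
    }

    @Override
    protected void doStop() {
        events.add("doStop");
    }
}

class LifecycleSketch {
    public static void main(String[] args) {
        SketchConsumer consumer = new SketchConsumer();
        consumer.start();
        consumer.start(); // second call is a no-op in this sketch
        consumer.stop();
        System.out.println(consumer.events); // prints [doStart, doStop]
    }
}
```

Keeping the hooks protected means callers go through one guarded entry point, so state checks live in a single place.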

Pause and Resume

void doSuspend() throws Exception

// doSuspend() is a lifecycle hook; it runs when the consumer is suspended
consumer.suspend();

void doResume() throws Exception

// doResume() is a lifecycle hook; it runs when the consumer is resumed
consumer.resume();
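One plausible way for suspend and resume to reach the individual fetch tasks is a per-task pause flag that each polling loop checks. The sketch below assumes that design; `PausableTask`, `PausableCoordinator`, and `pollOnce()` are hypothetical names and do not mirror the real `KafkaFetchRecords` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical fetch task holding a pause flag that its poll loop would consult.
class PausableTask {
    private final AtomicBoolean paused = new AtomicBoolean(false);

    void pause() {
        paused.set(true);
    }

    void resume() {
        paused.set(false);
    }

    // One simulated loop iteration: returns true if the task would fetch records.
    boolean pollOnce() {
        return !paused.get();
    }
}

// Hypothetical coordinator that forwards suspend/resume to every task.
class PausableCoordinator {
    private final List<PausableTask> tasks = new ArrayList<>();

    PausableCoordinator(int taskCount) {
        for (int i = 0; i < taskCount; i++) {
            tasks.add(new PausableTask());
        }
    }

    void suspend() {
        for (PausableTask task : tasks) {
            task.pause();
        }
    }

    void resume() {
        for (PausableTask task : tasks) {
            task.resume();
        }
    }

    // Counts the tasks that would fetch on their next iteration.
    int activeTasks() {
        int active = 0;
        for (PausableTask task : tasks) {
            if (task.pollOnce()) {
                active++;
            }
        }
        return active;
    }
}

class PauseSketch {
    public static void main(String[] args) {
        PausableCoordinator coordinator = new PausableCoordinator(3);
        coordinator.suspend();
        System.out.println(coordinator.activeTasks()); // prints 0
        coordinator.resume();
        System.out.println(coordinator.activeTasks()); // prints 3
    }
}
```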

Health Monitoring

List<TaskHealthState> healthStates()

List<TaskHealthState> states = consumer.healthStates();

boolean isKafkaPaused()

if (consumer.isKafkaPaused()) {
    // handle paused state
}
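To make the two health-related accessors more concrete, here is a hedged sketch of how per-task state could be aggregated. `TaskSnapshot` is a hypothetical stand-in whose fields are assumptions (the real `TaskHealthState` may carry different data), and treating the consumer as paused only when every task is paused is likewise an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical per-task snapshot; field names are assumptions for this sketch.
class TaskSnapshot {
    final String clientId;
    final boolean ready;
    final boolean paused;

    TaskSnapshot(String clientId, boolean ready, boolean paused) {
        this.clientId = clientId;
        this.ready = ready;
        this.paused = paused;
    }
}

class HealthSketch {
    // Collects the ids of tasks that are not ready, e.g. for a health-check report.
    static List<String> notReady(List<TaskSnapshot> states) {
        List<String> ids = new ArrayList<>();
        for (TaskSnapshot state : states) {
            if (!state.ready) {
                ids.add(state.clientId);
            }
        }
        return ids;
    }

    // Assumption: "paused" means at least one task exists and all tasks are paused.
    static boolean allPaused(List<TaskSnapshot> states) {
        return !states.isEmpty() && states.stream().allMatch(s -> s.paused);
    }

    public static void main(String[] args) {
        List<TaskSnapshot> states = Arrays.asList(
                new TaskSnapshot("task-0", true, true),
                new TaskSnapshot("task-1", false, true));
        System.out.println(notReady(states));  // prints [task-1]
        System.out.println(allPaused(states)); // prints true
    }
}
```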

Resume Strategy and Consumer Listener

void setResumeStrategy(ResumeStrategy resumeStrategy)

ResumeStrategy getResumeStrategy()

KafkaConsumerListener getConsumerListener()

void setConsumerListener(KafkaConsumerListener consumerListener)


Utility

String adapterFactoryService()


Important Implementation Details


Interaction with Other Components


Usage Example

// Assuming kafkaEndpoint and processor are initialized
KafkaConsumer kafkaConsumer = new KafkaConsumer(kafkaEndpoint, processor);

// Optionally set a resume strategy and consumer listener
kafkaConsumer.setResumeStrategy(myResumeStrategy);
kafkaConsumer.setConsumerListener(myConsumerListener);

// Start the consumer (starts multiple fetch tasks)
kafkaConsumer.start();

// Pause consumption (e.g., due to error)
kafkaConsumer.suspend();

// Resume consumption
kafkaConsumer.resume();

// Stop the consumer gracefully
kafkaConsumer.stop();

Visual Diagram: KafkaConsumer Class Structure

classDiagram
    class KafkaConsumer {
        -ExecutorService executor
        -KafkaEndpoint endpoint
        -KafkaConsumerHealthCheck consumerHealthCheck
        -HealthCheckRepository healthCheckRepository
        -List~KafkaFetchRecords~ tasks
        -boolean stopOffsetRepo
        -ResumeStrategy resumeStrategy
        -KafkaConsumerListener consumerListener
        +KafkaConsumer(KafkaEndpoint, Processor)
        +void setResumeStrategy(ResumeStrategy)
        +ResumeStrategy getResumeStrategy()
        +KafkaConsumerListener getConsumerListener()
        +void setConsumerListener(KafkaConsumerListener)
        +KafkaEndpoint getEndpoint()
        +Properties getProps()
        #void doStart() throws Exception
        #void doStop() throws Exception
        #void doSuspend() throws Exception
        #void doResume() throws Exception
        +List~TaskHealthState~ healthStates()
        +boolean isKafkaPaused()
        +String adapterFactoryService()
    }
    KafkaConsumer --> KafkaEndpoint
    KafkaConsumer --> ExecutorService
    KafkaConsumer --> KafkaFetchRecords
    KafkaConsumer --> ResumeStrategy
    KafkaConsumer --> KafkaConsumerListener
    KafkaConsumer --> KafkaConsumerHealthCheck
    KafkaConsumer --> HealthCheckRepository

Summary

`KafkaConsumer.java` implements the main Kafka consumer coordination for the Apache Camel Kafka component. It manages multiple consumer threads, lifecycle events, error handling integration, pause/resume control, and health checks. By hiding Kafka client details behind Camel's consumer interface, the class enables scalable, resilient Kafka message consumption within Camel routes.


Additional Notes

This class is tightly coupled with `KafkaFetchRecords`, which performs the actual Kafka polling and processing. It also relies on `KafkaEndpoint` for configuration and for provisioning the executor service. The design is multi-threaded: it runs several fetch tasks in parallel and supports offset resume and health monitoring to keep Kafka consumption robust in distributed integration scenarios.
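The multi-task design described here can be sketched with the standard library alone. The example below is an illustration under stated assumptions, not the component's implementation: `FetchTaskSketch` and `FanOutSketch` are hypothetical names, each task drains a finite in-memory list instead of polling Kafka indefinitely, and the pool is sized to one thread per task.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical fetch task: forwards its records to a shared sink and reports the count.
class FetchTaskSketch implements Callable<Integer> {
    private final List<String> records;
    private final Queue<String> sink;

    FetchTaskSketch(List<String> records, Queue<String> sink) {
        this.records = records;
        this.sink = sink;
    }

    @Override
    public Integer call() {
        for (String record : records) {
            sink.add(record); // stands in for handing the record to a processor
        }
        return records.size();
    }
}

class FanOutSketch {
    // Submits one task per input list and returns the total number of records handled.
    static int consumeAll(List<List<String>> inputs, Queue<String> sink) {
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, inputs.size()));
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (List<String> input : inputs) {
                futures.add(executor.submit(new FetchTaskSketch(input, sink)));
            }
            int total = 0;
            for (Future<Integer> future : futures) {
                total += future.get(); // waits for each task to finish
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("fetch task failed", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        Queue<String> sink = new ConcurrentLinkedQueue<>();
        List<List<String>> inputs = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("c"));
        System.out.println(consumeAll(inputs, sink)); // prints 3
    }
}
```

A thread-safe queue is used for the sink because several tasks write to it concurrently; the order in which records arrive across tasks is not deterministic.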