KafkaConsumer.java
Overview
`KafkaConsumer.java` defines the `KafkaConsumer` class, a core component of Apache Camel's Kafka integration that manages Kafka message consumption within Camel routes. Extending Camel's `DefaultConsumer`, it orchestrates the lifecycle and operation of multiple Kafka consumer tasks (`KafkaFetchRecords`), enabling parallel and scalable consumption of Kafka topics or topic patterns.
**Key responsibilities:**
Configuring Kafka consumer properties.
Starting and stopping the Kafka consumer fetch tasks.
Managing an executor service that runs multiple
KafkaFetchRecordsworker threads.Handling pause and resume operations on consumers.
Integrating health checks for consumer readiness monitoring.
Coordinating the lifecycle of offset repositories.
Providing hooks to attach consumer listeners and resume strategies.
This class acts as the bridge between Camel's routing and processing infrastructure and Kafka's consumer API, ensuring robust, fault-tolerant, and manageable Kafka consumption.
Class: KafkaConsumer
Declaration
public class KafkaConsumer extends DefaultConsumer
implements ResumeAware<ResumeStrategy>, HealthCheckAware, ConsumerListenerAware<KafkaConsumerListener>, Suspendable
Purpose
Manages one or more Kafka consumer tasks (`KafkaFetchRecords`) to consume messages from Kafka topics. It controls the consumer lifecycle, threading, configuration, error handling integration, pause/resume functionality, and health monitoring.
Package
`org.apache.camel.component.kafka`
Fields Summary
Field Name | Type | Description |
|---|---|---|
`executor` | `ExecutorService` | Thread pool executor managing fetch tasks. |
`endpoint` | `KafkaEndpoint` | Reference to the parent Kafka endpoint configuration. |
`consumerHealthCheck` | `KafkaConsumerHealthCheck` | Health check instance for this consumer. |
`healthCheckRepository` | `HealthCheckRepository` | Registry for health checks within the Camel context. |
`tasks` | `List` | List of running Kafka fetch tasks (worker threads). |
`stopOffsetRepo` | `boolean` | Flag indicating if offset repository should be stopped. |
`resumeStrategy` | `ResumeStrategy` | Strategy to resume consumption after failures. |
`consumerListener` | `KafkaConsumerListener` | Listener for consumption and error events callbacks. |
Constructor
KafkaConsumer(KafkaEndpoint endpoint, Processor processor)
Parameters:
endpoint– The Kafka endpoint providing configuration and context.processor– CamelProcessorto process consumed Kafka messages.
Description:
Initializes theKafkaConsumerwith the given endpoint and message processor.Usage:
KafkaConsumer consumer = new KafkaConsumer(kafkaEndpoint, processor);
Methods
Configuration & Property Access
Properties getProps()
Returns:
Properties- Kafka consumer configuration properties tailored from the endpoint configuration.Description:
Creates Kafka consumer properties by merging endpoint configuration, client factory details, and setting required Kafka consumer configs such as group ID. If no group ID is configured, a random UUID is generated for uniqueness.Usage example:
Properties kafkaProps = consumer.getProps();
Lifecycle Management
void doStart() throws Exception
Overrides:
DefaultConsumer.doStart()Description:
Starts the Kafka consumer:Initializes health checks if enabled.
Starts the offset repository if required.
Creates an executor service for fetch tasks.
For each configured consumer count, creates and submits a
KafkaFetchRecordstask to poll Kafka.Validates broker addresses if pre-validation is enabled.
Supports topic patterns or explicit topic subscriptions.
Key Implementation Details:
Uses
BridgeExceptionHandlerToErrorHandlerto route exceptions from fetch tasks to Camel's error handler.Supports multiple consumers for parallel consumption.
Tasks are stored in the
taskslist to manage lifecycle.
Usage example:
consumer.doStart();
void doStop() throws Exception
Overrides:
DefaultConsumer.doStop()Description:
Stops the Kafka consumer:Logs stopping message (topic or pattern).
Stops health checks.
Signals all fetch tasks to stop gracefully.
Shuts down executor service with configured timeout.
Clears tasks list and releases executor.
Stops offset repository if it was started by this consumer.
Notes:
Ensures a graceful shutdown of all consumer threads and resources.Usage example:
consumer.doStop();
Pause and Resume
void doSuspend() throws Exception
Overrides:
DefaultConsumer.doSuspend()Description:
Pauses all running Kafka fetch tasks by invoking theirpause()methods. This halts message polling without stopping the consumer threads.Usage example:
consumer.doSuspend();
void doResume() throws Exception
Overrides:
DefaultConsumer.doResume()Description:
Resumes all paused Kafka fetch tasks by invoking theirresume()methods, resuming message polling.Usage example:
consumer.doResume();
Health Monitoring
List<TaskHealthState> healthStates()
Returns:
List ofTaskHealthStateobjects representing health of each fetch task.Description:
Aggregates health states from all running Kafka fetch tasks to provide insights into connectivity, errors, and readiness.Usage example:
List<TaskHealthState> states = consumer.healthStates();
boolean isKafkaPaused()
Returns:
trueif all fetch tasks are currently paused; otherwisefalse.Description:
Indicates whether the Kafka consumer is in a paused state.Usage example:
if (consumer.isKafkaPaused()) {
// handle paused state
}
Resume Strategy and Consumer Listener
void setResumeStrategy(ResumeStrategy resumeStrategy)
Description:
Sets the resume strategy used to manage offset recovery and rebalance behavior.
ResumeStrategy getResumeStrategy()
Returns:
The current resume strategy.
KafkaConsumerListener getConsumerListener()
Returns:
The consumer listener handling error callbacks and processing notifications.
void setConsumerListener(KafkaConsumerListener consumerListener)
Description:
Sets the consumer listener used for handling consumption events and errors.
Utility
String adapterFactoryService()
Returns:
"kafka-adapter-factory"Description:
Identifies the adapter factory service name.
Important Implementation Details
Multiple Consumer Threads:
doStart()creates multipleKafkaFetchRecordstasks according to the configured number of consumers (consumersCount). Each task runs on a separate thread, allowing parallel consumption from Kafka.Offset Repository Lifecycle:
If an offset repository is configured and not already started, the consumer will start it during startup and stop it during shutdown, ensuring proper offset management lifecycle.Pause/Resume Coordination:
The consumer delegates pause and resume commands to all fetch tasks, which manage their Kafka consumer instances accordingly. This supports fine-grained control over message consumption without full shutdown.Health Checks:
The consumer registers aKafkaConsumerHealthCheckin Camel's health check repository if enabled, providing runtime health monitoring.Error Handling:
UsesBridgeExceptionHandlerToErrorHandlerto integrate Kafka consumer exceptions with Camel's error handling mechanism.Topic Patterns:
Supports subscription to topic patterns via regex when configured (topicIsPattern), allowing dynamic topic subscriptions.Broker Address Validation:
If enabled, the consumer validates Kafka broker addresses eagerly on startup to detect misconfiguration early.
Interaction with Other Components
KafkaEndpoint:
Provides configuration details, Kafka client factory, and executor service creation.KafkaFetchRecords:
Worker tasks created and managed byKafkaConsumerresponsible for polling Kafka and processing messages.KafkaConsumerListener:
Listener interface for consumption event callbacks, error handling, and integrating pause/resume logic.ResumeStrategy:
Strategy pattern interface to manage offset resume behavior and rebalance listeners.HealthCheckRepositoryandKafkaConsumerHealthCheck:
Used for health monitoring and reporting consumer readiness status.StateRepository:
Handles offset storage and retrieval when configured.
Usage Example
// Assuming kafkaEndpoint and processor are initialized
KafkaConsumer kafkaConsumer = new KafkaConsumer(kafkaEndpoint, processor);
// Optionally set a resume strategy and consumer listener
kafkaConsumer.setResumeStrategy(myResumeStrategy);
kafkaConsumer.setConsumerListener(myConsumerListener);
// Start the consumer (starts multiple fetch tasks)
kafkaConsumer.start();
// Pause consumption (e.g., due to error)
kafkaConsumer.suspend();
// Resume consumption
kafkaConsumer.resume();
// Stop the consumer gracefully
kafkaConsumer.stop();
Visual Diagram: KafkaConsumer Class Structure
classDiagram
class KafkaConsumer {
-ExecutorService executor
-KafkaEndpoint endpoint
-KafkaConsumerHealthCheck consumerHealthCheck
-HealthCheckRepository healthCheckRepository
-List~KafkaFetchRecords~ tasks
-boolean stopOffsetRepo
-ResumeStrategy resumeStrategy
-KafkaConsumerListener consumerListener
+KafkaConsumer(KafkaEndpoint, Processor)
+void setResumeStrategy(ResumeStrategy)
+ResumeStrategy getResumeStrategy()
+KafkaConsumerListener getConsumerListener()
+void setConsumerListener(KafkaConsumerListener)
+KafkaEndpoint getEndpoint()
+Properties getProps()
+void doStart() throws Exception
+void doStop() throws Exception
+void doSuspend() throws Exception
+void doResume() throws Exception
+List~TaskHealthState~ healthStates()
+boolean isKafkaPaused()
+String adapterFactoryService()
}
KafkaConsumer --> KafkaEndpoint
KafkaConsumer --> ExecutorService
KafkaConsumer --> KafkaFetchRecords
KafkaConsumer --> ResumeStrategy
KafkaConsumer --> KafkaConsumerListener
KafkaConsumer --> KafkaConsumerHealthCheck
KafkaConsumer --> HealthCheckRepository
Summary
`KafkaConsumer.java` implements the main Kafka consumer coordination for Apache Camel Kafka component. It manages multiple consumer threads, lifecycle events, error handling integration, pause/resume control, and health checks. This class is vital for enabling scalable, resilient Kafka message consumption within Camel routes, abstracting Kafka client complexities behind a flexible and manageable interface.
Additional Notes
This class is tightly coupled with `KafkaFetchRecords` which performs the actual Kafka polling and processing. It also relies on the `KafkaEndpoint` for configuration and executor service provisioning. The design embraces multi-threading by running multiple fetch tasks and supports advanced features such as offset resume and health monitoring to ensure robust Kafka consumption in distributed integration scenarios.