Kafka Message Consumption
This module manages the lifecycle and operation of Kafka consumers within Camel routes. It provides the infrastructure to consume messages from Kafka topics, process them safely and efficiently, manage offset commits, handle errors during consumption, and support features such as pause/resume and health monitoring. It is designed to be robust and flexible enough for a range of consumption use cases, including both batch and streaming processing.
Core Concepts and Purpose
The **Kafka Message Consumption** module exists to provide a seamless, reliable, and configurable Kafka consumer experience inside Apache Camel routes. It solves common challenges of Kafka consumption, such as:
Managing multiple concurrent fetch tasks for scalability.
Handling offset commits with various strategies.
Supporting graceful shutdown, pause, and resume of consumers.
Providing health checks for consumer readiness.
Managing error handling during polling and processing.
Supporting dynamic topic subscriptions via patterns.
Integrating with resume strategies to allow fault-tolerant offset state recovery.
At its heart, this module abstracts the Kafka consumer API into a Camel-friendly, managed service, enabling developers to easily consume Kafka messages while leveraging Camel's routing and processing capabilities.
How the Module Works
KafkaConsumer — The Consumer Coordinator
The main class is `KafkaConsumer` (in `KafkaConsumer.java`), which extends Camel's `DefaultConsumer`. It acts as the orchestrator for Kafka consumption, responsible for:
Starting and stopping the consumer lifecycle.
Creating and managing a thread pool (`ExecutorService`) of fetch tasks (`KafkaFetchRecords`).
Configuring Kafka consumer properties.
Handling pause and resume requests.
Integrating health checks and consumer listeners.
Coordinating offset repository lifecycle if used.
When [KafkaConsumer.doStart()](/projects/289/68528) is called, it:
Initializes health checks if enabled.
Starts the offset repository if configured and not already running.
Creates an executor service to run fetch tasks.
For the configured number of consumers (`consumersCount`), creates `KafkaFetchRecords` tasks and submits them to the executor.
Each `KafkaFetchRecords` instance runs in its own thread to poll Kafka independently, enabling parallel consumption.
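The start-up sequence above can be modeled in plain Java. This is a minimal sketch under stated assumptions, not Camel's implementation. `OffsetRepository`, `FetchTask`, `ConsumerCoordinator`, and the `stopRepoOnShutdown` flag are hypothetical stand-ins, and health-check registration is reduced to a boolean.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an offset repository service.
class OffsetRepository {
    private volatile boolean started;
    boolean isStarted() { return started; }
    void start() { started = true; }
}

// Hypothetical fetch task; a real one would create a consumer, subscribe, and poll until stopped.
class FetchTask implements Runnable {
    final String threadId;
    private final CountDownLatch ran;

    FetchTask(String threadId, CountDownLatch ran) {
        this.threadId = threadId;
        this.ran = ran;
    }

    @Override
    public void run() {
        ran.countDown(); // signal that this task got its own thread and started
    }
}

class ConsumerCoordinator {
    final List<FetchTask> tasks = new ArrayList<>();
    boolean healthCheckRegistered;
    boolean stopRepoOnShutdown; // only stop the repository if this coordinator started it
    private ExecutorService executor;

    void start(int consumersCount, boolean healthCheckEnabled, OffsetRepository repo, CountDownLatch ran) {
        // 1. Register health checks, if enabled.
        healthCheckRegistered = healthCheckEnabled;

        // 2. Start the offset repository only if it is configured and not already running.
        if (repo != null && !repo.isStarted()) {
            repo.start();
            stopRepoOnShutdown = true;
        }

        // 3. One thread per fetch task.
        executor = Executors.newFixedThreadPool(consumersCount);

        // 4. Create consumersCount independent tasks and submit them.
        for (int i = 0; i < consumersCount; i++) {
            FetchTask task = new FetchTask(Integer.toString(i), ran);
            executor.submit(task);
            tasks.add(task);
        }
    }

    void stop() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The `stopRepoOnShutdown` flag reflects the "not already running" condition. A repository that was started elsewhere is presumably owned elsewhere, so this coordinator should not stop it.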
KafkaFetchRecords — The Polling and Processing Task
The `KafkaFetchRecords` class (in `KafkaFetchRecords.java`) encapsulates the actual Kafka poll loop and message processing logic. Each fetch task:
Creates and subscribes a Kafka consumer instance.
Uses a PollExceptionStrategy to handle errors during polling.
Polls Kafka in a loop, with configurable timeout.
Processes returned records using a KafkaRecordProcessorFacade which implements either batch or streaming processing depending on configuration.
Manages offset commits via a `CommitManager`.
Supports pausing and resuming the consumer safely, without concurrency issues.
Collects metrics for dev console integration.
Handles rebalance events via `ConsumerRebalanceListener` implementations (`ClassicRebalanceListener` or `ResumeRebalanceListener`).
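The difference between the two processing modes can be shown with a toy facade. `RecordProcessor`, `StreamingProcessor`, and `BatchProcessor` are hypothetical names, and records are plain strings. The sketch shows only the assumed distinction: streaming routes one exchange per record, while batch routes one exchange per poll result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical facade: turns one poll result into routed "exchanges".
interface RecordProcessor {
    void processPolledRecords(List<String> polled);
}

// Streaming: every record becomes its own exchange.
class StreamingProcessor implements RecordProcessor {
    private final Consumer<List<String>> route;
    StreamingProcessor(Consumer<List<String>> route) { this.route = route; }

    @Override
    public void processPolledRecords(List<String> polled) {
        for (String record : polled) {
            route.accept(List.of(record));
        }
    }
}

// Batch: the whole poll result travels as a single exchange.
class BatchProcessor implements RecordProcessor {
    private final Consumer<List<String>> route;
    BatchProcessor(Consumer<List<String>> route) { this.route = route; }

    @Override
    public void processPolledRecords(List<String> polled) {
        if (!polled.isEmpty()) {
            route.accept(new ArrayList<>(polled));
        }
    }
}
```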
The fetch task uses a lock to prevent concurrent access to the Kafka consumer instance, which is not thread-safe. It also manages connection retries with exponential backoff.
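Retrying with exponential backoff can be sketched as a small helper. `retryWithBackoff`, its parameters, and the doubling-with-cap policy are assumptions for illustration; they are not Camel configuration options. The sleep function is injected so the example runs without actually waiting.

```java
import java.util.function.LongConsumer;
import java.util.function.Supplier;

final class Backoff {
    private Backoff() {}

    // Hypothetical helper: call `action` until it succeeds or maxAttempts is reached,
    // doubling the delay after each failure (capped at maxDelayMs).
    static <T> T retryWithBackoff(Supplier<T> action, int maxAttempts,
                                  long initialDelayMs, long maxDelayMs, LongConsumer sleeper) {
        long delay = initialDelayMs;
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) {
                    sleeper.accept(delay);                   // wait before the next attempt
                    delay = Math.min(delay * 2, maxDelayMs); // exponential growth, capped
                }
            }
        }
        throw new IllegalStateException("giving up after " + maxAttempts + " attempts", last);
    }
}
```

In a real task, `sleeper` would be something like `Thread::sleep` wrapped to handle interruption, and `action` would create or subscribe the consumer.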
Offset Management & Commit Manager
Offset commit logic lives mainly in separate commit manager classes. The `KafkaFetchRecords` task invokes its commit manager to commit offsets after successful processing and during shutdown.
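The value that gets committed deserves a closer look. In Kafka, a committed offset denotes the next record to read, so after successfully processing offset n the value to commit is n + 1. `OffsetTracker` is a hypothetical sketch of that bookkeeping. It keys partitions by plain strings rather than Kafka's `TopicPartition`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical bookkeeping a commit manager might do between commits.
class OffsetTracker {
    private final Map<String, Long> pending = new HashMap<>();

    // Record that the record at `offset` on `partition` was processed successfully.
    void recordProcessed(String partition, long offset) {
        // Kafka commits "next offset to read", hence offset + 1; keep the highest value seen.
        pending.merge(partition, offset + 1, Long::max);
    }

    // Hand back the offsets to commit (e.g. after processing or on shutdown) and reset.
    Map<String, Long> drainForCommit() {
        Map<String, Long> toCommit = new HashMap<>(pending);
        pending.clear();
        return toCommit;
    }
}
```

A real commit manager would pass such a map to the consumer's commit call. That step is deliberately left out here.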
Pause and Resume
Pause and resume are handled cooperatively:
`KafkaConsumer` exposes `doSuspend()` and `doResume()`, which invoke pause/resume on all fetch tasks.
Each `KafkaFetchRecords` task tracks its state with an internal enum (`RUNNING`, `PAUSE_REQUESTED`, `PAUSED`, `RESUME_REQUESTED`).
State transitions cause the Kafka consumer to be paused or resumed on its assigned partitions.
The `PausePreservingRebalanceListener` ensures that if a rebalance occurs while the task is paused, the paused state is preserved.
This design allows pausing consumption safely during runtime or via JMX/management interfaces.
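The pause-preserving decorator can be modeled without the Kafka client. `RebalanceListener`, `PausePreserving`, and the `pausedPartitions` set are hypothetical simplifications; the real class wraps Kafka's `ConsumerRebalanceListener`. The assumption is that partitions arriving through a rebalance are not necessarily paused. The wrapper therefore calls the wrapped listener first and then re-applies the pause if the task is in the paused state.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.function.BooleanSupplier;

// Hypothetical, simplified rebalance callback.
interface RebalanceListener {
    void onPartitionsAssigned(Collection<String> partitions);
}

// Decorator: delegate first, then re-pause the new assignment if the task is paused.
class PausePreserving implements RebalanceListener {
    private final RebalanceListener delegate;
    private final BooleanSupplier isPaused;
    final Set<String> pausedPartitions = new HashSet<>(); // stand-in for consumer.pause(...)

    PausePreserving(RebalanceListener delegate, BooleanSupplier isPaused) {
        this.delegate = delegate;
        this.isPaused = isPaused;
    }

    @Override
    public void onPartitionsAssigned(Collection<String> partitions) {
        delegate.onPartitionsAssigned(partitions);
        if (isPaused.getAsBoolean()) {
            pausedPartitions.addAll(partitions);
        }
    }
}
```

Because the decorator only adds behavior around the delegate, it can wrap either the classic or the resume-aware listener unchanged.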
Health Monitoring
`KafkaConsumer` integrates with Camel's health check system by registering a [KafkaConsumerHealthCheck](/projects/289/68544) instance. This health check aggregates states from all fetch tasks, indicating readiness based on Kafka client connection health, errors, and recoverability.
The fetch tasks expose health states including connectivity, termination, recoverability, last errors, and client IDs. The health check reports "DOWN" if any task is not ready, including diagnostic details.
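The aggregation rule, reporting DOWN as soon as any task is not ready, can be sketched as follows. `TaskHealth`, `HealthAggregator`, and the detail keys are hypothetical simplifications of the state the fetch tasks expose; they are not the classes or keys of Camel's health-check API. For brevity, this sketch reports details for the first unready task only.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical snapshot of one fetch task's health.
final class TaskHealth {
    final String clientId;
    final boolean ready;
    final boolean recoverable;
    final String lastError; // null when there is no error

    TaskHealth(String clientId, boolean ready, boolean recoverable, String lastError) {
        this.clientId = clientId;
        this.ready = ready;
        this.recoverable = recoverable;
        this.lastError = lastError;
    }
}

final class HealthAggregator {
    private HealthAggregator() {}

    // "UP" only if every task is ready; otherwise "DOWN" plus diagnostics.
    static Map<String, Object> check(List<TaskHealth> tasks) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (TaskHealth t : tasks) {
            if (!t.ready) {
                result.put("state", "DOWN");
                result.put("client.id", t.clientId);
                result.put("recoverable", t.recoverable);
                result.put("error", t.lastError);
                return result;
            }
        }
        result.put("state", "UP");
        return result;
    }
}
```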
Error Handling
The `KafkaFetchRecords` fetch task uses a [PollExceptionStrategy](/projects/289/68529) to determine how to handle exceptions during polling. Strategies include retrying, reconnecting, stopping, discarding, or bridging to Camel's error handler. This strategy is chosen based on configuration or component defaults, encapsulated in `KafkaErrorStrategies`.
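The strategy pattern described here can be sketched as an interface with one implementation per policy. `OnPollError`, `PollErrorStrategy`, `StopStrategy`, `RetryStrategy`, and `Strategies` are hypothetical. They borrow the policy names from the paragraph above but do not reproduce the signatures of Camel's `PollExceptionStrategy`, and only two policies are implemented.

```java
// Hypothetical policies named after the behaviors described above.
enum OnPollError { RETRY, RECONNECT, STOP, DISCARD, ERROR_HANDLER }

// Hypothetical strategy contract consulted by the poll loop.
interface PollErrorStrategy {
    boolean canContinue();              // checked in the poll loop condition
    void handle(Exception pollFailure); // called when polling or processing throws
}

// STOP: give up after the first failure.
class StopStrategy implements PollErrorStrategy {
    private volatile boolean keepGoing = true;
    @Override public boolean canContinue() { return keepGoing; }
    @Override public void handle(Exception pollFailure) { keepGoing = false; }
}

// RETRY: keep polling, but count the failures.
class RetryStrategy implements PollErrorStrategy {
    int failures;
    @Override public boolean canContinue() { return true; }
    @Override public void handle(Exception pollFailure) { failures++; }
}

final class Strategies {
    private Strategies() {}

    // Picks a strategy from configuration; the remaining policies are omitted in this sketch.
    static PollErrorStrategy forPolicy(OnPollError policy) {
        switch (policy) {
            case STOP: return new StopStrategy();
            case RETRY: return new RetryStrategy();
            default: throw new UnsupportedOperationException(policy + " not sketched here");
        }
    }
}
```

The poll loop depends only on `PollErrorStrategy`, so adding a policy means adding a class and a factory branch, with no change to the loop itself.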
Interactions with Other Modules and Files
KafkaEndpoint: Provides configuration and client factory for consumer creation.
CommitManager: Used by fetch tasks to handle offset commits.
ResumeStrategy: If configured, the fetch task uses a resume-aware rebalance listener to manage offsets stored externally.
KafkaConsumerListener: Used for error handling callbacks after consume/process events.
HealthCheckRepository: KafkaConsumer registers health checks here.
Dev Console Metrics: The fetch task collects consumption metrics for development and monitoring console.
The fetch tasks are created and managed by `KafkaConsumer` and rely on the endpoint's configuration for properties such as topics, concurrency, commit strategy, and error strategies.
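How the endpoint configuration shapes a fetch task's subscription can be sketched with a small value class. `FetchConfig` is hypothetical, and the `topicIsPattern` flag is assumed here to mirror the Camel endpoint option of that name. The sketch shows only that the topic string is either used literally or compiled into a `java.util.regex.Pattern` for a dynamic, pattern-based subscription.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical subset of the endpoint configuration that every fetch task reads.
final class FetchConfig {
    final String topic;
    final boolean topicIsPattern;
    final int consumersCount;

    FetchConfig(String topic, boolean topicIsPattern, int consumersCount) {
        this.topic = topic;
        this.topicIsPattern = topicIsPattern;
        this.consumersCount = consumersCount;
    }

    // null means "subscribe to the literal topic name".
    Pattern pattern() {
        return topicIsPattern ? Pattern.compile(topic) : null;
    }

    // Which of the currently known topics the subscription would cover.
    List<String> matchingTopics(List<String> knownTopics) {
        Pattern p = pattern();
        return knownTopics.stream()
                .filter(t -> p == null ? t.equals(topic) : p.matcher(t).matches())
                .collect(Collectors.toList());
    }
}
```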
Important Concepts and Design Patterns
Thread-per-Consumer Model: Multiple `KafkaFetchRecords` threads allow parallel consumption and increase throughput.
State Machine for Pause/Resume: An internal state enum avoids race conditions and ensures safe pause/resume transitions.
Retry and Backoff Using ForegroundTask: Creation and subscription of Kafka consumers use retry loops with backoff to handle transient failures.
Strategy Pattern for Error Handling: PollExceptionStrategy implementations allow configurable, pluggable error recovery behaviors.
Listener Decorator Pattern: `PausePreservingRebalanceListener` wraps another rebalance listener to add pause-state preservation.
Health Check Integration: Implements the health check interface for readiness reporting, improving observability.
Locking for Thread Safety: Uses a `ReentrantLock` to serialize access to the Kafka consumer instance within the poll loop.
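The locking discipline can be sketched with `java.util.concurrent.locks.ReentrantLock`. `UnsafeClient` and `GuardedClient` are hypothetical stand-ins for the non-thread-safe Kafka consumer and its guard. The point is that every access to the client goes through the same lock.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// Hypothetical non-thread-safe client (stand-in for a Kafka consumer).
class UnsafeClient {
    int operations; // deliberately unsynchronized
    void poll() { operations++; }
}

// Serializes all access to the client through one lock.
class GuardedClient {
    private final ReentrantLock lock = new ReentrantLock();
    private final UnsafeClient client = new UnsafeClient();

    <T> T withClient(Function<UnsafeClient, T> action) {
        lock.lock();
        try {
            return action.apply(client);
        } finally {
            lock.unlock(); // always released, even if the action throws
        }
    }
}
```

Because the lock is reentrant, code that already holds it can call `withClient` again without deadlocking.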
Code Illustrations
Starting Multiple Fetch Tasks in KafkaConsumer
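```java
// One KafkaFetchRecords task per configured consumer; each runs on its own executor thread
for (int i = 0; i < endpoint.getConfiguration().getConsumersCount(); i++) {
    KafkaFetchRecords task = new KafkaFetchRecords(
            this, bridge, topic, pattern, Integer.toString(i), getProps(), consumerListener);
    executor.submit(task);
    // keep a reference so pause/resume, health checks, and shutdown can reach every task
    tasks.add(task);
}
```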
This code creates and starts multiple fetch tasks to consume messages in parallel.
Poll Loop with Error Handling in KafkaFetchRecords
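```java
// Keep polling while the task is running, connected, and the error strategy allows it
while (isKafkaConsumerRunnableAndNotStopped() && isConnected() && pollExceptionStrategy.canContinue()) {
    ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration);

    // batch or streaming processing, depending on configuration
    ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords);

    if (result.isBreakOnErrorHit()) {
        // leave the loop (isConnected() becomes false) and reconnect
        setReconnect(true);
        setConnected(false);
    }
}
```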
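This loop polls Kafka and processes the returned records. When processing reports that break-on-error was hit, the loop marks the task as disconnected, which ends the loop, and flags it for reconnection.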
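The exit path of this loop can be simulated end to end without Kafka. `PollLoop`, the deque of batches, and the `"poison"` marker are hypothetical. Each deque element plays the role of one `poll()` result, and draining the deque stands in for the "running and not stopped" check.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class PollLoop {
    private final Deque<List<String>> broker; // each element simulates one poll() result
    final List<String> processed = new ArrayList<>();
    boolean connected = true;
    boolean reconnect = false;

    PollLoop(Deque<List<String>> broker) { this.broker = broker; }

    // Returns true when processing hits an error and break-on-error applies.
    private boolean process(List<String> records) {
        for (String r : records) {
            if (r.equals("poison")) {
                return true;
            }
            processed.add(r);
        }
        return false;
    }

    void run() {
        while (connected && !broker.isEmpty()) {
            List<String> records = broker.poll();
            if (process(records)) {
                // mirrors setReconnect(true); setConnected(false);
                reconnect = true;
                connected = false;
            }
        }
    }
}
```

Given batches `[a, b]`, `[c, poison, d]`, `[e]`, the loop processes `a`, `b`, `c` and then stops with `reconnect` set, leaving the third batch unpolled.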
Managing Pause and Resume States
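```java
// Applied by the poll thread, which owns the (non-thread-safe) consumer
switch (state.get()) {
    case PAUSE_REQUESTED:
        consumer.pause(consumer.assignment());
        state.set(State.PAUSED);
        break;
    case RESUME_REQUESTED:
        consumer.resume(consumer.assignment());
        state.set(State.RUNNING);
        break;
    default:
        // RUNNING or PAUSED: nothing to apply
        break;
}
```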
This snippet shows how the fetch task reacts to pause or resume requests by calling `pause()` or `resume()` on the Kafka consumer for its current partition assignment.
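The snippet covers only the poll-thread side. The request side can be sketched with `AtomicReference.compareAndSet`, which is an assumption here rather than a description of Camel's code. A pause request from another thread is recorded only when the task is running, and is applied later by the poll thread. `PausableTask`, `assignment`, and `pausedPartitions` are hypothetical stand-ins for the consumer and its partitions.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

class PausableTask {
    enum State { RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    final Set<String> assignment = new HashSet<>(Set.of("t-0", "t-1"));
    final Set<String> pausedPartitions = new HashSet<>(); // stand-in for consumer.pause()

    // Called from any thread (for example a management call); only records the request.
    boolean requestPause() { return state.compareAndSet(State.RUNNING, State.PAUSE_REQUESTED); }
    boolean requestResume() { return state.compareAndSet(State.PAUSED, State.RESUME_REQUESTED); }

    // Called only by the poll thread, between polls, where touching the consumer is safe.
    void updateTaskState() {
        switch (state.get()) {
            case PAUSE_REQUESTED:
                pausedPartitions.addAll(assignment);
                state.set(State.PAUSED);
                break;
            case RESUME_REQUESTED:
                pausedPartitions.removeAll(assignment);
                state.set(State.RUNNING);
                break;
            default:
                break; // RUNNING or PAUSED: nothing to apply
        }
    }

    State state() { return state.get(); }
}
```

Splitting "request" from "apply" keeps all consumer calls on the poll thread. The compare-and-set makes a duplicate or out-of-order request a harmless no-op.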
Visual Diagram: Kafka Consumer Message Processing Workflow
```mermaid
flowchart TD
    Start[Start Kafka Consumer] --> CreateConsumer[Create Kafka Consumer Instance]
    CreateConsumer --> Subscribe[Subscribe to Topics]
    Subscribe --> PollLoop[Poll Kafka Broker]
    PollLoop --> Consume[Process ConsumerRecords]
    Consume --> Commit[Commit Offsets]
    Consume --> ErrorHandler[Handle Processing Errors]
    ErrorHandler --> Decision{Can Continue?}
    Decision -- Yes --> PollLoop
    Decision -- No --> Terminate[Terminate Consumer Thread]
    Commit --> PollLoop
    Terminate --> CloseConsumer[Close Kafka Consumer]
```
This flowchart outlines the lifecycle of a fetch task, from creation to polling, processing, error handling, and termination.
Summary
The Kafka Message Consumption module provides a robust, multi-threaded Kafka consumer implementation integrated with Camel. It manages consumer creation, topic subscription, message polling, processing, offset commits, pause/resume, error handling, and health monitoring. It balances flexibility and safety by using design patterns such as strategy and state machine, and integrates smoothly with Camel's ecosystem and monitoring tools.