Kafka Message Consumption

This module manages the lifecycle and operation of Kafka consumers within Camel routes. It provides the infrastructure to consume messages from Kafka topics, process them safely and efficiently, manage offset commits, handle errors during consumption, and support features such as pause/resume and health monitoring. The module is designed for robustness and flexibility to fit various consumption use cases including batch or streaming processing.


Core Concepts and Purpose

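The **Kafka Message Consumption** module exists to provide a reliable and configurable Kafka consumer experience inside Apache Camel routes. It addresses common challenges of Kafka consumption, such as managing offset commits, handling errors during polling and processing, pausing and resuming consumption, and reporting consumer health.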

At its heart, this module abstracts the Kafka consumer API into a Camel-friendly, managed service, enabling developers to easily consume Kafka messages while leveraging Camel's routing and processing capabilities.


How the Module Works

KafkaConsumer — The Consumer Coordinator

The main class is `KafkaConsumer` (in `KafkaConsumer.java`), which extends Camel's `DefaultConsumer`. It orchestrates Kafka consumption: it creates the fetch tasks, owns the executor that runs them, and registers the consumer health check.

When [KafkaConsumer.doStart()](/projects/289/68528) is called, it:

  1. Initializes health checks if enabled.

  2. Starts the offset repository if configured and not already running.

  3. Creates an executor service to run fetch tasks.

  4. For the configured number of consumers (`consumersCount`), creates `KafkaFetchRecords` tasks and submits them to the executor.

Each `KafkaFetchRecords` instance runs in its own thread to poll Kafka independently, enabling parallel consumption.

KafkaFetchRecords — The Polling and Processing Task

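The `KafkaFetchRecords` class (in `KafkaFetchRecords.java`) encapsulates the actual Kafka poll loop and message processing logic. Each fetch task creates its own Kafka consumer instance, subscribes to the configured topics, polls the broker in a loop, processes the returned records, commits offsets, and closes its consumer when the task terminates.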

The fetcher uses a lock to prevent concurrent access to the consumer instance, because the Kafka consumer is not thread-safe. It also manages connection retries with exponential backoff.
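
The locking and retry behavior described above can be sketched without any Kafka dependency. This is a minimal illustration under stated assumptions, not the module's actual code: `GuardedClientSketch`, `withLock`, `backoffMillis`, and `retryWithBackoff` are hypothetical names, and doubling the delay up to a cap is only one way to implement exponential backoff.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

/** Hypothetical sketch: serialize access to a non-thread-safe client and retry with backoff. */
final class GuardedClientSketch {
    private final ReentrantLock lock = new ReentrantLock();

    /** Runs the action while holding the lock, so only one thread uses the client at a time. */
    <T> T withLock(Supplier<T> action) {
        lock.lock();
        try {
            return action.get();
        } finally {
            lock.unlock();
        }
    }

    /** Delay before retry number attempt (0-based): the initial delay doubled per attempt, capped. */
    static long backoffMillis(int attempt, long initialDelayMillis, long maxDelayMillis) {
        long delay = initialDelayMillis;
        for (int i = 0; i < attempt && delay < maxDelayMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxDelayMillis);
    }

    /** Tries the action up to maxAttempts times, sleeping (outside the lock) between failures. */
    <T> T retryWithBackoff(Supplier<T> action, int maxAttempts,
                           long initialDelayMillis, long maxDelayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return withLock(action);
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
            if (attempt + 1 < maxAttempts) {
                sleep(backoffMillis(attempt, initialDelayMillis, maxDelayMillis));
            }
        }
        throw last; // every attempt failed
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new IllegalStateException("interrupted while backing off", e);
        }
    }
}
```

In this sketch the lock is released before sleeping, so a thread that wants to stop or pause the task is not blocked for the whole backoff period.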

Offset Management & Commit Manager

Offset commits are implemented primarily by separate commit manager classes. The `KafkaFetchRecords` task holds a commit manager and uses it to commit offsets after successful processing and during shutdown.
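
The idea of committing only after successful processing can be illustrated with a small, dependency-free sketch. `OffsetTrackerSketch` and its methods are hypothetical and do not mirror the real commit manager classes; partitions are represented by plain strings for brevity. The one Kafka convention the sketch relies on is that the committed offset is the offset of the next record to read, that is, the last processed offset plus one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongPredicate;

/** Hypothetical sketch: track processed offsets and derive the offsets to commit. */
final class OffsetTrackerSketch {
    // Highest successfully processed offset per partition.
    private final Map<String, Long> lastProcessed = new HashMap<>();

    /**
     * Processes offsets in order and stops at the first failure,
     * so offsets after a failed record are never marked as processed.
     */
    boolean processBatch(String partition, long[] offsets, LongPredicate handler) {
        for (long offset : offsets) {
            if (!handler.test(offset)) {
                return false;
            }
            lastProcessed.merge(partition, offset, Math::max);
        }
        return true;
    }

    /** Offsets to commit: for each partition, the last processed offset plus one. */
    Map<String, Long> offsetsToCommit() {
        Map<String, Long> result = new HashMap<>();
        lastProcessed.forEach((partition, offset) -> result.put(partition, offset + 1));
        return result;
    }
}
```

If the handler fails on offset 7 after offsets 5 and 6 succeeded, the sketch reports 7 as the offset to commit, so the failed record is read again after a restart.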

Pause and Resume

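Pause and resume are handled cooperatively: a request only moves the fetch task's state to `PAUSE_REQUESTED` or `RESUME_REQUESTED`, and the fetch task itself applies the change by calling `consumer.pause(...)` or `consumer.resume(...)` on its current partition assignment and then setting the state to `PAUSED` or `RUNNING`.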

This design allows consumption to be paused safely at runtime, for example through JMX or other management interfaces.
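
The cooperative handshake can be sketched as a small state machine. The state names follow the ones used by the fetch task; everything else (`PauseResumeSketch`, the `compareAndSet` guards, and the boolean that stands in for the real Kafka consumer calls) is an assumption made for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of a cooperative pause/resume handshake between two threads. */
final class PauseResumeSketch {
    enum State { RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private boolean fetchingPaused; // touched only by the polling thread

    /** May be called from any thread; only records the request. */
    void requestPause() {
        state.compareAndSet(State.RUNNING, State.PAUSE_REQUESTED);
    }

    /** May be called from any thread; ignored unless the task is currently paused. */
    void requestResume() {
        state.compareAndSet(State.PAUSED, State.RESUME_REQUESTED);
    }

    /** Called by the polling thread once per loop iteration, before it polls. */
    void applyPendingRequest() {
        switch (state.get()) {
            case PAUSE_REQUESTED:
                fetchingPaused = true;  // stands in for consumer.pause(consumer.assignment())
                state.set(State.PAUSED);
                break;
            case RESUME_REQUESTED:
                fetchingPaused = false; // stands in for consumer.resume(consumer.assignment())
                state.set(State.RUNNING);
                break;
            default:
                break; // nothing requested
        }
    }

    State state() {
        return state.get();
    }

    boolean isFetchingPaused() {
        return fetchingPaused;
    }
}
```

Because the requesting thread never touches the consumer, the non-thread-safe client is only ever used from the polling thread.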

Health Monitoring

`KafkaConsumer` integrates with Camel's health check system by registering a [KafkaConsumerHealthCheck](/projects/289/68544) instance. This health check aggregates states from all fetch tasks, indicating readiness based on Kafka client connection health, errors, and recoverability.

The fetch tasks expose health states including connectivity, termination, recoverability, last errors, and client IDs. The health check reports "DOWN" if any task is not ready and includes diagnostic details for that task in the result.
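
The aggregation rule (down if any task is not ready, with details attached) can be sketched as follows. The types here are hypothetical stand-ins and are not Camel's health check API; a real `KafkaConsumerHealthCheck` exposes more state than this sketch models.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: aggregate per-task health into a single UP/DOWN result. */
final class HealthAggregationSketch {
    /** Snapshot of one fetch task's health. */
    static final class TaskHealth {
        final String clientId;
        final boolean ready;
        final String lastError; // null when no error was recorded

        TaskHealth(String clientId, boolean ready, String lastError) {
            this.clientId = clientId;
            this.ready = ready;
            this.lastError = lastError;
        }
    }

    /** Aggregated outcome: up only if every task is ready. */
    static final class Result {
        final boolean up;
        final List<String> details;

        Result(boolean up, List<String> details) {
            this.up = up;
            this.details = details;
        }
    }

    /** Collects one detail line per task that is not ready. */
    static Result check(TaskHealth... tasks) {
        List<String> details = new ArrayList<>();
        for (TaskHealth task : tasks) {
            if (!task.ready) {
                String reason = task.lastError == null ? "not ready" : task.lastError;
                details.add(task.clientId + ": " + reason);
            }
        }
        return new Result(details.isEmpty(), details);
    }
}
```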

Error Handling

The `KafkaFetchRecords` fetch task uses a [PollExceptionStrategy](/projects/289/68529) to determine how to handle exceptions during polling. Strategies include retrying, reconnecting, stopping, discarding, or bridging to Camel's error handler. This strategy is chosen based on configuration or component defaults, encapsulated in `KafkaErrorStrategies`.
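
To make the strategy idea concrete, here is a reduced sketch in which a poll loop consults a pluggable strategy. Only two of the listed behaviors (stop and discard) are modeled, the loop is simulated with strings instead of Kafka records, and all type names are hypothetical; the real `PollExceptionStrategy` interface and `KafkaErrorStrategies` selection logic may differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: a poll loop that delegates failure handling to a strategy. */
final class PollStrategySketch {
    enum OnError { RETRY, RECONNECT, STOP, DISCARD, ERROR_HANDLER }

    interface Strategy {
        boolean canContinue();          // checked before each loop iteration
        void handle(Exception cause);   // invoked when an iteration fails
    }

    /** Stops the loop after the first failure. */
    static final class StopStrategy implements Strategy {
        private boolean stopped;
        public boolean canContinue() { return !stopped; }
        public void handle(Exception cause) { stopped = true; }
    }

    /** Records the failure and keeps going with the next record. */
    static final class DiscardStrategy implements Strategy {
        final List<String> discarded = new ArrayList<>();
        public boolean canContinue() { return true; }
        public void handle(Exception cause) { discarded.add(cause.getMessage()); }
    }

    /** Picks a strategy from configuration; other options are not modeled here. */
    static Strategy forOption(OnError option) {
        switch (option) {
            case STOP:
                return new StopStrategy();
            case DISCARD:
                return new DiscardStrategy();
            default:
                throw new UnsupportedOperationException(option + " is not modeled in this sketch");
        }
    }

    /** Simulated loop: records starting with "bad" fail. Returns the number processed. */
    static int run(Strategy strategy, String... records) {
        int processed = 0;
        for (String record : records) {
            if (!strategy.canContinue()) {
                break;
            }
            try {
                if (record.startsWith("bad")) {
                    throw new IllegalStateException("cannot process " + record);
                }
                processed++;
            } catch (RuntimeException e) {
                strategy.handle(e);
            }
        }
        return processed;
    }
}
```

With the input `"a", "bad-1", "b"`, the stop strategy processes one record and ends the loop, while the discard strategy skips the failing record and processes two.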


Interactions with Other Modules and Files

The fetch tasks are created and managed by `KafkaConsumer` and rely on the endpoint's configuration for properties such as topics, concurrency, commit strategy, and error strategies.


Important Concepts and Design Patterns

The module relies on two patterns in particular: a strategy (the `PollExceptionStrategy` that decides how a fetch task reacts to polling failures) and a state machine (the pause/resume states of each fetch task). Concurrency follows a one-thread-per-fetch-task model, with a lock guarding each consumer instance.


Code Illustrations

Starting Multiple Fetch Tasks in KafkaConsumer

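```java
// Create one fetch task per configured consumer and hand it to the executor.
for (int i = 0; i < endpoint.getConfiguration().getConsumersCount(); i++) {
    KafkaFetchRecords task = new KafkaFetchRecords(
        this, bridge, topic, pattern, Integer.toString(i), getProps(), consumerListener);
    executor.submit(task);
    // Keep a reference so the task can be managed later.
    tasks.add(task);
}
```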

This code creates and starts multiple fetch tasks to consume messages in parallel.

Poll Loop with Error Handling in KafkaFetchRecords

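```java
// Keep polling while the consumer is running, connected, and the error strategy allows it.
while (isKafkaConsumerRunnableAndNotStopped() && isConnected() && pollExceptionStrategy.canContinue()) {
    ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration);
    ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords);
    if (result.isBreakOnErrorHit()) {
        // Flag a reconnect and mark the task as disconnected, which ends this loop.
        setReconnect(true);
        setConnected(false);
    }
}
```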

This loop polls Kafka, processes records, and handles error conditions by triggering reconnects.

Managing Pause and Resume States

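```java
switch (state.get()) {
    case PAUSE_REQUESTED:
        // Suspend fetching from all currently assigned partitions.
        consumer.pause(consumer.assignment());
        state.set(State.PAUSED);
        break;
    case RESUME_REQUESTED:
        // Resume fetching from all currently assigned partitions.
        consumer.resume(consumer.assignment());
        state.set(State.RUNNING);
        break;
    default:
        // No pending request: leave the consumer as it is.
        break;
}
```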

This snippet shows how the fetch task reacts to pause or resume requests by invoking Kafka consumer API accordingly.


Visual Diagram: Kafka Consumer Message Processing Workflow

```mermaid
flowchart TD
    Start[Start Kafka Consumer] --> CreateConsumer[Create Kafka Consumer Instance]
    CreateConsumer --> Subscribe[Subscribe to Topics]
    Subscribe --> PollLoop[Poll Kafka Broker]
    PollLoop --> Consume[Process ConsumerRecords]
    Consume --> Commit[Commit Offsets]
    Consume --> ErrorHandler[Handle Processing Errors]
    ErrorHandler --> Decision{Can Continue?}
    Decision -- Yes --> PollLoop
    Decision -- No --> Terminate[Terminate Consumer Thread]
    Commit --> PollLoop
    Terminate --> CloseConsumer[Close Kafka Consumer]
```

This flowchart outlines the lifecycle of a fetch task, from creation to polling, processing, error handling, and termination.


Summary

The Kafka Message Consumption module provides a robust, multi-threaded Kafka consumer implementation integrated with Camel. It manages consumer creation, topic subscription, message polling, processing, offset commits, pause/resume, error handling, and health monitoring. It balances flexibility and safety by using design patterns such as strategy and state machine, and integrates smoothly with Camel's ecosystem and monitoring tools.