KafkaConsumerAsyncManualCommitIT.java

Overview

`KafkaConsumerAsyncManualCommitIT` is an integration test class designed to verify the asynchronous manual commit functionality of Kafka consumers within the Apache Camel Kafka component. It tests how Kafka messages are consumed and committed manually in an asynchronous manner, ensuring the consumer correctly handles offset commits when manual commit is enabled, and that the consumer resumes consumption from the correct offset after stopping and restarting.

This test class focuses on the behavior of Kafka consumers configured with:

The class extends `BaseKafkaTestSupport`, leveraging Kafka test utilities and Camel testing frameworks to simulate Kafka message production and consumption.


Classes and Components

KafkaConsumerAsyncManualCommitIT

This is the sole public test class in the file, annotated with JUnit 5 annotations to order tests and manage lifecycle events.

Properties / Fields

Field Name

Type

Description

`TOPIC`

`String`

Kafka topic name used for producing and consuming messages (`"testManualCommitTest"`).

`LOG`

`Logger`

Logger instance for logging errors and info messages.

`manualCommitFactory`

`KafkaManualCommitFactory`

Bean registered as `"testFactory"` that provides asynchronous manual commit capability.

`context`

`CamelContext`

Camel runtime context used to create and control routes.

`producer`

`KafkaProducer`

Kafka producer instance used to send messages to the topic.

`failCount`

`volatile int`

Counter to track failed commit attempts during message processing.

Lifecycle Methods

Route Configuration

Tests

  1. testLastRecordBeforeCommitHeader()

    • Produces 5 messages to Kafka.

    • Expects 5 messages at the mock endpoint with the header KafkaConstants.LAST_RECORD_BEFORE_COMMIT set.

    • Asserts that the last message processed has this header set to true.

    • Verifies the asynchronous commit header functionality.

  2. kafkaManualCommit()

    • Stops the "foo" route (the main consumer).

    • Sends 3 messages to Kafka while the route is stopped.

    • Expects zero messages to be consumed during the stopped period.

    • Validates that no messages are processed when the route is inactive.

  3. testResumeFromTheRightPoint()

    • Restarts the "foo" route.

    • Expects consumption of the 3 messages sent during the stopped period.

    • Verifies that the consumer correctly resumes from the last committed offset.

    • Asserts there are zero commit failures.


Detailed Explanations

Manual Commit Factory Binding

@BindToRegistry("testFactory")
private final KafkaManualCommitFactory manualCommitFactory = new DefaultKafkaManualAsyncCommitFactory();

Route URI Construction

String uri = "kafka:" + TOPIC + "?brokers=" + service.getBootstrapServers()
             + "&groupId=KafkaConsumerAsyncManualCommitIT&pollTimeoutMs=1000&autoCommitEnable=false"
             + "&allowManualCommit=true&autoOffsetReset=earliest&kafkaManualCommitFactory=#testFactory";

Aggregation and Commit Processing

This approach avoids typical concurrency exceptions that occur when manually committing offsets in an aggregator with multiple threads.

Testing Commit Behavior and Offset Management


Usage Examples

Running the Tests

These tests are executed as part of the integration test suite and require a running Kafka broker (usually managed by the test framework).

Extending the Manual Commit Factory

If you want to customize manual commit behavior, you can implement your own `KafkaManualCommitFactory` and bind it in the Camel registry similarly:

@BindToRegistry("customFactory")
private final KafkaManualCommitFactory customFactory = new MyCustomManualCommitFactory();

And reference it in the Kafka endpoint:

kafka:myTopic?brokers=localhost:9092&kafkaManualCommitFactory=#customFactory

Important Implementation Details


Interaction with Other System Components


Mermaid Class Diagram

classDiagram
    class KafkaConsumerAsyncManualCommitIT {
        - static final String TOPIC
        - static final Logger LOG
        - KafkaManualCommitFactory manualCommitFactory
        - CamelContext context
        - KafkaProducer<String,String> producer
        - volatile int failCount
        + void before()
        + void after()
        + RouteBuilder createRouteBuilder()
        + void testLastRecordBeforeCommitHeader()
        + void kafkaManualCommit()
        + void testResumeFromTheRightPoint()
    }
    KafkaConsumerAsyncManualCommitIT --|> BaseKafkaTestSupport

Summary

`KafkaConsumerAsyncManualCommitIT.java` is a focused integration test class validating asynchronous manual commit functionality in Kafka consumers within the Apache Camel framework. It configures Kafka consumers with manual commit enabled, processes messages in aggregated batches, commits offsets asynchronously, and verifies correct offset handling across route restarts. The tests provide confidence that manual commit semantics are correctly implemented and that the consumer resumes consumption from the appropriate offset, preventing message loss or duplication.