KafkaConsumerAsyncManualCommitIT.java
Overview
`KafkaConsumerAsyncManualCommitIT` is an integration test class designed to verify the asynchronous manual commit functionality of Kafka consumers within the Apache Camel Kafka component. It tests how Kafka messages are consumed and committed manually in an asynchronous manner, ensuring the consumer correctly handles offset commits when manual commit is enabled, and that the consumer resumes consumption from the correct offset after stopping and restarting.
This test class focuses on the behavior of Kafka consumers configured with:
Manual asynchronous offset commits (via a custom
KafkaManualCommitFactory).Aggregation of messages with asynchronous manual commits.
Verification of message headers related to commit behavior.
Proper offset management on consumer restart.
The class extends `BaseKafkaTestSupport`, leveraging Kafka test utilities and Camel testing frameworks to simulate Kafka message production and consumption.
Classes and Components
KafkaConsumerAsyncManualCommitIT
This is the sole public test class in the file, annotated with JUnit 5 annotations to order tests and manage lifecycle events.
Properties / Fields
Field Name | Type | Description |
|---|---|---|
`TOPIC` | `String` | Kafka topic name used for producing and consuming messages (`"testManualCommitTest"`). |
`LOG` | `Logger` | Logger instance for logging errors and info messages. |
`manualCommitFactory` | `KafkaManualCommitFactory` | Bean registered as `"testFactory"` that provides asynchronous manual commit capability. |
`context` | `CamelContext` | Camel runtime context used to create and control routes. |
`producer` | `KafkaProducer` | Kafka producer instance used to send messages to the topic. |
`failCount` | `volatile int` | Counter to track failed commit attempts during message processing. |
Lifecycle Methods
@BeforeEach void before():
Initializes the Kafka producer before each test by loading properties from KafkaTestUtil and creating a new producer instance.@AfterEach void after():
Closes the Kafka producer after each test to release resources.
Route Configuration
protected RouteBuilder createRouteBuilder():
Defines two routes using Apache Camel'sRouteBuilder:Consumer Route ("foo")
Consumes messages from the Kafka topictestManualCommitTestwith:Manual offset commit (
autoCommitEnable=false).Async manual commit factory (
kafkaManualCommitFactory=#testFactory).Poll timeout of 1000ms.
Group id:
"KafkaConsumerAsyncManualCommitIT".Offset reset policy: earliest.
Messages are sent to `"direct:aggregate"` for further processing.
Aggregator Route ("aggregate")
Consumes from"direct:aggregate"and processes messages as follows:Sends messages to a mock endpoint (
KafkaTestUtil.MOCK_RESULT) for assertions.Aggregates all exchanges with a constant key
true.Uses completion timeout of 1 millisecond to batch messages quickly.
Aggregation strategy groups exchanges (
AggregationStrategies.groupedExchange()).Splits the aggregated message body (a list of exchanges) to process each exchange individually.
For each exchange, retrieves the
KafkaManualCommitinstance from the message header.Calls
manual.commit()asynchronously to commit the offset.Logs and increments
failCounton commit failure.
**Note:** The route warns that records from the same partition must be processed by a single thread to avoid concurrency issues.
Disabled Consumer Route ("bar")
Consumes from the same Kafka topic but is configured withautoStartup=false. Sends messages to a separate mock endpoint (KafkaTestUtil.MOCK_RESULT_BAR).
Tests
testLastRecordBeforeCommitHeader()Produces 5 messages to Kafka.
Expects 5 messages at the mock endpoint with the header KafkaConstants.LAST_RECORD_BEFORE_COMMIT set.
Asserts that the last message processed has this header set to
true.Verifies the asynchronous commit header functionality.
kafkaManualCommit()Stops the "foo" route (the main consumer).
Sends 3 messages to Kafka while the route is stopped.
Expects zero messages to be consumed during the stopped period.
Validates that no messages are processed when the route is inactive.
testResumeFromTheRightPoint()Restarts the "foo" route.
Expects consumption of the 3 messages sent during the stopped period.
Verifies that the consumer correctly resumes from the last committed offset.
Asserts there are zero commit failures.
Detailed Explanations
Manual Commit Factory Binding
@BindToRegistry("testFactory")
private final KafkaManualCommitFactory manualCommitFactory = new DefaultKafkaManualAsyncCommitFactory();
Registers a custom manual commit factory bean in Camel's registry.
The factory enables asynchronous manual commits on the Kafka consumer.
This factory is referenced in the Kafka endpoint URI using
kafkaManualCommitFactory=#testFactory.
Route URI Construction
String uri = "kafka:" + TOPIC + "?brokers=" + service.getBootstrapServers()
+ "&groupId=KafkaConsumerAsyncManualCommitIT&pollTimeoutMs=1000&autoCommitEnable=false"
+ "&allowManualCommit=true&autoOffsetReset=earliest&kafkaManualCommitFactory=#testFactory";
Connects to the Kafka broker(s) defined by
service.getBootstrapServers().Uses group id for consumer group management.
Disables automatic commits to allow manual offset control.
Enables manual commit support.
Specifies the custom asynchronous manual commit factory.
Ensures the consumer starts reading from the earliest offset if no offset is committed yet.
Aggregation and Commit Processing
Messages are aggregated with a minimal timeout (
completionTimeout(1)) to batch process quickly.Aggregated messages are split back into individual exchanges.
For each individual message:
The
KafkaManualCommitinstance is retrieved from the exchange header.The manual commit is triggered asynchronously using
manual.commit().Commit failures are logged and counted.
This approach avoids typical concurrency exceptions that occur when manually committing offsets in an aggregator with multiple threads.
Testing Commit Behavior and Offset Management
The
testLastRecordBeforeCommitHeaderverifies the presence of theLAST_RECORD_BEFORE_COMMITheader for each message, confirming the Kafka component correctly marks the last record before a commit.The
kafkaManualCommittest stops the consumer route and verifies no messages are consumed during the downtime.The
testResumeFromTheRightPointtest restarts the route and verifies it resumes consumption from the last committed offset, ensuring no message loss or duplication.
Usage Examples
Running the Tests
These tests are executed as part of the integration test suite and require a running Kafka broker (usually managed by the test framework).
Produce messages to the topic
testManualCommitTest.The consumer route consumes and processes these messages asynchronously.
The tests verify correct manual commit behavior and offset management.
Extending the Manual Commit Factory
If you want to customize manual commit behavior, you can implement your own `KafkaManualCommitFactory` and bind it in the Camel registry similarly:
@BindToRegistry("customFactory")
private final KafkaManualCommitFactory customFactory = new MyCustomManualCommitFactory();
And reference it in the Kafka endpoint:
kafka:myTopic?brokers=localhost:9092&kafkaManualCommitFactory=#customFactory
Important Implementation Details
Asynchronous Manual Commit:
UtilizesDefaultKafkaManualAsyncCommitFactoryto allow commits to happen asynchronously, avoiding blocking operations in the route.Aggregation Strategy:
UsesAggregationStrategies.groupedExchange()to collect messages into a list before splitting them to commit offsets individually. The completion timeout ensures periodic commit calls.Thread Safety Warning:
The comment in the route warns that Kafka partitions must be processed by a single thread to avoid concurrent modification issues when committing offsets manually.Mock Endpoints:
The tests use CamelMockEndpointto assert expected message counts and message contents, facilitating controlled integration testing.
Interaction with Other System Components
Kafka Broker:
The class interacts directly with Kafka brokers for producing and consuming messages.Camel Kafka Component:
Uses Camel's Kafka component to create Kafka consumers and producers, leveraging Camel routing and processing capabilities.Camel Context and Registry:
Registers the manual commit factory within Camel's registry and uses the Camel context to define and control routes.JUnit 5 Testing Framework:
Uses JUnit 5 annotations for lifecycle management, test ordering, and assertions.Awaitility:
Used to wait asynchronously until assertions on mock endpoints are satisfied.BaseKafkaTestSupport:
Inherits from a base integration test support class that likely provides Kafka embedded services and utility methods.
Mermaid Class Diagram
classDiagram
class KafkaConsumerAsyncManualCommitIT {
- static final String TOPIC
- static final Logger LOG
- KafkaManualCommitFactory manualCommitFactory
- CamelContext context
- KafkaProducer<String,String> producer
- volatile int failCount
+ void before()
+ void after()
+ RouteBuilder createRouteBuilder()
+ void testLastRecordBeforeCommitHeader()
+ void kafkaManualCommit()
+ void testResumeFromTheRightPoint()
}
KafkaConsumerAsyncManualCommitIT --|> BaseKafkaTestSupport
Summary
`KafkaConsumerAsyncManualCommitIT.java` is a focused integration test class validating asynchronous manual commit functionality in Kafka consumers within the Apache Camel framework. It configures Kafka consumers with manual commit enabled, processes messages in aggregated batches, commits offsets asynchronously, and verifies correct offset handling across route restarts. The tests provide confidence that manual commit semantics are correctly implemented and that the consumer resumes consumption from the appropriate offset, preventing message loss or duplication.