KafkaConsumerFullIT.java
Overview
`KafkaConsumerFullIT.java` is an integration test class designed to validate the full lifecycle and advanced features of the Apache Camel Kafka consumer component within a real Kafka environment. It performs end-to-end testing by producing messages to a Kafka topic and asserting that these messages are correctly consumed, processed, and routed by Camel routes configured with Kafka consumers.
The class tests various Kafka consumer behaviors such as:
Consuming messages with header propagation control.
Ensuring Kafka record-specific headers are not overwritten.
Seeking to the beginning or end of Kafka topics.
Suspending and resuming routes with Kafka consumer pause/resume semantics.
Overriding the default Kafka header deserializer.
It uses JUnit 5 for test structuring and assertions, Apache Camel's testing framework for route and mock endpoint management, and Kafka's Java client APIs for producing messages and managing topics.
Package and Imports
Package:
org.apache.camel.component.kafka.integrationKey dependencies:
Apache Camel components (
kafka,mock,test.infra, etc.)Kafka client APIs (
KafkaProducer,ProducerRecord,DeleteTopicsResult)Testing and concurrency utilities (
JUnit 5,Awaitility,SLF4J)
Class: KafkaConsumerFullIT
Description
`KafkaConsumerFullIT` extends [BaseKafkaTestSupport](/projects/289/68683) (presumably a base class providing common Kafka test utilities) and implements an extensive suite of integration tests to verify the correctness and robustness of Kafka consumer behavior when integrated with Apache Camel routes.
Constants
Name | Description |
|---|---|
`TOPIC` | Unique Kafka topic name used for all tests, generated with a UUID to avoid clashes. |
`ROUTE` | Unique Camel route ID, also generated with UUID. |
`FROM_URI` | Kafka endpoint URI configured with group ID, deserializers, auto-commit settings, polling timeouts, and an interceptor class for capturing records. |
Logger
LOG: SLF4J Logger for logging test execution details.
Fields
Name | Type | Description |
|---|---|---|
`producer` | Kafka producer instance used to send test messages. | |
`bean` | `MyKafkaHeaderDeserializer` (inner class) | Custom Kafka header deserializer bound to Camel registry for testing header deserializer override. |
Lifecycle Methods
@BeforeEach void before()
Prepares the Kafka producer before each test using default properties.
Clears the static record capture list in the
MockConsumerInterceptor.
@AfterEach void after()
Closes the Kafka producer.
Deletes the test Kafka topic to clean up after each test.
Waits up to 60 seconds to ensure topic deletion completes before proceeding.
Route Setup
@RouteFixture void createRouteBuilder(CamelContext context)
Registers a Camel route builder in the test Camel context.
RouteBuilder createRouteBuilder()
Defines the Camel route used in tests:
Source: Kafka endpoint configured by
FROM_URI.Processor: Logs consumed messages at trace level.
Destination: Routes messages to a mock endpoint
KafkaTestUtil.MOCK_RESULTfor assertion.
Tests
1. kafkaMessageIsConsumedByCamelSeekedToBeginning()
Order: 1
Description: Tests that messages produced to the Kafka topic are consumed by Camel when the consumer is configured to seek to the beginning.
Behavior:
Produces 5 messages.
Asserts consumption of all 5 messages.
Stops and restarts the route with SeekPolicy.BEGINNING configured.
Verifies all messages are re-consumed.
Usage Example:
// Producing messages
for (int i = 0; i < 5; i++) {
producer.send(new ProducerRecord<>(TOPIC, "1", "message-" + i));
}
// Assertions handled by MockEndpoint
2. kafkaRecordSpecificHeadersAreNotOverwritten()
Order: 2
Description: Validates that Kafka record-specific headers (e.g.,
kafka.TOPIC) are preserved and not overwritten by Camel's Kafka consumer.Behavior:
Produces a message with a header
kafka.TOPICset incorrectly.Asserts the actual Kafka topic header is used in the Camel exchange.
Important: Confirms Kafka headers integrity during header propagation.
3. kafkaMessageIsConsumedByCamel()
Order: 3
Description: Tests that messages sent to Kafka are consumed by Camel and headers are propagated correctly according to interceptor rules.
Behavior:
Produces 5 messages with two types of headers: one to be propagated and one to be skipped.
Asserts all messages arrive at the mock endpoint with the propagated header present and skipped header absent.
Verifies the interceptor captured all records.
Highlights:
Demonstrates header filtering during consumption.
Shows use of
MockConsumerInterceptorto validate records.
4. kafkaMessageIsConsumedByCamelSeekedToEnd()
Order: 4
Description: Tests seeking to the end of the topic to avoid re-processing old messages.
Behavior:
Produces 5 messages and consumes them.
Restarts the route with
SeekPolicy.END.Asserts no messages are re-consumed on restart.
5. headerDeserializerCouldBeOverridden()
Order: 5
Description: Checks that the Kafka header deserializer can be overridden by providing a custom deserializer bean.
Behavior:
Retrieves Kafka endpoint with overridden header deserializer.
Asserts the deserializer instance is of the expected custom type.
6. kafkaMessageIsConsumedByCamelAfterSuspendResume()
Order: 6
Description: Validates that suspending and resuming a Camel route correctly pauses and resumes the Kafka consumer.
Behavior:
Produces 5 messages and consumes them.
Suspends the route and waits until the Kafka consumer is paused.
Resumes the route.
Produces 3 more messages and asserts they are consumed.
Important: Uses Awaitility to wait for consumer pause state.
Inner Classes
MyKafkaHeaderDeserializer
Extends
DefaultKafkaHeaderDeserializer.Used to test overriding the default header deserializer in Kafka endpoint configuration.
Important Implementation Details
Unique Topic and Route Names: To avoid clashes with parallel or previous test runs, topic and route names are generated dynamically using Kafka's
Uuid.randomUuid().Use of MockConsumerInterceptor: A custom Kafka consumer interceptor is used to capture and assert the Kafka records received by the consumer.
Seek Policy Testing: Tests demonstrate how changing the
SeekPolicyconfiguration influences Kafka consumer behavior to re-consume or skip old messages.Header Propagation: Tests validate selective propagation of Kafka record headers and ensure Kafka-specific headers are preserved.
Route Lifecycle Management: Tests include stopping, starting, suspending, and resuming Camel routes to validate Kafka consumer state transitions.
Topic Cleanup: After each test, the Kafka topic is deleted to prevent interference between tests, with Awaitility ensuring deletion completion.
Interaction with Other System Components
Apache Camel: This test class exercises Camel routes consuming from Kafka endpoints.
Kafka Broker: Produces and consumes real Kafka messages on dynamically created topics.
Kafka Admin Client: Used to delete topics after tests.
Mock Endpoint (
KafkaTestUtil.MOCK_RESULT): Captures messages for assertions.MockConsumerInterceptor: Kafka interceptor class for capturing and asserting consumed records.
Camel Registry: Registers custom header deserializer bean for override testing.
Usage Example of Test Setup
// Producing messages
for (int i = 0; i < 5; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "1", "message-" + i);
record.headers().add(new RecordHeader("PropagatedCustomHeader", "value".getBytes()));
producer.send(record);
}
// Asserting via Camel Mock Endpoint
MockEndpoint mock = contextExtension.getMockEndpoint(KafkaTestUtil.MOCK_RESULT);
mock.expectedMessageCount(5);
mock.assertIsSatisfied(3000);
Visual Diagram: Class Structure
classDiagram
class KafkaConsumerFullIT {
- static final String TOPIC
- static final String ROUTE
- static final String FROM_URI
- Logger LOG
- KafkaProducer<String,String> producer
- MyKafkaHeaderDeserializer bean
+ void before()
+ void after()
+ void createRouteBuilder(CamelContext)
+ RouteBuilder createRouteBuilder()
+ void kafkaMessageIsConsumedByCamelSeekedToBeginning()
+ void kafkaRecordSpecificHeadersAreNotOverwritten()
+ void kafkaMessageIsConsumedByCamel()
+ void kafkaMessageIsConsumedByCamelSeekedToEnd()
+ void headerDeserializerCouldBeOverridden()
+ void kafkaMessageIsConsumedByCamelAfterSuspendResume()
}
class MyKafkaHeaderDeserializer {
}
KafkaConsumerFullIT --> MyKafkaHeaderDeserializer
Summary
`KafkaConsumerFullIT.java` is a comprehensive integration test class that verifies multiple aspects of Kafka consumer integration with Apache Camel routes, focusing on message consumption correctness, header propagation, consumer seek semantics, and lifecycle management. It leverages Kafka's actual broker environment and Camel's testing framework to ensure the Kafka component behaves reliably in complex scenarios, helping maintain the robustness of the Camel Kafka component.
This file plays a critical role in the system by providing automated validation for Kafka consumer features, preventing regressions and ensuring smooth integration within larger applications that rely on Apache Camel and Kafka.