KafkaTransactionIT.java
Overview
`KafkaTransactionIT.java` is an integration test class for validating Kafka transactional message production and consumption within the Apache Camel Kafka component. It ensures that transactional guarantees and concurrency aspects are correctly handled when sending messages to Kafka topics using Camel routes configured for transactions.
The class extends `BaseKafkaTestSupport`, leveraging Camel's testing framework and Kafka clients to:
Test sequential transactional message production with failure scenarios.
Test concurrent transactional message production across multiple threads.
Verify message delivery and metadata correctness from Kafka topics.
Clean up Kafka topics after tests.
This file plays a critical role in assuring the reliability and correctness of transactional messaging in the Camel Kafka integration, especially under concurrent load conditions.
Detailed Explanation
Class: KafkaTransactionIT
Integration test class for Kafka transactional messaging.
Constants:
| Name | Description |
| --- | --- |
| `SEQUENTIAL_TRANSACTION_URI` | Camel route URI for sequential transactions (`direct:startTransaction`) |
| `CONCURRENT_TRANSACTION_URI` | Camel route URI for concurrent transactions (`seda:startTransaction`) |
| `TOPIC_TRANSACTION` | Kafka topic name for sequential transactions (`transaction`) |
| `TOPIC_CONCURRENCY_TRANSACTION` | Kafka topic name for concurrent transactions (`concurrency_transaction`) |
| `THREAD_NUM` | Number of threads for concurrency test (5) |
| `stringsConsumerConn` | `KafkaConsumer` instance shared by the tests |
Lifecycle Methods
@BeforeAll public static void before()
Initializes the Kafka consumer `stringsConsumerConn` with the consumer group `"DemoTransaction"`. Called once before all tests.
@AfterAll public static void after()
Deletes the Kafka topics used in tests (`transaction` and `concurrency_transaction`) via the Kafka admin client to clean up test artifacts. Called once after all tests.
Overridden Methods
protected RouteBuilder createRouteBuilder()
Defines two Camel routes for testing transactional Kafka producers:
Sequential Transaction Route (`direct:startTransaction`)
Sends messages to topic `transaction` with the producer settings `transactional.id=1234`, `enable.idempotence=true`, `retries=5`, and `requestRequiredAcks=-1` (all in-sync replicas must acknowledge).
After producing the message, a processor checks the message body: if the body contains `"fail"`, it throws a `RuntimeException` to simulate failure and trigger transaction rollback.
The route ends by sending the exchange to a mock endpoint for result verification.
Concurrent Transaction Route (`seda:startTransaction`)
Sends messages to topic `concurrency_transaction` with the producer settings `transactional.id=5678`, `enable.idempotence=true`, `retries=5`, and `synchronous=true` (the producer sends records synchronously).
No additional processing or failure simulation here.
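As a rough illustration of how the settings above could be carried on a Camel Kafka endpoint URI, the sketch below assembles the two producer URIs as plain strings. It assumes the `additionalProperties.` prefix that the Camel Kafka component documents for passing native Kafka client properties. The class and method names are hypothetical, and the exact URI used in `KafkaTransactionIT` may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of KafkaTransactionIT.
final class TransactionalEndpointUris {

    // Builds "kafka:<topic>?k1=v1&k2=v2", preserving the insertion order of the options.
    static String kafkaUri(String topic, Map<String, String> options) {
        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return "kafka:" + topic + "?" + query;
    }

    // Settings described for the sequential route (direct:startTransaction).
    static String sequentialUri() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("requestRequiredAcks", "-1");
        // Assumption: native Kafka properties are passed with this prefix.
        options.put("additionalProperties.transactional.id", "1234");
        options.put("additionalProperties.enable.idempotence", "true");
        options.put("additionalProperties.retries", "5");
        return kafkaUri("transaction", options);
    }

    // Settings described for the concurrent route (seda:startTransaction).
    static String concurrentUri() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("synchronous", "true");
        options.put("additionalProperties.transactional.id", "5678");
        options.put("additionalProperties.enable.idempotence", "true");
        options.put("additionalProperties.retries", "5");
        return kafkaUri("concurrency_transaction", options);
    }
}
```

In a `RouteBuilder`, strings like these would typically appear as the argument to `.to(...)` after `from("direct:startTransaction")` or `from("seda:startTransaction")`.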
Test Methods
public void concurrencyProducedTransactionMessage() throws InterruptedException
Tests concurrent production of transactional messages.
Spawns 5 threads (defined by `THREAD_NUM`).
Each thread sends 5 messages (`messageInTopic`) to the concurrent transaction route (`seda:startTransaction`).
Uses a `CountDownLatch` initialized to 25 (5 threads * 5 messages) to track consumed messages.
Waits for all threads to finish.
Polls Kafka topic `concurrency_transaction`, consuming messages and counting down the latch.
Asserts that all messages were received within 200 milliseconds.
**Usage Example:**
concurrencyProducedTransactionMessage();
This will verify that concurrent transactional message production and consumption works without message loss.
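The thread-and-latch pattern described for this test can be sketched with the standard library alone. In the sketch below an in-memory queue stands in for the Kafka topic and the Camel route, so it shows only the coordination logic, not the real producer. `MESSAGES_PER_THREAD` and the class name are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the concurrency test; no Kafka or Camel involved.
final class ConcurrentSendSketch {
    static final int THREAD_NUM = 5;
    static final int MESSAGES_PER_THREAD = 5; // hypothetical name

    // Returns true if every expected message was counted before the final wait expired.
    static boolean run() {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>(); // stand-in for the topic
        CountDownLatch messagesLatch = new CountDownLatch(THREAD_NUM * MESSAGES_PER_THREAD);
        ExecutorService pool = Executors.newFixedThreadPool(THREAD_NUM);
        try {
            // Each worker sends its share of messages concurrently.
            for (int t = 0; t < THREAD_NUM; t++) {
                final int thread = t;
                pool.submit(() -> {
                    for (int m = 0; m < MESSAGES_PER_THREAD; m++) {
                        topic.add("thread-" + thread + "-msg-" + m);
                    }
                });
            }
            // Wait for all producer threads to finish.
            pool.shutdown();
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                return false;
            }
            // Consume everything that was sent, counting down once per message.
            while (topic.poll() != null) {
                messagesLatch.countDown();
            }
            // Mirrors the final assertion: the latch must reach zero within 200 ms.
            return messagesLatch.await(200, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Because the messages are consumed before the latch is awaited, the 200 ms timeout acts as a final check rather than as the main waiting period.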
public void producedTransactionMassageIsReceivedByKafka() throws InterruptedException
Tests sequential transactional message production with simulated failure.
Sends 10 successful messages to `direct:startTransaction`.
Attempts to send 10 messages with `"fail"` in their body, expecting a `RuntimeException` to be thrown due to the processor logic.
Consumes 10 successful messages from topic `transaction`.
Verifies all messages arrived.
Checks the mock endpoint to assert:
Exactly 10 exchanges were received.
Each exchange contains exactly one `RecordMetadata` header.
Offset is non-negative.
Topic name starts with `"transaction"`.
**Usage Example:**
producedTransactionMassageIsReceivedByKafka();
This test validates transactional rollback behavior on failure and confirms metadata properties of produced messages.
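The expected outcome of this scenario (10 committed messages, 10 rejected sends) can be illustrated with a small in-memory model. This is only a sketch of the commit-or-discard idea. It does not use the Kafka transactional producer, and all names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of "produce, then fail and roll back".
final class RollbackSketch {
    private final List<String> committed = new ArrayList<>();

    // Stages the message, runs the body check, and commits only on success.
    void sendInTransaction(String body) {
        List<String> pending = new ArrayList<>();
        pending.add(body); // "produced" inside the open transaction
        if (body.contains("fail")) {
            // The pending message is discarded, which models an aborted transaction.
            throw new RuntimeException("simulated failure for: " + body);
        }
        committed.addAll(pending); // models a committed transaction
    }

    // Sends 10 good and 10 failing messages; returns {committedCount, failureCount}.
    static int[] runScenario() {
        RollbackSketch route = new RollbackSketch();
        int failures = 0;
        for (int i = 0; i < 10; i++) {
            route.sendInTransaction("message-" + i);
        }
        for (int i = 0; i < 10; i++) {
            try {
                route.sendInTransaction("fail-" + i);
            } catch (RuntimeException expected) {
                failures++;
            }
        }
        return new int[] {route.committed.size(), failures};
    }
}
```

In the real test the same two numbers are observed indirectly: the consumer receives the successful messages from `transaction`, and the mock endpoint receives exactly 10 exchanges.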
Private Helper Methods
private static KafkaConsumer<String, String> createStringKafkaConsumer(final String groupId)
Creates and configures a Kafka consumer for string key/value deserialization.
Parameters:
`groupId`: Kafka consumer group id.
Returns: Configured `KafkaConsumer<String, String>` instance.
**Kafka consumer configuration includes:**
Bootstrap servers from the test environment.
Auto commit enabled with 1-second interval.
Session timeout 30 seconds.
Auto offset reset to earliest.
Deserializers for String keys and values.
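The configuration listed above corresponds to standard Kafka consumer property keys. A minimal sketch using only `java.util.Properties` might look like the following. The class and method names are hypothetical, and the bootstrap address is supplied by the caller because the test takes it from its environment.

```java
import java.util.Properties;

// Hypothetical helper showing the consumer settings as raw property keys.
final class ConsumerConfigSketch {

    static Properties stringConsumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Auto commit enabled with a 1-second interval.
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        // Session timeout of 30 seconds.
        props.setProperty("session.timeout.ms", "30000");
        // Start from the earliest offset when the group has no committed position.
        props.setProperty("auto.offset.reset", "earliest");
        // String deserializers for keys and values.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

With `kafka-clients` on the classpath, the result could be passed to `new KafkaConsumer<String, String>(props)`, for example with the group id `"DemoTransaction"`.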
private void createKafkaMessageConsumer(KafkaConsumer<String, String> consumerConn, String topic, CountDownLatch messagesLatch)
Consumes messages from Kafka topic until the `messagesLatch` count reaches zero.
Parameters:
`consumerConn`: Kafka consumer instance to use.
`topic`: Kafka topic to subscribe to.
`messagesLatch`: `CountDownLatch` to decrement for each consumed message.
Subscribes consumer to the topic.
Polls Kafka every 100ms.
For each record, decreases the latch count.
Stops when latch reaches zero.
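The loop described above can be sketched without a broker by letting a `BlockingQueue` play the role of the subscribed topic. Here `queue.poll(100, TimeUnit.MILLISECONDS)` stands in for a 100 ms `KafkaConsumer` poll. The class and method names are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for createKafkaMessageConsumer; no Kafka involved.
final class PollLoopSketch {

    // Polls every 100 ms until the latch reaches zero; returns how many messages were consumed.
    static int consumeUntilDone(BlockingQueue<String> source, CountDownLatch messagesLatch) {
        int consumed = 0;
        while (messagesLatch.getCount() > 0) {
            String message;
            try {
                message = source.poll(100, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break; // stop polling if the thread is interrupted
            }
            if (message != null) {
                messagesLatch.countDown(); // one count per consumed record
                consumed++;
            }
        }
        return consumed;
    }
}
```

A loop of this shape keeps polling for as long as the latch is above zero, so if fewer messages arrive than expected it does not return on its own. Adding a deadline is one way to guard against that.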
Implementation Details and Algorithms
Transactional Messaging:
The routes use Kafka producer transactional features configured via `transactional.id` and `enable.idempotence`. This makes the writes in each transaction atomic (they are committed together or not at all), which is the producer-side basis for exactly-once delivery semantics.
Failure Simulation:
The sequential transaction route deliberately throws an exception when message bodies contain `"fail"`, testing Kafka transaction rollback capabilities.
Concurrency Handling:
The concurrency test uses multiple threads sending messages simultaneously to the same topic, checking that the Kafka producer handles transactions correctly in a multi-threaded environment.
Kafka Consumer Polling:
A simple polling loop with a `CountDownLatch` synchronizes the test to wait for all expected messages.
Interaction with Other System Components
Apache Camel:
Defines routes to interact with the Kafka component and process messages.
Kafka Cluster:
Produces and consumes messages for transactional topics.
Test Utilities:
Uses `TestProducerUtil` to send messages via Camel's `ProducerTemplate`.
Mock Endpoint:
Uses Camel's `MockEndpoint` to verify message exchanges and Kafka metadata.
Kafka Admin Client:
Used to delete test topics after execution.
BaseKafkaTestSupport:
Provides Kafka test configuration and context.
Visual Diagram
classDiagram
class KafkaTransactionIT {
+static final String SEQUENTIAL_TRANSACTION_URI
+static final String CONCURRENT_TRANSACTION_URI
+static final String TOPIC_TRANSACTION
+static final String TOPIC_CONCURRENCY_TRANSACTION
+static final int THREAD_NUM
-static KafkaConsumer<String, String> stringsConsumerConn
+static void before()
+static void after()
+RouteBuilder createRouteBuilder()
+void concurrencyProducedTransactionMessage() throws InterruptedException
+void producedTransactionMassageIsReceivedByKafka() throws InterruptedException
-static KafkaConsumer<String, String> createStringKafkaConsumer(String groupId)
-void createKafkaMessageConsumer(KafkaConsumer<String,String>, String, CountDownLatch)
}
KafkaTransactionIT ..> RouteBuilder : creates
KafkaTransactionIT ..> KafkaConsumer : uses
KafkaTransactionIT ..> ProducerTemplate : uses
KafkaTransactionIT ..> MockEndpoint : verifies
Summary
`KafkaTransactionIT.java` is a critical integration test class validating Kafka transactional producer functionality within Apache Camel. It covers sequential and concurrent transaction scenarios, including failure handling and metadata verification. The file combines Camel route configuration, Kafka consumer polling, and multithreaded test execution to ensure reliable and correct transactional message processing. It interacts with Kafka, Camel testing utilities, and Kafka admin clients to manage test lifecycle and resource cleanup.