KafkaProducerMultipleMessagesInTransactionWithLoopTest.java
Overview
`KafkaProducerMultipleMessagesInTransactionWithLoopTest.java` is a JUnit 5 test class designed to verify the behavior of producing multiple messages to an Apache Kafka topic within a transactional context using Apache Camel routes. The class tests the ability to send a batch of messages inside a Kafka transaction in a loop, ensuring either all messages are committed together or none are committed if an exception occurs.
This test class uses a mocked Kafka producer (`MockProducer`) to simulate Kafka interactions without requiring a real Kafka cluster. The tests focus on:
- Successful sending of multiple messages within a single transaction.
- Correct rollback behavior when an exception is thrown during message processing in the loop.
By doing so, it validates transactional integrity and error handling when using the Camel Kafka component with Kafka transactions enabled.
Detailed Explanation
Class: KafkaProducerMultipleMessagesInTransactionWithLoopTest
This class extends `CamelTestSupport`, leveraging Apache Camel's testing framework for setting up Camel contexts, routes, and endpoints.
Fields
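- `MockEndpoint doneEndpoint`: Injected mock endpoint (`mock:done`) used to assert completion of the route.
- `MockProducer<String, String> mockProducer`: A Kafka `MockProducer` instance that simulates Kafka producer behavior. It is created with a `true` constructor argument (in Kafka's `MockProducer` constructors this boolean is `autoComplete`, which completes each send immediately) and uses `StringSerializer` for both keys and values. The tests exercise its transactional methods.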
Methods
protected CamelContext createCamelContext() throws Exception
Overrides the default Camel context creation to:
- Create a mock `KafkaClientFactory` that returns the `mockProducer` instance when its `getProducer()` method is called.
- Configure the Kafka component with:
  - Brokers `"broker1:1234,broker2:4567"`.
  - Record metadata enabled (`setRecordMetadata(true)`).
  - The mocked `KafkaClientFactory` to avoid real Kafka connections.
- Add the customized Kafka component to the Camel context.
**Usage:**
This method prepares the Camel context to use a mocked Kafka producer for testing Kafka-related routes without an actual Kafka cluster.
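The factory-injection idea behind this method can be sketched without Camel or Kafka on the classpath. The block below is only an illustration under stated assumptions: `RecordSender`, `SenderFactory`, `RecordingSender`, and `SketchComponent` are hypothetical stand-ins invented for this sketch, not the Camel `KafkaClientFactory` or Kafka `Producer` types, and the real test uses Mockito rather than a hand-written double.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical stand-ins for illustration; these are NOT the Camel or Kafka types.
interface RecordSender {
    void send(String topic, String value);
}

interface SenderFactory {
    RecordSender getProducer(Properties props);
}

// Test double that records what was sent instead of contacting a broker.
final class RecordingSender implements RecordSender {
    final List<String> sent = new ArrayList<>();

    @Override
    public void send(String topic, String value) {
        sent.add(topic + ":" + value);
    }
}

// Component that asks the injected factory for a producer on every publish.
final class SketchComponent {
    private final SenderFactory factory;
    private final Properties props = new Properties();

    SketchComponent(SenderFactory factory, String brokers) {
        this.factory = factory;
        // The broker list is stored but never dialed, because the factory is a double.
        props.setProperty("bootstrap.servers", brokers);
    }

    void publish(String topic, String value) {
        factory.getProducer(props).send(topic, value);
    }
}
```

Because the component only ever obtains its producer through the factory, swapping in a factory that returns a recording double is enough to keep the test off the network, which is the same reason the real test hands a mocked factory to the Kafka component.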
public void test01_HappyLoopPath() throws Exception
**Purpose:**
Tests the normal flow where multiple messages are sent in a loop, all within a Kafka transaction.
**Steps:**
- Sends an integer `messageCount` (5) as the body to the `direct:loop` Camel route.
- Expects the `mock:done` endpoint to receive exactly one message indicating route completion.
- Asserts that:
  - The `mockProducer` has recorded exactly 5 messages in its history.
  - The Kafka transaction commit count is exactly 1, indicating all messages were sent in a single transaction.
**Example Usage:**
test01_HappyLoopPath();
Verifies that sending 5 messages completes successfully and commits once.
public void test02_OnExceptionWithLoop() throws Exception
**Purpose:**
Tests the transactional rollback behavior when an exception occurs during the looped message sending.
**Steps:**
- Sends a message with a header `"ThrowExceptionOnIndex"` set to 4, indicating the test should throw an exception on the 5th message (index 4).
- Expects a `CamelExecutionException` wrapping a `RuntimeException` with a specific failure message.
- Asserts that:
  - No messages were sent (`mockProducer.history()` is empty).
  - No Kafka transaction commit occurred (`mockProducer.commitCount()` is zero).
**Example Usage:**
test02_OnExceptionWithLoop();
Simulates an error on the 5th message causing transaction rollback.
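The "wrapper exception with an expected cause" check in this test can be illustrated with plain Java exceptions. This is a minimal sketch, not the test's own assertion code: `CauseChainSketch` is a hypothetical helper, a plain `Exception` would stand in for `CamelExecutionException`, and any message passed to it is a placeholder because the source does not state the actual failure text.

```java
// Hypothetical helper for illustration; not part of Camel or JUnit.
final class CauseChainSketch {

    // Returns true if any throwable in the cause chain of `outer`
    // (excluding `outer` itself) is an instance of `type` and
    // carries exactly `message`.
    static boolean wraps(Throwable outer, Class<? extends Throwable> type, String message) {
        Throwable cause = outer.getCause();
        while (cause != null) {
            if (type.isInstance(cause) && message.equals(cause.getMessage())) {
                return true;
            }
            cause = cause.getCause();
        }
        return false;
    }
}
```

Checking the cause rather than the outer exception matters here because the route's `RuntimeException` reaches the caller only as the cause of the wrapper.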
protected RoutesBuilder createRouteBuilder() throws Exception
Defines the Camel route used for testing.
**Route Details:**
- Source: `direct:loop`
- Route Id: `"loop"`
**Route Logic:**
- Extracts the optional header `"ThrowExceptionOnIndex"` and stores it as an exchange variable.
- Loops `body()` times (the input integer).
- For each iteration:
  - If the current loop index equals the `ThrowExceptionOnIndex` variable:
    - Throws a `RuntimeException` to simulate failure.
  - Otherwise:
    - Logs the message send attempt.
    - Sets the message body to `"test ${exchangeProperty.CamelLoopIndex}"`.
    - Sends the message to Kafka topic `"loop"` with these additional Kafka producer properties:
      - `transactional.id=1234` (enables Kafka transactions)
      - `enable.idempotence=true` (prevents duplicate writes caused by producer retries)
      - `retries=5` (retry count for the producer)
- After the loop completes successfully, sends a single message to `mock:done` to indicate completion.
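The route logic above, together with the all-or-nothing outcome the two tests assert, can be modeled in plain Java. This is a simplified sketch under stated assumptions: `TxProducerModel` and `LoopRouteModel` are hypothetical names, the model opens the transaction before the loop for simplicity, and the failure message is a placeholder. It is not the Kafka producer API or the Camel route itself.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified in-memory model of a transactional producer; NOT Kafka's API.
final class TxProducerModel {
    private final List<String> pending = new ArrayList<>(); // sent, not yet committed
    private final List<String> history = new ArrayList<>(); // committed records only
    private int commitCount;
    private boolean inTransaction;

    void beginTransaction() {
        inTransaction = true;
    }

    void send(String value) {
        if (!inTransaction) {
            throw new IllegalStateException("no open transaction");
        }
        pending.add(value);
    }

    void commitTransaction() {
        history.addAll(pending); // all staged records become visible together
        pending.clear();
        commitCount++;
        inTransaction = false;
    }

    void abortTransaction() {
        pending.clear(); // staged records are discarded
        inTransaction = false;
    }

    List<String> history() {
        return List.copyOf(history);
    }

    int commitCount() {
        return commitCount;
    }
}

// Hypothetical model of the loop: one transaction around all iterations.
final class LoopRouteModel {
    // Sends "test <index>" for each index in [0, count);
    // throws when the index equals throwOnIndex (null means never throw).
    static void run(TxProducerModel producer, int count, Integer throwOnIndex) {
        producer.beginTransaction();
        try {
            for (int i = 0; i < count; i++) {
                if (throwOnIndex != null && throwOnIndex == i) {
                    throw new RuntimeException("simulated failure at index " + i);
                }
                producer.send("test " + i);
            }
            producer.commitTransaction();
        } catch (RuntimeException e) {
            producer.abortTransaction();
            throw e;
        }
    }
}
```

With `run(producer, 5, null)` the model ends with five committed records and one commit. With `run(producer, 5, 4)` the four staged records are discarded and the commit count stays at zero, mirroring the assertions in the two tests.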
**Kafka Endpoint URI:**
kafka:loop?additional-properties[transactional.id]=1234&additional-properties[enable.idempotence]=true&additional-properties[retries]=5
This configures the Kafka producer to use transactions and idempotent producer settings.
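To make the mapping from the bracketed query keys to producer properties concrete, here is a small plain-Java sketch that extracts them from the URI string. It is an illustration only: `AdditionalPropertiesModel` is a hypothetical helper, and Camel's own endpoint URI parsing is more involved than this.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not Camel's URI parser.
final class AdditionalPropertiesModel {
    private static final String PREFIX = "additional-properties[";

    // Collects every query parameter of the form additional-properties[key]=value
    // into a key -> value map, preserving the order in which they appear.
    static Map<String, String> parse(String uri) {
        Map<String, String> props = new LinkedHashMap<>();
        int queryStart = uri.indexOf('?');
        if (queryStart < 0) {
            return props; // no query string, so no additional properties
        }
        for (String pair : uri.substring(queryStart + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                continue; // skip parameters without a value
            }
            String key = pair.substring(0, eq);
            String value = pair.substring(eq + 1);
            if (key.startsWith(PREFIX) && key.endsWith("]")) {
                props.put(key.substring(PREFIX.length(), key.length() - 1), value);
            }
        }
        return props;
    }
}
```

Applied to the URI above, the sketch yields three entries: `transactional.id`, `enable.idempotence`, and `retries`, with the values `1234`, `true`, and `5`.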
Important Implementation Details and Algorithms
Kafka Transactions in Loop:
The test simulates sending multiple messages to Kafka within a single transaction by configuring `transactional.id` on the Kafka producer endpoint. The looped route sends messages consecutively, and Kafka's producer transaction semantics ensure atomic commits.
MockProducer Usage:
The `MockProducer` from the Kafka client API is used to simulate Kafka producer behavior without network calls. It records sent messages and commit counts, which are asserted in the tests.
Exception Handling in Loop:
By throwing a `RuntimeException` during an iteration, the route triggers a rollback of the Kafka transaction, preventing partial commits.
Exchange Variables and Properties:
The `ThrowExceptionOnIndex` variable controls when to throw an exception. `CamelLoopIndex` is a Camel-provided property indicating the current loop iteration index.
Mockito Mocking:
The Kafka client factory is mocked to supply the `MockProducer` instance, isolating the tests from actual Kafka infrastructure.
Interaction with Other Parts of the System
Apache Camel Kafka Component:
The class tests the integration and correct configuration of the Camel Kafka component's transactional producer capabilities.
Camel Routes:
The test class defines its route inline, demonstrating how Kafka transactional producers can be used in Camel routes with loop and error handling constructs.
Kafka Client API:
The tests rely on Kafka's producer API (mocked) to simulate transactional message production and observe commit/rollback behavior.
JUnit 5 & Mockito:
Uses JUnit 5 for structured, ordered tests and Mockito for mocking the Kafka client factory.
Usage Summary
To run these tests, simply execute the JUnit test class in a Java IDE or build tool supporting JUnit 5. The tests verify that Kafka transactions work correctly in looped message production scenarios and handle exceptions by rolling back transactions.
Visual Diagram
classDiagram
  class KafkaProducerMultipleMessagesInTransactionWithLoopTest {
    -MockEndpoint doneEndpoint
    -MockProducer<String,String> mockProducer
    +createCamelContext() CamelContext
    +test01_HappyLoopPath() void
    +test02_OnExceptionWithLoop() void
    +createRouteBuilder() RoutesBuilder
  }
  class RouteBuilder {
    +configure() void
  }
  KafkaProducerMultipleMessagesInTransactionWithLoopTest ..> RouteBuilder : creates
  KafkaProducerMultipleMessagesInTransactionWithLoopTest --> MockProducer : uses
  KafkaProducerMultipleMessagesInTransactionWithLoopTest --> MockEndpoint : uses
Summary
This test class verifies Kafka transactional message production within an Apache Camel route that uses a loop. It shows how to configure Kafka producers for transactions and idempotence, how an exception in the loop triggers a rollback, and how mocking isolates the tests from real Kafka infrastructure. It can serve as a reference for developers integrating Kafka transactions into Camel-based messaging systems.