KafkaProducerMultipleMessagesInTransactionWithSplitTest.java
Overview
`KafkaProducerMultipleMessagesInTransactionWithSplitTest.java` is a JUnit 5 integration test class designed to verify the behavior of sending multiple messages to an Apache Kafka topic within a single transaction using Apache Camel's Kafka component. This test focuses on using the Split Enterprise Integration Pattern (EIP) to split a single incoming message into multiple Kafka messages, ensuring transactional guarantees such as atomicity and idempotence.
The tests simulate sending multiple records in one Camel route transaction, validating both successful commit scenarios and failure scenarios where an exception triggers rollback of the Kafka transaction.
Detailed Breakdown
Package and Imports
Package: `org.apache.camel.component.kafka`
Key Dependencies:
Apache Camel core and testing classes (`CamelTestSupport`, `RouteBuilder`, `MockEndpoint`)
Kafka client mocks (`MockProducer`)
Mockito for mocking the Kafka client factory
JUnit 5 for test lifecycle management
Class: KafkaProducerMultipleMessagesInTransactionWithSplitTest
This class extends `CamelTestSupport`, leveraging Camel's testing framework to build and test routes.
Fields
| Field Name | Type | Description |
|---|---|---|
| `doneEndpoint` | `MockEndpoint` | Mock endpoint to verify route completion. Injected with URI `"mock:done"`. |
| `mockProducer` | `MockProducer` | A Kafka client mock simulating the Kafka producer with transaction support enabled. |
Methods
createCamelContext()
@Override
protected CamelContext createCamelContext() throws Exception;
Purpose: Creates and configures the Camel context for the test.
Implementation Details:
Mocks the `KafkaClientFactory` to return the `mockProducer` when requested.
Configures the Kafka component with dummy broker addresses and enables record metadata.
Replaces the default Kafka client factory with the mocked version.
Returns: Configured `CamelContext` instance.
test01_HappySplitPath()
@Test
public void test01_HappySplitPath() throws Exception;
Purpose: Tests the happy path where multiple messages are split and sent successfully within a transaction.
Functionality:
Constructs a multiline string with 5 messages (e.g., `test01\n test02\n ...`).
Sends the entire string to the `direct:split` endpoint.
Expects the mock endpoint `mock:done` to receive exactly one message signaling route completion.
Asserts that all 5 messages were sent to Kafka and that the transaction was committed exactly once.
Key Assertions:
Number of messages sent to Kafka equals `messageCount` (5).
Kafka transaction commit count is 1.
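The payload construction described above can be sketched in plain Java. This is an illustration only: the class name `SplitPayload` and the `build` helper are hypothetical, and `String.split` stands in for Camel's tokenizer, whose handling of a trailing delimiter is not confirmed by this document.

```java
import java.util.Locale;

/** Hypothetical helper that builds a newline-delimited test payload. */
final class SplitPayload {

    /** Returns "test01\ntest02\n..." with one line per message. */
    static String build(int messageCount) {
        StringBuilder sb = new StringBuilder();
        for (int i = 1; i <= messageCount; i++) {
            // Locale.ROOT keeps the digits ASCII regardless of the default locale.
            sb.append(String.format(Locale.ROOT, "test%02d", i)).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String body = build(5);
        // String.split drops trailing empty strings, so the final "\n"
        // does not produce a sixth, empty part.
        System.out.println(body.split("\n").length); // prints 5
    }
}
```

A trailing newline in the body therefore still yields five parts under this stand-in, which matches the message count the test asserts.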
test02_OnExceptionWithSplitPath()
@Test
public void test02_OnExceptionWithSplitPath() throws Exception;
Purpose: Tests the behavior when an exception is thrown during one of the split iterations, causing the Kafka transaction to rollback.
Functionality:
Prepares a multiline string with 5 messages.
Sends the message to `direct:split` with a header telling the route to throw an exception on the 5th message (`ThrowExceptionOnIndex=4`).
Expects the route to throw a `CamelExecutionException` wrapping a `CamelExchangeException` and ultimately a `RuntimeException`.
Verifies that no messages were sent and no commits occurred on the Kafka producer.
Key Assertions:
Exception is thrown and correctly wrapped.
Kafka producer history is empty (no messages sent).
Commit count is zero (no transaction commit).
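Checking that an exception is "correctly wrapped" amounts to walking the cause chain. A minimal sketch, using only JDK exception types as stand-ins for `CamelExecutionException` and `CamelExchangeException`; the class name `CauseChain`, the helper `rootCause`, and the message texts are hypothetical and not taken from the test source.

```java
/** Hypothetical helper for inspecting nested exception causes. */
final class CauseChain {

    /** Returns the innermost cause, guarding against self-referencing chains. */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Stand-ins for: CamelExecutionException -> CamelExchangeException -> RuntimeException
        RuntimeException root = new RuntimeException("Failing on split index 4");
        Exception middle = new Exception("exchange failed", root);
        RuntimeException outer = new IllegalStateException("execution failed", middle);

        System.out.println(rootCause(outer).getMessage()); // prints "Failing on split index 4"
    }
}
```

A test that needs to assert each wrapping level, rather than only the root, would instead check `getCause()` one step at a time.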
createRouteBuilder()
@Override
protected RoutesBuilder createRouteBuilder() throws Exception;
Purpose: Defines the Camel route used in the tests.
Route Pattern:
Source: `direct:split`
Splits the incoming message body by newline (`\n`) using `.split(body().tokenize("\n"))`.
Transactional split with `shareUnitOfWork(true)` so that all split messages share the same unit of work/transaction.
`.stopOnException()` ensures the split processing stops when an exception is thrown.
In each split message:
Checks if the current split index matches the `ThrowExceptionOnIndex` header.
If matched, throws a `RuntimeException` with a message containing the failing index.
Otherwise, sends the message to the Kafka topic `split` with transactional properties: `transactional.id=45678`, `enable.idempotence=true`, `retries=5`.
After processing all splits or on exception, sends a message to the `mock:done` endpoint to signal route completion.
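The all-or-nothing behaviour this route is meant to demonstrate can be modelled without Camel or Kafka. The sketch below is a plain-Java model, not the route itself: `SplitTransactionModel`, `process`, and the pending/committed lists are hypothetical names, and the commit and abort steps are simulated with in-memory lists.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of "split, send each part, commit once or abort". */
final class SplitTransactionModel {

    private final List<String> pending = new ArrayList<>();   // sent inside the open transaction
    private final List<String> committed = new ArrayList<>(); // visible after a commit
    private int commitCount;

    /**
     * Splits the body on newlines and "sends" each part inside one transaction.
     * A negative throwOnIndex injects no failure.
     */
    void process(String body, int throwOnIndex) {
        String[] parts = body.split("\n");
        try {
            for (int i = 0; i < parts.length; i++) {
                if (i == throwOnIndex) {
                    // Stops the loop immediately, like stopOnException.
                    throw new RuntimeException("Failing on split index " + i);
                }
                pending.add(parts[i]);
            }
            committed.addAll(pending); // commit: all parts become visible together
            commitCount++;
        } finally {
            pending.clear();           // after a failure this is the abort
        }
    }

    List<String> committed() { return committed; }
    int commitCount() { return commitCount; }

    public static void main(String[] args) {
        SplitTransactionModel model = new SplitTransactionModel();
        model.process("msg1\nmsg2\nmsg3\nmsg4\nmsg5\n", -1);
        System.out.println(model.committed().size() + " " + model.commitCount()); // prints "5 1"
    }
}
```

Passing `4` as `throwOnIndex` makes `process` throw after four parts are pending; none of them reach `committed` and `commitCount` stays at zero, which mirrors the assertions of the failure test.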
Implementation Notes:
Uses Camel's `.choice()` DSL to conditionally throw exceptions.
Kafka producer is configured to use transactions and idempotence, crucial for exactly-once semantics.
The route is designed to demonstrate transactional behavior over multiple Kafka messages produced within a single Camel exchange.
Important Implementation Details and Algorithms
Transactional Kafka Producer with Split EIP:
The `.split()` EIP is used with `.shareUnitOfWork(true)` to ensure that transactions span the entire split process.
Kafka producer is mocked with `MockProducer` supporting transactions to simulate commit/rollback behavior.
The route configures Kafka producer properties to enable idempotence and retries, supporting exactly-once delivery semantics.
Exception handling within the split causes the entire transaction to abort, demonstrating rollback scenarios.
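Why idempotence and retries belong together can be shown with a small model: a retried send carries the same sequence number, so the log can drop the duplicate. This is a deliberately simplified sketch, not Kafka's broker logic (a real broker tracks sequences per partition and rejects out-of-order sequences rather than only ignoring repeats); `IdempotentLogModel` and `append` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of duplicate suppression by producer id and sequence number. */
final class IdempotentLogModel {

    private final Map<Long, Integer> lastSequence = new HashMap<>();
    private final List<String> log = new ArrayList<>();

    /** Appends the value unless this producer already wrote that sequence number. */
    boolean append(long producerId, int sequence, String value) {
        int last = lastSequence.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false; // duplicate caused by a retry: ignore it
        }
        lastSequence.put(producerId, sequence);
        log.add(value);
        return true;
    }

    List<String> log() { return log; }

    public static void main(String[] args) {
        IdempotentLogModel model = new IdempotentLogModel();
        model.append(1L, 0, "msg1");
        model.append(1L, 0, "msg1"); // retry of the same send
        model.append(1L, 1, "msg2");
        System.out.println(model.log()); // prints [msg1, msg2]
    }
}
```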
Mocking Kafka Client Factory:
The Kafka client factory is mocked to inject the `MockProducer` into the Camel Kafka component, isolating tests from real Kafka brokers.
Integration Testing Pattern:
The tests use Camel's `MockEndpoint` to assert route behavior.
Validates both positive and negative transactional flows.
Interaction with Other System Components
KafkaComponent:
The Camel Kafka component is the bridge between Camel routes and Kafka brokers.
This test replaces the real Kafka producer with a mock to isolate from external dependencies.
Camel Routes:
The tested route starts from a `direct` endpoint, processes the message via the Split EIP, and then sends to Kafka and a mock endpoint.
The file tests how Camel routes interact with Kafka in transactional message production scenarios.
Usage Examples
Sending Multiple Messages in a Transaction
String messages = "msg1\nmsg2\nmsg3\nmsg4\nmsg5\n";
template.sendBody("direct:split", messages);
This sends 5 messages split by newline to Kafka in one transaction.
On success, all 5 messages commit atomically.
Simulating a Failure on a Specific Split Message
String messages = "msg1\nmsg2\nmsg3\nmsg4\nmsg5\n";
int failIndex = 3;
template.sendBodyAndHeader("direct:split", messages, "ThrowExceptionOnIndex", failIndex);
This causes the route to throw a runtime exception on the 4th message (index 3).
The Kafka transaction is rolled back, so no messages are committed.
Mermaid Class Diagram
classDiagram
class KafkaProducerMultipleMessagesInTransactionWithSplitTest {
- MockEndpoint doneEndpoint
- MockProducer<String,String> mockProducer
+ createCamelContext() CamelContext
+ test01_HappySplitPath() void
+ test02_OnExceptionWithSplitPath() void
+ createRouteBuilder() RoutesBuilder
}
KafkaProducerMultipleMessagesInTransactionWithSplitTest --|> CamelTestSupport
Summary
`KafkaProducerMultipleMessagesInTransactionWithSplitTest.java` is a focused integration test that validates transactional message production to Kafka using Apache Camel's Split EIP. It demonstrates:
How to configure Kafka producers with transactions and idempotence.
How to use Camel's split with shared unit of work to encapsulate Kafka transactional boundaries.
How to handle exceptions and validate rollback behavior.
Usage of mocking frameworks to isolate Kafka dependencies.
This file serves as a regression check for transactional Kafka message production in Apache Camel routes that use the Split EIP.