KafkaWithDBTransactionIT.java
Overview
`KafkaWithDBTransactionIT.java` is an integration test class designed to validate the interaction and transactional behavior between Apache Kafka message production and database operations within the Apache Camel framework. It specifically tests scenarios involving Kafka producers that send messages to Kafka topics combined with SQL inserts into an embedded H2 database, with and without transactional guarantees.
This test class verifies different transactional configurations, ensuring message delivery to Kafka and database inserts occur atomically or otherwise, depending on the setup. It helps guarantee data integrity and consistency when working with Kafka and relational databases simultaneously, particularly focusing on rollback behavior in case of failures.
Key features:
Uses embedded H2 database with Spring's
JdbcTemplatefor data persistence.Creates Kafka consumers to validate messages on topics.
Defines Camel routes with various transactional configurations.
Tests scenarios such as no transaction, transactional Kafka producer with DB inserts, and multiple producers with mixed transactional settings.
Demonstrates how rollback affects Kafka message delivery and DB inserts.
Class: KafkaWithDBTransactionIT
Extends
BaseKafkaTestSupport (presumably provides Kafka test infrastructure and utilities)
Purpose
To run integration tests that verify transactional message production to Kafka topics along with database inserts, ensuring consistency and rollback semantics in Apache Camel routes.
Properties
Property | Type | Description |
|---|---|---|
`TOPIC_TX_1` to `TOPIC_TX_5` | `String` | Kafka topic names used in different test routes. |
`INSERT_SQL_1` to `INSERT_SQL_5` | `String` | Parameterized SQL insert statements targeting different tables. |
`stringsConsumerConn` | `KafkaConsumer` | Kafka consumer used to read messages from test topics. |
`db` | `EmbeddedDatabase` | Embedded H2 database instance used for test data persistence. |
`jdbc` | `JdbcTemplate` | Spring's JdbcTemplate for executing SQL queries. |
`context` | `CamelContext` (volatile) | The Camel context used to deploy routes and execute tests. |
Lifecycle Methods
@BeforeAll static void before()
Initializes the Kafka consumer (
stringsConsumerConn) before all tests run.Uses a helper method
createStringKafkaConsumerwith a group ID"KafkaWithDBTransactionIT".
@ContextFixture void configureContext(CamelContext context)
Configures the test context by:
Creating an embedded H2 database instance.
Binding the database and Spring transaction manager into the Camel registry.
Initializing the
JdbcTemplate.Creating five tables (
foo1tofoo5) with unique constraints on thenamecolumn.
This method is invoked to prepare the database and transaction infrastructure before running routes.
@AfterAll static void after()
Cleans up after all tests:
Deletes all test Kafka topics.
Shuts down the embedded database.
Test Methods
1. void noTransactionProducerWithDBLast()
**Scenario:** Sends one message to Kafka and performs one database insert without any transactional guarantees.
**Route:** `direct:startNoTx` → Kafka topic `transaction-1` → SQL insert into `foo1`.
**Behavior:**
Both operations succeed independently.
Verifies one message in Kafka and one record inserted in the database.
**Example Usage:** Send `"foobar"` with header `"word=foobar"` to `direct:startNoTx`.
2. void noTransactionProducerDuplicatedWithDBLast()
**Scenario:** Sends two duplicate messages without transaction; second insert fails due to unique constraint.
**Route:** `direct:startNoTx2` → Kafka topic `transaction-2` → SQL insert into `foo2`.
**Behavior:**
The Kafka topic receives both messages.
The database only contains one inserted row (second insert fails).
No rollback is performed because no transaction is used.
Exception is caught and handled silently.
3. void transactionProducerWithDBLast(String txParam)
**Parameterized Test:** Tests three Kafka transactional configurations:
transacted=truetransactionalId=my-foo1additionalProperties[transactional.id]=my-foo2
**Scenario:** Sends duplicate messages with transaction enabled on the Kafka producer, with the SQL insert last in the route.
**Route:** `direct:startTxDBLast` → Kafka topic `transaction-3` (transactional) → SQL insert into `foo3`.
**Behavior:**
On second duplicate insert failure, the transaction rolls back both Kafka send and DB insert.
Only one message is present in Kafka and one row in DB after tests.
4. void transactionMultipleProducersWithDBLast()
**Scenario:** Tests multiple routes sending to the same Kafka topic with different transactional configurations.
**Routes:**
direct:startTxDBLast2aanddirect:startTxDBLast2b: Kafka producer withtransacted=true+ SQL insert intofoo5.direct:startTxDBLast2c: Kafka producer without transactions + SQL insert intofoo5.
**Behavior:**
Duplicate messages sent to transactional routes rollback on failure.
Non-transactional route does not rollback.
Kafka topic ends up with 4 messages (some duplicates).
Database contains 3 unique rows.
5. void transactionProducerWithDBFirst(String txParam)
**Parameterized Test:** Same parameters as test #3.
**Scenario:** Transactional Kafka producer with SQL insert as the first step in route.
**Route:** `direct:startTxDBFirst` → SQL insert into `foo4` → Kafka topic `transaction-4` (transactional).
**Behavior:**
Duplicate insert causes rollback, preventing Kafka message send.
Only one message and one DB row exist after test.
Helper Methods
private ConsumerRecords<String,String> getMessagesFromTopic(KafkaConsumer<String,String> consumerConn, String topic)
Subscribes the consumer to the given topic.
Polls up to 5 times (100ms each) for messages.
Returns the polled
ConsumerRecords.Unsubscribes the consumer afterward.
private static KafkaConsumer<String,String> createStringKafkaConsumer(final String groupId)
Creates and configures a Kafka consumer with:
Bootstrap servers from
getBootstrapServers()(inherited).String deserializers for key/value.
Auto commit enabled.
Isolation level set to
"read_committed"to support transactional reads.
Returns the configured consumer.
Overridden Method
protected RouteBuilder createRouteBuilder()
Returns a blank route builder.
Actual routes are added dynamically inside test methods.
Important Implementation Details
Embedded H2 database is used to simulate persistent storage with unique constraints to trigger failures.
Kafka transactional producers are tested with different configuration parameters to ensure flexibility.
Exception handling in routes uses Camel's
onExceptionto mark transactions for rollback or handle errors gracefully.The tests verify Kafka and DB consistency by querying the Kafka topic and the database table after operations.
The test class relies on Apache Camel’s SQL component and Kafka component for route definitions.
BaseKafkaTestSupport provides Kafka cluster lifecycle and utility methods (not shown here).
Interaction with Other System Components
Apache Camel:
The primary integration framework used to construct routes combining Kafka and SQL components.Kafka Cluster:
Kafka brokers provide messaging infrastructure. The tests produce/consume messages to/from Kafka topics.Embedded H2 Database:
Simulated relational database for testing transactional behavior.Spring JDBC & Transaction Management:
Enables database operations and transaction management integrated with Camel routes.Kafka Admin Client:
Used to delete test topics after all tests complete.
Example Usage in Tests
ProducerTemplate producer = contextExtension.getProducerTemplate();
String bodyContent = "foobar";
producer.sendBodyAndHeader("direct:startNoTx", bodyContent, "word", bodyContent);
This sends a message `"foobar"` with header `"word=foobar"` to a Camel direct endpoint `startNoTx`, triggering the route to produce a Kafka message and insert into the database.
Mermaid Class Diagram
classDiagram
class KafkaWithDBTransactionIT {
-TOPIC_TX_1: String
-TOPIC_TX_2: String
-TOPIC_TX_3: String
-TOPIC_TX_4: String
-TOPIC_TX_5: String
-INSERT_SQL_1: String
-INSERT_SQL_2: String
-INSERT_SQL_3: String
-INSERT_SQL_4: String
-INSERT_SQL_5: String
-stringsConsumerConn: KafkaConsumer<String,String>
-db: EmbeddedDatabase
-jdbc: JdbcTemplate
-context: CamelContext
+before(): void
+configureContext(CamelContext): void
+after(): void
+noTransactionProducerWithDBLast(): void
+noTransactionProducerDuplicatedWithDBLast(): void
+transactionProducerWithDBLast(String): void
+transactionMultipleProducersWithDBLast(): void
+transactionProducerWithDBFirst(String): void
-getMessagesFromTopic(KafkaConsumer<String,String>, String): ConsumerRecords<String,String>
-createStringKafkaConsumer(String): KafkaConsumer<String,String>
+createRouteBuilder(): RouteBuilder
}
Summary
`KafkaWithDBTransactionIT.java` is a comprehensive integration test designed to validate the transactional integrity and behavior of Kafka producers combined with database operations in Apache Camel. It demonstrates critical scenarios impacting message delivery and data consistency, such as transactional rollbacks and error handling, ensuring robust integration between messaging and persistence layers.