KafkaWithDBTransactionIT.java

Overview

`KafkaWithDBTransactionIT.java` is an integration test class that validates how Apache Kafka message production and database operations behave together in Apache Camel routes. Each scenario combines a Kafka producer that sends to a Kafka topic with an SQL insert into an embedded H2 database, with and without transactional guarantees.

The tests cover several transactional configurations and check, for each, whether the Kafka send and the database insert take effect together or whether a failure leaves one without the other. The main focus is rollback behavior when a step fails, since that determines whether data stays consistent when a route writes to both Kafka and a relational database.

Key features:


Class: KafkaWithDBTransactionIT

Extends

Purpose

To run integration tests that verify transactional message production to Kafka topics along with database inserts, ensuring consistency and rollback semantics in Apache Camel routes.

Properties

| Property | Type | Description |
| --- | --- | --- |
| `TOPIC_TX_1` to `TOPIC_TX_5` | `String` | Kafka topic names used in different test routes. |
| `INSERT_SQL_1` to `INSERT_SQL_5` | `String` | Parameterized SQL insert statements targeting different tables. |
| `stringsConsumerConn` | `KafkaConsumer` | Kafka consumer used to read messages from test topics. |
| `db` | `EmbeddedDatabase` | Embedded H2 database instance used for test data persistence. |
| `jdbc` | `JdbcTemplate` | Spring's JdbcTemplate for executing SQL queries. |
| `context` | `CamelContext` (volatile) | The Camel context used to deploy routes and execute tests. |


Lifecycle Methods

@BeforeAll static void before()

@ContextFixture void configureContext(CamelContext context)

@AfterAll static void after()


Test Methods

1. void noTransactionProducerWithDBLast()

**Scenario:** Sends one message to Kafka and performs one database insert without any transactional guarantees.

**Route:** `direct:startNoTx` → Kafka topic `transaction-1` → SQL insert into `foo1`.

**Behavior:**

**Example Usage:** Send the body `"foobar"` with the header `word` set to `"foobar"` to `direct:startNoTx`.


2. void noTransactionProducerDuplicatedWithDBLast()

**Scenario:** Sends the same message twice without a transaction; the second insert fails on a unique constraint.

**Route:** `direct:startNoTx2` → Kafka topic `transaction-2` → SQL insert into `foo2`.

**Behavior:**
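
The outcome of this scenario can be modelled in plain Java. The sketch below is not Camel or Kafka code: `NoTxSketch` is a hypothetical class in which a list stands in for the topic and a set stands in for a table with a unique constraint. It assumes only what the route order implies: the message is produced before the insert runs, and nothing undoes the send when the insert fails.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical plain-Java model; no Camel, Kafka, or H2 classes are involved.
class NoTxSketch {
    final List<String> topic = new ArrayList<>(); // stands in for the Kafka topic
    final Set<String> table = new HashSet<>();    // stands in for a table with a unique constraint

    // Same step order as the route: produce first, insert second, no transaction.
    void route(String word) {
        topic.add(word);                          // the message is visible immediately
        if (!table.add(word)) {                   // a duplicate violates the unique constraint
            throw new IllegalStateException("unique constraint violated: " + word);
        }
    }

    public static void main(String[] args) {
        NoTxSketch s = new NoTxSketch();
        s.route("foobar");
        try {
            s.route("foobar");                    // same word again
        } catch (IllegalStateException e) {
            System.out.println("second insert failed: " + e.getMessage());
        }
        System.out.println("messages=" + s.topic.size() + " rows=" + s.table.size());
    }
}
```

Under those assumptions the topic ends up with two messages and the table with one row; the assertions in the real test may differ.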


3. void transactionProducerWithDBLast(String txParam)

**Parameterized Test:** Tests three Kafka transactional configurations:

**Scenario:** Sends duplicate messages with transaction enabled on the Kafka producer, with the SQL insert last in the route.

**Route:** `direct:startTxDBLast` → Kafka topic `transaction-3` (transactional) → SQL insert into `foo3`.

**Behavior:**
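
A rough plain-Java model of how a transactional producer could change the result. `TxDbLastSketch` is a hypothetical class, and two assumptions are built in: a failed insert causes the pending Kafka transaction to be aborted, and the consumer reads only committed messages (the `read_committed` isolation level). Neither is confirmed here as the behavior of every `txParam` configuration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of a transactional producer; no Kafka or Camel classes are used.
class TxDbLastSketch {
    final List<String> committed = new ArrayList<>(); // what a read_committed consumer would see
    final Set<String> table = new HashSet<>();        // table with a unique constraint

    // Produce inside a transaction, insert last; returns true if the transaction committed.
    boolean route(String word) {
        List<String> pending = new ArrayList<>();
        pending.add(word);                 // sent, but not yet committed
        if (!table.add(word)) {            // duplicate: the insert fails
            pending.clear();               // assumed abort: the pending message is discarded
            return false;
        }
        committed.addAll(pending);         // commit: the message becomes visible
        return true;
    }

    public static void main(String[] args) {
        TxDbLastSketch s = new TxDbLastSketch();
        boolean first = s.route("foobar");
        boolean second = s.route("foobar");
        System.out.println("first=" + first + " second=" + second);
        System.out.println("messages=" + s.committed.size() + " rows=" + s.table.size());
    }
}
```

In this model the duplicate leaves one committed message and one row, in contrast to a non-transactional send, which cannot be undone once it has been produced.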


4. void transactionMultipleProducersWithDBLast()

**Scenario:** Tests multiple routes sending to the same Kafka topic with different transactional configurations.

**Routes:**

**Behavior:**


5. void transactionProducerWithDBFirst(String txParam)

**Parameterized Test:** Same parameters as test #3.

**Scenario:** Uses a transactional Kafka producer, with the SQL insert as the first step in the route.

**Route:** `direct:startTxDBFirst` → SQL insert into `foo4` → Kafka topic `transaction-4` (transactional).

**Behavior:**
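
With the insert first, a failing insert stops processing before the producer step is reached. A minimal plain-Java sketch of that ordering follows; `DbFirstSketch` is a hypothetical class, with a set standing in for the table and a list for the topic, and it assumes a failed step ends the exchange.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the DB-first ordering; no Camel, Kafka, or H2 classes are used.
class DbFirstSketch {
    final Set<String> table = new HashSet<>();    // table with a unique constraint
    final List<String> topic = new ArrayList<>(); // stands in for the Kafka topic

    // Insert first, produce second: a failed insert means the send never happens.
    void route(String word) {
        if (!table.add(word)) {
            throw new IllegalStateException("unique constraint violated: " + word);
        }
        topic.add(word);                          // reached only after a successful insert
    }

    public static void main(String[] args) {
        DbFirstSketch s = new DbFirstSketch();
        s.route("foobar");
        try {
            s.route("foobar");
        } catch (IllegalStateException e) {
            System.out.println("insert failed before send: " + e.getMessage());
        }
        System.out.println("rows=" + s.table.size() + " messages=" + s.topic.size());
    }
}
```

Because nothing is sent after a failed insert, this model gives one row and one message regardless of the transaction setting; it does not show what the real test asserts for each `txParam`.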


Helper Methods

private ConsumerRecords<String,String> getMessagesFromTopic(KafkaConsumer<String,String> consumerConn, String topic)

private static KafkaConsumer<String,String> createStringKafkaConsumer(final String groupId)
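
A consumer for `String` keys and values is typically built from a `Properties` object. The sketch below assembles such a configuration using only `java.util.Properties`, so it runs without the Kafka client on the classpath. The property names are standard Kafka consumer settings; the class name `ConsumerPropsSketch`, the bootstrap address, and the choice of `earliest` and `read_committed` are assumptions, not values taken from the test.

```java
import java.util.Properties;

// Hypothetical helper; it only builds configuration and does not open a connection.
class ConsumerPropsSketch {
    static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    // Builds consumer settings for String keys and String values.
    static Properties stringConsumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", STRING_DESERIALIZER);
        props.put("value.deserializer", STRING_DESERIALIZER);
        props.put("auto.offset.reset", "earliest");      // assumption: read the topic from the start
        props.put("isolation.level", "read_committed");  // assumption: skip aborted transactions
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" and "example-group" are placeholder values.
        Properties props = stringConsumerProps("localhost:9092", "example-group");
        System.out.println(props.getProperty("group.id"));
    }
}
```

When `kafka-clients` is available, the resulting object could be passed to `new KafkaConsumer<>(props)`.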


Overridden Method

protected RouteBuilder createRouteBuilder()


Important Implementation Details


Interaction with Other System Components


Example Usage in Tests

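// Obtain a producer template from the test's Camel context extension.
ProducerTemplate producer = contextExtension.getProducerTemplate();
String bodyContent = "foobar";
// The same value is sent as the message body and as the "word" header.
producer.sendBodyAndHeader("direct:startNoTx", bodyContent, "word", bodyContent);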

This sends the body `"foobar"` with the header `word` set to `"foobar"` to the Camel endpoint `direct:startNoTx`, which triggers the route that produces a Kafka message and then inserts into the database.


Mermaid Class Diagram

classDiagram
    class KafkaWithDBTransactionIT {
        -TOPIC_TX_1: String
        -TOPIC_TX_2: String
        -TOPIC_TX_3: String
        -TOPIC_TX_4: String
        -TOPIC_TX_5: String
        -INSERT_SQL_1: String
        -INSERT_SQL_2: String
        -INSERT_SQL_3: String
        -INSERT_SQL_4: String
        -INSERT_SQL_5: String
        -stringsConsumerConn: KafkaConsumer<String,String>
        -db: EmbeddedDatabase
        -jdbc: JdbcTemplate
        -context: CamelContext
        +before(): void
        +configureContext(CamelContext): void
        +after(): void
        +noTransactionProducerWithDBLast(): void
        +noTransactionProducerDuplicatedWithDBLast(): void
        +transactionProducerWithDBLast(String): void
        +transactionMultipleProducersWithDBLast(): void
        +transactionProducerWithDBFirst(String): void
        -getMessagesFromTopic(KafkaConsumer<String,String>, String): ConsumerRecords<String,String>
        -createStringKafkaConsumer(String): KafkaConsumer<String,String>
        #createRouteBuilder(): RouteBuilder
    }

Summary

`KafkaWithDBTransactionIT.java` is an integration test that checks how Kafka producers and database operations behave together in Apache Camel routes. Its scenarios cover non-transactional and transactional producers, duplicate messages that violate a unique constraint, and the rollback behavior that follows a failure.