KafkaConsumerSyncCommitIT.java


Overview

`KafkaConsumerSyncCommitIT.java` is an integration test class that validates synchronous manual commits in the Apache Camel Kafka component consumer. It verifies that consumer offsets can be committed explicitly and synchronously instead of relying on automatic offset commits. This matters for applications that need precise control over when an offset is recorded, so that messages are neither lost nor reprocessed unnecessarily.
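To make the offset-control argument concrete, here is a minimal, self-contained sketch that models a single partition as an in-memory list and a consumer group's committed offset as an integer. It uses neither Kafka nor Camel: `OffsetCommitSketch`, `send`, `consume`, and `commitSync` are hypothetical names chosen for illustration only. Assuming the commit happens only after a record has been processed, a consumer that stops partway through resumes from the last committed offset rather than skipping records.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of one partition and one consumer group.
// Nothing here is Kafka or Camel API; it only illustrates offset semantics.
class OffsetCommitSketch {
    private final List<String> log = new ArrayList<>(); // the partition's records
    private int committedOffset = 0;                     // next offset the group resumes from

    void send(String record) {
        log.add(record);
    }

    // Blocking commit: the new position is recorded before this method returns.
    void commitSync(int nextOffset) {
        committedOffset = nextOffset;
    }

    int committedOffset() {
        return committedOffset;
    }

    // Reads from the committed offset onward, committing after each processed record.
    // If failAt >= 0, the consumer stops before handling that offset (a simulated crash).
    List<String> consume(int failAt) {
        List<String> processed = new ArrayList<>();
        int offset = committedOffset;
        List<String> pending = new ArrayList<>(log.subList(offset, log.size()));
        for (String record : pending) {
            if (offset == failAt) {
                break;                  // crash: record is neither processed nor committed
            }
            processed.add(record);      // "process" the record
            commitSync(offset + 1);     // commit only after processing succeeded
            offset++;
        }
        return processed;
    }

    public static void main(String[] args) {
        OffsetCommitSketch topic = new OffsetCommitSketch();
        for (int i = 0; i < 5; i++) {
            topic.send("message-" + i);
        }
        System.out.println(topic.consume(3));  // first run stops before offset 3
        System.out.println(topic.consume(-1)); // restart resumes at the committed offset
    }
}
```

In this model the first run handles offsets 0 to 2 and the restarted run picks up offsets 3 and 4, so nothing is skipped. If the simulated crash instead happened after processing but before `commitSync`, the restarted run would see that record again, which is the at-least-once behavior manual commits typically provide.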

The class extends `BaseManualCommitTestSupport`, which presumably provides the Kafka setup, teardown, and utility methods shared by the manual-commit tests.


Detailed Description

Package

package org.apache.camel.component.kafka.integration.commit;

This class resides in the `integration.commit` package under the Kafka component of Apache Camel, indicating it is part of integration tests focusing on commit behaviors.


Class: KafkaConsumerSyncCommitIT

public class KafkaConsumerSyncCommitIT extends BaseManualCommitTestSupport

Purpose

Constants

public static final String TOPIC = "testManualCommitSyncTest";

Lifecycle Methods

@AfterEach public void after()


Overridden Methods

protected RouteBuilder createRouteBuilder()

Returns a `RouteBuilder` that defines two Kafka consumer routes configured for manual synchronous commits.

**Route Configuration Details:**

**Routes:**

  1. Route "foo":

    • From the Kafka endpoint.

    • Routes messages to a mock endpoint (KafkaTestUtil.MOCK_RESULT) for assertions or verification.

    • Processes each exchange with a lambda processor that retrieves the KafkaManualCommit instance from the message header, asserts that it is present, and performs a synchronous commit by invoking manual.commit().

  2. Route "bar":

    • Same Kafka endpoint as "foo".

    • Routes messages to another mock endpoint (KafkaTestUtil.MOCK_RESULT_BAR).

    • The route is configured with .autoStartup(false), so it does not start automatically with the Camel context. Presumably this lets a test start the route manually or leave it inactive.
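The processor step in route "foo" (look up the commit handle in the headers, assert it is present, then commit) can be sketched without any Camel dependency. This is an illustrative model only: `ManualCommit` is a stand-in for Camel's `KafkaManualCommit`, `MANUAL_COMMIT_HEADER` is a hypothetical key standing in for `KafkaConstants.MANUAL_COMMIT`, and a plain `Map` replaces the Camel message headers.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

class ManualCommitProcessorSketch {
    // Stand-in for Camel's KafkaManualCommit; only commit() is modeled.
    interface ManualCommit {
        void commit();
    }

    // Hypothetical header key; the real route reads KafkaConstants.MANUAL_COMMIT.
    static final String MANUAL_COMMIT_HEADER = "manualCommit";

    // Mirrors the "foo" processor: fetch the handle, fail fast if absent, then commit.
    static void commitFromHeaders(Map<String, Object> headers) {
        ManualCommit manual = (ManualCommit) headers.get(MANUAL_COMMIT_HEADER);
        Objects.requireNonNull(manual, "manual commit header is missing");
        manual.commit();
    }

    public static void main(String[] args) {
        AtomicInteger commits = new AtomicInteger();
        Map<String, Object> headers = new HashMap<>();
        // The lambda plays the role of the commit handle the consumer would attach.
        headers.put(MANUAL_COMMIT_HEADER, (ManualCommit) commits::incrementAndGet);
        commitFromHeaders(headers);
        System.out.println("commits = " + commits.get());
    }
}
```

Failing fast when the header is missing suits a test, where an absent handle means the endpoint was not configured for manual commits. Production routes may prefer a null check.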


Test Method

@RepeatedTest(1) public void kafkaManualCommit() throws Exception


Important Implementation Details


Interaction with Other Components


Usage Example

This class is a test class and is not intended for direct production use. However, it demonstrates how to configure a Camel Kafka consumer route for manual synchronous commits:

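// Assumes this runs inside RouteBuilder.configure(), with KafkaManualCommit and KafkaConstants imported.
from("kafka:yourTopic?groupId=yourGroup&autoCommitEnable=false&allowManualCommit=true")
    .process(exchange -> {
        // With allowManualCommit=true, the consumer stores a KafkaManualCommit in the message header
        KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
        if (manual != null) {
            manual.commit(); // Synchronously commit this message's offset
        }
    })
    .to("yourOutputEndpoint");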

Mermaid Class Diagram

classDiagram
    class KafkaConsumerSyncCommitIT {
        +static final String TOPIC
        +void after()
        #RouteBuilder createRouteBuilder()
        +void kafkaManualCommit()
    }

    KafkaConsumerSyncCommitIT --|> BaseManualCommitTestSupport

Summary

`KafkaConsumerSyncCommitIT.java` is an integration test validating synchronous manual offset commits in Apache Camel Kafka consumers. It defines Kafka consumer routes with manual commit enabled, processes incoming messages, and explicitly commits offsets via the `KafkaManualCommit` interface. The class uses JUnit 5 and interacts with utility classes for Kafka topic management and mock endpoints. The test verifies that applications using Camel Kafka consumers can control when offsets are committed, which is the basis for at-least-once processing; exactly-once processing additionally requires idempotent or transactional handling downstream.