KafkaPausableConsumerIT.java

Overview

`KafkaPausableConsumerIT` is an integration test class that validates the behavior of a pausable Kafka consumer in the Apache Camel Kafka component. It verifies that a consumer which is paused and resumed programmatically, driven by a custom listener and a continuation check, still consumes messages from a Kafka topic correctly, and that retries triggered by processing errors are handled.

This test class extends [BaseKafkaTestSupport](/projects/289/68683) to leverage Kafka test utilities and infrastructure. It sets up a Kafka route that consumes from a topic, applies a pausable consumer listener, processes messages with induced errors to simulate retries, and verifies that only a limited number of messages successfully reach the final endpoint.

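Key features tested include:

- Pausing and resuming the consumer through a custom listener and the `canContinue()` check.
- Retry handling for messages whose processing fails with induced errors.
- Header propagation.
- Invocation of the consumer lifecycle callbacks (`afterConsume`, `afterProcess`).
- Delivery of only a limited number of messages to the final endpoint.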

Class: KafkaPausableConsumerIT

Extends [BaseKafkaTestSupport](/projects/289/68683)

Constants

| Name | Type | Description |
| --- | --- | --- |
| `SOURCE_TOPIC` | `String` | Kafka topic name used for producing and consuming test messages (`"pause-source"`). |
| `LOG` | `Logger` | Logger instance for logging test activity. |
| `RETRY_COUNT` | `int` | Maximum number of retries used to control pausing logic (set to 10). |
| `count` | `LongAdder` | Atomic counter tracking the number of processed attempts. |
| `testConsumerListener` | `TestListener` | Custom listener instance to track consumer lifecycle callbacks. |

Fields

| Name | Type | Description |
| --- | --- | --- |
| `producer` | `KafkaProducer` | Kafka producer instance used to send test messages. |
| `executorService` | `ScheduledExecutorService` | Scheduler thread pool to increment the retry count periodically. |


Methods

static boolean canContinue()

Checks whether the consumer should continue consuming or pause based on the current retry count.


static int getCount()

Returns the current retry count as an integer.
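
The interplay between `count`, `getCount()`, and `canContinue()` can be illustrated with a minimal, JDK-only sketch. The class name `RetryGate`, the `reset()` helper, and the exact rule used in `canContinue()` (resume once the counter reaches `RETRY_COUNT`) are assumptions made for illustration; the actual test may apply a different condition.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for the test's static counter and pause check.
final class RetryGate {
    // The document states that RETRY_COUNT is set to 10.
    static final int RETRY_COUNT = 10;

    // LongAdder is safe to increment from several threads at once.
    private static final LongAdder count = new LongAdder();

    static void increment() {
        count.increment();
    }

    static int getCount() {
        return count.intValue();
    }

    // Assumed rule: stay paused until the counter reaches RETRY_COUNT.
    static boolean canContinue() {
        return getCount() >= RETRY_COUNT;
    }

    // Illustration-only helper so the sketch can be exercised repeatedly.
    static void reset() {
        count.reset();
    }
}
```

With this rule, the check reports `false` until `increment()` has been called `RETRY_COUNT` times, after which consumption would be allowed to resume.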


@BeforeEach void before()

JUnit lifecycle method executed before each test.


@AfterEach void after()

JUnit lifecycle method executed after each test.


void increment()

Increments the atomic retry count by one.
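
As a rough sketch of how a `ScheduledExecutorService` could drive the periodic increment described for `executorService`, the JDK-only example below schedules `LongAdder.increment()` at a fixed rate. The class name `PeriodicCounter`, the single-threaded scheduler, and the caller-supplied period are assumptions; the document does not state how the test configures its scheduler.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical helper showing a periodically incremented counter.
final class PeriodicCounter {
    private final LongAdder count = new LongAdder();
    private ScheduledExecutorService executorService;

    // Start a single-threaded scheduler that bumps the counter at a fixed rate.
    void start(long periodMillis) {
        executorService = Executors.newSingleThreadScheduledExecutor();
        executorService.scheduleAtFixedRate(count::increment, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    // Stop the scheduler and wait briefly for a running tick to finish.
    void stop() {
        executorService.shutdownNow();
        try {
            executorService.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    int getCount() {
        return count.intValue();
    }
}
```

In a JUnit test, `start` and `stop` would typically be called from the `@BeforeEach` and `@AfterEach` methods so that no scheduler thread outlives a test.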


RouteBuilder createRouteBuilder()

Overrides the method from [BaseKafkaTestSupport](/projects/289/68683) to define the test Camel routes.


@Test @Timeout void kafkaMessageIsConsumedByCamel()

Main integration test validating message consumption and pausing behavior.


Inner Class: TestListener

Extends [KafkaConsumerListener](/projects/289/68604) to track lifecycle callback invocations.

Used to confirm that lifecycle hooks are triggered during message consumption and processing.
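
The flag-tracking idea behind `TestListener` can be sketched without any Camel dependency. The interface `ConsumerCallbacks` below is a hypothetical stand-in that only mirrors the two callback names from the class diagram; it is not Camel's `KafkaConsumerListener`. The getter names and the `true` return values are likewise assumptions.

```java
// Hypothetical stand-in for the listener contract; not Camel's actual API.
interface ConsumerCallbacks<R> {
    boolean afterConsume(Object ignored);

    boolean afterProcess(R result);
}

// Records which callbacks have fired so a test can assert on them later.
final class FlagListener implements ConsumerCallbacks<Object> {
    // volatile: written by the consumer thread, read by the test thread.
    private volatile boolean afterConsumeCalled;
    private volatile boolean afterProcessCalled;

    @Override
    public boolean afterConsume(Object ignored) {
        afterConsumeCalled = true;
        return true; // assumed return value
    }

    @Override
    public boolean afterProcess(Object result) {
        afterProcessCalled = true;
        return true; // assumed return value
    }

    boolean isAfterConsumeCalled() {
        return afterConsumeCalled;
    }

    boolean isAfterProcessCalled() {
        return afterProcessCalled;
    }
}
```

A test would read the two flags after the route has processed messages and assert that both are `true`.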


Important Implementation Details


Interaction with Other Components


Usage Example

This class itself is a test and not intended for direct reuse. However, the pattern demonstrated can be adapted to build pausable Kafka consumers with retries and error handling in Camel routes.

Example snippet from `createRouteBuilder()` for a pausable consumer:

```java
from("kafka:" + SOURCE_TOPIC + "?groupId=someGroup&autoOffsetReset=earliest&...")
    .pausable(testConsumerListener, o -> canContinue())
    .process(exchange -> {
        // Your processing logic here
    })
    .to("someEndpoint");
```

Mermaid Class Diagram

```mermaid
classDiagram
    class KafkaPausableConsumerIT {
        +static final String SOURCE_TOPIC
        -static final Logger LOG
        -static final int RETRY_COUNT
        -static final LongAdder count
        -static final TestListener testConsumerListener
        -KafkaProducer<String,String> producer
        -ScheduledExecutorService executorService
        +static boolean canContinue()
        +static int getCount()
        +void before()
        +void after()
        +void increment()
        +RouteBuilder createRouteBuilder()
        +void kafkaMessageIsConsumedByCamel()
    }

    class TestListener {
        -volatile boolean afterConsumeCalled
        -volatile boolean afterProcessCalled
        +boolean afterConsume(Object ignored)
        +boolean afterProcess(ProcessingResult result)
    }

    KafkaPausableConsumerIT --> TestListener : uses
```

Summary

`KafkaPausableConsumerIT` is an integration test class that validates a pausable Kafka consumer in Apache Camel. It simulates retry and pause/resume scenarios, verifies header propagation, and checks that the consumer lifecycle callbacks are invoked. The route pattern it uses can serve as a starting point for Kafka consumer error handling and flow control in Camel applications.