KafkaPausableConsumerIT.java
Overview
`KafkaPausableConsumerIT` is an integration test class for validating the behavior of a pausable Kafka consumer within the Apache Camel Kafka component. Its main purpose is to verify that a Kafka consumer, which can be paused and resumed programmatically based on a custom listener's logic, correctly consumes messages from a Kafka topic and handles retry logic with error handling.
This test class extends [BaseKafkaTestSupport](/projects/289/68683) to leverage Kafka test utilities and infrastructure. It sets up a Kafka route that consumes from a topic, applies a pausable consumer listener, processes messages with induced errors to simulate retries, and verifies that the expected messages reach the final mock endpoint once processing stops failing.
Key features tested include:
Pausing and resuming Kafka consumer based on retry counts.
Propagation and filtering of Kafka message headers.
Integration of a custom consumer listener to track lifecycle events.
Automatic committing of offsets with retries and error handling.
Use of Camel's testing framework and Awaitility to wait for asynchronous conditions.
Class: KafkaPausableConsumerIT
Extends
[BaseKafkaTestSupport](/projects/289/68683)
Constants
| Name | Type | Description |
|---|---|---|
| `SOURCE_TOPIC` | `String` | Kafka topic name used for producing and consuming test messages (`"pause-source"`). |
| `LOG` | `Logger` | Logger instance for logging test activity. |
| `RETRY_COUNT` | `int` | Maximum number of retries used to control pausing logic (set to 10). |
| `count` | `LongAdder` | Thread-safe counter tracking the number of processed attempts. |
| `testConsumerListener` | `TestListener` | Custom listener instance to track consumer lifecycle callbacks. |
Fields
| Name | Type | Description |
|---|---|---|
| `producer` | `KafkaProducer<String,String>` | Kafka producer instance used to send test messages. |
| `executorService` | `ScheduledExecutorService` | Scheduler thread pool to increment the retry count periodically. |
Methods
static boolean canContinue()
Checks whether the consumer should continue consuming or pause based on the current retry count.
Returns:
`true` for the first message or once the retry count has been reached; otherwise `false`.
Usage:
Used as a predicate in the `.pausable()` method to control consumer pause/resume.
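The predicate can be sketched in plain Java without Camel or Kafka on the classpath. The thresholds below (`count <= 1` and `count >= RETRY_COUNT`) are an assumption inferred from this page's statement that the consumer pauses while the count is between 2 and 9; they are not quoted from the test source, and `PauseGate` is a hypothetical class name.

```java
import java.util.concurrent.atomic.LongAdder;

/** Stand-alone sketch of the pause predicate; field names mirror the test class. */
final class PauseGate {
    static final int RETRY_COUNT = 10;
    static final LongAdder count = new LongAdder();

    /**
     * Assumed rule: continue for the first delivery (count <= 1) and again
     * once the count has reached RETRY_COUNT; pause for everything in between.
     */
    static boolean canContinue() {
        int current = count.intValue();
        return current <= 1 || current >= RETRY_COUNT;
    }

    public static void main(String[] args) {
        // Walk the counter from 0 to RETRY_COUNT and show the decision at each step.
        for (int tick = 0; tick <= RETRY_COUNT; tick++) {
            System.out.println("count=" + count.intValue() + " canContinue=" + canContinue());
            count.increment();
        }
    }
}
```

Under these assumptions the gate is open at counts 0 and 1, closed from 2 through 9, and open again from 10 onward.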
static int getCount()
Returns the current retry count as an integer.
Returns:
Current value of `count`.
@BeforeEach void before()
JUnit lifecycle method executed before each test.
Initializes the Kafka producer with default properties.
Clears any captured records in the MockConsumerInterceptor.
Starts a scheduled task that increments the retry count every second after an initial 5-second delay.
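The scheduled increment described above can be approximated with the JDK's `ScheduledExecutorService` alone. This is a minimal sketch: `RetryTicker` and `awaitCountAtLeast` are hypothetical names, the polling helper is a plain-JDK stand-in for the Awaitility wait used by the test, and the timings in `main` are shortened from the 5-second delay and 1-second period described here so the example finishes quickly.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical helper: a counter that a scheduler bumps at a fixed rate. */
final class RetryTicker {
    private final LongAdder count = new LongAdder();
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

    /** Starts incrementing the counter after the initial delay, then once per period. */
    void start(long initialDelayMs, long periodMs) {
        executor.scheduleAtFixedRate(count::increment, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
    }

    int getCount() {
        return count.intValue();
    }

    /** Polls until the counter reaches the target or the timeout elapses. */
    boolean awaitCountAtLeast(int target, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (getCount() < target) {
            if (System.nanoTime() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    /** Stops the scheduler so the JVM can exit. */
    void stop() {
        executor.shutdownNow();
    }

    public static void main(String[] args) {
        RetryTicker ticker = new RetryTicker();
        ticker.start(50, 10); // shortened; the test is described as using 5 s and 1 s
        boolean reached = ticker.awaitCountAtLeast(11, 5_000);
        ticker.stop();
        System.out.println("count exceeded 10: " + reached);
    }
}
```

Because the counter advances on wall-clock time rather than on delivery attempts, the length of the paused window is set by the scheduler period, not by how fast records are redelivered.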
@AfterEach void after()
JUnit lifecycle method executed after each test.
Closes the Kafka producer.
Deletes the test topic (`SOURCE_TOPIC`) from Kafka to clean up.
Shuts down the scheduled executor service.
void increment()
Increments the atomic retry count by one.
Invoked periodically by the scheduled executor to simulate retry attempts.
RouteBuilder createRouteBuilder()
Overrides the method from [BaseKafkaTestSupport](/projects/289/68683) to define the test Camel routes.
Defines two routes:
Kafka Consumer Route:
Consumes messages from `SOURCE_TOPIC` with consumer group `"KafkaPausableConsumerIT"`.
Uses String deserializers for key and value.
Enables auto commit of offsets every 1000 ms.
Attaches `MockConsumerInterceptor` for testing.
Applies `.pausable(testConsumerListener, o -> canContinue())` to pause/resume consumption based on retry count.
Logs received messages.
Forwards messages to the `"direct:intermediate"` endpoint.
Intermediate Processing Route:
Receives messages from `"direct:intermediate"`.
Logs message content.
Throws a `RuntimeCamelException` if the retry count is less than or equal to `RETRY_COUNT` to simulate processing failure.
Sends processed messages to a mock result endpoint (`KafkaTestUtil.MOCK_RESULT`).
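The interaction between the pause predicate and the failing processor can be traced with a small plain-Java model. It is a deliberately simplified sketch: it ignores Kafka offsets and Camel's real error handling, uses `IllegalStateException` as a stand-in for `RuntimeCamelException`, and assumes the predicate thresholds implied elsewhere on this page (`count <= 1` or `count >= RETRY_COUNT`). `RetryWindow` and its methods are hypothetical names.

```java
/** Simplified model of "paused or not" combined with "processor fails or not". */
final class RetryWindow {
    static final int RETRY_COUNT = 10;

    /** Assumed pause rule: open for count <= 1 and for count >= RETRY_COUNT. */
    static boolean canContinue(int count) {
        return count <= 1 || count >= RETRY_COUNT;
    }

    /** Mirrors the intermediate route: fail while count <= RETRY_COUNT. */
    static void process(String body, int count) {
        if (count <= RETRY_COUNT) {
            throw new IllegalStateException("Simulated failure for " + body);
        }
    }

    /** Returns the first count at which the consumer is not paused and processing succeeds, or -1. */
    static int firstSuccessfulCount(int maxCount) {
        for (int count = 0; count <= maxCount; count++) {
            if (!canContinue(count)) {
                continue; // consumer is paused at this count
            }
            try {
                process("message", count);
                return count;
            } catch (IllegalStateException expected) {
                // processing failed; try again at a later count
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println("first successful count: " + firstSuccessfulCount(20));
    }
}
```

In this model the consumer resumes at a count of 10 but the processor still throws at exactly 10, so the first successful delivery happens at 11. That is consistent with the test waiting for the retry count to exceed 10 rather than merely reach it.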
@Test @Timeout kafkaMessageIsConsumedByCamel()
Main integration test validating message consumption and pausing behavior.
Sends 5 distinct messages to the Kafka topic, each with custom headers:
`"PropagatedCustomHeader"`, which should be propagated.
`"CamelSkippedHeader"`, which should be skipped by the consumer.
Sets expectations on the mock endpoint:
Exactly 5 messages should be received.
Message bodies should match the sent messages.
The `LAST_RECORD_BEFORE_COMMIT` header should not be present (due to auto commit).
The propagated header should be present; the skipped header should not.
Waits (with timeout) for the retry count to exceed 10, ensuring retries occur.
Ensures that the custom `TestListener` methods `afterConsume()` and `afterProcess()` are called.
Verifies the mock endpoint's assertions and header filtering.
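The header expectation can be illustrated with a plain-Java filter. This page does not say which rule decides that `CamelSkippedHeader` is dropped, so the name-prefix check below is an assumption made for illustration only, and `HeaderFilterSketch`, `propagate`, and the header values are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of selective header propagation. */
final class HeaderFilterSketch {

    /** Assumed rule: headers whose name starts with "Camel" are not copied. */
    static Map<String, String> propagate(Map<String, String> kafkaHeaders) {
        Map<String, String> propagated = new LinkedHashMap<>();
        for (Map.Entry<String, String> header : kafkaHeaders.entrySet()) {
            if (!header.getKey().startsWith("Camel")) {
                propagated.put(header.getKey(), header.getValue());
            }
        }
        return propagated;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("PropagatedCustomHeader", "propagated value"); // illustrative value
        headers.put("CamelSkippedHeader", "skipped value");        // illustrative value
        System.out.println(propagate(headers).keySet());
    }
}
```

The test's two header assertions correspond to checking that the first key survives this kind of filter and the second does not.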
Inner Class: TestListener
Extends [KafkaConsumerListener](/projects/289/68604) to track lifecycle callback invocations.
Fields:
`volatile boolean afterConsumeCalled` - Flag set when `afterConsume` is called.
`volatile boolean afterProcessCalled` - Flag set when `afterProcess` is called.
Overrides:
`afterConsume(Object ignored)`: Sets `afterConsumeCalled = true` and calls the superclass method.
`afterProcess(ProcessingResult result)`: Sets `afterProcessCalled = true` and calls the superclass method.
Used to confirm that lifecycle hooks are triggered during message consumption and processing.
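The flag-tracking pattern can be shown without the Camel listener API. In the sketch below, `BaseListener` is a hypothetical stand-in for the real listener base class (its method signatures are simplified to `Object` parameters), and `awaitBothCalled` is a plain-JDK substitute for the Awaitility wait. The point it illustrates is why the flags are `volatile`: they are written on one thread and polled from another.

```java
/** Hypothetical sketch of a listener that records which hooks were invoked. */
final class ListenerFlagsSketch {

    /** Stand-in for the listener base class; not the Camel API. */
    static class BaseListener {
        boolean afterConsume(Object records) {
            return true;
        }

        boolean afterProcess(Object result) {
            return true;
        }
    }

    /** Records each callback in a volatile flag, then defers to the base behaviour. */
    static final class FlagListener extends BaseListener {
        volatile boolean afterConsumeCalled;
        volatile boolean afterProcessCalled;

        @Override
        boolean afterConsume(Object records) {
            afterConsumeCalled = true;
            return super.afterConsume(records);
        }

        @Override
        boolean afterProcess(Object result) {
            afterProcessCalled = true;
            return super.afterProcess(result);
        }
    }

    /** Polls both flags from the calling thread until they are set or the timeout elapses. */
    static boolean awaitBothCalled(FlagListener listener, long timeoutMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!(listener.afterConsumeCalled && listener.afterProcessCalled)) {
            if (System.nanoTime() >= deadline) {
                return false;
            }
            Thread.yield();
        }
        return true;
    }

    public static void main(String[] args) {
        FlagListener listener = new FlagListener();
        // A worker thread plays the role of the consumer thread invoking the hooks.
        Thread worker = new Thread(() -> {
            listener.afterConsume(null);
            listener.afterProcess(null);
        });
        worker.start();
        System.out.println("both hooks observed: " + awaitBothCalled(listener, 5_000));
    }
}
```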
Important Implementation Details
Pausable Consumer:
The `.pausable()` DSL method is used to control consumption flow. It uses the `TestListener` and a lambda predicate `o -> canContinue()` to determine when to pause or resume the Kafka consumer. The consumer pauses when the retry count is between 2 and 9, simulating backpressure/error retry scenarios.
Retry Simulation:
The intermediate route throws an exception on processing while the retry count is below or equal to the limit, causing the consumer to retry consuming the same messages multiple times.
Header Propagation:
Headers are selectively propagated to the Camel exchange. The test verifies that only certain headers (like `PropagatedCustomHeader`) pass through, while others (`CamelSkippedHeader`) are skipped.
Scheduled Increment:
The retry count is incremented every second by a scheduled task, controlling the pausing logic dynamically during the test.
Awaitility Usage:
The test uses Awaitility to wait for asynchronous conditions (retry count, listener callbacks) with timeouts, ensuring test reliability.
Interaction with Other Components
Kafka Cluster:
Interacts with a Kafka broker (likely a test container or embedded Kafka in `BaseKafkaTestSupport`).
Apache Camel:
Uses the Camel Kafka component to define Kafka consumer routes.
MockConsumerInterceptor:
Attached to the Kafka consumer for intercepting and testing record consumption.
KafkaTestUtil:
Provides utilities such as Kafka client properties, an admin client for topic management, and constants like mock endpoint URIs.
JUnit 5 & Awaitility:
Used for test lifecycle management and waiting on asynchronous events.
SLF4J Logger:
Logs key events during test execution.
Usage Example
This class itself is a test and not intended for direct reuse. However, the pattern demonstrated can be adapted to build pausable Kafka consumers with retries and error handling in Camel routes.
Example snippet from `createRouteBuilder()` for a pausable consumer:
```java
from("kafka:" + SOURCE_TOPIC + "?groupId=someGroup&autoOffsetReset=earliest&...")
    .pausable(testConsumerListener, o -> canContinue())
    .process(exchange -> {
        // Your processing logic here
    })
    .to("someEndpoint");
```
Mermaid Class Diagram
```mermaid
classDiagram
    class KafkaPausableConsumerIT {
        +static final String SOURCE_TOPIC
        -static final Logger LOG
        -static final int RETRY_COUNT
        -static final LongAdder count
        -static final TestListener testConsumerListener
        -KafkaProducer<String,String> producer
        -ScheduledExecutorService executorService
        +static boolean canContinue()
        +static int getCount()
        +void before()
        +void after()
        +void increment()
        +RouteBuilder createRouteBuilder()
        +void kafkaMessageIsConsumedByCamel()
    }
    class TestListener {
        -volatile boolean afterConsumeCalled
        -volatile boolean afterProcessCalled
        +boolean afterConsume(Object ignored)
        +boolean afterProcess(ProcessingResult result)
    }
    KafkaPausableConsumerIT --> TestListener : uses
```
Summary
`KafkaPausableConsumerIT` is an integration test class that validates a pausable Kafka consumer within Apache Camel. It simulates retry and pause/resume scenarios, verifies header propagation, and checks that consumer lifecycle callbacks execute. The same pattern can serve as a starting point for Kafka consumer error handling and flow control in Camel applications.