Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Write the original consumed record into a dead letter queue. The dead letter queue is a Kafka topic located
Expand Down Expand Up @@ -79,15 +81,22 @@ public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminPr
SinkConnectorConfig sinkConfig, Map<String, Object> producerProps,
ErrorHandlingMetrics errorHandlingMetrics) {
String topic = sinkConfig.dlqTopicName();
long timeoutMs = TimeUnit.SECONDS.toMillis(30);

try (Admin admin = Admin.create(adminProps)) {
if (!admin.listTopics().names().get().contains(topic)) {
log.error("Topic {} doesn't exist. Will attempt to create topic.", topic);
NewTopic schemaTopicRequest = new NewTopic(topic, DLQ_NUM_DESIRED_PARTITIONS, sinkConfig.dlqTopicReplicationFactor());
admin.createTopics(Set.of(schemaTopicRequest)).all().get();
if (!admin.listTopics().names().get(timeoutMs, TimeUnit.MILLISECONDS).contains(topic)) {
log.info("Topic {} does not exist; creating it with {} partitions and replication factor {}",
topic, DLQ_NUM_DESIRED_PARTITIONS, sinkConfig.dlqTopicReplicationFactor());
NewTopic schemaTopicRequest = new NewTopic(topic, DLQ_NUM_DESIRED_PARTITIONS,
sinkConfig.dlqTopicReplicationFactor());
admin.createTopics(Set.of(schemaTopicRequest)).all().get(timeoutMs, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ConnectException("Could not initialize dead letter queue with topic=" + topic, e);
} catch (TimeoutException e) {
throw new ConnectException(
"Timed out waiting for DLQ topic '" + topic + "' to be verified or created", e);
} catch (ExecutionException e) {
if (!(e.getCause() instanceof TopicExistsException)) {
throw new ConnectException("Could not initialize dead letter queue with topic=" + topic, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,50 @@
*/
package org.apache.kafka.connect.runtime.errors;

import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_CONNECTOR_NAME;
import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION;
import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION_MESSAGE;
import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION_STACK_TRACE;
import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXECUTING_CLASS;
import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_ORIG_OFFSET;
import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_ORIG_PARTITION;
import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_ORIG_TOPIC;
import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_STAGE;
import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_TASK_ID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.ConnectMetrics;
Expand All @@ -31,43 +70,17 @@
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.util.ConnectorTaskId;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockedConstruction;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_CONNECTOR_NAME;
import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION;
import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION_MESSAGE;
import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION_STACK_TRACE;
import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXECUTING_CLASS;
import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_ORIG_OFFSET;
import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_ORIG_PARTITION;
import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_ORIG_TOPIC;
import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_STAGE;
import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_TASK_ID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class ErrorReporterTest {
Expand Down Expand Up @@ -101,6 +114,107 @@ public void tearDown() {
}
}

@Test
@SuppressWarnings("unchecked")
void createAndSetupTimesOutWhenBrokerUnreachable() throws Exception {
Admin admin = mock(Admin.class);
ListTopicsResult listTopicsResult = mock(ListTopicsResult.class);
KafkaFuture<Set<String>> namesFuture = mock(KafkaFuture.class);

when(admin.listTopics()).thenReturn(listTopicsResult);
when(listTopicsResult.names()).thenReturn(namesFuture);
when(namesFuture.get(anyLong(), any())).thenThrow(new TimeoutException("simulated unreachable broker"));

SinkConnectorConfig sinkConfig = config(Map.of(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC));
Map<String, Object> adminProps = Map.of();
Map<String, Object> producerProps = Map.of();

try (MockedStatic<Admin> adminStatic = mockStatic(Admin.class)) {
adminStatic.when(() -> Admin.create(anyMap())).thenReturn(admin);

ConnectException e = assertThrows(ConnectException.class, () ->
DeadLetterQueueReporter.createAndSetup(adminProps, TASK_ID, sinkConfig, producerProps, errorHandlingMetrics));
assertTrue(e.getMessage().contains("Timed out waiting for DLQ topic"));
}

verify(admin).close();
}

@Test
@SuppressWarnings("unchecked")
void createAndSetupRestoresInterruptOnInterruptedException() throws Exception {
Admin admin = mock(Admin.class);
ListTopicsResult listTopicsResult = mock(ListTopicsResult.class);
KafkaFuture<Set<String>> namesFuture = mock(KafkaFuture.class);

when(admin.listTopics()).thenReturn(listTopicsResult);
when(listTopicsResult.names()).thenReturn(namesFuture);
when(namesFuture.get(anyLong(), any())).thenThrow(new InterruptedException("simulated interrupt"));

SinkConnectorConfig sinkConfig = config(Map.of(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC));

AtomicBoolean interruptRestored = new AtomicBoolean(false);

Thread thread = getThread(admin, sinkConfig, interruptRestored);
thread.join();

assertTrue(interruptRestored.get(),
"interrupt flag should be restored before re-throwing as ConnectException");
}

private Thread getThread(Admin admin, SinkConnectorConfig sinkConfig, AtomicBoolean interruptRestored) {
Map<String, Object> adminProps = Map.of();
Map<String, Object> producerProps = Map.of();
Thread thread = new Thread(() -> {
try (MockedStatic<Admin> adminStatic = mockStatic(Admin.class)) {
adminStatic.when(() -> Admin.create(anyMap())).thenReturn(admin);
assertThrows(ConnectException.class, () ->
DeadLetterQueueReporter.createAndSetup(adminProps, TASK_ID, sinkConfig, producerProps, errorHandlingMetrics));
interruptRestored.set(Thread.currentThread().isInterrupted());
}
});

thread.start();
return thread;
}

@Test
@SuppressWarnings({"unchecked", "rawtypes"})
void firstTimeDlqTopicCreationLogsAtInfoNotError() throws Exception {
Admin admin = mock(Admin.class);
ListTopicsResult listTopicsResult = mock(ListTopicsResult.class);
KafkaFuture<Set<String>> namesFuture = mock(KafkaFuture.class);
CreateTopicsResult createTopicsResult = mock(CreateTopicsResult.class);
KafkaFuture<Void> createFuture = mock(KafkaFuture.class);

when(admin.listTopics()).thenReturn(listTopicsResult);
when(listTopicsResult.names()).thenReturn(namesFuture);

when(namesFuture.get(anyLong(), any())).thenReturn(Set.of());
when(admin.createTopics(any())).thenReturn(createTopicsResult);
when(createTopicsResult.all()).thenReturn(createFuture);
when(createFuture.get(anyLong(), any())).thenReturn(null);

SinkConnectorConfig sinkConfig = config(Map.of(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC));

try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(DeadLetterQueueReporter.class);
MockedStatic<Admin> adminStatic = mockStatic(Admin.class);
MockedConstruction<KafkaProducer> ignored = mockConstruction(KafkaProducer.class)) {

adminStatic.when(() -> Admin.create(anyMap())).thenReturn(admin);

DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(
Map.of(), TASK_ID, sinkConfig, Map.of(), errorHandlingMetrics);
assertNotNull(reporter);

assertTrue(logCaptureAppender.getMessages("ERROR").isEmpty(),
"first-time DLQ topic creation should not log at ERROR");
assertTrue(logCaptureAppender.getMessages("INFO").stream()
.anyMatch(msg -> msg.contains(DLQ_TOPIC) && msg.contains("does not exist")),
"first-time DLQ topic creation should log at INFO");
}
}

@Test
public void initializeDLQWithNullMetrics() {
assertThrows(NullPointerException.class, () -> new DeadLetterQueueReporter(producer, config(Map.of()), TASK_ID, null));
Expand Down