diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index 4232eaa4336ea..c8b4decd0e1f8 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -946,7 +946,7 @@ public ShareAcquiredRecords acquire( continue; } - InFlightState updateResult = inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED, DeliveryCountOps.INCREASE, maxDeliveryCount(), memberId, shareGroupDlqEnableSupplier.get()); + InFlightState updateResult = inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED, DeliveryCountOps.INCREASE, maxDeliveryCount(), memberId, isDLQEnabledForGroup()); if (updateResult == null || updateResult.state() != RecordState.ACQUIRED) { log.info("Unable to acquire records for the batch: {} in share partition: {}-{}", inFlightBatch, groupId, topicIdPartition); @@ -1138,7 +1138,7 @@ private Optional releaseAcquiredRecordsForPerOffsetBatch(String membe DeliveryCountOps.NO_OP, this.maxDeliveryCount(), EMPTY_MEMBER_ID, - shareGroupDlqEnableSupplier.get() + isDLQEnabledForGroup() ); if (updateResult == null) { log.debug("Unable to release records from acquired state for the offset: {} in batch: {}" @@ -1182,7 +1182,7 @@ private Optional releaseAcquiredRecordsForCompleteBatch(String member DeliveryCountOps.NO_OP, this.maxDeliveryCount(), EMPTY_MEMBER_ID, - shareGroupDlqEnableSupplier.get() + isDLQEnabledForGroup() ); if (updateResult == null) { log.debug("Unable to release records from acquired state for the batch: {}" @@ -2004,7 +2004,7 @@ private int acquireSubsetBatchRecords( } InFlightState updateResult = offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, DeliveryCountOps.INCREASE, - maxDeliveryCount, memberId, shareGroupDlqEnableSupplier.get()); + maxDeliveryCount, memberId, isDLQEnabledForGroup()); if (updateResult == null || updateResult.state() != RecordState.ACQUIRED) { log.trace("Unable to acquire records for the offset: {} in batch: {}" + " for the share partition: {}-{}", offsetState.getKey(), inFlightBatch, @@ -2358,7 +2358,7 @@ private Optional acknowledgePerOffsetBatchRecords( DeliveryCountOps.NO_OP, this.maxDeliveryCount(), EMPTY_MEMBER_ID, - shareGroupDlqEnableSupplier.get() + isDLQEnabledForGroup() ); if (updateResult == null) { @@ -2446,7 +2446,7 @@ private Optional acknowledgeCompleteBatch( DeliveryCountOps.NO_OP, this.maxDeliveryCount(), EMPTY_MEMBER_ID, - shareGroupDlqEnableSupplier.get() + isDLQEnabledForGroup() ); if (updateResult == null) { log.debug("Unable to acknowledge records for the batch: {} with state: {}" @@ -3023,7 +3023,7 @@ private void releaseAcquisitionLockOnTimeoutForCompleteBatch(InFlightBatch inFli DeliveryCountOps.NO_OP, maxDeliveryCount(), EMPTY_MEMBER_ID, - shareGroupDlqEnableSupplier.get()); + isDLQEnabledForGroup()); if (updateResult == null) { log.error("Unable to release acquisition lock on timeout for the batch: {}" + " for the share partition: {}-{} memberId: {}", inFlightBatch, groupId, topicIdPartition, memberId); @@ -3089,7 +3089,7 @@ private void releaseAcquisitionLockOnTimeoutForPerOffsetBatch(InFlightBatch inFl DeliveryCountOps.NO_OP, maxDeliveryCount(), EMPTY_MEMBER_ID, - shareGroupDlqEnableSupplier.get()); + isDLQEnabledForGroup()); if (updateResult == null) { log.error("Unable to release acquisition lock on timeout for the offset: {} in batch: {}" + " for the share partition: {}-{} memberId: {}", offsetState.getKey(), inFlightBatch, @@ -3328,12 +3328,16 @@ private boolean isBatchAborted(RecordBatch batch, Set abortedProducerIds) } private RecordState recordStateWithDlq(byte ackType) { - if (shareGroupDlqEnableSupplier.get() && AcknowledgeType.REJECT.id == ackType) { + if (isDLQEnabledForGroup() && AcknowledgeType.REJECT.id == ackType) { return RecordState.ARCHIVING; } return ACK_TYPE_TO_RECORD_STATE.get(ackType); } + private boolean isDLQEnabledForGroup() { + return shareGroupDlqEnableSupplier.get() && configProvider.errorsDLQTopicName(groupId).isPresent(); + } + // Visible for testing. /** diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index 570e09115ab22..477d44ac2c6e4 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -59,6 +59,7 @@ import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch; import org.apache.kafka.server.share.dlq.NoOpShareGroupDLQManager; import org.apache.kafka.server.share.dlq.ShareGroupDLQManager; +import org.apache.kafka.server.share.dlq.ShareGroupDLQRecordParameter; import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask; import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey; import org.apache.kafka.server.share.fetch.DelayedShareFetchKey; @@ -12620,9 +12621,12 @@ public void testDynamicPartitionMaxRecordLocksDecreaseBelowInFlightAffectsMaxRec @Test public void testAcknowledgeRejectWithDlqEnabled() { + ShareGroupDLQManager dlqManager = mockDlqManager(); SharePartition sharePartition = SharePartitionBuilder.builder() .withState(SharePartitionState.ACTIVE) .withShareGroupDlqEnableSupplier(() -> true) + .withConfigProvider(configProviderWithDlqTopic()) + .withShareGroupDlqManager(dlqManager) .build(); // Acquire 2 batches so that the first one stays in cache after being archived. @@ -12654,13 +12658,18 @@ public void testAcknowledgeRejectWithDlqEnabled() { // deliveryCompleteCount is 0 as evicted records are subtracted. assertEquals(0, sharePartition.deliveryCompleteCount()); + + // The rejected batch (offsets 5-9) is enqueued to the DLQ exactly once. + Mockito.verify(dlqManager, Mockito.times(1)).enqueue(Mockito.any()); } @Test public void testAcknowledgeRejectWithDlqDisabled() { + ShareGroupDLQManager dlqManager = mockDlqManager(); SharePartition sharePartition = SharePartitionBuilder.builder() .withState(SharePartitionState.ACTIVE) .withShareGroupDlqEnableSupplier(() -> false) + .withShareGroupDlqManager(dlqManager) .build(); MemoryRecords records1 = memoryRecords(5, 5); @@ -12684,13 +12693,99 @@ public void testAcknowledgeRejectWithDlqDisabled() { assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState()); assertEquals(10, sharePartition.startOffset()); assertEquals(0, sharePartition.deliveryCompleteCount()); + + // DLQ is disabled, so the DLQ is never invoked. + Mockito.verify(dlqManager, Mockito.never()).enqueue(Mockito.any()); + } + + @Test + public void testAcknowledgeRejectWithDlqSupplierEnabledButNoDlqTopicConfigured() { + // The DLQ supplier returns true, but the group has no DLQ topic configured (the default + // ShareGroupConfigProvider returns an empty topic name). DLQ must therefore be treated as + // disabled: REJECT goes directly to ARCHIVED (no ARCHIVING intermediate state) and the DLQ + // is never invoked. + ShareGroupDLQManager dlqManager = mockDlqManager(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withShareGroupDlqEnableSupplier(() -> true) + .withShareGroupDlqManager(dlqManager) + .build(); + + // Acquire 2 batches so that the first one stays in cache after being archived. + MemoryRecords records1 = memoryRecords(5, 5); + MemoryRecords records2 = memoryRecords(10, 5); + List acquiredRecordsList = fetchAcquiredRecords(sharePartition, records1, 5); + assertEquals(1, acquiredRecordsList.size()); + acquiredRecordsList = fetchAcquiredRecords(sharePartition, records2, 5); + assertEquals(1, acquiredRecordsList.size()); + + // Acknowledge the first batch with REJECT. + CompletableFuture ackResult = sharePartition.acknowledge( + MEMBER_ID, + List.of(new ShareAcknowledgementBatch(5, 9, List.of(AcknowledgeType.REJECT.id)))); + assertNull(ackResult.join()); + assertFalse(ackResult.isCompletedExceptionally()); + + // Without an effective DLQ, REJECT goes directly to ARCHIVED and the batch at start offset + // is evicted from cache. + assertEquals(1, sharePartition.cachedState().size()); + assertNull(sharePartition.cachedState().get(5L)); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState()); + assertEquals(10, sharePartition.startOffset()); + assertEquals(0, sharePartition.deliveryCompleteCount()); + + // The DLQ must not be invoked since no DLQ topic is configured for the group. + Mockito.verify(dlqManager, Mockito.never()).enqueue(Mockito.any()); + } + + @Test + public void testReleaseAcquiredRecordsMaxDeliveryWithDlqSupplierEnabledButNoDlqTopicConfigured() { + // The DLQ supplier returns true, but the group has no DLQ topic configured (the default + // ShareGroupConfigProvider returns an empty topic name). DLQ must therefore be treated as + // disabled: when the delivery count reaches the max on release, records go directly to + // ARCHIVED (no ARCHIVING intermediate state) and the DLQ is never invoked. + ShareGroupDLQManager dlqManager = mockDlqManager(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withMaxDeliveryCount(2) // Only 2 delivery attempts will be made before archiving the records. + .withState(SharePartitionState.ACTIVE) + .withShareGroupDlqEnableSupplier(() -> true) + .withShareGroupDlqManager(dlqManager) + .build(); + + // Leading batch (offsets 0-9) stays acquired so the archived batch remains in cache. + fetchAcquiredRecords(sharePartition, memoryRecords(10), 10); + + MemoryRecords records2 = memoryRecords(10, 5); + // First delivery attempt for offsets 10-14, delivery count becomes 1. + fetchAcquiredRecords(sharePartition, records2, 5); + // Release them back to AVAILABLE. + sharePartition.acknowledge(MEMBER_ID, List.of( + new ShareAcknowledgementBatch(10, 14, List.of(AcknowledgeType.RELEASE.id)))); + // Second delivery attempt, delivery count reaches the max (2). + fetchAcquiredRecords(sharePartition, records2, 5); + + // Release again. Delivery count has reached the max, so the records are archived. + CompletableFuture releaseResult = sharePartition.releaseAcquiredRecords(MEMBER_ID); + assertNull(releaseResult.join()); + assertFalse(releaseResult.isCompletedExceptionally()); + + // Without an effective DLQ, the records go directly to ARCHIVED (not ARCHIVING). + assertEquals(2, sharePartition.cachedState().size()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(10L).batchState()); + assertNull(sharePartition.cachedState().get(10L).offsetState()); + + // The DLQ must not be invoked since no DLQ topic is configured for the group. + Mockito.verify(dlqManager, Mockito.never()).enqueue(Mockito.any()); } @Test public void testAcknowledgePerOffsetRejectWithDlqEnabled() { + ShareGroupDLQManager dlqManager = mockDlqManager(); SharePartition sharePartition = SharePartitionBuilder.builder() .withState(SharePartitionState.ACTIVE) .withShareGroupDlqEnableSupplier(() -> true) + .withConfigProvider(configProviderWithDlqTopic()) + .withShareGroupDlqManager(dlqManager) .build(); // Acquire a batch with 5 records (offsets 0-4) and a second batch to keep cache populated. @@ -12718,13 +12813,18 @@ public void testAcknowledgePerOffsetRejectWithDlqEnabled() { assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(5L).batchState()); assertEquals(5, sharePartition.startOffset()); assertEquals(0, sharePartition.deliveryCompleteCount()); + + // Offsets 3 and 4 are rejected per-offset, so each is enqueued to the DLQ separately. + Mockito.verify(dlqManager, Mockito.times(2)).enqueue(Mockito.any()); } @Test public void testAcknowledgePerOffsetRejectWithDlqDisabled() { + ShareGroupDLQManager dlqManager = mockDlqManager(); SharePartition sharePartition = SharePartitionBuilder.builder() .withState(SharePartitionState.ACTIVE) .withShareGroupDlqEnableSupplier(() -> false) + .withShareGroupDlqManager(dlqManager) .build(); MemoryRecords records1 = memoryRecords(5); @@ -12750,15 +12850,21 @@ public void testAcknowledgePerOffsetRejectWithDlqDisabled() { assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(5L).batchState()); assertEquals(5, sharePartition.startOffset()); assertEquals(0, sharePartition.deliveryCompleteCount()); + + // DLQ is disabled, so the DLQ is never invoked. + Mockito.verify(dlqManager, Mockito.never()).enqueue(Mockito.any()); } @Test public void testAcquisitionLockTimeoutWithDlqEnabledCompleteBatch() throws InterruptedException { + ShareGroupDLQManager dlqManager = mockDlqManager(); SharePartition sharePartition = SharePartitionBuilder.builder() .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) .withMaxDeliveryCount(2) .withState(SharePartitionState.ACTIVE) .withShareGroupDlqEnableSupplier(() -> true) + .withConfigProvider(configProviderWithDlqTopic()) + .withShareGroupDlqManager(dlqManager) .build(); // Acquire two batches so the first stays in cache after being archived. @@ -12803,15 +12909,21 @@ public void testAcquisitionLockTimeoutWithDlqEnabledCompleteBatch() throws Inter assertEquals(0, sharePartition.deliveryCompleteCount()); // Second batch remains AVAILABLE. assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(10L).batchState()); + + // The archived complete batch (offsets 0-9) is enqueued to the DLQ exactly once. + Mockito.verify(dlqManager, Mockito.times(1)).enqueue(Mockito.any()); } @Test public void testAcquisitionLockTimeoutWithDlqEnabledPerOffsetBatch() throws InterruptedException { + ShareGroupDLQManager dlqManager = mockDlqManager(); SharePartition sharePartition = SharePartitionBuilder.builder() .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) .withMaxDeliveryCount(2) .withState(SharePartitionState.ACTIVE) .withShareGroupDlqEnableSupplier(() -> true) + .withConfigProvider(configProviderWithDlqTopic()) + .withShareGroupDlqManager(dlqManager) .build(); // Acquire a batch of 10 records (offsets 0-9). @@ -12866,16 +12978,21 @@ public void testAcquisitionLockTimeoutWithDlqEnabledPerOffsetBatch() throws Inte // Offsets 0-4 are ARCHIVED, 5-9 are AVAILABLE. Next fetch offset moves to 5 // since offsets 0-4 are no longer fetchable. assertEquals(5, sharePartition.nextFetchOffset()); + + // Offsets 0-4 are archived per-offset, so each is enqueued to the DLQ separately. + Mockito.verify(dlqManager, Mockito.times(5)).enqueue(Mockito.any()); } @Test public void testAcquisitionLockTimeoutWithDlqDisabledCompleteBatch() throws InterruptedException { // Verify that without DLQ, max delivery count still causes ARCHIVED (not ARCHIVING). + ShareGroupDLQManager dlqManager = mockDlqManager(); SharePartition sharePartition = SharePartitionBuilder.builder() .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) .withMaxDeliveryCount(2) .withState(SharePartitionState.ACTIVE) .withShareGroupDlqEnableSupplier(() -> false) + .withShareGroupDlqManager(dlqManager) .build(); // Acquire two batches. @@ -12905,16 +13022,22 @@ public void testAcquisitionLockTimeoutWithDlqDisabledCompleteBatch() throws Inte // Batch evicted, start offset advances. assertEquals(10, sharePartition.startOffset()); assertEquals(0, sharePartition.deliveryCompleteCount()); + + // DLQ is disabled, so the DLQ is never invoked. + Mockito.verify(dlqManager, Mockito.never()).enqueue(Mockito.any()); } @Test public void testAcquisitionLockTimeoutWithDlqEnabledMixedOffsets() throws InterruptedException { // Test where some offsets in a batch exceed max delivery count and some don't. + ShareGroupDLQManager dlqManager = mockDlqManager(); SharePartition sharePartition = SharePartitionBuilder.builder() .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) .withMaxDeliveryCount(2) .withState(SharePartitionState.ACTIVE) .withShareGroupDlqEnableSupplier(() -> true) + .withConfigProvider(configProviderWithDlqTopic()) + .withShareGroupDlqManager(dlqManager) .build(); // Acquire batch of 10 records (offsets 0-9). @@ -12953,12 +13076,16 @@ public void testAcquisitionLockTimeoutWithDlqEnabledMixedOffsets() throws Interr DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, () -> "Batch at offset 0 was not evicted after DLQ archival. Timer size: " + sharePartition.timer().size() + ", cachedState keys: " + sharePartition.cachedState().keySet()); + + // Offsets 5-9 are archived per-offset, so each is enqueued to the DLQ separately. + Mockito.verify(dlqManager, Mockito.times(5)).enqueue(Mockito.any()); } @Test public void testAcquisitionLockTimeoutWithDlqEnabledWriteFailureCompleteBatch() throws InterruptedException { // Phase 1 persist of ARCHIVING fails, but phase 2 still proceeds unconditionally // because timeout path uses tryUpdateState (no rollback). + ShareGroupDLQManager dlqManager = mockDlqManager(); Persister persister = Mockito.mock(Persister.class); mockPersisterReadStateMethod(persister); @@ -12975,6 +13102,8 @@ public void testAcquisitionLockTimeoutWithDlqEnabledWriteFailureCompleteBatch() .withMaxDeliveryCount(2) .withState(SharePartitionState.ACTIVE) .withShareGroupDlqEnableSupplier(() -> true) + .withConfigProvider(configProviderWithDlqTopic()) + .withShareGroupDlqManager(dlqManager) .build(); // Acquire two batches. @@ -13012,11 +13141,15 @@ public void testAcquisitionLockTimeoutWithDlqEnabledWriteFailureCompleteBatch() assertEquals(0, sharePartition.deliveryCompleteCount()); // Second batch went to AVAILABLE (delivery count 1 < max). assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(10L).batchState()); + + // The archived complete batch (offsets 0-9) is enqueued to the DLQ exactly once. + Mockito.verify(dlqManager, Mockito.times(1)).enqueue(Mockito.any()); } @Test public void testAcquisitionLockTimeoutWithDlqEnabledWriteFailurePerOffsetBatch() throws InterruptedException { // Phase 1 persist of ARCHIVING fails for per-offset batch, but phase 2 still proceeds. + ShareGroupDLQManager dlqManager = mockDlqManager(); Persister persister = Mockito.mock(Persister.class); mockPersisterReadStateMethod(persister); @@ -13033,6 +13166,8 @@ public void testAcquisitionLockTimeoutWithDlqEnabledWriteFailurePerOffsetBatch() .withMaxDeliveryCount(2) .withState(SharePartitionState.ACTIVE) .withShareGroupDlqEnableSupplier(() -> true) + .withConfigProvider(configProviderWithDlqTopic()) + .withShareGroupDlqManager(dlqManager) .build(); // Acquire batch of 10 records (offsets 0-9). @@ -13081,12 +13216,16 @@ public void testAcquisitionLockTimeoutWithDlqEnabledWriteFailurePerOffsetBatch() // Despite both persists failing, offsets 0-4 reached ARCHIVED in memory (no rollback for timeout). // Offsets 5-9 remain AVAILABLE. assertEquals(5, sharePartition.nextFetchOffset()); + + // Offsets 0-4 are archived per-offset, so each is enqueued to the DLQ separately. + Mockito.verify(dlqManager, Mockito.times(5)).enqueue(Mockito.any()); } @Test public void testAcquisitionLockTimeoutWithDlqPhase1FailsPhase2SucceedsCompleteBatch() throws InterruptedException { // Phase 1 persist (ARCHIVING) fails, phase 2 persist (ARCHIVED) succeeds. // Records should reach ARCHIVED despite phase 1 failure. + ShareGroupDLQManager dlqManager = mockDlqManager(); Persister persister = Mockito.mock(Persister.class); mockPersisterReadStateMethod(persister); @@ -13111,6 +13250,8 @@ public void testAcquisitionLockTimeoutWithDlqPhase1FailsPhase2SucceedsCompleteBa .withMaxDeliveryCount(2) .withState(SharePartitionState.ACTIVE) .withShareGroupDlqEnableSupplier(() -> true) + .withConfigProvider(configProviderWithDlqTopic()) + .withShareGroupDlqManager(dlqManager) .build(); // Acquire two batches. @@ -13151,11 +13292,15 @@ public void testAcquisitionLockTimeoutWithDlqPhase1FailsPhase2SucceedsCompleteBa assertEquals(10, sharePartition.startOffset()); assertEquals(0, sharePartition.deliveryCompleteCount()); assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(10L).batchState()); + + // The archived complete batch (offsets 0-9) is enqueued to the DLQ exactly once. + Mockito.verify(dlqManager, Mockito.times(1)).enqueue(Mockito.any()); } @Test public void testAcquisitionLockTimeoutWithDlqPhase1FailsPhase2SucceedsPerOffsetBatch() throws InterruptedException { // Phase 1 persist (ARCHIVING) fails, phase 2 persist (ARCHIVED) succeeds for per-offset batch. + ShareGroupDLQManager dlqManager = mockDlqManager(); Persister persister = Mockito.mock(Persister.class); mockPersisterReadStateMethod(persister); @@ -13180,6 +13325,8 @@ public void testAcquisitionLockTimeoutWithDlqPhase1FailsPhase2SucceedsPerOffsetB .withMaxDeliveryCount(2) .withState(SharePartitionState.ACTIVE) .withShareGroupDlqEnableSupplier(() -> true) + .withConfigProvider(configProviderWithDlqTopic()) + .withShareGroupDlqManager(dlqManager) .build(); // Acquire batch of 10 records (offsets 0-9). @@ -13232,6 +13379,9 @@ public void testAcquisitionLockTimeoutWithDlqPhase1FailsPhase2SucceedsPerOffsetB // Phase 1 failed but phase 2 succeeded — offsets 0-4 reached ARCHIVED. // Offsets 5-9 remain AVAILABLE. assertEquals(5, sharePartition.nextFetchOffset()); + + // Offsets 0-4 are archived per-offset, so each is enqueued to the DLQ separately. + Mockito.verify(dlqManager, Mockito.times(5)).enqueue(Mockito.any()); } // Unit tests for processDlqPhase2 method directly. @@ -13263,10 +13413,12 @@ public void testInitiateDLQAndArchive(String name, boolean persistSucceeds, PartitionFactory.newPartitionErrorData(0, error.code(), error.message()))))); Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeResult)); + ShareGroupDLQManager dlqManager = mockDlqManager(); SharePartition sharePartition = SharePartitionBuilder.builder() .withPersister(persister) .withState(SharePartitionState.ACTIVE) .withShareGroupDlqEnableSupplier(() -> true) + .withShareGroupDlqManager(dlqManager) .build(); InFlightState state = new InFlightState(RecordState.ARCHIVING, deliveryCount, EMPTY_MEMBER_ID); @@ -13276,6 +13428,16 @@ public void testInitiateDLQAndArchive(String name, boolean persistSucceeds, assertEquals(expectedState, state.state()); assertFalse(state.hasOngoingStateTransition()); + // Verify the records were enqueued to the DLQ exactly once with the expected parameters. + ArgumentCaptor dlqCaptor = + ArgumentCaptor.forClass(ShareGroupDLQRecordParameter.class); + Mockito.verify(dlqManager, Mockito.times(1)).enqueue(dlqCaptor.capture()); + ShareGroupDLQRecordParameter dlqParam = dlqCaptor.getValue(); + assertEquals(firstOffset, dlqParam.firstOffset()); + assertEquals(lastOffset, dlqParam.lastOffset()); + assertEquals(Optional.of(deliveryCount), dlqParam.deliveryCount()); + assertEquals(Optional.ofNullable(dlqCause), dlqParam.cause()); + // Verify persister.writeState was called exactly once with the correct state batch. ArgumentCaptor captor = ArgumentCaptor.forClass(WriteShareGroupStateParameters.class); @@ -13301,6 +13463,20 @@ public void testInitiateDLQAndArchive(String name, boolean persistSucceeds, Mockito.verify(persister, Mockito.never()).readState(Mockito.any()); } + private static ShareGroupDLQManager mockDlqManager() { + ShareGroupDLQManager dlqManager = Mockito.mock(ShareGroupDLQManager.class); + Mockito.when(dlqManager.enqueue(Mockito.any())).thenReturn(CompletableFuture.completedFuture(null)); + return dlqManager; + } + + private static ShareGroupConfigProvider configProviderWithDlqTopic() { + GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class); + GroupConfig groupConfig = Mockito.mock(GroupConfig.class); + Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig)); + Mockito.when(groupConfig.errorsDLQTopicName()).thenReturn("test-dlq-topic"); + return new ShareGroupConfigProvider(groupConfigManager); + } + private static ShareGroupConfigProvider configProviderWithRenewDisabled() { ShareGroupConfigProvider configProvider = Mockito.mock(ShareGroupConfigProvider.class); Mockito.when(configProvider.isRenewAcknowledgeEnabled(GROUP_ID)).thenReturn(false); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java index 868236f9c632c..ef00633dbb72b 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java @@ -20,6 +20,8 @@ import org.apache.kafka.coordinator.group.GroupConfigManager; import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy; +import java.util.Optional; + /** * A provider that retrieves share group dynamic configuration values, * falling back to default values when group-specific configurations are not present. @@ -98,4 +100,17 @@ public ShareGroupAutoOffsetResetStrategy autoOffsetReset(String groupId) { .flatMap(GroupConfig::shareAutoOffsetReset) .orElseGet(GroupConfig::defaultShareAutoOffsetReset); } + + /** + * The method is used to get the name of the configured DLQ topic on the share group. If the group config + * is present, then the value from the group config is used. Otherwise, empty optional is returned. + * + * @param groupId The group id for which the DLQ topic name is to be fetched. + * @return Optional representing DLQ topic name for the share group, empty if not found. + */ + public Optional errorsDLQTopicName(String groupId) { + return manager.groupConfig(groupId) + .map(GroupConfig::errorsDLQTopicName) + .filter(val -> !val.isEmpty()); + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java index a426b51554b8a..ba4905f63f394 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java @@ -132,4 +132,27 @@ void testAutoOffsetResetWithoutGroupConfig() { assertEquals(GroupConfig.defaultShareAutoOffsetReset(), provider.autoOffsetReset("test-group")); } + + @Test + void testShareGroupDLQTopicWithGroupConfig() { + String shareGroupDLQTopicName = "dlq.testGroupDLQTopic"; + String shareGroupId = "test-group"; + GroupConfigManager groupConfigManager = mock(GroupConfigManager.class); + GroupConfig groupConfig = mock(GroupConfig.class); + when(groupConfig.errorsDLQTopicName()).thenReturn(shareGroupDLQTopicName); + when(groupConfigManager.groupConfig(shareGroupId)).thenReturn(Optional.of(groupConfig)); + provider = new ShareGroupConfigProvider(groupConfigManager); + + assertEquals(Optional.of(shareGroupDLQTopicName), provider.errorsDLQTopicName(shareGroupId)); + } + + @Test + void testShareGroupDLQTopicWithoutGroupConfig() { + String shareGroupId = "test-group"; + GroupConfigManager groupConfigManager = mock(GroupConfigManager.class); + when(groupConfigManager.groupConfig(shareGroupId)).thenReturn(Optional.empty()); + provider = new ShareGroupConfigProvider(groupConfigManager); + + assertEquals(Optional.empty(), provider.errorsDLQTopicName(shareGroupId)); + } }