Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions core/src/main/java/kafka/server/share/SharePartition.java
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ public ShareAcquiredRecords acquire(
continue;
}

InFlightState updateResult = inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED, DeliveryCountOps.INCREASE, maxDeliveryCount(), memberId, shareGroupDlqEnableSupplier.get());
InFlightState updateResult = inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED, DeliveryCountOps.INCREASE, maxDeliveryCount(), memberId, isDLQEnabledForGroup());
if (updateResult == null || updateResult.state() != RecordState.ACQUIRED) {
log.info("Unable to acquire records for the batch: {} in share partition: {}-{}",
inFlightBatch, groupId, topicIdPartition);
Expand Down Expand Up @@ -1138,7 +1138,7 @@ private Optional<Throwable> releaseAcquiredRecordsForPerOffsetBatch(String membe
DeliveryCountOps.NO_OP,
this.maxDeliveryCount(),
EMPTY_MEMBER_ID,
shareGroupDlqEnableSupplier.get()
isDLQEnabledForGroup()
);
if (updateResult == null) {
log.debug("Unable to release records from acquired state for the offset: {} in batch: {}"
Expand Down Expand Up @@ -1182,7 +1182,7 @@ private Optional<Throwable> releaseAcquiredRecordsForCompleteBatch(String member
DeliveryCountOps.NO_OP,
this.maxDeliveryCount(),
EMPTY_MEMBER_ID,
shareGroupDlqEnableSupplier.get()
isDLQEnabledForGroup()
);
if (updateResult == null) {
log.debug("Unable to release records from acquired state for the batch: {}"
Expand Down Expand Up @@ -2004,7 +2004,7 @@ private int acquireSubsetBatchRecords(
}

InFlightState updateResult = offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, DeliveryCountOps.INCREASE,
maxDeliveryCount, memberId, shareGroupDlqEnableSupplier.get());
maxDeliveryCount, memberId, isDLQEnabledForGroup());
if (updateResult == null || updateResult.state() != RecordState.ACQUIRED) {
log.trace("Unable to acquire records for the offset: {} in batch: {}"
+ " for the share partition: {}-{}", offsetState.getKey(), inFlightBatch,
Expand Down Expand Up @@ -2358,7 +2358,7 @@ private Optional<Throwable> acknowledgePerOffsetBatchRecords(
DeliveryCountOps.NO_OP,
this.maxDeliveryCount(),
EMPTY_MEMBER_ID,
shareGroupDlqEnableSupplier.get()
isDLQEnabledForGroup()
);

if (updateResult == null) {
Expand Down Expand Up @@ -2446,7 +2446,7 @@ private Optional<Throwable> acknowledgeCompleteBatch(
DeliveryCountOps.NO_OP,
this.maxDeliveryCount(),
EMPTY_MEMBER_ID,
shareGroupDlqEnableSupplier.get()
isDLQEnabledForGroup()
);
if (updateResult == null) {
log.debug("Unable to acknowledge records for the batch: {} with state: {}"
Expand Down Expand Up @@ -3023,7 +3023,7 @@ private void releaseAcquisitionLockOnTimeoutForCompleteBatch(InFlightBatch inFli
DeliveryCountOps.NO_OP,
maxDeliveryCount(),
EMPTY_MEMBER_ID,
shareGroupDlqEnableSupplier.get());
isDLQEnabledForGroup());
if (updateResult == null) {
log.error("Unable to release acquisition lock on timeout for the batch: {}"
+ " for the share partition: {}-{} memberId: {}", inFlightBatch, groupId, topicIdPartition, memberId);
Expand Down Expand Up @@ -3089,7 +3089,7 @@ private void releaseAcquisitionLockOnTimeoutForPerOffsetBatch(InFlightBatch inFl
DeliveryCountOps.NO_OP,
maxDeliveryCount(),
EMPTY_MEMBER_ID,
shareGroupDlqEnableSupplier.get());
isDLQEnabledForGroup());
if (updateResult == null) {
log.error("Unable to release acquisition lock on timeout for the offset: {} in batch: {}"
+ " for the share partition: {}-{} memberId: {}", offsetState.getKey(), inFlightBatch,
Expand Down Expand Up @@ -3328,12 +3328,16 @@ private boolean isBatchAborted(RecordBatch batch, Set<Long> abortedProducerIds)
}

private RecordState recordStateWithDlq(byte ackType) {
if (shareGroupDlqEnableSupplier.get() && AcknowledgeType.REJECT.id == ackType) {
if (isDLQEnabledForGroup() && AcknowledgeType.REJECT.id == ackType) {
return RecordState.ARCHIVING;
}
return ACK_TYPE_TO_RECORD_STATE.get(ackType);
}

private boolean isDLQEnabledForGroup() {
return shareGroupDlqEnableSupplier.get() && !configProvider.errorsDLQTopicName(groupId).isEmpty();
}

// Visible for testing.

/**
Expand Down
Loading
Loading