diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java index 5b50f6e905e76..b1fbc4334d30e 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java @@ -33,8 +33,28 @@ public interface PartitionWriter { /** * Listener allowing to listen to high watermark changes. This is meant * to be used in conjunction with {@link PartitionWriter#append(TopicPartition, VerificationGuard, MemoryRecords, short)}. + *
+ * A registered listener observes a single leadership tenure of the partition. It + * receives high watermark updates only while this broker is the leader of the + * partition. Once the partition is no longer led by this broker (it transitions to + * follower, is deleted, or fails), the listener is permanently retired: no further + * updates are delivered to it, even if the broker later regains leadership. A new + * listener must be registered to observe a subsequent tenure. + *
+ * Retiring the listener is required because, after a leadership change, the local log + * can be truncated and re-replicated from the new leader, so a high watermark observed + * afterwards may advance over records that this broker never wrote. Retiring it guarantees + * that every delivered update advances only over records that this broker wrote as + * leader. */ interface Listener { + /** + * Called when the high watermark of the partition advances. Only invoked while + * this broker is the leader of the partition (see {@link Listener}). + * + * @param tp The topic partition. + * @param offset The new high watermark. + */ void onHighWatermarkUpdated( TopicPartition tp, long offset @@ -43,6 +63,11 @@ void onHighWatermarkUpdated( /** * Register a {@link Listener}. + *
+ * The listener observes only the current leadership tenure: as described on + * {@link Listener}, it stops receiving updates once the partition is no longer led by + * this broker and is not re-armed if leadership is regained. A new listener must be + * registered to observe a later tenure. * * @param tp The partition to register the listener to. * @param listener The listener. diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala index 56a57c1af03f7..4dfa4dc56dedf 100644 --- a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala +++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala @@ -28,20 +28,46 @@ import org.apache.kafka.server.transaction.AddPartitionsToTxnManager import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, VerificationGuard} import java.util.concurrent.CompletableFuture +import java.util.concurrent.atomic.AtomicBoolean import scala.collection.Map /** * ListenerAdapter adapts the PartitionListener interface to the * PartitionWriter.Listener interface. + * + * The adapter upholds the PartitionWriter.Listener contract that high watermark updates are + * delivered only while the partition is led by this broker. When the partition + * transitions to follower, is deleted or fails, its local log can be truncated and + * re-replicated from the new leader, so the high watermark would advance over records + * that this broker did not write; propagating those updates would corrupt the + * coordinator's committed state. The partition notifies listeners of these transitions before its + * fetcher is restarted (see ReplicaManager#applyDelta), i.e. before any such update can + * be produced, so gating on the `active` flag is sufficient to stop them. 
*/ private[group] class ListenerAdapter( val listener: PartitionWriter.Listener ) extends PartitionListener { + private val active = new AtomicBoolean(true) + override def onHighWatermarkUpdated( tp: TopicPartition, offset: Long ): Unit = { - listener.onHighWatermarkUpdated(tp, offset) + if (active.get()) { + listener.onHighWatermarkUpdated(tp, offset) + } + } + + override def onBecomingFollower(tp: TopicPartition): Unit = { + active.set(false) + } + + override def onFailed(tp: TopicPartition): Unit = { + active.set(false) + } + + override def onDeleted(tp: TopicPartition): Unit = { + active.set(false) } override def equals(that: Any): Boolean = that match { diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala index 6872eafe370ea..967768b611ecc 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala @@ -70,6 +70,56 @@ class CoordinatorPartitionWriterTest { ) } + @Test + def testListenerAdapterPropagatesHighWatermarkUpdates(): Unit = { + val tp = new TopicPartition("foo", 0) + val updates = new util.ArrayList[Long]() + val adapter = new ListenerAdapter(new PartitionWriter.Listener { + override def onHighWatermarkUpdated(tp: TopicPartition, offset: Long): Unit = updates.add(offset) + }) + + adapter.onHighWatermarkUpdated(tp, 10L) + adapter.onHighWatermarkUpdated(tp, 20L) + + assertEquals(util.List.of(10L, 20L), updates) + } + + @Test + def testListenerAdapterStopsPropagatingAfterBecomingFollower(): Unit = { + assertHighWatermarkPropagationStops(_.onBecomingFollower(_)) + } + + @Test + def testListenerAdapterStopsPropagatingAfterFailed(): Unit = { + assertHighWatermarkPropagationStops(_.onFailed(_)) + } + + @Test + def testListenerAdapterStopsPropagatingAfterDeleted(): Unit = { + 
assertHighWatermarkPropagationStops(_.onDeleted(_)) + } + + /** + * Verifies that no high watermark update is propagated to the wrapped listener + * once the given transition has been signalled. Such updates are not safe to + * apply because the partition is no longer led by this broker. + */ + private def assertHighWatermarkPropagationStops( + transition: (ListenerAdapter, TopicPartition) => Unit + ): Unit = { + val tp = new TopicPartition("foo", 0) + val updates = new util.ArrayList[Long]() + val adapter = new ListenerAdapter(new PartitionWriter.Listener { + override def onHighWatermarkUpdated(tp: TopicPartition, offset: Long): Unit = updates.add(offset) + }) + + adapter.onHighWatermarkUpdated(tp, 10L) + transition(adapter, tp) + adapter.onHighWatermarkUpdated(tp, 20L) + + assertEquals(util.List.of(10L), updates) + } + @Test def testConfig(): Unit = { val tp = new TopicPartition("foo", 0) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index d05ea995e5e3f..aa28437835902 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -5445,6 +5445,53 @@ class ReplicaManagerTest { } } + @Test + def testPartitionListenerWhenPartitionBecomesFollower(): Unit = { + val aliveBrokersIds = Seq(0, 1) + val leaderEpoch = 5 + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), + brokerId = 0, aliveBrokersIds) + try { + val tp = new TopicPartition(topic, 0) + val replicas = aliveBrokersIds.toList.map(Int.box).asJava + + val listener = new MockPartitionListener + listener.verify() + + // Broker 0 becomes leader of the partition. 
+ val leaderDelta = createLeaderDelta( + topicId = topicId, + partition = tp, + leaderId = 0, + replicas = replicas, + isr = replicas, + leaderEpoch = leaderEpoch + ) + replicaManager.applyDelta(leaderDelta, imageFromTopics(leaderDelta.apply())) + + // Register a listener. + assertTrue(replicaManager.maybeAddListener(tp, listener)) + listener.verify() + + // Broker 0 transitions to follower of the partition with broker 1 as the new + // leader. The listener is notified that the partition is becoming a follower. + // This happens before the follower starts fetching from the new leader, hence + // before any high watermark update reflecting the new leader's records. + val followerDelta = createFollowerDelta( + topicId = topicId, + partition = tp, + followerId = 0, + leaderId = 1, + leaderEpoch = leaderEpoch + 1 + ) + replicaManager.applyDelta(followerDelta, imageFromTopics(followerDelta.apply())) + + listener.verify(expectedFollower = true) + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean, partitions:List[Int] = List(0), directoryIds: List[Uuid] = List.empty, topicName: String = "foo", topicId: Uuid = FOO_UUID, leaderEpoch: Int = 0): TopicsDelta = { val leader = if (isStartIdLeader) startId else startId + 1 val delta = new TopicsDelta(TopicsImage.EMPTY)