Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,24 @@ public interface PartitionWriter {
/**
* Listener allowing to listen to high watermark changes. This is meant
* to be used in conjunction with {@link PartitionWriter#append(TopicPartition, VerificationGuard, MemoryRecords, short)}.
* <p>
* High watermark updates are delivered only while this broker is the leader of the
* partition. Once the partition is no longer led by this broker (it transitions to
* follower, is deleted or fails), no further updates are delivered. This guarantee
Comment thread
dajac marked this conversation as resolved.
Outdated
* allows the listener to treat every update as advancing over records that this
* broker wrote as leader. It is required because, after a leadership change, the
* local log can be truncated and re-replicated from the new leader, so a high
* watermark observed as a follower may advance over records that this broker never
* wrote.
*/
interface Listener {
/**
* Called when the high watermark of the partition advances. Only invoked while
* this broker is the leader of the partition (see {@link Listener}).
*
* @param tp The topic partition.
* @param offset The new high watermark.
*/
void onHighWatermarkUpdated(
TopicPartition tp,
long offset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,46 @@ import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, VerificationGuard}

import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.Map

/**
* ListenerAdapter adapts the PartitionListener interface to the
* PartitionWriter.Listener interface.
*
* This upholds the PartitionWriter.Listener contract that high watermark updates are
* delivered only while the partition is led by this broker. When the partition
* transitions to follower, is deleted or fails, its local log can be truncated and
* re-replicated from the new leader, so the high watermark would advance over records
* that this broker did not write; propagating those updates would corrupt the
* coordinator's committed state. The partition notifies these transitions before its
* fetcher is restarted (see ReplicaManager#applyDelta), i.e. before any such update can
* be produced, so gating on this flag is sufficient to stop them.
*/
private[group] class ListenerAdapter(
val listener: PartitionWriter.Listener
) extends PartitionListener {
private val active = new AtomicBoolean(true)

override def onHighWatermarkUpdated(
tp: TopicPartition,
offset: Long
): Unit = {
listener.onHighWatermarkUpdated(tp, offset)
if (active.get()) {
listener.onHighWatermarkUpdated(tp, offset)
}
}

override def onBecomingFollower(tp: TopicPartition): Unit = {
active.set(false)
}

override def onFailed(tp: TopicPartition): Unit = {
active.set(false)
}

override def onDeleted(tp: TopicPartition): Unit = {
active.set(false)
}

override def equals(that: Any): Boolean = that match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,56 @@ class CoordinatorPartitionWriterTest {
)
}

@Test
def testListenerAdapterPropagatesHighWatermarkUpdates(): Unit = {
val tp = new TopicPartition("foo", 0)
val updates = new util.ArrayList[Long]()
val adapter = new ListenerAdapter(new PartitionWriter.Listener {
override def onHighWatermarkUpdated(tp: TopicPartition, offset: Long): Unit = updates.add(offset)
})

adapter.onHighWatermarkUpdated(tp, 10L)
adapter.onHighWatermarkUpdated(tp, 20L)

assertEquals(util.List.of(10L, 20L), updates)
}

@Test
def testListenerAdapterStopsPropagatingAfterBecomingFollower(): Unit = {
assertHighWatermarkPropagationStops(_.onBecomingFollower(_))
}

@Test
def testListenerAdapterStopsPropagatingAfterFailed(): Unit = {
assertHighWatermarkPropagationStops(_.onFailed(_))
}

@Test
def testListenerAdapterStopsPropagatingAfterDeleted(): Unit = {
assertHighWatermarkPropagationStops(_.onDeleted(_))
}

/**
* Verifies that no high watermark update is propagated to the wrapped listener
* once the given transition has been signalled. Such updates are not safe to
* apply because the partition is no longer led by this broker.
*/
private def assertHighWatermarkPropagationStops(
transition: (ListenerAdapter, TopicPartition) => Unit
): Unit = {
val tp = new TopicPartition("foo", 0)
val updates = new util.ArrayList[Long]()
val adapter = new ListenerAdapter(new PartitionWriter.Listener {
override def onHighWatermarkUpdated(tp: TopicPartition, offset: Long): Unit = updates.add(offset)
})

adapter.onHighWatermarkUpdated(tp, 10L)
transition(adapter, tp)
adapter.onHighWatermarkUpdated(tp, 20L)

assertEquals(util.List.of(10L), updates)
}

@Test
def testConfig(): Unit = {
val tp = new TopicPartition("foo", 0)
Expand Down
47 changes: 47 additions & 0 deletions core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5445,6 +5445,53 @@ class ReplicaManagerTest {
}
}

@Test
def testPartitionListenerWhenPartitionBecomesFollower(): Unit = {
val aliveBrokersIds = Seq(0, 1)
val leaderEpoch = 5
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
brokerId = 0, aliveBrokersIds)
try {
val tp = new TopicPartition(topic, 0)
val replicas = aliveBrokersIds.toList.map(Int.box).asJava

val listener = new MockPartitionListener
listener.verify()

// Broker 0 becomes leader of the partition.
val leaderDelta = createLeaderDelta(
topicId = topicId,
partition = tp,
leaderId = 0,
replicas = replicas,
isr = replicas,
leaderEpoch = leaderEpoch
)
replicaManager.applyDelta(leaderDelta, imageFromTopics(leaderDelta.apply()))

// Register a listener.
assertTrue(replicaManager.maybeAddListener(tp, listener))
listener.verify()

// Broker 0 transitions to follower of the partition with broker 1 as the new
// leader. The listener is notified that the partition is becoming a follower.
// This happens before the follower starts fetching from the new leader, hence
// before any high watermark update reflecting the new leader's records.
val followerDelta = createFollowerDelta(
topicId = topicId,
partition = tp,
followerId = 0,
leaderId = 1,
leaderEpoch = leaderEpoch + 1
)
replicaManager.applyDelta(followerDelta, imageFromTopics(followerDelta.apply()))

listener.verify(expectedFollower = true)
} finally {
replicaManager.shutdown(checkpointHW = false)
}
}

private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean, partitions:List[Int] = List(0), directoryIds: List[Uuid] = List.empty, topicName: String = "foo", topicId: Uuid = FOO_UUID, leaderEpoch: Int = 0): TopicsDelta = {
val leader = if (isStartIdLeader) startId else startId + 1
val delta = new TopicsDelta(TopicsImage.EMPTY)
Expand Down
Loading