storage: drop already-persisted updates in upsert-v2 so lagging writers converge#36860
Merged
patrickwwbutler merged 1 commit intoJun 2, 2026
Merged
Conversation
…rs converge
An upsert-v2 Kafka data export's output shard could freeze permanently whenever
a second writer advanced the shard ahead of this operator: the export's persist
write_frontier stopped advancing, the parent source kept ingesting, the source
reported `running` with no error, and the downstream stayed stale for hours.
## The bug
The upsert continual-feedback operators read their own output shard back as a
"feedback" input and emit a datum at `ts` only once the feedback frontier
(the shard upper) reaches it. v2's `drain_sealed_input` classified each
buffered datum two ways: eligible iff `ts == persist_upper`, everything else
ineligible and re-stashed. That "everything else" lumped together two very
different cases -- `ts > persist_upper` (legitimately not yet emittable) and
`ts < persist_upper` (already persisted by another writer). Re-stashing the
latter is a trap: `persist_upper` only advances, so `ts == persist_upper`
can never again hold, the datum is re-stashed forever, and -- because the
operator downgrades its output capability to `min_ineligible_ts` -- its output
frontier gets pinned BELOW the shard upper. With the output frontier pinned,
`mint_batch_descriptions` mints nothing, the sink never appends, the shard
never advances, and the feedback loop is wedged.
## Why a single writer never hits it, but any concurrent writer can
The bug needs a datum at `ts < persist_upper` in the batcher, and a single
writer can never produce one. The only thing that advances `persist_upper` is
this operator's own output flowing back through its sink: it emits at `ts`
exactly when `persist_upper == ts`, the sink writes `[ts, ts+1)`, and
`persist_upper` becomes `ts+1`. By the time `persist_upper` is past `ts` that
datum has already been emitted -- the operator is never holding a datum while
the feedback races past it, because it is itself what moves the feedback.
A *second* writer on the same shard breaks that invariant. The persist sink is
a multi-writer `compare_and_append` race: peers render the same deterministic
dataflow, reclock the same offsets to the same timestamps, and CaS-dedup their
identical batches -- whoever wins advances the shard, the others find the upper
already moved (see persist_sink's "it was us or someone" accounting). The loser
of a race then holds buffered data at timestamps a peer has already committed,
i.e. `ts < persist_upper`, and -- on the old code -- strands it. This arises
in any configuration with a concurrent writer:
* Active-active storage replication, in ordinary steady state and BY DESIGN:
both replicas write, race every batch, and the one that loses a CaS jump
can strand. No upgrade or read-only mode required.
* 0dt cutover: the old generation is read-write and far ahead while the new
(replacement) generation rehydrates cold and read-only. (The startup
`resume_upper` filter drops already-persisted data, but it is captured once
at startup; data the old writer overtakes *during* catch-up slips past it.)
* Controller restart / reconnect / failover windows where a fresh instance
hydrates while a prior incarnation's writes are still landing.
## It is a race -- hit rate scales with the lag
Stranding requires a peer to jump `persist_upper` past a datum in the window
between when this writer buffers it and its next drain, with that datum still
buffered at the deciding instant. So the probability scales with how far behind
the at-risk writer is:
* Warm active-active (both hydrated, racing in lockstep): the gap is at most
a little CaS jitter plus one batch, so stranding is RARE -- but a GC pause,
scheduling hiccup, or source burst can widen the window, so it is not zero.
* Cold 0dt catch-up (replacement far behind a caught-up writer): a large
backlog sits below the upper essentially the whole time, so stranding is
NEAR-CERTAIN. This is where it was actually observed.
Either way the failure is sticky and silent: once it strands, the earliest
stranded `ts` pins the frontier for good (`persist_upper` only climbs, so the
datum can never become eligible again), status stays `running` with no error,
and only a dataflow restart clears it (`resume_upper` re-drops the sub-upper
data) -- but nothing triggers one. A rare per-moment event over a long-running
deployment converges toward "happens eventually," and is terminal when it does.
## The fix
Classify three ways and DROP `ts < persist_upper` (already persisted by a
peer; we could not emit correct retractions for it and the sink would filter
it anyway), re-stash only `ts > persist_upper`, process `ts == persist_upper`.
This mirrors v1's `relevant = persist_upper.less_equal(ts)` and lets the output
frontier advance past the shard upper instead of pinning below it. The happy
path (`==` and `>`) is unchanged; only the below-upper case flips from
"re-stash forever / pin the frontier" to "drop and advance."
## Tested
- `lagging_replacement_below_upper_strands_data` (v2): drives the exact shape
-- feedback advanced to T=10 with no operator output, then source data at
ts=5,7 < T. Without the fix the output frontier pins at 5 (verified: the test
fails with `left: [(5)]`); with the fix the data is dropped and the frontier
advances to 11.
- `lagging_replacement_below_upper_is_dropped` (v1): the same scenario against
v1, asserting it already drops the data and advances cleanly -- the contrast
that makes v2 the outlier.
antiguru
approved these changes
Jun 2, 2026
Member
antiguru
left a comment
There was a problem hiding this comment.
Reasoning makes sense to me.
Contributor
|
Awesome find! thanks for digging in on this! |
antiguru
pushed a commit
that referenced
this pull request
Jun 3, 2026
…rs converge (#36860) An upsert-v2 Kafka data export's output shard could freeze permanently whenever a second writer advanced the shard ahead of this operator: the export's persist write_frontier stopped advancing, the parent source kept ingesting, the source reported `running` with no error, and the downstream stayed stale for hours. ## The bug The upsert continual-feedback operators read their own output shard back as a "feedback" input and emit a datum at `ts` only once the feedback frontier (the shard upper) reaches it. v2's `drain_sealed_input` classified each buffered datum two ways: eligible iff `ts == persist_upper`, everything else ineligible and re-stashed. That "everything else" lumped together two very different cases -- `ts > persist_upper` (legitimately not yet emittable) and `ts < persist_upper` (already persisted by another writer). Re-stashing the latter is a trap: `persist_upper` only advances, so `ts == persist_upper` can never again hold, the datum is re-stashed forever, and -- because the operator downgrades its output capability to `min_ineligible_ts` -- its output frontier gets pinned BELOW the shard upper. With the output frontier pinned, `mint_batch_descriptions` mints nothing, the sink never appends, the shard never advances, and the feedback loop is wedged. ## Why a single writer never hits it, but any concurrent writer can The bug needs a datum at `ts < persist_upper` in the batcher, and a single writer can never produce one. The only thing that advances `persist_upper` is this operator's own output flowing back through its sink: it emits at `ts` exactly when `persist_upper == ts`, the sink writes `[ts, ts+1)`, and `persist_upper` becomes `ts+1`. By the time `persist_upper` is past `ts` that datum has already been emitted -- the operator is never holding a datum while the feedback races past it, because it is itself what moves the feedback. A *second* writer on the same shard breaks that invariant. The persist sink is a multi-writer `compare_and_append` race: peers render the same deterministic dataflow, reclock the same offsets to the same timestamps, and CaS-dedup their identical batches -- whoever wins advances the shard, the others find the upper already moved (see persist_sink's "it was us or someone" accounting). The loser of a race then holds buffered data at timestamps a peer has already committed, i.e. `ts < persist_upper`, and -- on the old code -- strands it. This arises in any configuration with a concurrent writer: * Active-active storage replication, in ordinary steady state and BY DESIGN: both replicas write, race every batch, and the one that loses a CaS jump can strand. No upgrade or read-only mode required. * 0dt cutover: the old generation is read-write and far ahead while the new (replacement) generation rehydrates cold and read-only. (The startup `resume_upper` filter drops already-persisted data, but it is captured once at startup; data the old writer overtakes *during* catch-up slips past it.) * Controller restart / reconnect / failover windows where a fresh instance hydrates while a prior incarnation's writes are still landing. ## It is a race -- hit rate scales with the lag Stranding requires a peer to jump `persist_upper` past a datum in the window between when this writer buffers it and its next drain, with that datum still buffered at the deciding instant. So the probability scales with how far behind the at-risk writer is: * Warm active-active (both hydrated, racing in lockstep): the gap is at most a little CaS jitter plus one batch, so stranding is RARE -- but a GC pause, scheduling hiccup, or source burst can widen the window, so it is not zero. * Cold 0dt catch-up (replacement far behind a caught-up writer): a large backlog sits below the upper essentially the whole time, so stranding is NEAR-CERTAIN. This is where it was actually observed. Either way the failure is sticky and silent: once it strands, the earliest stranded `ts` pins the frontier for good (`persist_upper` only climbs, so the datum can never become eligible again), status stays `running` with no error, and only a dataflow restart clears it (`resume_upper` re-drops the sub-upper data) -- but nothing triggers one. A rare per-moment event over a long-running deployment converges toward "happens eventually," and is terminal when it does. ## The fix Classify three ways and DROP `ts < persist_upper` (already persisted by a peer; we could not emit correct retractions for it and the sink would filter it anyway), re-stash only `ts > persist_upper`, process `ts == persist_upper`. This mirrors v1's `relevant = persist_upper.less_equal(ts)` and lets the output frontier advance past the shard upper instead of pinning below it. The happy path (`==` and `>`) is unchanged; only the below-upper case flips from "re-stash forever / pin the frontier" to "drop and advance." ## Tested - `lagging_replacement_below_upper_strands_data` (v2): drives the exact shape -- feedback advanced to T=10 with no operator output, then source data at ts=5,7 < T. Without the fix the output frontier pins at 5 (verified: the test fails with `left: [(5)]`); with the fix the data is dropped and the frontier advances to 11. - `lagging_replacement_below_upper_is_dropped` (v1): the same scenario against v1, asserting it already drops the data and advances cleanly -- the contrast that makes v2 the outlier.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
An upsert-v2 Kafka data export's output shard could freeze permanently whenever
a second writer advanced the shard ahead of this operator: the export's persist
write_frontier stopped advancing, the parent source kept ingesting, the source
reported
runningwith no error, and the downstream stayed stale for hours.The bug
The upsert continual-feedback operators read their own output shard back as a
"feedback" input and emit a datum at
tsonly once the feedback frontier(the shard upper) reaches it. v2's
drain_sealed_inputclassified eachbuffered datum two ways: eligible iff
ts == persist_upper, everything elseineligible and re-stashed. That "everything else" lumped together two very
different cases --
ts > persist_upper(legitimately not yet emittable) andts < persist_upper(already persisted by another writer). Re-stashing thelatter is a trap:
persist_upperonly advances, sots == persist_uppercan never again hold, the datum is re-stashed forever, and -- because the
operator downgrades its output capability to
min_ineligible_ts-- its outputfrontier gets pinned BELOW the shard upper. With the output frontier pinned,
mint_batch_descriptionsmints nothing, the sink never appends, the shardnever advances, and the feedback loop is wedged.
Why a single writer never hits it, but any concurrent writer can
The bug needs a datum at
ts < persist_upperin the batcher, and a singlewriter can never produce one. The only thing that advances
persist_upperisthis operator's own output flowing back through its sink: it emits at
tsexactly when
persist_upper == ts, the sink writes[ts, ts+1), andpersist_upperbecomests+1. By the timepersist_upperis pasttsthatdatum has already been emitted -- the operator is never holding a datum while
the feedback races past it, because it is itself what moves the feedback.
A second writer on the same shard breaks that invariant. The persist sink is
a multi-writer
compare_and_appendrace: peers render the same deterministicdataflow, reclock the same offsets to the same timestamps, and CaS-dedup their
identical batches -- whoever wins advances the shard, the others find the upper
already moved (see persist_sink's "it was us or someone" accounting). The loser
of a race then holds buffered data at timestamps a peer has already committed,
i.e.
ts < persist_upper, and -- on the old code -- strands it. This arisesin any configuration with a concurrent writer:
both replicas write, race every batch, and the one that loses a CaS jump
can strand. No upgrade or read-only mode required.
(replacement) generation rehydrates cold and read-only. (The startup
resume_upperfilter drops already-persisted data, but it is captured onceat startup; data the old writer overtakes during catch-up slips past it.)
hydrates while a prior incarnation's writes are still landing.
It is a race -- hit rate scales with the lag
Stranding requires a peer to jump
persist_upperpast a datum in the windowbetween when this writer buffers it and its next drain, with that datum still
buffered at the deciding instant. So the probability scales with how far behind
the at-risk writer is:
a little CaS jitter plus one batch, so stranding is RARE -- but a GC pause,
scheduling hiccup, or source burst can widen the window, so it is not zero.
backlog sits below the upper essentially the whole time, so stranding is
NEAR-CERTAIN. This is where it was actually observed.
Either way the failure is sticky and silent: once it strands, the earliest
stranded
tspins the frontier for good (persist_upperonly climbs, so thedatum can never become eligible again), status stays
runningwith no error,and only a dataflow restart clears it (
resume_upperre-drops the sub-upperdata) -- but nothing triggers one. A rare per-moment event over a long-running
deployment converges toward "happens eventually," and is terminal when it does.
The fix
Classify three ways and DROP
ts < persist_upper(already persisted by apeer; we could not emit correct retractions for it and the sink would filter
it anyway), re-stash only
ts > persist_upper, processts == persist_upper.This mirrors v1's
relevant = persist_upper.less_equal(ts)and lets the outputfrontier advance past the shard upper instead of pinning below it. The happy
path (
==and>) is unchanged; only the below-upper case flips from"re-stash forever / pin the frontier" to "drop and advance."
Tested
lagging_replacement_below_upper_strands_data(v2): drives the exact shape-- feedback advanced to T=10 with no operator output, then source data at
ts=5,7 < T. Without the fix the output frontier pins at 5 (verified: the test
fails with
left: [(5)]); with the fix the data is dropped and the frontieradvances to 11.
lagging_replacement_below_upper_is_dropped(v1): the same scenario againstv1, asserting it already drops the data and advances cleanly -- the contrast
that makes v2 the outlier.