Reapply "Column paged merge batcher"#36842
Merged
DAlperin merged 8 commits intoJun 2, 2026
Merged
Conversation
2b64b12 to
35e4c9d
Compare
Adds a Materialize-private merge-batcher that routes per-chunk transient state through `ColumnPager`, bounding the resident-bytes peak under memory pressure. Behind `enable_column_paged_batcher` (default off). Three building blocks in `mz-timely-util`: * `ColumnMergeBatcher` + `merge_chains` + `extract_chain` in `columnar/merge_batcher.rs` — chains hold `PagedColumn` entries that resolve to disk on demand. Reuses the existing `Column::merge_from` / `Column::extract` building blocks. * `BuilderInput for Column<((K, V), T, R)>` so DD `OrdValBuilder` can consume the batcher's output without a container conversion. * `column_pager` gains a process-global pager singleton (matching the lower-level pager's global-atomic design) and a per-decision skip/page counter for diagnostics. Compute integration: * `RowRowColPagedBuilder` alias + `PartialEq<&RowRef> for DatumSeq` / `PushInto<&RowRef> for DatumContainer` so the Row-keyed arrange path type-checks. * Worker init in `apply_worker_config` reads three new dyncfgs and installs the process-global pager: `enable_column_paged_batcher` (on/off), `column_paged_batcher_backend` (`swap` | `file`), `column_paged_batcher_budget_fraction` (fraction of replica memory, default 5%). Per-worker / shared pool sizes derive from `memory_limiter::get_memory_limit` with sensible floors and caps. * Two arrange call sites switched to the paged path: `render/context.rs::arrange_collection` (central ArrangeBy) and `render/join/linear_join.rs::JoinStage`. Other arrange sites (logging) left on the legacy `ColInternalMerger` path. Also extends `Materialized` and `Clusterd` mzcompose services to accept `memory_swap` and `mem_swappiness`, so callers can configure container-level swap behavior independent of the batcher.
Adds three pieces of validation tooling for the column-paged merge batcher: a criterion microbench, an end-to-end timely example, and feature-benchmark scenarios. Criterion bench (`src/timely-util/benches/columnar_merge_batcher.rs`): compares the legacy `ColumnMerger` against the new path with disabled / swap / lz4 pagers across three input regimes (mixed, collisions, disjoint) and four cache-tier sizes. Prints a throughput summary table when the group finishes. Good for per-chunk-merge perf comparisons; doesn't exercise the dataflow operator graph. End-to-end example (`src/timely-util/examples/column_paged_spill.rs`): drives `arrange_core` over a cancellation workload (positives + negatives at the same time, so the spine stays empty and all pressure lives in the merge-batcher). Configurable workers / records / budget; back-to-back baseline + spill modes; optional RSS sampler thread via `ps`. Modeled on `differential-dataflow/examples/columnar_spill.rs` but uses our `Col2ValPagedBatcher` + `ColumnPager` + `TieredPolicy` directly instead of DD's `SpillBatcher`/`Threshold`/`FileSpill` plumbing. `cargo run --release --example column_paged_spill` for a smoke test; see `--help` for sweep options. Feature-benchmark scenarios (`misc/python/.../scenarios/benchmark_main.py`): * `DifferentialJoinColumnPaged` — same query shape as `DifferentialJoin`, paged batcher enabled. Measures steady-state overhead vs the legacy path. * `DifferentialJoinHydrationBaseline` / `DifferentialJoinHydrationFile` — sister leaves of a non-runnable `DifferentialJoinHydration` parent. Each measures the time to re-hydrate a linear-join arrangement after `REPLICATION FACTOR 0 -> 1` toggling. Baseline has the paged batcher off; File enables it with the file backend and `budget_fraction = 0.01` so chunks spill rather than competing with the spine for RAM. Compare under `--this-memory` + `--this-memory-swap` to evaluate user-space spill vs OS swap. Feature-benchmark CLI plumbing (`test/feature-benchmark/mzcompose.py`): adds `--this-memory`, `--this-memory-swap`, `--this-mem-swappiness` (and `--other-*` companions) so memory caps and swap behavior are configurable per side, plus `--skip-other` for iterating on `this` without the comparison round trip. The benchmark-result evaluator tolerates the single-side case by returning `None` ratios instead of indexing past the end of `_points`.
…bounds, copy_from
35e4c9d to
8246be7
Compare
…re harness
Three changes on top of the recycling fix to close the remaining
feature-benchmark regressions and let skipped scenarios coexist with
retry filtering:
* Whole-chunk passthrough in merge_chains. Heads arrive resident via
FetchIter, so endpoint peeks are free. When a head sorts entirely
before the other side's current record, ship it wholesale and skip
the per-record merge. Same shape as the legacy ColumnMerger fast
path; gated on positions[i] == 0.
* STASH_CAP lowered from 16 to 2 + MAX_RECYCLE_BYTES guard
(1 << 22, ~4 MiB). The stash isn't a hoard — it's a hot-buffer
cache for the result/keep/ship churn. Passthrough keeps most chunks
off the merge inner loop, so 2 buffers covers the steady-state
ship/refill ping-pong without inflating per-batcher resident
overhead (invisible to the pager budget).
* Gate DifferentialJoin{ColumnPaged,Hydration*} on MzVersion > 26.28.0
via can_run. Dev versions don't distinguish the dyncfg's presence,
so the scenarios skip on both sides during 0.x development and
start running once 26.28.0 ships.
* feature-benchmark/mzcompose: filter the rerun list by
has_scenario_result before has_scenario_regression. Skipped
scenarios (via can_run) leave no entry in the report; the previous
filter raised KeyError instead of just excluding them from reruns.
3d9e4c1 to
2770d66
Compare
Adds a kill switch over the type swap to Col2ValPagedBatcher / RowRowColPagedBuilder. The new dyncfgs: * enable_column_paged_batcher (default false): when true, arrange call sites use the columnar-native paged batcher / builder. When false, they fall back to the legacy columnation Col2ValBatcher / RowRowBuilder path that shipped before MaterializeInc#36627. * enable_column_paged_batcher_spill (default false): renamed from the previous enable_column_paged_batcher (which controlled eviction). With the path flag off it has no effect; with the path flag on it controls whether the pager actually evicts under budget pressure. Both flags default off; arranges run on the legacy columnation path unless someone opts in. DifferentialJoinHydrationFile scenario opts both on (path + spill) to exercise the spill path. Read at operator construction time, so a flip takes effect on dataflows created after the change; existing dataflows continue on whichever path they were built with. The runtime if/else at each arrange site monomorphizes both branches, but they return the same Arranged<S, TraceAgent<RowRowSpine<_,_>>> so the type system is happy and binary bloat is bounded. Touch sites: * compute-types/dyncfgs.rs: define both flags, register them. * compute/src/compute_state.rs: spill toggle reads the renamed flag. * compute/src/render/context.rs: thread use_paged_path through arrange_collection and branch the mz_arrange_core call. * compute/src/render/join/linear_join.rs: same branch at the JoinStage arrange. * feature_benchmark scenarios: HydrationFile now sets both flags. * parallel_workload + mzcompose lint allowlists: add the new flag.
2770d66 to
5def314
Compare
antiguru
approved these changes
Jun 2, 2026
Comment on lines
+101
to
+108
| /// Recycled empty `Column::Typed` chunks. Drained heads and shipped result | ||
| /// buffers feed in here; subsequent merge / extract calls pop from here | ||
| /// instead of starting from a zero-capacity `Column::default()`. Mirrors | ||
| /// the stash carried by the upstream `differential_dataflow` merge-batcher | ||
| /// framework, which this type forks. Without it, each shipped chunk | ||
| /// triggers a fresh per-leaf grow cycle and per-merge-round allocation | ||
| /// dominates the inner loop. | ||
| stash: Vec<Column<(D, T, R)>>, |
Member
There was a problem hiding this comment.
We should clear the stash in seal because it's unlikely that we'll immediately need the stash again, and it's a good opportunity to release some memory for whatever comes next.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Reapply note
Reapplies #36627, which was reverted in #36839 after feature-benchmark regressions tracked in CLU-100 (Insert 22% slower, Update 17%, CreateIndex 44%, DifferentialJoinHydrationBaseline 55%, SkewedJoin 72%, and others — 10 scenarios past the 10% threshold).
Root cause was inside the forked merge-batcher framework, not in the merge inner loop itself. The new
ColumnMergeBatchershipped without the recycled-chunk stash that the upstreamdifferential_dataflow::MergeBatchercarries: every shipped result and exhausted head dropped its leaf allocations, and the next merge round restarted from zero-capacityVecs. The per-leaf geometric regrowth tax (~2× the chunk's bytes in memcpy traffic per chunk) landed on every record that flowed through the two arrange call sites, multiplied bylog(N)merge rounds per record — matching the regression shape exactly (ingestion / hydration / maintained joins hit hard, arrange-light scenarios like FinishOrderByLimit hit less hard).The reapply has three changes on top of the original PR:
ColumnMergeBatcher: a small (STASH_CAP = 2, with aMAX_RECYCLE_BYTESper-buffer guard) free-list of clearedColumn::Typedbuffers, threaded throughmerge_chains/extract_chain/drain_side. Drained heads, finished result buffers, and consumed source buffers feed in; the next allocation pulls out instead of starting at default.merge_chains: heads arrive materialized viaFetchIter, so peeking endpoints is free. When a head sorts entirely before the other side's current record, ship it wholesale and skip the per-record merge. Same shape as the legacyColumnMerger::mergefast path, gated onpositions[i] == 0.enable_column_paged_batcher, defaultfalse) over the type swap at the two arrange sites. With the flag off, arranges fall back to the legacyCol2ValBatcher/RowRowBuilder(columnation-merger) path that shipped before Column paged merge batcher #36627. With it on, they use the columnar-nativeCol2ValPagedBatcher/RowRowColPagedBuilder. Eviction is gated by a separateenable_column_paged_batcher_spill(defaultfalse).Build #16668 confirmed the rebased combination of (1) + (2) closes every CLU-100 wallclock regression and stays inside memory thresholds. Build #16672 will be the first run with the feature flag wired in; with the default
falseit should match pre-#36627 main exactly.Motivation
The merge-batcher's transient state (chains of sealed input chunks awaiting merge / extract) sits between input ingestion and the spine. Under memory pressure that transient peak is what trips OOMs, not the spine itself — the spine has already consolidated.
This PR plugs
ColumnPager(#36552) into the merge-batcher so those chain entries can live on disk (or compressed in RAM) instead of resident memory, while leaving the hot-path merge / extract logic and on-wire chunk shape unchanged.Description
A new
ColumnMergeBatcherinmz-timely-utilforks the DD merge-batcher framework so chain entries arePagedColumn(Resident/Paged/Compressed) rather than residentColumns. Streaming driversmerge_chains/extract_chainwalk those chains via aFetchIterthat materializes one head per side on demand and hands finished output back to the pager. The per-chunk merge / extract logic itself is the sameColumn::merge_from/Column::extractalready used by the in-memoryColumnMerger. A per-batcherstashrecycles drained head buffers and shipped result buffers across calls so the inner loop reuses leaf-Veccapacity instead of regrowing from zero each round. Whole-chunk passthrough onpositions[i] == 0skips the per-record merge for disjoint-key heads.Compute integration:
compute-types:enable_column_paged_batcher(defaultfalse): kill switch over the batcher type swap. When off, arrange call sites useCol2ValBatcher/RowRowBuilder(legacy columnation path). When on, they useCol2ValPagedBatcher/RowRowColPagedBuilder(new columnar-native path that the pager can spill).enable_column_paged_batcher_spill(defaultfalse): controls whether the pager actually evicts under budget pressure. Only meaningful when the path flag is on.column_paged_batcher_budget_fraction(default 5% of replica memory; floored at 128 MiB). Pager backend follows--scratch-directoryavailability the same waymz_ore::pageralready does (file when a scratch directory is configured, swap otherwise).apply_worker_configderives(enabled, total, backend)from those dyncfgs and routes throughcolumn_pager::apply_tiered_config, which holds a process-wideTieredPolicysingleton and mutates its atomic budget / backend / codec in place. In-flightResidentTickets keep crediting the same atomic across reconfigures, so operator-driven tunes (or other workers in the same process reapplying the same config) don't orphan accounting onto a stale policy.render/context.rs(arrange_collection) andrender/join/linear_join.rs(JoinStagearrange) branch on the path flag at construction time. Both arms returnArranged<S, TraceAgent<RowRowSpine<_, _>>>so the runtimeif/elsetype-checks cleanly; monomorphization keeps both versions of the inner arrange body in the binary.Col2ValPagedBatchertype alias and aBuilderInput for Column<((K, V), T, R)>impl so the DDOrdValBuildercan consume the batcher'sColumnoutput without a container conversion.RowRowColPagedBuilder+ a couple of smallPushInto/PartialEqimpls inrow_spine.rsget the Row-keyed paths type-checking.MaterializedandClusterdmzcompose services gainmemory_swapandmem_swappinessso feature-benchmarks can configure container swap behavior independently of the batcher.Observability:
column_pager::metricsregisters aPagerMetricsstruct with the process metrics registry. Counters cover skip / pageout / pagein decisions and bytes through each path; computed gauges exposemz_column_pager_budget_remaining_bytesandmz_column_pager_budget_configured_bytesagainst the liveTieredPolicyatomics. Compute init wires the registration; the timely-util module is registry-agnostic, so callers without a registry (tests, benches, examples) just see no-op observers.Verification
mz-timely-util, columnar + column_pager modules): chunker correctness, per-chunk merge / extract, drain,ColumnMergeBatcherend-to-end seal underColumnPager::disabled()and aForcePagePolicythat forces every chunk through the pager. The recycling stash is exercised implicitly by every end-to-end test;TieredPolicy::reconfiguretests cover in-flight ticket preservation across pool resizes, shrink-saturates-at-zero, and live backend/codec swap. Proptests cover merge / extract invariants.benches/columnar_merge_batcher.rs): compares legacyColumnMergeragainst the paged path with disabled / swap / lz4 across mixed / collision / disjoint inputs and four cache-tier sizes; prints a throughput summary table.examples/column_paged_spill.rs): drivesarrange_coreover a cancellation workload (positives + negatives at the same time so the spine stays empty and all pressure lives in the batcher). Back-to-back baseline + spill modes with an optional RSS sampler thread.DifferentialJoinColumnPagedmeasures steady-state overhead vs.DifferentialJoinwith the path flag on.DifferentialJoinHydrationBaseline/DifferentialJoinHydrationFilemeasure re-hydration time afterREPLICATION FACTOR 0 → 1toggling. File variant sets bothenable_column_paged_batcherandenable_column_paged_batcher_spillto exercise actual eviction. Gated onMzVersion > 26.28.0viacan_run; dev versions skip on both sides until 26.28.0 ships.Risk
The feature flag defaults off. With
enable_column_paged_batcher = false, arrange operators run on the sameCol2ValBatcher/RowRowBuilderpath that shipped before #36627. The new columnar-native batcher infrastructure is still compiled in, but no production dataflow uses it until the flag is flipped. Reverting in production is a flag flip, not a code revert.The flag is read at operator construction time. Flipping the dyncfg only affects dataflows created after the change; existing dataflows continue on whichever path they were built with until they're rebuilt. This matches existing dyncfg semantics in compute (most flags don't switch live operators).
Whole-chunk passthrough is implemented for the resident case only. Heads arrive resident from
FetchIter(Skip path) or freshly decoded from disk (Paged/Compressed path), so peeking endpoints is free in both cases. The fast path is gated onpositions[i] == 0so only chains that haven't been partially consumed can short-circuit, matching the legacyColumnMerger::mergeshape.Recycling stash is not visible to the pager budget. The 2-entry per-batcher stash holds cleared-but-capacity-preserving
Column::Typedallocations that don't carryResidentTickets. The pager budget sees fewer bytes than RSS by up to ~4 MiB per batcher (MAX_RECYCLE_BYTES). This matches the behavior of the upstream DDMergeBatcherstash that the legacy path runs on (also untracked), so this is not a regression in accounting fidelity. If tighter accounting is wanted later, the follow-up is to give stashed chunks lightweight tickets so the policy can choose to evict them under pressure.Logging arranges still use the legacy
ColumnMergerpath. Bytes paged to swap / file don't enter themz_arrangement_batcher_*_rawRSS-shaped accounting tables, matching how those tables already treat shipped chunks.