diff --git a/Cargo.lock b/Cargo.lock index ed98cb17a8f7f..45b6359669e7b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8390,6 +8390,7 @@ dependencies = [ "differential-dataflow", "either", "futures-util", + "itertools 0.14.0", "lgalloc", "lz4_flex", "mz-ore", diff --git a/misc/python/materialize/feature_benchmark/benchmark_result.py b/misc/python/materialize/feature_benchmark/benchmark_result.py index 31c5f56f0c7ef..dd272a6688f2e 100644 --- a/misc/python/materialize/feature_benchmark/benchmark_result.py +++ b/misc/python/materialize/feature_benchmark/benchmark_result.py @@ -93,6 +93,11 @@ def this_as_str(self) -> str: return f"{self.this():>11.3f}" def other(self) -> T: + # `_points` has length 1 when the runner ran only the THIS side + # (e.g. `--skip-other`); treat the absent baseline as `None` so + # report rendering falls through to its `None` formatting. + if len(self._points) < 2: + return None # type: ignore[return-value] return self._points[1] def other_as_str(self) -> str: diff --git a/misc/python/materialize/feature_benchmark/benchmark_result_evaluator.py b/misc/python/materialize/feature_benchmark/benchmark_result_evaluator.py index 6900b3f57ce90..87fcd997a07ae 100644 --- a/misc/python/materialize/feature_benchmark/benchmark_result_evaluator.py +++ b/misc/python/materialize/feature_benchmark/benchmark_result_evaluator.py @@ -51,7 +51,13 @@ def get_threshold(self, metric: BenchmarkScenarioMetric) -> float: return self.threshold_by_measurement_type[metric.measurement_type] def ratio(self, metric: BenchmarkScenarioMetric) -> float | None: - if metric._points[0] is None or metric._points[1] is None: + # `_points` has length 1 when the runner ran only the THIS side + # (e.g. `--skip-other`); there's no baseline to compare against. + if ( + len(metric._points) < 2 + or metric._points[0] is None + or metric._points[1] is None + ): return None else: return metric._points[0] / metric._points[1] diff --git a/misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py b/misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py index 4f1d577fb6e0c..0a9299c459c35 100644 --- a/misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py +++ b/misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py @@ -947,6 +947,157 @@ def benchmark(self) -> MeasurementSource: """) +class DifferentialJoinColumnPaged(Dataflow): + """Same shape as `DifferentialJoin`, but with the column-paged merge + batcher enabled for the linear-join arrange stage. + + Compare against `DifferentialJoin` to gauge the steady-state overhead of + the paged path (resident chunks plus byte-budget bookkeeping) when no + pressure forces spill. To measure spill cost, see + `DifferentialJoinHydrationFile`. + """ + + @classmethod + def can_run(cls, version: MzVersion) -> bool: + # Requires `enable_column_paged_batcher`, introduced in the merge + # batcher rework that lands in 26.28.0. Skips on both sides while + # we're still on `26.28.0-dev.0` because `MzVersion` strips the git + # hash and can't distinguish dev builds with the dyncfg from dev + # builds without it. + return version > MzVersion.create(26, 28, 0) + + def init(self) -> list[Action]: + return [ + self.view_ten(), + TdAction(f""" +$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}} +$ postgres-execute connection=mz_system +ALTER SYSTEM SET enable_column_paged_batcher = true; + +> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()} +"""), + ] + + def benchmark(self) -> MeasurementSource: + return Td(f""" +> SELECT 1; + /* A */ +1 + + +> SELECT COUNT(*) FROM v1 AS a1 JOIN v1 AS a2 USING (f1) + /* B */ +{self.n()} +""") + + +class DifferentialJoinHydration(Dataflow): + """Non-leaf parent for the linear-join hydration benchmark family. + + Holds the shared `init` / `benchmark` (replica-toggle hydration loop) so + Baseline and File variants only need to override `shared()` with the + dyncfgs they want set. Has subclasses, so the feature-benchmark runner + treats it as non-leaf and never executes it directly — pick one of the + leaf classes via `--root-scenario`. + + Run both leaves under a memory-capped Materialized (`--this-memory=2g`) + so the baseline has to swap and the paged-file variant has somewhere + predictable to spill. + """ + + # SCALE=8 → 100M rows per side, ~1.6 GiB per side input. Two sides plus + # the join arrangement (typically 2–4× input) reliably exceeds a few + # GiB total; a 2g container cap forces real swap pressure on the + # baseline. File variant's 16 MiB per-worker + 128 MiB shared budget + # means almost everything spills under that cap. + SCALE = 8 + + @classmethod + def can_run(cls, version: MzVersion) -> bool: + # Both leaf variants set `enable_column_paged_batcher` (baseline = + # false, file = true), so gate on the dyncfg's introduction in + # 26.28.0. Comment in `DifferentialJoinColumnPaged.can_run` explains + # why dev versions can't distinguish the dyncfg's presence. + return version > MzVersion.create(26, 28, 0) + + def init(self) -> list[Action]: + # `v1` lives on the default cluster, not `join_cluster`, so the + # replication-factor toggle in `benchmark` only tears down `v2`'s + # dataflow. Keeps the measurement scoped to the join-arrangement + # rebuild we're trying to measure. + return [ + self.view_ten(), + TdAction(f""" +> CREATE MATERIALIZED VIEW v1 + AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()} +> SELECT COUNT(*) FROM v1 +{self.n()} + +> CREATE CLUSTER join_cluster SIZE 'scale=1,workers=16', REPLICATION FACTOR 1 +"""), + ] + + def benchmark(self) -> MeasurementSource: + # Match HydrateIndex's pattern: take the cluster offline *before* + # defining the object so the dataflow doesn't pre-hydrate. The + # `REPLICATION FACTOR 1` flip after `/* A */` is the actual + # hydration trigger we want to time. + return Td(f""" +> DROP MATERIALIZED VIEW IF EXISTS v2 + +> ALTER CLUSTER join_cluster SET (REPLICATION FACTOR 0) + +> CREATE MATERIALIZED VIEW v2 + IN CLUSTER join_cluster + AS SELECT COUNT(*) FROM v1 AS a1 JOIN v1 AS a2 USING (f1) + +> SELECT 1 + /* A */ +1 +> ALTER CLUSTER join_cluster SET (REPLICATION FACTOR 1) +> SET CLUSTER = join_cluster +> SELECT * FROM v2 + /* B */ +{self.n()} +> SET CLUSTER = default +""") + + +class DifferentialJoinHydrationBaseline(DifferentialJoinHydration): + """Hydration measurement with the paged batcher disabled (current + production path). Compare against `DifferentialJoinHydrationFile` to + see if user-space file-backed spill beats OS swap under pressure. + """ + + def shared(self) -> Action: + return TdAction(""" +$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +$ postgres-execute connection=mz_system +ALTER SYSTEM SET enable_column_paged_batcher = false; +""") + + +class DifferentialJoinHydrationFile(DifferentialJoinHydration): + """Hydration time with the column-paged batcher on, file backend, and + a tight budget fraction so the merge-batcher transient spills rather + than competing with the spine for RAM. + + `budget_fraction = 0.01` (1% of announced memory limit) lands in the + clamp floors of the worker-init derivation (per-worker 16 MiB, + shared 128 MiB), giving us the same effective sizing we benchmarked + before the fraction-knob refactor. + """ + + def shared(self) -> Action: + return TdAction(""" +$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +$ postgres-execute connection=mz_system +ALTER SYSTEM SET enable_column_paged_batcher = true; +ALTER SYSTEM SET enable_column_paged_batcher_spill = true; +ALTER SYSTEM SET column_paged_batcher_budget_fraction = 0.01; +""") + + class FullOuterJoin(Dataflow): def benchmark(self) -> BenchmarkingSequence: columns_select = ", ".join( diff --git a/misc/python/materialize/mzcompose/__init__.py b/misc/python/materialize/mzcompose/__init__.py index 7e66057952815..ad342c73224d5 100644 --- a/misc/python/materialize/mzcompose/__init__.py +++ b/misc/python/materialize/mzcompose/__init__.py @@ -494,6 +494,9 @@ def get_default_system_parameters( "enable_compute_half_join2", "enable_mz_join_core", "linear_join_yielding", + "enable_column_paged_batcher", + "enable_column_paged_batcher_spill", + "column_paged_batcher_budget_fraction", "enable_lgalloc_eager_reclamation", "lgalloc_background_interval", "lgalloc_file_growth_dampener", diff --git a/misc/python/materialize/mzcompose/service.py b/misc/python/materialize/mzcompose/service.py index 3d5d4e4c38b0c..51370c73d9363 100644 --- a/misc/python/materialize/mzcompose/service.py +++ b/misc/python/materialize/mzcompose/service.py @@ -184,6 +184,13 @@ class ServiceConfig(TypedDict, total=False): security_opt: list[str] | None """Additional security options to apply to the container.""" + memswap_limit: int | str | None + """Total memory+swap limit (Docker `memswap_limit`). `-1` disables the swap + limit, leaving only the memory limit in effect.""" + + mem_swappiness: int | None + """Container swap tendency, 0–100 (Docker `mem_swappiness`).""" + class Service: """A Docker Compose service in a `Composition`. diff --git a/misc/python/materialize/mzcompose/services/clusterd.py b/misc/python/materialize/mzcompose/services/clusterd.py index e07ca490a5355..f05a465a46cf4 100644 --- a/misc/python/materialize/mzcompose/services/clusterd.py +++ b/misc/python/materialize/mzcompose/services/clusterd.py @@ -24,6 +24,8 @@ def __init__( environment_id: str | None = None, environment_extra: list[str] = [], memory: str | None = None, + memory_swap: str | None = None, + mem_swappiness: int | None = None, cpu: str | None = None, options: list[str] = [], restart: str = "no", @@ -93,6 +95,17 @@ def __init__( limits["cpus"] = cpu config["deploy"] = {"resources": {"limits": limits}} + # Swap controls aren't part of compose's `deploy.resources` schema; they + # live as top-level compose v2 service keys (`memswap_limit`, + # `mem_swappiness`). Setting `memswap_limit > mem_limit` enables the + # container to use host swap when RAM pressure builds, which lets the + # kernel page out anonymous memory rather than OOM-killing. Useful for + # benchmarking "OS swap" as a baseline vs application-managed spill. + if memory_swap is not None: + config["memswap_limit"] = memory_swap + if mem_swappiness is not None: + config["mem_swappiness"] = mem_swappiness + config.update( { "command": options, diff --git a/misc/python/materialize/mzcompose/services/materialized.py b/misc/python/materialize/mzcompose/services/materialized.py index 0be2b747811b6..9549e0d2fcb29 100644 --- a/misc/python/materialize/mzcompose/services/materialized.py +++ b/misc/python/materialize/mzcompose/services/materialized.py @@ -71,6 +71,8 @@ def __init__( volumes_extra: list[str] = [], depends_on: list[str] = [], memory: str | None = None, + memory_swap: str | None = None, + mem_swappiness: int | None = None, cpu: str | None = None, options: list[str] = [], persist_blob_url: str | None = None, @@ -332,6 +334,15 @@ def __init__( limits["cpus"] = cpu config["deploy"] = {"resources": {"limits": limits}} + # Swap controls live as top-level compose v2 service keys, not under + # `deploy.resources`. `memswap_limit > mem_limit` lets the container use + # host swap so the kernel can page out anonymous memory rather than OOM. + # `mem_swappiness=100` biases the kernel toward swapping aggressively. + if memory_swap is not None: + config["memswap_limit"] = memory_swap + if mem_swappiness is not None: + config["mem_swappiness"] = mem_swappiness + if sanity_restart: # Workaround for https://github.com/docker/compose/issues/11133 config["labels"] = {"sanity_restart": True} diff --git a/misc/python/materialize/parallel_workload/action.py b/misc/python/materialize/parallel_workload/action.py index 60c36ac578c2e..acce344646167 100644 --- a/misc/python/materialize/parallel_workload/action.py +++ b/misc/python/materialize/parallel_workload/action.py @@ -1607,6 +1607,16 @@ def __init__( self.flags_with_values["enable_upsert_v2"] = BOOLEAN_FLAG_VALUES self.flags_with_values["enable_coalesce_case_transform"] = BOOLEAN_FLAG_VALUES self.flags_with_values["enable_compute_sync_mv_sink"] = BOOLEAN_FLAG_VALUES + self.flags_with_values["enable_column_paged_batcher"] = BOOLEAN_FLAG_VALUES + self.flags_with_values["enable_column_paged_batcher_spill"] = ( + BOOLEAN_FLAG_VALUES + ) + self.flags_with_values["column_paged_batcher_budget_fraction"] = [ + "0.0", + "0.01", + "0.05", + "0.25", + ] # If you are adding a new config flag in Materialize, consider using it # here instead of just marking it as uninteresting to silence the diff --git a/src/compute-types/src/dyncfgs.rs b/src/compute-types/src/dyncfgs.rs index 70235d9ddf2e5..5753b2cb8929e 100644 --- a/src/compute-types/src/dyncfgs.rs +++ b/src/compute-types/src/dyncfgs.rs @@ -23,6 +23,62 @@ pub const ENABLE_HALF_JOIN2: Config = Config::new( "Whether compute should use `half_join2` rather than DD's `half_join` to render delta joins.", ); +/// Use the column-paged merge batcher code path at arrange sites. When +/// `true`, arrange operators use `Col2ValPagedBatcher` (in +/// `mz_timely_util::columnar`) and `RowRowColPagedBuilder` (in +/// `mz_row_spine`) — the columnar-native batcher that the pager can +/// spill (gated by [`ENABLE_COLUMN_PAGED_BATCHER_SPILL`]). When `false` +/// (the default), the same arrange sites use the legacy +/// `Col2ValBatcher` / `RowRowBuilder` (columnation-merger) path that +/// shipped before #36627. Read at operator construction time; flips +/// take effect on dataflows created after the change. +/// +/// Disabled by default while the new path is stabilizing. +/// `DifferentialJoinHydration*` feature-benchmark scenarios opt in +/// explicitly so the spill path is measured. +pub const ENABLE_COLUMN_PAGED_BATCHER: Config = Config::new( + "enable_column_paged_batcher", + false, + "Use the columnar-native paged merge batcher at arrange sites. When `false` (default), \ + arranges fall back to the legacy columnation `Col2ValBatcher` / `RowRowBuilder` path.", +); + +/// Allow the column-paged batcher's pager to actually evict chunks +/// under memory pressure. Only meaningful when +/// [`ENABLE_COLUMN_PAGED_BATCHER`] is `true`; with the spill flag off +/// the pager keeps every chunk resident regardless of budget. +/// +/// Off by default, even when the batcher path itself is on, so the +/// no-pressure case stays a pure resident operation. Tune the budget / +/// backend via [`COLUMN_PAGED_BATCHER_BUDGET_FRACTION`]. +pub const ENABLE_COLUMN_PAGED_BATCHER_SPILL: Config = Config::new( + "enable_column_paged_batcher_spill", + false, + "Allow the column-paged batcher's pager to evict chunks under memory pressure. Only \ + meaningful when `enable_column_paged_batcher = true`.", +); + +/// Total resident-byte budget the column-paged batcher's tiered policy +/// (`mz_timely_util::column_pager::policy::TieredPolicy`) is allowed to +/// hold across all workers in this process, expressed as a fraction of +/// the replica's announced memory limit. A single +/// process-wide pool tracks all resident chunks; allocations beyond the +/// pool spill to the configured backend. +/// +/// `0.05` (5%) is a reasonable starting point: large enough that the +/// per-call ColumnBuilder ship-threshold (~2 MiB) fits multiple chunks +/// per worker, small enough that the merge-batcher's transient state +/// doesn't crowd out the spine. Set lower to spill more aggressively +/// under pressure. The computed budget is floored at 128 MiB so the +/// no-pressure case doesn't page per chunk. Ignored when +/// `enable_column_paged_batcher_spill` is `false`. +pub const COLUMN_PAGED_BATCHER_BUDGET_FRACTION: Config = Config::new( + "column_paged_batcher_budget_fraction", + 0.05, + "Fraction of replica memory the column-paged batcher's tiered policy may hold resident \ + before spilling to the backend. Total budget = max(mem_limit * fraction, 128 MiB).", +); + /// Whether rendering should use `mz_join_core` rather than DD's `JoinCore::join_core`. pub const ENABLE_MZ_JOIN_CORE: Config = Config::new( "enable_mz_join_core", @@ -424,4 +480,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { .add(&COMPUTE_PROMETHEUS_INTROSPECTION_SCRAPE_INTERVAL) .add(&SUBSCRIBE_SNAPSHOT_OPTIMIZATION) .add(&MV_SINK_ADVANCE_PERSIST_FRONTIERS) + .add(&ENABLE_COLUMN_PAGED_BATCHER) + .add(&ENABLE_COLUMN_PAGED_BATCHER_SPILL) + .add(&COLUMN_PAGED_BATCHER_BUDGET_FRACTION) } diff --git a/src/compute/src/compute_state.rs b/src/compute/src/compute_state.rs index e272989580cec..c5aa2da03727f 100644 --- a/src/compute/src/compute_state.rs +++ b/src/compute/src/compute_state.rs @@ -36,7 +36,7 @@ use mz_compute_types::plan::render_plan::RenderPlan; use mz_dyncfg::ConfigSet; use mz_expr::row::RowCollection; use mz_expr::{RowComparator, SafeMfpPlan}; -use mz_ore::cast::CastFrom; +use mz_ore::cast::{CastFrom, CastLossy}; use mz_ore::collections::CollectionExt; use mz_ore::metrics::{MetricsRegistry, UIntGauge}; use mz_ore::now::EpochMillis; @@ -308,6 +308,48 @@ impl ComputeState { std::sync::atomic::Ordering::Relaxed, ); + // Apply column-paged-batcher configuration. Routes through + // `apply_tiered_config`, which reuses a process-wide `TieredPolicy` + // singleton — operator-driven tunes mutate the existing atomics + // rather than installing a fresh policy with a fresh budget atomic + // that would orphan in-flight resident tickets. + // + // Backend selection mirrors the lower-level `mz_ore::pager` + // already configured above: file when a scratch directory is + // available, swap otherwise. + { + use mz_ore::pager::Backend; + use mz_timely_util::column_pager::apply_tiered_config; + + let enabled = ENABLE_COLUMN_PAGED_BATCHER_SPILL.get(config); + + // Budget derivation: fraction × announced memory limit, with a + // 128 MiB floor so the no-pressure case doesn't page per chunk. + // Falls back to a 4 GiB assumption if no limit was announced + // (e.g. dev environments). + const MIB: usize = 1024 * 1024; + const DEFAULT_MEM_LIMIT: usize = 4 * 1024 * MIB; + let mem_limit = crate::memory_limiter::get_memory_limit().unwrap_or(DEFAULT_MEM_LIMIT); + let fraction = COLUMN_PAGED_BATCHER_BUDGET_FRACTION.get(config).max(0.0); + let total = usize::cast_lossy(f64::cast_lossy(mem_limit) * fraction).max(128 * MIB); + + let backend = if self.context.scratch_directory.is_some() { + Backend::File + } else { + Backend::Swap + }; + + debug!( + enabled, + ?backend, + fraction, + mem_limit, + budget_bytes = total, + "column-paged batcher: applying tiered config", + ); + apply_tiered_config(enabled, total, backend, None); + } + // Remember the maintenance interval locally to avoid reading it from the config set on // every server iteration. self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config); diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index 2e8094576c133..8759989f7a5f8 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -1487,7 +1487,7 @@ impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> { #[allow(dead_code)] // Some of the methods on this trait are unused, but useful to have. /// A timestamp type that can be used for operations within MZ's dataflow layer. -pub trait RenderTimestamp: MzTimestamp + Refines { +pub trait RenderTimestamp: MzTimestamp + Default + Refines { /// The system timestamp component of the timestamp. /// /// This is useful for manipulating the system time, as when delaying diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index 2c0faf9e4ef49..fa8d0e858c9e6 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -20,8 +20,8 @@ use differential_dataflow::trace::{BatchReader, Cursor, TraceReader}; use differential_dataflow::{AsCollection, Data, VecCollection}; use mz_compute_types::dataflows::DataflowDescription; use mz_compute_types::dyncfgs::{ - ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION, ENABLE_COMPUTE_TEMPORAL_BUCKETING, - TEMPORAL_BUCKETING_SUMMARY, + ENABLE_COLUMN_PAGED_BATCHER, ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION, + ENABLE_COMPUTE_TEMPORAL_BUCKETING, TEMPORAL_BUCKETING_SUMMARY, }; use mz_compute_types::plan::{ArrangementStrategy, AvailableCollections}; use mz_dyncfg::ConfigSet; @@ -31,7 +31,7 @@ use mz_repr::fixed_length::ToDatumIter; use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow}; use mz_storage_types::controller::CollectionMetadata; use mz_timely_util::columnar::builder::ColumnBuilder; -use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange}; +use mz_timely_util::columnar::{Col2ValBatcher, Col2ValPagedBatcher, columnar_exchange}; use timely::ContainerBuilder; use timely::container::{CapacityContainerBuilder, PushInto}; use timely::dataflow::channels::pact::{ExchangeCore, Pipeline}; @@ -49,7 +49,7 @@ use crate::render::{LinearJoinSpec, MaybeBucketByTime, RenderTimestamp}; use crate::typedefs::{ ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, RowRowAgent, RowRowEnter, RowRowSpine, }; -use mz_row_spine::{DatumSeq, RowRowBuilder}; +use mz_row_spine::{DatumSeq, RowRowBuilder, RowRowColPagedBuilder}; /// Dataflow-local collections and arrangements. /// @@ -1073,8 +1073,14 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { } else { oks }; - let (oks, errs_keyed, passthrough) = - Self::arrange_collection(&name, oks, key.clone(), thinning.clone()); + let use_paged_path = ENABLE_COLUMN_PAGED_BATCHER.get(config_set); + let (oks, errs_keyed, passthrough) = Self::arrange_collection( + &name, + oks, + key.clone(), + thinning.clone(), + use_paged_path, + ); let errs_concat: KeyCollection<_, _, _> = errs.clone().concat(errs_keyed).into(); self.collection = Some((passthrough, errs)); let errs = @@ -1103,6 +1109,7 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { oks: VecCollection<'scope, T, Row, Diff>, key: Vec, thinning: Vec, + use_paged_path: bool, ) -> ( Arranged<'scope, RowRowAgent>, VecCollection<'scope, T, DataflowErrorSer, Diff>, @@ -1154,18 +1161,23 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { } }); - let oks = ok_stream - .mz_arrange_core::< + let exchange = + ExchangeCore::, _>::new_core(columnar_exchange::); + let oks = if use_paged_path { + ok_stream.mz_arrange_core::< + _, + Col2ValPagedBatcher<_, _, _, _>, + RowRowColPagedBuilder<_, _>, + RowRowSpine<_, _>, + >(exchange, name) + } else { + ok_stream.mz_arrange_core::< _, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>, - >( - ExchangeCore::, _>::new_core( - columnar_exchange::, - ), - name - ); + >(exchange, name) + }; ( oks, err_stream.as_collection(), diff --git a/src/compute/src/render/join/linear_join.rs b/src/compute/src/render/join/linear_join.rs index bbd66652da5e4..c79360f1ae632 100644 --- a/src/compute/src/render/join/linear_join.rs +++ b/src/compute/src/render/join/linear_join.rs @@ -18,7 +18,9 @@ use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::arrangement::Arranged; use differential_dataflow::trace::TraceReader; use differential_dataflow::{AsCollection, Data, VecCollection}; -use mz_compute_types::dyncfgs::{ENABLE_MZ_JOIN_CORE, LINEAR_JOIN_YIELDING}; +use mz_compute_types::dyncfgs::{ + ENABLE_COLUMN_PAGED_BATCHER, ENABLE_MZ_JOIN_CORE, LINEAR_JOIN_YIELDING, +}; use mz_compute_types::plan::join::JoinClosure; use mz_compute_types::plan::join::linear_join::{LinearJoinPlan, LinearStagePlan}; use mz_dyncfg::ConfigSet; @@ -26,7 +28,7 @@ use mz_expr::Eval; use mz_repr::fixed_length::ToDatumIter; use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow}; use mz_timely_util::columnar::builder::ColumnBuilder; -use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange}; +use mz_timely_util::columnar::{Col2ValBatcher, Col2ValPagedBatcher, columnar_exchange}; use mz_timely_util::operator::{CollectionExt, StreamExt}; use timely::dataflow::Scope; use timely::dataflow::channels::pact::{ExchangeCore, Pipeline}; @@ -38,7 +40,7 @@ use crate::render::context::{ArrangementFlavor, CollectionBundle, Context}; use crate::render::errors::DataflowErrorSer; use crate::render::join::mz_join_core::mz_join_core; use crate::typedefs::{RowRowAgent, RowRowEnter}; -use mz_row_spine::{RowRowBuilder, RowRowSpine}; +use mz_row_spine::{RowRowBuilder, RowRowColPagedBuilder, RowRowSpine}; /// Available linear join implementations. /// @@ -381,18 +383,24 @@ where errors.push(errs.as_collection()); - let arranged = keyed - .mz_arrange_core::< + let exchange = ExchangeCore::, _>::new_core( + columnar_exchange::, + ); + let arranged = if ENABLE_COLUMN_PAGED_BATCHER.get(&self.config_set) { + keyed.mz_arrange_core::< + _, + Col2ValPagedBatcher<_, _, _, _>, + RowRowColPagedBuilder<_, _>, + RowRowSpine<_, _>, + >(exchange, "JoinStage") + } else { + keyed.mz_arrange_core::< _, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>, - >( - ExchangeCore::, _>::new_core( - columnar_exchange::, - ), - "JoinStage" - ); + >(exchange, "JoinStage") + }; joined = JoinedFlavor::Local(arranged); } diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 535595e5b7985..b263db122cd3d 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -101,6 +101,11 @@ pub async fn serve( assert_eq!(storage_log_readers.len(), workers_per_process); storage_log_readers.into_iter().map(Some).collect() }; + mz_timely_util::column_pager::metrics::register( + metrics_registry, + mz_timely_util::column_pager::tiered_policy(), + ); + let config = Config { persist_clients, txns_ctx, diff --git a/src/row-spine/src/lib.rs b/src/row-spine/src/lib.rs index 725b7c4dae6b5..98cfec1788c1d 100644 --- a/src/row-spine/src/lib.rs +++ b/src/row-spine/src/lib.rs @@ -22,8 +22,8 @@ pub use self::container::DatumContainer; pub use self::container::DatumSeq; pub use self::offset_opt::OffsetOptimized; pub use self::spines::{ - RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowRowSpine, RowSpine, RowValBatcher, - RowValBuilder, RowValSpine, ValRowBatcher, ValRowBuilder, ValRowSpine, + RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowRowColPagedBuilder, RowRowSpine, + RowSpine, RowValBatcher, RowValBuilder, RowValSpine, ValRowBatcher, ValRowBuilder, ValRowSpine, }; use differential_dataflow::trace::implementations::OffsetList; @@ -40,6 +40,7 @@ mod spines { use differential_dataflow::trace::implementations::spine_fueled::Spine; use differential_dataflow::trace::rc_blanket_impls::RcBuilder; use mz_repr::Row; + use mz_timely_util::columnar::Column; use mz_timely_util::columnation::{ColInternalMerger, ColumnationChunker, ColumnationStack}; use crate::{DatumContainer, OffsetOptimized}; @@ -59,6 +60,13 @@ mod spines { OrdValBuilder, ColumnationStack<((Row, Row), T, R)>>, >; + /// `RowRowBuilder` variant that consumes [`Column`] chunks. Pairs with + /// [`Col2ValPagedBatcher`] for the spillable arrange path. + /// + /// [`Col2ValPagedBatcher`]: mz_timely_util::columnar::Col2ValPagedBatcher + pub type RowRowColPagedBuilder = + RcBuilder, Column<((Row, Row), T, R)>>>; + pub type RowValSpine = Spine>>>; pub type RowValBatcher = KeyValBatcher; pub type RowValBuilder = RcBuilder< @@ -148,7 +156,7 @@ mod container { use differential_dataflow::trace::implementations::BatchContainer; use timely::container::PushInto; - use mz_repr::{Datum, Row, RowPacker, read_datum}; + use mz_repr::{Datum, Row, RowPacker, RowRef, read_datum}; use super::bytes_container::BytesContainer; @@ -248,6 +256,12 @@ mod container { } } + impl PushInto<&RowRef> for DatumContainer { + fn push_into(&mut self, item: &RowRef) { + self.bytes.push_into(item.data()) + } + } + #[derive(Debug)] pub struct DatumSeq<'a> { bytes: &'a [u8], @@ -297,6 +311,12 @@ mod container { self.bytes.eq(other.data()) } } + impl<'a> PartialEq<&RowRef> for DatumSeq<'a> { + #[inline] + fn eq(&self, other: &&RowRef) -> bool { + self.bytes.eq(other.data()) + } + } impl<'a> Eq for DatumSeq<'a> {} impl<'a, 'b> PartialOrd> for DatumSeq<'b> { #[inline] diff --git a/src/timely-util/Cargo.toml b/src/timely-util/Cargo.toml index 6000b887ad64d..1a1ebb5703663 100644 --- a/src/timely-util/Cargo.toml +++ b/src/timely-util/Cargo.toml @@ -21,6 +21,10 @@ harness = false name = "columnar_merger" harness = false +[[bench]] +name = "columnar_merge_batcher" +harness = false + [[bench]] name = "column_pager" harness = false @@ -51,6 +55,7 @@ allocation-counter = { workspace = true, optional = true } [dev-dependencies] criterion.workspace = true +itertools.workspace = true proptest.workspace = true rand.workspace = true tempfile.workspace = true diff --git a/src/timely-util/benches/columnar_merge_batcher.rs b/src/timely-util/benches/columnar_merge_batcher.rs new file mode 100644 index 0000000000000..cbfd80d4f0140 --- /dev/null +++ b/src/timely-util/benches/columnar_merge_batcher.rs @@ -0,0 +1,451 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! Microbenchmark comparing the legacy column-backed `ColumnMerger` against +//! the new pageable `ColumnMergeBatcher` driver on the merge-batcher's hot +//! path. +//! +//! Each iteration drives a single 2-input merge — either via `Merger::merge` +//! (legacy) or via `merge_chains` (new, driven through a `ColumnPager`). +//! +//! Three pager configurations sweep the cost of the new path: +//! +//! - **`paged-disabled`** — `ColumnPager::disabled`; chunks stay `Resident` +//! throughout. Compared to `column`, this isolates the pager-wrapping +//! overhead (extra `Resident(_, ticket)` enum dispatch and the +//! `FetchIter`-shaped driver). +//! - **`paged-swap`** — every chunk routes through the Swap backend +//! uncompressed. Measures the cost of byte-level serialization + buffered +//! allocation moves with no codec work. +//! - **`paged-lz4`** — same as `paged-swap` but with lz4 frame compression. +//! Adds codec CPU cost to the swap baseline. +//! +//! Two axes match the sister bench `columnar_merger.rs`: +//! regime × size. See that file for axis rationale. + +use std::collections::VecDeque; +use std::mem::size_of; +use std::sync::Arc; + +use criterion::{BatchSize, BenchmarkId, Criterion, Throughput, criterion_group, criterion_main}; +use differential_dataflow::trace::implementations::merge_batcher::Merger; +use itertools::Itertools; +use mz_ore::cast::{CastFrom, CastLossy}; +use mz_ore::pager::Backend; +use mz_timely_util::column_pager::{ + Codec, ColumnPager, PageDecision, PageEvent, PageHint, PagedColumn, PagingPolicy, +}; +use mz_timely_util::columnar::Column; +use mz_timely_util::columnar::batcher::ColumnMerger; +use mz_timely_util::columnar::merge_batcher::{FetchIter, merge_chains}; +use rand::{Rng, SeedableRng, rngs::StdRng}; +use timely::container::PushInto; + +type Data = (u64, u64); +type Time = u64; +type Diff = i64; +type Tuple = (Data, Time, Diff); + +/// Per-side heap footprints to sweep across. Same tiers as +/// `columnar_merger.rs`. +const SIZES: &[(&str, usize)] = &[ + ("32K", 32 * 1024), + ("512K", 512 * 1024), + ("8M", 8 * 1024 * 1024), + ("128M", 128 * 1024 * 1024), +]; + +/// Always-page policy parameterized over backend + codec. Used to force the +/// merger through the byte-shaped path even when chunks are tiny. +struct ForcePage { + backend: Backend, + codec: Option, +} +impl PagingPolicy for ForcePage { + fn decide(&self, _hint: PageHint) -> PageDecision { + PageDecision::Page { + backend: self.backend, + codec: self.codec, + } + } + fn record(&self, _event: PageEvent) {} +} + +fn pager_disabled() -> ColumnPager { + ColumnPager::disabled() +} + +fn pager_force(backend: Backend, codec: Option) -> ColumnPager { + ColumnPager::new(Arc::new(ForcePage { backend, codec })) +} + +/// Wrap a single resident `Column` as a one-entry chain. +fn one_chain(mut c: Column, pager: &ColumnPager) -> VecDeque> { + let paged = pager.page(&mut c); + VecDeque::from([paged]) +} + +fn make(seed: u64, n: usize, key_range: u64, time_range: u64) -> Vec { + let mut rng = StdRng::seed_from_u64(seed); + let mut raw: Vec = (0..n) + .map(|_| { + ( + ( + rng.random_range(0..key_range), + rng.random_range(0..key_range), + ), + rng.random_range(0..time_range), + rng.random_range(-3i64..=3), + ) + }) + .collect(); + raw.sort(); + let mut out: Vec = Vec::new(); + for (d, t, r) in raw { + if let Some(last) = out.last_mut() { + if last.0 == d && last.1 == t { + last.2 += r; + continue; + } + } + out.push((d, t, r)); + } + out.retain(|x| x.2 != 0); + out +} + +fn build_column(data: &[Tuple]) -> Column { + let mut col: Column = Default::default(); + for &tup in data { + col.push_into(tup); + } + col +} + +fn configs(n: usize) -> [(&'static str, Vec, Vec); 3] { + let n_u64 = u64::cast_from(n); + [ + ("mixed", make(1, n, 2 * n_u64, 4), make(2, n, 2 * n_u64, 4)), + ( + "collisions", + make(3, n, u64::cast_from(n / 4), 2), + make(4, n, u64::cast_from(n / 4), 2), + ), + ( + "disjoint", + make(5, n, n_u64, 4), + make(6, n, n_u64, 4) + .into_iter() + .map(|((k1, k2), t, r)| ((k1 + n_u64, k2 + n_u64), t, r)) + .collect(), + ), + ] +} + +/// One row of the throughput summary — bytes-per-iter, plus the four variant +/// labels we'll look up in `target/criterion////...`. +const VARIANTS: &[&str] = &["column", "paged-disabled", "paged-swap", "paged-lz4"]; + +fn bench_merge_batcher(c: &mut Criterion) { + let mut group = c.benchmark_group("merge_batcher_two_sorted"); + + let bytes_per_record = size_of::(); + let mut summary: Vec<(String, u64)> = Vec::new(); + + for (size_label, bytes_per_side) in SIZES { + let n = bytes_per_side / bytes_per_record; + let cfgs = configs(n); + + for (regime, a, b) in &cfgs { + let bytes = u64::try_from((a.len() + b.len()) * bytes_per_record).unwrap(); + group.throughput(Throughput::Bytes(bytes)); + + let id = format!("{regime}/{size_label}"); + summary.push((id.clone(), bytes)); + + // Variant 1: legacy `ColumnMerger::merge`. Baseline. + group.bench_with_input(BenchmarkId::new("column", &id), &(), |bencher, _| { + bencher.iter_batched( + || (build_column(a), build_column(b)), + |(ca, cb)| { + let mut merger: ColumnMerger = Default::default(); + let mut output = Vec::new(); + let mut stash = Vec::new(); + merger.merge(vec![ca], vec![cb], &mut output, &mut stash); + output + }, + BatchSize::LargeInput, + ); + }); + + // Variant 2: new path, disabled pager. Isolates wrapping cost. + group.bench_with_input( + BenchmarkId::new("paged-disabled", &id), + &(), + |bencher, _| { + let pager = pager_disabled(); + bencher.iter_batched( + || { + ( + one_chain(build_column(a), &pager), + one_chain(build_column(b), &pager), + ) + }, + |(q1, q2)| { + let mut output: Vec> = Vec::new(); + let mut stash: Vec> = Vec::new(); + merge_chains( + FetchIter::new(q1, &pager), + FetchIter::new(q2, &pager), + |p| output.push(p), + &mut stash, + ); + output + }, + BatchSize::LargeInput, + ); + }, + ); + + // Variant 3: force-page to Swap, no codec. + group.bench_with_input(BenchmarkId::new("paged-swap", &id), &(), |bencher, _| { + let pager = pager_force(Backend::Swap, None); + bencher.iter_batched( + || { + ( + one_chain(build_column(a), &pager), + one_chain(build_column(b), &pager), + ) + }, + |(q1, q2)| { + let mut output: Vec> = Vec::new(); + let mut stash: Vec> = Vec::new(); + merge_chains( + FetchIter::new(q1, &pager), + FetchIter::new(q2, &pager), + |p| output.push(p), + &mut stash, + ); + output + }, + BatchSize::LargeInput, + ); + }); + + // Variant 4: force-page to Swap with lz4. Codec cost. + group.bench_with_input(BenchmarkId::new("paged-lz4", &id), &(), |bencher, _| { + let pager = pager_force(Backend::Swap, Some(Codec::Lz4)); + bencher.iter_batched( + || { + ( + one_chain(build_column(a), &pager), + one_chain(build_column(b), &pager), + ) + }, + |(q1, q2)| { + let mut output: Vec> = Vec::new(); + let mut stash: Vec> = Vec::new(); + merge_chains( + FetchIter::new(q1, &pager), + FetchIter::new(q2, &pager), + |p| output.push(p), + &mut stash, + ); + output + }, + BatchSize::LargeInput, + ); + }); + } + } + + group.finish(); + + print_throughput_table( + "Throughput summary — primitive ((u64, u64), u64, i64):", + "merge_batcher_two_sorted", + &summary, + ); +} + +// =========================================================================== +// Throughput summary helpers +// +// Same shape as `columnar_merger.rs` but widened for our four variants. The +// helpers are duplicated rather than shared because bench files don't have +// an easy way to import each other. +// =========================================================================== + +fn criterion_dir() -> std::path::PathBuf { + let mut cur = std::env::current_dir().unwrap_or_default(); + loop { + let candidate = cur.join("target").join("criterion"); + if candidate.is_dir() { + return candidate; + } + if !cur.pop() { + return std::path::PathBuf::from("target/criterion"); + } + } +} + +fn read_criterion_median_ns(group: &str, bench_id: &str) -> Option { + let path = criterion_dir() + .join(group) + .join(bench_id) + .join("new") + .join("estimates.json"); + let json = std::fs::read_to_string(&path).ok()?; + let median_idx = json.find("\"median\"")?; + let after = &json[median_idx..]; + let pe_marker = "\"point_estimate\""; + let pe_idx = after.find(pe_marker)?; + let rest = after[pe_idx + pe_marker.len()..].trim_start(); + let rest = rest.strip_prefix(':')?.trim_start(); + let end = rest.find(|c: char| c == ',' || c == '}')?; + rest[..end].trim().parse::().ok() +} + +fn fmt_throughput(bytes: u64, ns: f64) -> String { + if !ns.is_finite() || ns <= 0.0 { + return "—".to_string(); + } + let bytes_per_sec = f64::cast_lossy(bytes) * 1e9 / ns; + let gibs = bytes_per_sec / f64::cast_lossy(1u64 << 30); + if gibs >= 1.0 { + format!("{gibs:.2} GiB/s") + } else { + let mibs = bytes_per_sec / f64::cast_lossy(1u64 << 20); + format!("{mibs:.0} MiB/s") + } +} + +fn fmt_time(ns: f64) -> String { + if !ns.is_finite() { + "—".to_string() + } else if ns < 1e3 { + format!("{:.0} ns", ns) + } else if ns < 1e6 { + format!("{:.1} µs", ns / 1e3) + } else if ns < 1e9 { + format!("{:.2} ms", ns / 1e6) + } else { + format!("{:.2} s", ns / 1e9) + } +} + +fn fmt_ratio(num_ns: f64, den_ns: f64) -> String { + if !(num_ns.is_finite() && den_ns.is_finite()) || den_ns <= 0.0 { + return "—".to_string(); + } + let r = num_ns / den_ns; + if (r - 1.0).abs() < 0.01 { + "≈ 1.00×".to_string() + } else if r < 1.0 { + format!("{:.2}× faster", 1.0 / r) + } else { + format!("{:.2}× slower", r) + } +} + +fn print_throughput_table(title: &str, group: &str, rows: &[(String, u64)]) { + // Columns: Config | column | paged-disabled | paged-swap | paged-lz4 | + // disabled vs column. + let mut cells: Vec> = Vec::with_capacity(rows.len()); + for (label, bytes) in rows { + let ns: Vec = VARIANTS + .iter() + .map(|v| { + let bench_id = format!("{}/{}", v, label.replace('/', "_")); + read_criterion_median_ns(group, &bench_id).unwrap_or(f64::NAN) + }) + .collect(); + let column_ns = ns[0]; + let disabled_ns = ns[1]; + + let mut row = vec![label.clone()]; + for (variant_ns, _variant) in ns.iter().zip_eq(VARIANTS.iter()) { + row.push(format!( + "{} ({})", + fmt_throughput(*bytes, *variant_ns), + fmt_time(*variant_ns) + )); + } + row.push(fmt_ratio(disabled_ns, column_ns)); + cells.push(row); + } + + let headers = [ + "Config", + "column", + "paged-disabled", + "paged-swap", + "paged-lz4", + "disabled vs column", + ]; + let max_chars = |i: usize| -> usize { + cells + .iter() + .map(|c| c[i].chars().count()) + .max() + .unwrap_or(0) + .max(headers[i].chars().count()) + }; + let widths: Vec = (0..headers.len()).map(max_chars).collect(); + + let bar = |l: char, m: char, r: char| -> String { + let mut s = String::new(); + s.push(l); + for (i, &w) in widths.iter().enumerate() { + for _ in 0..w + 2 { + s.push('─'); + } + s.push(if i + 1 < widths.len() { m } else { r }); + } + s + }; + + println!(); + println!("{title}"); + println!(); + println!("{}", bar('┌', '┬', '┐')); + let header_row = headers + .iter() + .zip_eq(widths.iter()) + .map(|(h, w)| format!(" {:^w$} ", h, w = w)) + .collect::>() + .join("│"); + println!("│{header_row}│"); + println!("{}", bar('├', '┼', '┤')); + for (i, row) in cells.iter().enumerate() { + if i > 0 { + println!("{}", bar('├', '┼', '┤')); + } + let line = row + .iter() + .zip_eq(widths.iter()) + .enumerate() + .map(|(idx, (cell, w))| { + if idx == 0 { + format!(" {:w$} ", cell, w = w) + } else { + format!(" {:>() + .join("│"); + println!("│{line}│"); + } + println!("{}", bar('└', '┴', '┘')); +} + +criterion_group!(benches, bench_merge_batcher); +criterion_main!(benches); diff --git a/src/timely-util/examples/column_paged_spill.rs b/src/timely-util/examples/column_paged_spill.rs new file mode 100644 index 0000000000000..298da11503c78 --- /dev/null +++ b/src/timely-util/examples/column_paged_spill.rs @@ -0,0 +1,330 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! End-to-end spill demo for the column-paged merge batcher. +//! +//! Drives a real timely dataflow (`arrange_core` over multiple workers) +//! against a cancellation workload: each `(k, v, t, +d)` is followed by +//! `(k, v, t, -d)` at the same logical time, so the *spine* stays empty +//! and all memory pressure lives in the merge-batcher's transient state. +//! That's the regime where the paged batcher should obviously win over +//! the no-spill baseline. +//! +//! Cancellation pattern, RSS sampler thread, and key-scrambling +//! (`mix()` so post-sort columnar bytes look incompressible) all +//! mirror `differential-dataflow/examples/columnar_spill.rs`. The +//! pager indirection swaps DD's `Spill`/`SpillPolicy`/`Fetch`/`Threshold` +//! plumbing for our existing `ColumnPager` + `TieredPolicy`. +//! +//! ```text +//! cargo run --release --example column_paged_spill -- --help +//! cargo run --release --example column_paged_spill -- --mode both --workers 4 \ +//! --times 64 --keys 24000000 --per-worker 33554432 --shared 536870912 \ +//! --sample-secs 30 +//! ``` + +use std::rc::Rc; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::{Duration, Instant}; + +use differential_dataflow::operators::arrange::arrangement::arrange_core; +use differential_dataflow::trace::implementations::Vector; +use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, OrdValBuilder}; +use differential_dataflow::trace::implementations::spine_fueled::Spine; +use differential_dataflow::trace::rc_blanket_impls::RcBuilder; +use mz_ore::cast::{CastFrom, CastLossy, ReinterpretCast}; +use mz_ore::pager::{self, Backend}; +use mz_timely_util::column_pager::policy::TieredPolicy; +use mz_timely_util::column_pager::{ColumnPager, set_global_pager}; +use mz_timely_util::columnar::Col2ValPagedBatcher; +use mz_timely_util::columnar::Column; +use mz_timely_util::columnar::builder::ColumnBuilder; +use timely::dataflow::InputHandle; +use timely::dataflow::channels::pact::Pipeline; +use timely::dataflow::operators::Input; +use timely::dataflow::operators::probe::{Handle as ProbeHandle, Probe}; + +type Update = ((u64, u64), u64, i64); + +type MyBatcher = Col2ValPagedBatcher; +type MyBuilder = RcBuilder, Column>>; +type MySpine = Spine>>>; + +#[derive(Debug, Clone, Copy, PartialEq)] +enum Mode { + Both, + Spill, + Baseline, +} + +struct Config { + times: u64, + keys_per_time: u64, + per_worker_bytes: usize, + shared_bytes: usize, + workers: usize, + sample_secs: u64, + mode: Mode, +} + +fn install_pager(spill: bool, budget: usize) { + if spill { + // Each process keeps a single `mz-pager-{pid}-{nonce}` subdir under + // this root; reused across `set_global_pager` reinstalls. + pager::set_scratch_dir(std::env::temp_dir()); + let policy = Arc::new(TieredPolicy::new(budget, Backend::File, None)); + set_global_pager(ColumnPager::new(policy)); + } else { + set_global_pager(ColumnPager::disabled()); + } +} + +fn run_dataflow(cfg: &Config, label: &str) -> Duration { + let stop = Arc::new(AtomicBool::new(false)); + + // RSS sampler thread. `ps -o rss=` is portable across Linux + macOS + // and doesn't add a dep just to read /proc/self/status. + let sampler = if cfg.sample_secs > 0 { + let stop = Arc::clone(&stop); + let label_owned = label.to_string(); + let interval = Duration::from_secs(cfg.sample_secs); + let start = Instant::now(); + Some(std::thread::spawn(move || { + while !stop.load(Ordering::Relaxed) { + if let Some(rss) = rss_kb() { + println!( + " [{}] +{:>5.0}s RSS {:>9} kB", + label_owned, + start.elapsed().as_secs_f64(), + rss, + ); + } + std::thread::sleep(interval); + } + })) + } else { + None + }; + + let times = cfg.times; + let keys_per_time = cfg.keys_per_time; + let timer = Instant::now(); + + timely::execute(timely::Config::process(cfg.workers), move |worker| { + let index = worker.index(); + let peers = worker.peers(); + + let mut input = >>::new_with_builder(); + let probe: ProbeHandle = ProbeHandle::new(); + + worker.dataflow::(|scope| { + let stream = scope.input_from(&mut input); + // Demo wires the raw DD operator; production paths go through + // `MzArrange::mz_arrange_core` in `mz-compute`. + #[allow(clippy::disallowed_methods)] + let arranged = arrange_core::<_, MyBatcher, MyBuilder, MySpine>( + stream, + Pipeline, + "ColumnPagedSpillArrange", + ); + arranged.stream.probe_with(&probe); + }); + + // Push positives then negatives at the same logical time so they + // cancel inside the merger rather than producing two giant sealed + // batches that cancel only at the spine. `mix` scrambles the keys + // so the post-sort columnar bytes look incompressible — without + // this macOS' page compressor crushes the sequential-u64 pattern + // and skews the comparison toward baseline. + const STEP_EVERY: usize = 1 << 16; + let mut sent_since_step = 0usize; + for sign in [1i64, -1] { + for t in 0..times { + let mut k = u64::cast_from(index); + while k < keys_per_time { + let kh = mix(k); + let d = (i64::reinterpret_cast(kh) >> 1) | 1; + input.send(((kh, kh & 0x3), t, sign * d)); + k += u64::cast_from(peers); + sent_since_step += 1; + if sent_since_step >= STEP_EVERY { + worker.step(); + sent_since_step = 0; + } + } + } + } + input.advance_to(1); + input.flush(); + + while probe.less_than(input.time()) { + worker.step(); + } + }) + .expect("timely::execute failed"); + + let elapsed = timer.elapsed(); + stop.store(true, Ordering::Relaxed); + if let Some(s) = sampler { + let _ = s.join(); + } + elapsed +} + +/// Reversible bijection that destroys spatial locality of sequential keys. +/// `xorshift*` mixing — output is determined by `k` so cancellation still +/// pairs the same `(k, v, t, +d)` with `(k, v, t, -d)`. +fn mix(k: u64) -> u64 { + let x = k.wrapping_mul(0x9E37_79B9_7F4A_7C15); + x ^ (x >> 32) +} + +fn rss_kb() -> Option { + let pid = std::process::id(); + let output = std::process::Command::new("ps") + .args(["-o", "rss=", "-p", &pid.to_string()]) + .output() + .ok()?; + let s = std::str::from_utf8(&output.stdout).ok()?; + s.trim().parse::().ok() +} + +fn main() { + let cfg = match parse_args() { + Some(cfg) => cfg, + None => return, + }; + + let total_records = usize::cast_from(cfg.times * cfg.keys_per_time) * 2; + let bytes_per_record = std::mem::size_of::(); + let raw_gb = f64::cast_lossy(total_records * bytes_per_record) / f64::cast_lossy(1u64 << 30); + println!( + "config: times={} keys={} workers={} per_worker={} shared={} mode={:?} sample_secs={}", + cfg.times, + cfg.keys_per_time, + cfg.workers, + cfg.per_worker_bytes, + cfg.shared_bytes, + cfg.mode, + cfg.sample_secs, + ); + println!( + "workload: {} records ({:.2} GB raw, {} bytes/record) — cancellation, spine stays empty", + total_records, raw_gb, bytes_per_record, + ); + + if cfg.mode != Mode::Baseline { + install_pager(true, cfg.shared_bytes); + let elapsed = run_dataflow(&cfg, "spill"); + println!( + "spill: {:.2}s | {:.2} M records/s | {:.2} GB/s", + elapsed.as_secs_f64(), + f64::cast_lossy(total_records) / elapsed.as_secs_f64() / 1e6, + raw_gb / elapsed.as_secs_f64(), + ); + } + + if cfg.mode != Mode::Spill { + install_pager(false, 0); + let elapsed = run_dataflow(&cfg, "baseline"); + println!( + "baseline: {:.2}s | {:.2} M records/s | {:.2} GB/s", + elapsed.as_secs_f64(), + f64::cast_lossy(total_records) / elapsed.as_secs_f64() / 1e6, + raw_gb / elapsed.as_secs_f64(), + ); + } +} + +fn parse_args() -> Option { + let mut cfg = Config { + times: 8, + keys_per_time: 500_000, + per_worker_bytes: 32 * 1024 * 1024, + shared_bytes: 512 * 1024 * 1024, + workers: 1, + sample_secs: 0, + mode: Mode::Both, + }; + let mut it = std::env::args().skip(1); + while let Some(a) = it.next() { + let take = |it: &mut dyn Iterator, name: &str| -> String { + it.next().unwrap_or_else(|| { + print_usage(); + panic!("--{} requires a value", name) + }) + }; + match a.as_str() { + "-h" | "--help" => { + print_usage(); + return None; + } + "--times" => cfg.times = take(&mut it, "times").parse().expect("times: u64"), + "--keys" => { + cfg.keys_per_time = take(&mut it, "keys").parse().expect("keys: u64"); + } + "--per-worker" => { + cfg.per_worker_bytes = take(&mut it, "per-worker") + .parse() + .expect("per-worker: usize"); + } + "--shared" => { + cfg.shared_bytes = take(&mut it, "shared").parse().expect("shared: usize"); + } + "--workers" => { + cfg.workers = take(&mut it, "workers").parse().expect("workers: usize"); + } + "--sample-secs" => { + cfg.sample_secs = take(&mut it, "sample-secs") + .parse() + .expect("sample-secs: u64"); + } + "--mode" => { + cfg.mode = match take(&mut it, "mode").as_str() { + "both" => Mode::Both, + "spill" => Mode::Spill, + "baseline" => Mode::Baseline, + other => { + print_usage(); + panic!("unknown mode: {other}"); + } + }; + } + other => { + print_usage(); + panic!("unknown arg: {other}"); + } + } + } + Some(cfg) +} + +fn print_usage() { + eprintln!("Usage: column_paged_spill [OPTIONS]"); + eprintln!(); + eprintln!(" --times N distinct data timestamps (default 8)"); + eprintln!(" --keys N keys per timestamp (default 500000)"); + eprintln!(" --per-worker BYTES TieredPolicy per-worker budget (default 32 MiB)"); + eprintln!(" --shared BYTES TieredPolicy shared budget (default 512 MiB)"); + eprintln!(" --workers N timely worker threads (default 1)"); + eprintln!(" --sample-secs N print RSS every N seconds (default 0 = off)"); + eprintln!(" --mode MODE spill | baseline | both (default both)"); + eprintln!(); + eprintln!("Total records pushed = 2 * times * keys (positives + negatives that cancel)."); + eprintln!("Records partitioned across workers by `k % workers` after `mix()` scramble."); + eprintln!(); + eprintln!("Examples:"); + eprintln!(" # quick smoke — 8M records, both modes, 1 worker"); + eprintln!(" column_paged_spill"); + eprintln!(); + eprintln!(" # 100 GB workload on 4 workers, RSS every 30s, spill-only"); + eprintln!(" column_paged_spill --mode spill --workers 4 \\"); + eprintln!(" --times 64 --keys 24000000 --sample-secs 30"); +} diff --git a/src/timely-util/src/column_pager.rs b/src/timely-util/src/column_pager.rs index ed6e3e8740167..699d4b2cc71b6 100644 --- a/src/timely-util/src/column_pager.rs +++ b/src/timely-util/src/column_pager.rs @@ -32,10 +32,11 @@ #![deny(missing_docs)] +pub mod metrics; pub mod policy; use std::io::{self, Read}; -use std::sync::Arc; +use std::sync::{Arc, LazyLock, RwLock}; use columnar::Columnar; use lz4_flex::frame::{FrameDecoder, FrameEncoder}; @@ -186,6 +187,7 @@ pub struct ResidentTicket { impl Drop for ResidentTicket { fn drop(&mut self) { + metrics::observe_resident_released(self.bytes); self.policy .record(PageEvent::ResidentReleased { bytes: self.bytes }); } @@ -215,6 +217,113 @@ impl ColumnPager { Self { policy } } + /// Constructs a pager that never pages out: every [`page`] returns a + /// [`PagedColumn::Resident`] whose ticket discards release events. Useful + /// as a default when callers want a placeholder pager before injecting a + /// real policy. + /// + /// [`page`]: ColumnPager::page + pub fn disabled() -> Self { + Self::new(Arc::new(AlwaysResidentPolicy)) + } +} + +/// Policy that keeps every column resident and discards events. Backs +/// [`ColumnPager::disabled`]. +struct AlwaysResidentPolicy; + +impl PagingPolicy for AlwaysResidentPolicy { + fn decide(&self, _hint: PageHint) -> PageDecision { + PageDecision::Skip + } + fn record(&self, _event: PageEvent) {} +} + +// +// Following the pager design doc's spirit (`doc/developer/design/20260504_pager.md`): +// "the cluster runs on swap or file, not both at once; a global atomic +// encodes that operational reality directly. A per-pager design would +// either duplicate the global flag at the struct level or invite confusion +// about which configuration wins." +// +// The lower-level `mz_ore::pager` already uses a global atomic for backend +// selection. This module's policy/budget layer mirrors that shape: one +// `ColumnPager` per process, swapped atomically when the controller changes +// the configuration. Merge batchers clone the `Arc` inside on use; live +// reinstalls take effect on the next call without per-thread coordination. + +/// Process-global active pager. Defaults to [`ColumnPager::disabled`] +/// until worker init calls [`set_global_pager`]. +static GLOBAL_PAGER: LazyLock> = + LazyLock::new(|| RwLock::new(ColumnPager::disabled())); + +/// Install `pager` as the process-wide active pager. Subsequent +/// [`global_pager`] calls return a clone of this value across all threads. +/// +/// Prefer [`apply_tiered_config`] for the production path so the +/// `TieredPolicy` budget atomic stays stable across reconfigures. Direct +/// `set_global_pager` use is appropriate for tests, the disabled pager, or +/// callers that intentionally want a fresh policy. +pub fn set_global_pager(pager: ColumnPager) { + *GLOBAL_PAGER.write().expect("global pager poisoned") = pager; +} + +/// Process-wide [`policy::TieredPolicy`] singleton. +/// +/// Why a singleton: every `ResidentTicket` keeps an `Arc` +/// pointing at the policy that decided to keep the column resident. +/// Replacing the global `TieredPolicy` would orphan in-flight tickets onto +/// the previous instance — they would credit a budget atomic that the new +/// policy can no longer see, draining the new pool monotonically until it +/// locks up on Page decisions. A persistent singleton with in-place +/// [`policy::TieredPolicy::reconfigure`] sidesteps the issue: all tickets, +/// past and present, share the same atomic. +/// +/// Initialized eagerly with zero budget so [`metrics::register`] can read +/// it during compute startup, before any [`apply_tiered_config`] call. The +/// first config apply resizes the pool via `reconfigure`, which is the same +/// path operator-driven tunes take. +static TIERED_POLICY: LazyLock> = + LazyLock::new(|| Arc::new(policy::TieredPolicy::new(0, Backend::Swap, None))); + +/// Returns a reference to the process-wide [`policy::TieredPolicy`] singleton. +pub fn tiered_policy() -> &'static policy::TieredPolicy { + &TIERED_POLICY +} + +/// Apply a tiered-pager configuration. Reuses the singleton +/// [`policy::TieredPolicy`] so in-flight `ResidentTicket`s remain coherent +/// with the running budget after the operator tunes any of the inputs. +/// +/// When `enabled` is true, installs a [`ColumnPager`] backed by the +/// singleton policy. When false, installs [`ColumnPager::disabled`] — +/// in-flight tickets still credit the singleton, which is harmless: the +/// budget grows above the configured total until the next enable reconciles +/// it via `reconfigure`. +pub fn apply_tiered_config( + enabled: bool, + total_budget: usize, + backend: Backend, + codec: Option, +) { + let p: &Arc = &TIERED_POLICY; + p.reconfigure(total_budget, backend, codec); + if enabled { + #[allow(clippy::clone_on_ref_ptr)] + let dyn_policy: Arc = p.clone(); + set_global_pager(ColumnPager::new(dyn_policy)); + } else { + set_global_pager(ColumnPager::disabled()); + } +} + +/// Returns the current global pager. Cheap: clones the inner `Arc`. +pub fn global_pager() -> ColumnPager { + GLOBAL_PAGER.read().expect("global pager poisoned").clone() +} + +impl ColumnPager { /// Drains `col` into a [`PagedColumn`]. After return `col` is left as a /// fresh `Column::default()` (typed, empty), ready to be refilled by the /// caller on the next loop iteration. @@ -236,6 +345,7 @@ impl ColumnPager { let (backend, codec) = match self.policy.decide(hint) { PageDecision::Skip => { + metrics::observe_skip(len_bytes); let ticket = ResidentTicket { bytes: len_bytes, policy: Arc::clone(&self.policy), @@ -270,9 +380,11 @@ impl ColumnPager { } }; let handle = pager::pageout_with(backend, &mut [body]); + let bytes_out = handle.len_bytes(); + metrics::observe_pageout(len_bytes, bytes_out); self.policy.record(PageEvent::PagedOut { bytes_in: len_bytes, - bytes_out: handle.len_bytes(), + bytes_out, backend, codec: None, }); @@ -292,6 +404,7 @@ impl ColumnPager { // `Typed` allocation so the caller can refill it, rather than // dropping a buffer it may want to reuse. col.clear(); + metrics::observe_pageout(len_bytes, out.len()); self.policy.record(PageEvent::PagedOut { bytes_in: len_bytes, bytes_out: out.len(), @@ -327,6 +440,7 @@ impl ColumnPager { let mut body: Vec = Vec::with_capacity(handle.len()); pager::take(handle, &mut body); debug_assert_eq!(body.len() * 8, meta.len_bytes); + metrics::observe_pagein(meta.len_bytes); self.policy.record(PageEvent::PagedIn { bytes: meta.len_bytes, }); @@ -350,6 +464,7 @@ impl ColumnPager { } } debug_assert_eq!(decoded.len(), meta.len_bytes); + metrics::observe_pagein(decoded.len()); self.policy.record(PageEvent::PagedIn { bytes: decoded.len(), }); diff --git a/src/timely-util/src/column_pager/metrics.rs b/src/timely-util/src/column_pager/metrics.rs new file mode 100644 index 0000000000000..648b524e74cf7 --- /dev/null +++ b/src/timely-util/src/column_pager/metrics.rs @@ -0,0 +1,160 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file at the +// root of this repository, or online at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Prometheus metrics for the column pager. +//! +//! One process-wide [`PagerMetrics`] singleton, installed by compute init via +//! [`register`]. Counter observers (`observe_*`) are no-ops until that call +//! lands; the lazy initialization keeps tests and benches that don't wire a +//! [`MetricsRegistry`] free of bookkeeping. + +use std::sync::OnceLock; + +use mz_ore::metric; +use mz_ore::metrics::{ComputedUIntGauge, IntCounter, MetricsRegistry}; + +use crate::column_pager::policy::TieredPolicy; + +/// Process-wide pager metrics. Counters track cumulative observations since +/// process start; gauges read the live policy atomics at scrape time. +#[derive(Debug)] +pub struct PagerMetrics { + /// Number of decisions that kept the chunk resident. + pub skip_decisions_total: IntCounter, + /// Total bytes kept resident by skip decisions. + pub skip_bytes_total: IntCounter, + /// Number of decisions that paged the chunk out. + pub pageouts_total: IntCounter, + /// Uncompressed body bytes handed to the pager for pageout. + pub paged_bytes_in_total: IntCounter, + /// On-storage payload bytes after codec / padding. + pub paged_bytes_out_total: IntCounter, + /// Number of page-ins from `ColumnPager::take`. + pub pageins_total: IntCounter, + /// Total uncompressed bytes delivered by page-in. + pub pagein_bytes_total: IntCounter, + /// Resident-ticket drops returning bytes to the budget. + pub resident_released_total: IntCounter, + /// Total bytes returned to the budget by ticket drops. + pub resident_released_bytes_total: IntCounter, + // Computed gauges are registered with the registry but not held here — + // their collectors are owned by the prometheus registry. +} + +static METRICS: OnceLock = OnceLock::new(); + +/// Install the pager metrics into `registry`. Idempotent — repeated calls +/// after the first one are no-ops. Computed gauges read the singleton +/// [`TieredPolicy`] atomics at scrape time; their values reflect the live +/// policy whether or not the column-paged batcher is currently enabled. +pub fn register(registry: &MetricsRegistry, policy: &'static TieredPolicy) { + let _ = METRICS.get_or_init(|| { + // Computed gauges: closures hold the &'static policy reference. + let _budget_remaining: ComputedUIntGauge = registry.register_computed_gauge( + metric!( + name: "mz_column_pager_budget_remaining_bytes", + help: "Bytes the column-pager tiered policy currently has \ + available for resident columns.", + ), + move || u64::try_from(policy.budget_remaining()).unwrap_or(u64::MAX), + ); + let _budget_configured: ComputedUIntGauge = registry.register_computed_gauge( + metric!( + name: "mz_column_pager_budget_configured_bytes", + help: "Most-recently-configured total budget for the \ + column-pager tiered policy.", + ), + move || u64::try_from(policy.configured_total()).unwrap_or(u64::MAX), + ); + + PagerMetrics { + skip_decisions_total: registry.register(metric!( + name: "mz_column_pager_skip_decisions_total", + help: "Pager decisions that kept the chunk resident.", + )), + skip_bytes_total: registry.register(metric!( + name: "mz_column_pager_skip_bytes_total", + help: "Total bytes kept resident by skip decisions.", + )), + pageouts_total: registry.register(metric!( + name: "mz_column_pager_pageouts_total", + help: "Pager decisions that paged the chunk out.", + )), + paged_bytes_in_total: registry.register(metric!( + name: "mz_column_pager_paged_bytes_in_total", + help: "Total uncompressed bytes handed to the pager for \ + pageout, before any codec is applied.", + )), + paged_bytes_out_total: registry.register(metric!( + name: "mz_column_pager_paged_bytes_out_total", + help: "Total on-storage bytes after codec / padding.", + )), + pageins_total: registry.register(metric!( + name: "mz_column_pager_pageins_total", + help: "Successful page-ins from `ColumnPager::take`.", + )), + pagein_bytes_total: registry.register(metric!( + name: "mz_column_pager_pagein_bytes_total", + help: "Total uncompressed bytes delivered by page-in.", + )), + resident_released_total: registry.register(metric!( + name: "mz_column_pager_resident_released_total", + help: "Resident-ticket drops returning budget.", + )), + resident_released_bytes_total: registry.register(metric!( + name: "mz_column_pager_resident_released_bytes_total", + help: "Total bytes returned to the budget by ticket drops.", + )), + } + }); +} + +#[inline] +fn metrics() -> Option<&'static PagerMetrics> { + METRICS.get() +} + +pub(crate) fn observe_skip(bytes: usize) { + if let Some(m) = metrics() { + m.skip_decisions_total.inc(); + m.skip_bytes_total.inc_by(bytes_to_u64(bytes)); + } +} + +pub(crate) fn observe_pageout(bytes_in: usize, bytes_out: usize) { + if let Some(m) = metrics() { + m.pageouts_total.inc(); + m.paged_bytes_in_total.inc_by(bytes_to_u64(bytes_in)); + m.paged_bytes_out_total.inc_by(bytes_to_u64(bytes_out)); + } +} + +pub(crate) fn observe_pagein(bytes: usize) { + if let Some(m) = metrics() { + m.pageins_total.inc(); + m.pagein_bytes_total.inc_by(bytes_to_u64(bytes)); + } +} + +pub(crate) fn observe_resident_released(bytes: usize) { + if let Some(m) = metrics() { + m.resident_released_total.inc(); + m.resident_released_bytes_total.inc_by(bytes_to_u64(bytes)); + } +} + +fn bytes_to_u64(b: usize) -> u64 { + u64::try_from(b).unwrap_or(u64::MAX) +} diff --git a/src/timely-util/src/column_pager/policy.rs b/src/timely-util/src/column_pager/policy.rs index 61751be3b163e..502b05b99d3bb 100644 --- a/src/timely-util/src/column_pager/policy.rs +++ b/src/timely-util/src/column_pager/policy.rs @@ -21,12 +21,46 @@ //! [`AtomicUsize`] and credited back from whichever thread happens to drop //! the column. -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering}; use mz_ore::pager::Backend; use crate::column_pager::{Codec, PageDecision, PageEvent, PageHint, PagingPolicy}; +const BACKEND_SWAP: u8 = 0; +const BACKEND_FILE: u8 = 1; + +const CODEC_NONE: u8 = 0; +const CODEC_LZ4: u8 = 1; + +fn encode_backend(b: Backend) -> u8 { + match b { + Backend::Swap => BACKEND_SWAP, + Backend::File => BACKEND_FILE, + } +} + +fn decode_backend(v: u8) -> Backend { + match v { + BACKEND_FILE => Backend::File, + _ => Backend::Swap, + } +} + +fn encode_codec(c: Option) -> u8 { + match c { + None => CODEC_NONE, + Some(Codec::Lz4) => CODEC_LZ4, + } +} + +fn decode_codec(v: u8) -> Option { + match v { + CODEC_LZ4 => Some(Codec::Lz4), + _ => None, + } +} + /// A single-pool byte budget for resident columns. /// /// Each call to [`PagingPolicy::decide`] tries to reserve `len_bytes` from a @@ -58,10 +92,15 @@ use crate::column_pager::{Codec, PageDecision, PageEvent, PageHint, PagingPolicy /// granularity; if profiles show contention we can switch to chunk-sized /// reservations. pub struct TieredPolicy { - /// Remaining budget, in bytes, available for resident columns. + /// Remaining budget, in bytes, available for resident columns. Drains + /// on `decide` (Skip), refills on `record(ResidentReleased)`. budget: AtomicUsize, - backend: Backend, - codec: Option, + /// Last-configured total. `reconfigure` adjusts `budget` by the delta + /// against this value so existing `ResidentTicket`s stay coherent with + /// the running budget after an operator-driven tune. + configured: AtomicUsize, + backend: AtomicU8, + codec: AtomicU8, } impl TieredPolicy { @@ -71,9 +110,36 @@ impl TieredPolicy { pub fn new(total_budget: usize, backend: Backend, codec: Option) -> Self { Self { budget: AtomicUsize::new(total_budget), - backend, - codec, + configured: AtomicUsize::new(total_budget), + backend: AtomicU8::new(encode_backend(backend)), + codec: AtomicU8::new(encode_codec(codec)), + } + } + + /// Adjust this policy in place. Budget moves by `new_total - prev_total` + /// so in-flight `ResidentTicket`s — which still credit this same atomic + /// when they drop — stay coherent with the resized pool. Backend and + /// codec selection take effect on the next [`PagingPolicy::decide`] + /// call. + /// + /// Shrinking the configured total below the in-flight resident set + /// saturates the available budget at zero; subsequent `decide` calls + /// page out until releases bring the pool back above zero. + pub fn reconfigure(&self, new_total: usize, backend: Backend, codec: Option) { + let prev = self.configured.swap(new_total, Ordering::Relaxed); + if new_total > prev { + self.budget.fetch_add(new_total - prev, Ordering::Relaxed); + } else if prev > new_total { + let shrink = prev - new_total; + let _ = self + .budget + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| { + Some(cur.saturating_sub(shrink)) + }); } + self.backend + .store(encode_backend(backend), Ordering::Relaxed); + self.codec.store(encode_codec(codec), Ordering::Relaxed); } /// Returns the current remaining budget in bytes. Useful for metrics or @@ -81,6 +147,11 @@ impl TieredPolicy { pub fn budget_remaining(&self) -> usize { self.budget.load(Ordering::Relaxed) } + + /// Returns the most-recently-configured total. Useful for tests. + pub fn configured_total(&self) -> usize { + self.configured.load(Ordering::Relaxed) + } } impl PagingPolicy for TieredPolicy { @@ -89,8 +160,8 @@ impl PagingPolicy for TieredPolicy { PageDecision::Skip } else { PageDecision::Page { - backend: self.backend, - codec: self.codec, + backend: decode_backend(self.backend.load(Ordering::Relaxed)), + codec: decode_codec(self.codec.load(Ordering::Relaxed)), } } } @@ -218,6 +289,69 @@ mod tests { ); } + /// Reconfigure preserves the in-flight resident set: tickets minted + /// against the old configured total still credit the same atomic when + /// they drop. Growing the configured total adds the delta; shrinking + /// subtracts saturating at zero. + #[mz_ore::test] + fn reconfigure_preserves_in_flight() { + let policy = Arc::new(TieredPolicy::new(4 * 1024, Backend::Swap, None)); + let cp = ColumnPager::new(as_dyn(&policy)); + + // Hold one resident, consuming some budget. + let mut col = sample(256); + let p = cp.page(&mut col); + assert!(matches!(p, PagedColumn::Resident(_, _))); + let consumed = 4 * 1024 - policy.budget_remaining(); + assert!(consumed > 0); + + // Grow the pool by 8 KiB. Available budget should rise by the delta; + // configured total reflects the new size. + let before = policy.budget_remaining(); + policy.reconfigure(4 * 1024 + 8 * 1024, Backend::Swap, None); + assert_eq!(policy.budget_remaining(), before + 8 * 1024); + assert_eq!(policy.configured_total(), 4 * 1024 + 8 * 1024); + + // Drop the resident; the ticket credits the same atomic, even + // though the pool was resized in between. + drop(p); + assert_eq!(policy.budget_remaining(), 4 * 1024 + 8 * 1024); + } + + /// Shrinking the configured total below available budget saturates at + /// zero rather than wrapping. + #[mz_ore::test] + fn reconfigure_shrink_saturates() { + let policy = Arc::new(TieredPolicy::new(1024, Backend::Swap, None)); + // Shrink by more than the current available budget. + policy.reconfigure(0, Backend::Swap, None); + assert_eq!(policy.budget_remaining(), 0); + assert_eq!(policy.configured_total(), 0); + } + + /// Backend / codec selection takes effect on the next decide. + #[mz_ore::test] + fn reconfigure_swaps_backend_and_codec() { + let policy = Arc::new(TieredPolicy::new(0, Backend::Swap, None)); + let initial = policy.decide(PageHint { len_bytes: 1 }); + assert!(matches!( + initial, + PageDecision::Page { + backend: Backend::Swap, + codec: None + } + )); + policy.reconfigure(0, Backend::File, Some(Codec::Lz4)); + let updated = policy.decide(PageHint { len_bytes: 1 }); + assert!(matches!( + updated, + PageDecision::Page { + backend: Backend::File, + codec: Some(Codec::Lz4), + } + )); + } + #[mz_ore::test] fn try_consume_atomicity() { let a = AtomicUsize::new(10); diff --git a/src/timely-util/src/columnar.rs b/src/timely-util/src/columnar.rs index 41478b25d5bf1..85e3c3cebb373 100644 --- a/src/timely-util/src/columnar.rs +++ b/src/timely-util/src/columnar.rs @@ -19,7 +19,9 @@ pub mod batcher; pub mod builder; +pub mod builder_input; pub mod consolidate; +pub mod merge_batcher; use std::hash::Hash; @@ -46,6 +48,19 @@ pub type Col2ValBatcher = MergeBatcher< /// A batcher for columnar storage with unit values. pub type Col2KeyBatcher = Col2ValBatcher; +/// Pageable counterpart to [`Col2ValBatcher`]. Routes every chunk produced +/// by chunking, merging, or extract through a [`crate::column_pager::ColumnPager`], +/// so memory pressure can spill chains to a backing store without touching +/// the merge / extract bodies. +/// +/// Drop-in shape at the type level: both aliases take `(K, V, T, R)` and +/// produce a `Batcher, Output = Column<((K, +/// V), T, R)>>`. Call sites can swap with `cargo fix`–style renaming once +/// downstream `Trace`/`Builder` impls have been wired up. The pager itself +/// defaults to [`crate::column_pager::ColumnPager::disabled`]; inject a +/// real one via [`merge_batcher::ColumnMergeBatcher::set_pager`]. +pub type Col2ValPagedBatcher = merge_batcher::ColumnMergeBatcher<(K, V), T, R>; + /// A container based on a columnar store, encoded in aligned bytes. /// /// The type can represent typed data, bytes from Timely, or an aligned allocation. The name diff --git a/src/timely-util/src/columnar/batcher.rs b/src/timely-util/src/columnar/batcher.rs index 6bad47e20681f..d1489e74c9fc3 100644 --- a/src/timely-util/src/columnar/batcher.rs +++ b/src/timely-util/src/columnar/batcher.rs @@ -314,18 +314,11 @@ impl Default for ColumnMerger { /// so the merger can call them without going through any wrapper indirection. impl Column<(D, T, R)> where - D: Columnar + Default, + D: Columnar, for<'a> columnar::Ref<'a, D>: Copy + Ord, T: Columnar + Default + Clone + PartialOrder, for<'a> columnar::Ref<'a, T>: Copy + Ord, R: Columnar + Default + Semigroup + for<'a> Semigroup>, - for<'a> <(D, T, R) as Columnar>::Container: columnar::Push<&'a (D, T, R)>, - for<'a> ::Container: columnar::Push>, - for<'a> ::Container: columnar::Push<&'a D>, - for<'a> ::Container: columnar::Push>, - for<'a> ::Container: columnar::Push<&'a T>, - for<'a> ::Container: columnar::Push>, - for<'a> ::Container: columnar::Push<&'a R>, { /// Merge items from sorted inputs into `self`, advancing positions. /// @@ -572,7 +565,6 @@ where let self_view = self.borrow(); let len = self_view.len(); - let mut owned_t = T::default(); // Yield to the framework when either output buffer reaches the // ship threshold, so it can ship a full chunk and hand back a // fresh one. Required by the merger's extract contract: the @@ -580,6 +572,7 @@ where // an inner-loop yield a single call can fill an output well past // threshold. use columnar::Borrow as _; + let mut owned_t = T::default(); while *position < len && !crate::columnar::at_serialized_capacity(&keep_c.borrow()) && !crate::columnar::at_serialized_capacity(&ship_c.borrow()) @@ -616,18 +609,11 @@ where /// [`MergeBatcher`]: differential_dataflow::trace::implementations::merge_batcher::MergeBatcher impl Merger for ColumnMerger where - D: Columnar + Default + 'static, + D: Columnar, for<'a> columnar::Ref<'a, D>: Copy + Ord, - T: Columnar + Default + Clone + Ord + PartialOrder + 'static, + T: Columnar + Default + Clone + Ord + PartialOrder, for<'a> columnar::Ref<'a, T>: Copy + Ord, - R: Columnar + Default + Semigroup + for<'a> Semigroup> + 'static, - for<'a> <(D, T, R) as Columnar>::Container: columnar::Push<&'a (D, T, R)>, - for<'a> ::Container: columnar::Push>, - for<'a> ::Container: columnar::Push<&'a D>, - for<'a> ::Container: columnar::Push>, - for<'a> ::Container: columnar::Push<&'a T>, - for<'a> ::Container: columnar::Push>, - for<'a> ::Container: columnar::Push<&'a R>, + R: Columnar + Default + Semigroup + for<'a> Semigroup>, { type Time = T; type Chunk = Column<(D, T, R)>; @@ -804,7 +790,7 @@ where /// Pop a chunk from `stash` or allocate a fresh one. Stashed chunks are /// already cleared via `recycle_chunk`, so they're ready for push. #[inline] -fn empty_chunk(stash: &mut Vec>) -> Column { +pub(crate) fn empty_chunk(stash: &mut Vec>) -> Column { stash.pop().unwrap_or_default() } @@ -817,7 +803,7 @@ fn empty_chunk(stash: &mut Vec>) -> Column { /// cheaply, and pushing them onto `stash` would only displace useful /// recycled allocations. #[inline] -fn recycle_chunk(mut chunk: Column, stash: &mut Vec>) { +pub(crate) fn recycle_chunk(mut chunk: Column, stash: &mut Vec>) { if let Column::Typed(c) = &mut chunk { c.clear(); stash.push(chunk); @@ -837,18 +823,11 @@ fn drain_side( output: &mut Vec>, stash: &mut Vec>, ) where - D: Columnar + Default, + D: Columnar, for<'a> columnar::Ref<'a, D>: Copy + Ord, T: Columnar + Default + Clone + PartialOrder, for<'a> columnar::Ref<'a, T>: Copy + Ord, R: Columnar + Default + Semigroup + for<'a> Semigroup>, - for<'a> <(D, T, R) as Columnar>::Container: columnar::Push<&'a (D, T, R)>, - for<'a> ::Container: columnar::Push>, - for<'a> ::Container: columnar::Push<&'a D>, - for<'a> ::Container: columnar::Push>, - for<'a> ::Container: columnar::Push<&'a T>, - for<'a> ::Container: columnar::Push>, - for<'a> ::Container: columnar::Push<&'a R>, { if *pos < head.borrow().len() { // 1-input dispatch — bulk copy that runs to completion; the yield diff --git a/src/timely-util/src/columnar/builder_input.rs b/src/timely-util/src/columnar/builder_input.rs new file mode 100644 index 0000000000000..8dbd0e8347339 --- /dev/null +++ b/src/timely-util/src/columnar/builder_input.rs @@ -0,0 +1,111 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file at the +// root of this repository, or online at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! `BuilderInput` impl for [`Column`] so DD `Builder`s can drain our paged +//! batcher's output without an extra container conversion. +//! +//! Mirrors the impl on [`ColumnationStack`](crate::columnation::ColumnationStack) +//! at `columnation.rs`, but the `Item<'a>` here is a columnar `Ref` tuple +//! rather than a borrowed owned tuple, so: +//! +//! - `Key<'a>` / `Val<'a>` are `Ref<'a, K>` / `Ref<'a, V>` — no `Owned` +//! round-trip on the read side. +//! - `Time` / `Diff` materialize as owned on `into_parts` (the trait +//! contract requires owned for these). +//! +//! Distinct-counts (`key_val_upd_counts`) tally per chunk and sum, accepting +//! at most `chain.len()` over-counts at chunk boundaries. The downstream +//! consumer uses these as capacity hints, so a small over-estimate is +//! cheaper than the alternative (snapshotting `K::Owned` / `V::Owned` +//! across chunk boundaries). + +use columnar::{Columnar, Index, Len}; +use differential_dataflow::difference::Semigroup; +use differential_dataflow::lattice::Lattice; +use differential_dataflow::trace::implementations::{BatchContainer, BuilderInput}; +use timely::progress::Timestamp; + +use crate::columnar::Column; + +impl BuilderInput for Column<((K, V), T, R)> +where + K: Columnar, + V: Columnar, + T: Columnar + Timestamp + Lattice + Clone, + R: Columnar + Ord + Semigroup + Clone, + for<'a> columnar::Ref<'a, K>: Copy + Ord, + for<'a> columnar::Ref<'a, V>: Copy + Ord, + KBC: BatchContainer, + VBC: BatchContainer, + for<'a, 'b> KBC::ReadItem<'a>: PartialEq>, + for<'a, 'b> VBC::ReadItem<'a>: PartialEq>, +{ + type Key<'a> = columnar::Ref<'a, K>; + type Val<'a> = columnar::Ref<'a, V>; + type Time = T; + type Diff = R; + + fn into_parts<'a>( + item: Self::Item<'a>, + ) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { + let ((key, val), time, diff) = item; + (key, val, T::into_owned(time), R::into_owned(diff)) + } + + fn key_eq(this: &columnar::Ref<'_, K>, other: KBC::ReadItem<'_>) -> bool { + KBC::reborrow(other) == *this + } + + fn val_eq(this: &columnar::Ref<'_, V>, other: VBC::ReadItem<'_>) -> bool { + VBC::reborrow(other) == *this + } + + fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) { + // Per-chunk dedup, summed. Skips cross-chunk equality checks; the + // counts may over-count by up to `chain.len()` (one boundary per + // chunk). Capacity-hint consumers tolerate over-estimates. + let mut keys = 0; + let mut vals = 0; + let mut upds = 0; + for col in chain.iter() { + let view = col.borrow(); + let len = view.len(); + if len == 0 { + continue; + } + let mut prev: Option<(columnar::Ref<'_, K>, columnar::Ref<'_, V>)> = None; + for i in 0..len { + let ((k, v), _, _) = view.get(i); + match prev { + None => { + keys += 1; + vals += 1; + } + Some((pk, pv)) => { + if pk != k { + keys += 1; + vals += 1; + } else if pv != v { + vals += 1; + } + } + } + upds += 1; + prev = Some((k, v)); + } + } + (keys, vals, upds) + } +} diff --git a/src/timely-util/src/columnar/merge_batcher.rs b/src/timely-util/src/columnar/merge_batcher.rs new file mode 100644 index 0000000000000..36494df46b71f --- /dev/null +++ b/src/timely-util/src/columnar/merge_batcher.rs @@ -0,0 +1,1061 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file at the +// root of this repository, or online at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Merge-batcher for [`Column`] chunks with per-chunk paging. +//! +//! Forks the [`differential_dataflow`] merge-batcher framework so chains can +//! hold [`PagedColumn`] entries — letting the [`ColumnPager`] page chunks +//! out as they're produced and fetch them back lazily during merge / extract. +//! +//! Reuses the resident building blocks from [`super::batcher`]: +//! [`ColumnChunker`] (input consolidation) and the inherent +//! `Column::merge_from` / `Column::extract` methods (per-chunk merge / split). +//! +//! [`differential_dataflow`]: differential_dataflow::trace::implementations::merge_batcher + +use std::collections::VecDeque; + +use columnar::{Columnar, Index, Len}; +use differential_dataflow::difference::Semigroup; +use differential_dataflow::logging::{BatcherEvent, Logger}; +use differential_dataflow::trace::{Batcher, Builder, Description}; +use timely::Accountable; +use timely::PartialOrder; +use timely::container::{ContainerBuilder, PushInto, SizableContainer}; +use timely::dataflow::channels::ContainerBytes; +use timely::progress::Timestamp; +use timely::progress::frontier::{Antichain, AntichainRef}; + +use crate::column_pager::{self, ColumnPager, PagedColumn}; +use crate::columnar::Column; +use crate::columnar::batcher::{ColumnChunker, empty_chunk, recycle_chunk}; + +/// Max recycled empty chunks held in the per-batcher stash. Deliberately +/// tight: the stash is a hot-buffer cache for the result/keep/ship churn, +/// not a hoard. Stash entries are cleared `Column::Typed` allocations that +/// retain capacity but are *not* tracked by [`ColumnPager`]'s +/// `ResidentTicket` accounting, so each one is a chunk's worth of resident +/// bytes the pager's budget doesn't see. There's one stash per arrange +/// batcher per worker, so this multiplies fast. +/// +/// 2 covers steady-state reuse for both code paths: `merge_chains` ships +/// `result` and immediately pulls a refill; `extract_chain` ships `keep` / +/// `ship` and pulls a refill for whichever was at capacity. Heads that +/// drain mid-loop arrive resident from `FetchIter`, so the whole-chunk +/// passthrough fast path keeps most of them off the merge inner loop +/// entirely — only a small minority ever flow back through the stash. +const STASH_CAP: usize = 2; + +/// Don't park a buffer larger than this in the free-list. A transiently +/// oversize merge buffer (post-explosion, past the natural ship threshold) +/// held resident would compete with the pager's budget; drop it and let a +/// fresh default regrow. 2 × the natural ship word count (≈ 4 MiB +/// serialized) keeps normal ship-sized chunks while excluding pathological +/// ones. +const MAX_RECYCLE_BYTES: usize = 1 << 22; + +/// Recycle `chunk` only if the stash isn't already at [`STASH_CAP`] and the +/// chunk isn't oversize per [`MAX_RECYCLE_BYTES`]. `length_in_bytes` is +/// measured before clear, so it reflects the data the chunk was carrying +/// (a proxy for the capacity we'd park). +fn recycle_capped(chunk: Column, stash: &mut Vec>) { + if stash.len() < STASH_CAP && chunk.length_in_bytes() <= MAX_RECYCLE_BYTES { + recycle_chunk(chunk, stash); + } +} + +/// Drives the merge-batcher over [`Column`] chunks routed through a +/// [`ColumnPager`]. +/// +/// Chains hold [`PagedColumn`] entries rather than resident [`Column`]s, so +/// each insert / merge / extract step can hand its output to the pager and +/// store whatever the policy returns (resident, paged, or compressed). Reads +/// during merge materialize lazily via [`FetchIter`]. +/// +/// Resolves its pager lazily per call via [`column_pager::global_pager`], so +/// late-arriving dyncfg updates (e.g. `enable_column_paged_batcher` flipping +/// on after the batcher was constructed) take effect without rebuilding the +/// operator. Tests may override that lookup via [`Self::set_pager`]. +pub struct ColumnMergeBatcher +where + D: Columnar, + T: Columnar, + R: Columnar, +{ + chunker: ColumnChunker<(D, T, R)>, + chains: Vec>>, + lower: Antichain, + frontier: Antichain, + /// Recycled empty `Column::Typed` chunks. Drained heads and shipped result + /// buffers feed in here; subsequent merge / extract calls pop from here + /// instead of starting from a zero-capacity `Column::default()`. Mirrors + /// the stash carried by the upstream `differential_dataflow` merge-batcher + /// framework, which this type forks. Without it, each shipped chunk + /// triggers a fresh per-leaf grow cycle and per-merge-round allocation + /// dominates the inner loop. + stash: Vec>, + /// Optional override. `None` means "read [`column_pager::global_pager`] + /// fresh on every use" — the production path, so worker_config dyncfg + /// changes that re-install the process-global pager take effect on the + /// very next chunk this batcher processes. + pager_override: Option, + logger: Option, + operator_id: usize, +} + +impl ColumnMergeBatcher +where + D: Columnar, + T: Columnar, + R: Columnar, +{ + /// Pin the pager this batcher uses, overriding the thread-local lookup. + /// Mainly for tests; production should leave the override unset so + /// dyncfg-driven re-installs take effect immediately. + pub fn set_pager(&mut self, pager: ColumnPager) { + self.pager_override = Some(pager); + } + + /// Current pager — override if set, else the process-global pager + /// installed by `apply_worker_config`. `ColumnPager` is cheaply + /// cloneable (Arc inside). + fn pager(&self) -> ColumnPager { + self.pager_override + .clone() + .unwrap_or_else(column_pager::global_pager) + } + + /// Push a chain into `self.chains`, emitting a positive `BatcherEvent` + /// covering its resident entries. + fn chain_push(&mut self, chain: VecDeque>) { + self.emit_account(&chain, 1); + self.chains.push(chain); + } + + /// Pop a chain from `self.chains`, emitting a negative `BatcherEvent` + /// retracting its resident entries. + /// + /// Invariant for the retract to reconcile against the matching + /// `chain_push`: chain entries are never mutated in place between push + /// and pop. The only allowed mutation is a full pop / push pair (see + /// `insert_chain` and `merge_by`), so each entry's accounting category + /// — `Resident` vs `Paged` vs `Compressed` — is the same at both ends. + /// If a future change ever pages an entry out in place after push, this + /// path silently double-counts. + fn chain_pop(&mut self) -> Option>> { + let chain = self.chains.pop()?; + self.emit_account(&chain, -1); + Some(chain) + } + + /// Emit a single `BatcherEvent` summing resident accounting across + /// `chain` with the given sign. No-op when no logger is attached. + fn emit_account(&self, chain: &VecDeque>, diff: isize) { + let Some(logger) = &self.logger else { + return; + }; + let (mut records, mut size, mut capacity, mut allocations) = + (0isize, 0isize, 0isize, 0isize); + for entry in chain { + let (r, s, c, a) = account_chunk(entry); + records = records.saturating_add_unsigned(r); + size = size.saturating_add_unsigned(s); + capacity = capacity.saturating_add_unsigned(c); + allocations = allocations.saturating_add_unsigned(a); + } + logger.log(BatcherEvent { + operator: self.operator_id, + records_diff: records.saturating_mul(diff), + size_diff: size.saturating_mul(diff), + capacity_diff: capacity.saturating_mul(diff), + allocations_diff: allocations.saturating_mul(diff), + }); + } +} + +impl Drop for ColumnMergeBatcher +where + D: Columnar, + T: Columnar, + R: Columnar, +{ + fn drop(&mut self) { + // Retract accounting for any chains still resident at drop time so + // the BatcherEvent counters end at zero per-operator. + while self.chain_pop().is_some() {} + } +} + +/// Resident-only accounting. Returns `(records, size_bytes, capacity_bytes, +/// allocations)` for a single chain entry; paged-out entries contribute 0 +/// across the board. +/// +/// `BatcherEvent` feeds the `mz_arrangement_batcher_*_raw` introspection +/// tables, which downstream surface as memory-resource dashboards. Bytes +/// living on swap or in a pager file aren't part of RSS and shouldn't be +/// reported there. +fn account_chunk(entry: &PagedColumn) -> (usize, usize, usize, usize) { + match entry { + PagedColumn::Resident(col, _) => { + let records = usize::try_from(col.record_count()).expect("non-negative"); + let bytes = col.length_in_bytes(); + (records, bytes, bytes, 1) + } + PagedColumn::Paged { .. } | PagedColumn::Compressed { .. } => (0, 0, 0, 0), + } +} + +impl Batcher for ColumnMergeBatcher +where + D: Columnar, + for<'a> columnar::Ref<'a, D>: Copy + Ord, + T: Columnar + Default + Timestamp + PartialOrder, + for<'a> columnar::Ref<'a, T>: Copy + Ord, + R: Columnar + Default + Semigroup + for<'a> Semigroup>, + for<'a> columnar::Ref<'a, R>: Ord, +{ + type Input = Column<(D, T, R)>; + type Output = Column<(D, T, R)>; + type Time = T; + + fn new(logger: Option, operator_id: usize) -> Self { + // No pager snapshot taken here — `self.pager()` reads + // `column_pager::global_pager` per call, so dyncfg-driven re-installs + // take effect on the next chunk. + Self { + chunker: ColumnChunker::default(), + chains: Vec::new(), + lower: Antichain::from_elem(T::minimum()), + frontier: Antichain::new(), + stash: Vec::new(), + pager_override: None, + logger, + operator_id, + } + } + + fn push_container(&mut self, container: &mut Self::Input) { + let pager = self.pager(); + self.chunker.push_into(container); + while let Some(chunk) = self.chunker.extract() { + let paged = pager.page(chunk); + self.insert_chain(VecDeque::from([paged])); + } + } + + fn seal>( + &mut self, + upper: Antichain, + ) -> B::Output { + let pager = self.pager(); + // Finish chunker, fold any tail chunks in. + while let Some(chunk) = self.chunker.finish() { + let paged = pager.page(chunk); + self.insert_chain(VecDeque::from([paged])); + } + + // Merge all remaining chains into one. + while self.chains.len() > 1 { + let a = self.chain_pop().unwrap(); + let b = self.chain_pop().unwrap(); + let merged = self.merge_by(a, b); + self.chain_push(merged); + } + let merged = self.chain_pop().unwrap_or_default(); + + // Extract `merged` into `readied` (ship side, materialized for the + // builder) and `kept_chain` (keep side, stays paged for the next + // round). + let mut readied: Vec> = Vec::new(); + let mut kept_chain: VecDeque> = VecDeque::new(); + self.frontier.clear(); + { + let pager = &pager; + let frontier = &mut self.frontier; + let stash = &mut self.stash; + extract_chain( + FetchIter::new(merged, pager), + upper.borrow(), + frontier, + |paged| readied.push(pager.take(paged)), + |paged| kept_chain.push_back(paged), + stash, + ); + } + + if !kept_chain.is_empty() { + self.chain_push(kept_chain); + } + + let description = Description::new( + self.lower.clone(), + upper.clone(), + Antichain::from_elem(T::minimum()), + ); + let seal = B::seal(&mut readied, description); + self.lower = upper; + + // Drop the recycle stash now that this round's hot work is done. + // The next merge after the next `push_container` will re-pay one + // chunk's worth of leaf-`Vec` grow tax, but that's a few hundred µs + // amortized over a seal cycle, well worth handing the leaf bytes + // back to the allocator so they're not held resident across what + // may be a quiet stretch. + self.stash.clear(); + + seal + } + + fn frontier(&mut self) -> AntichainRef<'_, Self::Time> { + self.frontier.borrow() + } +} + +impl ColumnMergeBatcher +where + D: Columnar, + for<'a> columnar::Ref<'a, D>: Copy + Ord, + T: Columnar + Default + Clone + PartialOrder, + for<'a> columnar::Ref<'a, T>: Copy + Ord, + R: Columnar + Default + Semigroup + for<'a> Semigroup>, +{ + /// Insert `chain` and rebalance: while the youngest chain is at least + /// half the size of its predecessor, merge them. + fn insert_chain(&mut self, chain: VecDeque>) { + if chain.is_empty() { + return; + } + self.chain_push(chain); + while self.chains.len() > 1 + && self.chains[self.chains.len() - 1].len() + >= self.chains[self.chains.len() - 2].len() / 2 + { + let a = self.chain_pop().unwrap(); + let b = self.chain_pop().unwrap(); + let merged = self.merge_by(a, b); + self.chain_push(merged); + } + } + + /// Merge two sorted chains. Outputs are routed through `self.pager.page` + /// per chunk produced, so the result chain holds `PagedColumn`s and the + /// caller never sees a fully materialized merge result. + fn merge_by( + &mut self, + a: VecDeque>, + b: VecDeque>, + ) -> VecDeque> { + let mut output: VecDeque> = VecDeque::new(); + let pager = self.pager(); + let pager = &pager; + let stash = &mut self.stash; + merge_chains( + FetchIter::new(a, pager), + FetchIter::new(b, pager), + |paged| output.push_back(paged), + stash, + ); + output + } +} + +/// Streaming materializer over a chain of [`PagedColumn`] entries. +/// +/// `next` consumes one entry and calls [`ColumnPager::take`] to produce a +/// resident [`Column`]. Bounds materialized chunks to whatever the consumer +/// holds (typically one head per chain in [`merge_chains`]). +pub struct FetchIter<'a, D, T, R> +where + (D, T, R): Columnar, +{ + queue: VecDeque>, + pager: &'a ColumnPager, +} + +impl<'a, D, T, R> FetchIter<'a, D, T, R> +where + (D, T, R): Columnar, +{ + /// Wraps `queue` for streaming materialization through `pager`. + pub fn new(queue: VecDeque>, pager: &'a ColumnPager) -> Self { + Self { queue, pager } + } + + /// Borrow the pager backing this iter so drivers can route output chunks + /// back through `page()` without threading a separate `&pager`. The + /// returned reference is tied to the outer `'a`, not to `&self`, so it + /// stays valid across subsequent `next()` calls. + pub fn pager(&self) -> &'a ColumnPager { + self.pager + } + + /// Drain remaining queued entries as `PagedColumn`s without materializing. + /// Used by `merge_chains`'s drain-tail phase: once the other side is + /// exhausted, the remaining entries on this side can pass straight to the + /// output sink. + pub fn into_paged(self) -> std::collections::vec_deque::IntoIter> { + self.queue.into_iter() + } +} + +impl Iterator for FetchIter<'_, D, T, R> +where + (D, T, R): Columnar, +{ + type Item = Column<(D, T, R)>; + + fn next(&mut self) -> Option { + self.queue.pop_front().map(|p| self.pager.take(p)) + } +} + +/// Two-way merge driver. Reuses today's per-chunk gallop / ship-threshold +/// logic from `Column::merge_from`, but pulls heads from [`FetchIter`] and +/// emits finished output chunks through `sink` after routing them through +/// the pager exposed by [`FetchIter::pager`]. +/// +/// `stash` is a pool of empty `Column::Typed` chunks. Drained heads and +/// shipped result buffers get recycled into it; the next result chunk is +/// pulled from it instead of starting from a zero-capacity default. This +/// matches the recycling discipline the upstream `differential_dataflow` +/// merge-batcher carries via `Merger::merge`'s `stash` parameter. +/// +/// Whole-chunk passthrough: heads arrive materialized from [`FetchIter`], so +/// peeking endpoints is free. When the current head on one side sorts +/// entirely before the current record on the other side, ship it wholesale +/// and skip the per-record merge. Gated on `positions[i] == 0` so we hand +/// the head off intact — partial-tail passthrough would need a 1-input +/// `merge_from` to copy the tail, which is what the inner loop's gallop +/// already covers. +pub fn merge_chains( + list1: FetchIter<'_, D, T, R>, + list2: FetchIter<'_, D, T, R>, + mut sink: Sink, + stash: &mut Vec>, +) where + D: Columnar, + for<'a> columnar::Ref<'a, D>: Copy + Ord, + T: Columnar + Default + Clone + PartialOrder, + for<'a> columnar::Ref<'a, T>: Copy + Ord, + R: Columnar + Default + Semigroup + for<'a> Semigroup>, + Sink: FnMut(PagedColumn<(D, T, R)>), +{ + let pager = list1.pager(); + let mut list1 = list1; + let mut list2 = list2; + + let mut heads = [ + list1.next().unwrap_or_default(), + list2.next().unwrap_or_default(), + ]; + let mut positions = [0usize, 0usize]; + let mut result: Column<(D, T, R)> = empty_chunk(stash); + + loop { + let upper_l = heads[0].borrow().len(); + let upper_r = heads[1].borrow().len(); + if positions[0] >= upper_l || positions[1] >= upper_r { + break; + } + + // Whole-chunk passthrough. Two probes on already-resident heads. + let lhs_passthrough = positions[0] == 0 && upper_l > 0 && { + let lhs = heads[0].borrow(); + let rhs = heads[1].borrow(); + let last_l = (lhs.0.get(upper_l - 1), lhs.1.get(upper_l - 1)); + let cur_r = (rhs.0.get(positions[1]), rhs.1.get(positions[1])); + last_l < cur_r + }; + if lhs_passthrough { + if !result.is_empty() { + sink(pager.page(&mut result)); + if let Some(reuse) = stash.pop() { + result = reuse; + } + } + let mut head = std::mem::replace(&mut heads[0], list1.next().unwrap_or_default()); + sink(pager.page(&mut head)); + positions[0] = 0; + continue; + } + + let rhs_passthrough = positions[1] == 0 && upper_r > 0 && { + let lhs = heads[0].borrow(); + let rhs = heads[1].borrow(); + let last_r = (rhs.0.get(upper_r - 1), rhs.1.get(upper_r - 1)); + let cur_l = (lhs.0.get(positions[0]), lhs.1.get(positions[0])); + last_r < cur_l + }; + if rhs_passthrough { + if !result.is_empty() { + sink(pager.page(&mut result)); + if let Some(reuse) = stash.pop() { + result = reuse; + } + } + let mut head = std::mem::replace(&mut heads[1], list2.next().unwrap_or_default()); + sink(pager.page(&mut head)); + positions[1] = 0; + continue; + } + + let yielded = result.merge_from(&mut heads, &mut positions); + + if positions[0] >= heads[0].borrow().len() { + let old = std::mem::replace(&mut heads[0], list1.next().unwrap_or_default()); + recycle_capped(old, stash); + positions[0] = 0; + } + if positions[1] >= heads[1].borrow().len() { + let old = std::mem::replace(&mut heads[1], list2.next().unwrap_or_default()); + recycle_capped(old, stash); + positions[1] = 0; + } + if yielded || result.at_capacity() { + sink(pager.page(&mut result)); + // `pager.page` either took `result`'s allocation (Skip path leaves + // a zero-cap default) or kept the Typed buffer (Paged / Compressed + // paths clear in place). Pull a fresh chunk from the stash so the + // next `merge_from` starts with retained capacity; if the stash is + // empty, fall back to whatever `result` already is. + if let Some(reuse) = stash.pop() { + result = reuse; + } + } + } + + // Drain remaining: copy partial head through `merge_from`'s 1-input + // dispatch, then hand the rest of the chain's `PagedColumn`s straight to + // the sink without materializing. + drain_side( + &mut heads[0], + &mut positions[0], + list1, + &mut result, + &mut sink, + pager, + stash, + ); + drain_side( + &mut heads[1], + &mut positions[1], + list2, + &mut result, + &mut sink, + pager, + stash, + ); + + if !result.is_empty() { + sink(pager.page(&mut result)); + } else { + // Empty `result` may still carry a useful Typed allocation; recycle + // so subsequent calls (next `merge_by`, the seal `extract_chain`) + // can pick it up. + recycle_capped(result, stash); + } + // Recycle the now-exhausted (or default) head slots too — for `Resident` + // heads that finished naturally, this preserves their Typed allocation + // for the next call. + let [h0, h1] = heads; + recycle_capped(h0, stash); + recycle_capped(h1, stash); +} + +/// Helper for `merge_chains`'s drain phase: copy a partially-consumed head +/// into `result` (via 1-input `merge_from`), ship `result` if non-empty, then +/// pass the remaining queued `PagedColumn`s straight through. +fn drain_side( + head: &mut Column<(D, T, R)>, + pos: &mut usize, + rest: FetchIter<'_, D, T, R>, + result: &mut Column<(D, T, R)>, + sink: &mut Sink, + pager: &ColumnPager, + stash: &mut Vec>, +) where + D: Columnar, + for<'a> columnar::Ref<'a, D>: Copy + Ord, + T: Columnar + Default + Clone + PartialOrder, + for<'a> columnar::Ref<'a, T>: Copy + Ord, + R: Columnar + Default + Semigroup + for<'a> Semigroup>, + Sink: FnMut(PagedColumn<(D, T, R)>), +{ + if *pos < head.borrow().len() { + // 1-input dispatch — bulk copy that runs to completion. + let _ = result.merge_from(std::slice::from_mut(head), std::slice::from_mut(pos)); + } + if !result.is_empty() { + sink(pager.page(result)); + if let Some(reuse) = stash.pop() { + *result = reuse; + } + } + for paged in rest.into_paged() { + sink(paged); + } +} + +/// Streaming extract: walks `merged` chunk-by-chunk via `Column::extract`, +/// routing each filled keep/ship chunk through its sink after pageing. +/// Mirrors the per-chunk ship-threshold yield already inside +/// `Column::extract`. +/// +/// `stash` carries recycled `Column::Typed` buffers in and out so the +/// per-chunk extract loop doesn't restart from zero capacity each time +/// `keep_buf` / `ship_buf` ships and the source `buffer` is dropped. +pub fn extract_chain( + merged: FetchIter<'_, D, T, R>, + upper: AntichainRef, + frontier: &mut Antichain, + mut ship: SinkShip, + mut keep: SinkKeep, + stash: &mut Vec>, +) where + D: Columnar, + for<'a> columnar::Ref<'a, D>: Copy + Ord, + T: Columnar + Default + Clone + PartialOrder, + for<'a> columnar::Ref<'a, T>: Copy + Ord, + R: Columnar + Default + Semigroup + for<'a> Semigroup>, + SinkShip: FnMut(PagedColumn<(D, T, R)>), + SinkKeep: FnMut(PagedColumn<(D, T, R)>), +{ + let pager = merged.pager(); + let mut keep_buf: Column<(D, T, R)> = empty_chunk(stash); + let mut ship_buf: Column<(D, T, R)> = empty_chunk(stash); + + for mut buffer in merged { + let mut position = 0; + let len = buffer.borrow().len(); + while position < len { + buffer.extract(&mut position, upper, frontier, &mut keep_buf, &mut ship_buf); + if keep_buf.at_capacity() { + keep(pager.page(&mut keep_buf)); + if let Some(reuse) = stash.pop() { + keep_buf = reuse; + } + } + if ship_buf.at_capacity() { + ship(pager.page(&mut ship_buf)); + if let Some(reuse) = stash.pop() { + ship_buf = reuse; + } + } + } + // Buffer fully consumed; recycle whatever Typed allocation it had. + recycle_capped(buffer, stash); + } + if !keep_buf.is_empty() { + keep(pager.page(&mut keep_buf)); + } else { + recycle_capped(keep_buf, stash); + } + if !ship_buf.is_empty() { + ship(pager.page(&mut ship_buf)); + } else { + recycle_capped(ship_buf, stash); + } +} + +#[cfg(test)] +#[allow(clippy::clone_on_ref_ptr)] +mod tests { + use std::sync::Arc; + + use columnar::Index; + use timely::container::PushInto as _; + + use super::*; + use crate::column_pager::{PageDecision, PageEvent, PageHint, PagingPolicy}; + + // ----- helpers ----------------------------------------------------------- + + type KvUpdate = ((u64, u64), u64, i64); + + fn col(rows: &[KvUpdate]) -> Column { + let mut c: Column = Default::default(); + for &t in rows { + c.push_into(t); + } + c + } + + fn collect_pc(chunks: &[PagedColumn], pager: &ColumnPager) -> Vec { + // `collect_pc` peeks via materialization on a side path so the test's + // assertions don't consume the chain. + chunks + .iter() + .flat_map(|p| { + let view: Column = match p { + PagedColumn::Resident(c, _) => clone_column(c), + _ => pager.take(clone_paged(p)), + }; + collect_column(&view).into_iter() + }) + .collect() + } + + fn collect_column(c: &Column) -> Vec { + c.borrow() + .into_index_iter() + .map(|((k, v), t, r)| { + ( + (u64::into_owned(k), u64::into_owned(v)), + u64::into_owned(t), + i64::into_owned(r), + ) + }) + .collect() + } + + fn clone_column(c: &Column) -> Column { + // `Column` is `Clone` when `C::Container: Clone`, which is true for + // tuple-of-primitive containers. Used so test helpers can peek at a + // chain without consuming it. + c.clone() + } + + /// Helper that bypasses `pager.take` for non-`Resident` variants by + /// taking and re-pageing. Only used in test inspection paths where the + /// extra round-trip is acceptable. + fn clone_paged(p: &PagedColumn) -> PagedColumn { + match p { + PagedColumn::Resident(c, _) => { + // Wrap via a disabled pager so the ticket is fresh. + let mut c = c.clone(); + ColumnPager::disabled().page(&mut c) + } + // For paged/compressed variants we can't clone without + // re-reading; the tests below only inspect Resident chains. + _ => panic!("clone_paged only supports Resident"), + } + } + + /// Always-page policy: bypasses any resident shortcut so we can assert + /// the chains remain in `Paged` form regardless of memory pressure. + struct ForcePagePolicy { + out: std::sync::atomic::AtomicUsize, + r#in: std::sync::atomic::AtomicUsize, + } + impl ForcePagePolicy { + fn new() -> Arc { + Arc::new(Self { + out: std::sync::atomic::AtomicUsize::new(0), + r#in: std::sync::atomic::AtomicUsize::new(0), + }) + } + } + impl PagingPolicy for ForcePagePolicy { + fn decide(&self, _hint: PageHint) -> PageDecision { + PageDecision::Page { + backend: mz_ore::pager::Backend::Swap, + codec: None, + } + } + fn record(&self, event: PageEvent) { + use std::sync::atomic::Ordering; + match event { + PageEvent::PagedOut { .. } => { + self.out.fetch_add(1, Ordering::Relaxed); + } + PageEvent::PagedIn { .. } => { + self.r#in.fetch_add(1, Ordering::Relaxed); + } + _ => {} + } + } + } + + /// Wrap a Vec as a paged chain for `FetchIter`. + fn to_chain( + cols: Vec>, + pager: &ColumnPager, + ) -> VecDeque> { + cols.into_iter().map(|mut c| pager.page(&mut c)).collect() + } + + /// Drive `merge_chains` with a disabled pager and return owned tuples. + fn drive_merge(chain1: Vec>, chain2: Vec>) -> Vec { + let pager = ColumnPager::disabled(); + let q1 = to_chain(chain1, &pager); + let q2 = to_chain(chain2, &pager); + let mut output: Vec> = Vec::new(); + let mut stash: Vec> = Vec::new(); + merge_chains( + FetchIter::new(q1, &pager), + FetchIter::new(q2, &pager), + |paged| output.push(paged), + &mut stash, + ); + collect_pc(&output, &pager) + } + + // ----- merge_chains correctness ----------------------------------------- + + /// Disjoint chains: same data as the legacy passthrough test. Without + /// passthrough, the merger runs per-record but should still produce the + /// fully ordered output. + #[mz_ore::test] + fn merge_chains_disjoint_ranges() { + let out = drive_merge( + vec![ + col(&[((0, 0), 0, 1), ((1, 0), 0, 1)]), + col(&[((2, 0), 0, 1), ((3, 0), 0, 1)]), + ], + vec![ + col(&[((10, 0), 0, 1), ((11, 0), 0, 1)]), + col(&[((12, 0), 0, 1), ((13, 0), 0, 1)]), + ], + ); + let expected: Vec<_> = (0..4u64) + .map(|d| ((d, 0u64), 0u64, 1i64)) + .chain((10..14u64).map(|d| ((d, 0u64), 0u64, 1i64))) + .collect(); + assert_eq!(out, expected); + } + + /// Interleaved chains: every record alternates between the two chains. + #[mz_ore::test] + fn merge_chains_interleaved() { + let out = drive_merge( + vec![ + col(&[((0, 0), 0, 1), ((2, 0), 0, 1)]), + col(&[((4, 0), 0, 1), ((6, 0), 0, 1)]), + ], + vec![ + col(&[((1, 0), 0, 1), ((3, 0), 0, 1)]), + col(&[((5, 0), 0, 1), ((7, 0), 0, 1)]), + ], + ); + let expected: Vec<_> = (0..8u64).map(|d| ((d, 0u64), 0u64, 1i64)).collect(); + assert_eq!(out, expected); + } + + /// Equal-key consolidation across chunk boundaries: chain1's last record + /// shares `(d, t)` with chain2's first; sum of diffs should land on a + /// single output record. + #[mz_ore::test] + fn merge_chains_equal_boundary() { + let out = drive_merge( + vec![col(&[((0, 0), 0, 1), ((5, 0), 0, 1)])], + vec![col(&[((5, 0), 0, 1), ((10, 0), 0, 1)])], + ); + assert_eq!(out, vec![((0, 0), 0, 1), ((5, 0), 0, 2), ((10, 0), 0, 1)]); + } + + /// Same merge, force-paged: chains stay in `Paged` form throughout, and + /// the consolidated result still matches. + #[mz_ore::test] + fn merge_chains_force_paged_round_trip() { + let policy = ForcePagePolicy::new(); + let pager = ColumnPager::new(policy.clone()); + let q1 = to_chain(vec![col(&[((0, 0), 0, 1), ((2, 0), 0, 1)])], &pager); + let q2 = to_chain(vec![col(&[((1, 0), 0, 1), ((3, 0), 0, 1)])], &pager); + + // Confirm the chains started paged-out (not Resident). + assert!(matches!(q1.front().unwrap(), PagedColumn::Paged { .. })); + assert!(matches!(q2.front().unwrap(), PagedColumn::Paged { .. })); + + let mut output: Vec> = Vec::new(); + let mut stash: Vec> = Vec::new(); + merge_chains( + FetchIter::new(q1, &pager), + FetchIter::new(q2, &pager), + |paged| output.push(paged), + &mut stash, + ); + + // Output entries should also have been routed through the pager. + for p in &output { + assert!(matches!(p, PagedColumn::Paged { .. })); + } + + // Materialize the output and check correctness. + let mut collected = Vec::new(); + for p in output { + let c = pager.take(p); + collected.extend(collect_column(&c)); + } + let expected: Vec<_> = (0..4u64).map(|d| ((d, 0u64), 0u64, 1i64)).collect(); + assert_eq!(collected, expected); + } + + // ----- extract_chain correctness ---------------------------------------- + + #[mz_ore::test] + fn extract_chain_partitions_by_frontier() { + let pager = ColumnPager::disabled(); + let data = vec![ + ((0, 0), 0u64, 1i64), + ((1, 0), 1, 1), + ((2, 0), 2, 1), + ((3, 0), 3, 1), + ]; + let chain = to_chain(vec![col(&data)], &pager); + let upper = Antichain::from_elem(2u64); + let mut frontier: Antichain = Antichain::new(); + let mut ship: Vec> = Vec::new(); + let mut keep: Vec> = Vec::new(); + let mut stash: Vec> = Vec::new(); + + extract_chain( + FetchIter::new(chain, &pager), + upper.borrow(), + &mut frontier, + |p| ship.push(p), + |p| keep.push(p), + &mut stash, + ); + + let shipped = collect_pc(&ship, &pager); + let kept = collect_pc(&keep, &pager); + for (_, t, _) in &shipped { + assert!(*t < 2, "shipped time {t} should be < upper"); + } + for (_, t, _) in &kept { + assert!(*t >= 2, "kept time {t} should be >= upper"); + } + assert_eq!(shipped.len() + kept.len(), data.len()); + } + + // ----- ColumnMergeBatcher end-to-end ------------------------------------ + + /// Trivial Builder used by `seal`: collects inputs into a Vec for the + /// test to inspect. + #[derive(Default)] + struct VecBuilder; + impl differential_dataflow::trace::Builder for VecBuilder { + type Input = Column; + type Time = u64; + type Output = Vec; + fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self { + Self + } + fn push(&mut self, _chunk: &mut Self::Input) {} + fn done( + self, + _description: differential_dataflow::trace::Description, + ) -> Self::Output { + Vec::new() + } + fn seal( + chain: &mut Vec, + _description: differential_dataflow::trace::Description, + ) -> Self::Output { + let mut out = Vec::new(); + for c in chain.drain(..) { + out.extend(collect_column(&c)); + } + out + } + } + + #[mz_ore::test] + fn batcher_seal_round_trip() { + let mut b: ColumnMergeBatcher<(u64, u64), u64, i64> = + differential_dataflow::trace::Batcher::new(None, 0); + // Two pushes; second has an equal-key collision with the first. + let mut input1 = col(&[((1, 1), 0, 1), ((2, 0), 0, 1), ((3, 0), 0, 1)]); + let mut input2 = col(&[((2, 0), 0, 2), ((4, 0), 0, 1)]); + differential_dataflow::trace::Batcher::push_container(&mut b, &mut input1); + differential_dataflow::trace::Batcher::push_container(&mut b, &mut input2); + + // Seal everything (upper = ∞-ish, here just past any time we used). + let upper = Antichain::from_elem(u64::MAX); + let out: Vec = + differential_dataflow::trace::Batcher::seal::(&mut b, upper); + + // (2, 0)@0 was pushed with +1 then +2; sums to +3 after consolidation. + let mut expected = vec![ + ((1u64, 1u64), 0u64, 1i64), + ((2, 0), 0, 3), + ((3, 0), 0, 1), + ((4, 0), 0, 1), + ]; + expected.sort(); + let mut out_sorted = out.clone(); + out_sorted.sort(); + assert_eq!(out_sorted, expected); + } + + #[mz_ore::test] + fn account_chunk_resident_vs_paged() { + let policy = ForcePagePolicy::new(); + let pager_paged = ColumnPager::new(policy.clone()); + let pager_res = ColumnPager::disabled(); + + let mut c1 = col(&[((1, 1), 0, 1), ((2, 0), 0, 1), ((3, 0), 0, 1)]); + let resident = pager_res.page(&mut c1); + let (records, size, capacity, allocations) = account_chunk(&resident); + assert_eq!(records, 3); + assert!(size > 0); + assert_eq!(size, capacity); + assert_eq!(allocations, 1); + + let mut c2 = col(&[((1, 1), 0, 1), ((2, 0), 0, 1)]); + let paged = pager_paged.page(&mut c2); + assert!(matches!(paged, PagedColumn::Paged { .. })); + // Paged variants contribute zero to memory accounting. + assert_eq!(account_chunk(&paged), (0, 0, 0, 0)); + } + + #[mz_ore::test] + fn batcher_seal_keeps_kept_chain_paged() { + // Force-page policy; verify that after seal, the kept chain in + // self.chains contains only Paged entries (no Resident). + let policy = ForcePagePolicy::new(); + let pager = ColumnPager::new(policy.clone()); + + let mut b: ColumnMergeBatcher<(u64, u64), u64, i64> = + differential_dataflow::trace::Batcher::new(None, 0); + b.set_pager(pager); + + // Push records straddling an upper of 5 — half should be kept, half + // shipped. Use enough records to fill at least one chunk. + let n: u64 = 200; + for i in 0..n { + let mut input = col(&[((i, 0), i % 10, 1)]); + differential_dataflow::trace::Batcher::push_container(&mut b, &mut input); + } + let upper = Antichain::from_elem(5u64); + let _ = differential_dataflow::trace::Batcher::seal::(&mut b, upper); + + // Anything kept (times >= 5) should be sitting in b.chains as paged. + let kept_records: usize = b + .chains + .iter() + .flat_map(|c| c.iter()) + .map(|p| match p { + PagedColumn::Paged { meta, .. } => { + // Records aren't directly available here; sanity-check + // that no Resident snuck in. + let _ = meta; + 1 + } + PagedColumn::Compressed { meta, .. } => { + let _ = meta; + 1 + } + PagedColumn::Resident(_, _) => { + panic!("kept chain entry was Resident under ForcePagePolicy"); + } + }) + .sum(); + // We expect *some* kept entries (times in [5..10) loop slot). + assert!(kept_records > 0, "expected at least one kept paged entry"); + assert!(policy.out.load(std::sync::atomic::Ordering::Relaxed) > 0); + let _ = n; + } +} diff --git a/test/feature-benchmark/mzcompose.py b/test/feature-benchmark/mzcompose.py index 5181dc7175fa0..d5ac8d52d4fca 100644 --- a/test/feature-benchmark/mzcompose.py +++ b/test/feature-benchmark/mzcompose.py @@ -187,15 +187,27 @@ def run_one_scenario( early_abort = False - for mz_id, instance in enumerate(["this", "other"]): - balancerd, tag, size, params = ( - (args.this_balancerd, args.this_tag, args.this_size, args.this_params) + instances = ["this"] if args.skip_other else ["this", "other"] + for mz_id, instance in enumerate(instances): + balancerd, tag, size, params, memory, memory_swap, mem_swappiness = ( + ( + args.this_balancerd, + args.this_tag, + args.this_size, + args.this_params, + args.this_memory, + args.this_memory_swap, + args.this_mem_swappiness, + ) if instance == "this" else ( args.other_balancerd, args.other_tag, args.other_size, args.other_params, + args.other_memory, + args.other_memory_swap, + args.other_mem_swappiness, ) ) @@ -226,10 +238,18 @@ def run_one_scenario( size, additional_system_parameter_defaults, args.azurite and instance == "this", + memory=memory, + memory_swap=memory_swap, + mem_swappiness=mem_swappiness, ) clusterd_image = f"materialize/clusterd:{tag}" if tag else None clusterd = create_clusterd_service( - clusterd_image, size, additional_system_parameter_defaults + clusterd_image, + size, + additional_system_parameter_defaults, + memory=memory, + memory_swap=memory_swap, + mem_swappiness=mem_swappiness, ) if tag is not None and not c.try_pull_service_image(mz): @@ -243,10 +263,18 @@ def run_one_scenario( size, additional_system_parameter_defaults, args.azurite and instance == "this", + memory=memory, + memory_swap=memory_swap, + mem_swappiness=mem_swappiness, ) clusterd_image = f"materialize/clusterd:{tag}" if tag else None clusterd = create_clusterd_service( - clusterd_image, size, additional_system_parameter_defaults + clusterd_image, + size, + additional_system_parameter_defaults, + memory=memory, + memory_swap=memory_swap, + mem_swappiness=mem_swappiness, ) start_overridden_mz_clusterd_and_cockroach( @@ -358,6 +386,9 @@ def create_mz_service( default_size: int, additional_system_parameter_defaults: dict[str, str] | None, azurite: bool, + memory: str | None = None, + memory_swap: str | None = None, + mem_swappiness: int | None = None, ) -> Materialized: return Materialized( image=mz_image, @@ -372,6 +403,9 @@ def create_mz_service( blob_store_is_azure=azurite, sanity_restart=False, support_external_clusterd=True, + memory=memory, + memory_swap=memory_swap, + mem_swappiness=mem_swappiness, ) @@ -379,8 +413,16 @@ def create_clusterd_service( clusterd_image: str | None, default_size: int, additional_system_parameter_defaults: dict[str, str] | None, + memory: str | None = None, + memory_swap: str | None = None, + mem_swappiness: int | None = None, ) -> Clusterd: - return Clusterd(image=clusterd_image) + return Clusterd( + image=clusterd_image, + memory=memory, + memory_swap=memory_swap, + mem_swappiness=mem_swappiness, + ) def start_overridden_mz_clusterd_and_cockroach( @@ -517,6 +559,71 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: help="SIZE use for 'THIS'", ) + parser.add_argument( + "--this-memory", + metavar="MEM", + type=str, + default=os.getenv("THIS_MEMORY", None), + help="Docker memory limit for the 'THIS' Materialized + Clusterd " + "containers (e.g. '2g', '512m'). Defaults to no limit. Useful for " + "exercising spill paths under realistic pressure.", + ) + + parser.add_argument( + "--other-memory", + metavar="MEM", + type=str, + default=os.getenv("OTHER_MEMORY", None), + help="Docker memory limit for the 'OTHER' Materialized + Clusterd containers.", + ) + + parser.add_argument( + "--this-memory-swap", + metavar="MEM", + type=str, + default=os.getenv("THIS_MEMORY_SWAP", None), + help="Total RAM + swap available to the 'THIS' Materialized + Clusterd " + "containers (e.g. '5g'). Must be >= --this-memory to enable swap. " + "Lets the host kernel swap pages instead of OOM-killing under " + "memory pressure — useful for benchmarking OS swap as a baseline " + "vs application-managed spill.", + ) + + parser.add_argument( + "--this-mem-swappiness", + metavar="N", + type=int, + default=None, + help="`mem_swappiness` (0-100) for the 'THIS' containers. Higher " + "values bias the kernel toward swapping anonymous pages aggressively " + "instead of dropping page cache. Default leaves Docker's default.", + ) + + parser.add_argument( + "--other-memory-swap", + metavar="MEM", + type=str, + default=os.getenv("OTHER_MEMORY_SWAP", None), + help="Total RAM + swap for the 'OTHER' containers.", + ) + + parser.add_argument( + "--other-mem-swappiness", + metavar="N", + type=int, + default=None, + help="`mem_swappiness` (0-100) for the 'OTHER' containers.", + ) + + parser.add_argument( + "--skip-other", + action=argparse.BooleanOptionalAction, + default=False, + help="Run only the 'THIS' side; skip the comparison against 'OTHER'. " + "Useful for iterating on a new scenario without re-running the " + "baseline tag every time.", + ) + parser.add_argument( "--ignore-other-tag-missing", action=argparse.BooleanOptionalAction, @@ -637,7 +744,8 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: scenario_classes_remaining = [ scenario_class for scenario_class in scenario_classes_remaining - if report.has_scenario_regression(scenario_class.__name__) + if report.has_scenario_result(scenario_class.__name__) + and report.has_scenario_regression(scenario_class.__name__) ] if not scenario_classes_remaining: if run_number < args.runs_per_scenario: