Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions misc/python/materialize/feature_benchmark/benchmark_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ def this_as_str(self) -> str:
return f"{self.this():>11.3f}"

def other(self) -> T:
    """Return the second recorded point (`_points[1]`), or `None` when fewer
    than two points were recorded.
    """
    recorded = self._points
    # Only one point exists when the runner executed just the THIS side
    # (e.g. `--skip-other`). Handing back `None` for the missing baseline
    # lets report rendering use its existing `None` formatting.
    return recorded[1] if len(recorded) >= 2 else None  # type: ignore[return-value]

def other_as_str(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,13 @@ def get_threshold(self, metric: BenchmarkScenarioMetric) -> float:
return self.threshold_by_measurement_type[metric.measurement_type]

def ratio(self, metric: BenchmarkScenarioMetric) -> float | None:
    """Return `metric._points[0] / metric._points[1]`, or `None` when the
    two sides cannot be compared.

    `None` is returned when fewer than two points were recorded, or when
    either of the first two points is `None`.
    """
    points = metric._points
    # `_points` has length 1 when the runner ran only the THIS side
    # (e.g. `--skip-other`); there's no baseline to compare against.
    # The length test must come first: short-circuiting keeps `points[1]`
    # from being indexed on a one-element list.
    if len(points) < 2 or points[0] is None or points[1] is None:
        return None
    return points[0] / points[1]
Expand Down
151 changes: 151 additions & 0 deletions misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,157 @@ def benchmark(self) -> MeasurementSource:
""")


class DifferentialJoinColumnPaged(Dataflow):
    """`DifferentialJoin`-shaped scenario that runs with the column-paged
    merge batcher enabled for the linear-join arrange stage.

    Intended to be compared against `DifferentialJoin`: the difference shows
    the steady-state overhead of the paged path (resident chunks plus
    byte-budget bookkeeping) when nothing forces a spill. Spill cost is
    covered separately by `DifferentialJoinHydrationFile`.
    """

    @classmethod
    def can_run(cls, version: MzVersion) -> bool:
        # The scenario depends on the `enable_column_paged_batcher` dyncfg,
        # which arrives with the merge batcher rework landing in 26.28.0.
        # `MzVersion` strips the git hash, so a `26.28.0-dev.0` build that
        # has the dyncfg is indistinguishable from one that doesn't; the
        # strict comparison below skips the scenario on both sides while
        # we're still on that dev version.
        dyncfg_release = MzVersion.create(26, 28, 0)
        return version > dyncfg_release

    def init(self) -> list[Action]:
        # Actions are built in the order they are returned. The testdrive
        # script flips the dyncfg through an `mz_system` connection and only
        # then creates `v1`.
        base_view = self.view_ten()
        enable_and_populate = TdAction(f"""
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
$ postgres-execute connection=mz_system
ALTER SYSTEM SET enable_column_paged_batcher = true;

> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}
""")
        return [base_view, enable_and_populate]

    def benchmark(self) -> MeasurementSource:
        # `/* A */` and `/* B */` tag the two statements of the measurement;
        # the self-join of `v1` on `f1` is expected to yield `n()` rows.
        script = f"""
> SELECT 1;
/* A */
1


> SELECT COUNT(*) FROM v1 AS a1 JOIN v1 AS a2 USING (f1)
/* B */
{self.n()}
"""
        return Td(script)


class DifferentialJoinHydration(Dataflow):
    """Non-leaf parent of the linear-join hydration benchmark family.

    Owns the common `init` / `benchmark` pair (a replica-toggle hydration
    loop); the Baseline and File leaves only override `shared()` to set the
    dyncfgs they need. Because this class has subclasses, the
    feature-benchmark runner treats it as non-leaf and never executes it
    directly, so select one of the leaves via `--root-scenario`.

    Both leaves should run under a memory-capped Materialized
    (`--this-memory=2g`): the baseline is then forced to swap, and the
    paged-file variant has a predictable point at which to spill.
    """

    # SCALE=8 → 100M rows per side, ~1.6 GiB per side input. Two sides plus
    # the join arrangement (typically 2–4× input) reliably exceeds a few
    # GiB total; a 2g container cap forces real swap pressure on the
    # baseline. File variant's 16 MiB per-worker + 128 MiB shared budget
    # means almost everything spills under that cap.
    SCALE = 8

    @classmethod
    def can_run(cls, version: MzVersion) -> bool:
        # Each leaf sets `enable_column_paged_batcher` (baseline = false,
        # file = true), so the family is gated on the dyncfg's introduction
        # in 26.28.0. `DifferentialJoinColumnPaged.can_run` carries the
        # explanation of why dev versions can't be told apart.
        dyncfg_release = MzVersion.create(26, 28, 0)
        return version > dyncfg_release

    def init(self) -> list[Action]:
        # `v1` is created on the default cluster rather than `join_cluster`.
        # The replication-factor toggle in `benchmark` therefore tears down
        # only `v2`'s dataflow, keeping the measurement scoped to the
        # join-arrangement rebuild.
        base_view = self.view_ten()
        populate = TdAction(f"""
> CREATE MATERIALIZED VIEW v1
AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}
> SELECT COUNT(*) FROM v1
{self.n()}

> CREATE CLUSTER join_cluster SIZE 'scale=1,workers=16', REPLICATION FACTOR 1
""")
        return [base_view, populate]

    def benchmark(self) -> MeasurementSource:
        # Same pattern as HydrateIndex: the cluster goes offline *before*
        # `v2` is defined so its dataflow cannot pre-hydrate. Flipping back
        # to `REPLICATION FACTOR 1` after `/* A */` is the hydration trigger
        # being timed.
        script = f"""
> DROP MATERIALIZED VIEW IF EXISTS v2

> ALTER CLUSTER join_cluster SET (REPLICATION FACTOR 0)

> CREATE MATERIALIZED VIEW v2
IN CLUSTER join_cluster
AS SELECT COUNT(*) FROM v1 AS a1 JOIN v1 AS a2 USING (f1)

> SELECT 1
/* A */
1
> ALTER CLUSTER join_cluster SET (REPLICATION FACTOR 1)
> SET CLUSTER = join_cluster
> SELECT * FROM v2
/* B */
{self.n()}
> SET CLUSTER = default
"""
        return Td(script)


class DifferentialJoinHydrationBaseline(DifferentialJoinHydration):
    """Hydration measurement with the paged batcher switched off, i.e. the
    current production path.

    Pair with `DifferentialJoinHydrationFile` to see whether user-space
    file-backed spill beats OS swap under memory pressure.
    """

    def shared(self) -> Action:
        # Explicitly pin the dyncfg to `false` through an `mz_system`
        # connection rather than relying on its default.
        script = """
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
$ postgres-execute connection=mz_system
ALTER SYSTEM SET enable_column_paged_batcher = false;
"""
        return TdAction(script)


class DifferentialJoinHydrationFile(DifferentialJoinHydration):
    """Hydration time with the column-paged batcher on, file backend, and a
    tight budget fraction, so the merge-batcher transient spills instead of
    competing with the spine for RAM.

    `budget_fraction = 0.01` (1% of the announced memory limit) lands in the
    clamp floors of the worker-init derivation (per-worker 16 MiB, shared
    128 MiB). That is the same effective sizing that was benchmarked before
    the fraction-knob refactor.
    """

    def shared(self) -> Action:
        # Three dyncfgs are set through an `mz_system` connection: the paged
        # batcher itself, permission to spill, and the 1% budget fraction.
        script = """
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
$ postgres-execute connection=mz_system
ALTER SYSTEM SET enable_column_paged_batcher = true;
ALTER SYSTEM SET enable_column_paged_batcher_spill = true;
ALTER SYSTEM SET column_paged_batcher_budget_fraction = 0.01;
"""
        return TdAction(script)


class FullOuterJoin(Dataflow):
def benchmark(self) -> BenchmarkingSequence:
columns_select = ", ".join(
Expand Down
3 changes: 3 additions & 0 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,9 @@ def get_default_system_parameters(
"enable_compute_half_join2",
"enable_mz_join_core",
"linear_join_yielding",
"enable_column_paged_batcher",
"enable_column_paged_batcher_spill",
"column_paged_batcher_budget_fraction",
"enable_lgalloc_eager_reclamation",
"lgalloc_background_interval",
"lgalloc_file_growth_dampener",
Expand Down
7 changes: 7 additions & 0 deletions misc/python/materialize/mzcompose/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,13 @@ class ServiceConfig(TypedDict, total=False):
security_opt: list[str] | None
"""Additional security options to apply to the container."""

memswap_limit: int | str | None
"""Total memory+swap limit (Docker `memswap_limit`). `-1` disables the swap
limit, leaving only the memory limit in effect."""

mem_swappiness: int | None
"""Container swap tendency, 0–100 (Docker `mem_swappiness`)."""


class Service:
"""A Docker Compose service in a `Composition`.
Expand Down
13 changes: 13 additions & 0 deletions misc/python/materialize/mzcompose/services/clusterd.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ def __init__(
environment_id: str | None = None,
environment_extra: list[str] = [],
memory: str | None = None,
memory_swap: str | None = None,
mem_swappiness: int | None = None,
cpu: str | None = None,
options: list[str] = [],
restart: str = "no",
Expand Down Expand Up @@ -93,6 +95,17 @@ def __init__(
limits["cpus"] = cpu
config["deploy"] = {"resources": {"limits": limits}}

# Swap controls aren't part of compose's `deploy.resources` schema; they
# live as top-level compose v2 service keys (`memswap_limit`,
# `mem_swappiness`). Setting `memswap_limit > mem_limit` enables the
# container to use host swap when RAM pressure builds, which lets the
# kernel page out anonymous memory rather than OOM-killing. Useful for
# benchmarking "OS swap" as a baseline vs application-managed spill.
if memory_swap is not None:
config["memswap_limit"] = memory_swap
if mem_swappiness is not None:
config["mem_swappiness"] = mem_swappiness

config.update(
{
"command": options,
Expand Down
11 changes: 11 additions & 0 deletions misc/python/materialize/mzcompose/services/materialized.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ def __init__(
volumes_extra: list[str] = [],
depends_on: list[str] = [],
memory: str | None = None,
memory_swap: str | None = None,
mem_swappiness: int | None = None,
cpu: str | None = None,
options: list[str] = [],
persist_blob_url: str | None = None,
Expand Down Expand Up @@ -332,6 +334,15 @@ def __init__(
limits["cpus"] = cpu
config["deploy"] = {"resources": {"limits": limits}}

# Swap controls live as top-level compose v2 service keys, not under
# `deploy.resources`. `memswap_limit > mem_limit` lets the container use
# host swap so the kernel can page out anonymous memory rather than OOM.
# `mem_swappiness=100` biases the kernel toward swapping aggressively.
if memory_swap is not None:
config["memswap_limit"] = memory_swap
if mem_swappiness is not None:
config["mem_swappiness"] = mem_swappiness

if sanity_restart:
# Workaround for https://github.com/docker/compose/issues/11133
config["labels"] = {"sanity_restart": True}
Expand Down
10 changes: 10 additions & 0 deletions misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -1607,6 +1607,16 @@ def __init__(
self.flags_with_values["enable_upsert_v2"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["enable_coalesce_case_transform"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["enable_compute_sync_mv_sink"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["enable_column_paged_batcher"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["enable_column_paged_batcher_spill"] = (
BOOLEAN_FLAG_VALUES
)
self.flags_with_values["column_paged_batcher_budget_fraction"] = [
"0.0",
"0.01",
"0.05",
"0.25",
]

# If you are adding a new config flag in Materialize, consider using it
# here instead of just marking it as uninteresting to silence the
Expand Down
59 changes: 59 additions & 0 deletions src/compute-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,62 @@ pub const ENABLE_HALF_JOIN2: Config<bool> = Config::new(
"Whether compute should use `half_join2` rather than DD's `half_join` to render delta joins.",
);

/// Use the column-paged merge batcher code path at arrange sites.
///
/// When `true`, arrange operators use `Col2ValPagedBatcher` (in
/// `mz_timely_util::columnar`) and `RowRowColPagedBuilder` (in
/// `mz_row_spine`) — the columnar-native batcher that the pager can
/// spill (gated by [`ENABLE_COLUMN_PAGED_BATCHER_SPILL`]). When `false`
/// (the default), the same arrange sites use the legacy
/// `Col2ValBatcher` / `RowRowBuilder` (columnation-merger) path that
/// shipped before #36627. Read at operator construction time; flips
/// take effect on dataflows created after the change.
///
/// Disabled by default while the new path is stabilizing. The
/// `DifferentialJoinColumnPaged` and `DifferentialJoinHydrationFile`
/// feature-benchmark scenarios opt in explicitly so the paged path is
/// measured; `DifferentialJoinHydrationBaseline` pins the flag to `false`
/// as the comparison point.
pub const ENABLE_COLUMN_PAGED_BATCHER: Config<bool> = Config::new(
"enable_column_paged_batcher",
false,
"Use the columnar-native paged merge batcher at arrange sites. When `false` (default), \
arranges fall back to the legacy columnation `Col2ValBatcher` / `RowRowBuilder` path.",
);

/// Allow the column-paged batcher's pager to actually evict chunks
/// under memory pressure.
///
/// Only meaningful when [`ENABLE_COLUMN_PAGED_BATCHER`] is `true`; with
/// the spill flag off the pager keeps every chunk resident regardless of
/// budget.
///
/// Off by default, even when the batcher path itself is on, so the
/// no-pressure case stays a pure resident operation. The resident-byte
/// budget is tuned via [`COLUMN_PAGED_BATCHER_BUDGET_FRACTION`].
pub const ENABLE_COLUMN_PAGED_BATCHER_SPILL: Config<bool> = Config::new(
"enable_column_paged_batcher_spill",
false,
"Allow the column-paged batcher's pager to evict chunks under memory pressure. Only \
meaningful when `enable_column_paged_batcher = true`.",
);

/// Resident-byte budget for the column-paged batcher, as a fraction of
/// the replica's announced memory limit.
///
/// This is the total the batcher's tiered policy
/// (`mz_timely_util::column_pager::policy::TieredPolicy`) is allowed to
/// hold across all workers in this process. A single process-wide pool
/// tracks all resident chunks; allocations beyond the pool spill to the
/// configured backend.
///
/// `0.05` (5%) is a reasonable starting point: large enough that the
/// per-call ColumnBuilder ship-threshold (~2 MiB) fits multiple chunks
/// per worker, small enough that the merge-batcher's transient state
/// doesn't crowd out the spine. Set lower to spill more aggressively
/// under pressure.
///
/// The computed budget is floored at 128 MiB so the no-pressure case
/// doesn't page per chunk, i.e. total budget =
/// `max(mem_limit * fraction, 128 MiB)`. Ignored when
/// [`ENABLE_COLUMN_PAGED_BATCHER_SPILL`] is `false`.
pub const COLUMN_PAGED_BATCHER_BUDGET_FRACTION: Config<f64> = Config::new(
"column_paged_batcher_budget_fraction",
0.05,
"Fraction of replica memory the column-paged batcher's tiered policy may hold resident \
before spilling to the backend. Total budget = max(mem_limit * fraction, 128 MiB).",
);

/// Whether rendering should use `mz_join_core` rather than DD's `JoinCore::join_core`.
pub const ENABLE_MZ_JOIN_CORE: Config<bool> = Config::new(
"enable_mz_join_core",
Expand Down Expand Up @@ -424,4 +480,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&COMPUTE_PROMETHEUS_INTROSPECTION_SCRAPE_INTERVAL)
.add(&SUBSCRIBE_SNAPSHOT_OPTIMIZATION)
.add(&MV_SINK_ADVANCE_PERSIST_FRONTIERS)
.add(&ENABLE_COLUMN_PAGED_BATCHER)
.add(&ENABLE_COLUMN_PAGED_BATCHER_SPILL)
.add(&COLUMN_PAGED_BATCHER_BUDGET_FRACTION)
}
Loading
Loading