Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions misc/python/materialize/feature_benchmark/benchmark_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ def this_as_str(self) -> str:
return f"{self.this():>11.3f}"

def other(self) -> T:
    """Return the baseline (OTHER-side) measurement, or `None` if absent.

    The baseline is stored at `_points[1]`. When `_points` holds fewer than
    two entries there is nothing to return; per the original note this
    happens when the runner only ran the THIS side (e.g. `--skip-other`),
    and callers that render reports are expected to handle `None`.
    """
    baseline_recorded = len(self._points) >= 2
    if not baseline_recorded:
        # Intentionally steps outside the declared `T` return type; the
        # suppression keeps the signature unchanged for existing callers.
        return None  # type: ignore[return-value]
    return self._points[1]

def other_as_str(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,13 @@ def get_threshold(self, metric: BenchmarkScenarioMetric) -> float:
return self.threshold_by_measurement_type[metric.measurement_type]

def ratio(self, metric: BenchmarkScenarioMetric) -> float | None:
    """Return the THIS/OTHER ratio for `metric`, or `None` if it is undefined.

    Args:
        metric: object whose `_points` sequence holds the THIS measurement
            at index 0 and the OTHER (baseline) measurement at index 1.

    Returns:
        `_points[0] / _points[1]`, or `None` when either side is missing.

    Note:
        A baseline of zero is not special-cased; the division is performed
        as-is.
    """
    points = metric._points
    # `_points` has length 1 when the runner ran only the THIS side
    # (e.g. `--skip-other`); there's no baseline to compare against.
    # The length check must come first so `points[1]` is never indexed on
    # a short list.
    if len(points) < 2 or points[0] is None or points[1] is None:
        return None
    return points[0] / points[1]
Expand Down
151 changes: 151 additions & 0 deletions misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,157 @@ def benchmark(self) -> MeasurementSource:
""")


class DifferentialJoinColumnPaged(Dataflow):
    """`DifferentialJoin`-shaped scenario run with the column-paged merge
    batcher switched on for the linear-join arrange stage.

    Intended to be compared against `DifferentialJoin` to gauge the
    steady-state overhead of the paged path (resident chunks plus
    byte-budget bookkeeping) when nothing forces a spill. For the cost of
    spilling itself, see `DifferentialJoinHydrationFile`.
    """

    @classmethod
    def can_run(cls, version: MzVersion) -> bool:
        # The scenario needs `enable_column_paged_batcher`, which the
        # original note says arrives with the merge batcher rework in
        # 26.28.0. It is skipped on both sides while the tree is still on
        # `26.28.0-dev.0`: `MzVersion` strips the git hash, so dev builds
        # with and without the dyncfg look identical.
        # NOTE(review): the strict `>` also rejects the 26.28.0 release
        # itself — confirm that is intended.
        gate = MzVersion.create(26, 28, 0)
        return version > gate

    def init(self) -> list[Action]:
        # Build the base view first, then flip the dyncfg on and create the
        # materialized view the benchmark query joins against.
        base_view = self.view_ten()
        enable_and_materialize = TdAction(f"""
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
$ postgres-execute connection=mz_system
ALTER SYSTEM SET enable_column_paged_batcher = true;

> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}
""")
        return [base_view, enable_and_materialize]

    def benchmark(self) -> MeasurementSource:
        # The `/* A */` and `/* B */` markers bracket the timed self-join.
        script = f"""
> SELECT 1;
/* A */
1


> SELECT COUNT(*) FROM v1 AS a1 JOIN v1 AS a2 USING (f1)
/* B */
{self.n()}
"""
        return Td(script)


class DifferentialJoinHydration(Dataflow):
    """Shared parent of the linear-join hydration benchmark family.

    Owns the common `init` and `benchmark` (a replica-toggle hydration
    loop), so the Baseline and File variants only override `shared()` with
    the dyncfgs they want set. Because it has subclasses, the
    feature-benchmark runner treats it as non-leaf and does not execute it
    directly; select one of the leaf classes via `--root-scenario`.

    Both leaves are meant to run under a memory-capped Materialized
    (`--this-memory=2g`), so the baseline has to swap and the paged-file
    variant has somewhere predictable to spill.
    """

    # Sizing rationale from the original author: SCALE=8 → 100M rows per
    # side, ~1.6 GiB of input per side. Two sides plus the join arrangement
    # (typically 2–4× input) reliably exceed a few GiB in total, so a 2g
    # container cap puts the baseline under real swap pressure. With the
    # File variant's 16 MiB per-worker + 128 MiB shared budget, almost
    # everything spills under that cap.
    SCALE = 8

    @classmethod
    def can_run(cls, version: MzVersion) -> bool:
        # Both leaf variants set `enable_column_paged_batcher` (baseline =
        # false, file = true), so the gate follows the dyncfg's
        # introduction in 26.28.0. The comment in
        # `DifferentialJoinColumnPaged.can_run` explains why dev versions
        # cannot be told apart.
        gate = MzVersion.create(26, 28, 0)
        return version > gate

    def init(self) -> list[Action]:
        # `v1` is created on the default cluster rather than
        # `join_cluster`, so the replication-factor toggle in `benchmark`
        # only tears down `v2`'s dataflow. That keeps the measurement
        # scoped to the join-arrangement rebuild.
        base_view = self.view_ten()
        prepare_inputs = TdAction(f"""
> CREATE MATERIALIZED VIEW v1
AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}
> SELECT COUNT(*) FROM v1
{self.n()}

> CREATE CLUSTER join_cluster SIZE 'scale=1,workers=16', REPLICATION FACTOR 1
""")
        return [base_view, prepare_inputs]

    def benchmark(self) -> MeasurementSource:
        # Same pattern as HydrateIndex: the cluster goes offline *before*
        # the object is defined so the dataflow cannot pre-hydrate. The
        # `REPLICATION FACTOR 1` flip after `/* A */` is the hydration
        # trigger being timed.
        script = f"""
> DROP MATERIALIZED VIEW IF EXISTS v2

> ALTER CLUSTER join_cluster SET (REPLICATION FACTOR 0)

> CREATE MATERIALIZED VIEW v2
IN CLUSTER join_cluster
AS SELECT COUNT(*) FROM v1 AS a1 JOIN v1 AS a2 USING (f1)

> SELECT 1
/* A */
1
> ALTER CLUSTER join_cluster SET (REPLICATION FACTOR 1)
> SET CLUSTER = join_cluster
> SELECT * FROM v2
/* B */
{self.n()}
> SET CLUSTER = default
"""
        return Td(script)


class DifferentialJoinHydrationBaseline(DifferentialJoinHydration):
    """Hydration run with the paged batcher turned off, i.e. the current
    production path.

    Pair it with `DifferentialJoinHydrationFile` to see whether user-space
    file-backed spill beats OS swap under memory pressure.
    """

    def shared(self) -> Action:
        # Pin the dyncfg to `false` explicitly rather than relying on its
        # default.
        disable_paged_batcher = """
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
$ postgres-execute connection=mz_system
ALTER SYSTEM SET enable_column_paged_batcher = false;
"""
        return TdAction(disable_paged_batcher)


class DifferentialJoinHydrationFile(DifferentialJoinHydration):
    """Hydration run with the column-paged batcher and its spill path
    enabled, using a tight budget fraction so the merge-batcher transient
    spills instead of competing with the spine for RAM.

    `column_paged_batcher_budget_fraction = 0.01` (1% of the announced
    memory limit) is, per the original author's note, small enough to land
    on the clamp floors of the worker-init derivation (per-worker 16 MiB,
    shared 128 MiB), matching the effective sizing benchmarked before the
    fraction-knob refactor.
    NOTE(review): this class sets no backend selector; the "file" backend
    is presumably the default spill target — confirm.
    """

    def shared(self) -> Action:
        # Three dyncfgs: turn the paged batcher on, allow it to spill, and
        # shrink its resident budget.
        enable_paged_spill = """
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
$ postgres-execute connection=mz_system
ALTER SYSTEM SET enable_column_paged_batcher = true;
ALTER SYSTEM SET enable_column_paged_batcher_spill = true;
ALTER SYSTEM SET column_paged_batcher_budget_fraction = 0.01;
"""
        return TdAction(enable_paged_spill)


class FullOuterJoin(Dataflow):
def benchmark(self) -> BenchmarkingSequence:
columns_select = ", ".join(
Expand Down
3 changes: 3 additions & 0 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,9 @@ def get_default_system_parameters(
"enable_compute_half_join2",
"enable_mz_join_core",
"linear_join_yielding",
"enable_column_paged_batcher",
"enable_column_paged_batcher_spill",
"column_paged_batcher_budget_fraction",
"enable_lgalloc_eager_reclamation",
"lgalloc_background_interval",
"lgalloc_file_growth_dampener",
Expand Down
7 changes: 7 additions & 0 deletions misc/python/materialize/mzcompose/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,13 @@ class ServiceConfig(TypedDict, total=False):
security_opt: list[str] | None
"""Additional security options to apply to the container."""

memswap_limit: int | str | None
"""Total memory+swap limit (Docker `memswap_limit`). `-1` disables the swap
limit, leaving only the memory limit in effect."""

mem_swappiness: int | None
"""Container swap tendency, 0–100 (Docker `mem_swappiness`)."""


class Service:
"""A Docker Compose service in a `Composition`.
Expand Down
13 changes: 13 additions & 0 deletions misc/python/materialize/mzcompose/services/clusterd.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ def __init__(
environment_id: str | None = None,
environment_extra: list[str] = [],
memory: str | None = None,
memory_swap: str | None = None,
mem_swappiness: int | None = None,
cpu: str | None = None,
options: list[str] = [],
restart: str = "no",
Expand Down Expand Up @@ -93,6 +95,17 @@ def __init__(
limits["cpus"] = cpu
config["deploy"] = {"resources": {"limits": limits}}

# Swap controls aren't part of compose's `deploy.resources` schema; they
# live as top-level compose v2 service keys (`memswap_limit`,
# `mem_swappiness`). Setting `memswap_limit > mem_limit` enables the
# container to use host swap when RAM pressure builds, which lets the
# kernel page out anonymous memory rather than OOM-killing. Useful for
# benchmarking "OS swap" as a baseline vs application-managed spill.
if memory_swap is not None:
config["memswap_limit"] = memory_swap
if mem_swappiness is not None:
config["mem_swappiness"] = mem_swappiness

config.update(
{
"command": options,
Expand Down
11 changes: 11 additions & 0 deletions misc/python/materialize/mzcompose/services/materialized.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ def __init__(
volumes_extra: list[str] = [],
depends_on: list[str] = [],
memory: str | None = None,
memory_swap: str | None = None,
mem_swappiness: int | None = None,
cpu: str | None = None,
options: list[str] = [],
persist_blob_url: str | None = None,
Expand Down Expand Up @@ -332,6 +334,15 @@ def __init__(
limits["cpus"] = cpu
config["deploy"] = {"resources": {"limits": limits}}

# Swap controls live as top-level compose v2 service keys, not under
# `deploy.resources`. `memswap_limit > mem_limit` lets the container use
# host swap so the kernel can page out anonymous memory rather than OOM.
# `mem_swappiness=100` biases the kernel toward swapping aggressively.
if memory_swap is not None:
config["memswap_limit"] = memory_swap
if mem_swappiness is not None:
config["mem_swappiness"] = mem_swappiness

if sanity_restart:
# Workaround for https://github.com/docker/compose/issues/11133
config["labels"] = {"sanity_restart": True}
Expand Down
10 changes: 10 additions & 0 deletions misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -1607,6 +1607,16 @@ def __init__(
self.flags_with_values["enable_upsert_v2"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["enable_coalesce_case_transform"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["enable_compute_sync_mv_sink"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["enable_column_paged_batcher"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["enable_column_paged_batcher_spill"] = (
BOOLEAN_FLAG_VALUES
)
self.flags_with_values["column_paged_batcher_budget_fraction"] = [
"0.0",
"0.01",
"0.05",
"0.25",
]

# If you are adding a new config flag in Materialize, consider using it
# here instead of just marking it as uninteresting to silence the
Expand Down
59 changes: 59 additions & 0 deletions src/compute-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,62 @@ pub const ENABLE_HALF_JOIN2: Config<bool> = Config::new(
"Whether compute should use `half_join2` rather than DD's `half_join` to render delta joins.",
);

/// Use the column-paged merge batcher code path at arrange sites.
///
/// When `true`, arrange operators use `Col2ValPagedBatcher` (in
/// `mz_timely_util::columnar`) and `RowRowColPagedBuilder` (in
/// `mz_row_spine`) — the columnar-native batcher that the pager can
/// spill (gated by [`ENABLE_COLUMN_PAGED_BATCHER_SPILL`]). When `false`
/// (the default), the same arrange sites use the legacy
/// `Col2ValBatcher` / `RowRowBuilder` (columnation-merger) path that
/// shipped before #36627. Read at operator construction time; flips
/// take effect on dataflows created after the change.
///
/// Disabled by default while the new path is stabilizing. The
/// `DifferentialJoinColumnPaged` and `DifferentialJoinHydrationFile`
/// feature-benchmark scenarios opt in explicitly, while
/// `DifferentialJoinHydrationBaseline` pins the flag to `false`. Of
/// these, only `DifferentialJoinHydrationFile` also enables
/// [`ENABLE_COLUMN_PAGED_BATCHER_SPILL`], so it is the scenario that
/// measures the spill path.
pub const ENABLE_COLUMN_PAGED_BATCHER: Config<bool> = Config::new(
"enable_column_paged_batcher",
false,
"Use the columnar-native paged merge batcher at arrange sites. When `false` (default), \
arranges fall back to the legacy columnation `Col2ValBatcher` / `RowRowBuilder` path.",
);

/// Allow the column-paged batcher's pager to actually evict chunks
/// under memory pressure.
///
/// Only meaningful when [`ENABLE_COLUMN_PAGED_BATCHER`] is `true`; with
/// the spill flag off the pager keeps every chunk resident regardless
/// of budget.
///
/// Off by default, even when the batcher path itself is on, so the
/// no-pressure case stays a pure resident operation. Tune the resident
/// budget via [`COLUMN_PAGED_BATCHER_BUDGET_FRACTION`].
// NOTE(review): the previous wording said "budget / backend", but the
// referenced dyncfg only describes a budget fraction; no backend selector
// is visible among the dyncfgs added here — confirm where it is chosen.
pub const ENABLE_COLUMN_PAGED_BATCHER_SPILL: Config<bool> = Config::new(
"enable_column_paged_batcher_spill",
false,
"Allow the column-paged batcher's pager to evict chunks under memory pressure. Only \
meaningful when `enable_column_paged_batcher = true`.",
);

/// Resident-byte budget of the column-paged batcher, as a fraction of
/// the replica's announced memory limit.
///
/// This is the total the batcher's tiered policy
/// (`mz_timely_util::column_pager::policy::TieredPolicy`) is allowed to
/// hold across all workers in this process. A single
/// process-wide pool tracks all resident chunks; allocations beyond the
/// pool spill to the configured backend.
///
/// `0.05` (5%) is a reasonable starting point: large enough that the
/// per-call ColumnBuilder ship-threshold (~2 MiB) fits multiple chunks
/// per worker, small enough that the merge-batcher's transient state
/// doesn't crowd out the spine. Set lower to spill more aggressively
/// under pressure. The computed budget is floored at 128 MiB so the
/// no-pressure case doesn't page per chunk; consequently a fraction of
/// `0.0` resolves to that 128 MiB floor rather than to zero. Ignored when
/// `enable_column_paged_batcher_spill` is `false`.
// NOTE(review): presumably the value is expected to lie in 0.0..=1.0; no
// range validation is visible at this definition — confirm at the read
// site. The parallel-workload flag list exercises 0.0, 0.01, 0.05, 0.25.
pub const COLUMN_PAGED_BATCHER_BUDGET_FRACTION: Config<f64> = Config::new(
"column_paged_batcher_budget_fraction",
0.05,
"Fraction of replica memory the column-paged batcher's tiered policy may hold resident \
before spilling to the backend. Total budget = max(mem_limit * fraction, 128 MiB).",
);

/// Whether rendering should use `mz_join_core` rather than DD's `JoinCore::join_core`.
pub const ENABLE_MZ_JOIN_CORE: Config<bool> = Config::new(
"enable_mz_join_core",
Expand Down Expand Up @@ -424,4 +480,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&COMPUTE_PROMETHEUS_INTROSPECTION_SCRAPE_INTERVAL)
.add(&SUBSCRIBE_SNAPSHOT_OPTIMIZATION)
.add(&MV_SINK_ADVANCE_PERSIST_FRONTIERS)
.add(&ENABLE_COLUMN_PAGED_BATCHER)
.add(&ENABLE_COLUMN_PAGED_BATCHER_SPILL)
.add(&COLUMN_PAGED_BATCHER_BUDGET_FRACTION)
}
Loading
Loading