Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions misc/python/materialize/feature_benchmark/benchmark_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ def this_as_str(self) -> str:
return f"{self.this():>11.3f}"

def other(self) -> T:
# `_points` has length 1 when the runner ran only the THIS side
# (e.g. `--skip-other`); treat the absent baseline as `None` so
# report rendering falls through to its `None` formatting.
if len(self._points) < 2:
return None # type: ignore[return-value]
return self._points[1]

def other_as_str(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,13 @@ def get_threshold(self, metric: BenchmarkScenarioMetric) -> float:
return self.threshold_by_measurement_type[metric.measurement_type]

def ratio(self, metric: BenchmarkScenarioMetric) -> float | None:
if metric._points[0] is None or metric._points[1] is None:
# `_points` has length 1 when the runner ran only the THIS side
# (e.g. `--skip-other`); there's no baseline to compare against.
if (
len(metric._points) < 2
or metric._points[0] is None
or metric._points[1] is None
):
return None
else:
return metric._points[0] / metric._points[1]
Expand Down
133 changes: 133 additions & 0 deletions misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,139 @@ def benchmark(self) -> MeasurementSource:
""")


class DifferentialJoinColumnPaged(Dataflow):
"""Same shape as `DifferentialJoin`, but with the column-paged merge
batcher enabled for the linear-join arrange stage.

Compare against `DifferentialJoin` to gauge the steady-state overhead of
the paged path (resident chunks plus byte-budget bookkeeping) when no
pressure forces spill. To measure spill cost, see
`DifferentialJoinHydrationFile`.
"""

def init(self) -> list[Action]:
return [
self.view_ten(),
TdAction(f"""
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
$ postgres-execute connection=mz_system
ALTER SYSTEM SET enable_column_paged_batcher = true;

> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}
"""),
]

def benchmark(self) -> MeasurementSource:
return Td(f"""
> SELECT 1;
/* A */
1


> SELECT COUNT(*) FROM v1 AS a1 JOIN v1 AS a2 USING (f1)
/* B */
{self.n()}
""")


class DifferentialJoinHydration(Dataflow):
"""Non-leaf parent for the linear-join hydration benchmark family.

Holds the shared `init` / `benchmark` (replica-toggle hydration loop) so
Baseline and File variants only need to override `shared()` with the
dyncfgs they want set. Has subclasses, so the feature-benchmark runner
treats it as non-leaf and never executes it directly — pick one of the
leaf classes via `--root-scenario`.

Run both leaves under a memory-capped Materialized (`--this-memory=2g`)
so the baseline has to swap and the paged-file variant has somewhere
predictable to spill.
"""

# SCALE=8 → 100M rows per side, ~1.6 GiB per side input. Two sides plus
# the join arrangement (typically 2–4× input) reliably exceeds a few
# GiB total; a 2g container cap forces real swap pressure on the
# baseline. File variant's 16 MiB per-worker + 128 MiB shared budget
# means almost everything spills under that cap.
SCALE = 8

def init(self) -> list[Action]:
# `v1` lives on the default cluster, not `join_cluster`, so the
# replication-factor toggle in `benchmark` only tears down `v2`'s
# dataflow. Keeps the measurement scoped to the join-arrangement
# rebuild we're trying to measure.
return [
self.view_ten(),
TdAction(f"""
> CREATE MATERIALIZED VIEW v1
AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}
> SELECT COUNT(*) FROM v1
{self.n()}

> CREATE CLUSTER join_cluster SIZE 'scale=1,workers=16', REPLICATION FACTOR 1
"""),
]

def benchmark(self) -> MeasurementSource:
# Match HydrateIndex's pattern: take the cluster offline *before*
# defining the object so the dataflow doesn't pre-hydrate. The
# `REPLICATION FACTOR 1` flip after `/* A */` is the actual
# hydration trigger we want to time.
return Td(f"""
> DROP MATERIALIZED VIEW IF EXISTS v2

> ALTER CLUSTER join_cluster SET (REPLICATION FACTOR 0)

> CREATE MATERIALIZED VIEW v2
IN CLUSTER join_cluster
AS SELECT COUNT(*) FROM v1 AS a1 JOIN v1 AS a2 USING (f1)

> SELECT 1
/* A */
1
> ALTER CLUSTER join_cluster SET (REPLICATION FACTOR 1)
> SET CLUSTER = join_cluster
> SELECT * FROM v2
/* B */
{self.n()}
> SET CLUSTER = default
""")


class DifferentialJoinHydrationBaseline(DifferentialJoinHydration):
"""Hydration measurement with the paged batcher disabled (current
production path). Compare against `DifferentialJoinHydrationFile` to
see if user-space file-backed spill beats OS swap under pressure.
"""

def shared(self) -> Action:
return TdAction("""
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
$ postgres-execute connection=mz_system
ALTER SYSTEM SET enable_column_paged_batcher = false;
""")


class DifferentialJoinHydrationFile(DifferentialJoinHydration):
"""Hydration time with the column-paged batcher on, file backend, and
a tight budget fraction so the merge-batcher transient spills rather
than competing with the spine for RAM.

`budget_fraction = 0.01` (1% of announced memory limit) lands in the
clamp floors of the worker-init derivation (per-worker 16 MiB,
shared 128 MiB), giving us the same effective sizing we benchmarked
before the fraction-knob refactor.
"""

def shared(self) -> Action:
return TdAction("""
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
$ postgres-execute connection=mz_system
ALTER SYSTEM SET enable_column_paged_batcher = true;
ALTER SYSTEM SET column_paged_batcher_budget_fraction = 0.01;
""")


class FullOuterJoin(Dataflow):
def benchmark(self) -> BenchmarkingSequence:
columns_select = ", ".join(
Expand Down
2 changes: 2 additions & 0 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,8 @@ def get_default_system_parameters(
"enable_compute_half_join2",
"enable_mz_join_core",
"linear_join_yielding",
"enable_column_paged_batcher",
"column_paged_batcher_budget_fraction",
"enable_lgalloc_eager_reclamation",
"lgalloc_background_interval",
"lgalloc_file_growth_dampener",
Expand Down
7 changes: 7 additions & 0 deletions misc/python/materialize/mzcompose/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,13 @@ class ServiceConfig(TypedDict, total=False):
security_opt: list[str] | None
"""Additional security options to apply to the container."""

memswap_limit: int | str | None
"""Total memory+swap limit (Docker `memswap_limit`). `-1` disables the swap
limit, leaving only the memory limit in effect."""

mem_swappiness: int | None
"""Container swap tendency, 0–100 (Docker `mem_swappiness`)."""


class Service:
"""A Docker Compose service in a `Composition`.
Expand Down
13 changes: 13 additions & 0 deletions misc/python/materialize/mzcompose/services/clusterd.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ def __init__(
environment_id: str | None = None,
environment_extra: list[str] = [],
memory: str | None = None,
memory_swap: str | None = None,
mem_swappiness: int | None = None,
cpu: str | None = None,
options: list[str] = [],
restart: str = "no",
Expand Down Expand Up @@ -93,6 +95,17 @@ def __init__(
limits["cpus"] = cpu
config["deploy"] = {"resources": {"limits": limits}}

# Swap controls aren't part of compose's `deploy.resources` schema; they
# live as top-level compose v2 service keys (`memswap_limit`,
# `mem_swappiness`). Setting `memswap_limit > mem_limit` enables the
# container to use host swap when RAM pressure builds, which lets the
# kernel page out anonymous memory rather than OOM-killing. Useful for
# benchmarking "OS swap" as a baseline vs application-managed spill.
if memory_swap is not None:
config["memswap_limit"] = memory_swap
if mem_swappiness is not None:
config["mem_swappiness"] = mem_swappiness

config.update(
{
"command": options,
Expand Down
11 changes: 11 additions & 0 deletions misc/python/materialize/mzcompose/services/materialized.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ def __init__(
volumes_extra: list[str] = [],
depends_on: list[str] = [],
memory: str | None = None,
memory_swap: str | None = None,
mem_swappiness: int | None = None,
cpu: str | None = None,
options: list[str] = [],
persist_blob_url: str | None = None,
Expand Down Expand Up @@ -332,6 +334,15 @@ def __init__(
limits["cpus"] = cpu
config["deploy"] = {"resources": {"limits": limits}}

# Swap controls live as top-level compose v2 service keys, not under
# `deploy.resources`. `memswap_limit > mem_limit` lets the container use
# host swap so the kernel can page out anonymous memory rather than OOM.
# `mem_swappiness=100` biases the kernel toward swapping aggressively.
if memory_swap is not None:
config["memswap_limit"] = memory_swap
if mem_swappiness is not None:
config["mem_swappiness"] = mem_swappiness

if sanity_restart:
# Workaround for https://github.com/docker/compose/issues/11133
config["labels"] = {"sanity_restart": True}
Expand Down
7 changes: 7 additions & 0 deletions misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -1607,6 +1607,13 @@ def __init__(
self.flags_with_values["enable_upsert_v2"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["enable_coalesce_case_transform"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["enable_compute_sync_mv_sink"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["enable_column_paged_batcher"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["column_paged_batcher_budget_fraction"] = [
"0.0",
"0.01",
"0.05",
"0.25",
]

# If you are adding a new config flag in Materialize, consider using it
# here instead of just marking it as uninteresting to silence the
Expand Down
34 changes: 34 additions & 0 deletions src/compute-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,38 @@ pub const ENABLE_HALF_JOIN2: Config<bool> = Config::new(
"Whether compute should use `half_join2` rather than DD's `half_join` to render delta joins.",
);

/// Install the column-pageable merge batcher on each compute worker, so
/// arrangements that route through it can spill chunks under memory
/// pressure rather than holding them all resident. Disabled by default;
/// the budget/backend knobs below tune the behavior when enabled.
pub const ENABLE_COLUMN_PAGED_BATCHER: Config<bool> = Config::new(
"enable_column_paged_batcher",
false,
"Install the column-paged merge batcher on each compute worker so it can spill under memory \
pressure.",
);

/// Total resident-byte budget the column-paged batcher's tiered policy
/// (`mz_timely_util::column_pager::policy::TieredPolicy`) is allowed to
/// hold across all workers in this process, expressed as a fraction of
/// the replica's announced memory limit. A single
/// process-wide pool tracks all resident chunks; allocations beyond the
/// pool spill to the configured backend.
///
/// `0.05` (5%) is a reasonable starting point: large enough that the
/// per-call ColumnBuilder ship-threshold (~2 MiB) fits multiple chunks
/// per worker, small enough that the merge-batcher's transient state
/// doesn't crowd out the spine. Set lower to spill more aggressively
/// under pressure. The computed budget is floored at 128 MiB so the
/// no-pressure case doesn't page per chunk. Ignored when
/// `enable_column_paged_batcher` is `false`.
pub const COLUMN_PAGED_BATCHER_BUDGET_FRACTION: Config<f64> = Config::new(
"column_paged_batcher_budget_fraction",
0.05,
"Fraction of replica memory the column-paged batcher's tiered policy may hold resident \
before spilling to the backend. Total budget = max(mem_limit * fraction, 128 MiB).",
);

/// Whether rendering should use `mz_join_core` rather than DD's `JoinCore::join_core`.
pub const ENABLE_MZ_JOIN_CORE: Config<bool> = Config::new(
"enable_mz_join_core",
Expand Down Expand Up @@ -424,4 +456,6 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&COMPUTE_PROMETHEUS_INTROSPECTION_SCRAPE_INTERVAL)
.add(&SUBSCRIBE_SNAPSHOT_OPTIMIZATION)
.add(&MV_SINK_ADVANCE_PERSIST_FRONTIERS)
.add(&ENABLE_COLUMN_PAGED_BATCHER)
.add(&COLUMN_PAGED_BATCHER_BUDGET_FRACTION)
}
44 changes: 43 additions & 1 deletion src/compute/src/compute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use mz_compute_types::plan::render_plan::RenderPlan;
use mz_dyncfg::ConfigSet;
use mz_expr::row::RowCollection;
use mz_expr::{RowComparator, SafeMfpPlan};
use mz_ore::cast::CastFrom;
use mz_ore::cast::{CastFrom, CastLossy};
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::{MetricsRegistry, UIntGauge};
use mz_ore::now::EpochMillis;
Expand Down Expand Up @@ -308,6 +308,48 @@ impl ComputeState {
std::sync::atomic::Ordering::Relaxed,
);

// Apply column-paged-batcher configuration. Routes through
// `apply_tiered_config`, which reuses a process-wide `TieredPolicy`
// singleton — operator-driven tunes mutate the existing atomics
// rather than installing a fresh policy with a fresh budget atomic
// that would orphan in-flight resident tickets.
//
// Backend selection mirrors the lower-level `mz_ore::pager`
// already configured above: file when a scratch directory is
// available, swap otherwise.
{
use mz_ore::pager::Backend;
use mz_timely_util::column_pager::apply_tiered_config;

let enabled = ENABLE_COLUMN_PAGED_BATCHER.get(config);

// Budget derivation: fraction × announced memory limit, with a
// 128 MiB floor so the no-pressure case doesn't page per chunk.
// Falls back to a 4 GiB assumption if no limit was announced
// (e.g. dev environments).
const MIB: usize = 1024 * 1024;
const DEFAULT_MEM_LIMIT: usize = 4 * 1024 * MIB;
let mem_limit = crate::memory_limiter::get_memory_limit().unwrap_or(DEFAULT_MEM_LIMIT);
let fraction = COLUMN_PAGED_BATCHER_BUDGET_FRACTION.get(config).max(0.0);
let total = usize::cast_lossy(f64::cast_lossy(mem_limit) * fraction).max(128 * MIB);

let backend = if self.context.scratch_directory.is_some() {
Backend::File
} else {
Backend::Swap
};

debug!(
enabled,
?backend,
fraction,
mem_limit,
budget_bytes = total,
"column-paged batcher: applying tiered config",
);
apply_tiered_config(enabled, total, backend, None);
}

// Remember the maintenance interval locally to avoid reading it from the config set on
// every server iteration.
self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config);
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/render.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1487,7 +1487,7 @@ impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> {

#[allow(dead_code)] // Some of the methods on this trait are unused, but useful to have.
/// A timestamp type that can be used for operations within MZ's dataflow layer.
pub trait RenderTimestamp: MzTimestamp + Refines<mz_repr::Timestamp> {
pub trait RenderTimestamp: MzTimestamp + Default + Refines<mz_repr::Timestamp> {
/// The system timestamp component of the timestamp.
///
/// This is useful for manipulating the system time, as when delaying
Expand Down
Loading