Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 12 additions & 42 deletions sandbox/plugins/analytics-backend-datafusion/rust/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,11 +488,7 @@ pub fn create_global_runtime(
crate::memory_guard::mark_spill_disabled();
DiskManagerBuilder::default().with_mode(DiskManagerMode::Disabled)
} else {
let effective_spill_limit = if spill_limit == 0 {
resolve_dynamic_spill_limit(spill_dir)
} else {
spill_limit as u64
};
let effective_spill_limit = spill_limit as u64;

// Wipe leaked entries from a prior non-graceful shutdown.
//
Expand Down Expand Up @@ -1125,36 +1121,6 @@ pub async unsafe fn fetch_by_row_ids(
Ok(wrap_stream_as_handle(df_stream, manager.cpu_executor(), runtime, context_id))
}

/// Picks a spill limit from the free space on the spill directory's filesystem.
///
/// Makes a best-effort attempt to create `spill_dir`, then asks
/// `memory_guard::available_disk_space` how many bytes are free there.
///
/// Returns 80% of the reported free space, or an 8GB fallback when the free
/// space cannot be determined. Either outcome is logged.
fn resolve_dynamic_spill_limit(spill_dir: &str) -> u64 {
    const FRACTION: f64 = 0.80;
    const FALLBACK: u64 = 8 * 1024 * 1024 * 1024; // 8GB

    // Best effort only: a failure to create the directory is ignored here and
    // the disk-space probe below decides which limit is used.
    let _ = std::fs::create_dir_all(spill_dir);

    if let Some(available) = crate::memory_guard::available_disk_space(spill_dir) {
        // Float math truncates toward zero when cast back to u64.
        let limit = (available as f64 * FRACTION) as u64;
        log::info!(
            "Dynamic spill limit: {} bytes (80% of {} available on {})",
            limit, available, spill_dir
        );
        return limit;
    }

    log::warn!(
        "Could not determine disk space for '{}', using fallback {}GB",
        spill_dir, FALLBACK / (1024 * 1024 * 1024)
    );
    FALLBACK
}

/// Inspect substrait plan bytes for routing signals.
/// Returns (has_index_filter, has_row_id).
fn inspect_plan_bytes(plan_bytes: &[u8]) -> (bool, bool) {
Expand Down Expand Up @@ -2003,11 +1969,10 @@ mod tests {
#[test]
fn create_global_runtime_with_spill_dir_enables_disk_manager() {
// Non-empty spill_dir takes the Directories(...) path. tmp_files_enabled() must
// be true so spill attempts succeed. Passing spill_limit=0 also exercises the
// dynamic-limit resolver (resolve_dynamic_spill_limit + set_spill_dir). The
// budget must NOT be Disabled — set_spill_dir flips SPILL_ENABLED on. Whether
// it's Available or Critical depends on the test host's free disk; both prove
// the enabled-path branch is taken.
// be true so spill attempts succeed. The budget must NOT be Disabled —
// set_spill_dir flips SPILL_ENABLED on. Whether it's Available or Critical
// depends on the test host's free disk; both prove the enabled-path branch
// is taken.
//
// Also doubles as a startup-cleanup regression check: drop a "leaked" sentinel
// file in the directory before the call and assert it's gone after.
Expand All @@ -2020,7 +1985,7 @@ mod tests {
fs::write(&sentinel, b"stale spill data").expect("seed sentinel");
assert!(sentinel.exists(), "sentinel must exist before runtime build");

let ptr = create_global_runtime(64 * 1024 * 1024, 0, spill_path, 0).expect("runtime build");
let ptr = create_global_runtime(64 * 1024 * 1024, 0, spill_path, 1024 * 1024 * 1024).expect("runtime build");
assert!(ptr > 0);

// Phase 1 renames the sentinel file to leaked_from_prior_run.tmp.stale
Expand All @@ -2036,6 +2001,11 @@ mod tests {
runtime.runtime_env.disk_manager.tmp_files_enabled(),
"expected DiskManagerMode::Directories when spill_dir is set"
);
assert_eq!(
runtime.runtime_env.disk_manager.max_temp_directory_size(),
1024 * 1024 * 1024,
"DiskManager cap must equal the positive spill_limit passed in"
);
assert_ne!(
crate::memory_guard::per_query_spill_budget(),
crate::memory_guard::SpillBudget::Disabled,
Expand Down Expand Up @@ -2072,7 +2042,7 @@ mod tests {
assert!(top_file.exists());
assert!(nested_file.exists());

let ptr = create_global_runtime(64 * 1024 * 1024, 0, spill_path, 0).expect("runtime build");
let ptr = create_global_runtime(64 * 1024 * 1024, 0, spill_path, 1024 * 1024 * 1024).expect("runtime build");
assert!(ptr > 0);

// Phase 1: original names gone (renamed to *.stale).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,14 @@ pub extern "C" fn df_set_reduce_target_partitions(value: i64) {
api::set_reduce_target_partitions(value);
}

/// Updates the spill-exemption cap, in bytes.
///
/// The cap is the total in-flight allocation that spillable consumers may take
/// through the 85% spill gate so they can finish spilling. A negative `value`
/// is treated as zero. The new cap is handed to
/// `memory_guard::set_spill_exempt_cap_bytes`; the pool reads the cap on each
/// `try_grow`, so the change applies to the next one.
///
/// Java: NativeBridge.setSpillExemptCapBytes(long).
#[no_mangle]
pub extern "C" fn df_set_spill_exempt_cap_bytes(value: i64) {
    // Clamp before converting: a negative i64 must not wrap into a huge u64.
    let cap_bytes = if value < 0 { 0 } else { value as u64 };
    crate::memory_guard::set_spill_exempt_cap_bytes(cap_bytes);
}

/// Sets memory guard thresholds. Values are thresholds multiplied by 1000
/// (e.g., 700 = 0.70, 850 = 0.85, 950 = 0.95).
#[no_mangle]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,6 @@ pub mod native_node_stats;
pub mod search_stats;
pub mod stats;
pub mod task_monitors;

#[cfg(test)]
mod spill_e2e_test;
196 changes: 169 additions & 27 deletions sandbox/plugins/analytics-backend-datafusion/rust/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,57 @@ use std::sync::Arc;
use datafusion::common::DataFusionError;
use datafusion::execution::memory_pool::{MemoryPool, MemoryReservation};

/// Verdict of the spill gate for a single reservation request made while RSS
/// sits between the 85% spill threshold and the 95% critical threshold.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SpillGateDecision {
    /// Let the request proceed so an in-progress spill can finish writing.
    Exempt,
    /// Refuse: the consumer cannot spill, so rejecting it is what makes it spill.
    RejectNonSpillable,
    /// Refuse: the consumer can spill, but the request exceeds the budget left.
    RejectCapped,
}

/// Chooses the gate verdict for a request arriving in the 85%-to-95% RSS band.
///
/// Why spillable consumers get through: once such a consumer reaches the spill
/// threshold it releases its data and then needs a small temporary buffer to
/// write that data out. Refusing the buffer would leave the spill unable to
/// complete and the query stuck.
///
/// Why the exemption is bounded: only `cap - outstanding` bytes may pass, so a
/// burst of simultaneous spills cannot accumulate enough memory to reach the
/// 95% critical threshold.
///
/// Why allocator state is ignored: immediately after a spill frees its data the
/// allocator still holds the pages, so OS RSS has not fallen and a
/// "freed-but-not-returned" signal reads close to zero at the exact moment the
/// buffer is needed. An earlier version relied on it and refused the buffer
/// ("Failed to reserve memory for sort during spill"). A fixed byte budget does
/// not have that problem.
///
/// The caller runs the 95% critical check before calling this and still
/// enforces the pool limit, so an `Exempt` verdict cannot by itself carry the
/// process past 95%. The function is pure, which lets it be unit-tested without
/// real process memory.
///
/// * `can_spill` - whether the requesting consumer is able to spill.
/// * `additional` - bytes being requested.
/// * `cap` - total exemption budget in bytes.
/// * `outstanding` - exempted bytes already granted and not yet released.
pub(crate) fn spill_gate_decision(
    can_spill: bool,
    additional: usize,
    cap: usize,
    outstanding: usize,
) -> SpillGateDecision {
    // Saturating: if `outstanding` ever exceeds `cap`, the remaining room is 0.
    let remaining_budget = cap.saturating_sub(outstanding);
    match (can_spill, additional <= remaining_budget) {
        (false, _) => SpillGateDecision::RejectNonSpillable,
        (true, true) => SpillGateDecision::Exempt,
        (true, false) => SpillGateDecision::RejectCapped,
    }
}

/// A `MemoryPool` whose limit can be changed at runtime.
///
/// Behaviour matches `GreedyMemoryPool` exactly, except the limit is stored
Expand All @@ -32,6 +83,12 @@ pub struct DynamicLimitPool {
used: AtomicUsize,
dynamic_limit: Arc<AtomicUsize>,
tripped_count: Arc<AtomicUsize>,
/// Total bytes currently allowed through the 85% gate by the spill
/// exemption that have not yet been freed. This is the `outstanding` value
/// the exemption budget is measured against, so concurrent spills together
/// cannot exceed the cap. Increased when a request is exempted, decreased in
/// `shrink` as memory is freed (saturating, so it never goes below zero).
exempt_outstanding: AtomicUsize,
}

/// Handle to change the pool limit at runtime.
Expand Down Expand Up @@ -76,6 +133,7 @@ impl DynamicLimitPool {
used: AtomicUsize::new(0),
dynamic_limit: limit,
tripped_count: tripped,
exempt_outstanding: AtomicUsize::new(0),
};
(pool, handle)
}
Expand Down Expand Up @@ -120,28 +178,34 @@ impl MemoryPool for DynamicLimitPool {

fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
    // Hand exemption budget back as memory is released (a spill that writes
    // its data out ends up calling `shrink`). The subtraction saturates, so
    // the counter stops at zero instead of wrapping.
    //
    // NOTE(review): every shrink lowers `exempt_outstanding`, including
    // shrinks of memory that was never exempted. That can return budget
    // before the exempted bytes themselves are freed, which enlarges
    // `cap - outstanding` and so makes the exemption temporarily looser, not
    // stricter. Confirm this is acceptable given the checks in `try_grow`.
    let release_budget = |outstanding: usize| Some(outstanding.saturating_sub(shrink));

    self.used.fetch_sub(shrink, Ordering::Relaxed);
    // The closure always returns `Some`, so the update cannot fail.
    let _ = self.exempt_outstanding.fetch_update(
        Ordering::Relaxed,
        Ordering::Relaxed,
        release_budget,
    );
}

fn try_grow(
&self,
reservation: &MemoryReservation,
additional: usize,
) -> Result<(), DataFusionError> {
// Two-tier RSS guard for in-flight queries:
//
// - operator (85%): reject to trigger spill. The operator will flush its hash
// table to disk and retry. This is recoverable — the query continues.
// - critical (95%): hard reject to prevent OOM. Last resort — even spill can't
// save the node at this pressure level.
// Two memory checks based on process RSS:
// - At 95% (critical): reject every request. This is the hard limit that
// protects the node from running out of memory.
// - At 85% (spill): reject requests so consumers spill to disk, except
// spillable consumers, which are allowed through so they can finish
// spilling (see `spill_gate_decision`). The exemption is bounded by a
// byte budget, and the pool-limit and 95% checks still apply, so this
// cannot push the node over the limit.
//
// Between 85-95%, the override (below) ensures spill sort buffers can still
// allocate: when the CAS fails and RSS has dropped below operator threshold
// (because the hash table was freed during spill), the override fires and
// allows the sort allocation through.
//
// The adaptive cache (cached_resident_bytes) bypasses the 100ms cache when
// the last value was above operator threshold — so the override sees the
// fresh (lower) RSS after spill frees memory, not a stale peak.
// Set if a spillable consumer is allowed through the 85% check; its
// budget is only counted after the allocation actually succeeds below.
let mut exempted = false;
let limit = self.dynamic_limit.load(Ordering::Acquire);
let resident = crate::memory_guard::cached_resident_bytes();
if resident > 0 && limit >= 16 * 1024 * 1024 {
Expand All @@ -151,9 +215,9 @@ impl MemoryPool for DynamicLimitPool {
let resident_usize = resident as usize;

// Critical (95%): hard reject — OOM imminent, protect the node.
// Absolute and pre-CAS for every consumer, spillable or not.
if resident_usize > critical_bytes {
self.tripped_count.fetch_add(1, Ordering::Relaxed);
let used = self.used.load(Ordering::Relaxed);
return Err(crate::native_error::pool_limit_error(
additional,
reservation.consumer().name(),
Expand All @@ -163,19 +227,29 @@ impl MemoryPool for DynamicLimitPool {
));
}

// Operator (85%): soft reject — triggers spill. The operator will
// flush to disk, free memory, then retry. Spill sort buffers will
// succeed on retry because RSS drops and the override allows them.
// RSS is between 85% and 95%. Allow a spillable consumer through so
// it can finish spilling; reject everything else so it spills.
if resident_usize > spill_bytes {
self.tripped_count.fetch_add(1, Ordering::Relaxed);
let used = self.used.load(Ordering::Relaxed);
return Err(crate::native_error::pool_limit_error(
additional,
reservation.consumer().name(),
reservation.size(),
0,
limit,
));
let can_spill = reservation.consumer().can_spill();
let outstanding = self.exempt_outstanding.load(Ordering::Relaxed);
let cap = crate::memory_guard::spill_exempt_cap_bytes();

match spill_gate_decision(can_spill, additional, cap, outstanding) {
// Continue to the allocation below. Only count this against
// the budget if the allocation actually succeeds (see
// `exempted`), so a failed one doesn't use up the budget.
SpillGateDecision::Exempt => exempted = true,
_ => {
self.tripped_count.fetch_add(1, Ordering::Relaxed);
return Err(crate::native_error::pool_limit_error(
additional,
reservation.consumer().name(),
reservation.size(),
0,
limit,
));
}
}
}
}

Expand All @@ -190,6 +264,11 @@ impl MemoryPool for DynamicLimitPool {
});

if cas_result.is_ok() {
// Charge the exemption budget only now that the grow succeeded;
// released saturating in `shrink`.
if exempted {
self.exempt_outstanding.fetch_add(additional, Ordering::Relaxed);
}
return Ok(());
}

Expand All @@ -210,6 +289,9 @@ impl MemoryPool for DynamicLimitPool {
let _ = self.used.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |u| {
u.checked_add(additional)
});
if exempted {
self.exempt_outstanding.fetch_add(additional, Ordering::Relaxed);
}
return Ok(());
}
}
Expand Down Expand Up @@ -535,4 +617,64 @@ mod tests {
);
assert_eq!(pool.reserved(), 4096);
}

// ---- Spill-gate exemption rule (pure, deterministic) ----

// One mebibyte in bytes; keeps the byte sizes in the spill-gate tests readable.
const MB: usize = 1024 * 1024;

#[test]
fn test_spill_gate_rejects_non_spillable() {
    // With the full cap still available, a consumer that cannot spill is
    // refused anyway: the rejection is what pushes it to spill.
    assert_eq!(
        spill_gate_decision(false, 256 * 1024, 512 * MB, 0),
        SpillGateDecision::RejectNonSpillable
    );
}

#[test]
fn test_spill_gate_exempts_spillable_within_cap() {
    // A spillable consumer asking for a small amount (262,528 bytes, modelled
    // on the observed ~256 KiB spill-trigger allocation) against an untouched
    // cap is let through.
    let request_bytes = 262_528;
    assert_eq!(
        spill_gate_decision(true, request_bytes, 512 * MB, 0),
        SpillGateDecision::Exempt
    );
}

#[test]
fn test_spill_gate_exempts_buffer_within_budget() {
    let cap = 512 * MB;

    // Regression guard: a 193 MB spill buffer that fits the budget must pass.
    // An earlier version also consulted how much memory the allocator had
    // freed but not yet returned; that figure is near zero exactly when a
    // spill needs its buffer, so the buffer was refused and the spill could
    // not finish. Only the budget matters now.
    let fits = spill_gate_decision(true, 193 * MB, cap, 0);
    assert_eq!(fits, SpillGateDecision::Exempt);

    // Anything larger than the entire budget is still turned away.
    let too_big = spill_gate_decision(true, 600 * MB, cap, 0);
    assert_eq!(too_big, SpillGateDecision::RejectCapped);
}

#[test]
fn test_spill_gate_caps_when_budget_exhausted() {
    // 500 MB of the 512 MB cap is already outstanding, leaving 12 MB of room.
    let (cap, outstanding) = (512 * MB, 500 * MB);

    // 64 MB does not fit in the remaining 12 MB.
    assert_eq!(
        spill_gate_decision(true, 64 * MB, cap, outstanding),
        SpillGateDecision::RejectCapped
    );
    // 8 MB does fit, so it is exempted.
    assert_eq!(
        spill_gate_decision(true, 8 * MB, cap, outstanding),
        SpillGateDecision::Exempt
    );
}

#[test]
fn test_spill_gate_bound_is_cap_minus_outstanding() {
    // The only limit applied is cap minus outstanding exemptions.
    let request = 60 * MB;

    // 512 - 472 leaves 40 MB of room, so a 60 MB request is refused.
    let with_little_room = spill_gate_decision(true, request, 512 * MB, 472 * MB);
    assert_eq!(with_little_room, SpillGateDecision::RejectCapped);

    // The identical request passes when none of the cap is in use.
    let with_full_room = spill_gate_decision(true, request, 512 * MB, 0);
    assert_eq!(with_full_room, SpillGateDecision::Exempt);
}
}
Loading
Loading