[analytics-datafusion] Fixes for native memory on cancel, a query hang, spill progress, threshold updates, and spill size default #22187

Open
gaurav-amz wants to merge 4 commits into opensearch-project:main from gaurav-amz:datafusion-try-grow-fix

@gaurav-amz gaurav-amz commented Jun 16, 2026

Contributor

Description

Five fixes in the DataFusion analytics backend. They all affect native (Rust-side)
memory on data nodes when queries run concurrently, get cancelled, or spill to disk.

1. Let a spilling query allocate the buffer it needs to finish spilling (memory.rs, memory_guard.rs)

Background — how spilling is driven here. The memory pool watches the node's actual
resident memory (RSS, read from jemalloc) rather than only its own bookkeeping, because some
operators allocate native memory before they reserve it in the pool, so the pool's own counter
can understate reality. There are two RSS lines:

  • 85% — the spill line. Above this, the pool starts rejecting memory requests. A rejection
    is not an error to the operator: spillable operators (sort, grouped aggregate) treat a
    rejected request as the signal "out of room — spill to disk now." So rejecting is how the
    pool tells a query to spill.
  • 95% — the critical line. Above this, every request is rejected outright to protect the
    node from running out of memory.
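The two lines above can be read as a three-way classification of an RSS reading. The following is a minimal sketch under stated assumptions: the names `Zone`, `classify`, `SPILL_LINE_PCT` and `CRITICAL_LINE_PCT` are hypothetical, and treating a reading exactly on a line as already past it is also an assumption. Only the 85% and 95% figures come from the description.

```rust
/// RSS percentages for the two lines described above (names are hypothetical).
const SPILL_LINE_PCT: u128 = 85;
const CRITICAL_LINE_PCT: u128 = 95;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Zone {
    /// Below the spill line: only the pool's own limit applies.
    Normal,
    /// Between the two lines: a rejection is the "spill now" signal.
    Spill,
    /// At or past the critical line: every request is rejected.
    Critical,
}

/// Classify a resident-memory reading against the node's memory budget.
fn classify(rss_bytes: u64, budget_bytes: u64) -> Zone {
    // Integer comparison of rss / budget against each line:
    // rss * 100 >= budget * pct. u128 avoids overflow for any u64 input.
    let scaled_rss = rss_bytes as u128 * 100;
    let budget = budget_bytes as u128;
    if scaled_rss >= budget * CRITICAL_LINE_PCT {
        Zone::Critical
    } else if scaled_rss >= budget * SPILL_LINE_PCT {
        Zone::Spill
    } else {
        Zone::Normal
    }
}
```

With a budget of 100 bytes, a reading of 84 falls in `Normal`, 90 in `Spill`, and 96 in `Critical`. A zero budget classifies everything as `Critical`, which is a conservative choice made for this sketch only.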

The problem. Spilling is not free — to write its in-memory data out, an operator first frees
that data and then needs a small amount of memory to run the sort/merge and write the spill
file. The catch is timing: right after the operator frees its data, the freed pages have not
been returned to the OS yet (the allocator still holds them), so RSS is still above 85% at the
exact moment the spill needs its working memory. The old code rejected that request too — so
the spill it had just ordered could never finish, and the query got stuck with the
"Failed to reserve memory for sort during spill" error.

The fix. In the band between 85% and 95%, the pool now distinguishes two cases:

  • A request from a non-spillable consumer is still rejected (it has no disk fallback, so
    letting it grow would only add pressure).
  • A request from a spillable consumer is allowed through, because that consumer is in the
    middle of doing exactly what we want — spilling — and this request is part of finishing it.

To keep this safe, the allowance is bounded by a fixed byte budget (SPILL_EXEMPT_CAP_BYTES,
default 512 MB), not by a second RSS line. This matters: an RSS-based limit would not work,
because RSS lags behind the frees (the same staleness that caused the original bug). The byte
budget is logical accounting that updates the instant memory is freed, so it stays accurate
during a spill. A running total (exempt_outstanding) tracks how much has been allowed through
but not yet freed; once it reaches the budget, further spillable requests are rejected again
(which just makes them spill more aggressively). The budget is charged only after an
allocation actually succeeds, so a request that is allowed through but then fails for another
reason never consumes budget. Budget that has been charged is returned as memory is freed.

The 95% critical line is checked first and ignores this allowance entirely, so even with the
full budget granted to concurrent spills, the node can never be pushed past the hard limit.
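The decision rule described in this section could look roughly like the sketch below. It is an illustration, not the code in memory.rs: the `Decision` enum, the `decide` function and its parameters are hypothetical (only `RejectCapped`, `SPILL_EXEMPT_CAP_BYTES` and `exempt_outstanding` are names that appear in this PR), and the 512 MB default is assumed to mean binary megabytes.

```rust
/// Default exemption budget; assumes "512 MB" means 512 * 1024 * 1024 bytes.
const SPILL_EXEMPT_CAP_BYTES: usize = 512 * 1024 * 1024;

#[derive(Debug, PartialEq, Eq)]
enum Decision {
    /// Below the spill line: only the pool limit check remains.
    Allow,
    /// Spillable consumer in the 85-95% band, within the byte budget.
    AllowExempt,
    /// Non-spillable consumer in the 85-95% band.
    RejectSpillLine,
    /// Spillable consumer, but the byte budget is used up.
    RejectCapped,
    /// At or past the critical line: nothing is allowed.
    RejectCritical,
}

fn decide(
    rss_pct: u8,
    spillable: bool,
    exempt_outstanding: usize,
    additional: usize,
    cap: usize,
) -> Decision {
    // The critical line is checked first and ignores the allowance entirely.
    if rss_pct >= 95 {
        return Decision::RejectCritical;
    }
    if rss_pct < 85 {
        return Decision::Allow;
    }
    if !spillable {
        return Decision::RejectSpillLine;
    }
    // The allowance is bounded by logical bytes, not by a second RSS line.
    match exempt_outstanding.checked_add(additional) {
        Some(total) if total <= cap => Decision::AllowExempt,
        _ => Decision::RejectCapped,
    }
}
```

The function is pure on purpose: it takes the current `exempt_outstanding` as an argument and leaves charging the budget to the caller, matching the description's point that the budget is charged only after the allocation succeeds.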

Flow — one spillable query as RSS climbs:

  1. RSS below 85%: every request is checked only against the pool limit and allowed. The query buffers in memory.
  2. RSS crosses 85%: the next request from the spillable consumer is allowed through (it fits the budget) and charged to exempt_outstanding.
  3. The budget fills: the next request is rejected. The operator reads this as "spill now", writes its data to disk, and frees it.
  4. Freeing calls shrink, which lowers exempt_outstanding immediately — so the budget reopens even though RSS has not dropped yet.
  5. The spill's own sort/merge buffer is now allowed through the reopened budget, so the spill finishes and the query continues.
  6. If RSS ever reaches 95%, step 2 onward stops: every request is rejected regardless of the budget, protecting the node.
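Steps 2 to 5 can be walked through with a small budget tracker. This is a sketch under assumptions: `ExemptBudget`, `try_charge` and `release` are hypothetical names, and checking and charging are folded into one atomic step here for brevity, whereas the PR charges the budget after the grow succeeds.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical tracker for bytes allowed through the 85-95% band.
struct ExemptBudget {
    cap: usize,
    outstanding: AtomicUsize,
}

impl ExemptBudget {
    fn new(cap: usize) -> Self {
        ExemptBudget { cap, outstanding: AtomicUsize::new(0) }
    }

    /// Charges `additional` bytes if they fit under the cap; returns whether they did.
    fn try_charge(&self, additional: usize) -> bool {
        self.outstanding
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |out| {
                out.checked_add(additional).filter(|total| *total <= self.cap)
            })
            .is_ok()
    }

    /// Returns budget as memory is freed, never going below zero.
    fn release(&self, freed: usize) {
        let _ = self
            .outstanding
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |out| {
                Some(out.saturating_sub(freed))
            });
    }

    fn outstanding(&self) -> usize {
        self.outstanding.load(Ordering::Relaxed)
    }
}
```

With a cap of 100 bytes, `try_charge(60)` succeeds (step 2), a second `try_charge(60)` fails (step 3, the "spill now" signal), `release(60)` reopens the budget without any RSS reading being involved (step 4), and `try_charge(40)` for the spill's own working buffer then succeeds (step 5).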

4. Make runtime updates to the memory-guard thresholds actually take effect (DataFusionPlugin.java)

The four memory-guard thresholds were each registered with a callback that re-read the
settings from the cluster service. During a settings update, that read returns the previous
value (the new values aren't visible to that read until after all callbacks have run), so
changing a threshold at runtime silently kept using the old value. The fix uses a single
callback that reads the four values from the updated settings it is handed, so the new values
take effect.
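The stale-read problem is independent of the settings framework, so it can be modeled in a few lines. The sketch below is not the plugin's Java code: `SettingsService`, `apply_update` and `observe_update` are hypothetical, and the model only assumes what the description states, namely that callbacks run before the new values become visible to a re-read.

```rust
/// Four memory-guard thresholds; the meaning of each slot is left abstract.
type Thresholds = [u8; 4];

/// Hypothetical stand-in for the service that holds the committed settings.
struct SettingsService {
    committed: Thresholds,
}

impl SettingsService {
    /// Runs the update callback first and commits the new values afterwards.
    fn apply_update<F>(&mut self, incoming: Thresholds, mut callback: F)
    where
        F: FnMut(&SettingsService, Thresholds),
    {
        // `committed` still holds the previous values while the callback runs.
        callback(&*self, incoming);
        self.committed = incoming;
    }
}

/// Returns (values seen by re-reading the service, values handed to the callback).
fn observe_update(old: Thresholds, new: Thresholds) -> (Thresholds, Thresholds) {
    let mut service = SettingsService { committed: old };
    let mut re_read = old;
    let mut handed = old;
    service.apply_update(new, |svc, incoming| {
        re_read = svc.committed; // old pattern: re-read during the update
        handed = incoming; // fixed pattern: use the values that were passed in
    });
    (re_read, handed)
}
```

Calling `observe_update(old, new)` returns `(old, new)`: the re-read sees the previous thresholds while the handed values are the updated ones, which is why a callback that consumes the values it is given picks up the change.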

5. Default the spill size limit to 80% of the spill disk, and reject invalid overrides (api.rs, DataFusionPlugin.java, tests)

datafusion.spill_memory_limit_bytes now defaults to 80% of the total capacity of the disk
the spill directory is on (with an 8 GiB fallback if that capacity can't be read). An explicit
value set by an operator is rejected if it is larger than the disk's capacity.
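The default and the override check reduce to two small functions. This is a sketch, assuming the disk capacity has already been probed and is passed in as `Option<u64>` (with `None` meaning the probe failed). The function names are hypothetical, the unset-directory case is not modeled, and skipping the check when capacity is unknown follows the fail-open behavior of the PR's validator.

```rust
/// Fallback used when the spill volume's capacity cannot be read (8 GiB).
const SPILL_LIMIT_FALLBACK_BYTES: u64 = 8 * 1024 * 1024 * 1024;

/// Default spill limit: 80% of total capacity, or the fallback if unknown.
fn default_spill_limit(total_capacity: Option<u64>) -> u64 {
    match total_capacity {
        // u128 keeps the multiplication from overflowing on very large volumes.
        Some(total) if total > 0 => (total as u128 * 80 / 100) as u64,
        _ => SPILL_LIMIT_FALLBACK_BYTES,
    }
}

/// Rejects an explicit limit larger than the volume; unknown capacity skips the check.
fn validate_override(value: u64, total_capacity: Option<u64>) -> Result<(), String> {
    match total_capacity {
        Some(total) if total > 0 && value > total => Err(format!(
            "spill limit {value} exceeds spill volume capacity ({total} bytes)"
        )),
        _ => Ok(()),
    }
}
```

For a 1000-byte volume the default is 800 bytes, an override of 1000 is accepted, and an override of 1001 is rejected.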

Check List

  • Functionality includes unit tests (the spill decision rule; DataFusionPluginSettingsTests).
  • Commits are signed per the DCO using --signoff.

@gaurav-amz gaurav-amz requested a review from a team as a code owner June 16, 2026 14:28

github-actions Bot commented Jun 16, 2026

Contributor

PR Reviewer Guide 🔍

(Review updated until commit dd2383d)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Validator Dependency Race

SpillLimitValidator.validate(Long, Map) reads DATAFUSION_SPILL_DIRECTORY from the dependencies map, but the validator is registered before DATAFUSION_SPILL_DIRECTORY is declared (line 266 vs. line 236). If the Setting constructor evaluates validators eagerly or the framework resolves dependencies by declaration order, the spill-directory key may be absent from the map when the validator runs during DATAFUSION_SPILL_MEMORY_LIMIT initialization, causing dir to be null/empty and the capacity check to be skipped silently. The validator would then accept values exceeding disk capacity at boot time (runtime updates are protected by the grouped consumer, but the initial parse is not).

static final class SpillLimitValidator implements Setting.Validator<Long> {
    @Override
    public void validate(Long value) {
        // Range check (>= 0) lives in the parser; nothing to do here without dependencies.
    }

    @Override
    public void validate(Long value, Map<Setting<?>, Object> dependencies) {
        String dir = (String) dependencies.get(DATAFUSION_SPILL_DIRECTORY);
        if (dir == null) {
            dir = "";
        }
        if (dir.isEmpty()) {
            if (value != 0L) {
                throw new IllegalArgumentException(
                    "Setting [datafusion.spill_memory_limit_bytes]="
                        + value
                        + " is non-zero but datafusion.spill_directory is unset (spill disabled)"
                );
            }
            return;
        }
        long total;
        try {
            total = Environment.getFileStore(Path.of(dir)).getTotalSpace();
        } catch (IOException e) {
            // Probe failed — skip the capacity check. Same fail-open behavior as the
            // boot-time default derivation.
            return;
        }
        if (total > 0 && value > total) {
            throw new IllegalArgumentException(
                "Setting [datafusion.spill_memory_limit_bytes]=" + value + " exceeds spill volume capacity (" + total + " bytes)"
            );
        }
    }

    @Override
    public Iterator<Setting<?>> settings() {
        return List.<Setting<?>>of(DATAFUSION_SPILL_DIRECTORY).iterator();
    }
}
Exemption Budget Leak

exempt_outstanding is incremented when an exempted allocation succeeds (lines 271, 293) but only decremented in shrink (line 184). If a MemoryReservation is dropped without calling shrink—e.g., the operator panics or the reservation is leaked—the budget is never released. Subsequent spillable consumers see a permanently reduced cap (outstanding stays high), eventually exhausting the budget and causing spills to fail with RejectCapped even when actual memory usage is low. The saturating_sub in shrink (line 188) prevents underflow but does not recover from a leak.

    // Charge the exemption budget only now that the grow succeeded;
    // released saturating in `shrink`.
    if exempted {
        self.exempt_outstanding.fetch_add(additional, Ordering::Relaxed);
    }
    return Ok(());
}

// Pool accounting says "full". Before failing the operator (which
// triggers spill), consult jemalloc as ground truth. If actual process
// memory is below the override threshold, the pool's "full" state is
// from stale phantoms or accounting drift — allow the grow.
//
// This gives already-executing operators a higher effective limit,
// preventing unnecessary spills when phantoms from finished queries
// haven't been released yet.
let limit = dynamic_limit.load(Ordering::Acquire);
let used = self.used.load(Ordering::Relaxed);
// Only attempt override if the allocation is plausible (won't overflow).
if used.checked_add(additional).is_some() {
    if crate::memory_guard::should_override(limit, crate::memory_guard::OverrideContext::Execution) {
        // jemalloc confirms headroom — allow the grow
        let _ = self.used.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |u| {
            u.checked_add(additional)
        });
        if exempted {
            self.exempt_outstanding.fetch_add(additional, Ordering::Relaxed);
        }
        return Ok(());

github-actions Bot commented Jun 16, 2026

Contributor

PR Code Suggestions ✨

Latest suggestions up to dd2383d

Explore these optional code suggestions:

Category | Suggestion | Impact
General
Validate spill directory existence

When the spill directory does not exist at boot time, getFileStore() throws
IOException and the fallback (8 GiB) is used. However, if the directory is created
later, the default remains at 8 GiB rather than the correct 80% of actual capacity.
Consider validating that the directory exists or logging a warning when falling
back.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java [319-333]

 static String deriveSpillLimitDefault(Settings settings) {
     String dir = DATAFUSION_SPILL_DIRECTORY.get(settings);
     if (dir == null || dir.isEmpty()) {
         return "0";
     }
     try {
-        long total = Environment.getFileStore(Path.of(dir)).getTotalSpace();
+        Path spillPath = Path.of(dir);
+        if (!Files.exists(spillPath)) {
+            logger.warn("Spill directory [{}] does not exist at startup; using fallback limit of {} bytes", dir, SPILL_LIMIT_FALLBACK_BYTES);
+            return Long.toString(SPILL_LIMIT_FALLBACK_BYTES);
+        }
+        long total = Environment.getFileStore(spillPath).getTotalSpace();
         if (total <= 0) {
             return Long.toString(SPILL_LIMIT_FALLBACK_BYTES);
         }
         return Long.toString((long) (total * SPILL_LIMIT_FRACTION));
     } catch (IOException e) {
+        logger.warn("Failed to probe spill directory [{}]; using fallback limit of {} bytes", dir, SPILL_LIMIT_FALLBACK_BYTES, e);
         return Long.toString(SPILL_LIMIT_FALLBACK_BYTES);
     }
 }
Suggestion importance[1-10]: 5

Why: Adding a warning when the spill directory doesn't exist at boot time would improve observability. However, the behavior the suggestion describes (the default staying at 8 GiB if the directory is created later) is expected rather than a defect: the default is computed once at boot time and is not re-evaluated. The warning would still be useful for operators to understand why the fallback was used.

Low
Possible issue
Validate disk capacity determination

The validator silently skips the capacity check when getTotalSpace() returns 0 or
negative, which can occur on some filesystems. This allows operators to set
arbitrarily large values that exceed actual disk capacity, potentially causing
runtime failures when spill attempts to write. Add explicit validation to reject
non-zero limits when total space cannot be determined.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java [145-157]

-static final class SpillLimitValidator implements Setting.Validator<Long> {
-    @Override
-    public void validate(Long value) {
-        // Range check (>= 0) lives in the parser; nothing to do here without dependencies.
-    }
+if (total <= 0) {
+    throw new IllegalArgumentException(
+        "Setting [datafusion.spill_memory_limit_bytes]=" + value + " cannot be validated: unable to determine capacity of spill volume at [" + dir + "]"
+    );
+}
+if (value > total) {
+    throw new IllegalArgumentException(
+        "Setting [datafusion.spill_memory_limit_bytes]=" + value + " exceeds spill volume capacity (" + total + " bytes)"
+    );
+}
 
-    @Override
-    public void validate(Long value, Map<Setting<?>, Object> dependencies) {
-        String dir = (String) dependencies.get(DATAFUSION_SPILL_DIRECTORY);
-        if (dir == null) {
-            dir = "";
-        }
-        if (dir.isEmpty()) {
-            if (value != 0L) {
-                throw new IllegalArgumentException(
-                    "Setting [datafusion.spill_memory_limit_bytes]="
-                        + value
-                        + " is non-zero but datafusion.spill_directory is unset (spill disabled)"
-                );
-            }
-            return;
-        }
-        long total;
-        try {
-            total = Environment.getFileStore(Path.of(dir)).getTotalSpace();
-        } catch (IOException e) {
-            // Probe failed — skip the capacity check. Same fail-open behavior as the
-            // boot-time default derivation.
-            return;
-        }
-        if (total > 0 && value > total) {
-            throw new IllegalArgumentException(
-                "Setting [datafusion.spill_memory_limit_bytes]=" + value + " exceeds spill volume capacity (" + total + " bytes)"
-            );
-        }
-    }
-
Suggestion importance[1-10]: 3

Why: The suggestion proposes rejecting settings when getTotalSpace() returns <= 0, but the current code already handles this case by skipping the capacity check (fail-open behavior). The PR explicitly documents this as intentional: "If the probe fails the capacity check is skipped — operators retain the ability to set the cap." Changing this to fail-closed would contradict the design decision and could block legitimate configurations in containerized environments.

Low

Previous suggestions

Suggestions up to commit dd2383d
Category | Suggestion | Impact
General
Log fallback usage for visibility

The method returns a fallback value when getTotalSpace() returns 0 or negative, but
this can silently mask configuration errors where the spill directory path is
invalid or inaccessible. Log a warning when the fallback is used so operators are
aware the default is not based on actual disk capacity.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java [319-333]

-static String deriveSpillLimitDefault(Settings settings) {
-    String dir = DATAFUSION_SPILL_DIRECTORY.get(settings);
-    if (dir == null || dir.isEmpty()) {
-        return "0";
-    }
-    try {
-        long total = Environment.getFileStore(Path.of(dir)).getTotalSpace();
-        if (total <= 0) {
-            return Long.toString(SPILL_LIMIT_FALLBACK_BYTES);
-        }
-        return Long.toString((long) (total * SPILL_LIMIT_FRACTION));
-    } catch (IOException e) {
+try {
+    long total = Environment.getFileStore(Path.of(dir)).getTotalSpace();
+    if (total <= 0) {
+        logger.warn("Could not determine disk capacity for spill directory [{}], using fallback {} bytes", dir, SPILL_LIMIT_FALLBACK_BYTES);
         return Long.toString(SPILL_LIMIT_FALLBACK_BYTES);
     }
+    return Long.toString((long) (total * SPILL_LIMIT_FRACTION));
+} catch (IOException e) {
+    logger.warn("Failed to probe spill directory [{}], using fallback {} bytes", dir, SPILL_LIMIT_FALLBACK_BYTES, e);
+    return Long.toString(SPILL_LIMIT_FALLBACK_BYTES);
 }
Suggestion importance[1-10]: 6

Why: Adding logging when the fallback is used would improve operational visibility, helping operators understand when the default is not based on actual disk capacity. This is a reasonable enhancement for troubleshooting, though not critical since the fallback behavior is documented and intentional.

Low
Handle exemption budget leak scenarios

The exemption budget is charged after the allocation succeeds, but if the allocation
is later freed without going through shrink (e.g., due to a panic or early return in
the consumer), the budget remains charged indefinitely. This can cause the exemption
cap to be exhausted by phantom allocations, preventing legitimate spills from
completing.

sandbox/plugins/analytics-backend-datafusion/rust/src/memory.rs [267-272]

 if exempted {
+    // Only charge if the allocation will be tracked through shrink.
+    // Consider using a guard pattern to ensure budget is released on drop.
     self.exempt_outstanding.fetch_add(additional, Ordering::Relaxed);
 }
 return Ok(());
Suggestion importance[1-10]: 4

Why: The concern about budget leaks from panics or early returns is valid in theory, but DataFusion's MemoryReservation uses RAII (Drop trait) to ensure shrink is called when the reservation is dropped, even on panic. The suggestion's "consider using a guard pattern" is already the pattern in use. The comment addition has minimal value since the existing code already handles this correctly.
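The RAII behavior this reply relies on can be sketched with a toy reservation type. This is not DataFusion's `MemoryReservation`: `Pool`, `Reservation` and `grow_exempt` are hypothetical and model only the exemption counter.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Toy pool that tracks only the exemption counter.
struct Pool {
    exempt_outstanding: AtomicUsize,
}

impl Pool {
    /// Returns budget without going below zero.
    fn shrink(&self, bytes: usize) {
        let _ = self
            .exempt_outstanding
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |out| {
                Some(out.saturating_sub(bytes))
            });
    }
}

/// Toy reservation that gives its bytes back when it goes out of scope.
struct Reservation {
    pool: Arc<Pool>,
    size: usize,
}

impl Reservation {
    fn grow_exempt(pool: &Arc<Pool>, size: usize) -> Self {
        pool.exempt_outstanding.fetch_add(size, Ordering::Relaxed);
        Reservation { pool: Arc::clone(pool), size }
    }
}

impl Drop for Reservation {
    fn drop(&mut self) {
        // Runs on normal scope exit and during panic unwinding.
        self.pool.shrink(self.size);
    }
}
```

Creating a reservation of 64 bytes in an inner scope raises the counter to 64, and leaving the scope brings it back to 0. A value leaked with `std::mem::forget` never runs `Drop`, so that case remains outside what this pattern covers.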

Low
Possible issue
Validate disk capacity determination

The validator silently skips the capacity check when getTotalSpace() returns 0 or
negative, which can occur on some filesystems. This allows operators to set
arbitrarily large values that exceed actual disk capacity, potentially causing
runtime failures when spill attempts to write. Add explicit validation to reject
non-zero limits when total space cannot be determined.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java [145-157]

-static final class SpillLimitValidator implements Setting.Validator<Long> {
-    @Override
-    public void validate(Long value) {
-        // Range check (>= 0) lives in the parser; nothing to do here without dependencies.
-    }
+if (total <= 0) {
+    throw new IllegalArgumentException(
+        "Setting [datafusion.spill_memory_limit_bytes]=" + value + " cannot be validated: unable to determine spill volume capacity for directory [" + dir + "]"
+    );
+}
+if (value > total) {
+    throw new IllegalArgumentException(
+        "Setting [datafusion.spill_memory_limit_bytes]=" + value + " exceeds spill volume capacity (" + total + " bytes)"
+    );
+}
 
-    @Override
-    public void validate(Long value, Map<Setting<?>, Object> dependencies) {
-        String dir = (String) dependencies.get(DATAFUSION_SPILL_DIRECTORY);
-        if (dir == null) {
-            dir = "";
-        }
-        if (dir.isEmpty()) {
-            if (value != 0L) {
-                throw new IllegalArgumentException(
-                    "Setting [datafusion.spill_memory_limit_bytes]="
-                        + value
-                        + " is non-zero but datafusion.spill_directory is unset (spill disabled)"
-                );
-            }
-            return;
-        }
-        long total;
-        try {
-            total = Environment.getFileStore(Path.of(dir)).getTotalSpace();
-        } catch (IOException e) {
-            // Probe failed — skip the capacity check. Same fail-open behavior as the
-            // boot-time default derivation.
-            return;
-        }
-        if (total > 0 && value > total) {
-            throw new IllegalArgumentException(
-                "Setting [datafusion.spill_memory_limit_bytes]=" + value + " exceeds spill volume capacity (" + total + " bytes)"
-            );
-        }
-    }
-
Suggestion importance[1-10]: 3

Why: The suggestion proposes rejecting settings when getTotalSpace() returns ≤0, but the PR explicitly documents a "fail-open" behavior where the capacity check is skipped if the probe fails. This allows operators to retain control when filesystem probing is unreliable. Changing to fail-closed would break that design choice and could prevent legitimate configurations in containerized environments.

Low
Suggestions up to commit dd2383d
Category | Suggestion | Impact
Possible issue
Charge exemption budget only on successful allocation

When the override path succeeds, exempt_outstanding is charged but the corresponding
used update may fail silently (the fetch_update result is discarded). This creates a
budget leak where exempted bytes are counted but not actually allocated, causing the
exemption budget to drain incorrectly over time.

sandbox/plugins/analytics-backend-datafusion/rust/src/memory.rs [284-295]

-if cas_result.is_ok() {
-    // Charge the exemption budget only now that the grow succeeded;
-    // released saturating in `shrink`.
-    if exempted {
-        self.exempt_outstanding.fetch_add(additional, Ordering::Relaxed);
-    }
-    return Ok(());
-}
-...
 if crate::memory_guard::should_override(limit, crate::memory_guard::OverrideContext::Execution) {
     // jemalloc confirms headroom — allow the grow
-    let _ = self.used.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |u| {
+    let update_result = self.used.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |u| {
         u.checked_add(additional)
     });
-    if exempted {
-        self.exempt_outstanding.fetch_add(additional, Ordering::Relaxed);
+    if update_result.is_ok() {
+        if exempted {
+            self.exempt_outstanding.fetch_add(additional, Ordering::Relaxed);
+        }
+        return Ok(());
     }
-    return Ok(());
 }
Suggestion importance[1-10]: 8

Why: This identifies a real bug where exempt_outstanding is charged even when the fetch_update fails. Because the result is discarded, the exemption budget could be charged without a successful allocation, causing budget leakage. The fix correctly conditions the budget charge on update_result.is_ok().
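The pattern this suggestion asks for can be shown in isolation. The sketch below is a reduced model rather than the code in memory.rs: `Counters` and `grow_via_override` are hypothetical, and the jemalloc headroom check is left out.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// The two counters involved in the override path.
struct Counters {
    used: AtomicUsize,
    exempt_outstanding: AtomicUsize,
}

/// Grows `used` and charges the exemption budget only if the update took effect.
fn grow_via_override(counters: &Counters, additional: usize, exempted: bool) -> bool {
    // `fetch_update` returns Err when the closure returns None, here on overflow.
    let updated = counters
        .used
        .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |u| {
            u.checked_add(additional)
        });
    if updated.is_err() {
        return false; // nothing was allocated, so nothing is charged
    }
    if exempted {
        counters
            .exempt_outstanding
            .fetch_add(additional, Ordering::Relaxed);
    }
    true
}
```

In the quoted code, an overflow pre-check on `used` runs before the override, so the failing branch would only be reached if `used` changes between that check and the update.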

Medium
Track exemption status per reservation

The shrink method decrements exempt_outstanding for every shrink operation,
including non-exempt allocations. This causes the exemption budget to be incorrectly
credited when normal (non-spillable) consumers free memory, allowing more exemptions
than the configured cap permits and potentially exceeding the 95% critical
threshold.

sandbox/plugins/analytics-backend-datafusion/rust/src/memory.rs [180-190]

 fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
     self.used.fetch_sub(shrink, Ordering::Relaxed);
-    // Give back exemption budget as memory is freed (a spill writing out its
-    // data calls `shrink`). Subtract without going below zero. A normal,
-    // non-exempt shrink may give budget back a little early, which is safe:
-    // it only makes the exemption stricter, never looser.
-    let _ = self
-        .exempt_outstanding
-        .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |out| {
-            Some(out.saturating_sub(shrink))
-        });
+    // Only credit back exemption budget if this reservation was actually
+    // exempted. Track exempted reservations via a flag or separate counter
+    // to avoid crediting non-exempt shrinks.
 }
Suggestion importance[1-10]: 7

Why: The suggestion correctly identifies that shrink credits back exempt_outstanding for all shrinks, not just exempted ones. However, the code comment acknowledges this ("A normal, non-exempt shrink may give budget back a little early") and claims it's safe. The suggestion would require tracking per-reservation state, which is a significant design change that may or may not be necessary.
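The arithmetic behind this concern can be reproduced with a toy pool whose `shrink` mirrors the quoted one, crediting the counter on every shrink. Everything else in the sketch (the `Pool` struct, the `grow` helper, the byte counts) is hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

struct Pool {
    used: AtomicUsize,
    exempt_outstanding: AtomicUsize,
}

impl Pool {
    fn grow(&self, bytes: usize, exempted: bool) {
        self.used.fetch_add(bytes, Ordering::Relaxed);
        if exempted {
            self.exempt_outstanding.fetch_add(bytes, Ordering::Relaxed);
        }
    }

    /// Credits the exemption counter on every shrink, exempt or not.
    fn shrink(&self, bytes: usize) {
        self.used.fetch_sub(bytes, Ordering::Relaxed);
        let _ = self
            .exempt_outstanding
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |out| {
                Some(out.saturating_sub(bytes))
            });
    }
}

/// Exemption counter after a non-exempt consumer frees its memory.
fn outstanding_after_mixed_shrink() -> usize {
    let pool = Pool {
        used: AtomicUsize::new(0),
        exempt_outstanding: AtomicUsize::new(0),
    };
    pool.grow(40, false); // non-exempt consumer
    pool.grow(100, true); // exempt spill buffers, still held below
    pool.shrink(40); // only the non-exempt consumer frees
    pool.exempt_outstanding.load(Ordering::Relaxed)
}
```

The function returns 60 even though 100 exempt bytes are still held, so 40 bytes of budget have reopened early. Whether that slack matters is the design question raised here; the 95% critical line is still checked first regardless of the counter.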

Medium
Reject when disk capacity is indeterminate

The validator silently skips the capacity check when getTotalSpace() returns 0 or
negative, which can occur on some filesystems. This allows operators to set
arbitrarily large values that exceed actual disk capacity, potentially causing
runtime failures when spill attempts to write beyond available space.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java [145-157]

-static final class SpillLimitValidator implements Setting.Validator<Long> {
-    @Override
-    public void validate(Long value) {
-        // Range check (>= 0) lives in the parser; nothing to do here without dependencies.
-    }
+if (total <= 0) {
+    throw new IllegalArgumentException(
+        "Setting [datafusion.spill_memory_limit_bytes] cannot be validated: unable to determine capacity of spill volume at [" + dir + "]"
+    );
+}
+if (value > total) {
+    throw new IllegalArgumentException(
+        "Setting [datafusion.spill_memory_limit_bytes]=" + value + " exceeds spill volume capacity (" + total + " bytes)"
+    );
+}
 
-    @Override
-    public void validate(Long value, Map<Setting<?>, Object> dependencies) {
-        String dir = (String) dependencies.get(DATAFUSION_SPILL_DIRECTORY);
-        if (dir == null) {
-            dir = "";
-        }
-        if (dir.isEmpty()) {
-            if (value != 0L) {
-                throw new IllegalArgumentException(
-                    "Setting [datafusion.spill_memory_limit_bytes]="
-                        + value
-                        + " is non-zero but datafusion.spill_directory is unset (spill disabled)"
-                );
-            }
-            return;
-        }
-        long total;
-        try {
-            total = Environment.getFileStore(Path.of(dir)).getTotalSpace();
-        } catch (IOException e) {
-            // Probe failed — skip the capacity check. Same fail-open behavior as the
-            // boot-time default derivation.
-            return;
-        }
-        if (total > 0 && value > total) {
-            throw new IllegalArgumentException(
-                "Setting [datafusion.spill_memory_limit_bytes]=" + value + " exceeds spill volume capacity (" + total + " bytes)"
-            );
-        }
-    }
-
Suggestion importance[1-10]: 3

Why: The suggestion proposes rejecting when getTotalSpace() returns 0 or negative, but the PR's fail-open behavior is intentional and documented. The code comment explicitly states "skip the capacity check" when the probe fails, preserving operator flexibility. Changing this to reject would break the documented design.

Low
Suggestions up to commit 302c6c6
Category | Suggestion | Impact
Possible issue
Prevent double-charging exemption budget

The exemption budget is charged twice when the override path succeeds after the CAS
fails. Both the normal path and the override path increment exempt_outstanding when
exempted is true, causing the budget to be double-counted and artificially
constraining future spill operations.

sandbox/plugins/analytics-backend-datafusion/rust/src/memory.rs [267-295]

 if exempted {
     self.exempt_outstanding.fetch_add(additional, Ordering::Relaxed);
 }
 return Ok(());
 ...
-if exempted {
-    self.exempt_outstanding.fetch_add(additional, Ordering::Relaxed);
-}
+// Override path: budget already charged above if exempted was set
+let _ = self.used.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |u| {
+    u.checked_add(additional)
+});
 return Ok(());
Suggestion importance[1-10]: 9

Why: Critical correctness issue. When the CAS fails but the override succeeds, exempt_outstanding is incremented twice (lines 271 and 293), causing the exemption budget to be double-counted. This artificially constrains future spill operations and breaks the budget accounting invariant.

High
Fix exemption budget accounting leak

The shrink method decrements exempt_outstanding for every memory release, including
non-exempt allocations. This causes the exemption budget to underflow (saturate at
zero) when normal allocations are freed, incorrectly freeing up budget that was
never charged and allowing more exemptions than the configured cap permits.

sandbox/plugins/analytics-backend-datafusion/rust/src/memory.rs [180-190]

 fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
     self.used.fetch_sub(shrink, Ordering::Relaxed);
-    // Give back exemption budget as memory is freed (a spill writing out its
-    // data calls `shrink`). Subtract without going below zero. A normal,
-    // non-exempt shrink may give budget back a little early, which is safe:
-    // it only makes the exemption stricter, never looser.
-    let _ = self
-        .exempt_outstanding
-        .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |out| {
-            Some(out.saturating_sub(shrink))
-        });
+    // Only return budget if this reservation was actually exempted. Without
+    // tracking per-reservation exemption state, we cannot safely decrement
+    // exempt_outstanding here. Consider tracking exemption in MemoryReservation
+    // or accepting that budget is only freed when exempted reservations drop.
 }
Suggestion importance[1-10]: 8

Why: The shrink method decrements exempt_outstanding for all memory releases (lines 183-189), not just exempted allocations. This causes budget underflow when non-exempt allocations are freed, incorrectly freeing budget that was never charged and allowing more exemptions than the configured cap permits, breaking the budget invariant.

Medium
Validate disk capacity probe results

The validator silently skips the capacity check when getTotalSpace() returns 0 or
negative, which can occur on some filesystems. This allows operators to set
arbitrarily large values that exceed actual disk capacity, potentially causing
runtime failures when spill attempts to write beyond available space.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java [145-157]

-static final class SpillLimitValidator implements Setting.Validator<Long> {
-    @Override
-    public void validate(Long value) {
-        // Range check (>= 0) lives in the parser; nothing to do here without dependencies.
-    }
+if (total <= 0) {
+    throw new IllegalArgumentException(
+        "Cannot determine capacity for spill directory [" + dir + "]. Set a valid directory or disable spill."
+    );
+}
+if (value > total) {
+    throw new IllegalArgumentException(
+        "Setting [datafusion.spill_memory_limit_bytes]=" + value + " exceeds spill volume capacity (" + total + " bytes)"
+    );
+}
 
-    @Override
-    public void validate(Long value, Map<Setting<?>, Object> dependencies) {
-        String dir = (String) dependencies.get(DATAFUSION_SPILL_DIRECTORY);
-        if (dir == null) {
-            dir = "";
-        }
-        if (dir.isEmpty()) {
-            if (value != 0L) {
-                throw new IllegalArgumentException(
-                    "Setting [datafusion.spill_memory_limit_bytes]="
-                        + value
-                        + " is non-zero but datafusion.spill_directory is unset (spill disabled)"
-                );
-            }
-            return;
-        }
-        long total;
-        try {
-            total = Environment.getFileStore(Path.of(dir)).getTotalSpace();
-        } catch (IOException e) {
-            // Probe failed — skip the capacity check. Same fail-open behavior as the
-            // boot-time default derivation.
-            return;
-        }
-        if (total > 0 && value > total) {
-            throw new IllegalArgumentException(
-                "Setting [datafusion.spill_memory_limit_bytes]=" + value + " exceeds spill volume capacity (" + total + " bytes)"
-            );
-        }
-    }
-
Suggestion importance[1-10]: 3

__

Why: The suggestion identifies a potential edge case where getTotalSpace() returns 0 or negative, but the PR explicitly documents this as fail-open behavior (lines 149-151) to preserve operator flexibility when probes fail. The current design intentionally skips validation rather than blocking configuration updates, so this is a design choice rather than a bug.

Low
Suggestions up to commit 0dd4959
Category | Suggestion | Impact
General
Prevent exemption budget leakage on failure

The exemption budget is charged after the allocation succeeds, but if a subsequent
operation fails or the reservation is dropped before shrink is called, the budget
remains consumed. This can lead to budget leakage where the cap is never fully
released, progressively restricting future spill operations. Consider tracking
exempted reservations and releasing their budget on drop.

sandbox/plugins/analytics-backend-datafusion/rust/src/memory.rs [267-272]

 if exempted {
+    // Track this reservation for budget cleanup on drop
     self.exempt_outstanding.fetch_add(additional, Ordering::Relaxed);
 }
 return Ok(());
Suggestion importance[1-10]: 5

__

Why: The suggestion raises a valid concern about potential budget leakage if a reservation is dropped without calling shrink. However, the improved_code is identical to the existing_code and doesn't actually implement the suggested fix (tracking reservations for cleanup on drop). The concern is legitimate but the solution is incomplete.

Low
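
One way to release exempt budget on drop, as the suggestion above asks, is an RAII guard. The sketch below is illustrative only: `ExemptGuard` and its `charge` constructor are hypothetical names, and the real pool may tie the budget to its reservation type differently.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical guard that owns a slice of the exemption budget.
struct ExemptGuard {
    outstanding: Arc<AtomicU64>,
    bytes: u64,
}

impl ExemptGuard {
    /// Charge `bytes` to the shared counter and return a guard that owns the charge.
    fn charge(outstanding: Arc<AtomicU64>, bytes: u64) -> Self {
        outstanding.fetch_add(bytes, Ordering::Relaxed);
        ExemptGuard { outstanding, bytes }
    }
}

impl Drop for ExemptGuard {
    /// Return the charge when the guard goes out of scope, including on
    /// early returns and error paths where shrink is never called.
    fn drop(&mut self) {
        self.outstanding.fetch_sub(self.bytes, Ordering::Relaxed);
    }
}

/// Example of a fallible step that charges budget and then fails.
fn failing_spill(outstanding: Arc<AtomicU64>) -> Result<(), String> {
    let _guard = ExemptGuard::charge(outstanding, 64);
    Err("spill write failed".to_string())
}
```

Because the refund lives in `Drop`, a failed operation cannot leave the budget permanently consumed.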
Possible issue
Validate spill limit against disk capacity

The validator silently skips the capacity check when getTotalSpace() returns 0 or
negative, which can occur on some filesystems. This allows operators to set
arbitrarily large values that exceed actual disk capacity, potentially causing
runtime failures when spill attempts to write. Add explicit validation to reject
non-zero limits when total space is 0 or negative.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java [145-157]

-static final class SpillLimitValidator implements Setting.Validator<Long> {
-    @Override
-    public void validate(Long value) {
-        // Range check (>= 0) lives in the parser; nothing to do here without dependencies.
-    }
+if (total <= 0) {
+    throw new IllegalArgumentException(
+        "Setting [datafusion.spill_memory_limit_bytes]=" + value + " cannot be validated: spill volume capacity is unavailable or zero"
+    );
+}
+if (value > total) {
+    throw new IllegalArgumentException(
+        "Setting [datafusion.spill_memory_limit_bytes]=" + value + " exceeds spill volume capacity (" + total + " bytes)"
+    );
+}
 
-    @Override
-    public void validate(Long value, Map<Setting<?>, Object> dependencies) {
-        String dir = (String) dependencies.get(DATAFUSION_SPILL_DIRECTORY);
-        if (dir == null) {
-            dir = "";
-        }
-        if (dir.isEmpty()) {
-            if (value != 0L) {
-                throw new IllegalArgumentException(
-                    "Setting [datafusion.spill_memory_limit_bytes]="
-                        + value
-                        + " is non-zero but datafusion.spill_directory is unset (spill disabled)"
-                );
-            }
-            return;
-        }
-        long total;
-        try {
-            total = Environment.getFileStore(Path.of(dir)).getTotalSpace();
-        } catch (IOException e) {
-            // Probe failed — skip the capacity check. Same fail-open behavior as the
-            // boot-time default derivation.
-            return;
-        }
-        if (total > 0 && value > total) {
-            throw new IllegalArgumentException(
-                "Setting [datafusion.spill_memory_limit_bytes]=" + value + " exceeds spill volume capacity (" + total + " bytes)"
-            );
-        }
-    }
-
Suggestion importance[1-10]: 3

__

Why: The suggestion identifies a potential edge case where getTotalSpace() returns 0 or negative, but the current fail-open behavior is intentional (as documented in the code comments). The validator explicitly skips the capacity check when the probe fails to preserve operator flexibility. Rejecting the setting would be overly restrictive and contradict the documented fail-open design.

Low
Suggestions up to commit 35443e9
Category | Suggestion | Impact
General
Log warning when capacity check skipped

The validator silently skips the capacity check when getTotalSpace() returns 0 or
negative, which can occur on some filesystems. This allows operators to set
arbitrarily large values that exceed actual disk capacity. Consider logging a
warning when the probe returns invalid values so operators are aware the safety
check was bypassed.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java [153-157]

-static final class SpillLimitValidator implements Setting.Validator<Long> {
-    @Override
-    public void validate(Long value) {
-        // Range check (>= 0) lives in the parser; nothing to do here without dependencies.
-    }
+if (total > 0 && value > total) {
+    throw new IllegalArgumentException(
+        "Setting [datafusion.spill_memory_limit_bytes]=" + value + " exceeds spill volume capacity (" + total + " bytes)"
+    );
+} else if (total <= 0) {
+    logger.warn(
+        "Could not determine spill volume capacity for [{}] (getTotalSpace returned {}); skipping capacity validation for spill_memory_limit_bytes={}",
+        dir, total, value
+    );
+}
 
-    @Override
-    public void validate(Long value, Map<Setting<?>, Object> dependencies) {
-        String dir = (String) dependencies.get(DATAFUSION_SPILL_DIRECTORY);
-        if (dir == null) {
-            dir = "";
-        }
-        if (dir.isEmpty()) {
-            if (value != 0L) {
-                throw new IllegalArgumentException(
-                    "Setting [datafusion.spill_memory_limit_bytes]="
-                        + value
-                        + " is non-zero but datafusion.spill_directory is unset (spill disabled)"
-                );
-            }
-            return;
-        }
-        long total;
-        try {
-            total = Environment.getFileStore(Path.of(dir)).getTotalSpace();
-        } catch (IOException e) {
-            // Probe failed — skip the capacity check. Same fail-open behavior as the
-            // boot-time default derivation.
-            return;
-        }
-        if (total > 0 && value > total) {
-            throw new IllegalArgumentException(
-                "Setting [datafusion.spill_memory_limit_bytes]=" + value + " exceeds spill volume capacity (" + total + " bytes)"
-            );
-        }
-    }
-
Suggestion importance[1-10]: 5

__

Why: The suggestion correctly identifies that when getTotalSpace() returns 0 or negative, the capacity check is silently skipped. Adding a warning log would improve observability, though the fail-open behavior is intentional and documented in the code comments.

Low
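
The three review rounds above weigh the same decision: what the validator should do when the capacity probe yields nothing usable. The sketch below restates that decision table in Rust purely for illustration (the actual validator is the Java class shown in the diffs). The function name, the `Check` enum, and modelling "log a warning" as a distinct return value are assumptions.

```rust
/// Outcome of a successful validation (hypothetical type).
#[derive(Debug, PartialEq)]
enum Check {
    /// The limit was checked against a usable capacity, or no check was needed.
    Ok,
    /// The probe returned zero or a negative value; the check was skipped
    /// and the caller could log a warning here.
    SkippedNoCapacity,
}

/// `probed_total` is None when the probe itself failed (the IOException path).
fn validate_spill_limit(
    value: u64,
    dir: &str,
    probed_total: Option<i64>,
) -> Result<Check, String> {
    if dir.is_empty() {
        // Spill is disabled, so only a zero limit makes sense.
        return if value != 0 {
            Err(format!("limit {value} is non-zero but the spill directory is unset"))
        } else {
            Ok(Check::Ok)
        };
    }
    match probed_total {
        // Probe failed: fail open, as the existing code does.
        None => Ok(Check::Ok),
        // Probe returned an unusable value: fail open, but make it observable.
        Some(total) if total <= 0 => Ok(Check::SkippedNoCapacity),
        Some(total) if value > total as u64 => {
            Err(format!("limit {value} exceeds spill volume capacity ({total} bytes)"))
        }
        Some(_) => Ok(Check::Ok),
    }
}
```

Returning a separate `SkippedNoCapacity` variant keeps the fail-open behavior while giving the caller a place to emit the warning proposed in the latest suggestion.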

@gaurav-amz gaurav-amz changed the title [analytics-datafusion] Fix native memory leak, stream wedge, spill forward-progress, and dynamic threshold updates [analytics-datafusion] Native memory leak, stream wedge, spill forward-progress, threshold & spill-cap fixes Jun 16, 2026
@github-actions

Contributor

Persistent review updated to latest commit 33323b5

@github-actions

Contributor

✅ Gradle check result for 33323b5: SUCCESS

@codecov

codecov Bot commented Jun 16, 2026


Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.36%. Comparing base (c73dc2e) to head (dd2383d).
⚠️ Report is 1 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main   #22187      +/-   ##
============================================
+ Coverage     73.32%   73.36%   +0.03%     
- Complexity    75934    76000      +66     
============================================
  Files          6075     6075              
  Lines        345282   345282              
  Branches      49697    49697              
============================================
+ Hits         253177   253315     +138     
+ Misses        71786    71758      -28     
+ Partials      20319    20209     -110     


@gaurav-amz gaurav-amz force-pushed the datafusion-try-grow-fix branch from 33323b5 to d61cf19 Compare June 16, 2026 18:55
@github-actions

Contributor

Persistent review updated to latest commit d61cf19

@gaurav-amz gaurav-amz force-pushed the datafusion-try-grow-fix branch 2 times, most recently from 6e4b0f8 to 7d6e1bd Compare June 16, 2026 19:19
@github-actions

Contributor

Persistent review updated to latest commit 7d6e1bd

@gaurav-amz gaurav-amz changed the title [analytics-datafusion] Native memory leak, stream wedge, spill forward-progress, threshold & spill-cap fixes [analytics-datafusion] Fixes for native memory on cancel, a query hang, spill progress, threshold updates, and spill size default Jun 16, 2026
@github-actions github-actions Bot added :sanitize Removing elastic specific artifacts :xpack-removal Related to removal of x-pack >FORK Related to the fork process labels Jun 16, 2026
@github-actions

Contributor

✅ Gradle check result for 7d6e1bd: SUCCESS

// `DynamicLimitPool::try_grow`). Limits how much memory spillable consumers can
// be allowed through the 85% check at the same time, so several spills together
// stay below the 95% limit. Default 512MB.
static SPILL_EXEMPT_CAP_BYTES: AtomicU64 = AtomicU64::new(512 * 1024 * 1024);

Contributor Author

I will make it around 10% in the next revision.
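
The cap described in the code comment above can be read as an admission rule layered on the 85% and 95% RSS lines from the PR description. The following is a rough sketch under stated assumptions: `Gate`, `Admission`, the `spillable` flag, and passing RSS as a fraction are hypothetical, and the real `DynamicLimitPool::try_grow` may be structured differently.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

const SPILL_LINE: f64 = 0.85; // above this, ordinary requests are rejected
const CRITICAL_LINE: f64 = 0.95; // above this, every request is rejected

#[derive(Debug, PartialEq)]
enum Admission {
    Granted,
    GrantedExempt,
    Rejected,
}

/// Hypothetical gate holding the cap and the bytes currently exempted.
struct Gate {
    cap_bytes: AtomicU64,
    exempt_outstanding: AtomicU64,
}

impl Gate {
    fn admit(&self, rss_fraction: f64, additional: u64, spillable: bool) -> Admission {
        if rss_fraction > CRITICAL_LINE {
            return Admission::Rejected;
        }
        if rss_fraction <= SPILL_LINE {
            return Admission::Granted;
        }
        if !spillable {
            return Admission::Rejected;
        }
        // Between the two lines: admit a spillable consumer only while the
        // total exempted bytes stay within the cap. fetch_update retries the
        // closure on contention, so the check and the charge are one atomic step.
        let cap = self.cap_bytes.load(Ordering::Relaxed);
        let charged = self
            .exempt_outstanding
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |out| {
                let new = out.checked_add(additional)?;
                (new <= cap).then_some(new)
            });
        if charged.is_ok() {
            Admission::GrantedExempt
        } else {
            Admission::Rejected
        }
    }
}
```

Doing the comparison inside `fetch_update` avoids the race where two concurrent spills each see room under the cap and together exceed it.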

@github-actions

Contributor

Persistent review updated to latest commit 1d88317

@github-actions

Contributor

❌ Gradle check result for 1d88317: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@Bukhtawar Bukhtawar left a comment

Contributor

Can we write a test to demonstrate spill kicks in?

Comment on lines +98 to +103
/** Fraction of the spill volume's total capacity used as the default cap. */
static final double SPILL_LIMIT_FRACTION = 0.80;

/** Fallback when the spill volume's capacity cannot be probed. 8 GiB. */
static final long SPILL_LIMIT_FALLBACK_BYTES = 8L * 1024 * 1024 * 1024;

Contributor

Will this lead to wasted disk space? Can we reduce the 8GB buffer based on some benchmarks?

Contributor Author

This value is only a fallback, based on the minimum instance RAM size (50% of 16GB). Normally SPILL_LIMIT_FRACTION applies instead, which sets the cap to 80% of the spill disk size. Once spill usage exceeds that cap, further spill requests are rejected.
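
The default described in this reply amounts to a small piece of arithmetic. It is sketched here in Rust for illustration only, since the constants quoted above live in the Java plugin. The function name, the integer form of the 80% fraction, and treating a zero capacity like a failed probe are assumptions.

```rust
/// Fallback when the spill volume's capacity cannot be probed: 8 GiB.
const SPILL_LIMIT_FALLBACK_BYTES: u64 = 8 * 1024 * 1024 * 1024;

/// Hypothetical helper: derive the default spill cap from the probed capacity.
/// `None` models a failed probe.
fn default_spill_limit(total_capacity: Option<u64>) -> u64 {
    match total_capacity {
        // 80% of the volume, written as integer math (4/5) to avoid
        // floating-point rounding in this sketch.
        Some(total) if total > 0 => total / 5 * 4,
        // Probe failed, or returned zero (an assumption): use the fallback.
        _ => SPILL_LIMIT_FALLBACK_BYTES,
    }
}
```

For a 100 GiB spill volume this yields an 80 GiB cap; the 8 GiB value only applies when no capacity is available.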

@github-actions

Contributor

Persistent review updated to latest commit 6cba71d

@github-actions

Contributor

❌ Gradle check result for 6cba71d: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions

Contributor

Persistent review updated to latest commit 60a286a

@github-actions

Contributor

Persistent review updated to latest commit 5eef64a

@gaurav-amz gaurav-amz force-pushed the datafusion-try-grow-fix branch from 5eef64a to 69dc974 Compare June 20, 2026 22:20
@github-actions

Contributor

Persistent review updated to latest commit 69dc974

@gaurav-amz gaurav-amz force-pushed the datafusion-try-grow-fix branch from 69dc974 to 6271a7e Compare June 20, 2026 22:27
…the DataFusion engine

Squash of the datafusion-try-grow-fix work:

- Clamp data-node fragment gate to CPU worker count.
- Fix permanent CrossRtStream wedge by spawning the driver off the consumer.
- Exempt self-liquidating spill reservations from the 85% RSS gate
  (bounded by SPILL_EXEMPT_CAP_BYTES).
- Fix dynamic memory-guard threshold updates silently using stale values
  (single grouped settings-update consumer).
- Default spill cap to 80% of spill volume capacity; validate operator overrides.
- Make spill_exempt_cap_bytes a dynamic cluster setting
  (datafusion.memory_guard.spill_exempt_cap_bytes, raw bytes, default 512MB).
- Add Rust unit tests for the spill_exempt_cap setter and FFI negative-clamp.
- Add an end-to-end disk-spill test (GROUP BY under a small pool spills and
  returns correct results, asserted via DataFusion SpillCount/SpilledBytes metrics).

The cross_rt_stream.rs and runtime_manager.rs changes were reverted; this squash
reflects the net final state with those files unchanged from origin/main.

Signed-off-by: snghsvn <snghsvn@amazon.com>
@gaurav-amz gaurav-amz force-pushed the datafusion-try-grow-fix branch from 6271a7e to 35443e9 Compare June 20, 2026 22:29
@github-actions

Contributor

Persistent review updated to latest commit 6271a7e

@github-actions

Contributor

Persistent review updated to latest commit 35443e9

@gaurav-amz

Contributor Author

Can we write a test to demonstrate spill kicks in?

I have added an integration test that exercises spilling.

@github-actions

Contributor

✅ Gradle check result for 35443e9: SUCCESS

@github-actions

Contributor

Persistent review updated to latest commit 0dd4959

…NGS count

Adding datafusion.memory_guard.spill_exempt_cap_bytes to ALL_SETTINGS raised the
registered-setting count from 28 to 29, but testAllSettingsContainsAllExpectedSettings
still asserted 28. Update the count and assert the new setting is registered.

Signed-off-by: snghsvn <snghsvn@amazon.com>
@github-actions

Contributor

Persistent review updated to latest commit 302c6c6

@github-actions

Contributor

❕ Gradle check result for 302c6c6: UNSTABLE

Please review all flaky tests that succeeded after retry and create an issue if one does not already exist to track the flaky failure.

@github-actions

Contributor

Persistent review updated to latest commit dd2383d

@github-actions

Contributor

❌ Gradle check result for dd2383d: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@gaurav-amz gaurav-amz closed this Jun 21, 2026
@gaurav-amz gaurav-amz reopened this Jun 21, 2026
@github-actions

Contributor

Persistent review updated to latest commit dd2383d

@gaurav-amz gaurav-amz closed this Jun 21, 2026
@gaurav-amz gaurav-amz reopened this Jun 21, 2026
@github-actions

Contributor

Persistent review updated to latest commit dd2383d

@github-actions

Contributor

✅ Gradle check result for dd2383d: SUCCESS


Labels

>FORK Related to the fork process :sanitize Removing elastic specific artifacts :xpack-removal Related to removal of x-pack


2 participants