Skip to content

Integrate rust side changes for memory pool#22227

Open
rayshrey wants to merge 1 commit into
opensearch-project:mainfrom
rayshrey:memory-pool-merge-integration
Open

Integrate rust side changes for memory pool#22227
rayshrey wants to merge 1 commit into
opensearch-project:mainfrom
rayshrey:memory-pool-merge-integration

Conversation

@rayshrey

Copy link
Copy Markdown
Contributor

Description

[Describe what this change achieves]

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: rayshrey <rayshrey@amazon.com>
@github-actions

Copy link
Copy Markdown
Contributor

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Possible Issue

In load_next_batch, when an error occurs reading the sort batch, the function releases old_sort_bytes and returns the error. However, if current_data_batch_bytes > 0, that memory is not released before returning. This can leak tracked memory if a data batch was loaded in a previous iteration and the sort batch read fails.

let sort_result = match self.sort_prefetch_rx.recv() {
    Ok(Some(Ok(batch))) => Some(batch),
    Ok(Some(Err(e))) => {
        self.sort_prefetch_pending = false;
        // Error: release sort batch tracking since cursor is now exhausted
        reservation.shrink(old_sort_bytes);
        self.current_sort_batch_bytes = 0;
        return Err(e);
    }
Possible Issue

In try_load_data, if the loop exits without finding a matching batch (e.g., data_batch_index never reaches sort_batch_index), current_data_batch_bytes remains non-zero from the last shrink/grow cycle, but data_batch is None. Subsequent calls to ensure_data_loaded may incorrectly assume memory is tracked when no batch is actually loaded.

fn try_load_data(&mut self, reservation: &mut MemoryReservation) -> MergeResult<()> {
    let reader = self.data_reader.as_mut()
        .ok_or_else(|| MergeError::Logic("Data reader already closed".into()))?;

    // Release previous data_batch — about to load a new one
    if self.current_data_batch_bytes > 0 {
        reservation.shrink(self.current_data_batch_bytes);
        self.current_data_batch_bytes = 0;
    }
    self.data_batch = None;

    while self.data_batch_index <= self.sort_batch_index {
        match reader.next() {
            Some(Ok(batch)) => {
                if batch.num_rows() == 0 {
                    return Err(MergeError::Logic(format!(
                        "Data reader returned empty batch at position {}",
                        self.data_batch_index
                    )));
                }
                if self.data_batch_index == self.sort_batch_index {
                    // Target batch — validate and keep
                    if let Some(ref sb) = self.sort_batch {
                        if batch.num_rows() != sb.num_rows() {
                            return Err(MergeError::Logic(format!(
                                "Data batch rows ({}) != sort batch rows ({}) at index {}",
                                batch.num_rows(), sb.num_rows(), self.sort_batch_index
                            )));
                        }
                    }
                    let data_bytes = batch.get_array_memory_size();
                    // Track full-column data batch — already allocated by data_reader.next()
                    reservation.grow(data_bytes);
                    self.current_data_batch_bytes = data_bytes;
                    self.data_batch = Some(batch);
                }
                // Skipped batch — discard
                self.data_batch_index += 1;
            }
            Some(Err(e)) => return Err(e.into()),
            None => return Err(MergeError::Logic(format!(
Possible Issue

In write_batch, the code clones writer_arc before locking, then calls reserve_estimated on state.reservation while holding the lock. If reserve_estimated blocks waiting for memory, the writer lock is held during the wait, potentially blocking other threads trying to write or finalize. This can cause deadlock if multiple writers contend for the same pool.

if let Some(mut state) = WRITERS.get_mut(&temp_filename) {
    match &state.variant {
        WriterVariant::Ipc(writer_arc) => {
            log_debug!("Writing RecordBatch to IPC staging file");
            let writer_arc = writer_arc.clone();
            let mut writer = writer_arc.lock().unwrap();
            writer.write(&record_batch, &mut state.reservation)?;
        }
        WriterVariant::Parquet(writer_arc) => {
            log_debug!("Writing RecordBatch to Parquet file");
            let batch_bytes = record_batch.get_array_memory_size();
            // Reserve 2× batch as estimate — ArrowWriter encoding may temporarily
            // hold dictionary, compressed pages, and data page buffers.
            let estimated = batch_bytes * 2;
            let writer_arc = writer_arc.clone();
            state.reservation.reserve_estimated(estimated)?;
            let mut writer = writer_arc.lock().unwrap();
            let before = writer.memory_size();
            writer.write(&record_batch)?;
            // Reconcile: adjust reservation to actual delta reported by ArrowWriter
            let actual = writer.memory_size().saturating_sub(before);
            drop(writer);
            state.reservation.reconcile(estimated, actual);
Possible Issue

In finalize_sorted_chunks (multiple chunks path), merge_output.mapping is dropped at line 718, then reservation.shrink(mapping_bytes) is called. However, merge_output also contains gen_keys, gen_offsets, gen_sizes, and gen_count vectors that are dropped at the same time. If these vectors are large, their memory is not tracked or released from the reservation, leading to under-accounting of actual memory freed.

drop(merge_output);
// merge_output.mapping freed — release its share, flat_mapping remains tracked
reservation.shrink(mapping_bytes);
log_info!("finalize_sorted_chunks: produced {} permutation entries for {}", flat_mapping.len(), output_filename);

@github-actions

Copy link
Copy Markdown
Contributor

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Fix reservation leak on write failure

If writer.write() fails, the estimated reservation remains allocated but reconcile()
is never called, causing a memory leak in the reservation tracking. Ensure
reconcile() or shrink() is called on the error path to release the estimated
reservation.

sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs [491-503]

 let estimated = batch_bytes * 2;
 let writer_arc = writer_arc.clone();
 state.reservation.reserve_estimated(estimated)?;
 let mut writer = writer_arc.lock().unwrap();
 let before = writer.memory_size();
-writer.write(&record_batch)?;
+let write_result = writer.write(&record_batch);
 let actual = writer.memory_size().saturating_sub(before);
 drop(writer);
 state.reservation.reconcile(estimated, actual);
+write_result?;
Suggestion importance[1-10]: 9

__

Why: Critical bug fix. If writer.write() fails, reconcile() is never called, leaving the estimated reservation allocated. The suggested fix ensures reconcile() runs before propagating the error, preventing memory tracking leaks.

High
General
Release prefetch memory on error

When the prefetch fails, only the current sort batch memory is released, but the
prefetch batch (which may have been partially allocated before the error) is not
accounted for. This could leave unreleased memory in the reservation if the prefetch
thread allocated memory before encountering the error.

sandbox/plugins/parquet-data-format/src/main/rust/src/merge/cursor.rs [211-217]

 Ok(Some(Err(e))) => {
     self.sort_prefetch_pending = false;
-    // Error: release sort batch tracking since cursor is now exhausted
-    reservation.shrink(old_sort_bytes);
+    // Error: release both current and estimated prefetch memory
+    reservation.shrink(old_sort_bytes * 2);
     self.current_sort_batch_bytes = 0;
     return Err(e);
 }
Suggestion importance[1-10]: 4

__

Why: The suggestion identifies a potential issue but the fix is imprecise. The prefetch thread may not have allocated memory before failing, so releasing old_sort_bytes * 2 could over-release. A more accurate approach would track prefetch allocation separately.

Low
Improve prefetch memory estimation accuracy

The prefetch estimation (2× first batch) may be inaccurate if subsequent batches
differ significantly in size. Consider tracking the prefetch batch separately or
using a more conservative estimate to prevent under-reservation when the prefetch
batch is larger than the first batch.

sandbox/plugins/parquet-data-format/src/main/rust/src/merge/cursor.rs [172-174]

 let batch_bytes = cursor.sort_batch.as_ref().unwrap().get_array_memory_size();
-reservation.grow(batch_bytes * 2);
+// Reserve 3× first batch to account for prefetch variability
+reservation.grow(batch_bytes * 3);
 cursor.current_sort_batch_bytes = batch_bytes;
Suggestion importance[1-10]: 3

__

Why: The suggestion to increase prefetch estimation from 2× to 3× is overly conservative without evidence that 2× is insufficient. The current implementation already handles size variations through delta adjustments in load_next_batch(), making this change unnecessary.

Low

@github-actions

Copy link
Copy Markdown
Contributor

✅ Gradle check result for 1d8a5e0: SUCCESS

@codecov

codecov Bot commented Jun 17, 2026

Copy link
Copy Markdown

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.43%. Comparing base (cca17b8) to head (1d8a5e0).
⚠️ Report is 8 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main   #22227      +/-   ##
============================================
+ Coverage     73.40%   73.43%   +0.02%     
- Complexity    75872    75892      +20     
============================================
  Files          6068     6068              
  Lines        344623   344623              
  Branches      49584    49584              
============================================
+ Hits         252978   253078     +100     
+ Misses        71490    71350     -140     
- Partials      20155    20195      +40     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@rayshrey rayshrey marked this pull request as ready for review June 18, 2026 05:25
@rayshrey rayshrey requested a review from a team as a code owner June 18, 2026 05:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant