core page index cache changes #22254

Open
bharath-techie wants to merge 2 commits into opensearch-project:main from bharath-techie:core-page-index-cache

Conversation

@bharath-techie

@bharath-techie bharath-techie commented Jun 20, 2026

Contributor

This change reduces the dominant native-heap consumer (and the associated compute) on wide-schema Parquet files: the full ColumnIndex + OffsetIndex decoded for every column on every query. For example, on the textbench dataset with 1 billion rows, the full Parquet metadata was 1750 MB.

DataFusion's Parquet opener decodes the entire page index (per-page string min/max for all columns × all row groups) and then caches the result. That is where the 1750 MB comes from.

Instead, by default we cache only the Parquet metadata footer (file stats and row-group stats), and lazily load and cache the offset index and column index.

Two-cache scoped page index (column/offset index cache)

  • COLUMN_INDEX_CACHE — cell-granular (file, col, rg) key. Stores only the decoded ColumnIndexMetaData cells for predicate columns; NONE placeholders elsewhere. The per-page string min/max (the heap hog) is
    decoded and stored once per file, shared across every query whose predicate touches that column regardless of projection or literal.
  • OFFSET_INDEX_CACHE — cell-granular (file, col) key. Stores OffsetIndexMetaData (fixed-width page offsets) for predicate ∪ projection ∪ {col 0} columns; single-page safe placeholders elsewhere. Must span
    all row groups (arrow-rs push decoder indexes directly by file-global RG).
  • Backed by BoundedCache<K,V> with Box — eviction policy is pluggable (LRU now, S3-FIFO follow-up).
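
A minimal sketch of the cell-granular keying described above, assuming a plain HashMap in place of BoundedCache and a byte vector in place of the decoded index payload. CiCellKey mirrors the (file, col, rg) key; OiKey and both helper functions are hypothetical names used only for illustration. It shows how a query could work out which cells still need to be decoded:

```rust
use std::collections::HashMap;

/// Column index key: one cell per (file, column, row group).
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct CiCellKey {
    path: String,
    col: usize,
    rg: usize,
}

/// Offset index key: one entry per (file, column), spanning all row groups.
/// The name `OiKey` is an assumption for illustration.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct OiKey {
    path: String,
    col: usize,
}

/// Returns the (col, rg) cells for `predicate_cols` that are not cached yet.
/// `Vec<u8>` stands in for the decoded column index payload.
fn missing_ci_cells(
    cache: &HashMap<CiCellKey, Vec<u8>>,
    path: &str,
    predicate_cols: &[usize],
    num_rgs: usize,
) -> Vec<(usize, usize)> {
    let mut missing = Vec::new();
    for &col in predicate_cols {
        for rg in 0..num_rgs {
            let key = CiCellKey { path: path.to_string(), col, rg };
            if !cache.contains_key(&key) {
                missing.push((col, rg));
            }
        }
    }
    missing
}

/// Returns the columns whose offset index is not cached yet.
fn missing_oi_cols(cache: &HashMap<OiKey, Vec<u8>>, path: &str, cols: &[usize]) -> Vec<usize> {
    cols.iter()
        .copied()
        .filter(|&col| !cache.contains_key(&OiKey { path: path.to_string(), col }))
        .collect()
}
```

Because the keys carry no projection or literal, two queries filtering on the same column of the same file resolve to the same cells, which is the sharing property the description relies on.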

Footer-only level-1 metadata cache

  • load_parquet_metadata now uses PageIndexPolicy::Skip on cache miss — fetches only footer bytes, never page index bytes.
  • strip_page_index in MutexFileMetadataCache::put — safety net for any DataFusion internal path (infer_schema, fetch_statistics) that calls DFParquetMetadata::fetch_metadata with the cache. Strips before
    storing so the level-1 LRU only ever holds footer-only entries.
  • Pre-warming loop in create_session_context — populates the metadata cache footer-only before infer_schema fires, so every DataFusion planning call is a cache hit and never triggers the
    PageIndexPolicy::Optional code path.

Listing-table scan path (scoped_page_index_reader.rs, scoped_index_optimizer.rs)

  • ScopedPageIndexOptimizer — PhysicalOptimizerRule that walks the physical plan and installs ScopedPageIndexReaderFactory on every DataSourceExec backed by a ParquetSource. Provider-agnostic.
  • ScopedPageIndexReaderFactory / ScopedPageIndexReader — on get_metadata, loads footer from the shared cache then augments with a scoped page index. Gated on predicate OR projection being non-empty (fixes
    the match()-only 2.5× bytes_scanned regression).

Indexed-table scan path (indexed_executor.rs)

  • After build_segments, resolves predicate + projection column names → per-file parquet leaf indices in one pass (resolve_predicate_parquet_columns_pair) and augments each segment's metadata with a scoped
    page index via load_scoped_page_index_cols.
  • Both paths share the same process-global caches — a listing query seeds entries that an indexed query hits.

Cache sizes after these changes:

Level 1 cache: 157 MB (Parquet footer).
Offset index: 158 MB, even after select * style queries.
Column index: 20-30 MB when 20 columns are used as filters across various queries.

So the total drops from 1750 MB to a little over 300 MB (157 + 158 + 20-30 MB). Lazy loading also makes evictions less painful, since only the evicted part of the offset index and column index (~5 MB) needs to be loaded back.


Follow-ups (not in this PR)

  1. Wire surviving_rgs into both scan paths (Step 2) — ScopedPageIndexReaderFactory already carries predicate: Option<Arc> for this. Indexed executor has it in scope. Both would call
    surviving_row_groups() → load_scoped_page_index_scoped(). Currently load_scoped_page_index_cols (no RG scoping) is used.
  2. S3-FIFO eviction policy — add PolicyType::S3Fifo to cache/eviction_policy.rs, swap the PolicyType::Lru constant in cache/page_index/mod.rs. Zero changes to BoundedCache.
  3. Cache the derived Arrow schema on SegmentFileInfo — parquet_to_arrow_schema is called O(N×RGs) times per file per query across page_pruner.rs, dynamic_filter.rs, and resolve.rs. Store it once on
    SegmentFileInfo at build_segments time and thread it through.
  4. Stats integration
  5. Clear cache integration

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@bharath-techie bharath-techie requested a review from a team as a code owner June 20, 2026 18:01
@bharath-techie bharath-techie added the skip-diff-analyzer Maintainer to skip code-diff-analyzer check, after reviewing issues in AI analysis. label Jun 20, 2026
@github-actions

github-actions Bot commented Jun 20, 2026

Contributor

PR Reviewer Guide 🔍

(Review updated until commit 4483eae)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Possible Issue

The BufferChunkReader::rel method performs arithmetic that can overflow or produce incorrect results if start is less than self.base. While there is a checked_sub guard, the subsequent conversion to usize and bounds check assume the buffer is correctly positioned. If the page-index readers ever pass an offset below the buffer's base (e.g., due to corrupted metadata or a logic error elsewhere), the error message will be clear, but the function does not validate that self.base itself is sane relative to the file. This is a narrow scenario but could cause confusion during debugging if metadata is malformed.

fn rel(&self, start: u64, length: usize) -> ParquetResult<usize> {
    let rel = start.checked_sub(self.base).ok_or_else(|| {
        ParquetError::General(format!(
            "page-index read offset {start} precedes buffer base {}",
            self.base
        ))
    })?;
    let rel = usize::try_from(rel)
        .map_err(|e| ParquetError::General(format!("offset overflow: {e}")))?;
    if rel + length > self.bytes.len() {
        return Err(ParquetError::General(format!(
            "page-index read [{rel}..{}) exceeds buffer of len {}",
            rel + length,
            self.bytes.len()
        )));
    }
    Ok(rel)
}
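
Separately from the base-offset concern, the `rel + length` sum in the snippet is itself unchecked. A minimal sketch of a fully checked variant, assuming a hypothetical `Buffer` struct and `String` errors in place of BufferChunkReader and ParquetError:

```rust
/// Hypothetical stand-in for the reader's buffer: `base` is the file offset
/// of the first buffered byte, `len` is the number of buffered bytes.
struct Buffer {
    base: u64,
    len: usize,
}

/// Translates a file offset into a buffer-relative offset, rejecting any
/// range that starts before the buffer, overflows, or runs past its end.
fn rel(buf: &Buffer, start: u64, length: usize) -> Result<usize, String> {
    let rel = start
        .checked_sub(buf.base)
        .ok_or_else(|| format!("read offset {start} precedes buffer base {}", buf.base))?;
    let rel = usize::try_from(rel).map_err(|e| format!("offset overflow: {e}"))?;
    // checked_add guards the end-of-range computation against wrapping.
    let end = rel
        .checked_add(length)
        .ok_or_else(|| format!("read range starting at {rel} overflows usize"))?;
    if end > buf.len {
        return Err(format!("read [{rel}..{end}) exceeds buffer of len {}", buf.len));
    }
    Ok(rel)
}
```

With this shape, an absurdly large `length` from malformed metadata yields an error rather than a wrapped sum that could pass the bounds check in a release build.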
Possible Issue

In CustomStatisticsCache::put, the eviction logic computes target_eviction as (current_size + memory_size) - (size_limit * 0.6). If current_size + memory_size is less than size_limit * 0.6, this subtraction underflows: it panics when overflow checks are enabled (the default in debug builds) and wraps to a very large value otherwise (the default in release builds). The code does not guard against this case. A query inserting a small entry when the cache is nearly empty could trigger a panic or a wildly oversized eviction target.

let target_eviction = (current_size + memory_size) - (size_limit as f64 * 0.6) as usize;
if let Ok(policy_guard) = self.policy.lock() {
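
If that underflow is reachable, a saturating subtraction would clamp the eviction target at zero. A minimal sketch, assuming a hypothetical free function; only the 0.6 multiplier is taken from the snippet above:

```rust
/// Number of bytes to evict so that the cache ends up at 60% of `size_limit`
/// after inserting `memory_size` bytes. Returns 0 when the cache would
/// already be at or below that target.
fn target_eviction(current_size: usize, memory_size: usize, size_limit: usize) -> usize {
    let target_size = (size_limit as f64 * 0.6) as usize;
    current_size
        .saturating_add(memory_size)
        .saturating_sub(target_size)
}
```

For example, with a limit of 100 bytes, inserting 5 bytes into a cache holding 10 gives a target of 0, while inserting 20 bytes into a cache holding 90 gives a target of 50.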
Possible Issue

The load_scoped_page_index_cols function unions predicate_cols and projection_cols into off_cols and always inserts column 0 for the page-skip metric. However, if num_cols is 0 (an empty schema or a malformed file), inserting 0 into the set and then filtering c < num_cols will remove it, leaving off_cols empty. The function then returns None at line 446, which is correct, but the comment at line 432 claims column 0 is always included. This inconsistency could confuse future maintainers or cause issues if the caller assumes column 0 is always present when off_cols is non-empty.

    None => (0..num_cols).collect(),
    Some(proj_cols) => {
        let mut set: HashSet<usize> = HashSet::new();
        set.insert(0); // page-skip metric always reads col 0
        for &c in predicate_cols {
            set.insert(c);
        }
        for &c in proj_cols {
            set.insert(c);
        }
        debug_assert!(
            set.iter().all(|&c| c < num_cols),
            "column index out of bounds (num_cols={num_cols}): {set:?}"
        );
        set.into_iter().filter(|&c| c < num_cols).collect()
    }
};
if off_cols.is_empty() {
    return None;
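
A sketch of the same column selection with the empty-schema case handled up front, so that "column 0 is always included" holds whenever a result is returned. The function name and signature are hypothetical, and a BTreeSet is used only to make the output order deterministic:

```rust
use std::collections::BTreeSet;

/// Returns the columns whose offset index should be loaded, or None when the
/// file has no columns. With no projection, every column is selected;
/// otherwise the result is predicate ∪ projection ∪ {0}, limited to valid
/// column indices.
fn offset_columns(
    predicate_cols: &[usize],
    projection_cols: Option<&[usize]>,
    num_cols: usize,
) -> Option<Vec<usize>> {
    if num_cols == 0 {
        return None; // empty schema: column 0 does not exist
    }
    let cols: BTreeSet<usize> = match projection_cols {
        None => (0..num_cols).collect(),
        Some(proj) => std::iter::once(0)
            .chain(predicate_cols.iter().copied())
            .chain(proj.iter().copied())
            .filter(|&c| c < num_cols)
            .collect(),
    };
    Some(cols.into_iter().collect())
}
```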

@github-actions

github-actions Bot commented Jun 20, 2026

Contributor

PR Code Suggestions ✨

Latest suggestions up to 4483eae

Explore these optional code suggestions:

Category    Suggestion    Impact
Possible issue
Add production bounds check

The debug_assert! only validates bounds in debug builds. In production,
out-of-bounds access will panic. Add a runtime bounds check before indexing to
prevent panics from malformed metadata or logic errors.

sandbox/plugins/analytics-backend-datafusion/rust/src/cache/page_index/page_index_io.rs [310-324]

 let built = build_column_index_cells(store, location, footer_meta, &missing_col_rg_matrix).await?;
 for cell in built {
-    debug_assert!(
-        cell.rg < col_index_matrix.len() && cell.col < col_index_matrix[cell.rg].len(),
-        "cell ({}, {}) out of matrix bounds ({num_rgs} rgs, {num_cols} cols)",
-        cell.col, cell.rg,
-    );
+    if cell.rg >= col_index_matrix.len() || cell.col >= col_index_matrix[cell.rg].len() {
+        return None;
+    }
     COLUMN_INDEX_CACHE.insert(
         CiCellKey { path: path.clone(), col: cell.col, rg: cell.rg },
         cell.data.clone(),
         cell.size,
     );
     col_index_matrix[cell.rg][cell.col] = cell.data;
 }
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies that debug_assert! only runs in debug builds, leaving production vulnerable to panics from out-of-bounds access. Adding a runtime check before indexing col_index_matrix[cell.rg][cell.col] prevents panics from malformed metadata or logic errors, improving robustness.

Medium
General
Log partial read failures

When buffers.len() doesn't match fetch_ranges.len(), the function silently returns
None, masking a potential object store or network issue. Log the mismatch to aid
debugging partial read failures.

sandbox/plugins/analytics-backend-datafusion/rust/src/cache/page_index/page_index_io.rs [373-376]

 let buffers = store.get_ranges(location, &fetch_ranges).await.ok()?;
 if buffers.len() != fetch_ranges.len() {
+    log::warn!(
+        "Partial read failure for {}: expected {} ranges, got {}",
+        location, fetch_ranges.len(), buffers.len()
+    );
     return None;
 }
Suggestion importance[1-10]: 6

__

Why: Logging the mismatch between expected and received buffer counts aids debugging of object store or network issues. The function already returns None (correct fallback), but the silent failure makes diagnosing partial reads harder in production.

Low
Remove invalid cache entries

The cache entry validation (is_valid_for) can fail silently, leaving stale metadata
in the cache. When validation fails, consider removing the invalid entry from the
cache to prevent repeated validation overhead on subsequent calls.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/parquet_bridge.rs [71-83]

 let pq_meta = if let Some(entry) = metadata_cache.get(location) {
     if entry.is_valid_for(&meta) {
         entry
             .file_metadata
             .as_any()
             .downcast_ref::<CachedParquetMetaData>()
             .map(|cached| Arc::clone(cached.parquet_metadata()))
     } else {
+        metadata_cache.remove(location);
         None
     }
 } else {
     None
 };
Suggestion importance[1-10]: 6

__

Why: The suggestion to remove invalid cache entries when is_valid_for fails is a good optimization to prevent repeated validation overhead. However, the suggestion assumes metadata_cache has a remove method, which may not exist in the FileMetadataCache trait.

Low
Log memory tracking failures

The memory_state lock is held across multiple operations including a HashMap insert.
If the lock is poisoned or contended, memory tracking silently fails. Consider
logging the error or returning a Result to surface tracking failures.

sandbox/plugins/analytics-backend-datafusion/rust/src/cache/statistics_cache.rs [386-393]

 if let Ok(mut state) = self.memory_state.lock() {
     if let Some(old_size) = state.tracker.get(&key) {
         state.total = state.total.saturating_sub(*old_size);
     }
     state.tracker.insert(key.clone(), memory_size);
     state.total += memory_size;
+} else {
+    log::error!("Failed to acquire memory_state lock during put operation for key: {}", key);
 }
Suggestion importance[1-10]: 5

__

Why: Logging when the memory_state lock fails improves observability of silent tracking failures. However, the impact is moderate since the cache continues to function (only tracking is affected), and lock poisoning is rare in normal operation.

Low
Warn on oversized cache entries

Silently dropping insertions when size > limit can hide configuration issues or
unexpectedly large entries. Consider logging a warning when an entry is rejected to
help diagnose cache misses.

sandbox/plugins/analytics-backend-datafusion/rust/src/cache/page_index/cache_store.rs [106-110]

 pub(super) fn insert(&self, key: K, value: V, size: usize) {
     let limit = self.limit.load(Relaxed);
     if size > limit {
+        log::warn!("Cache entry {} ({} bytes) exceeds limit ({} bytes), dropping", key, size, limit);
         return;
     }
Suggestion importance[1-10]: 5

__

Why: Logging when an entry exceeds the cache limit helps diagnose unexpected cache misses or misconfiguration. The silent drop is correct behavior, but the lack of visibility can hide issues with oversized entries or incorrect limit settings.

Low
Log schema derivation errors

The error from parquet_to_arrow_schema is silently discarded. Log the error before
returning empty to aid debugging when schema derivation fails due to malformed
footers or unsupported types.

sandbox/plugins/analytics-backend-datafusion/rust/src/cache/page_index/column_schema_resolver.rs [54-63]

 let file_arrow_schema = match parquet_to_arrow_schema(
     parquet_schema,
     metadata.file_metadata().key_value_metadata(),
 ) {
     Ok(s) => Arc::new(s),
-    // If we can't derive the file schema (malformed footer, unsupported type),
-    // return empty. Empty is the safe conservative choice:  the caller skips the
-    // scoped load and falls back to footer-only.
-    Err(_) => return vec![],
+    Err(e) => {
+        log::warn!("Failed to derive arrow schema from parquet: {}", e);
+        return vec![];
+    }
 };
Suggestion importance[1-10]: 5

__

Why: The suggestion correctly identifies that parquet_to_arrow_schema errors are silently discarded. Logging the error would aid debugging, though the fallback behavior (returning empty) is already documented as intentional. The impact is moderate since the silent failure is by design.

Low
Log metadata pre-warming failures

Pre-warming failures are silently ignored with let _ =. If metadata loading fails
for a file, subsequent operations may trigger expensive page index fetches. Log
failures to help diagnose pre-warming issues.

sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs [291-299]

 for meta in shard_view.object_metas.as_ref() {
-    let _ = crate::indexed_table::parquet_bridge::load_parquet_metadata(
+    if let Err(e) = crate::indexed_table::parquet_bridge::load_parquet_metadata(
         Arc::clone(&shard_view.store),
         &meta.location,
         Arc::clone(&metadata_cache),
     )
-    .await;
+    .await
+    {
+        log::warn!("Failed to pre-warm metadata for {}: {}", meta.location, e);
+    }
 }
Suggestion importance[1-10]: 5

__

Why: The suggestion correctly identifies that pre-warming failures are silently ignored. Logging these failures would help diagnose issues, though the pre-warming is an optimization and failures may not be critical to overall functionality.

Low
Log scoped page index failures

The load_scoped_page_index_cols function can fail silently when returning None,
leaving segment.metadata unchanged. Consider logging when the scoped page index load
fails to help diagnose issues with page-level pruning.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_executor.rs [984-994]

-if let Some(augmented) = crate::parquet_page_cache::load_scoped_page_index_cols(
+match crate::parquet_page_cache::load_scoped_page_index_cols(
     &store,
     &segment.object_path,
     &segment.metadata,
     &parquet_cols,
     &offset_cols,
 )
 .await
 {
-    segment.metadata = augmented;
+    Some(augmented) => segment.metadata = augmented,
+    None => log::debug!("Failed to load scoped page index for {}", segment.object_path),
 }
Suggestion importance[1-10]: 4

__

Why: The suggestion adds logging when load_scoped_page_index_cols returns None. While helpful for debugging, the None case may be expected behavior (e.g., no page index available), so logging at debug level is appropriate but not critical.

Low

Previous suggestions

Suggestions up to commit 709d000
Category    Suggestion    Impact
Possible issue
Add runtime bounds validation

The debug_assert! only validates bounds in debug builds. In production, an
out-of-bounds access would panic. Add a runtime bounds check before indexing to
prevent potential panics when processing malformed or unexpected metadata.

sandbox/plugins/analytics-backend-datafusion/rust/src/cache/page_index/page_index_io.rs [310-324]

 let built = build_column_index_cells(store, location, footer_meta, &missing_col_rg_matrix).await?;
 for cell in built {
-    debug_assert!(
-        cell.rg < col_index_matrix.len() && cell.col < col_index_matrix[cell.rg].len(),
-        "cell ({}, {}) out of matrix bounds ({num_rgs} rgs, {num_cols} cols)",
-        cell.col, cell.rg,
-    );
+    if cell.rg >= col_index_matrix.len() || cell.col >= col_index_matrix[cell.rg].len() {
+        continue;
+    }
     COLUMN_INDEX_CACHE.insert(
         CiCellKey { path: path.clone(), col: cell.col, rg: cell.rg },
         cell.data.clone(),
         cell.size,
     );
     col_index_matrix[cell.rg][cell.col] = cell.data;
 }
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies that debug_assert! is stripped in release builds, leaving a potential panic path. Adding a runtime check prevents crashes from malformed metadata, though the likelihood is low given the controlled data flow.

Medium
Handle lock failure in put

If the lock acquisition fails, the memory tracking state becomes inconsistent with
the actual cache contents. This can lead to incorrect eviction decisions and memory
accounting errors. Handle the lock failure explicitly to maintain consistency.

sandbox/plugins/analytics-backend-datafusion/rust/src/cache/statistics_cache.rs [386-394]

-if let Ok(mut state) = self.memory_state.lock() {
-    if let Some(old_size) = state.tracker.get(&key) {
-        state.total = state.total.saturating_sub(*old_size);
-    }
-    state.tracker.insert(key.clone(), memory_size);
-    state.total += memory_size;
+let mut state = self.memory_state.lock().map_err(|e| {
+    self.inner_cache.remove(k);
+    return None;
+}).ok()?;
+if let Some(old_size) = state.tracker.get(&key) {
+    state.total = state.total.saturating_sub(*old_size);
 }
+state.tracker.insert(key.clone(), memory_size);
+state.total += memory_size;
Suggestion importance[1-10]: 6

__

Why: The suggestion addresses a real consistency risk: if memory_state.lock() fails after inner_cache.insert, the cache and tracker diverge. However, the proposed fix changes the method signature (returns Option) which breaks the CacheAccessor trait contract. A better approach would log the error and continue, accepting temporary inconsistency.

Low
General
Validate buffer sizes match ranges

The code assumes that a length mismatch is the only failure mode, but partial reads
or corrupted data could return the correct count with invalid content. Validate that
each buffer is non-empty and matches the expected range size to catch data
corruption early.

sandbox/plugins/analytics-backend-datafusion/rust/src/cache/page_index/page_index_io.rs [373-376]

 let buffers = store.get_ranges(location, &fetch_ranges).await.ok()?;
 if buffers.len() != fetch_ranges.len() {
     return None;
 }
+for (buf, range) in buffers.iter().zip(fetch_ranges.iter()) {
+    let expected_len = (range.end - range.start) as usize;
+    if buf.len() != expected_len {
+        return None;
+    }
+}
Suggestion importance[1-10]: 5

__

Why: The suggestion adds a defense-in-depth check for partial reads or data corruption. While get_ranges is expected to return exact-length buffers, validating this assumption improves robustness. The impact is moderate since the existing length check already catches most issues.

Low
Use consistent eviction threshold

The hardcoded 0.6 multiplier for target eviction size is inconsistent with the
configurable eviction_threshold. This can cause unexpected eviction behavior. Use a
derived value based on eviction_threshold to maintain consistency with the
configured policy.

sandbox/plugins/analytics-backend-datafusion/rust/src/cache/statistics_cache.rs [364-377]

 let eviction_candidates = {
     let size_limit = self.size_limit.load(Ordering::Relaxed);
     let threshold = (size_limit as f64 * self.eviction_threshold) as usize;
     if current_size + memory_size > threshold {
-        let target_eviction = (current_size + memory_size) - (size_limit as f64 * 0.6) as usize;
+        let target_size = (size_limit as f64 * (self.eviction_threshold - 0.2).max(0.5)) as usize;
+        let target_eviction = (current_size + memory_size).saturating_sub(target_size);
         if let Ok(policy_guard) = self.policy.lock() {
             policy_guard.select_for_eviction(target_eviction)
         } else { vec![] }
     } else { vec![] }
 };
Suggestion importance[1-10]: 4

__

Why: The hardcoded 0.6 multiplier is indeed inconsistent with the configurable eviction_threshold, but the suggested formula (threshold - 0.2).max(0.5) is arbitrary and not clearly better. The issue is valid but the fix needs more justification or should derive the target from a documented policy.

Low
Suggestions up to commit aae6832
Category    Suggestion    Impact
Possible issue
Fix race condition in prefix eviction

The evict_by_prefix method collects all matching keys before removing them, which
can cause a race condition. Between collecting victims and removing them, other
threads may insert new entries with the same prefix or remove existing ones, leading
to inconsistent state or missed evictions. Consider iterating and removing in a
single pass or using a lock to ensure atomicity.

sandbox/plugins/analytics-backend-datafusion/rust/src/cache/page_index/cache_store.rs [166-183]

 pub(super) fn evict_by_prefix(&self, prefix: &str) {
-    let victims: Vec<String> = self.reverse
-        .iter()
-        .filter(|e| e.key().starts_with(prefix))
-        .map(|e| e.key().clone())
-        .collect();
-    for key_str in victims {
-        if let Some((_, typed_key)) = self.reverse.remove(&key_str) {
-            if let Some((_, (_, size))) = self.map.remove(&typed_key) {
+    self.reverse.retain(|key_str, typed_key| {
+        if key_str.starts_with(prefix) {
+            if let Some((_, (_, size))) = self.map.remove(typed_key) {
                 self.used_bytes.fetch_sub(size, Relaxed);
                 self.evictions.fetch_add(1, Relaxed);
                 if let Ok(mut p) = self.policy.lock() {
-                    p.on_remove(&key_str);
+                    p.on_remove(key_str);
                 }
             }
+            false
+        } else {
+            true
         }
-    }
+    });
 }
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies a potential race condition where entries could be inserted/removed between collecting victims and evicting them. However, the proposed retain solution may not fully resolve the issue since self.map.remove is called separately, and the two maps (reverse and map) could still become inconsistent under concurrent access.

Medium
Update reverse map on entry replacement

When replacing an existing entry, the reverse map is not updated, leading to a stale
mapping from key_str to the old key instance. If the key's string representation is
identical but the key instance differs, the reverse map will point to the wrong key
object, breaking evict_by_prefix and policy-driven evictions.

sandbox/plugins/analytics-backend-datafusion/rust/src/cache/page_index/cache_store.rs [106-121]

 pub(super) fn insert(&self, key: K, value: V, size: usize) {
     let limit = self.limit.load(Relaxed);
     if size > limit {
         return;
     }
     let key_str = key.to_string();
     if let Some(old) = self.map.insert(key.clone(), (value, size)) {
         self.used_bytes.fetch_sub(old.1, Relaxed);
     }
-    self.reverse.insert(key_str.clone(), key);
+    self.reverse.insert(key_str.clone(), key.clone());
     self.used_bytes.fetch_add(size, Relaxed);
     if let Ok(mut p) = self.policy.lock() {
         p.on_insert(&key_str, size);
     }
     self.evict();
 }
Suggestion importance[1-10]: 2

__

Why: The suggestion misunderstands the code. Line 115 already updates the reverse map with self.reverse.insert(key_str.clone(), key), which replaces any existing mapping. The improved_code is identical to the existing_code at line 115, making this a non-issue.

Low
General
Handle lock poisoning explicitly

Handle lock poisoning explicitly instead of silently returning 0. A poisoned lock
indicates a panic occurred while holding the lock, which should be logged or
propagated rather than masked.

sandbox/plugins/analytics-backend-datafusion/rust/src/cache/statistics_cache.rs [364-366]

 let current_size = self.memory_state.lock()
     .map(|s| s.total)
-    .unwrap_or(0);
+    .unwrap_or_else(|e| {
+        log::error!("Memory state lock poisoned: {}", e);
+        0
+    });
Suggestion importance[1-10]: 6

__

Why: Explicitly handling lock poisoning with logging is a good practice for diagnosing issues. The current unwrap_or(0) silently masks a serious error condition (a panic while holding the lock), which could hide bugs. Logging the error improves debuggability.

Low
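
An alternative to falling back to 0 is to recover the guard from the poisoned lock and read the last recorded total. A minimal sketch, assuming a hypothetical `MemoryState` struct and `eprintln!` in place of the `log` crate; whether the recorded total can be trusted after a panic is a judgment call for the cache owner:

```rust
use std::sync::Mutex;

/// Hypothetical stand-in for the cache's memory-tracking state.
struct MemoryState {
    total: usize,
}

/// Reads the tracked total, reporting lock poisoning instead of hiding it.
fn current_total(state: &Mutex<MemoryState>) -> usize {
    match state.lock() {
        Ok(guard) => guard.total,
        Err(poisoned) => {
            eprintln!("memory_state lock poisoned; using last recorded total");
            // into_inner returns the guard despite the poisoning.
            poisoned.into_inner().total
        }
    }
}
```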
Pre-allocate outer vector capacity

Pre-allocate the outer vector with known capacity before collecting to reduce
allocations. The nested vectors are already collected efficiently, but the outer
vector can benefit from with_capacity.

sandbox/plugins/analytics-backend-datafusion/rust/src/cache/page_index/page_index_io.rs [291-294]

-let mut col_index_matrix: ParquetColumnIndex = (0..num_rgs)
-    .map(|_| (0..num_cols).map(|_| ColumnIndexMetaData::NONE).collect())
-    .collect();
+let mut col_index_matrix: ParquetColumnIndex = {
+    let mut matrix = Vec::with_capacity(num_rgs);
+    for _ in 0..num_rgs {
+        matrix.push((0..num_cols).map(|_| ColumnIndexMetaData::NONE).collect());
+    }
+    matrix
+};
Suggestion importance[1-10]: 5

__

Why: Pre-allocating the outer vector capacity is a valid micro-optimization that reduces allocations when num_rgs is known. However, the impact is minor since the nested collections dominate the allocation cost, and the existing code is already clear and idiomatic.

Low
Pre-allocate matrix vector capacity

Pre-allocate the matrix vector with known capacity to avoid reallocation during
collection. This is especially beneficial when num_rgs is large.

sandbox/plugins/analytics-backend-datafusion/rust/src/cache/page_index/page_index_io.rs [468-473]

-let mut matrix: ParquetOffsetIndex = (0..num_rgs)
-    .map(|rg| {
+let mut matrix: ParquetOffsetIndex = {
+    let mut m = Vec::with_capacity(num_rgs);
+    for rg in 0..num_rgs {
         let ph = placeholder_for(rg);
-        vec![ph; num_cols]
-    })
-    .collect();
+        m.push(vec![ph; num_cols]);
+    }
+    m
+};
Suggestion importance[1-10]: 5

__

Why: Similar to suggestion 1, pre-allocating the matrix capacity is a valid optimization that avoids reallocation. The benefit is modest and the existing iterator-based approach is idiomatic Rust, but the suggestion is correct.

Low
Handle expression traversal errors properly

The function silently ignores errors from expr.apply() by discarding the result with
let _ =. If the expression traversal fails, predicate columns may be missed, leading
to incomplete page index loading and incorrect query results. Handle or log the
error to ensure all predicate columns are collected.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_executor.rs [319-338]

 fn collect_predicate_column_names(
     extraction: Option<&ExtractionResult>,
     schema: &SchemaRef,
 ) -> Vec<String> {
     let Some(e) = extraction else { return vec![] };
     let mut exprs = Vec::new();
     collect_predicate_exprs(&e.tree, &mut exprs);
     let mut names = HashSet::new();
     for expr in &exprs {
-        let _ = expr.apply(|node| {
+        if let Err(e) = expr.apply(|node| {
             if let Some(col) = node.downcast_ref::<Column>() {
                 if let Some(field) = schema.fields().get(col.index()) {
                     names.insert(field.name().to_string());
                 }
             }
             Ok(TreeNodeRecursion::Continue)
-        });
+        }) {
+            log::warn!("Failed to traverse expression for predicate columns: {}", e);
+        }
     }
     names.into_iter().collect()
 }
Suggestion importance[1-10]: 5

__

Why: The suggestion correctly identifies that errors from expr.apply() are silently ignored. While logging errors would improve observability, the impact is moderate since the function already handles the None case gracefully and returns an empty vector, preventing crashes but potentially missing columns.

Low
Avoid string allocation in extension check

Avoid allocating a new lowercase string for every file path check. Use ends_with
with case-insensitive comparison or check the extension directly without allocation.

sandbox/plugins/analytics-backend-datafusion/rust/src/cache/custom_manager.rs [344-347]

 fn metadata_cache_put(&self, file_path: &str, rt_handle: &tokio::runtime::Handle) -> Result<bool, String> {
-    if !file_path.to_lowercase().ends_with(".parquet") {
+    if !file_path.ends_with(".parquet") && !file_path.ends_with(".PARQUET") {
         return Ok(false);
     }
     ...
Suggestion importance[1-10]: 4

__

Why: Avoiding the to_lowercase() allocation is a valid micro-optimization. However, the suggested approach (checking both .parquet and .PARQUET) is less robust than case-insensitive comparison and doesn't handle mixed-case extensions like .Parquet. A better alternative would be using a case-insensitive comparison method.

Low
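
A sketch of the case-insensitive comparison mentioned above, which avoids the allocation and also accepts mixed-case extensions such as .Parquet. The function name is hypothetical; it compares only the last eight bytes of the path:

```rust
/// Returns true when `file_path` ends with ".parquet" in any ASCII casing,
/// without allocating a lowercased copy of the path.
fn has_parquet_extension(file_path: &str) -> bool {
    const EXT: &[u8] = b".parquet";
    let bytes = file_path.as_bytes();
    bytes.len() >= EXT.len() && bytes[bytes.len() - EXT.len()..].eq_ignore_ascii_case(EXT)
}
```

Working on bytes avoids slicing a `&str` at a position that might not be a character boundary when the path contains non-ASCII characters.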
Suggestions up to commit 1d91513
Category    Suggestion    Impact
Possible issue
Fix reverse map memory leak

The reverse map is inserted unconditionally even when replacing an existing key,
causing a memory leak. When map.insert returns Some(old), the old key already exists
in reverse, so inserting again creates a duplicate entry that is never cleaned up.
Only insert into reverse when the key is new.

sandbox/plugins/analytics-backend-datafusion/rust/src/cache/page_index/cache_store.rs [106-121]

 pub(super) fn insert(&self, key: K, value: V, size: usize) {
     let limit = self.limit.load(Relaxed);
     if size > limit {
         return;
     }
     let key_str = key.to_string();
-    if let Some(old) = self.map.insert(key.clone(), (value, size)) {
+    let is_new = if let Some(old) = self.map.insert(key.clone(), (value, size)) {
         self.used_bytes.fetch_sub(old.1, Relaxed);
+        false
+    } else {
+        true
+    };
+    if is_new {
+        self.reverse.insert(key_str.clone(), key);
     }
-    self.reverse.insert(key_str.clone(), key);
     self.used_bytes.fetch_add(size, Relaxed);
     if let Ok(mut p) = self.policy.lock() {
         p.on_insert(&key_str, size);
     }
     self.evict();
 }
Suggestion importance[1-10]: 9

Why: The suggestion correctly identifies a critical memory leak where the reverse map accumulates duplicate entries on key updates. The reverse map should only be populated for new keys, not when replacing existing ones, as the old key-to-typed-key mapping already exists.

High
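
Whether the unconditional `reverse.insert` actually grows the reverse map depends on the type of `reverse`, which this excerpt does not show. As a point of comparison, the sketch below uses std `HashMap` for both maps (an assumption; `TinyCache` and its fields are hypothetical stand-ins). With a standard map, re-inserting the same string key overwrites the existing slot, so the entry count stays flat and only the byte accounting needs care.

```rust
use std::collections::HashMap;

/// Stand-in for the cache: a typed map plus a string-keyed reverse map.
struct TinyCache {
    map: HashMap<u32, (String, usize)>,
    reverse: HashMap<String, u32>,
    used_bytes: usize,
}

impl TinyCache {
    fn new() -> Self {
        TinyCache { map: HashMap::new(), reverse: HashMap::new(), used_bytes: 0 }
    }

    fn insert(&mut self, key: u32, value: String, size: usize) {
        // Replacing an existing entry: give back the bytes it was charged for.
        if let Some((_, old_size)) = self.map.insert(key, (value, size)) {
            self.used_bytes -= old_size;
        }
        // Unconditional insert: the same string key overwrites the previous slot.
        self.reverse.insert(key.to_string(), key);
        self.used_bytes += size;
    }
}
```

Inserting key 7 twice with sizes 10 and 25 leaves one entry in each map and `used_bytes` at 25.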
Prevent potential deadlock in get

The on_access call holds the policy lock while the DashMap entry guard is still
held, creating potential for deadlock if eviction (which locks policy then accesses
map) runs concurrently. Clone the value and drop the entry guard before calling
on_access to ensure lock ordering is always map→policy.

sandbox/plugins/analytics-backend-datafusion/rust/src/cache/page_index/cache_store.rs [89-104]

 pub(super) fn get(&self, key: &K) -> Option<V> {
-    match self.map.get(key) {
-        Some(entry) => {
-            let size = entry.1;
+    let result = self.map.get(key).map(|entry| {
+        let value = entry.0.clone();
+        let size = entry.1;
+        (value, size)
+    });
+    match result {
+        Some((value, size)) => {
             if let Ok(mut p) = self.policy.lock() {
                 p.on_access(&key.to_string(), size);
             }
             self.hits.fetch_add(1, Relaxed);
-            Some(entry.0.clone())
+            Some(value)
         }
         None => {
             self.misses.fetch_add(1, Relaxed);
             None
         }
     }
 }
Suggestion importance[1-10]: 8

Why: The suggestion identifies a valid deadlock risk where holding the DashMap entry guard while acquiring the policy lock violates the map→policy lock ordering used in eviction. Dropping the guard before on_access prevents this potential deadlock.

Medium
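
The "copy out, drop the guard, then take the second lock" pattern from this suggestion can be sketched with std types. This is an illustration under assumptions: `RwLock<HashMap>` stands in for the DashMap, and the `policy` field is a hypothetical list of accessed keys rather than the PR's eviction policy.

```rust
use std::collections::HashMap;
use std::sync::{Mutex, RwLock};

struct Store {
    map: RwLock<HashMap<String, (String, usize)>>,
    // Hypothetical policy state: records accessed keys in order.
    policy: Mutex<Vec<String>>,
}

impl Store {
    fn get(&self, key: &str) -> Option<String> {
        // Copy out what we need inside a block so the read guard drops at its end.
        let hit = {
            let guard = self.map.read().ok()?;
            let found = guard.get(key).map(|(value, size)| (value.clone(), *size));
            found
        };
        let (value, _size) = hit?;
        // Only the policy lock is held here; the map guard is already released.
        if let Ok(mut p) = self.policy.lock() {
            p.push(key.to_string());
        }
        Some(value)
    }
}
```

Because the two locks are never held at the same time in `get`, no ordering between them can be violated on this path.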
Add runtime bounds validation

The bounds check is a debug assertion that won't run in release builds. If
build_column_index_cells returns cells with out-of-bounds indices, production code
will panic on the array access. Add a runtime bounds check before indexing to
prevent potential panics in release mode.

sandbox/plugins/analytics-backend-datafusion/rust/src/cache/page_index/page_index_io.rs [310-323]

 let built = build_column_index_cells(store, location, footer_meta, &missing_col_rg_matrix).await?;
 for cell in built {
-    debug_assert!(
-        cell.rg < col_index_matrix.len() && cell.col < col_index_matrix[cell.rg].len(),
-        "cell ({}, {}) out of matrix bounds ({num_rgs} rgs, {num_cols} cols)",
-        cell.col, cell.rg,
-    );
+    if cell.rg >= col_index_matrix.len() || cell.col >= col_index_matrix[cell.rg].len() {
+        continue;
+    }
     COLUMN_INDEX_CACHE.insert(
         CiCellKey { path: path.clone(), col: cell.col, rg: cell.rg },
         cell.data.clone(),
         cell.size,
     );
     col_index_matrix[cell.rg][cell.col] = cell.data;
 }
Suggestion importance[1-10]: 7

Why: The suggestion correctly identifies that debug_assert! is stripped in release builds, leaving a potential panic on out-of-bounds access. Adding a runtime check (or returning an error) prevents production panics. However, the suggested continue silently skips invalid cells, which may mask bugs. A better fix would log or return an error, but the core issue is valid.

Medium
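
An alternative to silently skipping with `continue` is a checked write that reports the bad coordinates, so the caller can log or propagate them. A minimal sketch, where `set_cell` is a hypothetical helper and the matrix is a plain slice of `Vec` rows:

```rust
/// Writes `value` at (rg, col) if both indices are in range, else returns an error.
fn set_cell<T>(matrix: &mut [Vec<T>], rg: usize, col: usize, value: T) -> Result<(), String> {
    match matrix.get_mut(rg).and_then(|row| row.get_mut(col)) {
        Some(slot) => {
            *slot = value;
            Ok(())
        }
        None => Err(format!("cell (col {col}, rg {rg}) out of matrix bounds")),
    }
}
```

This runs in release builds as well, and the error string keeps the diagnostic detail that the `debug_assert!` message carried.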
General
Fix inconsistent eviction threshold calculation

The hardcoded 0.6 multiplier for target eviction size is inconsistent with the
configurable eviction_threshold. This creates unpredictable eviction behavior where
the target size doesn't align with the threshold that triggered eviction. Use a
consistent calculation based on eviction_threshold to ensure predictable cache
behavior.

sandbox/plugins/analytics-backend-datafusion/rust/src/cache/statistics_cache.rs [369-377]

 let eviction_candidates = {
     let size_limit = self.size_limit.load(Ordering::Relaxed);
     let threshold = (size_limit as f64 * self.eviction_threshold) as usize;
     if current_size + memory_size > threshold {
-        let target_eviction = (current_size + memory_size) - (size_limit as f64 * 0.6) as usize;
+        let target_size = (size_limit as f64 * self.eviction_threshold * 0.9) as usize;
+        let target_eviction = (current_size + memory_size).saturating_sub(target_size);
         if let Ok(policy_guard) = self.policy.lock() {
             policy_guard.select_for_eviction(target_eviction)
         } else { vec![] }
     } else { vec![] }
 };
Suggestion importance[1-10]: 6

Why: The hardcoded 0.6 multiplier is indeed inconsistent with the configurable eviction_threshold (default 0.8). The suggestion to derive the target from eviction_threshold improves consistency. However, the proposed 0.9 factor is arbitrary and not explained; a clearer formula (e.g., eviction_threshold - 0.1) would be better. The issue is real but the fix could be clearer.

Low
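
The two target calculations can be compared with a small arithmetic sketch. The function names are hypothetical; the formulas are the ones in the diff above, with `total` standing for `current_size + memory_size`.

```rust
/// Bytes to evict with the fixed 0.6 target; None if the subtraction would underflow.
fn target_eviction_fixed(total: usize, size_limit: usize) -> Option<usize> {
    let target_size = (size_limit as f64 * 0.6) as usize;
    total.checked_sub(target_size)
}

/// Bytes to evict with the target tied to `eviction_threshold`, as in the suggested diff.
fn target_eviction_scaled(total: usize, size_limit: usize, eviction_threshold: f64) -> usize {
    let target_size = (size_limit as f64 * eviction_threshold * 0.9) as usize;
    total.saturating_sub(target_size)
}
```

With a limit of 1000, a threshold of 0.8, and a total of 850, the fixed form evicts 250 bytes (down to 600) while the scaled form evicts 130 (down to 720). If the threshold were ever configured below 0.6, say 0.5 with a total of 550, the fixed target of 600 exceeds the total and a plain subtraction would underflow; the scaled form yields 100.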
Log partial range fetch failures

When get_ranges returns fewer buffers than requested ranges, the function silently
returns None and falls back to footer-only metadata. This masks partial failures
where some ranges succeed but others fail. Consider logging the mismatch to aid
debugging and help identify transient storage issues.

sandbox/plugins/analytics-backend-datafusion/rust/src/cache/page_index/page_index_io.rs [373-376]

 let buffers = store.get_ranges(location, &fetch_ranges).await.ok()?;
 if buffers.len() != fetch_ranges.len() {
+    log::warn!(
+        "get_ranges returned {} buffers for {} ranges at {}",
+        buffers.len(),
+        fetch_ranges.len(),
+        location
+    );
     return None;
 }
Suggestion importance[1-10]: 5

Why: Adding a warning when get_ranges returns fewer buffers than requested aids debugging transient storage issues. The suggestion is correct and low-risk. However, the impact is moderate (observability improvement, not a correctness fix), and the same pattern appears in build_offset_index_columns (line 541-544) but isn't addressed there.

Low
Handle expression traversal errors properly

The function silently ignores errors from expr.apply by discarding the result with
let _ =. If the expression tree traversal fails, columns may be silently omitted
from the result, leading to incomplete page index loading. Propagate or log errors
to ensure all predicate columns are captured.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_executor.rs [319-338]

 fn collect_predicate_column_names(
     extraction: Option<&ExtractionResult>,
     schema: &SchemaRef,
 ) -> Vec<String> {
     let Some(e) = extraction else { return vec![] };
     let mut exprs = Vec::new();
     collect_predicate_exprs(&e.tree, &mut exprs);
     let mut names = HashSet::new();
     for expr in &exprs {
-        let _ = expr.apply(|node| {
+        if let Err(e) = expr.apply(|node| {
             if let Some(col) = node.downcast_ref::<Column>() {
                 if let Some(field) = schema.fields().get(col.index()) {
                     names.insert(field.name().to_string());
                 }
             }
             Ok(TreeNodeRecursion::Continue)
-        });
+        }) {
+            log::warn!("Failed to traverse expression for predicate columns: {}", e);
+        }
     }
     names.into_iter().collect()
 }
Suggestion importance[1-10]: 5

Why: While logging errors is generally good practice, silently ignoring traversal errors is unlikely to cause issues in practice since TreeNodeRecursion::Continue errors are rare. The improvement is minor and primarily aids debugging rather than fixing a critical bug.

Low
Parallelize batch file cache operations

The add_files method processes files sequentially with blocking async operations via
rt_handle.block_on. For large file batches, this creates unnecessary latency.
Process files concurrently using tokio::spawn or futures::join_all to parallelize
cache warming and reduce total wall-clock time.

sandbox/plugins/analytics-backend-datafusion/rust/src/cache/custom_manager.rs [109-154]

 pub fn add_files(&self, file_paths: &[String], rt_handle: &tokio::runtime::Handle) -> Result<Vec<(String, bool)>, String> {
-    let mut results = Vec::new();
-
-    for file_path in file_paths {
-        let mut any_success = false;
-        let mut errors = Vec::new();
-
-        // Add to metadata cache
-        match self.metadata_cache_put(file_path, rt_handle) {
-            Ok(true) => {
-                any_success = true;
+    rt_handle.block_on(async {
+        let tasks: Vec<_> = file_paths.iter().map(|path| {
+            let path = path.clone();
+            let cache = self.clone();
+            async move {
+                // Process metadata and statistics concurrently per file
+                (path, cache.add_single_file_async(&path).await)
             }
-            ...
-        }
-        ...
-    }
-    Ok(results)
+        }).collect();
+        let results = futures::future::join_all(tasks).await;
+        Ok(results.into_iter().map(|(p, r)| (p, r.is_ok())).collect())
+    })
 }
Suggestion importance[1-10]: 4

Why: The suggestion to parallelize add_files is a valid performance improvement for large batches. However, the improved_code is incomplete (references a non-existent add_single_file_async method and self.clone() on a non-Clone type) and doesn't handle error aggregation correctly. The idea is sound but the implementation sketch is flawed, limiting its immediate applicability.

Low
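
For reference, the fan-out idea can be shown without the missing `add_single_file_async` or a `Clone` bound. The sketch below deliberately swaps in std scoped threads instead of tokio or futures so that it is self-contained; `warm_all` and the `warm_one` callback are hypothetical stand-ins for the per-file cache work, and an `Err` is simply mapped to `false`.

```rust
use std::thread;

/// Runs `warm_one` for every path on its own scoped thread and
/// returns (path, success) pairs in input order.
fn warm_all<F>(paths: &[String], warm_one: F) -> Vec<(String, bool)>
where
    F: Fn(&str) -> Result<bool, String> + Sync,
{
    let warm_one = &warm_one;
    thread::scope(|scope| {
        // Spawn one worker per file; borrows are fine because the scope joins them all.
        let handles: Vec<_> = paths
            .iter()
            .map(|path| {
                scope.spawn(move || (path.clone(), warm_one(path.as_str()).unwrap_or(false)))
            })
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().expect("worker thread panicked"))
            .collect()
    })
}
```

One thread per file is only reasonable for small batches; a real implementation would bound concurrency and keep per-file error details instead of collapsing them to a boolean.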

@bharath-techie bharath-techie force-pushed the core-page-index-cache branch from 1d91513 to aae6832 Compare June 20, 2026 18:17
@github-actions

Contributor

Persistent review updated to latest commit aae6832

Signed-off-by: G <bharath78910@gmail.com>
@bharath-techie bharath-techie force-pushed the core-page-index-cache branch from aae6832 to 709d000 Compare June 20, 2026 18:28
@github-actions

Contributor

Persistent review updated to latest commit 709d000

/// request, so cells are shared across paths → cross-path sharing.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub(crate) struct CiCellKey {
pub(crate) path: Arc<str>,

Contributor

Is this an absolute path?

Contributor Author

It's the store-relative path. The key field here is an Arc, so it's just a reference to the object_path.

Contributor

Will this be an issue when warm nodes refer to the files locally vs. in S3? Or is it abstracted away by the S3 filesystem?

Contributor Author

Indexed path:
  Java passes file paths via ShardView.object_metas (ObjectMeta[])
    ↓
  build_segments() — reads meta.location for:
    - load_parquet_metadata()         → metadata cache key
    - segment.object_path             → stored on SegmentFileInfo
    ↓ 
  SegmentFileInfo.object_path used by:
    - load_scoped_page_index_cols()   → page index cache key (Arc<str>)
    - CachedMetadataReaderFactory     → passed to ParquetSource for actual data reads
    - IndexedExec.object_path         → passed to RowGroupStreamConfig for parquet IO
  
Listing path:
  DataFusion list_files_cache (pre-populated from shard_view.object_metas)
    ↓
  ListingTable.list_files_for_scan() → PartitionedFile.object_meta.location
    ↓
  Same location used by:
    - DFParquetMetadata.fetch_metadata()  → metadata cache key
    - ScopedPageIndexReader.location      → page index cache key (Arc<str>)
    - CachedParquetFileReader.location    → actual parquet data reads
    - ParquetFileMetrics                  → metrics labeling

So the overall query already uses the file location; here we are just reusing it through an Arc reference.
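
To illustrate the Arc reuse described above: every cell key for a file can share one path allocation, while lookups still compare by path contents. The derive list and the `path: Arc<str>` field are from the PR; the `usize` types for `col` and `rg` and the `keys_for_file` helper are assumptions made for this sketch.

```rust
use std::sync::Arc;

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
struct CiCellKey {
    path: Arc<str>,
    col: usize, // assumed type
    rg: usize,  // assumed type
}

/// Builds one key per (col, rg) cell; each key bumps the refcount
/// on the shared path instead of copying the string.
fn keys_for_file(path: &Arc<str>, cells: &[(usize, usize)]) -> Vec<CiCellKey> {
    cells
        .iter()
        .map(|&(col, rg)| CiCellKey { path: Arc::clone(path), col, rg })
        .collect()
}
```

Since `Arc<str>` hashes and compares by its string contents, a key built from a separately allocated but identical location string still hits the same cache entry.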

/// read path. Counters are atomics so `stats()` is always lock-free.
pub(super) struct BoundedCache<K, V>
where
K: Eq + Hash + Clone + Display + Send + Sync + 'static,

Contributor

Why Hash?

Contributor Author

Required by dashmap: it needs both Eq and Hash for keys.

Ok(s) => Arc::new(s),
// If we can't derive the file schema, fall back to the union schema; the
// caller still falls back to footer-only on any downstream mismatch.
Err(_) => return resolve_with_schema(_arrow_schema, metadata, predicate_column_names),

Contributor

Why are we resolving with union here? Isn't that wrong?

Contributor Author

There is a follow-up for resolve in general; we can handle it there.

pub fn resolve_predicate_parquet_columns_pair(
union_schema: &SchemaRef,
metadata: &ParquetMetaData,
names_a: &[String],

Contributor

nit: could be named better

}
// Same fallback as the single-name path: resolve against the union schema.
Err(_) => (
resolve_with_schema(union_schema, metadata, names_a),

Contributor

Not sure again if this is right. Let's log a warning at least?

//! index (no per-page string stats). Built for **all row groups** (an empty
//! OffsetIndex on a row group DataFusion scans panics / breaks reads, and
//! DataFusion chooses the scanned set itself, after our load — see
//! HANDOFF_step2_rg_scoping.md §1e).

Contributor

nit: please link the HANDOFF_step2 md file here, or update the comment?

@github-actions

Contributor

✅ Gradle check result for 709d000: SUCCESS

@codecov

codecov Bot commented Jun 20, 2026


Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.39%. Comparing base (4cc80cf) to head (4483eae).
⚠️ Report is 2 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main   #22254      +/-   ##
============================================
+ Coverage     73.35%   73.39%   +0.04%     
- Complexity    75937    75946       +9     
============================================
  Files          6071     6071              
  Lines        344993   344993              
  Branches      49638    49638              
============================================
+ Hits         253080   253221     +141     
+ Misses        71710    71568     -142     
- Partials      20203    20204       +1     



// Process-global caches

pub(crate) static COLUMN_INDEX_CACHE: Lazy<BoundedCache<CiCellKey, ColumnIndexMetaData>> =

Contributor

Tie them to the global runtime instead?

Contributor Author

This is a custom cache, right? The global runtime doesn't support caches other than the list-files, stats, and metadata caches?

arrow_schema: &SchemaRef,
predicate: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
) -> Vec<usize> {
use arrow::array::{ArrayRef, BooleanArray, UInt64Array};

Contributor

nit: we can move them up?

set.into_iter().filter(|&c| c < num_cols).collect()
}
};
if off_cols.is_empty() {

Contributor

Why are we returning here? Do we not have to place the placeholders?

Contributor Author

Returning None falls back to footer-only. This only happens when the Parquet file technically has no columns, which shouldn't happen at all.

}

/// Union of `offset_index` byte ranges across the given column chunks.
fn offset_index_union(chunks: &[ColumnChunkMetaData]) -> Option<Range<u64>> {

Contributor

Both column_index_union and this one look the same.

Contributor Author

They differ only in column_index_offset vs. offset_index_offset; otherwise they are the same.
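
If the two union helpers really differ only in which offset and length they read, one option is a single generic helper that takes the accessor as a closure. This is a sketch under that assumption: `union_range` is a hypothetical name, and `Chunk` is a stand-in struct rather than the parquet crate's `ColumnChunkMetaData`.

```rust
use std::ops::Range;

/// Stand-in for a column chunk that may carry each index's byte range.
struct Chunk {
    column_index: Option<Range<u64>>,
    offset_index: Option<Range<u64>>,
}

/// Smallest range covering every range that `range_of` yields; None if there are none.
fn union_range<T>(
    items: &[T],
    range_of: impl Fn(&T) -> Option<Range<u64>>,
) -> Option<Range<u64>> {
    items
        .iter()
        .filter_map(range_of)
        .reduce(|a, b| a.start.min(b.start)..a.end.max(b.end))
}
```

The two existing functions would then become thin wrappers, for example `union_range(chunks, |c| c.offset_index.clone())`.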

fetch_ranges.push(range);
}

let buffers = store.get_ranges(location, &fetch_ranges).await.ok()?;

Contributor

How are we stitching the query cancellation framework into this? Just want to understand: if the store is lagging or stuck and we cancel the query, how does that propagate here? Or is that already handled in the task managers?

Contributor Author

Here we either insert into the cache or we don't, right? There is no possibility of a leak here.

/// (fixed-width page offsets) is tiny, so they get separate, separately-tunable
/// limits rather than sharing one number.
///
/// TODO : configure via settings

@expani expani Jun 20, 2026

Contributor

IIUC, this is global for the node, which is on par with DocValueSkipList or any other index file stored in the OS file page cache for vanilla OpenSearch. Can you record this in a comment explicitly?

Signed-off-by: G <bharath78910@gmail.com>
@github-actions

Contributor

Persistent review updated to latest commit 4483eae

@github-actions

Contributor

✅ Gradle check result for 4483eae: SUCCESS

Labels

skip-diff-analyzer Maintainer to skip code-diff-analyzer check, after reviewing issues in AI analysis.

3 participants