Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 0 additions & 183 deletions sandbox/plugins/analytics-backend-datafusion/rust/src/cache.rs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ use std::sync::Arc;
use datafusion::execution::cache::cache_manager::{FileMetadataCache, FileStatisticsCache, CacheManagerConfig};
use datafusion::execution::cache::file_statistics_cache::DefaultFileStatisticsCache;
use datafusion::execution::cache::CacheAccessor;
use crate::statistics_cache::compute_parquet_statistics;
use crate::cache::MutexFileMetadataCache;
use crate::statistics_cache::CustomStatisticsCache;
use crate::cache::statistics_cache::compute_parquet_statistics;
use crate::cache::metadata_cache::MutexFileMetadataCache;
use crate::cache::statistics_cache::CustomStatisticsCache;
use object_store::path::Path;
use object_store::ObjectMeta;
use datafusion::datasource::physical_plan::parquet::metadata::DFParquetMetadata;
use object_store::ObjectStore;
use log::{debug, error};
use crate::indexed_table::parquet_bridge;

/// Create ObjectMeta from a local file path.
fn create_object_meta_from_file(file_path: &str) -> Result<Vec<ObjectMeta>, datafusion::common::DataFusionError> {
Expand Down Expand Up @@ -229,14 +230,14 @@ impl CustomCacheManager {
/// Check if a file exists in a specific cache type
pub fn contains_file_by_type(&self, file_path: &str, cache_type: &str) -> bool {
match cache_type {
crate::cache::CACHE_TYPE_METADATA => {
crate::cache::metadata_cache::CACHE_TYPE_METADATA => {
let path = Path::from(file_path);
self.file_metadata_cache
.as_ref()
.and_then(|cache| cache.get(&path))
.is_some()
}
crate::cache::CACHE_TYPE_STATS => {
crate::cache::metadata_cache::CACHE_TYPE_STATS => {
self.statistics_cache
.as_ref()
.map_or(false, |cache| cache.contains_key(&Path::from(file_path)))
Expand Down Expand Up @@ -294,15 +295,15 @@ impl CustomCacheManager {
/// Clear specific cache type
pub fn clear_cache_type(&self, cache_type: &str) -> Result<(), String> {
match cache_type {
crate::cache::CACHE_TYPE_METADATA => {
crate::cache::metadata_cache::CACHE_TYPE_METADATA => {
if let Some(cache) = &self.file_metadata_cache {
cache.clear();
Ok(())
} else {
Err("No metadata cache configured".to_string())
}
}
crate::cache::CACHE_TYPE_STATS => {
crate::cache::metadata_cache::CACHE_TYPE_STATS => {
if let Some(cache) = &self.statistics_cache {
cache.clear();
Ok(())
Expand All @@ -317,7 +318,7 @@ impl CustomCacheManager {
/// Get memory consumed by specific cache type
pub fn get_memory_consumed_by_type(&self, cache_type: &str) -> Result<usize, String> {
match cache_type {
crate::cache::CACHE_TYPE_METADATA => {
crate::cache::metadata_cache::CACHE_TYPE_METADATA => {
if let Some(cache) = &self.file_metadata_cache {
if let Ok(cache_guard) = cache.inner.lock() {
Ok(cache_guard.memory_used())
Expand All @@ -328,7 +329,7 @@ impl CustomCacheManager {
Err("No metadata cache configured".to_string())
}
}
crate::cache::CACHE_TYPE_STATS => {
crate::cache::metadata_cache::CACHE_TYPE_STATS => {
if let Some(cache) = &self.statistics_cache {
Ok(cache.memory_consumed())
} else {
Expand All @@ -351,42 +352,20 @@ impl CustomCacheManager {
let object_meta = object_metas.first()
.ok_or_else(|| "No object metadata returned".to_string())?;

let store = Arc::new(object_store::local::LocalFileSystem::new());
let store: Arc<dyn ObjectStore> = Arc::new(object_store::local::LocalFileSystem::new());

// Get cache reference for DataFusion metadata loading
let cache_ref = self.file_metadata_cache.as_ref()
.ok_or_else(|| "No file metadata cache configured".to_string())?;
let metadata_cache = self.file_metadata_cache.as_ref()
.ok_or_else(|| "No file metadata cache configured".to_string())?
.clone() as Arc<dyn FileMetadataCache>;

let metadata_cache = cache_ref.clone() as Arc<dyn FileMetadataCache>;

// Use DataFusion's metadata loading by passing reference to file_metadata_cache to get complete metadata
// IMPORTANT: When a cache is provided to DFParquetMetadata, fetch_metadata() will:
// 1. Enable page index loading (with_page_indexes(true))
// 2. Load the complete metadata including column and offset indexes
// 3. Automatically put the metadata into the cache (lines 155-160 in datafusion's metadata.rs)
// This ensures we cache exactly what DataFusion would cache during query execution
let _parquet_metadata = rt_handle.block_on(async {
let df_metadata = DFParquetMetadata::new(store.as_ref(), object_meta)
.with_file_metadata_cache(Some(metadata_cache));

// fetch_metadata() performs the cache put operation internally
df_metadata.fetch_metadata().await
.map_err(|e| format!("Failed to fetch metadata: {}", e))
// Warm the level-1 metadata cache footer-only. `load_parquet_metadata`
// fetches with PageIndexPolicy::Skip — only footer bytes, no page index IO.
// On success the entry is in the cache; on failure the error propagates.
let location = object_meta.location.clone();
rt_handle.block_on(async {
parquet_bridge::load_parquet_metadata(store, &location, metadata_cache).await
})?;

// Verify the metadata was cached properly
match cache_ref.inner.lock() {
Ok(cache_guard) => {
let path = Path::from(file_path.to_string());
if cache_guard.contains_key(&path) {
Ok(true)
} else {
debug!("[CACHE ERROR] Failed to cache metadata for: {}", file_path);
Ok(false)
}
}
Err(e) => Err(format!("Failed to verify cache: {}", e))
}
Ok(true)
}

/// Compute and put statistics into cache
Expand Down
Loading
Loading