Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ jobs:
export RUST_MIN_STACK=20971520
export TPCH_DATA=`realpath datafusion/sqllogictest/test_files/tpch/data`
cargo test plan_q --package datafusion-benchmarks --profile ci --features=ci -- --test-threads=1
INCLUDE_TPCH=true cargo test --features backtrace,parquet_encryption,substrait --profile ci --package datafusion-sqllogictest --test sqllogictests
INCLUDE_TPCH=true cargo test --features backtrace,parquet_encryption,substrait,memory-accounting --profile ci --package datafusion-sqllogictest --test sqllogictests -- --default-pool-size-mb 16384
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can call it memory-profiler or something like that ?

- name: Verify Working Directory Clean
run: git diff --exit-code

Expand Down
12 changes: 11 additions & 1 deletion datafusion/execution/src/memory_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! [`MemoryPool`] for memory management during query execution, [`proxy`] for
//! help with allocation accounting.

use datafusion_common::{Result, internal_datafusion_err};
use datafusion_common::{Result, internal_datafusion_err, not_impl_err};
use std::any::Any;
use std::fmt::Display;
use std::hash::{Hash, Hasher};
Expand Down Expand Up @@ -223,6 +223,16 @@ pub trait MemoryPool: Any + Send + Sync + std::fmt::Debug + Display {
fn memory_limit(&self) -> MemoryLimit {
MemoryLimit::Unknown
}

/// Attempt to update this pool's limit in place to `new_limit` bytes.
///
/// Default impl returns `Err`. Callers that route through
/// [`crate::runtime_env::RuntimeEnvBuilder::with_memory_limit`] fall
/// back to replacing the pool wholesale on `Err`, preserving historical
/// behavior for pools that can't be resized in place.
fn try_resize(&self, _new_limit: usize) -> Result<()> {
not_impl_err!("{} does not support resize", self.name())
}
}

impl dyn MemoryPool {
Expand Down
68 changes: 62 additions & 6 deletions datafusion/execution/src/memory_pool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,15 @@ impl Display for UnboundedMemoryPool {
/// This pool works well for queries that do not need to spill or have
/// a single spillable operator. See [`FairSpillPool`] if there are
/// multiple spillable operators that all will spill.
///
/// Supports [`MemoryPool::try_resize`] for in-place limit adjustment, so
/// callers routing through
/// [`RuntimeEnvBuilder::with_memory_limit`](crate::runtime_env::RuntimeEnvBuilder::with_memory_limit)
/// can keep the existing pool (and any wrappers around it) rather than
/// replacing it on every change.
#[derive(Debug)]
pub struct GreedyMemoryPool {
pool_size: usize,
pool_size: AtomicUsize,
used: AtomicUsize,
}

Expand All @@ -84,7 +90,7 @@ impl GreedyMemoryPool {
pub fn new(pool_size: usize) -> Self {
debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
Self {
pool_size,
pool_size: AtomicUsize::new(pool_size),
used: AtomicUsize::new(0),
}
}
Expand All @@ -104,16 +110,17 @@ impl MemoryPool for GreedyMemoryPool {
}

fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
let pool_size = self.pool_size.load(Ordering::Relaxed);
self.used
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
let new_used = used + additional;
(new_used <= self.pool_size).then_some(new_used)
(new_used <= pool_size).then_some(new_used)
})
.map_err(|used| {
insufficient_capacity_err(
reservation,
additional,
self.pool_size.saturating_sub(used),
pool_size.saturating_sub(used),
self,
)
})?;
Expand All @@ -125,19 +132,25 @@ impl MemoryPool for GreedyMemoryPool {
}

fn memory_limit(&self) -> MemoryLimit {
MemoryLimit::Finite(self.pool_size)
MemoryLimit::Finite(self.pool_size.load(Ordering::Relaxed))
}

fn try_resize(&self, new_limit: usize) -> Result<()> {
self.pool_size.store(new_limit, Ordering::Relaxed);
Ok(())
}
}

impl Display for GreedyMemoryPool {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let used = self.used.load(Ordering::Relaxed);
let pool_size = self.pool_size.load(Ordering::Relaxed);
write!(
f,
"{}(used: {}, pool_size: {})",
&self.name(),
human_readable_size(used),
human_readable_size(self.pool_size)
human_readable_size(pool_size)
)
}
}
Expand Down Expand Up @@ -600,6 +613,10 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
fn memory_limit(&self) -> MemoryLimit {
self.inner.memory_limit()
}

fn try_resize(&self, new_limit: usize) -> Result<()> {
self.inner.try_resize(new_limit)
}
}

fn provide_top_memory_consumers_to_error_msg(
Expand Down Expand Up @@ -1046,4 +1063,43 @@ mod tests {
"TrackConsumersPool<UnboundedMemoryPool> Display"
);
}

#[test]
fn test_greedy_try_resize_in_place() {
let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(100));
let r = MemoryConsumer::new("r").register(&pool);

// Fill the pool, then verify it rejects further growth.
r.try_grow(100).unwrap();
r.try_grow(1).unwrap_err();

// Resize *up*: previously-rejected growth now succeeds.
pool.try_resize(200).unwrap();
assert!(matches!(pool.memory_limit(), MemoryLimit::Finite(200)));
r.try_grow(50).unwrap();
assert_eq!(pool.reserved(), 150);

// Resize *down* below current usage: subsequent grows fail because
// reserved (150) already exceeds the new limit (120). Already-issued
// reservations are not retroactively shrunk.
pool.try_resize(120).unwrap();
assert!(matches!(pool.memory_limit(), MemoryLimit::Finite(120)));
r.try_grow(1).unwrap_err();
}

#[test]
fn test_track_consumers_try_resize_forwards() {
let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
GreedyMemoryPool::new(100),
NonZeroUsize::new(3).unwrap(),
));
pool.try_resize(500).unwrap();
assert!(matches!(pool.memory_limit(), MemoryLimit::Finite(500)));
}

#[test]
fn test_unbounded_try_resize_returns_err() {
let pool: Arc<dyn MemoryPool> = Arc::new(UnboundedMemoryPool::default());
assert!(pool.try_resize(100).is_err());
}
}
60 changes: 58 additions & 2 deletions datafusion/execution/src/runtime_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,12 +409,23 @@ impl RuntimeEnvBuilder {
/// Specify the total memory to use while running the DataFusion
/// plan to `max_memory * memory_fraction` in bytes.
///
/// This defaults to using [`GreedyMemoryPool`] wrapped in the
/// [`TrackConsumersPool`] with a maximum of 5 consumers.
/// If a memory pool is already configured on this builder, this first
/// attempts to resize it in place via [`MemoryPool::try_resize`]. Pools
/// that support resize (e.g. [`GreedyMemoryPool`]) keep their identity
/// — useful for any wrapper that needs to observe limit changes (e.g.
/// to retune external accounting). Pools whose [`MemoryPool::try_resize`]
/// returns `Err` (the default) fall back to wholesale replacement
/// with a [`TrackConsumersPool`]-wrapped [`GreedyMemoryPool`] (top 5
/// consumers), preserving the historical behavior.
///
/// Note DataFusion does not yet respect this limit in all cases.
pub fn with_memory_limit(self, max_memory: usize, memory_fraction: f64) -> Self {
let pool_size = (max_memory as f64 * memory_fraction) as usize;
if let Some(existing) = &self.memory_pool
&& existing.try_resize(pool_size).is_ok()
{
return self;
}
self.with_memory_pool(Arc::new(TrackConsumersPool::new(
GreedyMemoryPool::new(pool_size),
NonZeroUsize::new(5).unwrap(),
Expand Down Expand Up @@ -562,3 +573,48 @@ impl RuntimeEnvBuilder {
docs
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::memory_pool::{GreedyMemoryPool, MemoryLimit, UnboundedMemoryPool};

#[test]
fn with_memory_limit_resizes_in_place_when_pool_supports_it() {
let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(100));
let pool_ptr = Arc::as_ptr(&pool);

let env = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::clone(&pool))
.with_memory_limit(500, 1.0)
.build()
.unwrap();

// Same Arc as before — wrapper-or-other-resize-capable pools survive.
assert!(std::ptr::eq(Arc::as_ptr(&env.memory_pool), pool_ptr));
assert!(matches!(
env.memory_pool.memory_limit(),
MemoryLimit::Finite(500)
));
}

#[test]
fn with_memory_limit_falls_back_to_replace_when_resize_unsupported() {
let pool: Arc<dyn MemoryPool> = Arc::new(UnboundedMemoryPool::default());
let pool_ptr = Arc::as_ptr(&pool);

let env = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::clone(&pool))
.with_memory_limit(500, 1.0)
.build()
.unwrap();

// Different Arc — wholesale replacement happened because Unbounded's
// default `try_resize` returns Err.
assert!(!std::ptr::eq(Arc::as_ptr(&env.memory_pool), pool_ptr));
assert!(matches!(
env.memory_pool.memory_limit(),
MemoryLimit::Finite(500)
));
}
}
4 changes: 4 additions & 0 deletions datafusion/sqllogictest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ tokio-postgres = { version = "0.7.17", optional = true }
[features]
avro = ["datafusion/avro"]
backtrace = ["datafusion/backtrace"]
# Enable the `AccountingAllocator` `GlobalAlloc` wrapper and its thread-local
# byte counter. The binary still has to declare `#[global_allocator]` for it
# to actually take effect — building with this feature on alone is harmless.
memory-accounting = []
postgres = [
"bytes",
"chrono",
Expand Down
29 changes: 29 additions & 0 deletions datafusion/sqllogictest/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,35 @@ For focusing on one specific failing test, a file:line filter can be used:
cargo test --test sqllogictests -- --substrait-round-trip binary.slt:23
```

## Running tests: allocator-level memory accounting

Build with `--features memory-accounting` to install a global allocator
wrapper that tracks actual bytes allocated per SLT file and reconciles them
against DataFusion's voluntary `MemoryPool` tracking. The point isn't to
enforce a process-wide budget — it's to catch DataFusion lying about how
much memory it's using. If `MemoryPool` reports 1 MB while the allocator
sees 100 MB go by, _that gap is the bug_.

```shell
cargo test --features memory-accounting --test sqllogictests -- \
--default-pool-size-mb 16384
```

`--default-pool-size-mb` seeds each per-file SLT context's MemoryPool with
the given size in MB and arms the bank as a no-op until a test opts in.

**Opting an individual test in.** Add `SET datafusion.runtime.memory_limit = 'N'` at the top of the `.slt`. The wrapping `AccountingMemoryPool` then
tightens its allocator-level bank to `N * 1.10` (10% headroom). If the test
allocates more than that — including bytes DataFusion's tracker didn't see
— the test panics with an `OverdraftPanic` reporting the actual balance at
panic time. SLTs without a `SET` of `memory_limit` see no change in
behavior; the bank stays loose and `SHOW ALL` continues to render the limit
as `unlimited`.

Inside the runner each file gets its own multi-thread Tokio runtime so
context-ids stamped onto worker threads stay stable for the allocator
hook, and per-file accounts in the bank are isolated from each other.

## `.slt` file format

[`sqllogictest`] was originally written for SQLite to verify the
Expand Down
71 changes: 67 additions & 4 deletions datafusion/sqllogictest/bin/sqllogictests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
// specific language governing permissions and limitations
// under the License.

#[cfg(feature = "memory-accounting")]
#[global_allocator]
static GLOBAL: datafusion_sqllogictest::AccountingAllocator =
datafusion_sqllogictest::AccountingAllocator::system();

use clap::{ColorChoice, Parser};
use datafusion::common::instant::Instant;
use datafusion::common::utils::get_available_parallelism;
Expand Down Expand Up @@ -137,6 +142,19 @@ async fn run_tests() -> Result<()> {

options.warn_on_ignored();

#[cfg(feature = "memory-accounting")]
if let Some(pool_mb) = options.default_pool_size_mb {
let pool_bytes = pool_mb.saturating_mul(1024 * 1024);
// Same value drives the inner MemoryPool's size and the bank's
// default budget. The wrapper renders this value as `unlimited` in
// `SHOW ALL` (sentinel for "no SET has happened"); once a test
// calls `SET datafusion.runtime.memory_limit`, the wrapper retunes
// the bank to that limit + 10% headroom.
datafusion_sqllogictest::set_memory_tracker_limit(pool_bytes);
datafusion_sqllogictest::set_default_budget(pool_bytes as isize);
log::info!("memory-accounting on: default pool size = {pool_mb} MB");
}

// Print parallelism info for debugging CI performance
eprintln!(
"Running with {} test threads (available parallelism: {})",
Expand Down Expand Up @@ -209,7 +227,7 @@ async fn run_tests() -> Result<()> {
let currently_running_sql_tracker_clone =
currently_running_sql_tracker.clone();
let file_start = Instant::now();
SpawnedTask::spawn(async move {
let body = async move {
let result = match (
options.postgres_runner,
options.complete,
Expand Down Expand Up @@ -282,9 +300,41 @@ async fn run_tests() -> Result<()> {
}

(result, elapsed)
})
.join()
.map(move |result| {
};
// Each file gets its own multi-thread runtime so a stable per-file
// context-id (stamped via `on_thread_start`) is readable from the
// global allocator hook. Bank accounting and SET-driven limit
// retuning will key off this id in later steps. The outer
// orchestration runtime hosts this via `spawn_blocking` so its
// worker threads aren't blocked by the per-file `block_on`.
//
// Worker count matches `SLT_TARGET_PARTITIONS` so a query's
// partition streams each get a worker rather than contending.
#[cfg(feature = "memory-accounting")]
let spawned = {
let context_id = datafusion_sqllogictest::next_context_id();
SpawnedTask::spawn_blocking(move || {
// Stamp this thread too — `block_on` polls `body` here, so
// statements that don't suspend (e.g. `SET memory_limit`,
// pool construction) run on this thread, not a worker.
datafusion_sqllogictest::set_thread_context_id(context_id);
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(datafusion_sqllogictest::SLT_TARGET_PARTITIONS)
.thread_name(format!("slt-file-{context_id}"))
.on_thread_start(move || {
datafusion_sqllogictest::set_thread_context_id(context_id);
})
.build()
.expect("build per-file Tokio runtime");
let out = runtime.block_on(body);
runtime.shutdown_background();
out
})
};
#[cfg(not(feature = "memory-accounting"))]
let spawned = SpawnedTask::spawn(body);
spawned.join().map(move |result| {
let elapsed = match &result {
Ok((_, elapsed)) => *elapsed,
Err(_) => Duration::ZERO,
Expand Down Expand Up @@ -910,6 +960,19 @@ struct Options {
default_value_t = ColorChoice::Auto
)]
color: ColorChoice,

#[clap(
long,
help = "Default MemoryPool size in MB for each per-file SLT context. \
The pool is wrapped in AccountingMemoryPool, which doubles \
this value as the 'no SET has happened yet' sentinel — until \
an SLT calls `SET datafusion.runtime.memory_limit`, SHOW ALL \
renders the limit as 'unlimited' and the allocator bank \
stays loose. Once a test SETs a limit, the bank tightens to \
that limit + 10% headroom. Requires the memory-accounting \
feature; ignored without it."
)]
default_pool_size_mb: Option<usize>,
}

impl Options {
Expand Down
Loading
Loading