Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ jobs:
export RUST_MIN_STACK=20971520
export TPCH_DATA=`realpath datafusion/sqllogictest/test_files/tpch/data`
cargo test plan_q --package datafusion-benchmarks --profile ci --features=ci -- --test-threads=1
INCLUDE_TPCH=true cargo test --features backtrace,parquet_encryption,substrait --profile ci --package datafusion-sqllogictest --test sqllogictests
INCLUDE_TPCH=true cargo test --features backtrace,parquet_encryption,substrait,memory-accounting --profile ci --package datafusion-sqllogictest --test sqllogictests -- --default-pool-size-mb 16384
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can call it memory-profiler or something like that ?

- name: Verify Working Directory Clean
run: git diff --exit-code

Expand Down
12 changes: 11 additions & 1 deletion datafusion/execution/src/memory_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! [`MemoryPool`] for memory management during query execution, [`proxy`] for
//! help with allocation accounting.

use datafusion_common::{Result, internal_datafusion_err};
use datafusion_common::{Result, internal_datafusion_err, not_impl_err};
use std::any::Any;
use std::fmt::Display;
use std::hash::{Hash, Hasher};
Expand Down Expand Up @@ -223,6 +223,16 @@ pub trait MemoryPool: Any + Send + Sync + std::fmt::Debug + Display {
fn memory_limit(&self) -> MemoryLimit {
MemoryLimit::Unknown
}

/// Try to change this pool's limit to `new_limit` bytes without replacing
/// the pool itself.
///
/// The default implementation always fails with a "not implemented" error
/// that names the pool.
/// [`crate::runtime_env::RuntimeEnvBuilder::with_memory_limit`] treats an
/// `Err` from this method as the signal to install a brand-new pool, which
/// keeps the historical behavior for pools that cannot be resized in place.
fn try_resize(&self, _new_limit: usize) -> Result<()> {
    let pool_name = self.name();
    not_impl_err!("{} does not support resize", pool_name)
}
}

impl dyn MemoryPool {
Expand Down
68 changes: 62 additions & 6 deletions datafusion/execution/src/memory_pool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,15 @@ impl Display for UnboundedMemoryPool {
/// This pool works well for queries that do not need to spill or have
/// a single spillable operator. See [`FairSpillPool`] if there are
/// multiple spillable operators that all will spill.
///
/// Supports [`MemoryPool::try_resize`] for in-place limit adjustment, so
/// callers routing through
/// [`RuntimeEnvBuilder::with_memory_limit`](crate::runtime_env::RuntimeEnvBuilder::with_memory_limit)
/// can keep the existing pool (and any wrappers around it) rather than
/// replacing it on every change.
#[derive(Debug)]
pub struct GreedyMemoryPool {
pool_size: usize,
pool_size: AtomicUsize,
used: AtomicUsize,
}

Expand All @@ -84,7 +90,7 @@ impl GreedyMemoryPool {
pub fn new(pool_size: usize) -> Self {
debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
Self {
pool_size,
pool_size: AtomicUsize::new(pool_size),
used: AtomicUsize::new(0),
}
}
Expand All @@ -104,16 +110,17 @@ impl MemoryPool for GreedyMemoryPool {
}

fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
let pool_size = self.pool_size.load(Ordering::Relaxed);
self.used
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
let new_used = used + additional;
(new_used <= self.pool_size).then_some(new_used)
(new_used <= pool_size).then_some(new_used)
})
.map_err(|used| {
insufficient_capacity_err(
reservation,
additional,
self.pool_size.saturating_sub(used),
pool_size.saturating_sub(used),
self,
)
})?;
Expand All @@ -125,19 +132,25 @@ impl MemoryPool for GreedyMemoryPool {
}

fn memory_limit(&self) -> MemoryLimit {
MemoryLimit::Finite(self.pool_size)
MemoryLimit::Finite(self.pool_size.load(Ordering::Relaxed))
}

fn try_resize(&self, new_limit: usize) -> Result<()> {
self.pool_size.store(new_limit, Ordering::Relaxed);
Ok(())
}
}

impl Display for GreedyMemoryPool {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let used = self.used.load(Ordering::Relaxed);
let pool_size = self.pool_size.load(Ordering::Relaxed);
write!(
f,
"{}(used: {}, pool_size: {})",
&self.name(),
human_readable_size(used),
human_readable_size(self.pool_size)
human_readable_size(pool_size)
)
}
}
Expand Down Expand Up @@ -600,6 +613,10 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
fn memory_limit(&self) -> MemoryLimit {
self.inner.memory_limit()
}

fn try_resize(&self, new_limit: usize) -> Result<()> {
self.inner.try_resize(new_limit)
}
}

fn provide_top_memory_consumers_to_error_msg(
Expand Down Expand Up @@ -1046,4 +1063,43 @@ mod tests {
"TrackConsumersPool<UnboundedMemoryPool> Display"
);
}

#[test]
fn test_greedy_try_resize_in_place() {
    let greedy: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(100));
    let reservation = MemoryConsumer::new("r").register(&greedy);

    // Saturate the 100-byte pool; a single extra byte must be refused.
    reservation.try_grow(100).unwrap();
    assert!(reservation.try_grow(1).is_err());

    // Raising the limit makes room for growth that was rejected before.
    greedy.try_resize(200).unwrap();
    assert!(matches!(greedy.memory_limit(), MemoryLimit::Finite(200)));
    reservation.try_grow(50).unwrap();
    assert_eq!(greedy.reserved(), 150);

    // Lowering the limit below what is already reserved (150 > 120) does
    // not claw back existing reservations; it only blocks further growth.
    greedy.try_resize(120).unwrap();
    assert!(matches!(greedy.memory_limit(), MemoryLimit::Finite(120)));
    assert!(reservation.try_grow(1).is_err());
}

#[test]
fn test_track_consumers_try_resize_forwards() {
    // Resizing through the tracking wrapper must reach the greedy pool
    // inside it, which is what reports the limit.
    let top_consumers = NonZeroUsize::new(3).unwrap();
    let tracked = TrackConsumersPool::new(GreedyMemoryPool::new(100), top_consumers);
    let pool: Arc<dyn MemoryPool> = Arc::new(tracked);

    pool.try_resize(500).unwrap();

    assert!(matches!(pool.memory_limit(), MemoryLimit::Finite(500)));
}

#[test]
fn test_unbounded_try_resize_returns_err() {
    // `UnboundedMemoryPool` is expected to reject resize requests.
    let unbounded: Arc<dyn MemoryPool> = Arc::new(UnboundedMemoryPool::default());
    let outcome = unbounded.try_resize(100);
    assert!(outcome.is_err());
}
}
60 changes: 58 additions & 2 deletions datafusion/execution/src/runtime_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,12 +409,23 @@ impl RuntimeEnvBuilder {
/// Specify the total memory to use while running the DataFusion
/// plan to `max_memory * memory_fraction` in bytes.
///
/// This defaults to using [`GreedyMemoryPool`] wrapped in the
/// [`TrackConsumersPool`] with a maximum of 5 consumers.
/// If a memory pool is already configured on this builder, this first
/// attempts to resize it in place via [`MemoryPool::try_resize`]. Pools
/// that support resize (e.g. [`GreedyMemoryPool`]) keep their identity
/// — useful for any wrapper that needs to observe limit changes (e.g.
/// to retune external accounting). Pools whose [`MemoryPool::try_resize`]
/// returns `Err` (the default) fall back to wholesale replacement
/// with a [`TrackConsumersPool`]-wrapped [`GreedyMemoryPool`] (top 5
/// consumers), preserving the historical behavior.
///
/// Note DataFusion does not yet respect this limit in all cases.
pub fn with_memory_limit(self, max_memory: usize, memory_fraction: f64) -> Self {
let pool_size = (max_memory as f64 * memory_fraction) as usize;
if let Some(existing) = &self.memory_pool
&& existing.try_resize(pool_size).is_ok()
{
return self;
}
self.with_memory_pool(Arc::new(TrackConsumersPool::new(
GreedyMemoryPool::new(pool_size),
NonZeroUsize::new(5).unwrap(),
Expand Down Expand Up @@ -562,3 +573,48 @@ impl RuntimeEnvBuilder {
docs
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory_pool::{GreedyMemoryPool, MemoryLimit, UnboundedMemoryPool};

    /// A pool whose `try_resize` succeeds must be kept by
    /// `with_memory_limit`, and must report the new limit afterwards.
    #[test]
    fn with_memory_limit_resizes_in_place_when_pool_supports_it() {
        let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(100));

        let env = RuntimeEnvBuilder::new()
            .with_memory_pool(Arc::clone(&pool))
            .with_memory_limit(500, 1.0)
            .build()
            .unwrap();

        // Same allocation as before — resize-capable pools (and any wrapper
        // around them) survive. `Arc::ptr_eq` compares addresses only;
        // `std::ptr::eq` on `*const dyn MemoryPool` would also compare
        // vtable pointers, which is unreliable for trait objects.
        assert!(Arc::ptr_eq(&env.memory_pool, &pool));
        assert!(matches!(
            env.memory_pool.memory_limit(),
            MemoryLimit::Finite(500)
        ));
    }

    /// A pool whose `try_resize` fails must be replaced by a fresh pool
    /// that carries the requested limit.
    #[test]
    fn with_memory_limit_falls_back_to_replace_when_resize_unsupported() {
        let pool: Arc<dyn MemoryPool> = Arc::new(UnboundedMemoryPool::default());

        let env = RuntimeEnvBuilder::new()
            .with_memory_pool(Arc::clone(&pool))
            .with_memory_limit(500, 1.0)
            .build()
            .unwrap();

        // Different allocation — wholesale replacement happened because
        // `try_resize` returned Err for the unbounded pool.
        assert!(!Arc::ptr_eq(&env.memory_pool, &pool));
        assert!(matches!(
            env.memory_pool.memory_limit(),
            MemoryLimit::Finite(500)
        ));
    }
}
4 changes: 4 additions & 0 deletions datafusion/sqllogictest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ tokio-postgres = { version = "0.7.17", optional = true }
[features]
avro = ["datafusion/avro"]
backtrace = ["datafusion/backtrace"]
# Enable the `AccountingAllocator` `GlobalAlloc` wrapper and its thread-local
# byte counter. A binary must still declare `#[global_allocator]` for the
# wrapper to take effect, so enabling this feature on its own is harmless.
memory-accounting = []
postgres = [
"bytes",
"chrono",
Expand Down
29 changes: 29 additions & 0 deletions datafusion/sqllogictest/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,35 @@ For focusing on one specific failing test, a file:line filter can be used:
cargo test --test sqllogictests -- --substrait-round-trip binary.slt:23
```

## Running tests: allocator-level memory accounting

Build with `--features memory-accounting` to install a global allocator
wrapper that tracks actual bytes allocated per SLT file and reconciles them
against DataFusion's voluntary `MemoryPool` tracking. The point isn't to
enforce a process-wide budget — it's to catch DataFusion lying about how
much memory it's using. If `MemoryPool` reports 1 MB while the allocator
sees 100 MB go by, _that gap is the bug_.

```shell
cargo test --features memory-accounting --test sqllogictests -- \
--default-pool-size-mb 16384
```

`--default-pool-size-mb` seeds each per-file SLT context's MemoryPool with
the given size in MB and arms the bank as a no-op until a test opts in.

**Opting an individual test in.** Add `SET datafusion.runtime.memory_limit = 'N'` at the top of the `.slt`. The wrapping `AccountingMemoryPool` then
tightens its allocator-level bank to `N * 1.10` (10% headroom). If the test
allocates more than that — including bytes DataFusion's tracker didn't see
— the test panics with an `OverdraftPanic` reporting the actual balance at
panic time. SLTs without a `SET` of `memory_limit` see no change in
behavior; the bank stays loose and `SHOW ALL` continues to render the limit
as `unlimited`.

Inside the runner each file gets its own multi-thread Tokio runtime so
context-ids stamped onto worker threads stay stable for the allocator
hook, and per-file accounts in the bank are isolated from each other.

## `.slt` file format

[`sqllogictest`] was originally written for SQLite to verify the
Expand Down
71 changes: 67 additions & 4 deletions datafusion/sqllogictest/bin/sqllogictests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
// specific language governing permissions and limitations
// under the License.

// When built with the `memory-accounting` feature, register
// `AccountingAllocator` as this binary's global allocator so heap
// allocations go through it. Without the feature this item is compiled out
// and the default allocator is used.
// NOTE(review): `system()` presumably wraps the system allocator — confirm
// against `AccountingAllocator`'s definition.
#[cfg(feature = "memory-accounting")]
#[global_allocator]
static GLOBAL: datafusion_sqllogictest::AccountingAllocator =
datafusion_sqllogictest::AccountingAllocator::system();

use clap::{ColorChoice, Parser};
use datafusion::common::instant::Instant;
use datafusion::common::utils::get_available_parallelism;
Expand Down Expand Up @@ -137,6 +142,19 @@ async fn run_tests() -> Result<()> {

options.warn_on_ignored();

#[cfg(feature = "memory-accounting")]
if let Some(pool_mb) = options.default_pool_size_mb {
    let pool_bytes = pool_mb.saturating_mul(1024 * 1024);
    // The bank budget is signed. A plain `as isize` cast wraps anything
    // above `isize::MAX` (including the saturated `usize::MAX` produced
    // above) to a negative budget, so clamp to `isize::MAX` instead.
    let default_budget = isize::try_from(pool_bytes).unwrap_or(isize::MAX);
    // Same value drives the inner MemoryPool's size and the bank's
    // default budget. The wrapper renders this value as `unlimited` in
    // `SHOW ALL` (sentinel for "no SET has happened"); once a test
    // calls `SET datafusion.runtime.memory_limit`, the wrapper retunes
    // the bank to that limit + 10% headroom.
    datafusion_sqllogictest::set_memory_tracker_limit(pool_bytes);
    datafusion_sqllogictest::set_default_budget(default_budget);
    log::info!("memory-accounting on: default pool size = {pool_mb} MB");
}

// Print parallelism info for debugging CI performance
eprintln!(
"Running with {} test threads (available parallelism: {})",
Expand Down Expand Up @@ -209,7 +227,7 @@ async fn run_tests() -> Result<()> {
let currently_running_sql_tracker_clone =
currently_running_sql_tracker.clone();
let file_start = Instant::now();
SpawnedTask::spawn(async move {
let body = async move {
let result = match (
options.postgres_runner,
options.complete,
Expand Down Expand Up @@ -282,9 +300,41 @@ async fn run_tests() -> Result<()> {
}

(result, elapsed)
})
.join()
.map(move |result| {
};
// Each file gets its own multi-thread runtime so a stable per-file
// context-id (stamped via `on_thread_start`) is readable from the
// global allocator hook. Bank accounting and SET-driven limit
// retuning will key off this id in later steps. The outer
// orchestration runtime hosts this via `spawn_blocking` so its
// worker threads aren't blocked by the per-file `block_on`.
//
// Worker count matches `SLT_TARGET_PARTITIONS` so a query's
// partition streams each get a worker rather than contending.
#[cfg(feature = "memory-accounting")]
let spawned = {
let context_id = datafusion_sqllogictest::next_context_id();
SpawnedTask::spawn_blocking(move || {
// Stamp this thread too — `block_on` polls `body` here, so
// statements that don't suspend (e.g. `SET memory_limit`,
// pool construction) run on this thread, not a worker.
datafusion_sqllogictest::set_thread_context_id(context_id);
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(datafusion_sqllogictest::SLT_TARGET_PARTITIONS)
.thread_name(format!("slt-file-{context_id}"))
.on_thread_start(move || {
datafusion_sqllogictest::set_thread_context_id(context_id);
})
.build()
.expect("build per-file Tokio runtime");
let out = runtime.block_on(body);
runtime.shutdown_background();
out
})
};
#[cfg(not(feature = "memory-accounting"))]
let spawned = SpawnedTask::spawn(body);
spawned.join().map(move |result| {
let elapsed = match &result {
Ok((_, elapsed)) => *elapsed,
Err(_) => Duration::ZERO,
Expand Down Expand Up @@ -910,6 +960,19 @@ struct Options {
default_value_t = ColorChoice::Auto
)]
color: ColorChoice,

#[clap(
long,
help = "Default MemoryPool size in MB for each per-file SLT context. \
The pool is wrapped in AccountingMemoryPool, which doubles \
this value as the 'no SET has happened yet' sentinel — until \
an SLT calls `SET datafusion.runtime.memory_limit`, SHOW ALL \
renders the limit as 'unlimited' and the allocator bank \
stays loose. Once a test SETs a limit, the bank tightens to \
that limit + 10% headroom. Requires the memory-accounting \
feature; ignored without it."
)]
default_pool_size_mb: Option<usize>,
}

impl Options {
Expand Down
Loading
Loading