From c4ad44237c7efe013f60e6d8e396cbfb83027e0a Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Fri, 5 Jun 2026 11:34:25 +0800 Subject: [PATCH 1/3] chore(shuffle): add interleave_time metric and specify buffer size for output_data buffer writer --- native/shuffle/src/bin/shuffle_bench.rs | 3 +++ native/shuffle/src/metrics.rs | 5 +++++ native/shuffle/src/partitioners/multi_partition.rs | 7 +++++-- .../src/partitioners/partitioned_batch_iterator.rs | 12 +++++++----- native/shuffle/src/writers/spill.rs | 4 ++-- 5 files changed, 22 insertions(+), 9 deletions(-) diff --git a/native/shuffle/src/bin/shuffle_bench.rs b/native/shuffle/src/bin/shuffle_bench.rs index bb8c2a0380..cc1c960848 100644 --- a/native/shuffle/src/bin/shuffle_bench.rs +++ b/native/shuffle/src/bin/shuffle_bench.rs @@ -277,6 +277,9 @@ fn print_shuffle_metrics(metrics: &MetricsSet, total_wall_time_secs: f64) { if let Some(nanos) = get_metric("repart_time") { println!(" repart time: {}", fmt_time(nanos)); } + if let Some(nanos) = get_metric("interleave_time") { + println!(" interleave time: {}", fmt_time(nanos)); + } if let Some(nanos) = get_metric("encode_time") { println!(" encode time: {}", fmt_time(nanos)); } diff --git a/native/shuffle/src/metrics.rs b/native/shuffle/src/metrics.rs index 1de751cf41..389fbcff79 100644 --- a/native/shuffle/src/metrics.rs +++ b/native/shuffle/src/metrics.rs @@ -27,6 +27,9 @@ pub(crate) struct ShufflePartitionerMetrics { /// Time to perform repartitioning pub(crate) repart_time: Time, + /// Time spent in `interleave_record_batch` gathering shuffled batches + pub(crate) interleave_time: Time, + /// Time encoding batches to IPC format pub(crate) encode_time: Time, @@ -51,6 +54,8 @@ impl ShufflePartitionerMetrics { Self { baseline: BaselineMetrics::new(metrics, partition), repart_time: MetricBuilder::new(metrics).subset_time("repart_time", partition), + interleave_time: MetricBuilder::new(metrics) + .subset_time("interleave_time", partition), encode_time: MetricBuilder::new(metrics).subset_time("encode_time", partition), write_time: MetricBuilder::new(metrics).subset_time("write_time", partition), input_batches: MetricBuilder::new(metrics).counter("input_batches", partition), diff --git a/native/shuffle/src/partitioners/multi_partition.rs b/native/shuffle/src/partitioners/multi_partition.rs index 7de9314f54..aa44cad4d5 100644 --- a/native/shuffle/src/partitioners/multi_partition.rs +++ b/native/shuffle/src/partitioners/multi_partition.rs @@ -434,10 +434,12 @@ impl MultiPartitionShuffleRepartitioner { Ok(()) } + #[allow(clippy::too_many_arguments)] fn shuffle_write_partition( partition_iter: &mut PartitionedBatchIterator, shuffle_block_writer: &mut ShuffleBlockWriter, output_data: &mut BufWriter, + interleave_time: &Time, encode_time: &Time, write_time: &Time, write_buffer_size: usize, @@ -449,7 +451,7 @@ impl MultiPartitionShuffleRepartitioner { write_buffer_size, batch_size, ); - for batch in partition_iter { + while let Some(batch) = partition_iter.next(interleave_time) { let batch = batch?; buf_batch_writer.write(&batch, encode_time, write_time)?; } @@ -573,7 +575,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { .open(data_file) .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; - let mut output_data = BufWriter::new(output_data); + let mut output_data = BufWriter::with_capacity(self.write_buffer_size, output_data); #[allow(clippy::needless_range_loop)] for i in 0..num_output_partitions { @@ -596,6 +598,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { &mut partition_iter, &mut self.shuffle_block_writer, &mut output_data, + &self.metrics.interleave_time, &self.metrics.encode_time, &self.metrics.write_time, self.write_buffer_size, diff --git a/native/shuffle/src/partitioners/partitioned_batch_iterator.rs b/native/shuffle/src/partitioners/partitioned_batch_iterator.rs index 8309a8ed4a..49528fc750 100644 --- a/native/shuffle/src/partitioners/partitioned_batch_iterator.rs +++ b/native/shuffle/src/partitioners/partitioned_batch_iterator.rs @@ -18,6 +18,7 @@ use arrow::array::RecordBatch; use arrow::compute::interleave_record_batch; use datafusion::common::DataFusionError; +use datafusion::physical_plan::metrics::Time; /// A helper struct to produce shuffled batches. /// This struct takes ownership of the buffered batches and partition indices from the @@ -85,18 +86,19 @@ impl<'a> PartitionedBatchIterator<'a> { pos: 0, } } -} - -impl Iterator for PartitionedBatchIterator<'_> { - type Item = datafusion::common::Result; - fn next(&mut self) -> Option { + /// Returns the next shuffled batch, recording the gather cost into `interleave_time`. + pub(crate) fn next( + &mut self, + interleave_time: &Time, + ) -> Option> { if self.pos >= self.indices.len() { return None; } let indices_end = std::cmp::min(self.pos + self.batch_size, self.indices.len()); let indices = &self.indices[self.pos..indices_end]; + let _timer = interleave_time.timer(); match interleave_record_batch(&self.record_batches, indices) { Ok(batch) => { self.pos = indices_end; diff --git a/native/shuffle/src/writers/spill.rs b/native/shuffle/src/writers/spill.rs index c16caddbf9..624a45befe 100644 --- a/native/shuffle/src/writers/spill.rs +++ b/native/shuffle/src/writers/spill.rs @@ -83,7 +83,7 @@ impl PartitionWriter { write_buffer_size: usize, batch_size: usize, ) -> datafusion::common::Result { - if let Some(batch) = iter.next() { + if let Some(batch) = iter.next(&metrics.interleave_time) { self.ensure_spill_file_created(runtime)?; let total_bytes_written = { @@ -95,7 +95,7 @@ impl PartitionWriter { ); let mut bytes_written = buf_batch_writer.write(&batch?, &metrics.encode_time, &metrics.write_time)?; - for batch in iter { + while let Some(batch) = iter.next(&metrics.interleave_time) { let batch = batch?; bytes_written += buf_batch_writer.write( &batch, From f803fbff1aaf84d7fb146f1c5749f1d75bc336ef Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Fri, 5 Jun 2026 14:49:42 +0800 Subject: [PATCH 2/3] fmt --- native/shuffle/src/metrics.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/native/shuffle/src/metrics.rs b/native/shuffle/src/metrics.rs index 389fbcff79..bda245fd93 100644 --- a/native/shuffle/src/metrics.rs +++ b/native/shuffle/src/metrics.rs @@ -54,8 +54,7 @@ impl ShufflePartitionerMetrics { Self { baseline: BaselineMetrics::new(metrics, partition), repart_time: MetricBuilder::new(metrics).subset_time("repart_time", partition), - interleave_time: MetricBuilder::new(metrics) - .subset_time("interleave_time", partition), + interleave_time: MetricBuilder::new(metrics).subset_time("interleave_time", partition), encode_time: MetricBuilder::new(metrics).subset_time("encode_time", partition), write_time: MetricBuilder::new(metrics).subset_time("write_time", partition), input_batches: MetricBuilder::new(metrics).counter("input_batches", partition), From 8bad3604306a8ad7d105d51bb547d8b7282af055 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Fri, 5 Jun 2026 21:44:25 +0800 Subject: [PATCH 3/3] address comment --- .../shuffle/src/partitioners/partitioned_batch_iterator.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/native/shuffle/src/partitioners/partitioned_batch_iterator.rs b/native/shuffle/src/partitioners/partitioned_batch_iterator.rs index 49528fc750..c7f1781866 100644 --- a/native/shuffle/src/partitioners/partitioned_batch_iterator.rs +++ b/native/shuffle/src/partitioners/partitioned_batch_iterator.rs @@ -98,8 +98,10 @@ impl<'a> PartitionedBatchIterator<'a> { let indices_end = std::cmp::min(self.pos + self.batch_size, self.indices.len()); let indices = &self.indices[self.pos..indices_end]; - let _timer = interleave_time.timer(); - match interleave_record_batch(&self.record_batches, indices) { + let mut timer = interleave_time.timer(); + let result = interleave_record_batch(&self.record_batches, indices); + timer.stop(); + match result { Ok(batch) => { self.pos = indices_end; Some(Ok(batch))