Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions native/shuffle/src/bin/shuffle_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,9 @@ fn print_shuffle_metrics(metrics: &MetricsSet, total_wall_time_secs: f64) {
if let Some(nanos) = get_metric("repart_time") {
println!(" repart time: {}", fmt_time(nanos));
}
if let Some(nanos) = get_metric("interleave_time") {
println!(" interleave time: {}", fmt_time(nanos));
}
if let Some(nanos) = get_metric("encode_time") {
println!(" encode time: {}", fmt_time(nanos));
}
Expand Down
4 changes: 4 additions & 0 deletions native/shuffle/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ pub(crate) struct ShufflePartitionerMetrics {
/// Time to perform repartitioning
pub(crate) repart_time: Time,

/// Time spent in `interleave_record_batch` gathering shuffled batches
pub(crate) interleave_time: Time,

/// Time encoding batches to IPC format
pub(crate) encode_time: Time,

Expand All @@ -51,6 +54,7 @@ impl ShufflePartitionerMetrics {
Self {
baseline: BaselineMetrics::new(metrics, partition),
repart_time: MetricBuilder::new(metrics).subset_time("repart_time", partition),
interleave_time: MetricBuilder::new(metrics).subset_time("interleave_time", partition),
encode_time: MetricBuilder::new(metrics).subset_time("encode_time", partition),
write_time: MetricBuilder::new(metrics).subset_time("write_time", partition),
input_batches: MetricBuilder::new(metrics).counter("input_batches", partition),
Expand Down
7 changes: 5 additions & 2 deletions native/shuffle/src/partitioners/multi_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,10 +434,12 @@ impl MultiPartitionShuffleRepartitioner {
Ok(())
}

#[allow(clippy::too_many_arguments)]
fn shuffle_write_partition(
partition_iter: &mut PartitionedBatchIterator,
shuffle_block_writer: &mut ShuffleBlockWriter,
output_data: &mut BufWriter<File>,
interleave_time: &Time,
encode_time: &Time,
write_time: &Time,
write_buffer_size: usize,
Expand All @@ -449,7 +451,7 @@ impl MultiPartitionShuffleRepartitioner {
write_buffer_size,
batch_size,
);
for batch in partition_iter {
while let Some(batch) = partition_iter.next(interleave_time) {
let batch = batch?;
buf_batch_writer.write(&batch, encode_time, write_time)?;
}
Expand Down Expand Up @@ -573,7 +575,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
.open(data_file)
.map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?;

let mut output_data = BufWriter::new(output_data);
let mut output_data = BufWriter::with_capacity(self.write_buffer_size, output_data);

#[allow(clippy::needless_range_loop)]
for i in 0..num_output_partitions {
Expand All @@ -596,6 +598,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
&mut partition_iter,
&mut self.shuffle_block_writer,
&mut output_data,
&self.metrics.interleave_time,
&self.metrics.encode_time,
&self.metrics.write_time,
self.write_buffer_size,
Expand Down
12 changes: 7 additions & 5 deletions native/shuffle/src/partitioners/partitioned_batch_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use arrow::array::RecordBatch;
use arrow::compute::interleave_record_batch;
use datafusion::common::DataFusionError;
use datafusion::physical_plan::metrics::Time;

/// A helper struct to produce shuffled batches.
/// This struct takes ownership of the buffered batches and partition indices from the
Expand Down Expand Up @@ -85,18 +86,19 @@ impl<'a> PartitionedBatchIterator<'a> {
pos: 0,
}
}
}

impl Iterator for PartitionedBatchIterator<'_> {
type Item = datafusion::common::Result<RecordBatch>;

fn next(&mut self) -> Option<Self::Item> {
/// Returns the next shuffled batch, recording the gather cost into `interleave_time`.
pub(crate) fn next(
&mut self,
interleave_time: &Time,
) -> Option<datafusion::common::Result<RecordBatch>> {
Comment thread
wForget marked this conversation as resolved.
if self.pos >= self.indices.len() {
return None;
}

let indices_end = std::cmp::min(self.pos + self.batch_size, self.indices.len());
let indices = &self.indices[self.pos..indices_end];
let _timer = interleave_time.timer();
match interleave_record_batch(&self.record_batches, indices) {
Ok(batch) => {
Comment thread
wForget marked this conversation as resolved.
self.pos = indices_end;
Expand Down
4 changes: 2 additions & 2 deletions native/shuffle/src/writers/spill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl PartitionWriter {
write_buffer_size: usize,
batch_size: usize,
) -> datafusion::common::Result<usize> {
if let Some(batch) = iter.next() {
if let Some(batch) = iter.next(&metrics.interleave_time) {
self.ensure_spill_file_created(runtime)?;

let total_bytes_written = {
Expand All @@ -95,7 +95,7 @@ impl PartitionWriter {
);
let mut bytes_written =
buf_batch_writer.write(&batch?, &metrics.encode_time, &metrics.write_time)?;
for batch in iter {
while let Some(batch) = iter.next(&metrics.interleave_time) {
let batch = batch?;
bytes_written += buf_batch_writer.write(
&batch,
Expand Down
Loading