Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 69 additions & 3 deletions datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,8 +640,8 @@ fn estimate_inner_join_cardinality(
..
} = right_stats;

// The algorithm here is partly based on the non-histogram selectivity estimation
// from Spark's Catalyst optimizer.
// Follow Spark Catalyst's conservative NDV join estimate: for multi-key
// joins, use the most selective key instead of multiplying all key denominators.
let mut join_selectivity = Precision::Absent;
for (left_stat, right_stat) in left_column_statistics
.iter()
Expand All @@ -654,7 +654,11 @@ fn estimate_inner_join_cardinality(
// Seems like there are a few implementations of this algorithm that implement
// exponential decay for the selectivity (like Hive's Optiq Optimizer). Needs
// further exploration.
join_selectivity = max_distinct;
join_selectivity = if join_selectivity.get_value().is_some() {
join_selectivity.max(&max_distinct)
} else {
max_distinct
};
}
}

Expand Down Expand Up @@ -2730,6 +2734,68 @@ mod tests {
Ok(())
}

/// Regression test: the inner-join cardinality estimate must not depend on
/// the order in which the equi-join keys are listed.
///
/// Both joins below use the same two key pairs (`a = c` and `b = d`), only
/// swapped, so the estimated row count and the propagated column statistics
/// must be identical.
#[test]
fn test_join_cardinality_key_order() -> Result<()> {
    // Left table (rows=1000)
    //   a: min=0, max=100, distinct=100
    //   b: min=0, max=500, distinct=500
    //   x: min=1000, max=10000, distinct=unknown (not a join key)
    let left_col_stats = vec![
        create_column_stats(Inexact(0), Inexact(100), Inexact(100), Absent),
        create_column_stats(Inexact(0), Inexact(500), Inexact(500), Absent),
        create_column_stats(Inexact(1000), Inexact(10000), Absent, Absent),
    ];

    // Right table (rows=2000)
    //   c: min=0, max=100, distinct=50
    //   d: min=0, max=2000, distinct=2500
    //   y: min=0, max=100, distinct=unknown (not a join key)
    let right_col_stats = vec![
        create_column_stats(Inexact(0), Inexact(100), Inexact(50), Absent),
        create_column_stats(Inexact(0), Inexact(2000), Inexact(2500), Absent),
        create_column_stats(Inexact(0), Inexact(100), Absent, Absent),
    ];

    // a = c AND b = d
    let join_on_ab = vec![
        (
            Arc::new(Column::new("a", 0)) as _,
            Arc::new(Column::new("c", 0)) as _,
        ),
        (
            Arc::new(Column::new("b", 1)) as _,
            Arc::new(Column::new("d", 1)) as _,
        ),
    ];
    // b = d AND a = c (same predicate, keys listed in the opposite order)
    let join_on_ba = vec![
        (
            Arc::new(Column::new("b", 1)) as _,
            Arc::new(Column::new("d", 1)) as _,
        ),
        (
            Arc::new(Column::new("a", 0)) as _,
            Arc::new(Column::new("c", 0)) as _,
        ),
    ];

    let stats_ab = estimate_join_cardinality(
        &JoinType::Inner,
        create_stats(Some(1000), left_col_stats.clone(), false),
        create_stats(Some(2000), right_col_stats.clone(), false),
        &join_on_ab,
    )
    .expect("inner join with overlapping key ranges should produce an estimate");
    let stats_ba = estimate_join_cardinality(
        &JoinType::Inner,
        create_stats(Some(1000), left_col_stats.clone(), false),
        create_stats(Some(2000), right_col_stats.clone(), false),
        &join_on_ba,
    )
    .expect("inner join with overlapping key ranges should produce an estimate");

    // Per-key NDV is the larger of the two sides:
    //   a = c -> max(100, 50)   = 100
    //   b = d -> max(500, 2500) = 2500
    // The estimate keeps the largest per-key NDV (2500) no matter which key
    // is visited first, so the expected cardinality is
    //   1000 * 2000 / 2500 = 800.
    assert_eq!(stats_ab.num_rows, 800);
    assert_eq!(stats_ba.num_rows, stats_ab.num_rows);
    assert_eq!(stats_ba.column_statistics, stats_ab.column_statistics);
    // Column statistics are passed through unchanged: left columns first,
    // then right columns.
    assert_eq!(
        stats_ab.column_statistics,
        [left_col_stats, right_col_stats].concat()
    );

    Ok(())
}

#[test]
fn test_join_cardinality_when_one_column_is_disjoint() -> Result<()> {
// Left table (rows=1000)
Expand Down
Loading