feat: add DataFrame fill_nan #22702
Changes from 5 commits
3b9c184
be9e1d4
6fe477a
28b1e72
b5eaa14
b70c962
eb79a11
cb13967
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -58,13 +58,14 @@ use datafusion_common::{ | |
| }; | ||
| use datafusion_expr::select_expr::SelectExpr; | ||
| use datafusion_expr::{ | ||
| ExplainOption, SortExpr, TableProviderFilterPushDown, UNNAMED_TABLE, case, | ||
| ExplainOption, ScalarUDF, SortExpr, TableProviderFilterPushDown, UNNAMED_TABLE, case, | ||
| dml::InsertOp, | ||
| expr::{Alias, ScalarFunction}, | ||
| is_null, lit, | ||
| utils::COUNT_STAR_EXPANSION, | ||
| }; | ||
| use datafusion_functions::core::coalesce; | ||
| use datafusion_functions::math::nanvl; | ||
| use datafusion_functions_aggregate::expr_fn::{ | ||
| avg, count, max, median, min, stddev, sum, | ||
| }; | ||
|
|
@@ -2471,6 +2472,68 @@ impl DataFrame { | |
| &self, | ||
| value: ScalarValue, | ||
| columns: Vec<String>, | ||
| ) -> Result<DataFrame> { | ||
| self.fill_columns(value, &columns, coalesce(), |_| true) | ||
| } | ||
|
|
||
| // Helper to find columns from names | ||
| fn find_columns(&self, names: &[String]) -> Result<Vec<FieldRef>> { | ||
| let schema = self.logical_plan().schema(); | ||
| names | ||
| .iter() | ||
| .map(|name| { | ||
| schema | ||
| .field_with_name(None, name) | ||
| .cloned() | ||
| .map_err(|_| plan_datafusion_err!("Column '{}' not found", name)) | ||
| }) | ||
| .collect() | ||
| } | ||
|
|
||
| /// Fill NaN values in the specified columns with a given value. | ||
| /// If no columns are specified (empty vector), applies to all columns. | ||
| /// Only floating-point columns are filled, and only if the value can be | ||
| /// cast to the column's type; other columns are left unchanged. | ||
| /// | ||
| /// # Arguments | ||
| /// * `value` - Value to fill NaNs with | ||
| /// * `columns` - List of column names to fill. If empty, fills all columns. | ||
| /// | ||
| /// # Example | ||
| /// ``` | ||
| /// # use datafusion::prelude::*; | ||
| /// # use datafusion::error::Result; | ||
| /// # use datafusion_common::ScalarValue; | ||
| /// # #[tokio::main] | ||
| /// # async fn main() -> Result<()> { | ||
| /// let ctx = SessionContext::new(); | ||
| /// let df = ctx | ||
| /// .read_csv("tests/data/example.csv", CsvReadOptions::new()) | ||
| /// .await?; | ||
| /// // Fill NaN in only columns "a" and "c": | ||
| /// let df = df.fill_nan(ScalarValue::from(0.0), vec!["a".to_owned(), "c".to_owned()])?; | ||
| /// // Fill NaN across all columns: | ||
| /// let df = df.fill_nan(ScalarValue::from(0.0), vec![])?; | ||
| /// # Ok(()) | ||
| /// # } | ||
| /// ``` | ||
| #[expect(clippy::needless_pass_by_value)] | ||
| pub fn fill_nan( | ||
| &self, | ||
| value: ScalarValue, | ||
| columns: Vec<String>, | ||
| ) -> Result<DataFrame> { | ||
| self.fill_columns(value, &columns, nanvl(), |field| { | ||
| field.data_type().is_floating() | ||
| }) | ||
| } | ||
|
|
||
| #[expect(clippy::needless_pass_by_value)] | ||
|
Contributor
Consider fixing this lint, since it's a private function now.
Author
Thanks for the review. I moved the `#[expect]` to the public method. If we keep the call consistent with `fill_null`, there will always be one `#[expect]`, either here or on the helper.
||
| fn fill_columns( | ||
| &self, | ||
| value: ScalarValue, | ||
| columns: &[String], | ||
| func: Arc<ScalarUDF>, | ||
| applies: impl Fn(&FieldRef) -> bool, | ||
| ) -> Result<DataFrame> { | ||
| let cols = if columns.is_empty() { | ||
|
Contributor
I noticed
Author
Thanks for the review. Done in 6fe477a.
||
| self.logical_plan() | ||
|
|
@@ -2480,22 +2543,21 @@ impl DataFrame { | |
| .map(Arc::clone) | ||
| .collect() | ||
| } else { | ||
| self.find_columns(&columns)? | ||
| self.find_columns(columns)? | ||
| }; | ||
|
|
||
| // Create projections for each column | ||
| let projections = self | ||
| .logical_plan() | ||
| .schema() | ||
| .fields() | ||
| .iter() | ||
| .map(|field| { | ||
| if cols.contains(field) { | ||
| if cols.contains(field) && applies(field) { | ||
| // Try to cast fill value to column type. If the cast fails, fallback to the original column. | ||
| match value.clone().cast_to(field.data_type()) { | ||
| Ok(fill_value) => Expr::Alias(Alias { | ||
| expr: Box::new(Expr::ScalarFunction(ScalarFunction { | ||
| func: coalesce(), | ||
| func: Arc::clone(&func), | ||
| args: vec![col(field.name()), lit(fill_value)], | ||
| })), | ||
| relation: None, | ||
|
|
@@ -2513,20 +2575,6 @@ impl DataFrame { | |
| self.clone().select(projections) | ||
|
|
||
| } | ||
|
|
||
| // Helper to find columns from names | ||
| fn find_columns(&self, names: &[String]) -> Result<Vec<FieldRef>> { | ||
| let schema = self.logical_plan().schema(); | ||
| names | ||
| .iter() | ||
| .map(|name| { | ||
| schema | ||
| .field_with_name(None, name) | ||
| .cloned() | ||
| .map_err(|_| plan_datafusion_err!("Column '{}' not found", name)) | ||
| }) | ||
| .collect() | ||
| } | ||
|
|
||
| /// Find qualified columns for this dataframe from names | ||
| /// | ||
| /// # Arguments | ||
|
|
||
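The `fill_columns` helper in the diff above is shared by `fill_null` (via `coalesce`) and `fill_nan` (via `nanvl`). As a rough, std-only sketch of how the two fills differ per value, the following models a nullable float as `Option<f64>`. The names `fill_nan_value` and `fill_null_value` are hypothetical and exist only for illustration; this is not DataFusion's implementation, and it assumes a NaN fill leaves nulls untouched.

```rust
/// Hypothetical per-value model of a NaN fill: replace NaN, keep everything
/// else, including nulls (modeled here as `None`).
fn fill_nan_value(x: Option<f64>, fill: f64) -> Option<f64> {
    x.map(|v| if v.is_nan() { fill } else { v })
}

/// Hypothetical per-value model of a null fill: replace nulls only. NaN is an
/// ordinary floating-point value, so it is kept.
fn fill_null_value(x: Option<f64>, fill: f64) -> Option<f64> {
    Some(x.unwrap_or(fill))
}

fn main() {
    let column = [Some(1.0), Some(f64::NAN), None, Some(3.0)];

    let nan_filled: Vec<Option<f64>> =
        column.iter().map(|&v| fill_nan_value(v, 0.0)).collect();
    let null_filled: Vec<Option<f64>> =
        column.iter().map(|&v| fill_null_value(v, 0.0)).collect();

    // The NaN fill touches only the second entry; the null fill only the third.
    println!("{nan_filled:?}");
    println!("{null_filled:?}");
}
```

Because NaN exists only for floating-point types, the PR additionally gates `fill_nan` with a predicate on the field's data type, so non-float columns pass through unchanged.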
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6539,6 +6539,181 @@ async fn test_fill_null_all_columns() -> Result<()> { | |
| Ok(()) | ||
| } | ||
|
|
||
| async fn create_nan_table() -> Result<DataFrame> { | ||
|
|
||
| // create a DataFrame with a NaN value in a float column "a" and a | ||
| // non-float column "b" that must stay untouched by fill_nan. | ||
| // "+-----+---+", | ||
| // "| a | b |", | ||
| // "+-----+---+", | ||
| // "| 1.0 | 1 |", | ||
| // "| NaN | 2 |", | ||
| // "| 3.0 | 3 |", | ||
| // "+-----+---+", | ||
| let schema = Arc::new(Schema::new(vec![ | ||
| Field::new("a", DataType::Float64, true), | ||
| Field::new("b", DataType::Int32, true), | ||
| ])); | ||
| let a_values = Float64Array::from(vec![Some(1.0), Some(f64::NAN), Some(3.0)]); | ||
| let b_values = Int32Array::from(vec![Some(1), Some(2), Some(3)]); | ||
| let batch = RecordBatch::try_new( | ||
| schema.clone(), | ||
| vec![Arc::new(a_values), Arc::new(b_values)], | ||
| )?; | ||
|
|
||
| let ctx = SessionContext::new(); | ||
| let table = MemTable::try_new(schema.clone(), vec![vec![batch]])?; | ||
| ctx.register_table("t_nan", Arc::new(table))?; | ||
| let df = ctx.table("t_nan").await?; | ||
| Ok(df) | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_fill_nan() -> Result<()> { | ||
| let df = create_nan_table().await?; | ||
|
|
||
| // Fill NaNs in the float column "a" with 0.0. | ||
| let df_filled = | ||
| df.fill_nan(ScalarValue::Float64(Some(0.0)), vec!["a".to_string()])?; | ||
|
Contributor
Nice test coverage around the happy path and the cast-failure case. One thing that seems worth locking in is the documented behavior that replacements are applied when the provided value can be cast to the target float type. Right now the positive tests use a value that is already Float64, while the negative test covers an uncastable value. Would it make sense to add a positive cast case as well, for example
Author
Done in b5eaa14.
||
|
|
||
| let results = df_filled.collect().await?; | ||
| assert_snapshot!( | ||
| batches_to_sort_string(&results), | ||
| @r" | ||
| +-----+---+ | ||
| | a | b | | ||
| +-----+---+ | ||
| | 0.0 | 2 | | ||
| | 1.0 | 1 | | ||
| | 3.0 | 3 | | ||
| +-----+---+ | ||
| " | ||
| ); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_fill_nan_all_columns() -> Result<()> { | ||
| let df = create_nan_table().await?; | ||
|
|
||
| // Fill NaNs across all columns. Only the float column "a" is affected; | ||
| // the non-float column "b" is left unchanged since NaN only exists for | ||
| // floating-point types. | ||
| let df_filled = df.fill_nan(ScalarValue::Float64(Some(0.0)), vec![])?; | ||
|
|
||
| let results = df_filled.collect().await?; | ||
| assert_snapshot!( | ||
| batches_to_sort_string(&results), | ||
| @r" | ||
| +-----+---+ | ||
| | a | b | | ||
| +-----+---+ | ||
| | 0.0 | 2 | | ||
| | 1.0 | 1 | | ||
| | 3.0 | 3 | | ||
| +-----+---+ | ||
| " | ||
| ); | ||
| Ok(()) | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_fill_nan_non_float_column() -> Result<()> { | ||
| let df = create_nan_table().await?; | ||
|
|
||
| // Explicitly naming a non-float column is a no-op, not an error: NaN does | ||
| // not exist for Int32, so column "b" (and the un-targeted "a") are unchanged. | ||
| let df_filled = | ||
| df.fill_nan(ScalarValue::Float64(Some(0.0)), vec!["b".to_string()])?; | ||
|
|
||
| let results = df_filled.collect().await?; | ||
| assert_snapshot!( | ||
| batches_to_sort_string(&results), | ||
| @r" | ||
| +-----+---+ | ||
| | a | b | | ||
| +-----+---+ | ||
| | 1.0 | 1 | | ||
| | 3.0 | 3 | | ||
| | NaN | 2 | | ||
| +-----+---+ | ||
| " | ||
| ); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_fill_nan_unknown_column() -> Result<()> { | ||
| let df = create_nan_table().await?; | ||
|
|
||
| // A column name that is not in the schema is propagated as an error. | ||
| let err = df | ||
| .fill_nan( | ||
| ScalarValue::Float64(Some(0.0)), | ||
| vec!["does_not_exist".to_string()], | ||
| ) | ||
| .unwrap_err(); | ||
|
|
||
| assert_snapshot!(err.strip_backtrace(), @"Error during planning: Column 'does_not_exist' not found"); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_fill_nan_casts_fill_value() -> Result<()> { | ||
| let df = create_nan_table().await?; | ||
|
|
||
| // Int32(0) is not the column's type (Float64) but can be cast to it, so the | ||
| // NaN is replaced with 0.0. Exercises the cross-type cast path — the other | ||
| // positive tests pass a Float64 value, which skips the actual cast. | ||
| let df_filled = df.fill_nan(ScalarValue::Int32(Some(0)), vec!["a".to_string()])?; | ||
|
|
||
| let results = df_filled.collect().await?; | ||
| assert_snapshot!( | ||
| batches_to_sort_string(&results), | ||
| @r" | ||
| +-----+---+ | ||
| | a | b | | ||
| +-----+---+ | ||
| | 0.0 | 2 | | ||
| | 1.0 | 1 | | ||
| | 3.0 | 3 | | ||
| +-----+---+ | ||
| " | ||
| ); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_fill_nan_uncastable_value() -> Result<()> { | ||
| let df = create_nan_table().await?; | ||
|
|
||
| // The float column "a" is targeted, but "abc" cannot be cast to Float64, so | ||
| // the fill is skipped and column "a" keeps its original NaN value. | ||
| let df_filled = df.fill_nan( | ||
| ScalarValue::Utf8(Some("abc".to_string())), | ||
| vec!["a".to_string()], | ||
| )?; | ||
|
|
||
| let results = df_filled.collect().await?; | ||
| assert_snapshot!( | ||
| batches_to_sort_string(&results), | ||
| @r" | ||
| +-----+---+ | ||
| | a | b | | ||
| +-----+---+ | ||
| | 1.0 | 1 | | ||
| | 3.0 | 3 | | ||
| | NaN | 2 | | ||
| +-----+---+ | ||
| " | ||
| ); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| #[tokio::test] | ||
|
|
||
| async fn test_insert_into_casting_support() -> Result<()> { | ||
| // Testing case1: | ||
|
|
||
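The tests above pin down three outcomes for a targeted float column: a same-type value fills the NaN, a castable value of another type also fills it, and an uncastable value leaves the column as it was. A minimal std-only sketch of that "cast or skip" rule follows. `FillValue`, `cast_to_f64`, and `fill_nan_column` are hypothetical names loosely mirroring a few `ScalarValue` variants, and string parsing stands in for the real cast; none of this is DataFusion code.

```rust
/// Hypothetical fill values, loosely mirroring a few `ScalarValue` variants.
enum FillValue {
    Int32(i32),
    Float64(f64),
    Utf8(String),
}

/// Hypothetical cast to the Float64 column type; `None` means "cannot cast".
fn cast_to_f64(value: &FillValue) -> Option<f64> {
    match value {
        FillValue::Int32(v) => Some(f64::from(*v)),
        FillValue::Float64(v) => Some(*v),
        FillValue::Utf8(s) => s.parse::<f64>().ok(),
    }
}

/// Replace NaNs when the fill value casts; otherwise return the column unchanged.
fn fill_nan_column(column: &[f64], value: &FillValue) -> Vec<f64> {
    match cast_to_f64(value) {
        Some(fill) => column
            .iter()
            .map(|&x| if x.is_nan() { fill } else { x })
            .collect(),
        None => column.to_vec(),
    }
}

fn main() {
    let a = [1.0, f64::NAN, 3.0];
    // Same-type value: the NaN is replaced.
    println!("{:?}", fill_nan_column(&a, &FillValue::Float64(0.0)));
    // Castable value of another type: the NaN is replaced after the cast.
    println!("{:?}", fill_nan_column(&a, &FillValue::Int32(0)));
    // Uncastable value: the fill is skipped and the NaN stays.
    println!("{:?}", fill_nan_column(&a, &FillValue::Utf8("abc".to_string())));
}
```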
Could we switch `columns` to something more like `&[&str]` or even `&[impl Into<Column>]`, like the other functions?

Thanks for the review. I modeled `fill_nan` on the existing `fill_null`, which also takes `Vec<String>`, so changing just `fill_nan` would leave the two siblings inconsistent.
I agree with aligning them better, but changing `fill_null`'s signature is a breaking change (e.g. for `df.fill_null(val, vec!["a".to_string()])`). So maybe we shouldn't bundle it into this PR? We could keep `fill_nan` matching `fill_null` here and migrate both in a follow-up.
Also, `unnest_columns` uses `&[&str]` while `drop_columns` uses `&[impl Into<Column>]`. Do we have a preference for which to standardize on?

I would prefer trying to get the API right from the start since, as you point out, changing it later can be a breaking change (technically we can avoid that if we do the follow-up within the same release window).
Requiring a `Vec<String>` is a bit unwieldy, which I think is what the clippy lint is trying to tell us.
I think it would be good to try to get `&[impl Into<Column>]` to work if possible, since that allows `&[&str]` to work as well.

Sounds good. I'll update `fill_nan` to `&[impl Into<Column>]`.
`fill_null` is already public, so aligning its signature is a genuine breaking change. I'll open a separate PR for it with the api-change label and an upgrade-guide note. Happy to aim for the same release.

Update: I used `&[&str]` instead of `&[impl Into<Column>]`.
The blocker was the "all columns" case: with the generic signature, an empty slice can't infer its element type.
Making callers annotate `&[]` for the common case didn't seem worth it just to accept `Into<Column>`. So `&[&str]` it is; both cases stay clean.
wdyt?

This works, thanks.
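
The inference problem described in the thread above can be reproduced without DataFusion. The following is a minimal sketch under stated assumptions: `Column` is a hypothetical stand-in for DataFusion's type, and `fill_generic` / `fill_str` are illustrative names, not the PR's functions. It shows that an empty slice needs a type annotation with the generic signature but not with `&[&str]`.

```rust
/// Hypothetical stand-in for DataFusion's `Column`; only what the sketch needs.
#[derive(Debug, Clone, PartialEq)]
struct Column {
    name: String,
}

impl From<&str> for Column {
    fn from(name: &str) -> Self {
        Column { name: name.to_string() }
    }
}

/// Generic variant: accepts a slice of anything convertible into a `Column`.
fn fill_generic(columns: &[impl Into<Column> + Clone]) -> Vec<Column> {
    columns.iter().cloned().map(|c| c.into()).collect()
}

/// Concrete variant: accepts string slices only.
fn fill_str(columns: &[&str]) -> Vec<Column> {
    columns.iter().map(|&name| Column::from(name)).collect()
}

fn main() {
    // Named columns work with both signatures.
    assert_eq!(fill_generic(&["a", "c"]), fill_str(&["a", "c"]));

    // "All columns" (empty slice): the concrete signature needs no annotation.
    assert!(fill_str(&[]).is_empty());

    // `fill_generic(&[])` is rejected by the compiler ("type annotations
    // needed"), so the caller has to name the element type first.
    let all: &[&str] = &[];
    assert!(fill_generic(all).is_empty());

    for column in fill_str(&["a", "c"]) {
        println!("{}", column.name);
    }
}
```

The trade-off is the one weighed in the discussion: the generic form accepts more argument types, while the concrete form keeps the common empty-slice call free of annotations.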