Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 67 additions & 31 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,11 @@ use datafusion_common::{
};
use datafusion_expr::select_expr::SelectExpr;
use datafusion_expr::{
ExplainOption, SortExpr, TableProviderFilterPushDown, UNNAMED_TABLE, case,
dml::InsertOp,
expr::{Alias, ScalarFunction},
is_null, lit,
utils::COUNT_STAR_EXPANSION,
ExplainOption, ScalarUDF, SortExpr, TableProviderFilterPushDown, UNNAMED_TABLE, case,
dml::InsertOp, is_null, lit, utils::COUNT_STAR_EXPANSION,
};
use datafusion_functions::core::coalesce;
use datafusion_functions::math::nanvl;
use datafusion_functions_aggregate::expr_fn::{
avg, count, max, median, min, stddev, sum,
};
Expand Down Expand Up @@ -2471,6 +2469,65 @@ impl DataFrame {
&self,
value: ScalarValue,
columns: Vec<String>,
) -> Result<DataFrame> {
self.fill_columns(value, &columns, coalesce(), |_| true)
}

// Helper to find columns from names
fn find_columns(&self, names: &[impl AsRef<str>]) -> Result<Vec<FieldRef>> {
let schema = self.logical_plan().schema();
names
.iter()
.map(|name| {
let name = name.as_ref();
schema
.field_with_name(None, name)
.cloned()
.map_err(|_| plan_datafusion_err!("Column '{}' not found", name))
})
.collect()
}

/// Fill NaN values in specified floating-point columns with a given value
/// If no columns are specified (empty slice), applies to all columns
/// Only floating-point columns are affected; other columns are left unchanged
/// Only fills if the value can be cast to the column's type
///
/// # Arguments
/// * `value` - Value to fill NaNs with
/// * `columns` - List of column names to fill. If empty, fills all columns.
///
/// # Example
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # use datafusion_common::ScalarValue;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// let df = ctx
/// .read_csv("tests/data/example.csv", CsvReadOptions::new())
/// .await?;
/// // Fill NaN in only columns "a" and "c":
/// let df = df.fill_nan(ScalarValue::from(0.0), &["a", "c"])?;
/// // Fill NaN across all columns:
/// let df = df.fill_nan(ScalarValue::from(0.0), &[])?;
/// # Ok(())
/// # }
/// ```
pub fn fill_nan(&self, value: ScalarValue, columns: &[&str]) -> Result<DataFrame> {
self.fill_columns(value, columns, nanvl(), |field| {
field.data_type().is_floating()
})
}

#[expect(clippy::needless_pass_by_value)]
fn fill_columns(
&self,
value: ScalarValue,
columns: &[impl AsRef<str>],
func: Arc<ScalarUDF>,
applies: impl Fn(&FieldRef) -> bool,
) -> Result<DataFrame> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks review. I modeled fill_nan on the existing fill_null, which also takes Vec, so changing just fill_nan would leave the two siblings inconsistent.

I agree with aligning better, but changing fill_null's signature is a breaking(e.g. df.fill_null(val, vec!["a".to_string()])). So maybe we not bundle it into this PR? Could keep fill_nan matching fill_null here and migrate both in a follow-up?

Also unnest_columns uses &[&str] while drop_columns uses &[impl Into<Column>]. Do we have any preference which to standardize on?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer trying to get the API right from the start, since as you point out changing them later can be a breaking change (technically we can avoid if we do the followup within the same release window)

Requiring a Vec<String> is a bit unwieldy, which I think is what the clippy lint is trying to tell us?

I think it would be good to try get &[impl Into<Column>] to work if possible since that allows &[&str] to work as well.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. I'll update fill_nan to &[impl Into<Column>]

For fill_null, it's been public. So aligning its signature is a genuine breaking. I'll open a separate PR for it with the api-change label and an upgrade-guide note. Happy to aim for the same release.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: I use &[&str] instead of &[impl Into<Column>].

The blocker was the "all columns" case with the generic signature an empty slice can't infer its type:

df.fill_nan(val, &[])?;            // doesn't compile: can't infer T
df.fill_nan(val, &[] as &[Column])?;  // works but ugly

Making callers annotate &[] for the common case didn't seem worth it just to accept Into. &[&str] it is both cases stay clean:

df.fill_nan(val, &["a", "c"])?;
df.fill_nan(val, &[])?;

wdyt?

let cols = if columns.is_empty() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed fill_nan and fill_null share quite a bit of structure around column selection, cast handling, alias construction, and the fallback behavior when casting is not possible. It might be worth extracting a small private helper that takes the scalar function and applicability check. That could keep the no-op-on-cast-failure behavior defined in one place and make it a little easier to keep the two APIs in sync over time.

Copy link
Copy Markdown
Author

@Nagato-Yuzuru Nagato-Yuzuru Jun 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks review. Done in 6fe477a.

self.logical_plan()
Expand All @@ -2480,28 +2537,21 @@ impl DataFrame {
.map(Arc::clone)
.collect()
} else {
self.find_columns(&columns)?
self.find_columns(columns)?
};

// Create projections for each column
let projections = self
.logical_plan()
.schema()
.fields()
.iter()
.map(|field| {
if cols.contains(field) {
if cols.contains(field) && applies(field) {
// Try to cast fill value to column type. If the cast fails, fallback to the original column.
match value.clone().cast_to(field.data_type()) {
Ok(fill_value) => Expr::Alias(Alias {
expr: Box::new(Expr::ScalarFunction(ScalarFunction {
func: coalesce(),
args: vec![col(field.name()), lit(fill_value)],
})),
relation: None,
name: field.name().to_string(),
metadata: None,
}),
Ok(fill_value) => func
.call(vec![col(field.name()), lit(fill_value)])
.alias(field.name()),
Err(_) => col(field.name()),
}
} else {
Expand All @@ -2513,20 +2563,6 @@ impl DataFrame {
self.clone().select(projections)
}

// Helper to find columns from names
fn find_columns(&self, names: &[String]) -> Result<Vec<FieldRef>> {
let schema = self.logical_plan().schema();
names
.iter()
.map(|name| {
schema
.field_with_name(None, name)
.cloned()
.map_err(|_| plan_datafusion_err!("Column '{}' not found", name))
})
.collect()
}

/// Find qualified columns for this dataframe from names
///
/// # Arguments
Expand Down
167 changes: 167 additions & 0 deletions datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6539,6 +6539,173 @@ async fn test_fill_null_all_columns() -> Result<()> {
Ok(())
}

async fn create_nan_table() -> Result<DataFrame> {
Comment thread
Nagato-Yuzuru marked this conversation as resolved.
// create a DataFrame with a NaN value in a float column "a" and a
// non-float column "b" that must stay untouched by fill_nan.
// "+-----+---+",
// "| a | b |",
// "+-----+---+",
// "| 1.0 | 1 |",
// "| NaN | 2 |",
// "| 3.0 | 3 |",
// "+-----+---+",
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Float64, true),
Field::new("b", DataType::Int32, true),
]));
let a_values = Float64Array::from(vec![Some(1.0), Some(f64::NAN), Some(3.0)]);
let b_values = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(a_values), Arc::new(b_values)],
)?;

let ctx = SessionContext::new();
let table = MemTable::try_new(schema.clone(), vec![vec![batch]])?;
ctx.register_table("t_nan", Arc::new(table))?;
let df = ctx.table("t_nan").await?;
Ok(df)
}

#[tokio::test]
async fn test_fill_nan() -> Result<()> {
let df = create_nan_table().await?;

// Fill NaNs in the float column "a" with 0.0.
let df_filled = df.fill_nan(ScalarValue::Float64(Some(0.0)), &["a"])?;

let results = df_filled.collect().await?;
assert_snapshot!(
batches_to_sort_string(&results),
@r"
+-----+---+
| a | b |
+-----+---+
| 0.0 | 2 |
| 1.0 | 1 |
| 3.0 | 3 |
+-----+---+
"
);

Ok(())
}

#[tokio::test]
async fn test_fill_nan_all_columns() -> Result<()> {
let df = create_nan_table().await?;

// Fill NaNs across all columns. Only the float column "a" is affected;
// the non-float column "b" is left unchanged since NaN only exists for
// floating-point types.
let df_filled = df.fill_nan(ScalarValue::Float64(Some(0.0)), &[])?;

let results = df_filled.collect().await?;
assert_snapshot!(
batches_to_sort_string(&results),
@r"
+-----+---+
| a | b |
+-----+---+
| 0.0 | 2 |
| 1.0 | 1 |
| 3.0 | 3 |
+-----+---+
"
);
Ok(())
}

#[tokio::test]
async fn test_fill_nan_non_float_column() -> Result<()> {
let df = create_nan_table().await?;

// Explicitly naming a non-float column is a no-op, not an error: NaN does
// not exist for Int32, so column "b" (and the un-targeted "a") are unchanged.
let df_filled = df.fill_nan(ScalarValue::Float64(Some(0.0)), &["b"])?;

let results = df_filled.collect().await?;
assert_snapshot!(
batches_to_sort_string(&results),
@r"
+-----+---+
| a | b |
+-----+---+
| 1.0 | 1 |
| 3.0 | 3 |
| NaN | 2 |
+-----+---+
"
);

Ok(())
}

#[tokio::test]
async fn test_fill_nan_unknown_column() -> Result<()> {
let df = create_nan_table().await?;

// A column name that is not in the schema is propagated as an error.
let err = df
.fill_nan(ScalarValue::Float64(Some(0.0)), &["does_not_exist"])
.unwrap_err();

assert_snapshot!(err.strip_backtrace(), @"Error during planning: Column 'does_not_exist' not found");

Ok(())
}

#[tokio::test]
async fn test_fill_nan_casts_fill_value() -> Result<()> {
let df = create_nan_table().await?;

// Int32(0) is not the column's type (Float64) but can be cast to it, so the
// NaN is replaced with 0.0. Exercises the cross-type cast path — the other
// positive tests pass a Float64 value, which skips the actual cast.
let df_filled = df.fill_nan(ScalarValue::Int32(Some(0)), &["a"])?;

let results = df_filled.collect().await?;
assert_snapshot!(
batches_to_sort_string(&results),
@r"
+-----+---+
| a | b |
+-----+---+
| 0.0 | 2 |
| 1.0 | 1 |
| 3.0 | 3 |
+-----+---+
"
);

Ok(())
}

#[tokio::test]
async fn test_fill_nan_uncastable_value() -> Result<()> {
let df = create_nan_table().await?;

// The float column "a" is targeted, but "abc" cannot be cast to Float64, so
// the fill is skipped and column "a" keeps its original NaN value.
let df_filled = df.fill_nan(ScalarValue::Utf8(Some("abc".to_string())), &["a"])?;

let results = df_filled.collect().await?;
assert_snapshot!(
batches_to_sort_string(&results),
@r"
+-----+---+
| a | b |
+-----+---+
| 1.0 | 1 |
| 3.0 | 3 |
| NaN | 2 |
+-----+---+
"
);

Ok(())
}

#[tokio::test]
Comment thread
Nagato-Yuzuru marked this conversation as resolved.
async fn test_insert_into_casting_support() -> Result<()> {
// Testing case1:
Expand Down
2 changes: 1 addition & 1 deletion parquet-testing
Submodule parquet-testing updated 302 files