Skip to content
Open
4 changes: 2 additions & 2 deletions docs/source/user-guide/latest/expressions.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,10 @@ The tables below list every Spark built-in expression with its current status.
| Function | Status | Notes |
| --- | --- | --- |
| `array_size` | ✅ | |
| `cardinality` | ✅ | MapType input falls back |
| `cardinality` | ✅ | |
| `concat` | ✅ | Binary/array children fall back |
| `reverse` | ✅ | Binary-element arrays fall back (Incompatible) ([details](compatibility/expressions/array.md)) |
| `size` | ✅ | MapType input falls back |
| `size` | ✅ | |

---

Expand Down
160 changes: 110 additions & 50 deletions native/spark-expr/src/array_funcs/size.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,14 @@ fn spark_size_scalar(scalar: &ScalarValue) -> Result<ScalarValue, DataFusionErro
Ok(ScalarValue::Int32(Some(len)))
}
}
ScalarValue::Map(array) => {
if array.is_null(0) {
Ok(ScalarValue::Int32(Some(-1)))
} else {
let len = array.value_length(0);
Ok(ScalarValue::Int32(Some(len)))
Comment thread
marvelshan marked this conversation as resolved.
}
}
ScalarValue::Null => {
Ok(ScalarValue::Int32(Some(-1))) // Spark behavior: return -1 for null
}
Expand Down Expand Up @@ -276,78 +284,130 @@ mod tests {
assert_eq!(result, ScalarValue::Int32(Some(-1)));
}

/// Checks `spark_size_array` on a `MapArray` column with four rows:
/// a two-entry map, a one-entry map, an empty map and a null map.
/// Expected sizes are `[2, 1, 0, -1]` (a null map reports -1).
#[test]
fn test_spark_size_map_array() {
    use arrow::array::{Int32Array, MapArray, StringArray};

    // Flattened key/value children shared by all map rows.
    let keys = StringArray::from(vec![Some("key1"), Some("key2"), Some("key3")]);
    let values = Int32Array::from(vec![Some(1), Some(2), Some(3)]);

    // Offsets [0, 2, 3, 3, 3] slice the children into four rows:
    // - row 0: entries 0..2 (two key-value pairs)
    // - row 1: entries 2..3 (one key-value pair)
    // - row 2: entries 3..3 (empty map)
    // - row 3: entries 3..3 (marked null by the null buffer below)
    let entry_offsets = arrow::buffer::OffsetBuffer::new(vec![0i32, 2, 3, 3, 3].into());

    let key_field = Arc::new(Field::new("key", DataType::Utf8, false));
    let value_field = Arc::new(Field::new("value", DataType::Int32, true));

    let entries = arrow::array::StructArray::new(
        arrow::datatypes::Fields::from(vec![key_field, value_field]),
        vec![Arc::new(keys), Arc::new(values)],
        None, // the entries struct array itself carries no nulls
    );

    // Validity of the map rows: only the fourth row is null.
    let mut null_buffer = NullBufferBuilder::new(4);
    null_buffer.append(true);
    null_buffer.append(true);
    null_buffer.append(true);
    null_buffer.append(false);

    // The "entries" field must describe the same struct layout as `entries`.
    let map_field = Arc::new(Field::new(
        "entries",
        DataType::Struct(arrow::datatypes::Fields::from(vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Int32, true),
        ])),
        false,
    ));

    let map_array = MapArray::try_new(
        map_field,
        entry_offsets,
        entries,
        null_buffer.finish(),
        false, // keys are not sorted
    )
    .unwrap();

    let array_ref: ArrayRef = Arc::new(map_array);
    let result = spark_size_array(&array_ref).unwrap();
    let result = result.as_any().downcast_ref::<Int32Array>().unwrap();

    assert_eq!(result.value(0), 2); // two key-value pairs
    assert_eq!(result.value(1), 1); // one key-value pair
    assert_eq!(result.value(2), 0); // empty map
    assert_eq!(result.value(3), -1); // null map
}

/// A non-null `ScalarValue::Map` holding `{"a": 1, "b": 2}` must report size 2.
#[test]
fn test_spark_size_scalar_map() {
    use arrow::array::{Int32Array, MapArray, StringArray};

    // Struct layout of a single map entry; reused for both the entries
    // array and the map's "entries" field so the two always agree.
    let entry_fields = arrow::datatypes::Fields::from(vec![
        Arc::new(Field::new("key", DataType::Utf8, false)),
        Arc::new(Field::new("value", DataType::Int32, true)),
    ]);
    let entries_field = Arc::new(Field::new(
        "entries",
        DataType::Struct(entry_fields.clone()),
        false,
    ));

    let map_keys = StringArray::from(vec![Some("a"), Some("b")]);
    let map_values = Int32Array::from(vec![Some(1), Some(2)]);
    let struct_entries = arrow::array::StructArray::new(
        entry_fields,
        vec![Arc::new(map_keys), Arc::new(map_values)],
        None,
    );

    // A single row spanning both entries.
    let offsets = arrow::buffer::OffsetBuffer::new(vec![0i32, 2].into());

    let single_map =
        MapArray::try_new(entries_field, offsets, struct_entries, None, false).unwrap();
    let size = spark_size_scalar(&ScalarValue::Map(Arc::new(single_map))).unwrap();
    assert_eq!(size, ScalarValue::Int32(Some(2)));
}

/// A `ScalarValue::Map` whose only row is null must report size -1,
/// even though the underlying entries array physically holds one pair.
#[test]
fn test_spark_size_scalar_null_map() {
    use arrow::array::{Int32Array, MapArray, StringArray};

    // Struct layout of a single map entry; reused for both the entries
    // array and the map's "entries" field so the two always agree.
    let entry_fields = arrow::datatypes::Fields::from(vec![
        Arc::new(Field::new("key", DataType::Utf8, false)),
        Arc::new(Field::new("value", DataType::Int32, true)),
    ]);
    let entries_field = Arc::new(Field::new(
        "entries",
        DataType::Struct(entry_fields.clone()),
        false,
    ));

    let map_keys = StringArray::from(vec![Some("a")]);
    let map_values = Int32Array::from(vec![Some(1)]);
    let struct_entries = arrow::array::StructArray::new(
        entry_fields,
        vec![Arc::new(map_keys), Arc::new(map_values)],
        None,
    );
    let offsets = arrow::buffer::OffsetBuffer::new(vec![0i32, 1].into());

    // Mark the single map row as null.
    let mut validity = NullBufferBuilder::new(1);
    validity.append(false);

    let null_map = MapArray::try_new(
        entries_field,
        offsets,
        struct_entries,
        validity.finish(),
        false,
    )
    .unwrap();

    let size = spark_size_scalar(&ScalarValue::Map(Arc::new(null_map))).unwrap();
    assert_eq!(size, ScalarValue::Int32(Some(-1)));
}

#[test]
Expand Down
6 changes: 1 addition & 5 deletions spark/src/main/scala/org/apache/comet/serde/arrays.scala
Original file line number Diff line number Diff line change
Expand Up @@ -667,15 +667,11 @@ object CometArrayFilter extends CometExpressionSerde[ArrayFilter] {

object CometSize extends CometExpressionSerde[Size] {

/**
 * Reports whether `size` can be executed natively for the child's data type.
 *
 * Both array and map children are handled natively. Any other type is reported
 * as unsupported so the expression falls back to Spark.
 */
override def getSupportLevel(expr: Size): SupportLevel = {
  expr.child.dataType match {
    case _: ArrayType | _: MapType => Compatible()
    case other =>
      // this should be unreachable because Spark only supports map and array inputs
      Unsupported(Some(s"Unsupported child data type: $other"))
  }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,6 @@ INSERT INTO test_posexplode_map VALUES
(1, map('a', 1, 'b', 2)),
(2, map('c', 3))

-- posexplode over a map falls back to Spark (Comet only supports array inputs)
query expect_fallback(size does not support map inputs)
-- posexplode over a map falls back to Spark (Comet only supports array inputs, not maps)
query expect_fallback(Comet only supports explode/explode_outer for arrays, not maps)
SELECT id, posexplode(m) FROM test_posexplode_map
16 changes: 14 additions & 2 deletions spark/src/test/resources/sql-tests/expressions/array/size.sql
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,21 @@ CREATE TABLE test_size(arr array<int>, m map<string, int>) USING parquet
statement
INSERT INTO test_size VALUES (array(1, 2, 3), map('a', 1, 'b', 2)), (array(), map()), (NULL, NULL)

query spark_answer_only
query
SELECT size(arr), size(m) FROM test_size

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also add literal map cases below the existing literal-args query? Something like:

query
SELECT size(map('a', 1, 'b', 2)), size(map()), size(cast(NULL as map<string,int>))

That way the literal path is covered for both shapes. While you're here, a cardinality(m) query (Spark registers it as an alias for size) and a query with spark.sql.legacy.sizeOfNull=false would lock down the alias and the non-legacy null branch.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think literal maps are not supported yet.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — all three items added:

  1. Literal map cases: size(map(...)) queries added (0b3c451). Those going through CreateMap use spark_answer_only since Comet has no CreateMap serde yet; cast(NULL as map) uses default mode as CometLiteral supports null MapType.
  2. Cardinality alias: SELECT cardinality(arr), cardinality(m) FROM test_size added (0b3c451) — Spark registers it as Size in FunctionRegistry, so it goes through CometSize automatically.
  3. sizeOfNull=false: New size_legacy_off.sql with -- Config: spark.sql.legacy.sizeOfNull=false covering column-based, literal null, and cardinality queries under non-legacy semantics (0b3c451).

ScalarValue::Map unit tests also added per review (ff01bbc), and stale docs notes cleared (b1d177a).

Should I file a follow-up issue for CreateMap/MapType literal support so the literal map queries can be promoted to native execution later?


-- literal arguments
-- literal array arguments
query
SELECT size(array(1, 2, 3)), size(array()), size(cast(NULL as array<int>))

-- literal map via CreateMap (falls back: Comet has no CreateMap serde;
-- cast(NULL as map) avoids CreateMap and goes through CometLiteral instead)
query spark_answer_only
SELECT size(map('a', 1, 'b', 2)), size(map())

query
SELECT size(cast(NULL as map<string,int>))

-- cardinality is a SQL alias for size
query
SELECT cardinality(arr), cardinality(m) FROM test_size
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- Config: spark.sql.legacy.sizeOfNull=false

statement
CREATE TABLE test_size_legacy_off(arr array<int>, m map<string, int>) USING parquet

statement
INSERT INTO test_size_legacy_off VALUES (array(1, 2, 3), map('a', 1, 'b', 2)), (array(), map()), (NULL, NULL)

-- With sizeOfNull=false, size(NULL) returns NULL instead of -1
query
SELECT size(arr), size(m) FROM test_size_legacy_off

query
SELECT size(cast(NULL as array<int>)), size(cast(NULL as map<string,int>))

query
SELECT cardinality(arr), cardinality(m) FROM test_size_legacy_off
53 changes: 27 additions & 26 deletions spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -126,34 +126,35 @@ class CometMapExpressionSuite extends CometTestBase {
}
}

// Verifies that size() over map columns read from Parquet matches Spark,
// for both the V2 and V1 Parquet source paths.
test("size with map input") {
  withTempPath { dir =>
    // Write the fixture with Comet disabled so the file is produced by plain Spark.
    withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
      val df = spark
        .range(100)
        .select(
          // map1: NULL for id <= 1, otherwise a one-entry map (value is NULL for id <= 2)
          when(col("id") > 1, map(col("id"), when(col("id") > 2, col("id"))))
            .alias("map1"),
          // map2: NULL for id <= 5, otherwise a two-entry map
          when(col("id") > 5, map(lit("a"), col("id"), lit("b"), col("id") + 1))
            .alias("map2"))
      df.write.parquet(dir.toString())
    }

    // An empty V1 source list forces the V2 Parquet source; "parquet" selects V1.
    Seq("", "parquet").foreach { v1List =>
      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> v1List) {
        val df = spark.read.parquet(dir.toString())
        df.createOrReplaceTempView("t1")
        if (v1List.isEmpty) {
          // NOTE(review): only the answer is checked on the V2 path, presumably because
          // the plan is not fully native there - confirm.
          checkSparkAnswer(df.select(size(col("map1"))))
          checkSparkAnswer(df.select(size(col("map2"))))
          checkSparkAnswer(
            sql("SELECT size(CASE WHEN id < 50 THEN map1 ELSE map2 END) FROM t1"))
        } else {
          checkSparkAnswerAndOperator(df.select(size(col("map1"))))
          checkSparkAnswerAndOperator(df.select(size(col("map2"))))
          checkSparkAnswerAndOperator(
            sql("SELECT size(CASE WHEN id < 50 THEN map1 ELSE map2 END) FROM t1"))
        }
      }
    }
  }
}
Expand Down