From 21e0d278214e921a7c07152f170b507f419c471c Mon Sep 17 00:00:00 2001 From: peterxcli Date: Thu, 25 Jun 2026 23:03:16 +0800 Subject: [PATCH 1/5] Fix ArrayInsert evaluation for null source arrays --- .../src/array_funcs/array_insert.rs | 52 ++++++++++++------- .../expressions/array/array_insert.sql | 15 ++++++ .../expressions/array/array_prepend.sql | 33 ++++++++++++ 3 files changed, 82 insertions(+), 18 deletions(-) create mode 100644 spark/src/test/resources/sql-tests/expressions/array/array_prepend.sql diff --git a/native/spark-expr/src/array_funcs/array_insert.rs b/native/spark-expr/src/array_funcs/array_insert.rs index e638c440fd..b03085be02 100644 --- a/native/spark-expr/src/array_funcs/array_insert.rs +++ b/native/spark-expr/src/array_funcs/array_insert.rs @@ -15,7 +15,9 @@ // specific language governing permissions and limitations // under the License. -use arrow::array::{make_array, Array, ArrayRef, GenericListArray, Int32Array, OffsetSizeTrait}; +use arrow::array::{ + make_array, Array, ArrayRef, BooleanArray, GenericListArray, Int32Array, OffsetSizeTrait, +}; use arrow::datatypes::{DataType, Schema}; use arrow::{ array::{as_primitive_array, Capacities, MutableArrayData}, @@ -104,13 +106,32 @@ impl PhysicalExpr for ArrayInsert { } fn evaluate(&self, batch: &RecordBatch) -> DataFusionResult { + // Spark evaluates arguments left-to-right: + // 1. src + // 2. pos only when src is non-null + // 3. item only when src and pos are non-null + let src_value = self + .src_array_expr + .evaluate(batch)? + .into_array(batch.num_rows())?; + + let src_element_type = match self.array_type(src_value.data_type())? { + DataType::List(field) | DataType::LargeList(field) => field.data_type().clone(), + _ => unreachable!(), + }; + + // Do not evaluate pos for rows whose source array is NULL. + let evaluate_pos = BooleanArray::from( + (0..batch.num_rows()) + .map(|row| src_value.is_valid(row)) + .collect::>(), + ); + let pos_value = self .pos_expr - .evaluate(batch)? + .evaluate_selection(batch, &evaluate_pos)? .into_array(batch.num_rows())?; - // Spark supports only IntegerType (Int32): - // https://github.com/apache/spark/blob/branch-3.5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L4737 if !matches!(pos_value.data_type(), DataType::Int32) { return Err(DataFusionError::Internal(format!( "Unexpected index data type in ArrayInsert: {:?}, expected type is Int32", @@ -118,24 +139,19 @@ impl PhysicalExpr for ArrayInsert { ))); } - // Check that src array is actually an array and get it's value type - let src_value = self - .src_array_expr - .evaluate(batch)? - .into_array(batch.num_rows())?; + // Do not evaluate item for rows whose source array or position is NULL. + let evaluate_item = BooleanArray::from( + (0..batch.num_rows()) + .map(|row| src_value.is_valid(row) && pos_value.is_valid(row)) + .collect::>(), + ); - let src_element_type = match self.array_type(src_value.data_type())? { - DataType::List(field) => &field.data_type().clone(), - DataType::LargeList(field) => &field.data_type().clone(), - _ => unreachable!(), - }; - - // Check that inserted value has the same type as an array let item_value = self .item_expr - .evaluate(batch)? + .evaluate_selection(batch, &evaluate_item)? .into_array(batch.num_rows())?; - if item_value.data_type() != src_element_type { + + if item_value.data_type() != &src_element_type { return Err(DataFusionError::Internal(format!( "Type mismatch in ArrayInsert: array type is {:?} but item type is {:?}", src_element_type, diff --git a/spark/src/test/resources/sql-tests/expressions/array/array_insert.sql b/spark/src/test/resources/sql-tests/expressions/array/array_insert.sql index 06294bc318..db43f43fe2 100644 --- a/spark/src/test/resources/sql-tests/expressions/array/array_insert.sql +++ b/spark/src/test/resources/sql-tests/expressions/array/array_insert.sql @@ -39,6 +39,21 @@ INSERT INTO test_array_insert VALUES query SELECT array_insert(arr, pos, val) FROM test_array_insert +statement +CREATE TABLE test_array_insert_short_circuit( + arr ARRAY, + idx INT +) USING parquet + +statement +INSERT INTO test_array_insert_short_circuit VALUES + (NULL, 0), + (array(1), 1) + +query +SELECT array_insert(arr, element_at(array(1), idx), 9) +FROM test_array_insert_short_circuit + -- ============================================================ -- Literal arguments (all-literal queries test native eval -- because CometSqlFileTestSuite disables constant folding) diff --git a/spark/src/test/resources/sql-tests/expressions/array/array_prepend.sql b/spark/src/test/resources/sql-tests/expressions/array/array_prepend.sql new file mode 100644 index 0000000000..caee1c79b4 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/array/array_prepend.sql @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +statement +CREATE TABLE test_array_prepend( + arr ARRAY, + idx INT +) USING parquet + +statement +INSERT INTO test_array_prepend VALUES + (NULL, 0), + (array(1), 1) + +query +SELECT array_prepend(arr, element_at(array(9), idx)) +FROM test_array_prepend From bc6feabc1294c9706b37e46708ac82a29db73b62 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Fri, 26 Jun 2026 00:27:02 +0800 Subject: [PATCH 2/5] fix test --- .../resources/sql-tests/expressions/array/array_prepend.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/resources/sql-tests/expressions/array/array_prepend.sql b/spark/src/test/resources/sql-tests/expressions/array/array_prepend.sql index ca0b878371..e0e25bf5d9 100644 --- a/spark/src/test/resources/sql-tests/expressions/array/array_prepend.sql +++ b/spark/src/test/resources/sql-tests/expressions/array/array_prepend.sql @@ -137,5 +137,5 @@ query SELECT array_prepend(arr, val) FROM test_array_prepend_bin query -SELECT array_prepend(arr, element_at(array(9), idx)) +SELECT array_prepend(arr, element_at(array(9), val)) FROM test_array_prepend From 5799588bfdac549bc878f3277c3fc7677a483792 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Fri, 26 Jun 2026 00:35:55 +0800 Subject: [PATCH 3/5] fix test --- .../resources/sql-tests/expressions/array/array_prepend.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/resources/sql-tests/expressions/array/array_prepend.sql b/spark/src/test/resources/sql-tests/expressions/array/array_prepend.sql index e0e25bf5d9..e8b93d580f 100644 --- a/spark/src/test/resources/sql-tests/expressions/array/array_prepend.sql +++ b/spark/src/test/resources/sql-tests/expressions/array/array_prepend.sql @@ -25,7 +25,7 @@ statement CREATE TABLE test_array_prepend(arr array, val int) USING parquet statement -INSERT INTO test_array_prepend VALUES (array(1, 2, 3), 4), (array(), 1), (NULL, 1), (array(1, 2), NULL) +INSERT INTO test_array_prepend VALUES (array(1, 2, 3), 4), (array(), 1), (NULL, 1), (NULL, 0), (array(1, 2), NULL) -- column + column query From f53e9061e0e49d91058d312f38fc4e0e75cf98d4 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Mon, 29 Jun 2026 01:53:18 +0800 Subject: [PATCH 4/5] Add array_insert short-circuit SQL tests --- .../expressions/array/array_insert.sql | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/spark/src/test/resources/sql-tests/expressions/array/array_insert.sql b/spark/src/test/resources/sql-tests/expressions/array/array_insert.sql index db43f43fe2..d80cc16e1e 100644 --- a/spark/src/test/resources/sql-tests/expressions/array/array_insert.sql +++ b/spark/src/test/resources/sql-tests/expressions/array/array_insert.sql @@ -50,10 +50,28 @@ INSERT INTO test_array_insert_short_circuit VALUES (NULL, 0), (array(1), 1) +-- null source array short-circuits position evaluation query SELECT array_insert(arr, element_at(array(1), idx), 9) FROM test_array_insert_short_circuit +statement +CREATE TABLE test_array_insert_null_pos_short_circuit( + arr ARRAY, + idx INT +) USING parquet + +statement +INSERT INTO test_array_insert_null_pos_short_circuit VALUES + (array(1), 0), + (array(1), 1), + (array(1), 2) + +-- null position short-circuits item evaluation +query +SELECT array_insert(arr, CAST(NULL AS INT), element_at(array(1), idx)) +FROM test_array_insert_null_pos_short_circuit + -- ============================================================ -- Literal arguments (all-literal queries test native eval -- because CometSqlFileTestSuite disables constant folding) From 97f621d8d465663a0df8d4a403acd6f3d158d8cd Mon Sep 17 00:00:00 2001 From: peterxcli Date: Mon, 29 Jun 2026 04:48:34 +0800 Subject: [PATCH 5/5] minor --- docs/source/user-guide/latest/installation.md | 1 + native/spark-expr/src/array_funcs/array_insert.rs | 13 ++++++++----- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/docs/source/user-guide/latest/installation.md b/docs/source/user-guide/latest/installation.md index 442133facc..3bd1ec27ee 100644 --- a/docs/source/user-guide/latest/installation.md +++ b/docs/source/user-guide/latest/installation.md @@ -92,6 +92,7 @@ Here are the direct links for downloading the Comet $COMET_VERSION jar file. - [Comet plugin for Spark 3.5 / Scala 2.13](https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.5_2.13/$COMET_VERSION/comet-spark-spark3.5_2.13-$COMET_VERSION.jar) - [Comet plugin for Spark 4.0 / Scala 2.13](https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark4.0_2.13/$COMET_VERSION/comet-spark-spark4.0_2.13-$COMET_VERSION.jar) - [Comet plugin for Spark 4.1 / Scala 2.13](https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark4.1_2.13/$COMET_VERSION/comet-spark-spark4.1_2.13-$COMET_VERSION.jar) + ## Building from source diff --git a/native/spark-expr/src/array_funcs/array_insert.rs b/native/spark-expr/src/array_funcs/array_insert.rs index b03085be02..e056c108e0 100644 --- a/native/spark-expr/src/array_funcs/array_insert.rs +++ b/native/spark-expr/src/array_funcs/array_insert.rs @@ -110,17 +110,19 @@ impl PhysicalExpr for ArrayInsert { // 1. src // 2. pos only when src is non-null // 3. item only when src and pos are non-null + + // Check that src array is actually an array and get it's value type let src_value = self .src_array_expr .evaluate(batch)? .into_array(batch.num_rows())?; let src_element_type = match self.array_type(src_value.data_type())? { - DataType::List(field) | DataType::LargeList(field) => field.data_type().clone(), + DataType::List(field) => &field.data_type().clone(), + DataType::LargeList(field) => &field.data_type().clone(), _ => unreachable!(), }; - // Do not evaluate pos for rows whose source array is NULL. let evaluate_pos = BooleanArray::from( (0..batch.num_rows()) .map(|row| src_value.is_valid(row)) @@ -132,6 +134,8 @@ impl PhysicalExpr for ArrayInsert { .evaluate_selection(batch, &evaluate_pos)? .into_array(batch.num_rows())?; + // Spark supports only IntegerType (Int32): + // https://github.com/apache/spark/blob/branch-3.5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L4737 if !matches!(pos_value.data_type(), DataType::Int32) { return Err(DataFusionError::Internal(format!( "Unexpected index data type in ArrayInsert: {:?}, expected type is Int32", @@ -139,19 +143,18 @@ impl PhysicalExpr for ArrayInsert { ))); } - // Do not evaluate item for rows whose source array or position is NULL. let evaluate_item = BooleanArray::from( (0..batch.num_rows()) .map(|row| src_value.is_valid(row) && pos_value.is_valid(row)) .collect::>(), ); + // Check that inserted value has the same type as an array let item_value = self .item_expr .evaluate_selection(batch, &evaluate_item)? .into_array(batch.num_rows())?; - - if item_value.data_type() != &src_element_type { + if item_value.data_type() != src_element_type { return Err(DataFusionError::Internal(format!( "Type mismatch in ArrayInsert: array type is {:?} but item type is {:?}", src_element_type,