Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/source/user-guide/latest/installation.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ Here are the direct links for downloading the Comet $COMET_VERSION jar file.
- [Comet plugin for Spark 3.5 / Scala 2.13](https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.5_2.13/$COMET_VERSION/comet-spark-spark3.5_2.13-$COMET_VERSION.jar)
- [Comet plugin for Spark 4.0 / Scala 2.13](https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark4.0_2.13/$COMET_VERSION/comet-spark-spark4.0_2.13-$COMET_VERSION.jar)
- [Comet plugin for Spark 4.1 / Scala 2.13](https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark4.1_2.13/$COMET_VERSION/comet-spark-spark4.1_2.13-$COMET_VERSION.jar)

<!-- ENDIF -->

## Building from source
Expand Down
47 changes: 33 additions & 14 deletions native/spark-expr/src/array_funcs/array_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use arrow::array::{make_array, Array, ArrayRef, GenericListArray, Int32Array, OffsetSizeTrait};
use arrow::array::{
make_array, Array, ArrayRef, BooleanArray, GenericListArray, Int32Array, OffsetSizeTrait,
};
use arrow::datatypes::{DataType, Schema};
use arrow::{
array::{as_primitive_array, Capacities, MutableArrayData},
Expand Down Expand Up @@ -104,9 +106,32 @@ impl PhysicalExpr for ArrayInsert {
}

fn evaluate(&self, batch: &RecordBatch) -> DataFusionResult<ColumnarValue> {
// Spark evaluates arguments left-to-right:
// 1. src
// 2. pos only when src is non-null
// 3. item only when src and pos are non-null

// Check that src array is actually an array and get it's value type
let src_value = self
.src_array_expr
.evaluate(batch)?
.into_array(batch.num_rows())?;

let src_element_type = match self.array_type(src_value.data_type())? {
DataType::List(field) => &field.data_type().clone(),
DataType::LargeList(field) => &field.data_type().clone(),
_ => unreachable!(),
};

let evaluate_pos = BooleanArray::from(
(0..batch.num_rows())
.map(|row| src_value.is_valid(row))
.collect::<Vec<_>>(),
);

let pos_value = self
.pos_expr
.evaluate(batch)?
.evaluate_selection(batch, &evaluate_pos)?
.into_array(batch.num_rows())?;

// Spark supports only IntegerType (Int32):
Expand All @@ -118,22 +143,16 @@ impl PhysicalExpr for ArrayInsert {
)));
}

// Check that src array is actually an array and get it's value type
let src_value = self
.src_array_expr
.evaluate(batch)?
.into_array(batch.num_rows())?;

let src_element_type = match self.array_type(src_value.data_type())? {
DataType::List(field) => &field.data_type().clone(),
DataType::LargeList(field) => &field.data_type().clone(),
_ => unreachable!(),
};
let evaluate_item = BooleanArray::from(
(0..batch.num_rows())
.map(|row| src_value.is_valid(row) && pos_value.is_valid(row))
.collect::<Vec<_>>(),
);

// Check that inserted value has the same type as an array
let item_value = self
.item_expr
.evaluate(batch)?
.evaluate_selection(batch, &evaluate_item)?
.into_array(batch.num_rows())?;
if item_value.data_type() != src_element_type {
return Err(DataFusionError::Internal(format!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,39 @@ INSERT INTO test_array_insert VALUES
query
SELECT array_insert(arr, pos, val) FROM test_array_insert

statement
CREATE TABLE test_array_insert_short_circuit(
arr ARRAY<INT>,
idx INT
) USING parquet

statement
INSERT INTO test_array_insert_short_circuit VALUES
(NULL, 0),
(array(1), 1)

-- null source array short-circuits position evaluation
query
SELECT array_insert(arr, element_at(array(1), idx), 9)
FROM test_array_insert_short_circuit

statement
CREATE TABLE test_array_insert_null_pos_short_circuit(
arr ARRAY<INT>,
idx INT
) USING parquet

statement
INSERT INTO test_array_insert_null_pos_short_circuit VALUES
(array(1), 0),
(array(1), 1),
(array(1), 2)

-- null position short-circuits item evaluation
query
SELECT array_insert(arr, CAST(NULL AS INT), element_at(array(1), idx))
FROM test_array_insert_null_pos_short_circuit

-- ============================================================
-- Literal arguments (all-literal queries test native eval
-- because CometSqlFileTestSuite disables constant folding)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ statement
CREATE TABLE test_array_prepend(arr array<int>, val int) USING parquet

statement
INSERT INTO test_array_prepend VALUES (array(1, 2, 3), 4), (array(), 1), (NULL, 1), (array(1, 2), NULL)
INSERT INTO test_array_prepend VALUES (array(1, 2, 3), 4), (array(), 1), (NULL, 1), (NULL, 0), (array(1, 2), NULL)

-- column + column
query
Expand Down Expand Up @@ -135,3 +135,7 @@ INSERT INTO test_array_prepend_bin VALUES (array(X'0102', X'0304'), X'0506'), (a

query
SELECT array_prepend(arr, val) FROM test_array_prepend_bin

query
SELECT array_prepend(arr, element_at(array(9), val))
FROM test_array_prepend
Loading