diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/DelegatedRecord.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/DelegatedRecord.java index 830377e19f6c..eff7652be1ae 100644 --- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/DelegatedRecord.java +++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/DelegatedRecord.java @@ -19,7 +19,6 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.types.Types; import org.apache.nifi.serialization.record.MapRecord; -import org.apache.nifi.serialization.record.RecordField; import java.util.Collections; import java.util.LinkedHashMap; @@ -70,15 +69,17 @@ public void setField(final String fieldName, final Object fieldValue) { } /** - * Get Field value for specified position from supporting Record + * Get Field value for specified position from supporting Record. The position refers to the Iceberg Table struct, + * so the field is resolved by the Iceberg column name to align incoming Record fields with Table columns + * regardless of the incoming Record field ordering. Columns not present in the incoming Record return null. 
* - * @param position Field position + * @param position Field position in the Iceberg Table struct * @return Field value or null when not found */ @Override public Object get(final int position) { - final RecordField recordField = record.getSchema().getField(position); - return record.getValue(recordField); + final Types.NestedField field = struct.fields().get(position); + return record.getValue(field.name()); } /** @@ -133,16 +134,18 @@ public <T> T get(final int position, final Class<T> valueClass) { } /** - * Set Field value for specified position + * Set Field value for specified position. The position refers to the Iceberg Table struct, so the field is resolved + * by the Iceberg column name to remain symmetric with {@link #get(int)} regardless of the incoming Record field + * ordering. * - * @param position Field position + * @param position Field position in the Iceberg Table struct * @param value Field value * @param <T> Field Value Type */ @Override public <T> void set(final int position, final T value) { - final RecordField recordField = record.getSchema().getField(position); - record.setValue(recordField, value); + final Types.NestedField field = struct.fields().get(position); + record.setValue(field.name(), value); } @Override diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/record/DelegatedRecordTest.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/record/DelegatedRecordTest.java index 7e43a5c44489..e676bde33148 100644 --- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/record/DelegatedRecordTest.java +++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/record/DelegatedRecordTest.java @@ -25,6 +25,7 @@ import org.apache.nifi.serialization.record.RecordSchema; import 
org.junit.jupiter.api.Test; +import java.math.BigDecimal; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; @@ -36,6 +37,7 @@ import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; class DelegatedRecordTest { @@ -82,7 +84,9 @@ void testSetGetStringField() { final Record record = new MapRecord(recordSchema, values); - final Types.StructType structType = Types.StructType.of(); + final Types.StructType structType = Types.StructType.of( + Types.NestedField.optional(1, LABEL_FIELD, Types.StringType.get()) + ); final DelegatedRecord delegatedRecord = new DelegatedRecord(record, structType); final Types.StructType recordStruct = delegatedRecord.struct(); @@ -135,4 +139,88 @@ void testGetTimestampDateTimeFields() { final Object stopped = delegatedRecord.getField(STOPPED_FIELD); assertEquals(STOPPED_CONVERTED, stopped); } + + /** + * Iceberg writers read values positionally against the table struct, so position 0 must always return the value of + * the table's first column ("id"), independent of the field ordering in the incoming Record schema. 
+ */ + @Test + void testGetByPositionMatchesTableColumnNameRegardlessOfInputOrder() { + final Types.StructType structType = Types.StructType.of( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "amount", Types.DecimalType.of(10, 2)), + Types.NestedField.optional(3, "label", Types.StringType.get()) + ); + + final RecordSchema recordSchema = new SimpleRecordSchema(List.of( + new RecordField("amount", RecordFieldType.DECIMAL.getDataType()), + new RecordField("label", RecordFieldType.STRING.getDataType()), + new RecordField("id", RecordFieldType.INT.getDataType()) + )); + final Map<String, Object> values = new LinkedHashMap<>(); + values.put("amount", new BigDecimal("12.34")); + values.put("label", "example"); + values.put("id", 7); + final DelegatedRecord delegatedRecord = new DelegatedRecord(new MapRecord(recordSchema, values), structType); + + assertEquals(7, delegatedRecord.get(0)); + assertEquals(new BigDecimal("12.34"), delegatedRecord.get(1)); + assertEquals("example", delegatedRecord.get(2)); + } + + /** + * When the incoming Record does not contain a column present in the table schema, positional access must return + * null for that column rather than shifting subsequent input values into it. 
+ */ + @Test + void testGetByPositionReturnsNullForColumnMissingFromInput() { + final Types.StructType structType = Types.StructType.of( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "amount", Types.DecimalType.of(10, 2)), + Types.NestedField.optional(3, "label", Types.StringType.get()) + ); + + final RecordSchema recordSchema = new SimpleRecordSchema(List.of( + new RecordField("id", RecordFieldType.INT.getDataType()), + new RecordField("label", RecordFieldType.STRING.getDataType()) + )); + final Map<String, Object> values = new LinkedHashMap<>(); + values.put("id", 42); + values.put("label", "present"); + final DelegatedRecord delegatedRecord = new DelegatedRecord(new MapRecord(recordSchema, values), structType); + + assertEquals(42, delegatedRecord.get(0)); + assertNull(delegatedRecord.get(1), "Missing 'amount' column must be null, not shifted input data"); + assertEquals("present", delegatedRecord.get(2)); + } + + /** + * Positional set must resolve the target field by the Iceberg table column name for the given position, independent + * of the incoming Record field ordering, so that set(position) is symmetric with get(position). 
+ */ + @Test + void testSetByPositionMatchesTableColumnNameRegardlessOfInputOrder() { + final Types.StructType structType = Types.StructType.of( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "amount", Types.DecimalType.of(10, 2)), + Types.NestedField.optional(3, "label", Types.StringType.get()) + ); + + final RecordSchema recordSchema = new SimpleRecordSchema(List.of( + new RecordField("amount", RecordFieldType.DECIMAL.getDataType()), + new RecordField("label", RecordFieldType.STRING.getDataType()), + new RecordField("id", RecordFieldType.INT.getDataType()) + )); + final Map<String, Object> values = new LinkedHashMap<>(); + values.put("amount", new BigDecimal("12.34")); + values.put("label", "example"); + values.put("id", 7); + final DelegatedRecord delegatedRecord = new DelegatedRecord(new MapRecord(recordSchema, values), structType); + + delegatedRecord.set(0, 99); + + assertEquals(99, delegatedRecord.getField("id")); + assertEquals(new BigDecimal("12.34"), delegatedRecord.getField("amount")); + assertEquals("example", delegatedRecord.getField("label")); + } }