Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.RecordField;

import java.util.Collections;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -70,15 +69,17 @@ public void setField(final String fieldName, final Object fieldValue) {
}

/**
 * Get Field value for specified position from supporting Record. The position refers to the Iceberg Table struct,
 * so the field is resolved by the Iceberg column name to align incoming Record fields with Table columns
 * regardless of the incoming Record field ordering. Columns not present in the incoming Record return null.
 *
 * @param position Field position in the Iceberg Table struct
 * @return Field value or null when not found
 */
@Override
public Object get(final int position) {
    // Resolve the column through the table struct, not the incoming Record schema, so that a differently
    // ordered or narrower incoming schema cannot shift values into the wrong column
    final Types.NestedField field = struct.fields().get(position);
    return record.getValue(field.name());
}

/**
Expand Down Expand Up @@ -133,16 +134,18 @@ public <T> T get(final int position, final Class<T> valueClass) {
}

/**
 * Set Field value for specified position. The position refers to the Iceberg Table struct, so the field is resolved
 * by the Iceberg column name to remain symmetric with {@link #get(int)} regardless of the incoming Record field
 * ordering.
 *
 * @param position Field position in the Iceberg Table struct
 * @param value Field value
 * @param <T> Field Value Type
 */
@Override
public <T> void set(final int position, final T value) {
    // Write through the table column name only; addressing the incoming Record schema by the same index
    // could target a different field when the orderings differ
    final Types.NestedField field = struct.fields().get(position);
    record.setValue(field.name(), value);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.jupiter.api.Test;

import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
Expand All @@ -36,6 +37,7 @@
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

class DelegatedRecordTest {

Expand Down Expand Up @@ -82,7 +84,9 @@ void testSetGetStringField() {

final Record record = new MapRecord(recordSchema, values);

final Types.StructType structType = Types.StructType.of();
final Types.StructType structType = Types.StructType.of(
Types.NestedField.optional(1, LABEL_FIELD, Types.StringType.get())
);
final DelegatedRecord delegatedRecord = new DelegatedRecord(record, structType);

final Types.StructType recordStruct = delegatedRecord.struct();
Expand Down Expand Up @@ -135,4 +139,88 @@ void testGetTimestampDateTimeFields() {
final Object stopped = delegatedRecord.getField(STOPPED_FIELD);
assertEquals(STOPPED_CONVERTED, stopped);
}

/**
 * Positional reads follow the Iceberg table struct: index 0 must yield the value of the first table column ("id")
 * even when the incoming Record schema declares its fields in a different order.
 */
@Test
void testGetByPositionMatchesTableColumnNameRegardlessOfInputOrder() {
    // Incoming Record declares its fields as amount, label, id
    final RecordSchema inputSchema = new SimpleRecordSchema(List.of(
            new RecordField("amount", RecordFieldType.DECIMAL.getDataType()),
            new RecordField("label", RecordFieldType.STRING.getDataType()),
            new RecordField("id", RecordFieldType.INT.getDataType())
    ));
    final Map<String, Object> inputValues = new LinkedHashMap<>();
    inputValues.put("amount", new BigDecimal("12.34"));
    inputValues.put("label", "example");
    inputValues.put("id", 7);
    final MapRecord inputRecord = new MapRecord(inputSchema, inputValues);

    // Table struct declares its columns as id, amount, label
    final Types.NestedField idColumn = Types.NestedField.required(1, "id", Types.IntegerType.get());
    final Types.NestedField amountColumn = Types.NestedField.optional(2, "amount", Types.DecimalType.of(10, 2));
    final Types.NestedField labelColumn = Types.NestedField.optional(3, "label", Types.StringType.get());
    final Types.StructType tableStruct = Types.StructType.of(idColumn, amountColumn, labelColumn);

    final DelegatedRecord delegated = new DelegatedRecord(inputRecord, tableStruct);

    final Object firstColumn = delegated.get(0);
    assertEquals(7, firstColumn);

    final Object secondColumn = delegated.get(1);
    assertEquals(new BigDecimal("12.34"), secondColumn);

    final Object thirdColumn = delegated.get(2);
    assertEquals("example", thirdColumn);
}

/**
 * A table column that is absent from the incoming Record must read as null by position; values of later input
 * fields must not slide into the missing column's slot.
 */
@Test
void testGetByPositionReturnsNullForColumnMissingFromInput() {
    // Incoming Record carries only id and label; the table's "amount" column has no counterpart
    final RecordSchema inputSchema = new SimpleRecordSchema(List.of(
            new RecordField("id", RecordFieldType.INT.getDataType()),
            new RecordField("label", RecordFieldType.STRING.getDataType())
    ));
    final Map<String, Object> inputValues = new LinkedHashMap<>();
    inputValues.put("id", 42);
    inputValues.put("label", "present");
    final MapRecord inputRecord = new MapRecord(inputSchema, inputValues);

    final Types.NestedField idColumn = Types.NestedField.required(1, "id", Types.IntegerType.get());
    final Types.NestedField amountColumn = Types.NestedField.optional(2, "amount", Types.DecimalType.of(10, 2));
    final Types.NestedField labelColumn = Types.NestedField.optional(3, "label", Types.StringType.get());
    final Types.StructType tableStruct = Types.StructType.of(idColumn, amountColumn, labelColumn);

    final DelegatedRecord delegated = new DelegatedRecord(inputRecord, tableStruct);

    final Object idValue = delegated.get(0);
    assertEquals(42, idValue);

    final Object amountValue = delegated.get(1);
    assertNull(amountValue, "Missing 'amount' column must be null, not shifted input data");

    final Object labelValue = delegated.get(2);
    assertEquals("present", labelValue);
}

/**
 * Positional writes follow the Iceberg table struct: set(0) must update the first table column ("id") and leave
 * the other fields untouched, whatever the field order of the incoming Record, mirroring get(position).
 */
@Test
void testSetByPositionMatchesTableColumnNameRegardlessOfInputOrder() {
    // Incoming Record declares its fields as amount, label, id
    final RecordSchema inputSchema = new SimpleRecordSchema(List.of(
            new RecordField("amount", RecordFieldType.DECIMAL.getDataType()),
            new RecordField("label", RecordFieldType.STRING.getDataType()),
            new RecordField("id", RecordFieldType.INT.getDataType())
    ));
    final Map<String, Object> inputValues = new LinkedHashMap<>();
    inputValues.put("amount", new BigDecimal("12.34"));
    inputValues.put("label", "example");
    inputValues.put("id", 7);
    final MapRecord inputRecord = new MapRecord(inputSchema, inputValues);

    // Table struct declares its columns as id, amount, label
    final Types.NestedField idColumn = Types.NestedField.required(1, "id", Types.IntegerType.get());
    final Types.NestedField amountColumn = Types.NestedField.optional(2, "amount", Types.DecimalType.of(10, 2));
    final Types.NestedField labelColumn = Types.NestedField.optional(3, "label", Types.StringType.get());
    final Types.StructType tableStruct = Types.StructType.of(idColumn, amountColumn, labelColumn);

    final DelegatedRecord delegated = new DelegatedRecord(inputRecord, tableStruct);

    delegated.set(0, 99);

    final Object idValue = delegated.getField("id");
    assertEquals(99, idValue);

    final Object amountValue = delegated.getField("amount");
    assertEquals(new BigDecimal("12.34"), amountValue);

    final Object labelValue = delegated.getField("label");
    assertEquals("example", labelValue);
}
}
Loading