diff --git a/src/promptflow-devkit/promptflow/_sdk/operations/_local_storage_operations.py b/src/promptflow-devkit/promptflow/_sdk/operations/_local_storage_operations.py
index a3437de5348..4a7b31dc678 100644
--- a/src/promptflow-devkit/promptflow/_sdk/operations/_local_storage_operations.py
+++ b/src/promptflow-devkit/promptflow/_sdk/operations/_local_storage_operations.py
@@ -515,6 +515,17 @@ def load_inputs_and_outputs(self) -> Tuple["DataFrame", "DataFrame"]:
                 outputs = self._outputs_padding(outputs, inputs[LINE_NUMBER].tolist())
                 outputs.fillna(value="(Failed)", inplace=True)  # replace nan with explicit prompt
                 outputs = outputs.set_index(LINE_NUMBER)
+        # Put inputs and outputs in the same ascending line_number order.
+        # get_details() merges the two frames positionally, so rows become
+        # misaligned whenever the async executor wrote results in a different
+        # order than the input data was originally listed.
+        # See https://github.com/microsoft/promptflow/issues/2646
+        if LINE_NUMBER in inputs.columns:
+            inputs = inputs.sort_values(LINE_NUMBER, ascending=True).reset_index(drop=True)
+        # Sort outputs explicitly as well so the alignment does not depend on
+        # how _outputs_padding() happens to order the rows it returns.
+        if outputs.index.name == LINE_NUMBER:
+            outputs = outputs.sort_index()
         return inputs, outputs
 
     def _collect_io_from_debug_info(self) -> Tuple["DataFrame", "DataFrame"]:
diff --git a/src/promptflow-devkit/tests/sdk_cli_test/unittests/test_local_storage_operations.py b/src/promptflow-devkit/tests/sdk_cli_test/unittests/test_local_storage_operations.py
index 78e07b4516e..bb06bec8bba 100644
--- a/src/promptflow-devkit/tests/sdk_cli_test/unittests/test_local_storage_operations.py
+++ b/src/promptflow-devkit/tests/sdk_cli_test/unittests/test_local_storage_operations.py
@@ -34,3 +34,64 @@ def test_outputs_padding(self) -> None:
         assert df_with_padding.iloc[0].to_dict() == {LINE_NUMBER: 1, "col": "a"}
         assert df_with_padding.iloc[1].to_dict() == {LINE_NUMBER: 2, "col": "b"}
         assert df_with_padding.iloc[2].to_dict() == {LINE_NUMBER: 4, "col": ""}
+
+    def test_load_inputs_and_outputs_sorts_inputs_by_line_number(self, tmp_path) -> None:
+        """Inputs and outputs must both come back ordered by line_number.
+
+        Regression test for https://github.com/microsoft/promptflow/issues/2646
+
+        When the async executor writes results in a different order than the
+        input dataset, the persisted rows can be in a non-sequential order
+        (e.g. [2, 0, 1]). get_details() merges the two DataFrames positionally,
+        so mismatched orderings caused input rows to be paired with the wrong
+        output rows.
+        """
+        import json
+
+        def write_jsonl(path, rows) -> None:
+            # One JSON object per line; pin the encoding so the fixture does
+            # not depend on the platform default.
+            path.write_text("".join(json.dumps(row) + "\n" for row in rows), encoding="utf-8")
+
+        # Inputs and outputs are deliberately shuffled differently from each
+        # other, so sorting only one of them cannot make the test pass.
+        inputs_path = tmp_path / "inputs.jsonl"
+        write_jsonl(
+            inputs_path,
+            [
+                {LINE_NUMBER: 2, "query": "third"},
+                {LINE_NUMBER: 0, "query": "first"},
+                {LINE_NUMBER: 1, "query": "second"},
+            ],
+        )
+        outputs_path = tmp_path / "outputs.jsonl"
+        write_jsonl(
+            outputs_path,
+            [
+                {LINE_NUMBER: 1, "answer": "ans1"},
+                {LINE_NUMBER: 2, "answer": "ans2"},
+                {LINE_NUMBER: 0, "answer": "ans0"},
+            ],
+        )
+
+        # Build the instance without running __init__, so no Run object or
+        # directory tree is needed; only the two path attributes are populated.
+        ops = object.__new__(LocalStorageOperations)
+        ops._sdk_inputs_path = inputs_path
+        ops._sdk_output_path = outputs_path
+
+        inputs, outputs = ops.load_inputs_and_outputs()
+
+        line_numbers = list(inputs[LINE_NUMBER])
+        assert line_numbers == [0, 1, 2], (
+            "load_inputs_and_outputs() returned inputs in wrong order; "
+            f"got line_numbers={line_numbers}"
+        )
+        assert list(inputs["query"]) == ["first", "second", "third"], (
+            "Input rows are not aligned with their line_numbers after sort"
+        )
+
+        # Outputs must share that order, otherwise the positional merge pairs
+        # each input with another row's answer.
+        assert list(outputs.index) == [0, 1, 2]
+        assert list(outputs["answer"]) == ["ans0", "ans1", "ans2"]