Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,14 @@ def load_inputs_and_outputs(self) -> Tuple["DataFrame", "DataFrame"]:
outputs = self._outputs_padding(outputs, inputs[LINE_NUMBER].tolist())
outputs.fillna(value="(Failed)", inplace=True) # replace nan with explicit prompt
outputs = outputs.set_index(LINE_NUMBER)
# Sort inputs by line_number so their row order matches the sorted
# outputs index. Without this, get_details() merges inputs and
# outputs positionally and rows become misaligned whenever the
# async executor wrote results in a different order than the input
# data was originally listed.
# See https://github.com/microsoft/promptflow/issues/2646
if LINE_NUMBER in inputs.columns:
inputs = inputs.sort_values(LINE_NUMBER, ascending=True).reset_index(drop=True)
return inputs, outputs

def _collect_io_from_debug_info(self) -> Tuple["DataFrame", "DataFrame"]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,62 @@ def test_outputs_padding(self) -> None:
assert df_with_padding.iloc[0].to_dict() == {LINE_NUMBER: 1, "col": "a"}
assert df_with_padding.iloc[1].to_dict() == {LINE_NUMBER: 2, "col": "b"}
assert df_with_padding.iloc[2].to_dict() == {LINE_NUMBER: 4, "col": ""}

def test_load_inputs_and_outputs_sorts_inputs_by_line_number(
    self, tmp_path
) -> None:
    """Check that load_inputs_and_outputs returns inputs ordered by line_number.

    Regression test for https://github.com/microsoft/promptflow/issues/2646

    The inputs file written here lists its rows with line numbers in the
    order [2, 0, 1], while the outputs file lists them as [0, 1, 2]. After
    loading, the inputs frame is expected to come back in ascending
    line_number order with each "query" value still attached to its own
    line number, and the outputs frame is expected to be indexed 0, 1, 2.
    If the two frames were ordered differently, a positional pairing of
    their rows would attach inputs to the wrong outputs.
    """
    import json

    def write_jsonl(target, records):
        # One JSON document per line, each terminated by a newline.
        with open(target, "w") as handle:
            for record in records:
                handle.write(json.dumps(record) + "\n")

    # Inputs are deliberately listed out of line_number order.
    shuffled_inputs = [
        {LINE_NUMBER: line_no, "query": text}
        for line_no, text in ((2, "third"), (0, "first"), (1, "second"))
    ]
    inputs_path = tmp_path / "inputs.jsonl"
    write_jsonl(inputs_path, shuffled_inputs)

    # Outputs are listed in ascending line_number order.
    ordered_outputs = [
        {LINE_NUMBER: line_no, "answer": f"ans{line_no}"} for line_no in range(3)
    ]
    outputs_path = tmp_path / "outputs.jsonl"
    write_jsonl(outputs_path, ordered_outputs)

    # Create the instance without running __init__ so that no Run object or
    # on-disk directory layout is required; only the two paths are set.
    ops = object.__new__(LocalStorageOperations)
    ops._sdk_inputs_path = inputs_path
    ops._sdk_output_path = outputs_path

    inputs, outputs = ops.load_inputs_and_outputs()

    # The inputs frame must come back in ascending line_number order ...
    assert list(inputs[LINE_NUMBER]) == [0, 1, 2], (
        "load_inputs_and_outputs() returned inputs in wrong order; "
        f"got line_numbers={list(inputs[LINE_NUMBER])}"
    )
    # ... with every row's payload still matching its line number.
    assert list(inputs["query"]) == ["first", "second", "third"], (
        "Input rows are not aligned with their line_numbers after sort"
    )

    # The outputs frame is indexed by line_number; confirm that ordering
    # is still ascending as well.
    assert list(outputs.index) == [0, 1, 2]
Loading