Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions libs/langgraph/tests/test_channels.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
import operator
from collections.abc import Sequence
from typing import Annotated
Expand All @@ -8,6 +8,7 @@
from langgraph.checkpoint.serde.types import _DeltaSnapshot
from typing_extensions import NotRequired, TypedDict

from langgraph._internal._constants import OVERWRITE
from langgraph._internal._typing import MISSING
from langgraph.channels.binop import BinaryOperatorAggregate
from langgraph.channels.delta import DeltaChannel
Expand Down Expand Up @@ -397,6 +398,49 @@
assert len(state.values["messages"]) == 4 # 2 human + 2 AI


def test_delta_channel_api_json_overwrite_sentinel_snapshots_and_replays() -> None:
def reducer(state: list[str], writes: Sequence[list[str]]) -> list[str]:
result = list(state)
for write in writes:
result.extend(write)
return result

class State(TypedDict):
items: Annotated[
list[str], DeltaChannel(reducer, list, snapshot_frequency=1000)
]

calls = 0

def node(state: State) -> dict:
nonlocal calls
calls += 1
if calls == 1:
return {"items": {OVERWRITE: ["reset"]}}
return {"items": ["after"]}

builder = StateGraph(State)
builder.add_node("node", node)
builder.add_edge(START, "node")

saver = InMemorySaver()
graph = builder.compile(checkpointer=saver)
config = {"configurable": {"thread_id": "overwrite-json-sentinel"}}

updates = list(graph.stream({"items": ["before"]}, config, stream_mode=["updates"]))
assert updates == [("updates", {"node": {"items": {OVERWRITE: ["reset"]}}})]
assert graph.get_state(config).values == {"items": ["reset"]}

first_saved = saver.get_tuple(config)
assert first_saved is not None
snapshot = first_saved.checkpoint["channel_values"].get("items")
assert isinstance(snapshot, _DeltaSnapshot)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟠 New regression test fails immediately

The test added by this PR fails on the current head: running uv run --directory /langgraph/libs/langgraph pytest /langgraph/libs/langgraph/tests/test_channels.py::test_delta_channel_api_json_overwrite_sentinel_snapshots_and_replays -q reaches this assertion with snapshot is None. With snapshot_frequency=1000, the current checkpoint code does not write an _DeltaSnapshot after the first overwrite update, so this PR would make the test suite fail until the implementation change it depends on is included or the test is marked/structured accordingly.

(Refers to line 437)


Your feedback helps Open SWE learn. React with 👍 or 👎 to tell us if this review comment was useful.

assert snapshot.value == ["reset"]

assert graph.invoke({"items": []}, config) == {"items": ["reset", "after"]}
assert graph.get_state(config).values == {"items": ["reset", "after"]}


# ---------------------------------------------------------------------------
# DeltaChannel — dict reducer
# ---------------------------------------------------------------------------
Expand Down
Loading