Skip to content

fix: unify schema across batches in JSONStreamDatasource to handle null → concrete type evolution#972

Open
fengrui-z wants to merge 7 commits into
datajuicer:mainfrom
fengrui-z:fix/json-stream-schema-lock
Open

fix: unify schema across batches in JSONStreamDatasource to handle null → concrete type evolution#972
fengrui-z wants to merge 7 commits into
datajuicer:mainfrom
fengrui-z:fix/json-stream-schema-lock

Conversation

@fengrui-z

@fengrui-z fengrui-z commented Apr 29, 2026

Copy link
Copy Markdown
Collaborator

Summary

Fixes #936

JSONStreamDatasource._read_stream locks the schema from the first batch and reuses it for all subsequent batches. When an
early batch infers a nested field as null (e.g. meta.url = null) and a later batch introduces a concrete type (e.g.
string), the forced cast from string to null fails with ArrowInvalid.

This is a correctness bug in DJ's custom JSON streaming ingestion path. Ray's native ray.data.read_json handles the same input correctly.

Root Cause

# Before: first batch locks schema, all subsequent batches forced to it
table = pyarrow.Table.from_batches([batch], schema=schema)
if schema is None:
    schema = table.schema  # locked forever

Fix

  1. Remove the first-batch schema lock — create table without forced schema
  2. Use pyarrow.unify_schemas to merge schemas across batches, allowing null → concrete type promotion
  3. After unification, cast the batch to the unified schema for consistency
  # After: schema evolves across batches
  table = pyarrow.Table.from_batches([batch])
  if schema is None:
      schema = table.schema
  else:
      unified = pyarrow.unify_schemas([schema, table.schema])
      if not unified.equals(schema):
          schema = unified
      table = pyarrow.Table.from_batches([batch], schema=schema)

unify_schemas internally delegates to Arrow C++ UnifyTypes, which promotes null to the concrete type and recursively handles nested structs.

Test Plan

See #936 for the minimal reproduction script.

…ll → concrete type evolution

The previous implementation locked the schema from the first batch and
reused it for all subsequent batches via `Table.from_batches([batch],
schema=schema)`. When an early batch inferred a nested field as `null`
(e.g. `meta.url = null`) and a later batch introduced a concrete type
(e.g. `string`), the cast from `string` to `null` would fail with
ArrowInvalid.

This fix removes the first-batch schema lock and instead uses
`pyarrow.unify_schemas` to merge schemas across batches, allowing
`null` types to be promoted to concrete types as new data is read.

Fixes datajuicer#936

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request updates the _read_stream method in ray_dataset.py to support schema unification when reading batches from a stream. This allows the system to handle batches with varying but compatible schemas. A review comment suggests refactoring the implementation to reduce code duplication by consolidating the pyarrow.Table creation after the final schema has been determined.

Comment thread data_juicer/core/data/ray_dataset.py Outdated
Ray's internal concat requires all tables from a single file to have
identical schemas. The streaming reader cannot guarantee this when
schema evolution occurs (null → concrete type triggers ArrowInvalid
at block boundaries).

Approach:
- Buffer all batches from the streaming reader, unify schema, then
  yield with consistent schema. This is necessary because Ray's
  _combine_tables fails on struct child type mismatch (null vs string).
- When PyArrow's reader throws ArrowInvalid ("changed from"), fall
  back to paj.read_json() which handles schema inference across all
  rows in a single pass.

Performance note: buffering adds O(file_size) memory within the read
task, but Ray already materializes the full generator output per task
before passing downstream, so latency impact is negligible.

Fixes datajuicer#936

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@fengrui-z

Copy link
Copy Markdown
Collaborator Author

Investigated the performance concern. Key findings:

  1. Buffering is unavoidable — Ray's internal _combine_tables_align_struct_fields requires all tables from a single file to have identical schemas. Yielding tables with mixed schemas (e.g., meta.url: null then meta.url: string) causes ArrowInvalid: Struct child array #0 does not match type field: null vs string.

  2. Actual performance impact is minimal — Ray materializes the full generator output per read task before passing downstream, so latency is unaffected. The memory cost (holding all batches simultaneously) is equivalent to the paj.read_json() fallback path.

  3. Current fix: Buffer batches + unify_schemas for the happy path; fall back to paj.read_json(path) when PyArrow's streaming reader throws ArrowInvalid("changed from") at block boundaries (the [Bug] JSONStreamDatasource locks first-batch schema and fails on later null -> concrete type evolution #936 case where null → concrete type evolution can't be handled mid-stream).

@cyruszhang cyruszhang left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for tackling #936. I reproduced the PyArrow behavior locally: open_json fails on the null -> string transition while read_json succeeds, so the correctness issue here is real.

Two things I think we should address before merging:

  1. The fallback reopens with paj.read_json(path, ...). _read_stream receives path after Ray has normalized it against the configured filesystem, so it is not always a standalone local path/URI. When a caller passes a filesystem, this bypasses the already-open stream/filesystem and can fail. I verified this with a pyarrow.fs.SubTreeFileSystem: the absolute-path case succeeds, while the filesystem-relative path case fails with FileNotFoundError: ... schema_evolution.jsonl. The fallback should reopen through the same filesystem/input source or otherwise preserve filesystem support; please add a filesystem-backed regression test for this path.

  2. batches = [] now buffers every record batch for every file, even when the schema never evolves. That changes read_json_stream from streaming to whole-file memory use. Ray's ReadTask yields blocks from the iterator and its datasource contract explicitly allows a single large file to return multiple blocks to avoid OOM, so this can regress large JSONL workloads. If full-file inference is unavoidable for evolving schemas, I think we should make that behavior explicit and/or limit it to the fallback path rather than silently applying it to all reads.

@cyruszhang

Copy link
Copy Markdown
Collaborator

Thanks for tackling #936. I reproduced the PyArrow behavior locally: open_json fails on the null -> string transition while read_json succeeds, so the correctness issue here is real.

Two things I think we should address before merging:

  1. The fallback reopens with paj.read_json(path, ...). _read_stream receives path after Ray has normalized it against the configured filesystem, so it is not always a standalone local path/URI. When a caller passes a filesystem, this bypasses the already-open stream/filesystem and can fail. I verified this with a pyarrow.fs.SubTreeFileSystem: the absolute-path case succeeds, while the filesystem-relative path case fails with FileNotFoundError: ... schema_evolution.jsonl. The fallback should reopen through the same filesystem/input source or otherwise preserve filesystem support; please add a filesystem-backed regression test for this path.
  2. batches = [] now buffers every record batch for every file, even when the schema never evolves. That changes read_json_stream from streaming to whole-file memory use. Ray's ReadTask yields blocks from the iterator and its datasource contract explicitly allows a single large file to return multiple blocks to avoid OOM, so this can regress large JSONL workloads. If full-file inference is unavoidable for evolving schemas, I think we should make that behavior explicit and/or limit it to the fallback path rather than silently applying it to all reads.

One more, independent of the buffering discussion: the unify_schemas happy path is currently broken.

Table.from_batches([batch], schema=unified) does not cast — it requires the batch schema to already equal the target. The moment a batch's schema differs from the unified one, it raises ArrowInvalid: Schema at index 0 was different. Note that message does not contain "changed from", so it escalates straight
to the ValueError instead of recovering.

import pyarrow as pa
b1 = pa.RecordBatch.from_pylist([{"meta": {"url": None}}]) # struct<url: null>
b2 = pa.RecordBatch.from_pylist([{"meta": {"url": "x"}}]) # struct<url: string>
unified = pa.unify_schemas([b1.schema, b2.schema]) # struct<url: string>

pa.Table.from_batches([b1], schema=unified) # ArrowInvalid: Schema at index 0 was different
pa.Table.from_batches([b1]).cast(unified) # OK ✅ (pyarrow 23.0.1)

So the for batch in batches: yield Table.from_batches([batch], schema=schema) loop fails on any batch whose schema isn't already the unified one. #936 never reaches it (it errors earlier inside read_next_batch, via the "changed from" fallback), so this branch is effectively untested dead code today.

Fix is Table.from_batches([batch]).cast(schema).

- Fix happy path casting bug: use .cast(schema) instead of forcing schema in from_batches
- Fix memory regression: streaming fast path without buffering, only buffer on schema evolution
- Fix filesystem fallback: reopen file through same filesystem abstraction instead of using path directly
- Add SubTreeFileSystem regression test to verify filesystem-backed fallback works correctly
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] JSONStreamDatasource locks first-batch schema and fails on later null -> concrete type evolution

2 participants