Skip to content

Harden frame processing on large data#69125

Open
uranusjr wants to merge 1 commit into
apache:mainfrom
astronomer:chunk-java-comm
Open

Harden frame processing on large data#69125
uranusjr wants to merge 1 commit into
apache:mainfrom
astronomer:chunk-java-comm

Conversation

@uranusjr

Copy link
Copy Markdown
Member

Previously, frame encoding and decoding are done against an in-memory byte array. This is simple, but may cause issues with very large amount of data, since the frame protocol allows 2^32 bytes of data per frame with the potential to clog the entire JVM.

This uses the MessagePack library's MessageBuffer helper to encode to and decode from a MessagePack message into multiple lazy buffers, converting each buffer to a byte array separately on demand to reduce peak memory usage.

I also cleaned up some abstractions since they are already pretty empty prior to this change.

Previously, frame encoding and decoding are done against an in-memory
byte array. This is simple, but may cause issues with very large amount
of data, since the frame protocol allows 2^32 bytes of data per frame
with the potential to clog the entire JVM.

This uses the MessagePack library's MessageBuffer helper to encode to
and decode from a MessagePack message into multiple lazy buffers,
converting each buffer to a byte array separately on demand to reduce
peak memory usage.

I also cleaned up some abstractions since they are already pretty empty
prior to this change.
@uranusjr uranusjr marked this pull request as ready for review June 29, 2026 19:23
@uranusjr uranusjr requested a review from jason810496 as a code owner June 29, 2026 19:23
Comment on lines +109 to +118
try {
Frame.decode(ChannelFrameInput(reader, declaredLength))
} catch (e: Exception) {
logger.error(
"Failed to read or decode frame",
mapOf("length" to declaredLength, "exception" to e),
)
shutDownRequested = true
return
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
try {
Frame.decode(ChannelFrameInput(reader, declaredLength))
} catch (e: Exception) {
logger.error(
"Failed to read or decode frame",
mapOf("length" to declaredLength, "exception" to e),
)
shutDownRequested = true
return
}
try {
Frame.decode(ChannelFrameInput(reader, declaredLength))
} catch (e: CancellationException) {
throw e
} catch (e: Exception) {
logger.error(
"Failed to read or decode frame",
mapOf("length" to declaredLength, "exception" to e),
)
shutDownRequested = true
return
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(needs import kotlinx.coroutines.CancellationException)

catch (e: Exception) also catches CancellationException, so a cancellation during decode becomes a clean shutdown instead of propagating. Minor and
pre-existing in this file, but the new broad catch makes it worth rethrowing.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants