Add workflowstreams contrib package#2386
Conversation
Introduces a new contrib/workflowstreams package along with a reserved name prefix exception allowing the __temporal_workflow_stream prefix for signal, update, and query handlers. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Move the truncation/offset check into the workflow.Await closure so it is re-evaluated against baseOffset on every wake. baseOffset can advance via Truncate while a poll is waiting; capturing logOffset once up front left the Await condition permanently unsatisfiable if a truncation passed the waiting offset. The closure now returns when draining, when the requested offset has been truncated away, or when items at/after the offset are available, and the truncation ApplicationError is raised after the wait. Also modernize the "from the beginning" clamp using the built-in max. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Adds a new contrib/workflowstreams module implementing a durable publish/subscribe log hosted inside a Temporal workflow, and introduces an internal reserved-name prefix exception so the contrib package can register its __temporal_workflow_stream_* signal/update/query handlers.
Changes:
- Add
contrib/workflowstreams(workflow-side stream, external client, publisher batching/dedup, wire codec) plus tests and documentation. - Permit a reserved handler-name prefix exception for
__temporal_workflow_stream_*in signal/update/query registration checks. - Add CODEOWNERS for the new contrib package.
Reviewed changes
Copilot reviewed 16 out of 17 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
internal/workflow.go |
Allows reserved-name exception for signal channel, update handler, and query handler names. |
internal/internal_utils.go |
Introduces isReservedNamePrefixException helper for reserved-name bypass. |
internal/reserved_name_exception_test.go |
Adds coverage ensuring the exception is allowed and other reserved names are still rejected. |
contrib/workflowstreams/workflow.go |
Implements the workflow-side durable log, poll update handler, offset query, truncation, and continue-as-new support. |
contrib/workflowstreams/workflow_test.go |
Tests publish, offset query, dedup, and topic-filtered polling behavior in the workflow environment. |
contrib/workflowstreams/types.go |
Defines the cross-language wire protocol types, handler names, and option defaults. |
contrib/workflowstreams/topic_handle.go |
Adds a per-topic external handle wrapper over the client. |
contrib/workflowstreams/README.md |
Documents workflow/client usage, options, and cross-language protocol details. |
contrib/workflowstreams/publisher.go |
Implements client-side buffering, batching, dedup sequencing, and retry/timeout behavior. |
contrib/workflowstreams/client.go |
Implements external publish/subscribe client (signals, poll updates, offset query, CAN/terminal handling). |
contrib/workflowstreams/client_test.go |
Tests publisher flushing, sequencing, force flush, close draining, and timeout behavior using a fake client. |
contrib/workflowstreams/codec.go |
Implements base64-of-proto payload wire encoding/decoding and sizing helper. |
contrib/workflowstreams/codec_test.go |
Tests payload wire round-trips and base64/proto format expectations. |
contrib/workflowstreams/doc.go |
Adds package-level GoDoc with workflow/client usage and protocol notes. |
contrib/workflowstreams/go.mod |
Adds a standalone module definition for the contrib package with a local replace to the repo root. |
contrib/workflowstreams/go.sum |
Adds dependency checksums for the contrib module. |
.github/CODEOWNERS |
Adds ownership for /contrib/workflowstreams/. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Cover the previously untested retry/termination logic in Client.Subscribe using a scripted fake client: offset advancement, truncation reset, clean terminal end, continue-as-new retry, non-terminal error surfacing, context cancellation, and cooldown timing/interruptibility. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
TestFlushTimeoutAfterMaxRetryDuration used MaxRetryDuration: time.Nanosecond and relied on time.Since advancing between rapid back-to-back flushes. On Windows the monotonic timer granularity (~15ms) makes those reads identical, so time.Since returns 0, the retry window is never exceeded, and no FlushTimeoutError is produced — failing CI on windows-latest. Establish the pending batch with one failing flush, then sleep well past a small (1ms) retry window before the flush expected to time out, so the test no longer depends on sub-tick durations. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
followContinueAsNew described the workflow with an empty run id, which returns the current run. After a continue-as-new that is the new RUNNING run, never CONTINUED_AS_NEW (which only sits on the old, closed run), so the check never fired and Subscribe stopped or surfaced an error during a routine rollover instead of following the stream into the successor run. Capture the run id each poll's update is admitted to (wait only for ACCEPTED so the handle — and its RunID — is returned even when the run continues-as-new before the outcome), and describe that specific run on failure. A rolled-over run reports CONTINUED_AS_NEW, a terminal run reports a terminal status, and a still-RUNNING run means a transient error that should surface. This also avoids mistaking an unrelated new execution that reused the workflow id for a successor. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The poll update's validator rejected new polls during detach-for-CAN with an untyped error, which Subscribe did not classify and surfaced to the caller — ending the subscription with an error during a routine rollover. Give the validator a well-known ErrTypeStreamDraining ApplicationError type and have Subscribe back off and retry on it, so the poll lands on the successor run once the rollover completes. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
|
||
| // Run your workflow; the stream serves external publishers and subscribers | ||
| // for as long as the workflow is running. | ||
| return workflow.Await(ctx, func() bool { return done }) |
There was a problem hiding this comment.
Where does done get defined?
There was a problem hiding this comment.
It's supposed to be a generic exit condition, set by a signal for example. Let me add an explanation to the comment.
|
|
||
| ```go | ||
| return stream.ContinueAsNew(ctx, MyWorkflow, func(state *workflowstreams.WorkflowStreamState) []any { | ||
| return []any{state} |
There was a problem hiding this comment.
When would someone want to add different logic into this callback?
There was a problem hiding this comment.
The idea is to give the user a place to combine the workflow streams CAN state with their own CAN state. Typically you would just combine these into a list of two, but you might want something more complex like agent history compaction.
There was a problem hiding this comment.
Updating the text to make that more clear...
| // State is captured with the default 15-minute publisher TTL. For a custom TTL, | ||
| // use the manual recipe: DetachPollers, Await(AllHandlersFinished), GetState, | ||
| // then workflow.NewContinueAsNewError. | ||
| func (s *WorkflowStream) ContinueAsNew(ctx workflow.Context, wfn any, buildArgs func(state *WorkflowStreamState) []any) error { |
There was a problem hiding this comment.
Should this be named NewContinueAsNewError to mirror workflow.NewContinueAsNewError?
The current ContinueAsNew matches Python/TS, but the Go method returns an error that callers must return. On the other hand, it does more than construct an error because it detaches pollers and waits for handlers, so New... may understate the side effects.
There was a problem hiding this comment.
Went ahead with the rename. Cross-SDK compatibility is less important than maintaining existing patterns from what I can tell. Most developers are just using one Temporal SDK.
Rename NewStream to NewWorkflowStream and the internal reserved-name helper isReservedNamePrefixException to isWorkflowStreamReservedName (plus its test file). Rework the continue-as-new docs in README.md and doc.go into an explicit capture/restore round-trip that threads the caller's own state alongside the captured stream state. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Per-item values were serialized with a full converter.DataConverter, so a codec-enabled converter (converter.NewCodecDataConverter) would run the codec chain per item and again on the signal/update envelope, double- encoding every item. Both sides now take PayloadConverters and build a codec-free CompositeDataConverter, making codec-per-item impossible by type and matching the Python/TypeScript packages: - client: Options.DataConverter -> Options.PayloadConverters - workflow: NewWorkflowStream WithPayloadConverters option (the workflow context's converter has no public accessor, so it is configured here) Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Mirror workflow.NewContinueAsNewError so the Go-specific contract is clear: the caller must return the result to end the run (unlike the Python/TS continue_as_new, which self-executes by raising). The doc comment notes the added side effects — it drains pollers and waits for in-flight handlers before capturing state. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
What
Adds a new
contrib/workflowstreamspackage: a durable publish/subscribe log hosted inside a Temporal workflow.External code (activities, starters, other workflows) publishes messages to named topics via signals; subscribers long-poll for new items via updates; a query exposes the current offset. The stream is backed by Temporal's durable execution, giving exactly-once, ordered, cross-language delivery with client-side batching, publisher dedup, continue-as-new survival, truncation, and response paging.
The handler names, JSON envelope field names, and per-item payload encoding match the Python (
temporalio.contrib.workflow_streams) and TypeScript (@temporalio/workflow-streams) packages, so a Go publisher/subscriber interoperates with a Python/TypeScript workflow and vice versa.Supporting changes
__temporal_workflow_streamprefix for signal, update, and query handlers (internal/internal_utils.go,internal/workflow.go), whichworkflowstreamsrelies on for its internal handler names.internal/reserved_name_exception_test.gocovering the exception./contrib/workflowstreams/(@temporalio/sdk @temporalio/ai-sdk).🤖 Generated with Claude Code