From 0fbf9a36409ff41676422d41ac16ae9a830ee1f3 Mon Sep 17 00:00:00 2001 From: Maruthi ChandraSekhar Vemuri Date: Tue, 23 Jun 2026 14:12:35 -0700 Subject: [PATCH 1/3] NEXUS-474: Support UpdateWorkflow as a Nexus Operation --- go.mod | 5 +- go.sum | 6 +- internal/client.go | 26 ++++ internal/cmd/build/go.mod | 5 +- internal/cmd/build/go.sum | 6 +- internal/cmd/build/main.go | 3 +- internal/interceptor.go | 7 ++ internal/internal_update.go | 6 + internal/internal_workflow_client.go | 36 ++++++ temporalnexus/temporal_operation.go | 102 +++++++++++++++ temporalnexus/temporal_operation_test.go | 36 +++++- temporalnexus/token.go | 54 ++++++-- temporalnexus/token_test.go | 8 +- test/go.mod | 5 +- test/go.sum | 6 +- test/nexus_update_op_test.go | 153 +++++++++++++++++++++++ 16 files changed, 433 insertions(+), 31 deletions(-) create mode 100644 test/nexus_update_op_test.go diff --git a/go.mod b/go.mod index bc603be81..215f142da 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module go.temporal.io/sdk -go 1.24.0 +go 1.25.4 require ( github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a @@ -12,7 +12,7 @@ require ( github.com/nexus-rpc/sdk-go v0.6.0 github.com/robfig/cron v1.2.0 github.com/stretchr/testify v1.10.0 - go.temporal.io/api v1.62.12 + go.temporal.io/api v1.62.14 golang.org/x/sync v0.19.0 golang.org/x/sys v0.40.0 golang.org/x/time v0.3.0 @@ -23,6 +23,7 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect + github.com/nexus-rpc/nexus-proto-annotations v0.1.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/objx v0.5.2 // indirect golang.org/x/net v0.49.0 // indirect diff --git a/go.sum b/go.sum index 162ad9ad3..8bd20356b 100644 --- a/go.sum +++ b/go.sum @@ -28,6 +28,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/nexus-rpc/nexus-proto-annotations v0.1.0 h1:2fELd+9sqUtNu6Fg//pw8YFsxOvp8vZ8hfP0nHhNI80= +github.com/nexus-rpc/nexus-proto-annotations v0.1.0/go.mod h1:n3UjF1bPCW8llR8tHvbxJ+27yPWrhpo8w/Yg1IOuY0Y= github.com/nexus-rpc/sdk-go v0.6.0 h1:QRgnP2zTbxEbiyWG/aXH8uSC5LV/Mg1fqb19jb4DBlo= github.com/nexus-rpc/sdk-go v0.6.0/go.mod h1:FHdPfVQwRuJFZFTF0Y2GOAxCrbIBNrcPna9slkGKPYk= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -55,8 +57,8 @@ go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2W go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= -go.temporal.io/api v1.62.12 h1:627rVnItegQmrszg1bH4vfyc/1uNo5qCereCNkvZefw= -go.temporal.io/api v1.62.12/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.14 h1:Tree3eqoKRt5Vv+nvYHMPp/ROiGmSOTtFUD9d0w8yJE= +go.temporal.io/api v1.62.14/go.mod h1:0k75tRljEuELWGeXjEZZO7zYqBln4+1FrG6+IMOMy7Q= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/internal/client.go b/internal/client.go index 8b16a069d..f6f0a18cc 100644 --- a/internal/client.go +++ b/internal/client.go @@ -1855,6 +1855,22 @@ func SetLinksOnStartWorkflowOptions(opts *StartWorkflowOptions, links []*commonp opts.links = links } +// interface utility wrapper to allow setting links and callbacks +// on temporal primitive operation options (UpdateWorkflowOptions, etc) +// draft-review: set it on StartWorkflowOptions above as well +type nexusOperationOptions interface { + setLinks(links []*commonpb.Link) + setCallbacks(callbacks []*commonpb.Callback) +} + +func SetLinksOnNexusOperation(opts nexusOperationOptions, links []*commonpb.Link) { + opts.setLinks(links) +} + +func SetCallbacksOnNexusOperation(opts nexusOperationOptions, callbacks []*commonpb.Callback) { + opts.setCallbacks(callbacks) +} + // SetOnConflictOptionsOnStartWorkflowOptions is an internal only method for setting conflict // options on StartWorkflowOptions. // OnConflictOptions are purposefully not exposed to users for the time being. @@ -1876,6 +1892,16 @@ func SetResponseInfoOnStartWorkflowOptions(opts *StartWorkflowOptions) *startWor return opts.responseInfo } +// SetResponseInfoOnUpdateWorkflowOptions is an internal only method to set and return a +// responseInfo pointer. This is done to capture links from the response RPC to be used +// for nexus forward links on UpdateWorkflow Nexus Operations +func SetResponseInfoOnUpdateWorkflowOptions(opts *UpdateWorkflowOptions) *updateWorkflowResponseInfo { + if opts.responseInfo == nil { + opts.responseInfo = &updateWorkflowResponseInfo{} + } + return opts.responseInfo +} + // collectStorageDriverTypes returns deduplicated driver types from the given drivers. func collectStorageDriverTypes(drivers []converter.StorageDriver) []string { if len(drivers) == 0 { diff --git a/internal/cmd/build/go.mod b/internal/cmd/build/go.mod index af6f1c820..b34b36e1d 100644 --- a/internal/cmd/build/go.mod +++ b/internal/cmd/build/go.mod @@ -1,6 +1,6 @@ module go.temporal.io/sdk/internal/cmd/build -go 1.24.0 +go 1.25.4 require ( github.com/BurntSushi/toml v1.4.1-0.20240526193622-a339e1f7089c @@ -17,12 +17,13 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1 // indirect + github.com/nexus-rpc/nexus-proto-annotations v0.1.0 // indirect github.com/nexus-rpc/sdk-go v0.6.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/stretchr/testify v1.10.0 // indirect - go.temporal.io/api v1.62.12 // indirect + go.temporal.io/api v1.62.14 // indirect golang.org/x/exp/typeparams v0.0.0-20250210185358-939b2ce775ac // indirect golang.org/x/mod v0.31.0 // indirect golang.org/x/net v0.49.0 // indirect diff --git a/internal/cmd/build/go.sum b/internal/cmd/build/go.sum index d5be1328d..2408fcc15 100644 --- a/internal/cmd/build/go.sum +++ b/internal/cmd/build/go.sum @@ -32,6 +32,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/nexus-rpc/nexus-proto-annotations v0.1.0 h1:2fELd+9sqUtNu6Fg//pw8YFsxOvp8vZ8hfP0nHhNI80= +github.com/nexus-rpc/nexus-proto-annotations v0.1.0/go.mod h1:n3UjF1bPCW8llR8tHvbxJ+27yPWrhpo8w/Yg1IOuY0Y= github.com/nexus-rpc/sdk-go v0.6.0 h1:QRgnP2zTbxEbiyWG/aXH8uSC5LV/Mg1fqb19jb4DBlo= github.com/nexus-rpc/sdk-go v0.6.0/go.mod h1:FHdPfVQwRuJFZFTF0Y2GOAxCrbIBNrcPna9slkGKPYk= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -59,8 +61,8 @@ go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2W go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= -go.temporal.io/api v1.62.12 h1:627rVnItegQmrszg1bH4vfyc/1uNo5qCereCNkvZefw= -go.temporal.io/api v1.62.12/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.14 h1:Tree3eqoKRt5Vv+nvYHMPp/ROiGmSOTtFUD9d0w8yJE= +go.temporal.io/api v1.62.14/go.mod h1:0k75tRljEuELWGeXjEZZO7zYqBln4+1FrG6+IMOMy7Q= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/internal/cmd/build/main.go b/internal/cmd/build/main.go index d80c7056b..13a22c079 100644 --- a/internal/cmd/build/main.go +++ b/internal/cmd/build/main.go @@ -165,9 +165,10 @@ func (b *builder) integrationTest() error { "--dynamic-config-value", `component.nexusoperations.useSystemCallbackURL=false`, "--dynamic-config-value", `component.nexusoperations.callback.endpoint.template="http://localhost:7243/namespaces/{{.NamespaceName}}/nexus/callback"`, "--dynamic-config-value", "nexusoperation.enableStandalone=true", - "--dynamic-config-value", "history.enableChasmCallbacks=true", + "--dynamic-config-value", "history.enableCHASMCallbacks=true", "--dynamic-config-value", "frontend.ListWorkersEnabled=true", "--dynamic-config-value", "activity.startDelayEnabled=true", + "--dynamic-config-value", "history.enableUpdateCallbacks=true", }, }) if err != nil { diff --git a/internal/interceptor.go b/internal/interceptor.go index a6f26ec1b..17e83bfcc 100644 --- a/internal/interceptor.go +++ b/internal/interceptor.go @@ -509,6 +509,13 @@ type ClientUpdateWorkflowInput struct { FirstExecutionRunID string // WaitForStage is the stage to wait for. WaitForStage WorkflowUpdateStage + + // links. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation]. + links []*commonpb.Link + // for backward links from the target namespace sent via operation options. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation]. + callbacks []*commonpb.Callback + // gRPC request response trap for nexus forward links + responseInfo *updateWorkflowResponseInfo } // Exposed as: [go.temporal.io/sdk/interceptor.ClientUpdateWithStartWorkflowInput] diff --git a/internal/internal_update.go b/internal/internal_update.go index 4020c4884..0f29e8564 100644 --- a/internal/internal_update.go +++ b/internal/internal_update.go @@ -30,6 +30,12 @@ const ( WorkflowUpdateStageCompleted ) +// workflow is async iff update stage is WorkflowUpdateStageAccepted- +// as WorkflowUpdateStageCompleted blocks on UpdateWorkflow itself +func (w WorkflowUpdateStage) IsAsyncUpdateWorkflow() bool { + return w == WorkflowUpdateStageAccepted +} + const ( updateStateNew updateState = "New" updateStateRequestInitiated updateState = "RequestScheduled" diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 0667df235..39c941fe1 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -994,6 +994,26 @@ type UpdateWorkflowOptions struct { // then the server will reject the update request with an error. // Note that it is incompatible with UpdateWithStartWorkflowOperation. FirstExecutionRunID string + + // workflow completion callback. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation]. + callbacks []*commonpb.Callback + // for backward links from the target namespace sent via operation options. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation]. + links []*commonpb.Link + // gRPC request response trap for nexus forward links + responseInfo *updateWorkflowResponseInfo +} + +type updateWorkflowResponseInfo struct { + // Link to the workflow event. + Link *commonpb.Link +} + +func (u *UpdateWorkflowOptions) setLinks(links []*commonpb.Link) { + u.links = links +} + +func (u *UpdateWorkflowOptions) setCallbacks(callbacks []*commonpb.Callback) { + u.callbacks = callbacks } // UpdateWithStartWorkflowOptions encapsulates the parameters used by UpdateWithStartWorkflow. @@ -2837,6 +2857,10 @@ func (w *workflowClientInterceptor) UpdateWorkflow( return nil, err } + if responseInfo := in.responseInfo; responseInfo != nil { + responseInfo.Link = resp.GetLink() + } + // Here we know the update is at least accepted desiredLifecycleStage := updateLifeCycleStageToProto(in.WaitForStage) return w.updateHandleFromResponse(ctx, desiredLifecycleStage, resp) @@ -2864,6 +2888,12 @@ func createUpdateWorkflowInput(options *UpdateWorkflowOptions) (*ClientUpdateWor return nil, errors.New("WaitForStage WorkflowUpdateStageAdmitted is not supported") } + // draft-review: check if this could be valid for some edge cases + if options.WaitForStage == WorkflowUpdateStageCompleted && len(options.callbacks) > 0 { + return nil, errors.New("WaitForStage WorkflowUpdateStageCompleted does not support callbacks " + + "as it is already a synchronous operation") + } + return &ClientUpdateWorkflowInput{ UpdateID: updateID, WorkflowID: options.WorkflowID, @@ -2872,6 +2902,9 @@ func createUpdateWorkflowInput(options *UpdateWorkflowOptions) (*ClientUpdateWor RunID: options.RunID, FirstExecutionRunID: options.FirstExecutionRunID, WaitForStage: options.WaitForStage, + links: options.links, + callbacks: options.callbacks, + responseInfo: options.responseInfo, }, nil } @@ -2906,6 +2939,9 @@ func (w *workflowClientInterceptor) createUpdateWorkflowRequest( }, FirstExecutionRunId: in.FirstExecutionRunID, Request: &updatepb.Request{ + RequestId: in.UpdateID, + CompletionCallbacks: in.callbacks, + Links: in.links, Meta: &updatepb.Meta{ UpdateId: in.UpdateID, Identity: w.client.identity, diff --git a/temporalnexus/temporal_operation.go b/temporalnexus/temporal_operation.go index 3ada4bc66..4a1a93b7c 100644 --- a/temporalnexus/temporal_operation.go +++ b/temporalnexus/temporal_operation.go @@ -7,11 +7,16 @@ import ( "sync/atomic" "github.com/nexus-rpc/sdk-go/nexus" + "go.temporal.io/api/common/v1" "go.temporal.io/sdk/client" "go.temporal.io/sdk/internal" "go.temporal.io/sdk/workflow" ) +const ( + nexusHeaderOperationId = "nexus-operation-id" // draft-review: check if this belongs in some other package/use this in workflow as well +) + // StartTemporalOperationOptions are options provided to the Start callback of a Temporal Nexus operation. // Mirrors [nexus.StartOperationOptions]. // @@ -155,6 +160,98 @@ func StartUntypedWorkflow[R any]( return NewAsyncResult[R](handle.token()), nil } +func StartUpdateWorkflow[R any]( + ctx context.Context, + nc NexusClient, + updateWorkflowOptions client.UpdateWorkflowOptions, +) (TemporalOperationResult[R], error) { + var encodedToken string + isAsyncUpdate := updateWorkflowOptions.WaitForStage.IsAsyncUpdateWorkflow() + asyncOpFailed := true + if isAsyncUpdate { + if nc.startOperationOptions.CallbackURL == "" { + return TemporalOperationResult[R]{}, nexus.NewHandlerErrorf(nexus.HandlerErrorTypeBadRequest, + "callback URL required for async UpdateWorkflow operation invocations") + } + if nc.asyncStarted != nil && !nc.asyncStarted.CompareAndSwap(false, true) { + return TemporalOperationResult[R]{}, nexus.NewHandlerErrorf(nexus.HandlerErrorTypeBadRequest, + "only one async operation can be started per operation invocation") + } + token, err := generateUpdateOperationToken() + if err != nil { + return TemporalOperationResult[R]{}, err + } + encodedToken = token + defer func() { + if asyncOpFailed && nc.asyncStarted != nil { + nc.asyncStarted.Store(false) + } + }() + } + + links, err := convertNexusLinks(nc.startOperationOptions.Links, GetLogger(ctx)) + if err != nil { + return TemporalOperationResult[R]{}, &nexus.HandlerError{ + Type: nexus.HandlerErrorTypeBadRequest, + Message: "could not convert links for update workflow", + Cause: err, + } + } + internal.SetLinksOnNexusOperation(&updateWorkflowOptions, links) + + // unsure what this means, check in review + // ctx = context.WithValue(ctx, internal.IsWorkflowRunOpContextKey, true) + + // set callbacks only for async + if isAsyncUpdate { + if nc.startOperationOptions.CallbackHeader == nil { + nc.startOperationOptions.CallbackHeader = make(nexus.Header) + } + // This field is expected to be populated by servers older than 1.27.0. + // draft-review: can this check be removed now? at least for UpdateWorkflow + nc.startOperationOptions.CallbackHeader.Set(nexusHeaderOperationId, encodedToken) + nc.startOperationOptions.CallbackHeader.Set(nexus.HeaderOperationToken, encodedToken) + internal.SetCallbacksOnNexusOperation(&updateWorkflowOptions, []*common.Callback{ + { + Variant: &common.Callback_Nexus_{ + Nexus: &common.Callback_Nexus{ + Url: nc.startOperationOptions.CallbackURL, + Header: nc.startOperationOptions.CallbackHeader, + }, + }, + Links: links, + }, + }) + } + + responseInfo := internal.SetResponseInfoOnUpdateWorkflowOptions(&updateWorkflowOptions) + + handle, err := GetClient(ctx).UpdateWorkflow(ctx, updateWorkflowOptions) + if err != nil { + return TemporalOperationResult[R]{}, err + } + + if responseInfo.Link == nil { + return TemporalOperationResult[R]{}, &nexus.HandlerError{ + Type: nexus.HandlerErrorTypeInternal, + Cause: errors.New("unexpected error retrieving links from UpdateWorkflow response"), + } + } + + nexus.AddHandlerLinks(ctx, ConvertLinkWorkflowEventToNexusLink(responseInfo.Link.GetWorkflowEvent())) + + if isAsyncUpdate { + asyncOpFailed = false + return NewAsyncResult[R](encodedToken), nil + } else { + var result R + if err := handle.Get(ctx, &result); err != nil { + return TemporalOperationResult[R]{}, err + } + return NewSyncResult(result), nil + } +} + // TemporalOperationOptions configures a generic Temporal Nexus operation. // // Asynchronous workflow-backed operation: @@ -299,6 +396,11 @@ func (o *temporalOperation[I, O]) Cancel(ctx context.Context, token string, opti return o.options.CancelWorkflowRun(ctx, GetClient(ctx), CancelTemporalWorkflowRunOptions{ WorkflowID: wfToken.WorkflowID, }, options) + case operationTokenTypeUpdateWorkflow: + return &nexus.HandlerError{ + Type: nexus.HandlerErrorTypeBadRequest, + Cause: errors.New("cannot cancel an UpdateWorkflow operation"), + } default: return nexus.NewHandlerErrorf(nexus.HandlerErrorTypeBadRequest, "unknown operation token type: %d", tokenType) } diff --git a/temporalnexus/temporal_operation_test.go b/temporalnexus/temporal_operation_test.go index e13ee76e7..026501cd6 100644 --- a/temporalnexus/temporal_operation_test.go +++ b/temporalnexus/temporal_operation_test.go @@ -116,9 +116,8 @@ func TestLoadTokenType(t *testing.T) { // Unknown type=99 unknownToken := base64.URLEncoding.WithPadding(base64.NoPadding).EncodeToString([]byte(`{"t":99}`)) - tokenType, err = loadTokenType(unknownToken) - require.NoError(t, err) - require.Equal(t, operationTokenType(99), tokenType) + _, err = loadTokenType(unknownToken) + require.EqualError(t, err, "invalid operation token: 99") // Empty token _, err = loadTokenType("") @@ -136,7 +135,7 @@ func TestLoadTokenType(t *testing.T) { // Missing type field (t=0) missingType := base64.URLEncoding.WithPadding(base64.NoPadding).EncodeToString([]byte(`{"ns":"ns"}`)) _, err = loadTokenType(missingType) - require.ErrorContains(t, err, "missing or zero token type") + require.ErrorContains(t, err, "invalid operation token: 0") } func TestDoubleStartGuard(t *testing.T) { @@ -155,6 +154,35 @@ func TestDoubleStartGuard(t *testing.T) { require.Equal(t, nexus.HandlerErrorTypeBadRequest, handlerErr.Type) } +func TestStartUpdateWorkflowGuards(t *testing.T) { + nc := NexusClient{ + asyncStarted: &atomic.Bool{}, + startOperationOptions: nexus.StartOperationOptions{CallbackURL: "temporal://dummy"}, + } + // attempt running async with an exhuasted nexusClient handler + nc.asyncStarted.Store(true) + _, err := StartUpdateWorkflow[string](context.Background(), nc, client.UpdateWorkflowOptions{ + WaitForStage: client.WorkflowUpdateStageAccepted, + }) + var handlerErr *nexus.HandlerError + require.ErrorAs(t, err, &handlerErr) + require.Equal(t, nexus.HandlerErrorTypeBadRequest, handlerErr.Type) + require.ErrorContains(t, err, "only one async operation") + // attempt to run async without callback + _, err = StartUpdateWorkflow[string]( + context.Background(), + NexusClient{ + asyncStarted: &atomic.Bool{}, + }, + client.UpdateWorkflowOptions{ + WaitForStage: client.WorkflowUpdateStageAccepted, + }, + ) + require.ErrorAs(t, err, &handlerErr) + require.Equal(t, nexus.HandlerErrorTypeBadRequest, handlerErr.Type) + require.ErrorContains(t, err, "callback URL required") +} + func strPtr(s string) *string { return &s } diff --git a/temporalnexus/token.go b/temporalnexus/token.go index 70949cb41..aad6a16c1 100644 --- a/temporalnexus/token.go +++ b/temporalnexus/token.go @@ -10,31 +10,63 @@ import ( type operationTokenType int const ( - operationTokenTypeWorkflowRun = operationTokenType(1) + operationTokenTypeReserved operationTokenType = iota + operationTokenTypeWorkflowRun + operationTokenTypeUpdateWorkflow + // also Update With Start, Get Workflow Result, Stand Alone Activities + operationTokenTypeMaxVal ) -// workflowRunOperationToken is the decoded form of the workflow run operation token. -type workflowRunOperationToken struct { +func (o operationTokenType) IsValid() bool { + return 0 < o && o < operationTokenTypeMaxVal +} + +// commmon meta for all operation token types +type operationToken struct { // Version of the token, by default we assume we're on version 1, this field is not emitted as part of the output, // it's only used to reject newer token versions on load. Version int `json:"v,omitempty"` - // Type of the operation. Must be operationTypeWorkflowRun. - Type operationTokenType `json:"t"` - NamespaceName string `json:"ns"` - WorkflowID string `json:"wid"` + // Type of the operation. + Type operationTokenType `json:"t"` +} + +// workflowRunOperationToken is the decoded form of the workflow run operation token. +type workflowRunOperationToken struct { + operationToken + NamespaceName string `json:"ns"` + WorkflowID string `json:"wid"` +} + +// updateWorkflow contains only meta - because it cannot be cancelled +type updateWorkflowOperationToken struct { + operationToken } func generateWorkflowRunOperationToken(namespace, workflowID string) (string, error) { token := workflowRunOperationToken{ - Type: operationTokenTypeWorkflowRun, NamespaceName: namespace, WorkflowID: workflowID, } + token.Type = operationTokenTypeWorkflowRun data, err := json.Marshal(token) if err != nil { return "", fmt.Errorf("failed to marshal workflow run operation token: %w", err) } - return base64.URLEncoding.WithPadding(base64.NoPadding).EncodeToString(data), nil + return base64EncodedString(data), nil +} + +func generateUpdateOperationToken() (string, error) { + token := updateWorkflowOperationToken{} + token.Type = operationTokenTypeUpdateWorkflow + data, err := json.Marshal(token) + if err != nil { + return "", fmt.Errorf("failed to marshal operation token: %w", err) + } + return base64EncodedString(data), nil +} + +func base64EncodedString(data []byte) string { + return base64.URLEncoding.WithPadding(base64.NoPadding).EncodeToString(data) } // loadTokenType decodes just the type field from an operation token without full validation. @@ -53,8 +85,8 @@ func loadTokenType(data string) (operationTokenType, error) { if err := json.Unmarshal(b, &partial); err != nil { return 0, fmt.Errorf("failed to unmarshal operation token: %w", err) } - if partial.Type == 0 { - return 0, errors.New("invalid operation token: missing or zero token type") + if !partial.Type.IsValid() { + return 0, fmt.Errorf("invalid operation token: %d", partial.Type) } return partial.Type, nil } diff --git a/temporalnexus/token_test.go b/temporalnexus/token_test.go index 9b71fb879..67157a7b6 100644 --- a/temporalnexus/token_test.go +++ b/temporalnexus/token_test.go @@ -10,7 +10,9 @@ import ( func TestEncodeDecodeWorkflowRunOperationToken(t *testing.T) { wrt := workflowRunOperationToken{ - Type: operationTokenTypeWorkflowRun, + operationToken: operationToken{ + Type: operationTokenTypeWorkflowRun, + }, NamespaceName: "ns", WorkflowID: "w", } @@ -50,9 +52,9 @@ func TestDecodeWorkflowRunOperationTokenErrors(t *testing.T) { _, err = loadWorkflowRunOperationToken(invalidJSONToken) require.ErrorContains(t, err, "failed to unmarshal workflow run operation token: invalid character 'i' looking for beginning of value") - invalidTypeToken := base64.URLEncoding.WithPadding(base64.NoPadding).EncodeToString([]byte(`{"t":2}`)) + invalidTypeToken := base64.URLEncoding.WithPadding(base64.NoPadding).EncodeToString([]byte(`{"t":3}`)) _, err = loadWorkflowRunOperationToken(invalidTypeToken) - require.ErrorContains(t, err, "invalid workflow token type: 2, expected: 1") + require.ErrorContains(t, err, "invalid workflow token type") missingWIDToken := base64.URLEncoding.WithPadding(base64.NoPadding).EncodeToString([]byte(`{"t":1}`)) _, err = loadWorkflowRunOperationToken(missingWIDToken) diff --git a/test/go.mod b/test/go.mod index 41dd0124e..2d3800107 100644 --- a/test/go.mod +++ b/test/go.mod @@ -1,6 +1,6 @@ module go.temporal.io/sdk/test -go 1.24.0 +go 1.25.4 require ( github.com/golang/mock v1.6.0 @@ -13,7 +13,7 @@ require ( go.opentelemetry.io/otel v1.41.0 go.opentelemetry.io/otel/sdk v1.41.0 go.opentelemetry.io/otel/trace v1.41.0 - go.temporal.io/api v1.62.12 + go.temporal.io/api v1.62.14 go.temporal.io/sdk v1.29.1 go.temporal.io/sdk/contrib/opentelemetry v0.0.0-00010101000000-000000000000 go.temporal.io/sdk/contrib/opentracing v0.0.0-00010101000000-000000000000 @@ -39,6 +39,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect + github.com/nexus-rpc/nexus-proto-annotations v0.1.0 // indirect github.com/opencontainers/runtime-spec v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect diff --git a/test/go.sum b/test/go.sum index d87f6d755..7c90c5c26 100644 --- a/test/go.sum +++ b/test/go.sum @@ -99,6 +99,8 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/nexus-rpc/nexus-proto-annotations v0.1.0 h1:2fELd+9sqUtNu6Fg//pw8YFsxOvp8vZ8hfP0nHhNI80= +github.com/nexus-rpc/nexus-proto-annotations v0.1.0/go.mod h1:n3UjF1bPCW8llR8tHvbxJ+27yPWrhpo8w/Yg1IOuY0Y= github.com/nexus-rpc/sdk-go v0.6.0 h1:QRgnP2zTbxEbiyWG/aXH8uSC5LV/Mg1fqb19jb4DBlo= github.com/nexus-rpc/sdk-go v0.6.0/go.mod h1:FHdPfVQwRuJFZFTF0Y2GOAxCrbIBNrcPna9slkGKPYk= github.com/opencontainers/runtime-spec v1.0.2 h1:UfAcuLBJB9Coz72x1hgl8O5RVzTdNiaglX6v2DM6FI0= @@ -176,8 +178,8 @@ go.opentelemetry.io/otel/sdk/metric v1.41.0 h1:siZQIYBAUd1rlIWQT2uCxWJxcCO7q3Tri go.opentelemetry.io/otel/sdk/metric v1.41.0/go.mod h1:HNBuSvT7ROaGtGI50ArdRLUnvRTRGniSUZbxiWxSO8Y= go.opentelemetry.io/otel/trace v1.41.0 h1:Vbk2co6bhj8L59ZJ6/xFTskY+tGAbOnCtQGVVa9TIN0= go.opentelemetry.io/otel/trace v1.41.0/go.mod h1:U1NU4ULCoxeDKc09yCWdWe+3QoyweJcISEVa1RBzOis= -go.temporal.io/api v1.62.12 h1:627rVnItegQmrszg1bH4vfyc/1uNo5qCereCNkvZefw= -go.temporal.io/api v1.62.12/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.14 h1:Tree3eqoKRt5Vv+nvYHMPp/ROiGmSOTtFUD9d0w8yJE= +go.temporal.io/api v1.62.14/go.mod h1:0k75tRljEuELWGeXjEZZO7zYqBln4+1FrG6+IMOMy7Q= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= diff --git a/test/nexus_update_op_test.go b/test/nexus_update_op_test.go new file mode 100644 index 000000000..f241bafa3 --- /dev/null +++ b/test/nexus_update_op_test.go @@ -0,0 +1,153 @@ +package test_test + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/google/uuid" + "github.com/nexus-rpc/sdk-go/nexus" + "github.com/stretchr/testify/require" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/temporalnexus" + "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/workflow" +) + +type updateAddInput struct { + CounterID string + Amount int +} + +type updateAddOutput struct { + NewValue int +} + +const ( + serviceName = "counter-update-service" + addOperation = "addOperation" + addUpdate = "addUpdate" + doneSignal = "done" +) + +func TestAsyncUpdateWorkflowOperation(t *testing.T) { + testUpdateWorkflowOperation(t, true) +} + +func TestSyncUpdateWorkflowOperation(t *testing.T) { + testUpdateWorkflowOperation(t, false) +} + +// run both via "go run . integration-test -dev-server -run UpdateWorkflowOperation" +func testUpdateWorkflowOperation(t *testing.T, isAsync bool) { + ctx, cancel := context.WithTimeout(context.Background(), defaultNexusTestTimeout) + defer cancel() + tc := newTestContext(t, ctx) + + addOp, err := getAddOperation(isAsync) + require.NoError(t, err) + + callerWorkflow := getCallerWorkflow(tc, addOp, isAsync) + + w := worker.New(tc.client, tc.taskQueue, worker.Options{}) + service := nexus.NewService(serviceName) + require.NoError(t, service.Register(addOp)) + + w.RegisterNexusService(service) + w.RegisterWorkflow(counterWorkflow) + w.RegisterWorkflow(callerWorkflow) + require.NoError(t, w.Start()) + defer w.Stop() + + counterWorkflowID := "counter-" + uuid.NewString() + counterRun, err := tc.client.ExecuteWorkflow(ctx, client.StartWorkflowOptions{ + ID: counterWorkflowID, + TaskQueue: tc.taskQueue, + }, counterWorkflow) + require.NoError(t, err) + + defer func() { + require.NoError(t, tc.client.SignalWorkflow(ctx, counterWorkflowID, "", doneSignal, nil)) + require.NoError(t, counterRun.Get(ctx, nil)) + }() + + counterUpdateWorkflowRun, err := tc.client.ExecuteWorkflow(ctx, client.StartWorkflowOptions{ + ID: "caller-" + uuid.NewString(), + TaskQueue: tc.taskQueue, + WorkflowTaskTimeout: time.Second, + }, callerWorkflow, updateAddInput{CounterID: counterWorkflowID, Amount: 5}) + require.NoError(t, err) + + var out updateAddOutput + require.NoError(t, counterUpdateWorkflowRun.Get(ctx, &out)) + require.Equal(t, 5, out.NewValue) +} + +func counterWorkflow(ctx workflow.Context) (int, error) { + counter := 0 + + if err := workflow.SetUpdateHandler(ctx, addUpdate, func(ctx workflow.Context, amount int) (updateAddOutput, error) { + counter += amount + _ = workflow.Sleep(ctx, time.Second) + return updateAddOutput{NewValue: counter}, nil + }); err != nil { + return 0, err + } + + workflow.GetSignalChannel(ctx, doneSignal).Receive(ctx, nil) + workflow.GetLogger(ctx).Info("finished workflow, exiting now...", "final counter", counter) + return counter, nil +} + +func getAddOperation(isAsync bool) (nexus.Operation[updateAddInput, updateAddOutput], error) { + updateStage := client.WorkflowUpdateStageCompleted + if isAsync { + updateStage = client.WorkflowUpdateStageAccepted + } + return temporalnexus.NewTemporalOperation(temporalnexus.TemporalOperationOptions[updateAddInput, updateAddOutput]{ + Name: addOperation, + Start: func(ctx context.Context, nc temporalnexus.NexusClient, input updateAddInput, _ temporalnexus.StartTemporalOperationOptions) (temporalnexus.TemporalOperationResult[updateAddOutput], error) { + return temporalnexus.StartUpdateWorkflow[updateAddOutput](ctx, nc, client.UpdateWorkflowOptions{ + WorkflowID: input.CounterID, + UpdateName: addUpdate, + Args: []any{input.Amount}, + WaitForStage: updateStage, + }) + }, + }) +} + +// caller workflow that does sync/async specific checks +func getCallerWorkflow( + tc *testContext, + addOp nexus.Operation[updateAddInput, updateAddOutput], + isAsync bool, +) func(workflow.Context, updateAddInput) (updateAddOutput, error) { + return func(ctx workflow.Context, input updateAddInput) (updateAddOutput, error) { + nc := workflow.NewNexusClient(tc.endpoint, serviceName) + fut := nc.ExecuteOperation(ctx, addOp, input, workflow.NexusOperationOptions{}) + + var exec workflow.NexusOperationExecution + if err := fut.GetNexusOperationExecution().Get(ctx, &exec); err != nil { + return updateAddOutput{}, err + } + + if isAsync { + if exec.OperationToken == "" { + return updateAddOutput{}, errors.New("expected a non-empty async operation token") + } + } else { + if exec.OperationToken != "" { + return updateAddOutput{}, errors.New("unexpected operation token on a sync operation") + } + } + + var out updateAddOutput + if err := fut.Get(ctx, &out); err != nil { + return updateAddOutput{}, err + } + + return out, nil + } +} From a5d2648238ac6fd607e5d9e2bc081b959e631ccb Mon Sep 17 00:00:00 2001 From: Maruthi ChandraSekhar Vemuri Date: Tue, 23 Jun 2026 15:43:53 -0700 Subject: [PATCH 2/3] Tidying go.mod for go.temporal.io/api@v1.62.14 --- contrib/aws/lambdaworker/go.mod | 5 +++-- contrib/aws/lambdaworker/go.sum | 6 ++++-- contrib/aws/lambdaworker/otel/go.mod | 5 +++-- contrib/aws/lambdaworker/otel/go.sum | 6 ++++-- contrib/aws/s3driver/awssdkv2/go.mod | 5 +++-- contrib/aws/s3driver/awssdkv2/go.sum | 6 ++++-- contrib/aws/s3driver/go.mod | 5 +++-- contrib/aws/s3driver/go.sum | 6 ++++-- contrib/datadog/go.mod | 7 +++---- contrib/datadog/go.sum | 6 ++++-- contrib/envconfig/go.mod | 5 +++-- contrib/envconfig/go.sum | 6 ++++-- contrib/opentelemetry/go.mod | 5 +++-- contrib/opentelemetry/go.sum | 6 ++++-- contrib/opentracing/go.mod | 5 +++-- contrib/opentracing/go.sum | 6 ++++-- contrib/sysinfo/go.mod | 5 +++-- contrib/sysinfo/go.sum | 6 ++++-- contrib/tally/go.mod | 5 +++-- contrib/tally/go.sum | 6 ++++-- contrib/tools/workflowcheck/go.mod | 1 - contrib/tools/workflowcheck/go.sum | 2 -- contrib/workflowstreams/go.mod | 5 +++-- contrib/workflowstreams/go.sum | 6 ++++-- 24 files changed, 77 insertions(+), 49 deletions(-) diff --git a/contrib/aws/lambdaworker/go.mod b/contrib/aws/lambdaworker/go.mod index 45d7bb1ed..55405e892 100644 --- a/contrib/aws/lambdaworker/go.mod +++ b/contrib/aws/lambdaworker/go.mod @@ -1,6 +1,6 @@ module go.temporal.io/sdk/contrib/aws/lambdaworker -go 1.24.0 +go 1.25.4 require ( github.com/aws/aws-lambda-go v1.47.0 @@ -19,10 +19,11 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect + github.com/nexus-rpc/nexus-proto-annotations v0.1.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect - go.temporal.io/api v1.62.12 // indirect + go.temporal.io/api v1.62.14 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.40.0 // indirect diff --git a/contrib/aws/lambdaworker/go.sum b/contrib/aws/lambdaworker/go.sum index 097e6d774..1abda8bea 100644 --- a/contrib/aws/lambdaworker/go.sum +++ b/contrib/aws/lambdaworker/go.sum @@ -32,6 +32,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/nexus-rpc/nexus-proto-annotations v0.1.0 h1:2fELd+9sqUtNu6Fg//pw8YFsxOvp8vZ8hfP0nHhNI80= +github.com/nexus-rpc/nexus-proto-annotations v0.1.0/go.mod h1:n3UjF1bPCW8llR8tHvbxJ+27yPWrhpo8w/Yg1IOuY0Y= github.com/nexus-rpc/sdk-go v0.6.0 h1:QRgnP2zTbxEbiyWG/aXH8uSC5LV/Mg1fqb19jb4DBlo= github.com/nexus-rpc/sdk-go v0.6.0/go.mod h1:FHdPfVQwRuJFZFTF0Y2GOAxCrbIBNrcPna9slkGKPYk= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -59,8 +61,8 @@ go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2W go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= -go.temporal.io/api v1.62.12 h1:627rVnItegQmrszg1bH4vfyc/1uNo5qCereCNkvZefw= -go.temporal.io/api v1.62.12/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.14 h1:Tree3eqoKRt5Vv+nvYHMPp/ROiGmSOTtFUD9d0w8yJE= +go.temporal.io/api v1.62.14/go.mod h1:0k75tRljEuELWGeXjEZZO7zYqBln4+1FrG6+IMOMy7Q= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/contrib/aws/lambdaworker/otel/go.mod b/contrib/aws/lambdaworker/otel/go.mod index 3c0f37c55..fc2311754 100644 --- a/contrib/aws/lambdaworker/otel/go.mod +++ b/contrib/aws/lambdaworker/otel/go.mod @@ -1,6 +1,6 @@ module go.temporal.io/sdk/contrib/aws/lambdaworker/otel -go 1.25.0 +go 1.25.4 require ( go.opentelemetry.io/contrib/propagators/aws v1.42.0 @@ -25,6 +25,7 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect + github.com/nexus-rpc/nexus-proto-annotations v0.1.0 // indirect github.com/nexus-rpc/sdk-go v0.6.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron v1.2.0 // indirect @@ -35,7 +36,7 @@ require ( go.opentelemetry.io/otel/metric v1.42.0 // indirect go.opentelemetry.io/otel/trace v1.42.0 // indirect go.opentelemetry.io/proto/otlp v1.9.0 // indirect - go.temporal.io/api v1.62.12 // indirect + go.temporal.io/api v1.62.14 // indirect golang.org/x/net v0.51.0 // indirect golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.41.0 // indirect diff --git a/contrib/aws/lambdaworker/otel/go.sum b/contrib/aws/lambdaworker/otel/go.sum index 7b6d3a3be..a18b2ccd1 100644 --- a/contrib/aws/lambdaworker/otel/go.sum +++ b/contrib/aws/lambdaworker/otel/go.sum @@ -31,6 +31,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/nexus-rpc/nexus-proto-annotations v0.1.0 h1:2fELd+9sqUtNu6Fg//pw8YFsxOvp8vZ8hfP0nHhNI80= +github.com/nexus-rpc/nexus-proto-annotations v0.1.0/go.mod h1:n3UjF1bPCW8llR8tHvbxJ+27yPWrhpo8w/Yg1IOuY0Y= github.com/nexus-rpc/sdk-go v0.6.0 h1:QRgnP2zTbxEbiyWG/aXH8uSC5LV/Mg1fqb19jb4DBlo= github.com/nexus-rpc/sdk-go v0.6.0/go.mod h1:FHdPfVQwRuJFZFTF0Y2GOAxCrbIBNrcPna9slkGKPYk= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -68,8 +70,8 @@ go.opentelemetry.io/otel/trace v1.42.0 h1:OUCgIPt+mzOnaUTpOQcBiM/PLQ/Op7oq6g4Len go.opentelemetry.io/otel/trace v1.42.0/go.mod h1:f3K9S+IFqnumBkKhRJMeaZeNk9epyhnCmQh/EysQCdc= go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A= go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4= -go.temporal.io/api v1.62.12 h1:627rVnItegQmrszg1bH4vfyc/1uNo5qCereCNkvZefw= -go.temporal.io/api v1.62.12/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.14 h1:Tree3eqoKRt5Vv+nvYHMPp/ROiGmSOTtFUD9d0w8yJE= +go.temporal.io/api v1.62.14/go.mod h1:0k75tRljEuELWGeXjEZZO7zYqBln4+1FrG6+IMOMy7Q= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/contrib/aws/s3driver/awssdkv2/go.mod b/contrib/aws/s3driver/awssdkv2/go.mod index 0709bd246..af18ca29e 100644 --- a/contrib/aws/s3driver/awssdkv2/go.mod +++ b/contrib/aws/s3driver/awssdkv2/go.mod @@ -1,6 +1,6 @@ module go.temporal.io/sdk/contrib/aws/s3driver/awssdkv2 -go 1.24.0 +go 1.25.4 require ( github.com/aws/aws-sdk-go-v2 v1.41.5 @@ -8,7 +8,7 @@ require ( github.com/aws/smithy-go v1.24.2 github.com/johannesboyne/gofakes3 v0.0.0-20260208201424-4c385a1f6a73 github.com/stretchr/testify v1.10.0 - go.temporal.io/api v1.62.12 + go.temporal.io/api v1.62.14 go.temporal.io/sdk v1.25.1 go.temporal.io/sdk/contrib/aws/s3driver v0.0.0 ) @@ -25,6 +25,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect + github.com/nexus-rpc/nexus-proto-annotations v0.1.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46 // indirect go.shabbyrobe.org/gocovmerge v0.0.0-20230507111327-fa4f82cfbf4d // indirect diff --git a/contrib/aws/s3driver/awssdkv2/go.sum b/contrib/aws/s3driver/awssdkv2/go.sum index f6e7df42a..5af7f9471 100644 --- a/contrib/aws/s3driver/awssdkv2/go.sum +++ b/contrib/aws/s3driver/awssdkv2/go.sum @@ -52,6 +52,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/nexus-rpc/nexus-proto-annotations v0.1.0 h1:2fELd+9sqUtNu6Fg//pw8YFsxOvp8vZ8hfP0nHhNI80= +github.com/nexus-rpc/nexus-proto-annotations v0.1.0/go.mod h1:n3UjF1bPCW8llR8tHvbxJ+27yPWrhpo8w/Yg1IOuY0Y= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= @@ -80,8 +82,8 @@ go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6 go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= go.shabbyrobe.org/gocovmerge v0.0.0-20230507111327-fa4f82cfbf4d h1:Ns9kd1Rwzw7t0BR8XMphenji4SmIoNZPn8zhYmaVKP8= go.shabbyrobe.org/gocovmerge v0.0.0-20230507111327-fa4f82cfbf4d/go.mod h1:92Uoe3l++MlthCm+koNi0tcUCX3anayogF0Pa/sp24k= -go.temporal.io/api v1.62.12 h1:627rVnItegQmrszg1bH4vfyc/1uNo5qCereCNkvZefw= -go.temporal.io/api v1.62.12/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.14 h1:Tree3eqoKRt5Vv+nvYHMPp/ROiGmSOTtFUD9d0w8yJE= +go.temporal.io/api v1.62.14/go.mod h1:0k75tRljEuELWGeXjEZZO7zYqBln4+1FrG6+IMOMy7Q= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/contrib/aws/s3driver/go.mod b/contrib/aws/s3driver/go.mod index 59843f99a..082960a85 100644 --- a/contrib/aws/s3driver/go.mod +++ b/contrib/aws/s3driver/go.mod @@ -1,10 +1,10 @@ module go.temporal.io/sdk/contrib/aws/s3driver -go 1.24.0 +go 1.25.4 require ( github.com/stretchr/testify v1.10.0 - go.temporal.io/api v1.62.12 + go.temporal.io/api v1.62.14 go.temporal.io/sdk v1.25.1 golang.org/x/sync v0.19.0 google.golang.org/protobuf v1.36.11 @@ -14,6 +14,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect + github.com/nexus-rpc/nexus-proto-annotations v0.1.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/sys v0.40.0 // indirect diff --git a/contrib/aws/s3driver/go.sum b/contrib/aws/s3driver/go.sum index 718123b4e..fb48af433 100644 --- a/contrib/aws/s3driver/go.sum +++ b/contrib/aws/s3driver/go.sum @@ -22,6 +22,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/nexus-rpc/nexus-proto-annotations v0.1.0 h1:2fELd+9sqUtNu6Fg//pw8YFsxOvp8vZ8hfP0nHhNI80= +github.com/nexus-rpc/nexus-proto-annotations v0.1.0/go.mod h1:n3UjF1bPCW8llR8tHvbxJ+27yPWrhpo8w/Yg1IOuY0Y= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= @@ -42,8 +44,8 @@ go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2W go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= -go.temporal.io/api v1.62.12 h1:627rVnItegQmrszg1bH4vfyc/1uNo5qCereCNkvZefw= -go.temporal.io/api v1.62.12/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.14 h1:Tree3eqoKRt5Vv+nvYHMPp/ROiGmSOTtFUD9d0w8yJE= +go.temporal.io/api v1.62.14/go.mod h1:0k75tRljEuELWGeXjEZZO7zYqBln4+1FrG6+IMOMy7Q= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/contrib/datadog/go.mod b/contrib/datadog/go.mod index 3a00f8adb..f848ebd9c 100644 --- a/contrib/datadog/go.mod +++ b/contrib/datadog/go.mod @@ -1,8 +1,6 @@ module go.temporal.io/sdk/contrib/datadog -go 1.24.0 - -toolchain go1.24.5 +go 1.25.4 require ( github.com/DataDog/dd-trace-go/v2 v2.4.0 @@ -53,6 +51,7 @@ require ( github.com/minio/simdjson-go v0.4.5 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect + github.com/nexus-rpc/nexus-proto-annotations v0.1.0 // indirect github.com/nexus-rpc/sdk-go v0.6.0 // indirect github.com/outcaste-io/ristretto v0.2.3 // indirect github.com/philhofer/fwd v1.2.0 // indirect @@ -81,7 +80,7 @@ require ( go.opentelemetry.io/otel/metric v1.41.0 // indirect go.opentelemetry.io/otel/sdk v1.41.0 // indirect go.opentelemetry.io/otel/trace v1.41.0 // indirect - go.temporal.io/api v1.62.12 // indirect + go.temporal.io/api v1.62.14 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect diff --git a/contrib/datadog/go.sum b/contrib/datadog/go.sum index eccce4945..47a547544 100644 --- a/contrib/datadog/go.sum +++ b/contrib/datadog/go.sum @@ -112,6 +112,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee h1:W5t00kpgFdJifH4BDsTlE89Zl93FEloxaWZfGcifgq8= github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/nexus-rpc/nexus-proto-annotations v0.1.0 h1:2fELd+9sqUtNu6Fg//pw8YFsxOvp8vZ8hfP0nHhNI80= +github.com/nexus-rpc/nexus-proto-annotations v0.1.0/go.mod h1:n3UjF1bPCW8llR8tHvbxJ+27yPWrhpo8w/Yg1IOuY0Y= github.com/nexus-rpc/sdk-go v0.6.0 h1:QRgnP2zTbxEbiyWG/aXH8uSC5LV/Mg1fqb19jb4DBlo= github.com/nexus-rpc/sdk-go v0.6.0/go.mod h1:FHdPfVQwRuJFZFTF0Y2GOAxCrbIBNrcPna9slkGKPYk= github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.133.0 h1:iPei+89a2EK4LuN4HeIRzZNE6XxCyrKfBKG3BkK/ViU= @@ -233,8 +235,8 @@ go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.0.1 h1:T go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.0.1/go.mod h1:riqUmAOJFDFuIAzZu/3V6cOrTyfWzpgNJnG5UwrapCk= go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.0.1 h1:z/oMlrCv3Kopwh/dtdRagJy+qsRRPA86/Ux3g7+zFXM= go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.0.1/go.mod h1:C7EHYSIiaALi9RnNORCVaPCQDuJgJEn/XxkctaTez1E= -go.temporal.io/api v1.62.12 h1:627rVnItegQmrszg1bH4vfyc/1uNo5qCereCNkvZefw= -go.temporal.io/api v1.62.12/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.14 h1:Tree3eqoKRt5Vv+nvYHMPp/ROiGmSOTtFUD9d0w8yJE= +go.temporal.io/api v1.62.14/go.mod h1:0k75tRljEuELWGeXjEZZO7zYqBln4+1FrG6+IMOMy7Q= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= diff --git a/contrib/envconfig/go.mod b/contrib/envconfig/go.mod index bcd2c39b5..d0c685270 100644 --- a/contrib/envconfig/go.mod +++ b/contrib/envconfig/go.mod @@ -1,6 +1,6 @@ module go.temporal.io/sdk/contrib/envconfig -go 1.24.0 +go 1.25.4 require ( github.com/BurntSushi/toml v1.4.0 @@ -16,11 +16,12 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect + github.com/nexus-rpc/nexus-proto-annotations v0.1.0 // indirect github.com/nexus-rpc/sdk-go v0.6.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect - go.temporal.io/api v1.62.12 // indirect + go.temporal.io/api v1.62.14 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.40.0 // indirect diff --git a/contrib/envconfig/go.sum b/contrib/envconfig/go.sum index a0c3b0434..55cc39016 100644 --- a/contrib/envconfig/go.sum +++ b/contrib/envconfig/go.sum @@ -30,6 +30,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/nexus-rpc/nexus-proto-annotations v0.1.0 h1:2fELd+9sqUtNu6Fg//pw8YFsxOvp8vZ8hfP0nHhNI80= +github.com/nexus-rpc/nexus-proto-annotations v0.1.0/go.mod h1:n3UjF1bPCW8llR8tHvbxJ+27yPWrhpo8w/Yg1IOuY0Y= github.com/nexus-rpc/sdk-go v0.6.0 h1:QRgnP2zTbxEbiyWG/aXH8uSC5LV/Mg1fqb19jb4DBlo= github.com/nexus-rpc/sdk-go v0.6.0/go.mod h1:FHdPfVQwRuJFZFTF0Y2GOAxCrbIBNrcPna9slkGKPYk= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -57,8 +59,8 @@ go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2W go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= -go.temporal.io/api v1.62.12 h1:627rVnItegQmrszg1bH4vfyc/1uNo5qCereCNkvZefw= -go.temporal.io/api v1.62.12/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.14 h1:Tree3eqoKRt5Vv+nvYHMPp/ROiGmSOTtFUD9d0w8yJE= +go.temporal.io/api v1.62.14/go.mod h1:0k75tRljEuELWGeXjEZZO7zYqBln4+1FrG6+IMOMy7Q= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/contrib/opentelemetry/go.mod b/contrib/opentelemetry/go.mod index b0a2a7bd3..e4e2c5684 100644 --- a/contrib/opentelemetry/go.mod +++ b/contrib/opentelemetry/go.mod @@ -1,6 +1,6 @@ module go.temporal.io/sdk/contrib/opentelemetry -go 1.24.0 +go 1.25.4 require ( github.com/stretchr/testify v1.11.1 @@ -15,6 +15,7 @@ require ( github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2 // indirect + github.com/nexus-rpc/nexus-proto-annotations v0.1.0 // indirect github.com/nexus-rpc/sdk-go v0.6.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect golang.org/x/sync v0.19.0 // indirect @@ -32,7 +33,7 @@ require ( github.com/stretchr/objx v0.5.2 // indirect go.opentelemetry.io/otel/metric v1.41.0 go.opentelemetry.io/otel/sdk/metric v1.41.0 - go.temporal.io/api v1.62.12 // indirect + go.temporal.io/api v1.62.14 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/sys v0.41.0 // indirect golang.org/x/text v0.33.0 // indirect diff --git a/contrib/opentelemetry/go.sum b/contrib/opentelemetry/go.sum index 8f1bef189..be68426fc 100644 --- a/contrib/opentelemetry/go.sum +++ b/contrib/opentelemetry/go.sum @@ -29,6 +29,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/nexus-rpc/nexus-proto-annotations v0.1.0 h1:2fELd+9sqUtNu6Fg//pw8YFsxOvp8vZ8hfP0nHhNI80= +github.com/nexus-rpc/nexus-proto-annotations v0.1.0/go.mod h1:n3UjF1bPCW8llR8tHvbxJ+27yPWrhpo8w/Yg1IOuY0Y= github.com/nexus-rpc/sdk-go v0.6.0 h1:QRgnP2zTbxEbiyWG/aXH8uSC5LV/Mg1fqb19jb4DBlo= github.com/nexus-rpc/sdk-go v0.6.0/go.mod h1:FHdPfVQwRuJFZFTF0Y2GOAxCrbIBNrcPna9slkGKPYk= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -56,8 +58,8 @@ go.opentelemetry.io/otel/sdk/metric v1.41.0 h1:siZQIYBAUd1rlIWQT2uCxWJxcCO7q3Tri go.opentelemetry.io/otel/sdk/metric v1.41.0/go.mod h1:HNBuSvT7ROaGtGI50ArdRLUnvRTRGniSUZbxiWxSO8Y= go.opentelemetry.io/otel/trace v1.41.0 h1:Vbk2co6bhj8L59ZJ6/xFTskY+tGAbOnCtQGVVa9TIN0= go.opentelemetry.io/otel/trace v1.41.0/go.mod h1:U1NU4ULCoxeDKc09yCWdWe+3QoyweJcISEVa1RBzOis= -go.temporal.io/api v1.62.12 h1:627rVnItegQmrszg1bH4vfyc/1uNo5qCereCNkvZefw= -go.temporal.io/api v1.62.12/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.14 h1:Tree3eqoKRt5Vv+nvYHMPp/ROiGmSOTtFUD9d0w8yJE= +go.temporal.io/api v1.62.14/go.mod h1:0k75tRljEuELWGeXjEZZO7zYqBln4+1FrG6+IMOMy7Q= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/contrib/opentracing/go.mod b/contrib/opentracing/go.mod index 782d735a6..892bc7c45 100644 --- a/contrib/opentracing/go.mod +++ b/contrib/opentracing/go.mod @@ -1,6 +1,6 @@ module go.temporal.io/sdk/contrib/opentracing -go 1.24.0 +go 1.25.4 require ( github.com/opentracing/opentracing-go v1.2.0 @@ -16,11 +16,12 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect + github.com/nexus-rpc/nexus-proto-annotations v0.1.0 // indirect github.com/nexus-rpc/sdk-go v0.6.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect - go.temporal.io/api v1.62.12 // indirect + go.temporal.io/api v1.62.14 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.40.0 // indirect diff --git a/contrib/opentracing/go.sum b/contrib/opentracing/go.sum index 2cd993de6..52830c2e4 100644 --- a/contrib/opentracing/go.sum +++ b/contrib/opentracing/go.sum @@ -29,6 +29,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/nexus-rpc/nexus-proto-annotations v0.1.0 h1:2fELd+9sqUtNu6Fg//pw8YFsxOvp8vZ8hfP0nHhNI80= +github.com/nexus-rpc/nexus-proto-annotations v0.1.0/go.mod h1:n3UjF1bPCW8llR8tHvbxJ+27yPWrhpo8w/Yg1IOuY0Y= github.com/nexus-rpc/sdk-go v0.6.0 h1:QRgnP2zTbxEbiyWG/aXH8uSC5LV/Mg1fqb19jb4DBlo= github.com/nexus-rpc/sdk-go v0.6.0/go.mod h1:FHdPfVQwRuJFZFTF0Y2GOAxCrbIBNrcPna9slkGKPYk= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= @@ -60,8 +62,8 @@ go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2W go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= -go.temporal.io/api v1.62.12 h1:627rVnItegQmrszg1bH4vfyc/1uNo5qCereCNkvZefw= -go.temporal.io/api v1.62.12/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.14 h1:Tree3eqoKRt5Vv+nvYHMPp/ROiGmSOTtFUD9d0w8yJE= +go.temporal.io/api v1.62.14/go.mod h1:0k75tRljEuELWGeXjEZZO7zYqBln4+1FrG6+IMOMy7Q= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/contrib/sysinfo/go.mod b/contrib/sysinfo/go.mod index 4d3f6f277..53be02f2f 100644 --- a/contrib/sysinfo/go.mod +++ b/contrib/sysinfo/go.mod @@ -1,6 +1,6 @@ module go.temporal.io/sdk/contrib/sysinfo -go 1.24.0 +go 1.25.4 require ( github.com/containerd/cgroups/v3 v3.0.3 @@ -22,6 +22,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect + github.com/nexus-rpc/nexus-proto-annotations v0.1.0 // indirect github.com/nexus-rpc/sdk-go v0.6.0 // indirect github.com/opencontainers/runtime-spec v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect @@ -33,7 +34,7 @@ require ( github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect - go.temporal.io/api v1.62.12 // indirect + go.temporal.io/api v1.62.14 // indirect golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/sync v0.19.0 // indirect diff --git a/contrib/sysinfo/go.sum b/contrib/sysinfo/go.sum index 9e34607ab..574436f89 100644 --- a/contrib/sysinfo/go.sum +++ b/contrib/sysinfo/go.sum @@ -44,6 +44,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= +github.com/nexus-rpc/nexus-proto-annotations v0.1.0 h1:2fELd+9sqUtNu6Fg//pw8YFsxOvp8vZ8hfP0nHhNI80= +github.com/nexus-rpc/nexus-proto-annotations v0.1.0/go.mod h1:n3UjF1bPCW8llR8tHvbxJ+27yPWrhpo8w/Yg1IOuY0Y= github.com/nexus-rpc/sdk-go v0.6.0 h1:QRgnP2zTbxEbiyWG/aXH8uSC5LV/Mg1fqb19jb4DBlo= github.com/nexus-rpc/sdk-go v0.6.0/go.mod h1:FHdPfVQwRuJFZFTF0Y2GOAxCrbIBNrcPna9slkGKPYk= github.com/opencontainers/runtime-spec v1.0.2 h1:UfAcuLBJB9Coz72x1hgl8O5RVzTdNiaglX6v2DM6FI0= @@ -91,8 +93,8 @@ go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2W go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= -go.temporal.io/api v1.62.12 h1:627rVnItegQmrszg1bH4vfyc/1uNo5qCereCNkvZefw= -go.temporal.io/api v1.62.12/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.14 h1:Tree3eqoKRt5Vv+nvYHMPp/ROiGmSOTtFUD9d0w8yJE= +go.temporal.io/api v1.62.14/go.mod h1:0k75tRljEuELWGeXjEZZO7zYqBln4+1FrG6+IMOMy7Q= go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/contrib/tally/go.mod b/contrib/tally/go.mod index d67a42369..ecefcdead 100644 --- a/contrib/tally/go.mod +++ b/contrib/tally/go.mod @@ -1,6 +1,6 @@ module go.temporal.io/sdk/contrib/tally -go 1.24.0 +go 1.25.4 require ( github.com/stretchr/testify v1.10.0 @@ -16,12 +16,13 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect + github.com/nexus-rpc/nexus-proto-annotations v0.1.0 // indirect github.com/nexus-rpc/sdk-go v0.6.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/twmb/murmur3 v1.1.5 // indirect - go.temporal.io/api v1.62.12 // indirect + go.temporal.io/api v1.62.14 // indirect go.uber.org/atomic v1.9.0 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/sync v0.19.0 // indirect diff --git a/contrib/tally/go.sum b/contrib/tally/go.sum index 39de47c38..45b1ffced 100644 --- a/contrib/tally/go.sum +++ b/contrib/tally/go.sum @@ -83,6 +83,8 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/nexus-rpc/nexus-proto-annotations v0.1.0 h1:2fELd+9sqUtNu6Fg//pw8YFsxOvp8vZ8hfP0nHhNI80= +github.com/nexus-rpc/nexus-proto-annotations v0.1.0/go.mod h1:n3UjF1bPCW8llR8tHvbxJ+27yPWrhpo8w/Yg1IOuY0Y= github.com/nexus-rpc/sdk-go v0.6.0 h1:QRgnP2zTbxEbiyWG/aXH8uSC5LV/Mg1fqb19jb4DBlo= github.com/nexus-rpc/sdk-go v0.6.0/go.mod h1:FHdPfVQwRuJFZFTF0Y2GOAxCrbIBNrcPna9slkGKPYk= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -139,8 +141,8 @@ go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2W go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= -go.temporal.io/api v1.62.12 h1:627rVnItegQmrszg1bH4vfyc/1uNo5qCereCNkvZefw= -go.temporal.io/api v1.62.12/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.14 h1:Tree3eqoKRt5Vv+nvYHMPp/ROiGmSOTtFUD9d0w8yJE= +go.temporal.io/api v1.62.14/go.mod h1:0k75tRljEuELWGeXjEZZO7zYqBln4+1FrG6+IMOMy7Q= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= diff --git a/contrib/tools/workflowcheck/go.mod b/contrib/tools/workflowcheck/go.mod index c01a7ff07..3712b2a5b 100644 --- a/contrib/tools/workflowcheck/go.mod +++ b/contrib/tools/workflowcheck/go.mod @@ -11,7 +11,6 @@ require ( github.com/google/go-cmp v0.7.0 // indirect github.com/kr/pretty v0.1.0 // indirect github.com/kr/text v0.2.0 // indirect - go.temporal.io/api v1.62.11 // indirect golang.org/x/mod v0.30.0 // indirect golang.org/x/sync v0.19.0 // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect diff --git a/contrib/tools/workflowcheck/go.sum b/contrib/tools/workflowcheck/go.sum index 1a5106624..29a097436 100644 --- a/contrib/tools/workflowcheck/go.sum +++ b/contrib/tools/workflowcheck/go.sum @@ -7,8 +7,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -go.temporal.io/api v1.62.11 h1:MWDaooDvOJCIRb1atqeZX2ErDPNTsNc3/mMEVEvvaVU= -go.temporal.io/api v1.62.11/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/mod v0.30.0 h1:fDEXFVZ/fmCKProc/yAXXUijritrDzahmwwefnjoPFk= golang.org/x/mod v0.30.0/go.mod h1:lAsf5O2EvJeSFMiBxXDki7sCgAxEUcZHXoXMKT4GJKc= golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= diff --git a/contrib/workflowstreams/go.mod b/contrib/workflowstreams/go.mod index 249d4caa3..1534c7780 100644 --- a/contrib/workflowstreams/go.mod +++ b/contrib/workflowstreams/go.mod @@ -1,11 +1,11 @@ module go.temporal.io/sdk/contrib/workflowstreams -go 1.24.0 +go 1.25.4 require ( github.com/google/uuid v1.6.0 github.com/stretchr/testify v1.11.1 - go.temporal.io/api v1.62.12 + go.temporal.io/api v1.62.14 go.temporal.io/sdk v1.45.0 google.golang.org/protobuf v1.36.11 ) @@ -17,6 +17,7 @@ require ( github.com/golang/mock v1.6.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect + github.com/nexus-rpc/nexus-proto-annotations v0.1.0 // indirect github.com/nexus-rpc/sdk-go v0.6.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron v1.2.0 // indirect diff --git a/contrib/workflowstreams/go.sum b/contrib/workflowstreams/go.sum index 831c156f2..089b333a7 100644 --- a/contrib/workflowstreams/go.sum +++ b/contrib/workflowstreams/go.sum @@ -28,6 +28,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/nexus-rpc/nexus-proto-annotations v0.1.0 h1:2fELd+9sqUtNu6Fg//pw8YFsxOvp8vZ8hfP0nHhNI80= +github.com/nexus-rpc/nexus-proto-annotations v0.1.0/go.mod h1:n3UjF1bPCW8llR8tHvbxJ+27yPWrhpo8w/Yg1IOuY0Y= github.com/nexus-rpc/sdk-go v0.6.0 h1:QRgnP2zTbxEbiyWG/aXH8uSC5LV/Mg1fqb19jb4DBlo= github.com/nexus-rpc/sdk-go v0.6.0/go.mod h1:FHdPfVQwRuJFZFTF0Y2GOAxCrbIBNrcPna9slkGKPYk= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -55,8 +57,8 @@ go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2W go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= -go.temporal.io/api v1.62.12 h1:627rVnItegQmrszg1bH4vfyc/1uNo5qCereCNkvZefw= -go.temporal.io/api v1.62.12/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.14 h1:Tree3eqoKRt5Vv+nvYHMPp/ROiGmSOTtFUD9d0w8yJE= +go.temporal.io/api v1.62.14/go.mod h1:0k75tRljEuELWGeXjEZZO7zYqBln4+1FrG6+IMOMy7Q= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= From f9672be0ca5ff0ea66a94e690ad8eae4b2a361fc Mon Sep 17 00:00:00 2001 From: Maruthi ChandraSekhar Vemuri Date: Thu, 25 Jun 2026 16:02:11 -0700 Subject: [PATCH 3/3] address feedback 1 --- internal/client.go | 27 ++- internal/interceptor.go | 6 +- internal/internal_update.go | 6 - internal/internal_workflow_client.go | 30 ++- internal/nexus_operations.go | 8 + temporalnexus/temporal_operation.go | 82 +++++-- temporalnexus/temporal_operation_test.go | 21 +- temporalnexus/token.go | 35 ++- temporalnexus/token_test.go | 6 +- test/nexus_update_op_test.go | 263 ++++++++++++++++++++--- 10 files changed, 409 insertions(+), 75 deletions(-) diff --git a/internal/client.go b/internal/client.go index f6f0a18cc..f7a707aa7 100644 --- a/internal/client.go +++ b/internal/client.go @@ -1858,19 +1858,40 @@ func SetLinksOnStartWorkflowOptions(opts *StartWorkflowOptions, links []*commonp // interface utility wrapper to allow setting links and callbacks // on temporal primitive operation options (UpdateWorkflowOptions, etc) // draft-review: set it on StartWorkflowOptions above as well -type nexusOperationOptions interface { +type nexusTemporalOperationOptions interface { + setRequestID(requestID string) setLinks(links []*commonpb.Link) setCallbacks(callbacks []*commonpb.Callback) } -func SetLinksOnNexusOperation(opts nexusOperationOptions, links []*commonpb.Link) { +// Set links on any [nexusOperationOptions] interface via the setLinks API. +// +// Intended to be used only internally as a consistent way of setting +// links on all Nexus Operations +func SetLinksOnNexusOperation(opts nexusTemporalOperationOptions, links []*commonpb.Link) { opts.setLinks(links) } -func SetCallbacksOnNexusOperation(opts nexusOperationOptions, callbacks []*commonpb.Callback) { +// Set callbacks on any [nexusOperationOptions] interface via the setCallbacks API. +// +// Intended to be used only internally as a consistent way of setting +// callbacks on all Nexus Operations +func SetCallbacksOnNexusOperation(opts nexusTemporalOperationOptions, callbacks []*commonpb.Callback) { opts.setCallbacks(callbacks) } +// Set non-empty requestID on any [nexusOperationOptions] interface via the setRequestID API. +// Used for deduping requests server-side +// +// Intended to be used only internally as a consistent way of setting +// requestIDs on all Nexus Operations +func SetRequestIDOnNexusOperation(opts nexusTemporalOperationOptions, requestID string) { + if requestID == "" { + return + } + opts.setRequestID(requestID) +} + // SetOnConflictOptionsOnStartWorkflowOptions is an internal only method for setting conflict // options on StartWorkflowOptions. // OnConflictOptions are purposefully not exposed to users for the time being. diff --git a/internal/interceptor.go b/internal/interceptor.go index 17e83bfcc..12c3ae311 100644 --- a/internal/interceptor.go +++ b/internal/interceptor.go @@ -510,9 +510,11 @@ type ClientUpdateWorkflowInput struct { // WaitForStage is the stage to wait for. WaitForStage WorkflowUpdateStage - // links. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation]. + // request ID for server de-duplication. Only settable by the SDK - e.g. [temporalnexus.updateWorkflowOperation]. + requestID string + // links. Only settable by the SDK - e.g. [temporalnexus.updateWorkflowOperation]. links []*commonpb.Link - // for backward links from the target namespace sent via operation options. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation]. + // callbacks. Only settable by the SDK - e.g. [temporalnexus.updateWorkflowOperation]. callbacks []*commonpb.Callback // gRPC request response trap for nexus forward links responseInfo *updateWorkflowResponseInfo diff --git a/internal/internal_update.go b/internal/internal_update.go index 0f29e8564..4020c4884 100644 --- a/internal/internal_update.go +++ b/internal/internal_update.go @@ -30,12 +30,6 @@ const ( WorkflowUpdateStageCompleted ) -// workflow is async iff update stage is WorkflowUpdateStageAccepted- -// as WorkflowUpdateStageCompleted blocks on UpdateWorkflow itself -func (w WorkflowUpdateStage) IsAsyncUpdateWorkflow() bool { - return w == WorkflowUpdateStageAccepted -} - const ( updateStateNew updateState = "New" updateStateRequestInitiated updateState = "RequestScheduled" diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 39c941fe1..8df8a1a96 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -995,9 +995,11 @@ type UpdateWorkflowOptions struct { // Note that it is incompatible with UpdateWithStartWorkflowOperation. FirstExecutionRunID string - // workflow completion callback. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation]. + // request ID for de-duplication during server processing. Only settable by the SDK - e.g. [temporalnexus.updateWorkflowOperation]. + requestID string + // callbacks. Only settable by the SDK - e.g. [temporalnexus.updateWorkflowOperation]. callbacks []*commonpb.Callback - // for backward links from the target namespace sent via operation options. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation]. + // for backward links from the target namespace sent via operation options. Only settable by the SDK - e.g. [temporalnexus.updateWorkflowOperation]. links []*commonpb.Link // gRPC request response trap for nexus forward links responseInfo *updateWorkflowResponseInfo @@ -1016,6 +1018,10 @@ func (u *UpdateWorkflowOptions) setCallbacks(callbacks []*commonpb.Callback) { u.callbacks = callbacks } +func (u *UpdateWorkflowOptions) setRequestID(requestID string) { + u.requestID = requestID +} + // UpdateWithStartWorkflowOptions encapsulates the parameters used by UpdateWithStartWorkflow. // See UpdateWithStartWorkflow and NewWithStartWorkflowOperation. type UpdateWithStartWorkflowOptions struct { @@ -1075,6 +1081,16 @@ type lazyUpdateHandle struct { client *WorkflowClient } +// IsUpdateWorkflowCompleted is a utility to detect if an operation has immediately +// completed. Used for Nexus operations that back into operations at a later stage +// in a non-retriable manner. Eg. UpdateWorkflow could fail at Accepted(failed validation) +// but its still Admitted and isnt captured in the rpc errors and keeps getting retried +// draft-review: is there a better way to do it instead? +func IsUpdateWorkflowCompleted(handle WorkflowUpdateHandle) bool { + _, ok := handle.(*completedUpdateHandle) + return ok +} + // QueryWorkflowWithOptionsRequest is the request to QueryWorkflowWithOptions type QueryWorkflowWithOptionsRequest struct { // WorkflowID is a required field indicating the workflow which should be queried. @@ -2888,7 +2904,7 @@ func createUpdateWorkflowInput(options *UpdateWorkflowOptions) (*ClientUpdateWor return nil, errors.New("WaitForStage WorkflowUpdateStageAdmitted is not supported") } - // draft-review: check if this could be valid for some edge cases + // draft-review: check if this could be valid for some edge cases if options.WaitForStage == WorkflowUpdateStageCompleted && len(options.callbacks) > 0 { return nil, errors.New("WaitForStage WorkflowUpdateStageCompleted does not support callbacks " + "as it is already a synchronous operation") @@ -2905,6 +2921,7 @@ func createUpdateWorkflowInput(options *UpdateWorkflowOptions) (*ClientUpdateWor links: options.links, callbacks: options.callbacks, responseInfo: options.responseInfo, + requestID: options.requestID, }, nil } @@ -2930,6 +2947,11 @@ func (w *workflowClientInterceptor) createUpdateWorkflowRequest( return nil, err } + if in.requestID == "" { + // for non-nexus op UpdateWorkflows, de-dup on updateID itself + in.requestID = in.UpdateID + } + return &workflowservice.UpdateWorkflowExecutionRequest{ WaitPolicy: &updatepb.WaitPolicy{LifecycleStage: updateLifeCycleStageToProto(in.WaitForStage)}, Namespace: w.client.namespace, @@ -2939,7 +2961,7 @@ func (w *workflowClientInterceptor) createUpdateWorkflowRequest( }, FirstExecutionRunId: in.FirstExecutionRunID, Request: &updatepb.Request{ - RequestId: in.UpdateID, + RequestId: in.requestID, CompletionCallbacks: in.callbacks, Links: in.links, Meta: &updatepb.Meta{ diff --git a/internal/nexus_operations.go b/internal/nexus_operations.go index a372f2b3e..3c8d5d599 100644 --- a/internal/nexus_operations.go +++ b/internal/nexus_operations.go @@ -215,6 +215,14 @@ func NexusOperationContextFromGoContext(ctx context.Context) (nctx *NexusOperati return } +// ContextWithNexusOperationContext adds the [NexusOperationContext] into the given [context.Context] +func ContextWithNexusOperationContext(ctx context.Context, nctx *NexusOperationContext) context.Context { + if nctx == nil { + return ctx + } + return context.WithValue(ctx, nexusOperationContextKey, nctx) +} + // nexusMiddleware constructs an adapter from Temporal WorkerInterceptors to a Nexus MiddlewareFunc. func nexusMiddleware(interceptors []WorkerInterceptor) nexus.MiddlewareFunc { return func(ctx context.Context, next nexus.OperationHandler[any, any]) (nexus.OperationHandler[any, any], error) { diff --git a/temporalnexus/temporal_operation.go b/temporalnexus/temporal_operation.go index 4a1a93b7c..b08a9f0b1 100644 --- a/temporalnexus/temporal_operation.go +++ b/temporalnexus/temporal_operation.go @@ -6,6 +6,7 @@ import ( "strings" "sync/atomic" + "github.com/google/uuid" "github.com/nexus-rpc/sdk-go/nexus" "go.temporal.io/api/common/v1" "go.temporal.io/sdk/client" @@ -165,8 +166,26 @@ func StartUpdateWorkflow[R any]( nc NexusClient, updateWorkflowOptions client.UpdateWorkflowOptions, ) (TemporalOperationResult[R], error) { + if updateWorkflowOptions.UpdateID == "" { + updateWorkflowOptions.UpdateID = uuid.NewString() + } + + if err := validateUpdateWorkflowNexusOperation(updateWorkflowOptions); err != nil { + return TemporalOperationResult[R]{}, &nexus.OperationError{ + State: nexus.OperationStateFailed, + Message: err.Error(), + Cause: err, + } + } + + nctx, ok := internal.NexusOperationContextFromGoContext(ctx) + if !ok { + return TemporalOperationResult[R]{}, nexus.NewHandlerErrorf( + nexus.HandlerErrorTypeInternal, "internal error") + } + var encodedToken string - isAsyncUpdate := updateWorkflowOptions.WaitForStage.IsAsyncUpdateWorkflow() + isAsyncUpdate := updateWorkflowOptions.WaitForStage == client.WorkflowUpdateStageAccepted asyncOpFailed := true if isAsyncUpdate { if nc.startOperationOptions.CallbackURL == "" { @@ -177,16 +196,17 @@ func StartUpdateWorkflow[R any]( return TemporalOperationResult[R]{}, nexus.NewHandlerErrorf(nexus.HandlerErrorTypeBadRequest, "only one async operation can be started per operation invocation") } - token, err := generateUpdateOperationToken() - if err != nil { - return TemporalOperationResult[R]{}, err - } - encodedToken = token defer func() { - if asyncOpFailed && nc.asyncStarted != nil { + if nc.asyncStarted != nil && asyncOpFailed { nc.asyncStarted.Store(false) } }() + token, err := generateUpdateOperationToken(nctx.Namespace, updateWorkflowOptions.WorkflowID, + updateWorkflowOptions.UpdateID) + if err != nil { + return TemporalOperationResult[R]{}, err + } + encodedToken = token } links, err := convertNexusLinks(nc.startOperationOptions.Links, GetLogger(ctx)) @@ -198,6 +218,7 @@ func StartUpdateWorkflow[R any]( } } internal.SetLinksOnNexusOperation(&updateWorkflowOptions, links) + internal.SetRequestIDOnNexusOperation(&updateWorkflowOptions, nc.startOperationOptions.RequestID) // unsure what this means, check in review // ctx = context.WithValue(ctx, internal.IsWorkflowRunOpContextKey, true) @@ -231,6 +252,18 @@ func StartUpdateWorkflow[R any]( return TemporalOperationResult[R]{}, err } + if internal.IsUpdateWorkflowCompleted(handle) { + // if workflow handle is completed and it has an error => its an unretriable error + // like validation failing on the update handler + if err := handle.Get(ctx, nil); err != nil { + return TemporalOperationResult[R]{}, &nexus.OperationError{ + State: nexus.OperationStateFailed, + Message: err.Error(), + Cause: err, + } + } + } + if responseInfo.Link == nil { return TemporalOperationResult[R]{}, &nexus.HandlerError{ Type: nexus.HandlerErrorTypeInternal, @@ -242,13 +275,34 @@ func StartUpdateWorkflow[R any]( if isAsyncUpdate { asyncOpFailed = false - return NewAsyncResult[R](encodedToken), nil - } else { - var result R - if err := handle.Get(ctx, &result); err != nil { - return TemporalOperationResult[R]{}, err + if !internal.IsUpdateWorkflowCompleted(handle) { + // if the update workflow handle is completed already, return a sync response even + // if its an async request. This is required for correctness on retried UpdateWorkflow + // rpc with same updateID. Also handles case where multiple update workflows with + // same updateID are pending- in that case, both get back same token as async result + return NewAsyncResult[R](encodedToken), nil } - return NewSyncResult(result), nil + } + var result R + if err := handle.Get(ctx, &result); err != nil { + return TemporalOperationResult[R]{}, err + } + return NewSyncResult(result), nil +} + +// validations to be performed specifically for UpdateWorkflow as a Nexus Operation. +// Acts as a no-retriable fast fail for cases where invalid configurations were +// submitted that would have (100%) failed later on required because there isnt a +// way for handler to otherwise tell that something has failed in a non-recoverable way. +// draft-review: should there be some kind of error from handler that can inform better +func validateUpdateWorkflowNexusOperation(u client.UpdateWorkflowOptions) error { + switch { + case u.WorkflowID == "": + return errors.New("workflow ID cannot be empty") + case u.WaitForStage <= client.WorkflowUpdateStageAdmitted: + return errors.New("update workflow wait for stage has to be Accepted or Completed") + default: + return nil } } @@ -398,7 +452,7 @@ func (o *temporalOperation[I, O]) Cancel(ctx context.Context, token string, opti }, options) case operationTokenTypeUpdateWorkflow: return &nexus.HandlerError{ - Type: nexus.HandlerErrorTypeBadRequest, + Type: nexus.HandlerErrorTypeNotImplemented, Cause: errors.New("cannot cancel an UpdateWorkflow operation"), } default: diff --git a/temporalnexus/temporal_operation_test.go b/temporalnexus/temporal_operation_test.go index 026501cd6..24d8cc58e 100644 --- a/temporalnexus/temporal_operation_test.go +++ b/temporalnexus/temporal_operation_test.go @@ -9,6 +9,7 @@ import ( "github.com/nexus-rpc/sdk-go/nexus" "github.com/stretchr/testify/require" "go.temporal.io/sdk/client" + "go.temporal.io/sdk/internal" "go.temporal.io/sdk/workflow" ) @@ -161,7 +162,9 @@ func TestStartUpdateWorkflowGuards(t *testing.T) { } // attempt running async with an exhuasted nexusClient handler nc.asyncStarted.Store(true) - _, err := StartUpdateWorkflow[string](context.Background(), nc, client.UpdateWorkflowOptions{ + ctx := internal.ContextWithNexusOperationContext(context.Background(), &internal.NexusOperationContext{}) + _, err := StartUpdateWorkflow[string](ctx, nc, client.UpdateWorkflowOptions{ + WorkflowID: "emptyWorkflowID!", WaitForStage: client.WorkflowUpdateStageAccepted, }) var handlerErr *nexus.HandlerError @@ -170,17 +173,31 @@ func TestStartUpdateWorkflowGuards(t *testing.T) { require.ErrorContains(t, err, "only one async operation") // attempt to run async without callback _, err = StartUpdateWorkflow[string]( - context.Background(), + ctx, NexusClient{ asyncStarted: &atomic.Bool{}, }, client.UpdateWorkflowOptions{ + WorkflowID: "anotherEmptyWorkflowID!", WaitForStage: client.WorkflowUpdateStageAccepted, }, ) require.ErrorAs(t, err, &handlerErr) require.Equal(t, nexus.HandlerErrorTypeBadRequest, handlerErr.Type) require.ErrorContains(t, err, "callback URL required") + // attempt running a bad update - no workflowID + _, err = StartUpdateWorkflow[string]( + ctx, + NexusClient{ + asyncStarted: &atomic.Bool{}, + }, + client.UpdateWorkflowOptions{ + WorkflowID: "dontTriggerValidation", + }, + ) + var opError *nexus.OperationError + require.ErrorAs(t, err, &opError) + require.Equal(t, opError.Message, "update workflow wait for stage has to be Accepted or Completed") } func strPtr(s string) *string { diff --git a/temporalnexus/token.go b/temporalnexus/token.go index aad6a16c1..34b88f4ed 100644 --- a/temporalnexus/token.go +++ b/temporalnexus/token.go @@ -12,12 +12,16 @@ type operationTokenType int const ( operationTokenTypeReserved operationTokenType = iota operationTokenTypeWorkflowRun + operationTokenTypeActivity operationTokenTypeUpdateWorkflow // also Update With Start, Get Workflow Result, Stand Alone Activities operationTokenTypeMaxVal ) func (o operationTokenType) IsValid() bool { + if o == operationTokenTypeActivity { // temporary until activity is added + return false + } return 0 < o && o < operationTokenTypeMaxVal } @@ -27,27 +31,35 @@ type operationToken struct { // it's only used to reject newer token versions on load. Version int `json:"v,omitempty"` // Type of the operation. - Type operationTokenType `json:"t"` + Type operationTokenType `json:"t"` + NamespaceName string `json:"ns"` } // workflowRunOperationToken is the decoded form of the workflow run operation token. type workflowRunOperationToken struct { operationToken - NamespaceName string `json:"ns"` - WorkflowID string `json:"wid"` + WorkflowID string `json:"wid"` } // updateWorkflow contains only meta - because it cannot be cancelled type updateWorkflowOperationToken struct { operationToken + WorkflowID string `json:"wid"` + UpdateID string `json:"uid"` +} + +func generateOperationToken(opType operationTokenType, namespace string) operationToken { + return operationToken{ + Type: opType, + NamespaceName: namespace, + } } func generateWorkflowRunOperationToken(namespace, workflowID string) (string, error) { token := workflowRunOperationToken{ - NamespaceName: namespace, - WorkflowID: workflowID, + WorkflowID: workflowID, } - token.Type = operationTokenTypeWorkflowRun + token.operationToken = generateOperationToken(operationTokenTypeWorkflowRun, namespace) data, err := json.Marshal(token) if err != nil { return "", fmt.Errorf("failed to marshal workflow run operation token: %w", err) @@ -55,9 +67,16 @@ func generateWorkflowRunOperationToken(namespace, workflowID string) (string, er return base64EncodedString(data), nil } -func generateUpdateOperationToken() (string, error) { - token := updateWorkflowOperationToken{} +func generateUpdateOperationToken(namespace, workflowID, updateID string) (string, error) { + if namespace == "" || workflowID == "" || updateID == "" { + return "", fmt.Errorf("missing required param[s]: ns %s, wid: %s, uid: %s", namespace, workflowID, updateID) + } + token := updateWorkflowOperationToken{ + WorkflowID: workflowID, + UpdateID: updateID, + } token.Type = operationTokenTypeUpdateWorkflow + token.operationToken = generateOperationToken(operationTokenTypeUpdateWorkflow, namespace) data, err := json.Marshal(token) if err != nil { return "", fmt.Errorf("failed to marshal operation token: %w", err) diff --git a/temporalnexus/token_test.go b/temporalnexus/token_test.go index 67157a7b6..76db14908 100644 --- a/temporalnexus/token_test.go +++ b/temporalnexus/token_test.go @@ -11,10 +11,10 @@ import ( func TestEncodeDecodeWorkflowRunOperationToken(t *testing.T) { wrt := workflowRunOperationToken{ operationToken: operationToken{ - Type: operationTokenTypeWorkflowRun, + Type: operationTokenTypeWorkflowRun, + NamespaceName: "ns", }, - NamespaceName: "ns", - WorkflowID: "w", + WorkflowID: "w", } token, err := generateWorkflowRunOperationToken("ns", "w") require.NoError(t, err) diff --git a/test/nexus_update_op_test.go b/test/nexus_update_op_test.go index f241bafa3..833c334d5 100644 --- a/test/nexus_update_op_test.go +++ b/test/nexus_update_op_test.go @@ -3,25 +3,36 @@ package test_test import ( "context" "errors" + "slices" + "sync" "testing" "time" "github.com/google/uuid" "github.com/nexus-rpc/sdk-go/nexus" "github.com/stretchr/testify/require" + "go.temporal.io/api/common/v1" + "go.temporal.io/api/enums/v1" + "go.temporal.io/api/history/v1" "go.temporal.io/sdk/client" "go.temporal.io/sdk/temporalnexus" "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" ) +var ( + invalidIncrementError = errors.New("invalid increment") +) + type updateAddInput struct { - CounterID string - Amount int + UpdateID string + WorkflowID string + Amount int + InvocationCount int // hint to the caller workflow that the async token will not be set if != 0 (a retried completed update is sync) } type updateAddOutput struct { - NewValue int + Count int } const ( @@ -45,7 +56,22 @@ func testUpdateWorkflowOperation(t *testing.T, isAsync bool) { defer cancel() tc := newTestContext(t, ctx) - addOp, err := getAddOperation(isAsync) + updateStage := client.WorkflowUpdateStageCompleted + if isAsync { + updateStage = client.WorkflowUpdateStageAccepted + } + addOp, err := temporalnexus.NewTemporalOperation(temporalnexus.TemporalOperationOptions[updateAddInput, updateAddOutput]{ + Name: addOperation, + Start: func(ctx context.Context, nc temporalnexus.NexusClient, input updateAddInput, opts temporalnexus.StartTemporalOperationOptions) (temporalnexus.TemporalOperationResult[updateAddOutput], error) { + return temporalnexus.StartUpdateWorkflow[updateAddOutput](ctx, nc, client.UpdateWorkflowOptions{ + WorkflowID: input.WorkflowID, + UpdateID: input.UpdateID, + UpdateName: addUpdate, + Args: []any{input.Amount}, + WaitForStage: updateStage, + }) + }, + }) require.NoError(t, err) callerWorkflow := getCallerWorkflow(tc, addOp, isAsync) @@ -61,37 +87,157 @@ func testUpdateWorkflowOperation(t *testing.T, isAsync bool) { defer w.Stop() counterWorkflowID := "counter-" + uuid.NewString() - counterRun, err := tc.client.ExecuteWorkflow(ctx, client.StartWorkflowOptions{ + counterWorkflowRun, err := tc.client.ExecuteWorkflow(ctx, client.StartWorkflowOptions{ ID: counterWorkflowID, TaskQueue: tc.taskQueue, }, counterWorkflow) require.NoError(t, err) - defer func() { + stopCounterWorkflow := func() { require.NoError(t, tc.client.SignalWorkflow(ctx, counterWorkflowID, "", doneSignal, nil)) - require.NoError(t, counterRun.Get(ctx, nil)) - }() + require.NoError(t, counterWorkflowRun.Get(ctx, nil)) + } + + // fails validation, shouldnt be retried + invalidUpdateWorkflowRun, err := tc.client.ExecuteWorkflow(ctx, client.StartWorkflowOptions{ + ID: "caller-" + uuid.NewString(), + TaskQueue: tc.taskQueue, + WorkflowTaskTimeout: time.Second, + }, callerWorkflow, updateAddInput{WorkflowID: counterWorkflowID, Amount: 6}) + require.NoError(t, err, invalidIncrementError) + require.ErrorContains(t, invalidUpdateWorkflowRun.Get(ctx, nil), invalidIncrementError.Error()) + + updateID := "consistentID-" + uuid.NewString() counterUpdateWorkflowRun, err := tc.client.ExecuteWorkflow(ctx, client.StartWorkflowOptions{ ID: "caller-" + uuid.NewString(), TaskQueue: tc.taskQueue, WorkflowTaskTimeout: time.Second, - }, callerWorkflow, updateAddInput{CounterID: counterWorkflowID, Amount: 5}) + }, callerWorkflow, updateAddInput{WorkflowID: counterWorkflowID, Amount: 5, UpdateID: updateID}) require.NoError(t, err) var out updateAddOutput require.NoError(t, counterUpdateWorkflowRun.Get(ctx, &out)) - require.Equal(t, 5, out.NewValue) + require.Equal(t, 5, out.Count) + + eventsFilter := func(e *history.HistoryEvent) bool { + filterOpTypes := []enums.EventType{ + enums.EVENT_TYPE_NEXUS_OPERATION_STARTED, // for forward events links on async + enums.EVENT_TYPE_NEXUS_OPERATION_COMPLETED, // for forward events links on sync + enums.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED, // for backward events links on handler + } + return slices.Contains(filterOpTypes, e.EventType) + } + callerRequestID, err := getNexusOpRequestID(ctx, tc.client, counterUpdateWorkflowRun) + require.NoError(t, err) + // from caller ns to target ns + forwardLink := &common.Link{Variant: &common.Link_WorkflowEvent_{WorkflowEvent: &common.Link_WorkflowEvent{ + Namespace: tc.testConfig.Namespace, + WorkflowId: counterWorkflowID, + RunId: counterWorkflowRun.GetRunID(), + Reference: &common.Link_WorkflowEvent_RequestIdRef{RequestIdRef: &common.Link_WorkflowEvent_RequestIdReference{ + RequestId: callerRequestID, + EventType: enums.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED, + }}, + }}} + // from target ns back to caller ns + backwardLink := &common.Link{Variant: &common.Link_WorkflowEvent_{WorkflowEvent: &common.Link_WorkflowEvent{ + Namespace: tc.testConfig.Namespace, + WorkflowId: counterUpdateWorkflowRun.GetID(), + RunId: counterUpdateWorkflowRun.GetRunID(), + Reference: &common.Link_WorkflowEvent_EventRef{EventRef: &common.Link_WorkflowEvent_EventReference{ + EventId: getEventIDByType(ctx, tc.client, counterUpdateWorkflowRun, enums.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED), + EventType: enums.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, + }}, + }}} + counterWorkflowLinks := getEventLinks(ctx, tc.client, counterWorkflowRun, eventsFilter) + callerWorkflowLinks := getEventLinks(ctx, tc.client, counterUpdateWorkflowRun, eventsFilter) + // draft-review: check why the links arent promoted on a sync request - seems server related + if isAsync { + require.True(t, checkForLink(counterWorkflowLinks, backwardLink)) + } + require.True(t, checkForLink(callerWorkflowLinks, forwardLink)) + + counterUpdateWorkflowRun, err = tc.client.ExecuteWorkflow(ctx, client.StartWorkflowOptions{ + ID: "caller-" + uuid.NewString(), + TaskQueue: tc.taskQueue, + WorkflowTaskTimeout: time.Second, + }, callerWorkflow, updateAddInput{WorkflowID: counterWorkflowID, Amount: 5, UpdateID: updateID, InvocationCount: 1}) + require.NoError(t, err) + + require.NoError(t, counterUpdateWorkflowRun.Get(ctx, &out)) + require.Equal(t, 5, out.Count) // shouldnt increment again as its same updateID + + if isAsync { + // simulate multiple updates with same ID hitting at same time + // the counter sleeps for second so all of them should get + // an async result- dont set the invocationID for that reason + gate := make(chan struct{}) + wg := sync.WaitGroup{} + parallelUpdateID := "consistent-" + uuid.NewString() + numParallelUpdates := 3 + wg.Add(numParallelUpdates) + errs := make([]error, numParallelUpdates) + vals := make([]updateAddOutput, numParallelUpdates) + for i := range numParallelUpdates { + go func(i int) { + defer wg.Done() + <-gate + run, err := tc.client.ExecuteWorkflow(ctx, client.StartWorkflowOptions{ + ID: "parallel-caller-" + uuid.NewString(), + TaskQueue: tc.taskQueue, + WorkflowTaskTimeout: time.Second, + }, callerWorkflow, updateAddInput{WorkflowID: counterWorkflowID, Amount: 5, UpdateID: parallelUpdateID}) + if err != nil { + errs[i] = err + return + } + errs[i] = run.Get(ctx, &vals[i]) + }(i) + } + close(gate) + wg.Wait() + for i := range numParallelUpdates { + require.NoError(t, errs[i]) + require.Equal(t, 10, vals[i].Count) + } + } + + stopCounterWorkflow() + // updates on finished workflows should fail + counterUpdateWorkflowRun, err = tc.client.ExecuteWorkflow(ctx, client.StartWorkflowOptions{ + ID: "failing-caller-" + uuid.NewString(), + TaskQueue: tc.taskQueue, + WorkflowTaskTimeout: time.Second, + }, callerWorkflow, updateAddInput{WorkflowID: counterWorkflowID, Amount: 5}) + require.NoError(t, err) + require.Error(t, counterUpdateWorkflowRun.Get(ctx, nil)) } func counterWorkflow(ctx workflow.Context) (int, error) { counter := 0 - if err := workflow.SetUpdateHandler(ctx, addUpdate, func(ctx workflow.Context, amount int) (updateAddOutput, error) { + updateHandler := func(ctx workflow.Context, amount int) (updateAddOutput, error) { counter += amount _ = workflow.Sleep(ctx, time.Second) - return updateAddOutput{NewValue: counter}, nil - }); err != nil { + return updateAddOutput{Count: counter}, nil + } + + // used for testing invalid updates + updateValidator := func(ctx workflow.Context, amount int) error { + if amount%5 != 0 { + return invalidIncrementError + } + return nil + } + + if err := workflow.SetUpdateHandlerWithOptions(ctx, + addUpdate, + updateHandler, + workflow.UpdateHandlerOptions{ + Validator: updateValidator, + }, + ); err != nil { return 0, err } @@ -100,24 +246,6 @@ func counterWorkflow(ctx workflow.Context) (int, error) { return counter, nil } -func getAddOperation(isAsync bool) (nexus.Operation[updateAddInput, updateAddOutput], error) { - updateStage := client.WorkflowUpdateStageCompleted - if isAsync { - updateStage = client.WorkflowUpdateStageAccepted - } - return temporalnexus.NewTemporalOperation(temporalnexus.TemporalOperationOptions[updateAddInput, updateAddOutput]{ - Name: addOperation, - Start: func(ctx context.Context, nc temporalnexus.NexusClient, input updateAddInput, _ temporalnexus.StartTemporalOperationOptions) (temporalnexus.TemporalOperationResult[updateAddOutput], error) { - return temporalnexus.StartUpdateWorkflow[updateAddOutput](ctx, nc, client.UpdateWorkflowOptions{ - WorkflowID: input.CounterID, - UpdateName: addUpdate, - Args: []any{input.Amount}, - WaitForStage: updateStage, - }) - }, - }) -} - // caller workflow that does sync/async specific checks func getCallerWorkflow( tc *testContext, @@ -126,14 +254,16 @@ func getCallerWorkflow( ) func(workflow.Context, updateAddInput) (updateAddOutput, error) { return func(ctx workflow.Context, input updateAddInput) (updateAddOutput, error) { nc := workflow.NewNexusClient(tc.endpoint, serviceName) - fut := nc.ExecuteOperation(ctx, addOp, input, workflow.NexusOperationOptions{}) + fut := nc.ExecuteOperation(ctx, addOp, input, workflow.NexusOperationOptions{ + Summary: "TODO", + }) var exec workflow.NexusOperationExecution if err := fut.GetNexusOperationExecution().Get(ctx, &exec); err != nil { return updateAddOutput{}, err } - if isAsync { + if isAsync && input.InvocationCount == 0 { if exec.OperationToken == "" { return updateAddOutput{}, errors.New("expected a non-empty async operation token") } @@ -151,3 +281,70 @@ func getCallerWorkflow( return out, nil } } + +func checkForLink(links []*common.Link, requiredLink *common.Link) bool { + for _, link := range links { + if link.Equal(requiredLink) { + return true + } + } + return false +} + +// draft-review: check if better filter exists of if this filter func can be added to GetWorkflowHistory itself +func getEventLinks(ctx context.Context, + c client.Client, workflowRun client.WorkflowRun, + filter func(e *history.HistoryEvent) bool, +) []*common.Link { + events := getEvents(ctx, c, workflowRun, filter) + eventLinks := make([]*common.Link, 0, len(events)) + for _, event := range events { + eventLinks = append(eventLinks, event.GetLinks()...) + } + return eventLinks +} + +// draft-review: check if theres another way +func getEventIDByType(ctx context.Context, c client.Client, workflowRun client.WorkflowRun, eventType enums.EventType) int64 { + events := getEvents(ctx, c, workflowRun, func(e *history.HistoryEvent) bool { + return e.GetEventType() == eventType + }) + if len(events) == 0 { + return -1 + } + return events[0].EventId +} + +// get the request ID of the singular nexus op in this workflow run +func getNexusOpRequestID(ctx context.Context, c client.Client, workflowRun client.WorkflowRun) (string, error) { + events := getEvents(ctx, c, workflowRun, func(e *history.HistoryEvent) bool { + return e.GetEventType() == enums.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED + }) + if len(events) == 0 { + return "", errors.New("no nexus op event found") + } + if len(events) > 1 { + return "", errors.New("multiple nexus ops events found, cannot determine specific requestID") + } + return events[0].GetNexusOperationScheduledEventAttributes().RequestId, nil +} + +func getEvents(ctx context.Context, + c client.Client, workflowRun client.WorkflowRun, + filter func(e *history.HistoryEvent) bool, +) []*history.HistoryEvent { + iter := c.GetWorkflowHistory(ctx, + workflowRun.GetID(), workflowRun.GetRunID(), + false, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) + events := []*history.HistoryEvent{} + for iter.HasNext() { + event, err := iter.Next() + if err != nil { + continue + } + if filter(event) { + events = append(events, event) + } + } + return events +}