NEXUS-474: Support UpdateWorkflow as a Nexus Operation #2417
@@ -1,6 +1,6 @@
module go.temporal.io/sdk/contrib/aws/lambdaworker

go 1.24.0
There was a problem hiding this comment.
I'd avoid bumping the required Go version in this PR, since this declares the minimum Go version our users need.
There was a problem hiding this comment.
The version bump seems to come from temporalio/api-go@d1423d2 (in turn, here), which went in v1.62.13+.
We could maybe fix it all the way back, but I'm not sure, as there might be others already using it. Is there another way? Or should we flag this for further discussion?
There was a problem hiding this comment.
Ah okay, it came from the API bump, I understand. Bumping to 1.25.x is fine then, since it is within the min Go version we support.
	setCallbacks(callbacks []*commonpb.Callback)
}

func SetLinksOnNexusOperation(opts nexusOperationOptions, links []*commonpb.Link) {

// workflow is async iff update stage is WorkflowUpdateStageAccepted-
// as WorkflowUpdateStageCompleted blocks on UpdateWorkflow itself
func (w WorkflowUpdateStage) IsAsyncUpdateWorkflow() bool {
There was a problem hiding this comment.
WorkflowUpdateStage is a type we expose to users. I don't think we want to expose this helper to users, so I would at least make it private or inline it. I would personally lean towards inlining it, since it is so simple.
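A minimal sketch of what inlining could look like. `updateStage` and its constants are hypothetical stand-ins for `client.WorkflowUpdateStage`, and `startMode` is an invented call site, not code from this PR.

```go
package main

import "fmt"

// updateStage is a hypothetical stand-in for client.WorkflowUpdateStage.
type updateStage int

const (
	stageUnspecified updateStage = iota
	stageAdmitted
	stageAccepted
	stageCompleted
)

// startMode is an invented call site. The stage check is written inline
// rather than exposed as a method on the public stage type.
func startMode(stage updateStage) string {
	// Waiting only for acceptance means the operation completes asynchronously.
	if stage == stageAccepted {
		return "async"
	}
	return "sync"
}

func main() {
	fmt.Println(startMode(stageAccepted))
	fmt.Println(startMode(stageCompleted))
}
```

Keeping the comparison at the call site avoids adding anything to the exported type's method set.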
		WorkflowID: wfToken.WorkflowID,
	}, options)
case operationTokenTypeUpdateWorkflow:
	return &nexus.HandlerError{
There was a problem hiding this comment.
The default may be to fail, but we should still allow users to customize the cancellation behaviour. So here we should call o.options.CancelWorkflowUpdate, and the default value for that lambda would be to return an error.
There was a problem hiding this comment.
Could it be confusing for users to trigger a cancel of an UpdateWorkflow on async? I was thinking that, because there isn't currently a way to cancel an UpdateWorkflow, this also remains consistent with that. Please correct or add if needed.
Maybe for another discussion, but could there be some compensating ops/events for such events?
There was a problem hiding this comment.
> I was thinking that because there isn't a way currently to cancel an UpdateWorkflow
There isn't a built-in way to cancel an UpdateWorkflow, yes, but a user could build their own way to cancel a WorkflowUpdate.
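A rough sketch of the "customizable hook with a failing default" idea from this thread. The hook signature, the `operationOptions` struct, and `errCancelNotImplemented` are assumptions made for illustration; the real code would return a `nexus.HandlerError` and use whatever signature the SDK settles on for `CancelWorkflowUpdate`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errCancelNotImplemented stands in for the nexus.HandlerError the real
// handler would return; the Nexus types are deliberately not used here.
var errCancelNotImplemented = errors.New("cancel is not implemented for update-workflow operations")

// cancelUpdateFunc is a hypothetical signature for a user-supplied cancel hook.
type cancelUpdateFunc func(ctx context.Context, workflowID, updateID string) error

// operationOptions is a hypothetical options struct carrying the hook.
type operationOptions struct {
	CancelWorkflowUpdate cancelUpdateFunc
}

// withDefaults installs the failing default when no hook was supplied.
func (o operationOptions) withDefaults() operationOptions {
	if o.CancelWorkflowUpdate == nil {
		o.CancelWorkflowUpdate = func(context.Context, string, string) error {
			return errCancelNotImplemented
		}
	}
	return o
}

// cancel always goes through the hook, so users can override the default.
func cancel(ctx context.Context, o operationOptions, workflowID, updateID string) error {
	return o.withDefaults().CancelWorkflowUpdate(ctx, workflowID, updateID)
}

// demo returns the error from the default hook and from a custom hook.
func demo() (defaultErr, customErr error) {
	ctx := context.Background()
	custom := operationOptions{CancelWorkflowUpdate: func(context.Context, string, string) error {
		// A user could, for example, signal the workflow to abandon the update.
		return nil
	}}
	return cancel(ctx, operationOptions{}, "wf-1", "upd-1"), cancel(ctx, custom, "wf-1", "upd-1")
}

func main() {
	d, c := demo()
	fmt.Println("default:", d)
	fmt.Println("custom:", c)
}
```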
	}, options)
case operationTokenTypeUpdateWorkflow:
	return &nexus.HandlerError{
		Type: nexus.HandlerErrorTypeBadRequest,
There was a problem hiding this comment.
I think nexus.HandlerErrorTypeNotImplemented fits this situation better, what do you think?
operationTokenTypeWorkflowRun = operationTokenType(1)
operationTokenTypeReserved operationTokenType = iota
operationTokenTypeWorkflowRun
operationTokenTypeUpdateWorkflow
There was a problem hiding this comment.
operationTokenTypeUpdateWorkflow should be 3 per https://app.notion.com/p/temporalio/Nexus-Operation-Token-Format-3738fc56773880d483dcd02931d6f90a?source=copy_link
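For context, with `iota` the three constants in the diff evaluate to 0, 1, and 2, so the new type lands on 2 rather than 3. One way to pin the values is to assign them explicitly, as in the sketch below. What value 2 is used for is defined by the token format document and is not shown in this thread, so it is simply left unassigned here.

```go
package main

import "fmt"

type operationTokenType int

const (
	operationTokenTypeReserved    operationTokenType = 0
	operationTokenTypeWorkflowRun operationTokenType = 1
	// 2 is intentionally skipped in this sketch; its meaning is an open
	// question here and comes from the token format document.
	operationTokenTypeUpdateWorkflow operationTokenType = 3
)

func main() {
	fmt.Println(operationTokenTypeReserved, operationTokenTypeWorkflowRun, operationTokenTypeUpdateWorkflow)
}
```

Explicit values also keep the wire format stable if constants are later reordered or inserted.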
// updateWorkflow contains only meta - because it cannot be cancelled
type updateWorkflowOperationToken struct {
	operationToken
There was a problem hiding this comment.
Missing Workflow ID and Update ID; the expected token format is defined here: https://app.notion.com/p/temporalio/Nexus-Operation-Token-Format-3738fc56773880d483dcd02931d6f90a?source=copy_link
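A sketch of a token that carries both IDs and round-trips through an opaque string. The JSON keys (`t`, `wid`, `uid`), the flat struct layout, and the base64url-over-JSON encoding are all assumptions for illustration; the authoritative layout is the token format document linked above.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
)

// updateWorkflowOperationToken is a hypothetical layout; field names and
// JSON keys are placeholders, not the documented format.
type updateWorkflowOperationToken struct {
	Type       int    `json:"t"`
	WorkflowID string `json:"wid"`
	UpdateID   string `json:"uid"`
}

// encodeToken serializes the token to JSON and base64url-encodes it.
func encodeToken(tok updateWorkflowOperationToken) (string, error) {
	data, err := json.Marshal(tok)
	if err != nil {
		return "", fmt.Errorf("marshal token: %w", err)
	}
	return base64.RawURLEncoding.EncodeToString(data), nil
}

// decodeToken reverses encodeToken and rejects tokens missing either ID.
func decodeToken(s string) (updateWorkflowOperationToken, error) {
	var tok updateWorkflowOperationToken
	data, err := base64.RawURLEncoding.DecodeString(s)
	if err != nil {
		return tok, fmt.Errorf("decode token: %w", err)
	}
	if err := json.Unmarshal(data, &tok); err != nil {
		return tok, fmt.Errorf("unmarshal token: %w", err)
	}
	if tok.WorkflowID == "" || tok.UpdateID == "" {
		return tok, errors.New("token is missing workflow ID or update ID")
	}
	return tok, nil
}

func main() {
	s, err := encodeToken(updateWorkflowOperationToken{Type: 3, WorkflowID: "wf-1", UpdateID: "upd-1"})
	if err != nil {
		panic(err)
	}
	tok, err := decodeToken(s)
	fmt.Printf("%+v %v\n", tok, err)
}
```

With both IDs in the token, a later handler call can locate the exact update instead of relying on metadata alone.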
}

func getAddOperation(isAsync bool) (nexus.Operation[updateAddInput, updateAddOutput], error) {
	updateStage := client.WorkflowUpdateStageCompleted
There was a problem hiding this comment.
I think for now we should only support client.WorkflowUpdateStageAccepted. client.WorkflowUpdateStageCompleted is a big footgun for users, since it requires the entire update to complete in under 10s.
There was a problem hiding this comment.
I was thinking it should be 1:1 with the primitives, because there could be edge use cases, like a best-effort update that still uses the validation logic and has the provenance in the caller, etc., so they don't use a signal Nexus op.
E.g. a pickup-driver request matching service where the update needs to be done in 10s or it backs off/retries with a different payload. Validation may not be enough, as it checks some other subsystem (Nexus op chaining) later on, etc.
There was a problem hiding this comment.
So I don't think we want to be 1:1 with the primitive in the Nexus handler, because the Nexus handler has certain limits that normal callers do not have. A Nexus handler only has 10s to run, so if a user combines client.WorkflowUpdateStageCompleted with a long-running workflow update, they could get themselves into a bad state where their handler just keeps timing out over and over. From a performance perspective, I am not sure there is any value in supporting client.WorkflowUpdateStageCompleted for Nexus, since the operation will always wait for the operation to complete.
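The timeout concern can be illustrated with a small simulation. Nothing here calls Temporal or Nexus: `waitForStage` just sleeps, and the durations are scaled down so that 100ms stands in for the 10s handler limit. All names are invented for the sketch.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitForStage simulates waiting until an update reaches a stage, or until
// the handler's context expires, whichever comes first.
func waitForStage(ctx context.Context, reachedAfter time.Duration) error {
	select {
	case <-time.After(reachedAfter):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// handle runs one simulated handler attempt under a fixed deadline.
func handle(limit, acceptAfter, completeAfter time.Duration, waitForCompletion bool) error {
	ctx, cancel := context.WithTimeout(context.Background(), limit)
	defer cancel()
	wait := acceptAfter
	if waitForCompletion {
		wait = completeAfter
	}
	return waitForStage(ctx, wait)
}

// demo simulates an update that is accepted quickly but takes far longer
// than the handler limit to complete.
func demo() (acceptedErr, completedErr error) {
	const limit = 100 * time.Millisecond // stands in for the 10s handler limit
	acceptedErr = handle(limit, time.Millisecond, time.Second, false)
	completedErr = handle(limit, time.Millisecond, time.Second, true)
	return acceptedErr, completedErr
}

func main() {
	a, c := demo()
	fmt.Println("wait for accepted: ", a)
	fmt.Println("wait for completed:", c)
}
```

In this simulation, every retry of the "completed" variant would hit the same deadline, which is the repeated-timeout state described above.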
}

// run both via "go run . integration-test -dev-server -run UpdateWorkflowOperation"
func testUpdateWorkflowOperation(t *testing.T, isAsync bool) {
There was a problem hiding this comment.
Please also assert that the forward links and backlinks are present.
}

// run both via "go run . integration-test -dev-server -run UpdateWorkflowOperation"
func testUpdateWorkflowOperation(t *testing.T, isAsync bool) {
There was a problem hiding this comment.
I would test a few other scenarios to make sure the SDK handles them logically:
- Test what happens if the user tries an invalid update handler (i.e. not registered)
- Test what happens if the update handler has a validator that rejects the update
- Test what happens if the update handler returns an error
- Test what happens if the update handler is sync (i.e. it returns immediately and doesn't wait)
- Test what happens if a Nexus operation targets the same, running, workflow update (same workflow ID, same update ID)
- Test what happens if a Nexus operation targets an already completed workflow update
- Test what happens if the user targets an invalid workflow (i.e. workflow already completed)
Let me know if you want to talk through any of these scenarios with me, or what the expected behaviour should be.
if internal.IsUpdateWorkflowCompleted(handle) {
	// if workflow handle is completed and it has an error => its an unretriable error
	// like validation failing on the update handler
	if err := handle.Get(ctx, nil); err != nil {
There was a problem hiding this comment.
Later we do:
if err := handle.Get(ctx, &result); err != nil {
	return TemporalOperationResult[R]{}, err
}
I would unify these two code paths: if the handle is complete, we can just return the result or the error.
There was a problem hiding this comment.
We may need to move the link code though
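A sketch of the unified path, under stated assumptions: `updateHandle` is a minimal hypothetical interface with only `Get`, `operationResult` stands in for `TemporalOperationResult[R]` (whose real fields are not shown in this thread), and link handling is left out entirely.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// updateHandle is a hypothetical, minimal view of the update handle.
type updateHandle interface {
	Get(ctx context.Context, valuePtr any) error
}

// operationResult is a hypothetical stand-in for TemporalOperationResult[R].
type operationResult[R any] struct {
	Value R
	Done  bool
	Token string
}

// finish has a single completed path: fetch the result once and return
// either the value or the error. Otherwise it hands back the async token.
func finish[R any](ctx context.Context, h updateHandle, completed bool, token string) (operationResult[R], error) {
	if !completed {
		return operationResult[R]{Token: token}, nil
	}
	var result R
	if err := h.Get(ctx, &result); err != nil {
		return operationResult[R]{}, err
	}
	return operationResult[R]{Value: result, Done: true}, nil
}

var errRejected = errors.New("update rejected by validator")

// fakeHandle is a test double that returns a fixed value or error.
type fakeHandle struct {
	value int
	err   error
}

func (f fakeHandle) Get(_ context.Context, valuePtr any) error {
	if f.err != nil {
		return f.err
	}
	*(valuePtr.(*int)) = f.value
	return nil
}

// run wires finish to a background context and a placeholder token.
func run(h updateHandle, completed bool) (operationResult[int], error) {
	return finish[int](context.Background(), h, completed, "token-1")
}

func main() {
	fmt.Println(run(fakeHandle{value: 7}, true))
	fmt.Println(run(fakeHandle{err: errRejected}, true))
	fmt.Println(run(fakeHandle{}, false))
}
```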
// submitted that would have (100%) failed later on required because there isnt a
// way for handler to otherwise tell that something has failed in a non-recoverable way.
// draft-review: should there be some kind of error from handler that can inform better
func validateUpdateWorkflowNexusOperation(u client.UpdateWorkflowOptions) error {
There was a problem hiding this comment.
The SDK should already do this validation when you call client.UpdateWorkflow
}

// fails validation, shouldnt be retried
invalidUpdateWorkflowRun, err := tc.client.ExecuteWorkflow(ctx, client.StartWorkflowOptions{
There was a problem hiding this comment.
Can you break these up into different subtests?
@@ -0,0 +1,350 @@
package test_test
There was a problem hiding this comment.
The convention would be to make these part of IntegrationTestSuite
updateHandler := func(ctx workflow.Context, amount int) (updateAddOutput, error) {
	counter += amount
	_ = workflow.Sleep(ctx, time.Second)
There was a problem hiding this comment.
A couple of things we are missing: if this sleep is removed, the update handler should return immediately with the result. We should also test the case where an error is returned here (both immediately and after sleeping).
}}}
counterWorkflowLinks := getEventLinks(ctx, tc.client, counterWorkflowRun, eventsFilter)
callerWorkflowLinks := getEventLinks(ctx, tc.client, counterUpdateWorkflowRun, eventsFilter)
// draft-review: check why the links arent promoted on a sync request - seems server related
There was a problem hiding this comment.
I think you need to update the link handling code in temporalnexus/link_converter.go to handle the type of links you get from sync updates, specifically Workflow links: temporalio/api@6487d66#diff-de60adc512f72d87dc1e38c0f30b41a16dede9243dfaf6209b4af9b5447fa551R260
What was changed
Adds support for UpdateWorkflow as a Nexus operation
Why?
Part of the effort to expose all Temporal primitives via Nexus operations
Checklist
Closes
How was this tested:
go run . integration-test -dev-server -run UpdateWorkflowOperation