Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions internal/internal_activity_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,28 @@ type (
Priority Priority
// StartDelay - Time to wait before dispatching the activity. This delay is not applied to retry attempts.
StartDelay time.Duration

// responseInfo holds the response Link populated by the server when the activity is started.
// Only settable by the SDK - e.g. [temporalnexus.temporalOperation].
responseInfo *startActivityResponseInfo
// requestID is the request ID used to dedup retried starts.
// Only settable by the SDK - e.g. [temporalnexus.temporalOperation].
requestID string
// callbacks is the set of completion callbacks the server should invoke when the activity
// reaches a terminal state. Only settable by the SDK - e.g. [temporalnexus.temporalOperation].
callbacks []*commonpb.Callback
// links to be associated with the activity. Only settable by the SDK - e.g. [temporalnexus.temporalOperation].
links []*commonpb.Link
// onConflictOptions configures behavior when ActivityIdConflictPolicy is USE_EXISTING and an
// activity with the same ID is already running. Only settable by the SDK - e.g. [temporalnexus.temporalOperation].
onConflictOptions *OnConflictOptions
}

// startActivityResponseInfo can be populated by the SDK to receive additional fields from the
// StartActivityExecution response. Only settable by the SDK.
startActivityResponseInfo struct {
// Link to the started activity event.
Link *commonpb.Link
}

// ClientGetActivityHandleOptions contains input for GetActivityHandle call.
Expand Down Expand Up @@ -610,6 +632,9 @@ func (w *workflowClientInterceptor) ExecuteActivity(
} else {
runID = resp.RunId
}
if in.Options.responseInfo != nil {
in.Options.responseInfo.Link = resp.Link
}

return &clientActivityHandleImpl{
client: w.client,
Expand Down Expand Up @@ -656,9 +681,70 @@ func (options *ClientStartActivityOptions) validateAndSetInRequest(request *work
request.UserMetadata = userMetadata
request.Priority = convertToPBPriority(options.Priority)
request.StartDelay = durationpb.New(options.StartDelay)
if options.requestID != "" {
request.RequestId = options.requestID
}
request.CompletionCallbacks = options.callbacks
request.Links = options.links
if options.onConflictOptions != nil {
request.OnConflictOptions = &commonpb.OnConflictOptions{
AttachRequestId: options.onConflictOptions.AttachRequestID,
AttachCompletionCallbacks: options.onConflictOptions.AttachCompletionCallbacks,
AttachLinks: options.onConflictOptions.AttachLinks,
}
}
return nil
}

// SetRequestIDOnStartActivityOptions is an internal-only method for setting the request ID on
// ClientStartActivityOptions. Used by [temporalnexus.temporalOperation] for retry idempotency.
func SetRequestIDOnStartActivityOptions(opts *ClientStartActivityOptions, requestID string) {
opts.requestID = requestID
}

// SetCallbacksOnStartActivityOptions is an internal-only method for setting completion callbacks on
// ClientStartActivityOptions. Callbacks are purposefully not exposed to users for the time being.
func SetCallbacksOnStartActivityOptions(opts *ClientStartActivityOptions, callbacks []*commonpb.Callback) {
opts.callbacks = callbacks
}

// SetLinksOnStartActivityOptions is an internal-only method for setting links on
// ClientStartActivityOptions. Links are purposefully not exposed to users for the time being.
func SetLinksOnStartActivityOptions(opts *ClientStartActivityOptions, links []*commonpb.Link) {
opts.links = links
}

// SetOnConflictOptionsOnStartActivityOptions is an internal-only method for setting on-conflict
// options on ClientStartActivityOptions. Used to ensure that when an activity with the same ID is
// already running and the conflict policy is USE_EXISTING, the caller's request ID, callback, and
// links are attached to the existing activity.
func SetOnConflictOptionsOnStartActivityOptions(opts *ClientStartActivityOptions) {
opts.onConflictOptions = &OnConflictOptions{
AttachRequestID: true,
AttachCompletionCallbacks: true,
AttachLinks: true,
}
}

// SetResponseInfoOnStartActivityOptions is an internal-only method for setting a response info
// pointer on ClientStartActivityOptions. The returned pointer is populated by ExecuteActivity with
// the start response's Link.
func SetResponseInfoOnStartActivityOptions(opts *ClientStartActivityOptions) *startActivityResponseInfo {
if opts.responseInfo == nil {
opts.responseInfo = &startActivityResponseInfo{}
}
return opts.responseInfo
}

// GetResponseLinkFromStartActivityResponseInfo returns the activity start Link captured from the
// server response, or nil if none was captured.
func GetResponseLinkFromStartActivityResponseInfo(info *startActivityResponseInfo) *commonpb.Link {
if info == nil {
return nil
}
return info.Link
}

func (w *workflowClientInterceptor) GetActivityHandle(
in *ClientGetActivityHandleInput,
) ClientActivityHandle {
Expand Down
74 changes: 69 additions & 5 deletions temporalnexus/link_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ import (
)

const (
urlSchemeTemporalKey = "temporal"
urlPathNamespaceKey = "namespace"
urlPathWorkflowIDKey = "workflowID"
urlPathRunIDKey = "runID"
urlPathTemplate = "/namespaces/%s/workflows/%s/%s/history"
urlSchemeTemporalKey = "temporal"
urlPathNamespaceKey = "namespace"
urlPathWorkflowIDKey = "workflowID"
urlPathRunIDKey = "runID"
urlPathActivityIDKey = "activityID"
urlPathTemplate = "/namespaces/%s/workflows/%s/%s/history"
urlPathActivityTemplate = "/namespaces/%s/activities/%s/%s/details"

linkWorkflowEventReferenceTypeKey = "referenceType"
linkEventIDKey = "eventID"
Expand All @@ -32,12 +34,19 @@ var (
rePatternNamespace = fmt.Sprintf(`(?P<%s>[^/]+)`, urlPathNamespaceKey)
rePatternWorkflowID = fmt.Sprintf(`(?P<%s>[^/]+)`, urlPathWorkflowIDKey)
rePatternRunID = fmt.Sprintf(`(?P<%s>[^/]+)`, urlPathRunIDKey)
rePatternActivityID = fmt.Sprintf(`(?P<%s>[^/]+)`, urlPathActivityIDKey)
urlPathRE = regexp.MustCompile(fmt.Sprintf(
`^/namespaces/%s/workflows/%s/%s/history$`,
rePatternNamespace,
rePatternWorkflowID,
rePatternRunID,
))
urlActivityPathRE = regexp.MustCompile(fmt.Sprintf(
`^/namespaces/%s/activities/%s/%s/details$`,
rePatternNamespace,
rePatternActivityID,
rePatternRunID,
))
eventReferenceType = string((&commonpb.Link_WorkflowEvent_EventReference{}).ProtoReflect().Descriptor().Name())
requestIDReferenceType = string((&commonpb.Link_WorkflowEvent_RequestIdReference{}).ProtoReflect().Descriptor().Name())
)
Expand Down Expand Up @@ -137,6 +146,61 @@ func ConvertNexusLinkToLinkWorkflowEvent(link nexus.Link) (*commonpb.Link_Workfl
return we, nil
}

// ConvertLinkActivityToNexusLink converts a Link_Activity type to a Nexus Link.
//
// NOTE: Experimental
func ConvertLinkActivityToNexusLink(a *commonpb.Link_Activity) nexus.Link {
u := &url.URL{
Scheme: urlSchemeTemporalKey,
Path: fmt.Sprintf(urlPathActivityTemplate, a.GetNamespace(), a.GetActivityId(), a.GetRunId()),
RawPath: fmt.Sprintf(
urlPathActivityTemplate,
url.PathEscape(a.GetNamespace()),
url.PathEscape(a.GetActivityId()),
url.PathEscape(a.GetRunId()),
),
}
return nexus.Link{
URL: u,
Type: string(a.ProtoReflect().Descriptor().FullName()),
}
}

// ConvertNexusLinkToLinkActivity converts a Nexus Link back to a Link_Activity.
//
// NOTE: Experimental
func ConvertNexusLinkToLinkActivity(link nexus.Link) (*commonpb.Link_Activity, error) {
a := &commonpb.Link_Activity{}
if link.Type != string(a.ProtoReflect().Descriptor().FullName()) {
return nil, fmt.Errorf(
"cannot parse link type %q to %q",
link.Type,
a.ProtoReflect().Descriptor().FullName(),
)
}
if link.URL.Scheme != urlSchemeTemporalKey {
return nil, fmt.Errorf("failed to parse link to Link_Activity: invalid scheme: %s", link.URL.Scheme)
}
matches := urlActivityPathRE.FindStringSubmatch(link.URL.EscapedPath())
if len(matches) != 4 {
return nil, fmt.Errorf("failed to parse link to Link_Activity: malformed URL path")
}
var err error
a.Namespace, err = url.PathUnescape(matches[urlActivityPathRE.SubexpIndex(urlPathNamespaceKey)])
if err != nil {
return nil, fmt.Errorf("failed to parse link to Link_Activity: %w", err)
}
a.ActivityId, err = url.PathUnescape(matches[urlActivityPathRE.SubexpIndex(urlPathActivityIDKey)])
if err != nil {
return nil, fmt.Errorf("failed to parse link to Link_Activity: %w", err)
}
a.RunId, err = url.PathUnescape(matches[urlActivityPathRE.SubexpIndex(urlPathRunIDKey)])
if err != nil {
return nil, fmt.Errorf("failed to parse link to Link_Activity: %w", err)
}
return a, nil
}

func convertLinkWorkflowEventEventReferenceToURLQuery(eventRef *commonpb.Link_WorkflowEvent_EventReference) string {
values := url.Values{}
values.Set(linkWorkflowEventReferenceTypeKey, eventReferenceType)
Expand Down
10 changes: 10 additions & 0 deletions temporalnexus/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,16 @@ func convertNexusLinks(nexusLinks []nexus.Link, log log.Logger) ([]*common.Link,
WorkflowEvent: link,
},
})
case string((&common.Link_Activity{}).ProtoReflect().Descriptor().FullName()):
link, err := ConvertNexusLinkToLinkActivity(nexusLink)
if err != nil {
return nil, err
}
links = append(links, &common.Link{
Variant: &common.Link_Activity_{
Activity: link,
},
})
case string((&common.Link_NexusOperation{}).ProtoReflect().Descriptor().FullName()):
// TODO: forward Link_NexusOperation variants once frontend validateLinks accepts them.
default:
Expand Down
Loading
Loading