在 Cadence 工作流程的循环中调用相同的 activity
Invoking the same activity inside a loop in cadence workflow
我对 Cadence 工作流有疑问,我们可以在 for 循环中调用具有不同输入的相同 activity 吗?该代码是确定性的吗?如果执行工作流的 worker 在执行过程中停止并稍后重新启动,cadence 在重新构建工作流时是否能够重放事件。
比如我有下面的代码
func init() {
workflow.RegisterWithOptions(SampleWorkFlow, workflow.RegisterOptions{Name: "SampleWorkFlow"})
activity.RegisterWithOptions(SampleActivity, activity.RegisterOptions{Name: "SampleActivity"})
activity.RegisterWithOptions(SecondActivity, activity.RegisterOptions{Name: "SecondActivity"})
}
// SampleWorkFlow comment
func SampleWorkFlow(ctx workflow.Context, input string) error {
fmt.Println("Workflow started")
ctx = workflow.WithTaskList(ctx, sampleTaskList)
ctx = workflow.WithActivityOptions(ctx, conf.ActivityOptions)
var result string
err := workflow.ExecuteActivity(ctx, "SampleActivity", input, "string-value").Get(ctx, &result)
if err != nil {
return err
}
for i := 1; i <= 10; i++ {
value := i
workflow.Go(ctx, func(ctx workflow.Context) {
err := workflow.ExecuteActivity(ctx, "SecondActivity", input, value).Get(ctx, &result)
if err != nil {
log.Println("err=", err)
}
})
}
return nil
}
// SampleActivity comment
func SampleActivity(ctx context.Context, value, v1 string) (string, error) {
fmt.Println("Sample activity start")
for i := 0; i <= 10; i++ {
fmt.Println(i)
}
return "Hello " + value, nil
}
// SecondActivity comment
func SecondActivity(ctx context.Context, value int) (string, error) {
fmt.Println("Second activity start")
fmt.Println("value=", value)
fmt.Println("Second activity going to end")
return "Hello " + fmt.Sprintf("%d", value), nil
}
此处,第二个 activity 在 for 循环内并行调用。
我的第一个问题是,这段代码是确定性的吗?
假设在循环的 5 次迭代后,当 i = 5 时,执行此工作流的工作人员终止,如果工作流在另一个中启动,cadence 是否能够重播事件
工人?
你能回答我的问题吗?
是的,这段代码是确定性的。它不调用任何 non-deterministic 操作(如随机或 UUID 生成)并使用 workflow.Go
启动 goroutine。所以它是确定性的。代码的复杂性在定义其确定性方面没有发挥作用。
Unrelated nit. 无需在示例中使用 goroutine,因为 ExecuteActivity
调用已经 non-blocking 通过返回 Future。
所以样本可以简化为:
func SampleWorkFlow(ctx workflow.Context, input string) error {
fmt.Println("Workflow started")
ctx = workflow.WithTaskList(ctx, sampleTaskList)
ctx = workflow.WithActivityOptions(ctx, conf.ActivityOptions)
var result string
err := workflow.ExecuteActivity(ctx, "SampleActivity", input, "string-value").Get(ctx, &result)
if err != nil {
return err
}
for i := 1; i <= 10; i++ {
workflow.ExecuteActivity(ctx, "SecondActivity", input, i)
}
return nil
}
请注意,此示例仍然可能不会按照您预期的方式执行,因为它在不等待活动完成的情况下完成工作流。所以这些活动都还没开始呢
这是等待活动完成的代码:
func SampleWorkFlow(ctx workflow.Context, input string) error {
fmt.Println("Workflow started")
ctx = workflow.WithTaskList(ctx, sampleTaskList)
ctx = workflow.WithActivityOptions(ctx, conf.ActivityOptions)
var result string
err := workflow.ExecuteActivity(ctx, "SampleActivity", input, "string-value").Get(ctx, &result)
if err != nil {
return err
}
var results []workflow.Future
for i := 1; i <= 10; i++ {
future := workflow.ExecuteActivity(ctx, "SecondActivity", input, i)
results = append(results, future)
}
for i := 0; i < 10; i++ {
var result string
err := results[i].Get(ctx, &result)
if err != nil {
log.Println("err=", err)
}
}
return nil
}
我对 Cadence 工作流有疑问,我们可以在 for 循环中调用具有不同输入的相同 activity 吗?该代码是确定性的吗?如果执行工作流的 worker 在执行过程中停止并稍后重新启动,cadence 在重新构建工作流时是否能够重放事件。
比如我有下面的代码
func init() {
workflow.RegisterWithOptions(SampleWorkFlow, workflow.RegisterOptions{Name: "SampleWorkFlow"})
activity.RegisterWithOptions(SampleActivity, activity.RegisterOptions{Name: "SampleActivity"})
activity.RegisterWithOptions(SecondActivity, activity.RegisterOptions{Name: "SecondActivity"})
}
// SampleWorkFlow comment
func SampleWorkFlow(ctx workflow.Context, input string) error {
fmt.Println("Workflow started")
ctx = workflow.WithTaskList(ctx, sampleTaskList)
ctx = workflow.WithActivityOptions(ctx, conf.ActivityOptions)
var result string
err := workflow.ExecuteActivity(ctx, "SampleActivity", input, "string-value").Get(ctx, &result)
if err != nil {
return err
}
for i := 1; i <= 10; i++ {
value := i
workflow.Go(ctx, func(ctx workflow.Context) {
err := workflow.ExecuteActivity(ctx, "SecondActivity", input, value).Get(ctx, &result)
if err != nil {
log.Println("err=", err)
}
})
}
return nil
}
// SampleActivity comment
func SampleActivity(ctx context.Context, value, v1 string) (string, error) {
fmt.Println("Sample activity start")
for i := 0; i <= 10; i++ {
fmt.Println(i)
}
return "Hello " + value, nil
}
// SecondActivity comment
func SecondActivity(ctx context.Context, value int) (string, error) {
fmt.Println("Second activity start")
fmt.Println("value=", value)
fmt.Println("Second activity going to end")
return "Hello " + fmt.Sprintf("%d", value), nil
}
此处,第二个 activity 在 for 循环内并行调用。 我的第一个问题是,这段代码是确定性的吗?
假设在循环的 5 次迭代后,当 i = 5 时,执行此工作流的工作人员终止,如果工作流在另一个中启动,cadence 是否能够重播事件 工人?
你能回答我的问题吗?
是的,这段代码是确定性的。它不调用任何 non-deterministic 操作(如随机或 UUID 生成)并使用 workflow.Go
启动 goroutine。所以它是确定性的。代码的复杂性在定义其确定性方面没有发挥作用。
Unrelated nit. 无需在示例中使用 goroutine,因为 ExecuteActivity
调用已经 non-blocking 通过返回 Future。
所以样本可以简化为:
func SampleWorkFlow(ctx workflow.Context, input string) error {
fmt.Println("Workflow started")
ctx = workflow.WithTaskList(ctx, sampleTaskList)
ctx = workflow.WithActivityOptions(ctx, conf.ActivityOptions)
var result string
err := workflow.ExecuteActivity(ctx, "SampleActivity", input, "string-value").Get(ctx, &result)
if err != nil {
return err
}
for i := 1; i <= 10; i++ {
workflow.ExecuteActivity(ctx, "SecondActivity", input, i)
}
return nil
}
请注意,此示例仍然可能不会按照您预期的方式执行,因为它在不等待活动完成的情况下完成工作流。所以这些活动都还没开始呢
这是等待活动完成的代码:
func SampleWorkFlow(ctx workflow.Context, input string) error {
fmt.Println("Workflow started")
ctx = workflow.WithTaskList(ctx, sampleTaskList)
ctx = workflow.WithActivityOptions(ctx, conf.ActivityOptions)
var result string
err := workflow.ExecuteActivity(ctx, "SampleActivity", input, "string-value").Get(ctx, &result)
if err != nil {
return err
}
var results []workflow.Future
for i := 1; i <= 10; i++ {
future := workflow.ExecuteActivity(ctx, "SecondActivity", input, i)
results = append(results, future)
}
for i := 0; i < 10; i++ {
var result string
err := results[i].Get(ctx, &result)
if err != nil {
log.Println("err=", err)
}
}
return nil
}