何时使用 selector.AddReceive、selector.Select
When to use selector.AddReceive, selector.Select
如果我应该使用 selector.AddReceive
和 selector.Select
,希望得到一些说明。这可能不是 Cadence 的问题,但也许我缺少一些关于 Golang 的知识。
对于 selector.Select
我认为基本思想是我们等待通道的下一个输出。不完全确定 selector.AddRecieve
的作用。
例如,在节奏示例中,local_activity
link 并粘贴在下方:
func signalHandlingWorkflow(ctx workflow.Context) error {
logger := workflow.GetLogger(ctx)
ch := workflow.GetSignalChannel(ctx, SignalName)
for {
var signal string
if more := ch.Receive(ctx, &signal); !more {
logger.Info("Signal channel closed")
return cadence.NewCustomError("signal_channel_closed")
}
logger.Info("Signal received.", zap.String("signal", signal))
if signal == "exit" {
break
}
cwo := workflow.ChildWorkflowOptions{
ExecutionStartToCloseTimeout: time.Minute,
// TaskStartToCloseTimeout must be larger than all local activity execution time, because DecisionTask won't
// return until all local activities completed.
TaskStartToCloseTimeout: time.Second * 30,
}
childCtx := workflow.WithChildOptions(ctx, cwo)
var processResult string
err := workflow.ExecuteChildWorkflow(childCtx, processingWorkflow, signal).Get(childCtx, &processResult)
if err != nil {
return err
}
logger.Sugar().Infof("Processed signal: %v, result: %v", signal, processResult)
}
return nil
}
我们不使用任何 selector.AddReceive
但是,在此处的示例中,它也使用信号通道:
我也把代码贴在这里
func SampleTimerWorkflow(ctx workflow.Context, timerDelay time.Duration) error
{
logger := workflow.GetLogger(ctx)
resetCh := workflow.GetSignalChannel(ctx, "reset")
timerFired := false
delay := timerDelay
for ;!timerFired; {
selector := workflow.NewSelector(ctx)
logger.Sugar().Infof("Setting up a timer to fire after: %v", delay)
timerCancelCtx, cancelTimerHandler := workflow.WithCancel(ctx)
timerFuture := workflow.NewTimer(timerCancelCtx, delay)
selector.AddFuture(timerFuture, func(f workflow.Future) {
logger.Info("Timer Fired.")
timerFired = true
})
selector.AddReceive(resetCh, func(c workflow.Channel, more bool) {
logger.Info("Reset signal received.")
logger.Info("Cancel outstanding timer.")
cancelTimerHandler()
var t int
c.Receive(ctx, &t)
logger.Sugar().Infof("Reset delay: %v seconds", t)
delay = time.Second * time.Duration(t)
})
logger.Info("Waiting for timer to fire.")
selector.Select(ctx)
}
workflow.GetLogger(ctx).Info("Workflow completed.")
return nil
}
你可以看到有selector.AddReceive
,我不太清楚它的用途是什么,我应该什么时候使用它。
我正在尝试向我的工作流程发送一个允许我延长到期时间的信号。意思是,它会延迟 ExpirationActivity
的调用
并且当按照这个例子(结合我当前的代码)时,只要我发送重置信号,timerFired
似乎立即设置为 true。
我当前的代码如下(我删除了一些不相关的 if 语句),之前,我只使用了 selector.Select
的一个实例,但我的代码在某处运行不正常。
func Workflow(ctx workflow.Context) (string, error) {
// local state per bonus workflow
bonusAcceptanceState := pending
logger := workflow.GetLogger(ctx).Sugar()
logger.Info("Bonus workflow started")
timerCreated := false
timerFired := false
delay := timerDelay
// To query state in Cadence GUI
err := workflow.SetQueryHandler(ctx, "bonusAcceptanceState", func(input []byte) (string, error) {
return bonusAcceptanceState, nil
})
if err != nil {
logger.Info("SetQueryHandler failed: " + err.Error())
return "", err
}
info := workflow.GetInfo(ctx)
executionTimeout := time.Duration(info.ExecutionStartToCloseTimeoutSeconds) * time.Second
// decisionTimeout := time.Duration(info.TaskStartToCloseTimeoutSeconds) * time.Second
decisionTimeout := time.Duration(info.ExecutionStartToCloseTimeoutSeconds) * time.Second
maxRetryTime := executionTimeout // retry for the entire time
retryPolicy := &cadence.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2,
MaximumInterval: executionTimeout,
ExpirationInterval: maxRetryTime,
MaximumAttempts: 0, // unlimited, bound by maxRetryTime
NonRetriableErrorReasons: []string{},
}
ao := workflow.ActivityOptions{
TaskList: taskList,
ScheduleToStartTimeout: executionTimeout, // time until a task has to be picked up by a worker
ScheduleToCloseTimeout: executionTimeout, // total execution timeout
StartToCloseTimeout: decisionTimeout, // time that a worker can take to process a task
RetryPolicy: retryPolicy,
}
ctx = workflow.WithActivityOptions(ctx, ao)
selector := workflow.NewSelector(ctx)
timerCancelCtx, cancelTimerHandler := workflow.WithCancel(ctx)
var signal *singalType
for {
signalChan := workflow.GetSignalChannel(ctx, signalName)
// resetCh := workflow.GetSignalChannel(ctx, "reset")
selector.AddReceive(signalChan, func(c workflow.Channel, more bool) {
c.Receive(ctx, &signal)
})
selector.Select(ctx)
if signal.Type == "exit" {
return "", nil
}
// We can check the age and return an appropriate response
if signal.Type == "ACCEPT" {
if bonusAcceptanceState == pending {
logger.Info("Bonus Accepted")
bonusAcceptanceState = accepted
var status string
future := workflow.ExecuteActivity(ctx, AcceptActivity)
if err := future.Get(ctx, &status); err != nil {
logger.Errorw("Activity failed", "error", err)
}
// Start expiration timer
if !timerCreated {
timerCreated = true
timerFuture := workflow.NewTimer(timerCancelCtx, delay)
selector.AddFuture(timerFuture, func(f workflow.Future) {
logger.Info("Timer Fired.")
timerFired = true
})
}
}
}
if signal.Type == "ROLLOVER_1X" && bonusAcceptanceState == accepted {
var status string
future := workflow.ExecuteActivity(ctx, Rollover1x)
if err := future.Get(ctx, &status); err != nil {
logger.Errorw("Activity failed", "error", err)
}
selector.Select(ctx)
}
if signal.Type == "ROLLOVER_COMPLETE" && bonusAcceptanceState == accepted {
var status string
future := workflow.ExecuteActivity(ctx, RolloverComplete)
if err := future.Get(ctx, &status); err != nil {
logger.Errorw("Activity failed", "error", err)
return "", err
}
// Workflow is terminated on return result
return status, nil
}
for; !timerFired && bonusAcceptanceState == accepted && signal.Type == "RESET" {
cancelTimerHandler()
i, err := strconv.Atoi(signal.Value)
if err != nil {
logger.Infow("error in converting")
}
logger.Infof("Reset delay: %v seconds", i)
delay = time.Minute * time.Duration(i)
timerFuture := workflow.NewTimer(timerCancelCtx, delay)
selector.AddFuture(timerFuture, func(f workflow.Future) {
logger.Info("Timer Fired.")
timerFired = true
})
selector.Select(ctx)
}
if timerFired {
var status string
future := workflow.ExecuteActivity(ctx, ExpirationActivity)
if err := future.Get(ctx, &status); err != nil {
logger.Errorw("Activity failed", "error", err)
}
return status, nil
}
}
}
检查未来return结果
selector.AddFuture(timerFuture, func(f workflow.Future) {
err := f.Get(ctx, nil)
if err == nil {
logger.Info("Timer Fired.")
timerFired = true
}
})
TL;DR:
- 只有当您需要让 select 或收听频道时,您才会使用
selector.AddReceive
,就像在您的第二个代码片段中一样。如果你只需要直接处理来自通道的信号而不需要selector,那么你不需要使用它。
selector.Select
就是让代码等待一些事件的发生。因为你不想使用忙循环等待。
有关何时使用它们的更多详细信息
本质上,这与Golang select statement完全相同的概念。 Golang select 允许你等待定时器和通道。除了 Golang 没有 selector.Select()
只是因为它融入了语言本身,但 Cadence 是一个库。
与在 golang 中一样,您不必使用 select
语句来使用计时器或通道。只有当你必须编写一些代码来监听多个事件源时才需要它。
比如你有两个通道,你想写一些通用的逻辑来处理这两个通道,比如增加一个计数器。该计数器不属于任何通道。这是一个普通的柜台。然后使用 selector
看起来不错。
chA := workflow.GetSignalChannel(ctx, SignalNameA)
chB := workflow.GetSignalChannel(ctx, SignalNameB)
counter := 0
selector.AddReceive(chA)
selector.AddReceive(chB)
For {
selector.Select()
counter += 1
}
带有 selector 的工作流代码在 Golang 中看起来与此非常相似:
counter := 0
for {
select {
case _ := <- chA:
counter += 1
case _ := <- chB:
counter += 1
}
}
否则你可能不得不使用两个 goroutines 来监听每个频道,并进行计数。 golang 代码如下所示:
counter := 0
go func(){
for{
_ := <- chA
counter += 1
}
}()
go func(){
for{
_ := <- chB
counter += 1
}
}()
这可能是竞争条件的问题。除非计数器被很好地实现为线程安全的。
而在 Cadence 工作流代码中,它是这样的:
chA := workflow.GetSignalChannel(ctx, SignalNameA)
chB := workflow.GetSignalChannel(ctx, SignalNameB)
counter := 0
Workflow.Go(ctx){
for{
chA.Receive(ctx,nil)
counter +=1
}
}
Workflow.Go(ctx){
for{
chB.Receive(ctx,nil)
counter +=1
}
}
然而,Cadence中没有这种竞争条件,因为Cadence的coroutine(started byWorkflow.Go()
)并不是真正的并发。上面的两个工作流代码都应该可以完美运行。
但 Cadence 仍然提供与 Golang 相同的 selector
,主要是因为第一种写代码更自然。
如果我应该使用 selector.AddReceive
和 selector.Select
,希望得到一些说明。这可能不是 Cadence 的问题,但也许我缺少一些关于 Golang 的知识。
对于 selector.Select
我认为基本思想是我们等待通道的下一个输出。不完全确定 selector.AddRecieve
的作用。
例如,在节奏示例中,local_activity
link 并粘贴在下方:
func signalHandlingWorkflow(ctx workflow.Context) error {
logger := workflow.GetLogger(ctx)
ch := workflow.GetSignalChannel(ctx, SignalName)
for {
var signal string
if more := ch.Receive(ctx, &signal); !more {
logger.Info("Signal channel closed")
return cadence.NewCustomError("signal_channel_closed")
}
logger.Info("Signal received.", zap.String("signal", signal))
if signal == "exit" {
break
}
cwo := workflow.ChildWorkflowOptions{
ExecutionStartToCloseTimeout: time.Minute,
// TaskStartToCloseTimeout must be larger than all local activity execution time, because DecisionTask won't
// return until all local activities completed.
TaskStartToCloseTimeout: time.Second * 30,
}
childCtx := workflow.WithChildOptions(ctx, cwo)
var processResult string
err := workflow.ExecuteChildWorkflow(childCtx, processingWorkflow, signal).Get(childCtx, &processResult)
if err != nil {
return err
}
logger.Sugar().Infof("Processed signal: %v, result: %v", signal, processResult)
}
return nil
}
我们不使用任何 selector.AddReceive
但是,在此处的示例中,它也使用信号通道:
我也把代码贴在这里
func SampleTimerWorkflow(ctx workflow.Context, timerDelay time.Duration) error
{
logger := workflow.GetLogger(ctx)
resetCh := workflow.GetSignalChannel(ctx, "reset")
timerFired := false
delay := timerDelay
for ;!timerFired; {
selector := workflow.NewSelector(ctx)
logger.Sugar().Infof("Setting up a timer to fire after: %v", delay)
timerCancelCtx, cancelTimerHandler := workflow.WithCancel(ctx)
timerFuture := workflow.NewTimer(timerCancelCtx, delay)
selector.AddFuture(timerFuture, func(f workflow.Future) {
logger.Info("Timer Fired.")
timerFired = true
})
selector.AddReceive(resetCh, func(c workflow.Channel, more bool) {
logger.Info("Reset signal received.")
logger.Info("Cancel outstanding timer.")
cancelTimerHandler()
var t int
c.Receive(ctx, &t)
logger.Sugar().Infof("Reset delay: %v seconds", t)
delay = time.Second * time.Duration(t)
})
logger.Info("Waiting for timer to fire.")
selector.Select(ctx)
}
workflow.GetLogger(ctx).Info("Workflow completed.")
return nil
}
你可以看到有selector.AddReceive
,我不太清楚它的用途是什么,我应该什么时候使用它。
我正在尝试向我的工作流程发送一个允许我延长到期时间的信号。意思是,它会延迟 ExpirationActivity
并且当按照这个例子(结合我当前的代码)时,只要我发送重置信号,timerFired
似乎立即设置为 true。
我当前的代码如下(我删除了一些不相关的 if 语句),之前,我只使用了 selector.Select
的一个实例,但我的代码在某处运行不正常。
func Workflow(ctx workflow.Context) (string, error) {
// local state per bonus workflow
bonusAcceptanceState := pending
logger := workflow.GetLogger(ctx).Sugar()
logger.Info("Bonus workflow started")
timerCreated := false
timerFired := false
delay := timerDelay
// To query state in Cadence GUI
err := workflow.SetQueryHandler(ctx, "bonusAcceptanceState", func(input []byte) (string, error) {
return bonusAcceptanceState, nil
})
if err != nil {
logger.Info("SetQueryHandler failed: " + err.Error())
return "", err
}
info := workflow.GetInfo(ctx)
executionTimeout := time.Duration(info.ExecutionStartToCloseTimeoutSeconds) * time.Second
// decisionTimeout := time.Duration(info.TaskStartToCloseTimeoutSeconds) * time.Second
decisionTimeout := time.Duration(info.ExecutionStartToCloseTimeoutSeconds) * time.Second
maxRetryTime := executionTimeout // retry for the entire time
retryPolicy := &cadence.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2,
MaximumInterval: executionTimeout,
ExpirationInterval: maxRetryTime,
MaximumAttempts: 0, // unlimited, bound by maxRetryTime
NonRetriableErrorReasons: []string{},
}
ao := workflow.ActivityOptions{
TaskList: taskList,
ScheduleToStartTimeout: executionTimeout, // time until a task has to be picked up by a worker
ScheduleToCloseTimeout: executionTimeout, // total execution timeout
StartToCloseTimeout: decisionTimeout, // time that a worker can take to process a task
RetryPolicy: retryPolicy,
}
ctx = workflow.WithActivityOptions(ctx, ao)
selector := workflow.NewSelector(ctx)
timerCancelCtx, cancelTimerHandler := workflow.WithCancel(ctx)
var signal *singalType
for {
signalChan := workflow.GetSignalChannel(ctx, signalName)
// resetCh := workflow.GetSignalChannel(ctx, "reset")
selector.AddReceive(signalChan, func(c workflow.Channel, more bool) {
c.Receive(ctx, &signal)
})
selector.Select(ctx)
if signal.Type == "exit" {
return "", nil
}
// We can check the age and return an appropriate response
if signal.Type == "ACCEPT" {
if bonusAcceptanceState == pending {
logger.Info("Bonus Accepted")
bonusAcceptanceState = accepted
var status string
future := workflow.ExecuteActivity(ctx, AcceptActivity)
if err := future.Get(ctx, &status); err != nil {
logger.Errorw("Activity failed", "error", err)
}
// Start expiration timer
if !timerCreated {
timerCreated = true
timerFuture := workflow.NewTimer(timerCancelCtx, delay)
selector.AddFuture(timerFuture, func(f workflow.Future) {
logger.Info("Timer Fired.")
timerFired = true
})
}
}
}
if signal.Type == "ROLLOVER_1X" && bonusAcceptanceState == accepted {
var status string
future := workflow.ExecuteActivity(ctx, Rollover1x)
if err := future.Get(ctx, &status); err != nil {
logger.Errorw("Activity failed", "error", err)
}
selector.Select(ctx)
}
if signal.Type == "ROLLOVER_COMPLETE" && bonusAcceptanceState == accepted {
var status string
future := workflow.ExecuteActivity(ctx, RolloverComplete)
if err := future.Get(ctx, &status); err != nil {
logger.Errorw("Activity failed", "error", err)
return "", err
}
// Workflow is terminated on return result
return status, nil
}
for; !timerFired && bonusAcceptanceState == accepted && signal.Type == "RESET" {
cancelTimerHandler()
i, err := strconv.Atoi(signal.Value)
if err != nil {
logger.Infow("error in converting")
}
logger.Infof("Reset delay: %v seconds", i)
delay = time.Minute * time.Duration(i)
timerFuture := workflow.NewTimer(timerCancelCtx, delay)
selector.AddFuture(timerFuture, func(f workflow.Future) {
logger.Info("Timer Fired.")
timerFired = true
})
selector.Select(ctx)
}
if timerFired {
var status string
future := workflow.ExecuteActivity(ctx, ExpirationActivity)
if err := future.Get(ctx, &status); err != nil {
logger.Errorw("Activity failed", "error", err)
}
return status, nil
}
}
}
检查未来return结果
selector.AddFuture(timerFuture, func(f workflow.Future) {
err := f.Get(ctx, nil)
if err == nil {
logger.Info("Timer Fired.")
timerFired = true
}
})
TL;DR:
- 只有当您需要让 select 或收听频道时,您才会使用
selector.AddReceive
,就像在您的第二个代码片段中一样。如果你只需要直接处理来自通道的信号而不需要selector,那么你不需要使用它。 selector.Select
就是让代码等待一些事件的发生。因为你不想使用忙循环等待。
有关何时使用它们的更多详细信息
本质上,这与Golang select statement完全相同的概念。 Golang select 允许你等待定时器和通道。除了 Golang 没有 selector.Select()
只是因为它融入了语言本身,但 Cadence 是一个库。
与在 golang 中一样,您不必使用 select
语句来使用计时器或通道。只有当你必须编写一些代码来监听多个事件源时才需要它。
比如你有两个通道,你想写一些通用的逻辑来处理这两个通道,比如增加一个计数器。该计数器不属于任何通道。这是一个普通的柜台。然后使用 selector
看起来不错。
chA := workflow.GetSignalChannel(ctx, SignalNameA)
chB := workflow.GetSignalChannel(ctx, SignalNameB)
counter := 0
selector.AddReceive(chA)
selector.AddReceive(chB)
For {
selector.Select()
counter += 1
}
带有 selector 的工作流代码在 Golang 中看起来与此非常相似:
counter := 0
for {
select {
case _ := <- chA:
counter += 1
case _ := <- chB:
counter += 1
}
}
否则你可能不得不使用两个 goroutines 来监听每个频道,并进行计数。 golang 代码如下所示:
counter := 0
go func(){
for{
_ := <- chA
counter += 1
}
}()
go func(){
for{
_ := <- chB
counter += 1
}
}()
这可能是竞争条件的问题。除非计数器被很好地实现为线程安全的。
而在 Cadence 工作流代码中,它是这样的:
chA := workflow.GetSignalChannel(ctx, SignalNameA)
chB := workflow.GetSignalChannel(ctx, SignalNameB)
counter := 0
Workflow.Go(ctx){
for{
chA.Receive(ctx,nil)
counter +=1
}
}
Workflow.Go(ctx){
for{
chB.Receive(ctx,nil)
counter +=1
}
}
然而,Cadence中没有这种竞争条件,因为Cadence的coroutine(started byWorkflow.Go()
)并不是真正的并发。上面的两个工作流代码都应该可以完美运行。
但 Cadence 仍然提供与 Golang 相同的 selector
,主要是因为第一种写代码更自然。