将外部事件发送到工作流
Sending external events to a workflow
在我们的 Cadence 工作流中,我们经常需要在继续之前等待外部事件一定的时间(即电子邮件已读、link 单击等)。
我想知道将这些事件通知我们的工作流的最佳方式是什么。信号是正确的方式,还是我们应该创建一个 activity 来等待事件?
据我所知,我们需要创建一个信号通道 ch := workflow.GetSignalChannel(ctx, SignalName)
,但是上下文在活动中不可用。
信号是向工作流发送事件的推荐方式。
Go 工作流的常用模式是使用 Selector 等待多个信号通道以及定时器未来。
去样本:
sig1Ch := workflow.GetSignalChannel(ctx, "signal1")
sig2Ch := workflow.GetSignalChannel(ctx, "signal2")
timeout := workflow.NewTimer(ctx, time.Minute * 30)
s := workflow.NewSelector(ctx)
var signal1 *Signal1Struct
var signal2 *Signal2Struct
s.AddFuture(timeout, func(f Future) {
})
s.AddReceive(sig1Ch, func(c Channel, more bool) {
c.Receive(ctx, signal1)
})
s.AddReceive(sig2Ch, func(c Channel, more bool) {
c.Receive(ctx, signal2)
})
s.Select(ctx)
if signal1 == nil && signal2 == nil {
// handle timeout
} else {
// process signals
}
Java样本:
public interface MyWorkflow {
@WorkflowMethod
void main();
@SignalMethod
void signal1(Signal1Struct signal);
@SignalMethod
void signal2(Signal2Struct signal);
}
public class MyWorkflowImpl implements MyWorkflow {
private Signal1Struct signal1;
private Signal2Struct signal2;
@Override
public void main() {
Workflow.await(Duration.ofMinutes(30),
() -> signal1 != null || signal2 != null);
if (signal1 == null && signal2 == null) {
// handle timeout
}
// process signals
}
@Override
public void signal1(Signal1Struct signal) {
signal1 = signal;
}
@Override
public void signal2(Signal2Struct signal) {
signal2 = signal;
}
}
请注意,考虑工作流工作人员中断是个好主意。例如,假设上述工作流已启动,并且在启动后 40 分钟收到信号,而所有工作流工作人员都已关闭。在这种情况下,当工人被带回来时,timeout
future 和 signCh
都不是空的。由于 Selector 不保证顺序,因此即使信号是在计时器之后收到的,信号也有可能在计时器之前传送。所以你的代码逻辑应该考虑到这一点。例如,硬性要求必须忽略工作流开始后 30 分钟后收到的信号。那么上面的例子要修改为:
去样本:
...
start := workflow.Now(ctx); // must use workflow clock
s.Select(ctx)
duration := workflow.Now(ctx).Sub(start)
if duration.Minutes() >= 30 || (signal1 == nil && signal2 == nil) {
// handle timeout
} else {
// process signals
}
Java样本:
public void main() {
long start = Workflow.currentTimeMillis(); // must use workflow clock
Duration timeout = Duration.ofMinutes(30);
Workflow.await(timeout, () -> signal1 != null || signal2 != null);
long duration = Workflow.currentTimeMillis() - start;
if (timeout.toMillis() <= duration || (signal1 == null && signal2 == null)) {
// handle timeout
}
// process signals
}
即使工作流执行延迟一个小时,更新后的代码也能正常运行。
编辑:添加了信号发送示例
去样本:
c, err := client.NewClient(client.Options{
HostPort: client.DefaultHostPort,
})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()
err := c.SignalWorkflow(context.Background(), <workflowId>, "", "signal1", Signal1Struct{})
Java样本:
WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
WorkflowClient client = WorkflowClient.newInstance(service);
GreetingWorkflow myWorkflow =
client.newWorkflowStub(MyWorkflow.class, <workflowId>);
myWorkflow.signal1(new Signal1Struct());
在我们的 Cadence 工作流中,我们经常需要在继续之前等待外部事件一定的时间(即电子邮件已读、link 单击等)。
我想知道将这些事件通知我们的工作流的最佳方式是什么。信号是正确的方式,还是我们应该创建一个 activity 来等待事件?
据我所知,我们需要创建一个信号通道 ch := workflow.GetSignalChannel(ctx, SignalName)
,但是上下文在活动中不可用。
信号是向工作流发送事件的推荐方式。
Go 工作流的常用模式是使用 Selector 等待多个信号通道以及定时器未来。
去样本:
sig1Ch := workflow.GetSignalChannel(ctx, "signal1")
sig2Ch := workflow.GetSignalChannel(ctx, "signal2")
timeout := workflow.NewTimer(ctx, time.Minute * 30)
s := workflow.NewSelector(ctx)
var signal1 *Signal1Struct
var signal2 *Signal2Struct
s.AddFuture(timeout, func(f Future) {
})
s.AddReceive(sig1Ch, func(c Channel, more bool) {
c.Receive(ctx, signal1)
})
s.AddReceive(sig2Ch, func(c Channel, more bool) {
c.Receive(ctx, signal2)
})
s.Select(ctx)
if signal1 == nil && signal2 == nil {
// handle timeout
} else {
// process signals
}
Java样本:
public interface MyWorkflow {
@WorkflowMethod
void main();
@SignalMethod
void signal1(Signal1Struct signal);
@SignalMethod
void signal2(Signal2Struct signal);
}
public class MyWorkflowImpl implements MyWorkflow {
private Signal1Struct signal1;
private Signal2Struct signal2;
@Override
public void main() {
Workflow.await(Duration.ofMinutes(30),
() -> signal1 != null || signal2 != null);
if (signal1 == null && signal2 == null) {
// handle timeout
}
// process signals
}
@Override
public void signal1(Signal1Struct signal) {
signal1 = signal;
}
@Override
public void signal2(Signal2Struct signal) {
signal2 = signal;
}
}
请注意,考虑工作流工作人员中断是个好主意。例如,假设上述工作流已启动,并且在启动后 40 分钟收到信号,而所有工作流工作人员都已关闭。在这种情况下,当工人被带回来时,timeout
future 和 signCh
都不是空的。由于 Selector 不保证顺序,因此即使信号是在计时器之后收到的,信号也有可能在计时器之前传送。所以你的代码逻辑应该考虑到这一点。例如,硬性要求必须忽略工作流开始后 30 分钟后收到的信号。那么上面的例子要修改为:
去样本:
...
start := workflow.Now(ctx); // must use workflow clock
s.Select(ctx)
duration := workflow.Now(ctx).Sub(start)
if duration.Minutes() >= 30 || (signal1 == nil && signal2 == nil) {
// handle timeout
} else {
// process signals
}
Java样本:
public void main() {
long start = Workflow.currentTimeMillis(); // must use workflow clock
Duration timeout = Duration.ofMinutes(30);
Workflow.await(timeout, () -> signal1 != null || signal2 != null);
long duration = Workflow.currentTimeMillis() - start;
if (timeout.toMillis() <= duration || (signal1 == null && signal2 == null)) {
// handle timeout
}
// process signals
}
即使工作流执行延迟一个小时,更新后的代码也能正常运行。
编辑:添加了信号发送示例
去样本:
c, err := client.NewClient(client.Options{
HostPort: client.DefaultHostPort,
})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()
err := c.SignalWorkflow(context.Background(), <workflowId>, "", "signal1", Signal1Struct{})
Java样本:
WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
WorkflowClient client = WorkflowClient.newInstance(service);
GreetingWorkflow myWorkflow =
client.newWorkflowStub(MyWorkflow.class, <workflowId>);
myWorkflow.signal1(new Signal1Struct());