如何创建自定义工作流定义?
How to create custom workflow definitions?
我们要求让我们的用户创建自己的工作流程。这些工作流可以有简单的是/否分支以及等待来自外部事件的信号。如果我们有完善的工作流定义,这就不是问题,但是由于工作流可以是动态的,这会带来一个更棘手的问题。
Temporal 工作流是直接实现业务逻辑的代码。
对于无法在代码中硬编码业务逻辑的用例,应编写外部工作流定义语言的解释器。这种语言通常称为 DSL,因为它们在为特定领域实现时非常有用。 DSL 通常基于 YAML/Json/XML。有时它只是数据库表中的数据。
以下是我构建工作流代码以支持自定义 DSL 的方式:
- 接收当前工作流定义 ID 和状态的 activity 以及要执行的操作列表 return。 activity 将当前状态(包括最近执行的操作的结果)应用于适当的 DSL 实例。结果是一组要执行的下一个操作。操作是特定于 DSL 的,但最常见的是 执行 activity、等待特定信号、休眠一段时间、完成或失败工作流。
- 实现循环的工作流调用上述 activity 并执行请求的操作,直到请求工作流完成操作。
下面是一个普通 DSL 的示例代码,它指定了要执行的一系列活动:
@ActivityInterface
public interface Interpreter {
String getNextStep(String workflowType, String lastActivity);
}
public class SequenceInterpreter implements Interpreter {
// dslWorkflowType->(activityType->nextActivity)
private final Map<String, Map<String, String>> definitions;
public SequenceInterpreter(Map<String, Map<String, String>> definitions) {
this.definitions = definitions;
}
@Override
public String getNextStep(String workflowType, String lastActivity) {
Map<String, String> stateTransitions = definitions.get(workflowType);
return stateTransitions.get(lastActivity);
}
}
@WorkflowInterface
public interface InterpreterWorkflow {
@WorkflowMethod
String execute(String type, String input);
@QueryMethod
String getCurrentActivity();
}
public class InterpreterWorkflowImpl implements InterpreterWorkflow {
private final Interpreter interpreter = Workflow.newActivityStub(Interpreter.class);
private final ActivityStub activities =
Workflow.newUntypedActivityStub(
new ActivityOptions.Builder().setScheduleToCloseTimeout(Duration.ofMinutes(10)).build());
private String currentActivity = "init";
private String lastActivityResult;
@Override
public String execute(String workflowType, String input) {
do {
currentActivity = interpreter.getNextStep(workflowType, currentActivity);
lastActivityResult = activities.execute(currentActivity, String.class, lastActivityResult);
} while (currentActivity != null);
return lastActivityResult;
}
@Override
public String getCurrentActivity() {
return currentActivity;
}
}
显然 real-life 解释器 activity 将接收一个更复杂的状态对象作为参数和 return 一个可能包含多个命令类型列表的结构。
我们要求让我们的用户创建自己的工作流程。这些工作流可以有简单的是/否分支以及等待来自外部事件的信号。如果我们有完善的工作流定义,这就不是问题,但是由于工作流可以是动态的,这会带来一个更棘手的问题。
Temporal 工作流是直接实现业务逻辑的代码。
对于无法在代码中硬编码业务逻辑的用例,应编写外部工作流定义语言的解释器。这种语言通常称为 DSL,因为它们在为特定领域实现时非常有用。 DSL 通常基于 YAML/Json/XML。有时它只是数据库表中的数据。
以下是我构建工作流代码以支持自定义 DSL 的方式:
- 接收当前工作流定义 ID 和状态的 activity 以及要执行的操作列表 return。 activity 将当前状态(包括最近执行的操作的结果)应用于适当的 DSL 实例。结果是一组要执行的下一个操作。操作是特定于 DSL 的,但最常见的是 执行 activity、等待特定信号、休眠一段时间、完成或失败工作流。
- 实现循环的工作流调用上述 activity 并执行请求的操作,直到请求工作流完成操作。
下面是一个普通 DSL 的示例代码,它指定了要执行的一系列活动:
@ActivityInterface
public interface Interpreter {
String getNextStep(String workflowType, String lastActivity);
}
public class SequenceInterpreter implements Interpreter {
// dslWorkflowType->(activityType->nextActivity)
private final Map<String, Map<String, String>> definitions;
public SequenceInterpreter(Map<String, Map<String, String>> definitions) {
this.definitions = definitions;
}
@Override
public String getNextStep(String workflowType, String lastActivity) {
Map<String, String> stateTransitions = definitions.get(workflowType);
return stateTransitions.get(lastActivity);
}
}
@WorkflowInterface
public interface InterpreterWorkflow {
@WorkflowMethod
String execute(String type, String input);
@QueryMethod
String getCurrentActivity();
}
public class InterpreterWorkflowImpl implements InterpreterWorkflow {
private final Interpreter interpreter = Workflow.newActivityStub(Interpreter.class);
private final ActivityStub activities =
Workflow.newUntypedActivityStub(
new ActivityOptions.Builder().setScheduleToCloseTimeout(Duration.ofMinutes(10)).build());
private String currentActivity = "init";
private String lastActivityResult;
@Override
public String execute(String workflowType, String input) {
do {
currentActivity = interpreter.getNextStep(workflowType, currentActivity);
lastActivityResult = activities.execute(currentActivity, String.class, lastActivityResult);
} while (currentActivity != null);
return lastActivityResult;
}
@Override
public String getCurrentActivity() {
return currentActivity;
}
}
显然 real-life 解释器 activity 将接收一个更复杂的状态对象作为参数和 return 一个可能包含多个命令类型列表的结构。