在反应堆中顺序执行 Reactive 任务 Java
Sequential execution of Reactive tasks in reactor Java
我正在努力将阻塞顺序编排框架转换为反应式。现在,这些任务是动态的,并通过 JSON 输入输入到引擎中。引擎拉取 类 并执行 run()
方法并保存每个任务响应的状态。
我如何在反应器中实现相同的链接?如果这是静态 DAG,我会用 flatMap
或 then
运算符链接它,但由于它是动态的,我如何继续执行反应任务并收集每个任务的输出?
示例:
非反应性接口:
public interface OrchestrationTask {
OrchestrationContext run(IngestionContext ctx);
}
核心引擎
public Status executeDAG(String id) {
IngestionContext ctx = ContextBuilder.getCtx(id);
List<OrchestrationTask> tasks = app.getEligibleTasks(id);
for(OrchestrationTask task : tasks) {
// Eligible tasks are executed sequentially and results are collected.
OrchestrationContext stepContext = task.run(ctx);
if(!evaluateResult(stepContext)) break;
}
return Status.SUCCESS;
}
按照上面的示例,如果我将任务转换为 return Mono> 那么,我该如何等待或链接其他任务以对先前任务的结果进行操作?
任何帮助表示赞赏。谢谢。
更新::
响应式任务示例。
public class SampleTask implements OrchestrationTask {
@Override
public Mono<OrchestrationContext> run(OrchestrationContext context) {
// Im simulating a delay here. treat this as a long running task (web call) But the next task needs the response from the below call.
return Mono.just(context).delayElements(Duration.ofSeconds(2));
}
所以我将有一系列任务来完成各种事情,但每个任务的响应都取决于前一个任务并存储在 Orchestration Context 中。任何时候发生错误,编排上下文标志都将设置为 false,流量应该停止。
当然,我们可以:
- 从任务列表创建通量(如果适合反应式生成任务列表,则可以直接用通量替换该数组列表,否则保持原样);
flatMap()
您的 task.run()
方法的每个任务(根据现在的问题 returns a Mono
;
- 确保我们只在
evaluateResult()
为真时消耗元素;
- ...然后最后只是 return 和以前一样的
SUCCESS
状态。
因此,将所有这些放在一起,只需将循环和 return 语句替换为:
Flux.fromIterable(tasks)
.flatMap(task -> task.run(ctx))
.takeWhile(stepContext -> evaluateResult(stepContext))
.then(Mono.just(Status.SUCCESS));
(由于我们已将其设为响应式,您的方法显然需要 return 一个 Mono<Status>
而不仅仅是 Status
。)
根据评论更新 - 如果您只想“一次执行一个”而不是同时执行多个,则可以使用 concatMap()
而不是 flatMap()
。
我正在努力将阻塞顺序编排框架转换为反应式。现在,这些任务是动态的,并通过 JSON 输入输入到引擎中。引擎拉取 类 并执行 run()
方法并保存每个任务响应的状态。
我如何在反应器中实现相同的链接?如果这是静态 DAG,我会用 flatMap
或 then
运算符链接它,但由于它是动态的,我如何继续执行反应任务并收集每个任务的输出?
示例:
非反应性接口:
public interface OrchestrationTask {
OrchestrationContext run(IngestionContext ctx);
}
核心引擎
public Status executeDAG(String id) {
IngestionContext ctx = ContextBuilder.getCtx(id);
List<OrchestrationTask> tasks = app.getEligibleTasks(id);
for(OrchestrationTask task : tasks) {
// Eligible tasks are executed sequentially and results are collected.
OrchestrationContext stepContext = task.run(ctx);
if(!evaluateResult(stepContext)) break;
}
return Status.SUCCESS;
}
按照上面的示例,如果我将任务转换为 return Mono> 那么,我该如何等待或链接其他任务以对先前任务的结果进行操作? 任何帮助表示赞赏。谢谢。
更新::
响应式任务示例。
public class SampleTask implements OrchestrationTask {
@Override
public Mono<OrchestrationContext> run(OrchestrationContext context) {
// Im simulating a delay here. treat this as a long running task (web call) But the next task needs the response from the below call.
return Mono.just(context).delayElements(Duration.ofSeconds(2));
}
所以我将有一系列任务来完成各种事情,但每个任务的响应都取决于前一个任务并存储在 Orchestration Context 中。任何时候发生错误,编排上下文标志都将设置为 false,流量应该停止。
当然,我们可以:
- 从任务列表创建通量(如果适合反应式生成任务列表,则可以直接用通量替换该数组列表,否则保持原样);
flatMap()
您的task.run()
方法的每个任务(根据现在的问题 returns aMono
;- 确保我们只在
evaluateResult()
为真时消耗元素; - ...然后最后只是 return 和以前一样的
SUCCESS
状态。
因此,将所有这些放在一起,只需将循环和 return 语句替换为:
Flux.fromIterable(tasks)
.flatMap(task -> task.run(ctx))
.takeWhile(stepContext -> evaluateResult(stepContext))
.then(Mono.just(Status.SUCCESS));
(由于我们已将其设为响应式,您的方法显然需要 return 一个 Mono<Status>
而不仅仅是 Status
。)
根据评论更新 - 如果您只想“一次执行一个”而不是同时执行多个,则可以使用 concatMap()
而不是 flatMap()
。