依次将 List<Mono<Boolean>> 缩减为单个 Mono<Boolean>,如果有 return false,则中止处理
Sequentially reduce List<Mono<Boolean>> to single Mono<Boolean>, and abort processing if any return false
我有一个按顺序执行的任务列表,如果任务成功则返回 true
,如果失败则返回 false
(非例外情况 - 第三种情况它遇到了某种异常,它也应该中止处理,但我想将其视为不同的第三种情况。
这些任务表示为 Mono<Boolean>
个实例的列表。可能有任何数量。从这个列表中,我想生成一个具有以下特征的 Mono<Boolean>
:
- 它按顺序执行每个任务。
- 如果列表 returns
false
中的任何任务或遇到异常,它应该中止任何进一步任务的处理。
- 它returns
true
如果所有任务都成功; false
如果任何任务非异常失败;如果任何任务遇到异常,它应该发出错误。
这是 3 个任务的示例列表 - 第一个成功,第二个失败 - 所以我不希望第三个 运行:
Mono<Boolean> task1 = Mono.create(sink -> {
try {
System.out.println("Executing task 1...");
// Simulate some work being done
Thread.sleep(1000);
System.out.println("Finished executing task 1.");
// This task is successful, return true
sink.success(true);
} catch (InterruptedException e) {
sink.error(e);
}
});
Mono<Boolean> task2 = Mono.create(sink -> {
try {
System.out.println("Executing task 2...");
// Simulate some work being done
Thread.sleep(1000);
System.out.println("Finished executing task 2.");
// This task is NOT successful, return false
sink.success(false);
} catch (InterruptedException e) {
sink.error(e);
}
});
Mono<Boolean> task3 = Mono.create(sink -> {
try {
System.out.println("Executing task 3...");
// Simulate some work being done
Thread.sleep(1000);
System.out.println("Finished executing task 3.");
// This task is successful, return true
sink.success(true);
} catch (InterruptedException e) {
sink.error(e);
}
});
List<Mono<Boolean>> tasks = Arrays.asList(task1, task2, task3);
我的第一个想法是将 Mono
的列表转换为 Flux
,然后使用 Flux#reduce
:
Mono<Boolean> process = Flux.concat(tasks)
.reduce(true, (accum, value) -> accum && value);
process.subscribe(System.out::println);
这会正确生成 false
,这表明过程中的一个步骤没有成功(第 2 步)。但是,给出输出,很明显任务3还在执行,这是不正确的:
Executing task 1...
Finished executing task 1.
Executing task 2...
Finished executing task 2.
Executing task 3...
Finished executing task 3.
false
如果我回到同步世界并使用 Mono#block
,我可以获得所需的行为,但这感觉不是很地道:
Mono<Boolean> process = Mono.create(sink -> {
Boolean result = true;
for (Mono<Boolean> task: tasks) {
result = task.block();
if (!result) {
break;
}
}
sink.success(result);
});
process.subscribe(System.out::println);
是否有更好的方法使用 reduce
、map
、flatMap
等内置运算符来执行此操作?
一种方法是在任务不成功时接收错误:
Mono<Boolean> task2 = Mono.create(sink -> {
try {
//...
// This task is NOT successful
sink.error(new FailedTaskException());
} catch (InterruptedException e) {
sink.error(e);
}
});
然后使用.onErrorResume
运算符恢复:
List<Mono<Boolean>> tasks = Arrays.asList(task1, task2, task3);
Mono<Boolean> process = Flux.concat(tasks)
.onErrorResume(FailedTaskException.class, (t) -> Mono.just(false))
.reduce(true, (accum, value) -> accum && value);
您可以使用 takeUntil
根据给定条件停止处理:
Mono<Boolean> process = Flux.concat(tasks)
.takeUntil(x -> !x)
.reduce(true, (accum, value) -> accum && value);
我有一个按顺序执行的任务列表,如果任务成功则返回 true
,如果失败则返回 false
(非例外情况 - 第三种情况它遇到了某种异常,它也应该中止处理,但我想将其视为不同的第三种情况。
这些任务表示为 Mono<Boolean>
个实例的列表。可能有任何数量。从这个列表中,我想生成一个具有以下特征的 Mono<Boolean>
:
- 它按顺序执行每个任务。
- 如果列表 returns
false
中的任何任务或遇到异常,它应该中止任何进一步任务的处理。 - 它returns
true
如果所有任务都成功;false
如果任何任务非异常失败;如果任何任务遇到异常,它应该发出错误。
这是 3 个任务的示例列表 - 第一个成功,第二个失败 - 所以我不希望第三个 运行:
Mono<Boolean> task1 = Mono.create(sink -> {
try {
System.out.println("Executing task 1...");
// Simulate some work being done
Thread.sleep(1000);
System.out.println("Finished executing task 1.");
// This task is successful, return true
sink.success(true);
} catch (InterruptedException e) {
sink.error(e);
}
});
Mono<Boolean> task2 = Mono.create(sink -> {
try {
System.out.println("Executing task 2...");
// Simulate some work being done
Thread.sleep(1000);
System.out.println("Finished executing task 2.");
// This task is NOT successful, return false
sink.success(false);
} catch (InterruptedException e) {
sink.error(e);
}
});
Mono<Boolean> task3 = Mono.create(sink -> {
try {
System.out.println("Executing task 3...");
// Simulate some work being done
Thread.sleep(1000);
System.out.println("Finished executing task 3.");
// This task is successful, return true
sink.success(true);
} catch (InterruptedException e) {
sink.error(e);
}
});
List<Mono<Boolean>> tasks = Arrays.asList(task1, task2, task3);
我的第一个想法是将 Mono
的列表转换为 Flux
,然后使用 Flux#reduce
:
Mono<Boolean> process = Flux.concat(tasks)
.reduce(true, (accum, value) -> accum && value);
process.subscribe(System.out::println);
这会正确生成 false
,这表明过程中的一个步骤没有成功(第 2 步)。但是,给出输出,很明显任务3还在执行,这是不正确的:
Executing task 1...
Finished executing task 1.
Executing task 2...
Finished executing task 2.
Executing task 3...
Finished executing task 3.
false
如果我回到同步世界并使用 Mono#block
,我可以获得所需的行为,但这感觉不是很地道:
Mono<Boolean> process = Mono.create(sink -> {
Boolean result = true;
for (Mono<Boolean> task: tasks) {
result = task.block();
if (!result) {
break;
}
}
sink.success(result);
});
process.subscribe(System.out::println);
是否有更好的方法使用 reduce
、map
、flatMap
等内置运算符来执行此操作?
一种方法是在任务不成功时接收错误:
Mono<Boolean> task2 = Mono.create(sink -> {
try {
//...
// This task is NOT successful
sink.error(new FailedTaskException());
} catch (InterruptedException e) {
sink.error(e);
}
});
然后使用.onErrorResume
运算符恢复:
List<Mono<Boolean>> tasks = Arrays.asList(task1, task2, task3);
Mono<Boolean> process = Flux.concat(tasks)
.onErrorResume(FailedTaskException.class, (t) -> Mono.just(false))
.reduce(true, (accum, value) -> accum && value);
您可以使用 takeUntil
根据给定条件停止处理:
Mono<Boolean> process = Flux.concat(tasks)
.takeUntil(x -> !x)
.reduce(true, (accum, value) -> accum && value);