依次将 List<Mono<Boolean>> 缩减为单个 Mono<Boolean>,如果有 return false,则中止处理

Sequentially reduce List<Mono<Boolean>> to single Mono<Boolean>, and abort processing if any return false

我有一个按顺序执行的任务列表,如果任务成功则返回 true,如果失败则返回 false(非例外情况 - 第三种情况它遇到了某种异常,它也应该中止处理,但我想将其视为不同的第三种情况。

这些任务表示为 Mono<Boolean> 个实例的列表。可能有任何数量。从这个列表中,我想生成一个具有以下特征的 Mono<Boolean>

这是 3 个任务的示例列表 - 第一个成功,第二个失败 - 所以我不希望第三个 运行:

Mono<Boolean> task1 = Mono.create(sink -> {
    try {
        System.out.println("Executing task 1...");

        // Simulate some work being done
        Thread.sleep(1000);

        System.out.println("Finished executing task 1.");

        // This task is successful, return true
        sink.success(true);
    } catch (InterruptedException e) {
        sink.error(e);
    }
});

Mono<Boolean> task2 = Mono.create(sink -> {
    try {
        System.out.println("Executing task 2...");

        // Simulate some work being done
        Thread.sleep(1000);

        System.out.println("Finished executing task 2.");

        // This task is NOT successful, return false
        sink.success(false);
    } catch (InterruptedException e) {
        sink.error(e);
    }
});

Mono<Boolean> task3 = Mono.create(sink -> {
    try {
        System.out.println("Executing task 3...");

        // Simulate some work being done
        Thread.sleep(1000);

        System.out.println("Finished executing task 3.");

        // This task is successful, return true
        sink.success(true);
    } catch (InterruptedException e) {
        sink.error(e);
    }
});

List<Mono<Boolean>> tasks = Arrays.asList(task1, task2, task3);

我的第一个想法是将 Mono 的列表转换为 Flux,然后使用 Flux#reduce:

Mono<Boolean> process = Flux.concat(tasks)
    .reduce(true, (accum, value) -> accum && value);

process.subscribe(System.out::println);

这会正确生成 false,这表明过程中的一个步骤没有成功(第 2 步)。但是,给出输出,很明显任务3还在执行,这是不正确的:

Executing task 1...
Finished executing task 1.
Executing task 2...
Finished executing task 2.
Executing task 3...
Finished executing task 3.
false

如果我回到同步世界并使用 Mono#block,我可以获得所需的行为,但这感觉不是很地道:

Mono<Boolean> process = Mono.create(sink -> {
    Boolean result = true;

    for (Mono<Boolean> task: tasks) {
        result = task.block();

        if (!result) {
            break;
        }
    }

    sink.success(result);
});

process.subscribe(System.out::println);

是否有更好的方法使用 reducemapflatMap 等内置运算符来执行此操作?

一种方法是在任务不成功时接收错误:

Mono<Boolean> task2 = Mono.create(sink -> {
    try {
        //...   
        // This task is NOT successful
        sink.error(new FailedTaskException());
    } catch (InterruptedException e) {
        sink.error(e);
    }
});

然后使用.onErrorResume运算符恢复:

List<Mono<Boolean>> tasks = Arrays.asList(task1, task2, task3);
    Mono<Boolean> process = Flux.concat(tasks)
            .onErrorResume(FailedTaskException.class, (t) ->  Mono.just(false))
            .reduce(true, (accum, value) -> accum && value);

您可以使用 takeUntil 根据给定条件停止处理:

Mono<Boolean> process = Flux.concat(tasks)
    .takeUntil(x -> !x)
    .reduce(true, (accum, value) -> accum && value);