Reactor 的 Flux Fan out 和 Zip 吗?

Reactor's Flux Fan out and Zip it?

我正在尝试对只能读取一次的 Flux/Publisher 执行以下操作(想想只能读取一次的数据库结果)。但是,这个问题足够通用,可以在没有反应器知识的情况下在函数式编程上下文中回答。

  1. 计数独特的项目
  2. 检查元素是否存在
  3. 不要多次调用 publisher/flux 生成器。
distinctAndHasElement(4, Flux.just(1,2,3,3,4,4,5));


Mono<Pair<Long, Boolean>> distinctAndHasElement(int toCheck, Flux<Integer> intsFlux) {
   // Code that doesn't work, Due to use of non final local variable
   boolean found = false;
   return intsFlux.map(x -> {
            if (toCheck == x) {
                found = true;
            }
            return x;
       })
       .distinct()
       .count()
       .map(x -> Pair.of(x, found));
}

我们只需要能够扇出 2 个对相同 type/domain 进行操作的函数,并压缩最终结果。

由于约束#3,以下内容无法正常工作

Flux<Integer> distinct = intsFlux.distinct();
Mono<Boolean> found = distinct.hasElement(toCheck);
Mono<Long> count = distinct.count();
return Mono.zip(count, found);

您尝试做的是缩减 数据集。这意味着您试图通过合并初始元素来创建单个结果。

请注意 count 可以被视为一种减少,在您的情况下,您需要一种高级计数操作,它还检查 至少一个输入元素等于给定值.

使用反应器(以及许多其他流框架),您可以使用 reduce 运算符。

让我们用它来尝试您的第一个示例:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

public class CountAndCheck {
    static Mono<Tuple2<Long, Boolean>> distinctAndHasElement(int toCheck, Flux<Integer> intsFlux) {
        return intsFlux
                .distinct()
                .reduce(Tuples.of(0L, false), (intermediateResult, nextElement) -> {
                    return Tuples.of(intermediateResult.getT1() + 1L, intermediateResult.getT2() ||  toCheck == nextElement);
                });
    }

    public static void main(String[] args) {
        System.out.println(distinctAndHasElement(2, Flux.just(1, 2, 2, 3, 4, 4)).block());
    }
}

以上程序打印:[4,true]

注意:您可以使用 scan 运算符代替归约,以获得归约操作中每个中间步骤的通量。了解减少的执行方式可能很有用。

您可以按照 documentation 中的说明广播您的 Flux

Flux<Integer> distinct = intsFlux.distinct().publish().autoConnect(2);
Mono<Boolean> found = distinct.hasElement(toCheck);
Mono<Long> count = distinct.count();
return Mono.zip(count, found);