Reactor 的 Flux Fan out 和 Zip 吗?
Reactor's Flux Fan out and Zip it?
我正在尝试对只能读取一次的 Flux/Publisher 执行以下操作(想想只能读取一次的数据库结果)。但是,这个问题足够通用,可以在没有反应器知识的情况下在函数式编程上下文中回答。
- 计数独特的项目
- 检查元素是否存在
- 不要多次调用 publisher/flux 生成器。
distinctAndHasElement(4, Flux.just(1,2,3,3,4,4,5));
Mono<Pair<Long, Boolean>> distinctAndHasElement(int toCheck, Flux<Integer> intsFlux) {
// Code that doesn't work, Due to use of non final local variable
boolean found = false;
return intsFlux.map(x -> {
if (toCheck == x) {
found = true;
}
return x;
})
.distinct()
.count()
.map(x -> Pair.of(x, found));
}
我们只需要能够扇出 2 个对相同 type/domain 进行操作的函数,并压缩最终结果。
由于约束#3,以下内容无法正常工作
Flux<Integer> distinct = intsFlux.distinct();
Mono<Boolean> found = distinct.hasElement(toCheck);
Mono<Long> count = distinct.count();
return Mono.zip(count, found);
您尝试做的是缩减 数据集。这意味着您试图通过合并初始元素来创建单个结果。
请注意 count 可以被视为一种减少,在您的情况下,您需要一种高级计数操作,它还检查 至少一个输入元素等于给定值.
使用反应器(以及许多其他流框架),您可以使用 reduce 运算符。
让我们用它来尝试您的第一个示例:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
public class CountAndCheck {
static Mono<Tuple2<Long, Boolean>> distinctAndHasElement(int toCheck, Flux<Integer> intsFlux) {
return intsFlux
.distinct()
.reduce(Tuples.of(0L, false), (intermediateResult, nextElement) -> {
return Tuples.of(intermediateResult.getT1() + 1L, intermediateResult.getT2() || toCheck == nextElement);
});
}
public static void main(String[] args) {
System.out.println(distinctAndHasElement(2, Flux.just(1, 2, 2, 3, 4, 4)).block());
}
}
以上程序打印:[4,true]
注意:您可以使用 scan 运算符代替归约,以获得归约操作中每个中间步骤的通量。了解减少的执行方式可能很有用。
您可以按照 documentation 中的说明广播您的 Flux
。
Flux<Integer> distinct = intsFlux.distinct().publish().autoConnect(2);
Mono<Boolean> found = distinct.hasElement(toCheck);
Mono<Long> count = distinct.count();
return Mono.zip(count, found);
我正在尝试对只能读取一次的 Flux/Publisher 执行以下操作(想想只能读取一次的数据库结果)。但是,这个问题足够通用,可以在没有反应器知识的情况下在函数式编程上下文中回答。
- 计数独特的项目
- 检查元素是否存在
- 不要多次调用 publisher/flux 生成器。
distinctAndHasElement(4, Flux.just(1,2,3,3,4,4,5));
Mono<Pair<Long, Boolean>> distinctAndHasElement(int toCheck, Flux<Integer> intsFlux) {
// Code that doesn't work, Due to use of non final local variable
boolean found = false;
return intsFlux.map(x -> {
if (toCheck == x) {
found = true;
}
return x;
})
.distinct()
.count()
.map(x -> Pair.of(x, found));
}
我们只需要能够扇出 2 个对相同 type/domain 进行操作的函数,并压缩最终结果。
由于约束#3,以下内容无法正常工作
Flux<Integer> distinct = intsFlux.distinct();
Mono<Boolean> found = distinct.hasElement(toCheck);
Mono<Long> count = distinct.count();
return Mono.zip(count, found);
您尝试做的是缩减 数据集。这意味着您试图通过合并初始元素来创建单个结果。
请注意 count 可以被视为一种减少,在您的情况下,您需要一种高级计数操作,它还检查 至少一个输入元素等于给定值.
使用反应器(以及许多其他流框架),您可以使用 reduce 运算符。
让我们用它来尝试您的第一个示例:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
public class CountAndCheck {
static Mono<Tuple2<Long, Boolean>> distinctAndHasElement(int toCheck, Flux<Integer> intsFlux) {
return intsFlux
.distinct()
.reduce(Tuples.of(0L, false), (intermediateResult, nextElement) -> {
return Tuples.of(intermediateResult.getT1() + 1L, intermediateResult.getT2() || toCheck == nextElement);
});
}
public static void main(String[] args) {
System.out.println(distinctAndHasElement(2, Flux.just(1, 2, 2, 3, 4, 4)).block());
}
}
以上程序打印:[4,true]
注意:您可以使用 scan 运算符代替归约,以获得归约操作中每个中间步骤的通量。了解减少的执行方式可能很有用。
您可以按照 documentation 中的说明广播您的 Flux
。
Flux<Integer> distinct = intsFlux.distinct().publish().autoConnect(2);
Mono<Boolean> found = distinct.hasElement(toCheck);
Mono<Long> count = distinct.count();
return Mono.zip(count, found);