如何在同一管道中使用多个反应流?
How do I use multiple reactive streams in the same pipeline?
我正在使用 WebFlux
从两个不同的 REST 端点提取数据,并尝试将一个流中的一些数据与另一个相关联。我有 Flux
个名为 events
和 egvs
的实例,对于每个事件,我想找到时间戳最近的 EGV。
final Flux<Tuple2<Double,Object>> data = events
.map(e -> Tuples.of(e.getValue(),
egvs.map(egv -> Tuples.of(egv.getValue(),
Math.abs(Duration.between(e.getDisplayTime(),
egv.getDisplayTime()).toSeconds())))
.sort(Comparator.comparingLong(Tuple2::getT2))
.take(1)
.map(v -> v.getT1())));
当我将 data
发送到我的 Thymeleaf 模板时,元组的第一个元素呈现为数字,如我所料,但第二个元素呈现为 FluxMapFuseable
。管道的 egvs.map(...)
部分似乎没有执行。我如何让那部分管道执行?
更新
谢谢,@Toerktumlare - 你的回答帮助我弄清楚我的方法是错误的。在映射操作的每次迭代中,事件需要 整个 组 EGV 的上下文来找到与之匹配的那个。所以工作代码如下所示:
final Flux<Tuple2<Double, Double>> data =
Flux.zip(events, egvs.collectList().repeat())
.map(t -> Tuples.of(
// Grab the event
t.getT1().getValue(),
// Find the EGV (from the full set of EGVs) with the closest timestamp
t.getT2().stream()
.map(egv -> Tuples.of(
egv.getValue(),
Math.abs(Duration.between(
t.getT1().getDisplayTime(),
egv.getDisplayTime()).toSeconds())))
// Sort the stream of (value, time difference) tuples and
// take the smallest time difference.
.sorted(Comparator.comparingLong(Tuple2::getT2))
.map(Tuple2::getT1)
.findFirst()
.orElse(0.)));
我认为你正在做的是你正在打破反应链。
在 assembly phase
期间,反应器将向后调用每个操作员,直到找到可以开始生产物品的 producer
,我认为你在这里打破了链条:
egvs.map(egv -> Tuples.of( ..., ... )
你看到了 egvs
return 一些你需要注意的东西并链接到 events.map
的 return
我举个例子:
// This works because we always return from flatMap
// we keep the chain intact
Mono.just("foobar").flatMap(f -> {
return Mono.just(f)
}.subscribe(s -> {
System.out.println(s)
});
另一方面,这表现不同:
Mono.just("foobar").flatMap(f -> {
Mono.just("foo").doOnSuccess(s -> { System.out.println("this will never print"); });
return Mono.just(f);
});
因为在这个例子中你可以看到我们忽略了从内部 Mono
处理 return 从而 打破链条 .
你还没有真正透露 evg
到底是什么,所以我无法给你一个完整的答案,但你很可能应该这样做:
final Flux<Tuple2<Double,Object>> data = events
// chain on egv here instead
// and then return your full tuple object instead
.map(e -> egvs.map(egv -> Tuples.of(e.getValue(), Tuples.of(egv.getValue(), Math.abs(Duration.between(e.getDisplayTime(), egv.getDisplayTime()).toSeconds())))
.sort(Comparator.comparingLong(Tuple2::getT2))
.take(1)
.map(v -> v.getT1())));
我没有编译器来检查 atm。但我相信这至少是你的问题。阅读您的代码有点棘手。
我正在使用 WebFlux
从两个不同的 REST 端点提取数据,并尝试将一个流中的一些数据与另一个相关联。我有 Flux
个名为 events
和 egvs
的实例,对于每个事件,我想找到时间戳最近的 EGV。
final Flux<Tuple2<Double,Object>> data = events
.map(e -> Tuples.of(e.getValue(),
egvs.map(egv -> Tuples.of(egv.getValue(),
Math.abs(Duration.between(e.getDisplayTime(),
egv.getDisplayTime()).toSeconds())))
.sort(Comparator.comparingLong(Tuple2::getT2))
.take(1)
.map(v -> v.getT1())));
当我将 data
发送到我的 Thymeleaf 模板时,元组的第一个元素呈现为数字,如我所料,但第二个元素呈现为 FluxMapFuseable
。管道的 egvs.map(...)
部分似乎没有执行。我如何让那部分管道执行?
更新
谢谢,@Toerktumlare - 你的回答帮助我弄清楚我的方法是错误的。在映射操作的每次迭代中,事件需要 整个 组 EGV 的上下文来找到与之匹配的那个。所以工作代码如下所示:
final Flux<Tuple2<Double, Double>> data =
Flux.zip(events, egvs.collectList().repeat())
.map(t -> Tuples.of(
// Grab the event
t.getT1().getValue(),
// Find the EGV (from the full set of EGVs) with the closest timestamp
t.getT2().stream()
.map(egv -> Tuples.of(
egv.getValue(),
Math.abs(Duration.between(
t.getT1().getDisplayTime(),
egv.getDisplayTime()).toSeconds())))
// Sort the stream of (value, time difference) tuples and
// take the smallest time difference.
.sorted(Comparator.comparingLong(Tuple2::getT2))
.map(Tuple2::getT1)
.findFirst()
.orElse(0.)));
我认为你正在做的是你正在打破反应链。
在 assembly phase
期间,反应器将向后调用每个操作员,直到找到可以开始生产物品的 producer
,我认为你在这里打破了链条:
egvs.map(egv -> Tuples.of( ..., ... )
你看到了 egvs
return 一些你需要注意的东西并链接到 events.map
我举个例子:
// This works because we always return from flatMap
// we keep the chain intact
Mono.just("foobar").flatMap(f -> {
return Mono.just(f)
}.subscribe(s -> {
System.out.println(s)
});
另一方面,这表现不同:
Mono.just("foobar").flatMap(f -> {
Mono.just("foo").doOnSuccess(s -> { System.out.println("this will never print"); });
return Mono.just(f);
});
因为在这个例子中你可以看到我们忽略了从内部 Mono
处理 return 从而 打破链条 .
你还没有真正透露 evg
到底是什么,所以我无法给你一个完整的答案,但你很可能应该这样做:
final Flux<Tuple2<Double,Object>> data = events
// chain on egv here instead
// and then return your full tuple object instead
.map(e -> egvs.map(egv -> Tuples.of(e.getValue(), Tuples.of(egv.getValue(), Math.abs(Duration.between(e.getDisplayTime(), egv.getDisplayTime()).toSeconds())))
.sort(Comparator.comparingLong(Tuple2::getT2))
.take(1)
.map(v -> v.getT1())));
我没有编译器来检查 atm。但我相信这至少是你的问题。阅读您的代码有点棘手。