如何在同一管道中使用多个反应流?

How do I use multiple reactive streams in the same pipeline?

我正在使用 WebFlux 从两个不同的 REST 端点提取数据,并尝试将一个流中的一些数据与另一个相关联。我有 Flux 个名为 eventsegvs 的实例,对于每个事件,我想找到时间戳最近的 EGV。

final Flux<Tuple2<Double,Object>> data = events
        .map(e -> Tuples.of(e.getValue(),
                egvs.map(egv -> Tuples.of(egv.getValue(),
                                Math.abs(Duration.between(e.getDisplayTime(),
                                        egv.getDisplayTime()).toSeconds())))
                        .sort(Comparator.comparingLong(Tuple2::getT2))
                        .take(1)
                        .map(v -> v.getT1())));

当我将 data 发送到我的 Thymeleaf 模板时,元组的第一个元素呈现为数字,如我所料,但第二个元素呈现为 FluxMapFuseable。管道的 egvs.map(...) 部分似乎没有执行。我如何让那部分管道执行?

更新

谢谢,@Toerktumlare - 你的回答帮助我弄清楚我的方法是错误的。在映射操作的每次迭代中,事件需要 整个 组 EGV 的上下文来找到与之匹配的那个。所以工作代码如下所示:

final Flux<Tuple2<Double, Double>> data =
        Flux.zip(events, egvs.collectList().repeat())
                .map(t -> Tuples.of(
                        // Grab the event
                        t.getT1().getValue(),
                        // Find the EGV (from the full set of EGVs) with the closest timestamp
                        t.getT2().stream()
                                .map(egv -> Tuples.of(
                                        egv.getValue(),
                                        Math.abs(Duration.between(
                                                        t.getT1().getDisplayTime(),
                                                egv.getDisplayTime()).toSeconds())))
                                // Sort the stream of (value, time difference) tuples and
                                // take the smallest time difference.
                                .sorted(Comparator.comparingLong(Tuple2::getT2))
                                .map(Tuple2::getT1)
                                .findFirst()
                                .orElse(0.)));

我认为你正在做的是你正在打破反应链。

assembly phase 期间,反应器将向后调用每个操作员,直到找到可以开始生产物品的 producer,我认为你在这里打破了链条:

egvs.map(egv -> Tuples.of( ..., ... )

你看到了 egvs return 一些你需要注意的东西并链接到 events.map

的 return

我举个例子:

// This works because we always return from flatMap
// we keep the chain intact
Mono.just("foobar").flatMap(f -> {
    return Mono.just(f)
}.subscribe(s -> {
    System.out.println(s)
});

另一方面,这表现不同:

Mono.just("foobar").flatMap(f -> {
    Mono.just("foo").doOnSuccess(s -> { System.out.println("this will never print"); });
    return Mono.just(f);
});

因为在这个例子中你可以看到我们忽略了从内部 Mono 处理 return 从而 打破链条 .

你还没有真正透露 evg 到底是什么,所以我无法给你一个完整的答案,但你很可能应该这样做:

final Flux<Tuple2<Double,Object>> data = events
            // chain on egv here instead
            // and then return your full tuple object instead
            .map(e -> egvs.map(egv -> Tuples.of(e.getValue(), Tuples.of(egv.getValue(), Math.abs(Duration.between(e.getDisplayTime(), egv.getDisplayTime()).toSeconds())))
            .sort(Comparator.comparingLong(Tuple2::getT2))
            .take(1)
            .map(v -> v.getT1())));

我没有编译器来检查 atm。但我相信这至少是你的问题。阅读您的代码有点棘手。