Flux.zip 方法不发射所有元素

Flux.zip method not emitting all elements

我正在使用 Reactive Stream 和 Publishers(Mono 和 Flux),并使用 Flux 的 zip 和 zipWith 方法组合这两个发布器,如下所示:

Flux<String> flux1 = Flux.just(" {1} ","{2} ","{3} ","{4} " );
Flux<String> flux2 = Flux.just(" |A|"," |B| "," |C| ");
Flux.zip(flux1, flux2,
                    (itemflux1, itemflux2) -> "[ "+itemflux1 + ":"+ itemflux2 + " ] " )
            .subscribe(System.out::print);

这是输出:

[  {1} : |A| ] [ {2} : |B|  ] [ {3} : |C|  ] 

AS flux1 有四个元素,flux2 有三个元素,flux1 中的第四个元素丢失了。当我尝试打印通量日志时,没有关于第四个元素发生了什么的信息。

打印日志语句如下:

Flux.zip(flux1, flux2,
                (itemflux1, itemflux2) -> "[ "+itemflux1 + ":"+ itemflux2 + " ] " ).log()
        .subscribe(System.out::print);

这是使用日志方法的控制台日志:

[info] onSubscribe(FluxZip.ZipCoordinator)
[info] request(unbounded)
[info] onNext([  {1} : |A| ] )
[  {1} : |A| ] [info] onNext([ {2} : |B|  ] )
[ {2} : |B|  ] [info] onNext([ {3} : |C|  ] )
[ {3} : |C|  ] [info] onComplete()

从zip方法的文档中,我得到了

The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.

但在我的例子中,它没有记录任何错误,也没有记录任何关于丢失元素的消息。

如何获取丢失元素的信息?

求推荐。

这是此操作员的预期行为。 对于 Flux.zip,提供的 Flux 之一可能是无限的;一个常见的例子是使用 Flux.interval(Duration duration) 实例(无限)压缩 Flux 数据。

如果您遇到这种情况,可能意味着您需要使用不同的运算符。

解释文档的内容
错误将立即被转发。 --> 意思是如果组合器函数中有错误,那么它会被立即转发。您可以通过将任一 Flux 中的条目之一设置为空来检查这一点。

无法获取丢失的元素。因为当 Stream 之一结束时,不会进一步读取流。希望它是清楚的。如果你真的想获取流的最后一个元素,请尝试其他运算符。

zip/zipWith 将输出与最短 Flux 中的元素 一样多的对。它在最小的终止时取消较长的 Flux,如果您将 log() 放在源 Flux 而不是压缩的

上,它应该是可见的

这个片段证明了这一点(它被调整为显示 1-by-1 请求和 运行 作为单元测试,因此 hide()/zipWith(..., 1)blockLast()):

@Test
public void test() {
    Flux<Integer> flux1 = Flux.range(1, 4).hide().log("\tFLUX 1");
    Flux<Integer> flux2 = Flux.range(10, 2).hide().log("\tFlux 2");

    flux1.zipWith(flux2, 1)
        .log("zipped")
        .blockLast();
}

输出:

11:57:21.072 [main] INFO  zipped - onSubscribe(FluxZip.ZipCoordinator)
11:57:21.077 [main] INFO  zipped - request(unbounded)
11:57:21.079 [main] INFO    FLUX 1 - onSubscribe(FluxHide.HideSubscriber)
11:57:21.079 [main] INFO    FLUX 1 - request(1)
11:57:21.079 [main] INFO    FLUX 1 - onNext(1)
11:57:21.079 [main] INFO    Flux 2 - onSubscribe(FluxHide.HideSubscriber)
11:57:21.080 [main] INFO    Flux 2 - request(1)
11:57:21.080 [main] INFO    Flux 2 - onNext(10)
11:57:21.080 [main] INFO  zipped - onNext([1,10])
11:57:21.080 [main] INFO    FLUX 1 - request(1)
11:57:21.080 [main] INFO    FLUX 1 - onNext(2)
11:57:21.080 [main] INFO    Flux 2 - request(1)
11:57:21.080 [main] INFO    Flux 2 - onNext(11)
11:57:21.080 [main] INFO  zipped - onNext([2,11])
11:57:21.080 [main] INFO    FLUX 1 - request(1)
11:57:21.080 [main] INFO    FLUX 1 - onNext(3)
11:57:21.080 [main] INFO    Flux 2 - request(1)
11:57:21.080 [main] INFO    Flux 2 - onComplete()
11:57:21.081 [main] INFO    FLUX 1 - cancel() <----- HERE
11:57:21.081 [main] INFO    Flux 2 - cancel()
11:57:21.081 [main] INFO  zipped - onComplete()

这是"until any of the sources completes"部分。