Flux.zip 方法不发射所有元素
Flux.zip method not emitting all elements
我正在使用 Reactive Stream 和 Publishers(Mono 和 Flux),并使用 Flux 的 zip 和 zipWith 方法组合这两个发布器,如下所示:
Flux<String> flux1 = Flux.just(" {1} ","{2} ","{3} ","{4} " );
Flux<String> flux2 = Flux.just(" |A|"," |B| "," |C| ");
Flux.zip(flux1, flux2,
(itemflux1, itemflux2) -> "[ "+itemflux1 + ":"+ itemflux2 + " ] " )
.subscribe(System.out::print);
这是输出:
[ {1} : |A| ] [ {2} : |B| ] [ {3} : |C| ]
AS flux1 有四个元素,flux2 有三个元素,flux1 中的第四个元素丢失了。当我尝试打印通量日志时,没有关于第四个元素发生了什么的信息。
打印日志语句如下:
Flux.zip(flux1, flux2,
(itemflux1, itemflux2) -> "[ "+itemflux1 + ":"+ itemflux2 + " ] " ).log()
.subscribe(System.out::print);
这是使用日志方法的控制台日志:
[info] onSubscribe(FluxZip.ZipCoordinator)
[info] request(unbounded)
[info] onNext([ {1} : |A| ] )
[ {1} : |A| ] [info] onNext([ {2} : |B| ] )
[ {2} : |B| ] [info] onNext([ {3} : |C| ] )
[ {3} : |C| ] [info] onComplete()
从zip方法的文档中,我得到了
The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
但在我的例子中,它没有记录任何错误,也没有记录任何关于丢失元素的消息。
如何获取丢失元素的信息?
求推荐。
这是此操作员的预期行为。
对于 Flux.zip
,提供的 Flux
之一可能是无限的;一个常见的例子是使用 Flux.interval(Duration duration)
实例(无限)压缩 Flux
数据。
如果您遇到这种情况,可能意味着您需要使用不同的运算符。
解释文档的内容
错误将立即被转发。 --> 意思是如果组合器函数中有错误,那么它会被立即转发。您可以通过将任一 Flux 中的条目之一设置为空来检查这一点。
无法获取丢失的元素。因为当 Stream 之一结束时,不会进一步读取流。希望它是清楚的。如果你真的想获取流的最后一个元素,请尝试其他运算符。
zip
/zipWith
将输出与最短 Flux
中的元素 一样多的对。它在最小的终止时取消较长的 Flux
,如果您将 log()
放在源 Flux
而不是压缩的
上,它应该是可见的
这个片段证明了这一点(它被调整为显示 1-by-1 请求和 运行 作为单元测试,因此 hide()
/zipWith(..., 1)
和 blockLast()
):
@Test
public void test() {
Flux<Integer> flux1 = Flux.range(1, 4).hide().log("\tFLUX 1");
Flux<Integer> flux2 = Flux.range(10, 2).hide().log("\tFlux 2");
flux1.zipWith(flux2, 1)
.log("zipped")
.blockLast();
}
输出:
11:57:21.072 [main] INFO zipped - onSubscribe(FluxZip.ZipCoordinator)
11:57:21.077 [main] INFO zipped - request(unbounded)
11:57:21.079 [main] INFO FLUX 1 - onSubscribe(FluxHide.HideSubscriber)
11:57:21.079 [main] INFO FLUX 1 - request(1)
11:57:21.079 [main] INFO FLUX 1 - onNext(1)
11:57:21.079 [main] INFO Flux 2 - onSubscribe(FluxHide.HideSubscriber)
11:57:21.080 [main] INFO Flux 2 - request(1)
11:57:21.080 [main] INFO Flux 2 - onNext(10)
11:57:21.080 [main] INFO zipped - onNext([1,10])
11:57:21.080 [main] INFO FLUX 1 - request(1)
11:57:21.080 [main] INFO FLUX 1 - onNext(2)
11:57:21.080 [main] INFO Flux 2 - request(1)
11:57:21.080 [main] INFO Flux 2 - onNext(11)
11:57:21.080 [main] INFO zipped - onNext([2,11])
11:57:21.080 [main] INFO FLUX 1 - request(1)
11:57:21.080 [main] INFO FLUX 1 - onNext(3)
11:57:21.080 [main] INFO Flux 2 - request(1)
11:57:21.080 [main] INFO Flux 2 - onComplete()
11:57:21.081 [main] INFO FLUX 1 - cancel() <----- HERE
11:57:21.081 [main] INFO Flux 2 - cancel()
11:57:21.081 [main] INFO zipped - onComplete()
这是"until any of the sources completes"
部分。
我正在使用 Reactive Stream 和 Publishers(Mono 和 Flux),并使用 Flux 的 zip 和 zipWith 方法组合这两个发布器,如下所示:
Flux<String> flux1 = Flux.just(" {1} ","{2} ","{3} ","{4} " );
Flux<String> flux2 = Flux.just(" |A|"," |B| "," |C| ");
Flux.zip(flux1, flux2,
(itemflux1, itemflux2) -> "[ "+itemflux1 + ":"+ itemflux2 + " ] " )
.subscribe(System.out::print);
这是输出:
[ {1} : |A| ] [ {2} : |B| ] [ {3} : |C| ]
AS flux1 有四个元素,flux2 有三个元素,flux1 中的第四个元素丢失了。当我尝试打印通量日志时,没有关于第四个元素发生了什么的信息。
打印日志语句如下:
Flux.zip(flux1, flux2,
(itemflux1, itemflux2) -> "[ "+itemflux1 + ":"+ itemflux2 + " ] " ).log()
.subscribe(System.out::print);
这是使用日志方法的控制台日志:
[info] onSubscribe(FluxZip.ZipCoordinator)
[info] request(unbounded)
[info] onNext([ {1} : |A| ] )
[ {1} : |A| ] [info] onNext([ {2} : |B| ] )
[ {2} : |B| ] [info] onNext([ {3} : |C| ] )
[ {3} : |C| ] [info] onComplete()
从zip方法的文档中,我得到了
The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
但在我的例子中,它没有记录任何错误,也没有记录任何关于丢失元素的消息。
如何获取丢失元素的信息?
求推荐。
这是此操作员的预期行为。
对于 Flux.zip
,提供的 Flux
之一可能是无限的;一个常见的例子是使用 Flux.interval(Duration duration)
实例(无限)压缩 Flux
数据。
如果您遇到这种情况,可能意味着您需要使用不同的运算符。
解释文档的内容
错误将立即被转发。 --> 意思是如果组合器函数中有错误,那么它会被立即转发。您可以通过将任一 Flux 中的条目之一设置为空来检查这一点。
无法获取丢失的元素。因为当 Stream 之一结束时,不会进一步读取流。希望它是清楚的。如果你真的想获取流的最后一个元素,请尝试其他运算符。
zip
/zipWith
将输出与最短 Flux
中的元素 一样多的对。它在最小的终止时取消较长的 Flux
,如果您将 log()
放在源 Flux
而不是压缩的
这个片段证明了这一点(它被调整为显示 1-by-1 请求和 运行 作为单元测试,因此 hide()
/zipWith(..., 1)
和 blockLast()
):
@Test
public void test() {
Flux<Integer> flux1 = Flux.range(1, 4).hide().log("\tFLUX 1");
Flux<Integer> flux2 = Flux.range(10, 2).hide().log("\tFlux 2");
flux1.zipWith(flux2, 1)
.log("zipped")
.blockLast();
}
输出:
11:57:21.072 [main] INFO zipped - onSubscribe(FluxZip.ZipCoordinator)
11:57:21.077 [main] INFO zipped - request(unbounded)
11:57:21.079 [main] INFO FLUX 1 - onSubscribe(FluxHide.HideSubscriber)
11:57:21.079 [main] INFO FLUX 1 - request(1)
11:57:21.079 [main] INFO FLUX 1 - onNext(1)
11:57:21.079 [main] INFO Flux 2 - onSubscribe(FluxHide.HideSubscriber)
11:57:21.080 [main] INFO Flux 2 - request(1)
11:57:21.080 [main] INFO Flux 2 - onNext(10)
11:57:21.080 [main] INFO zipped - onNext([1,10])
11:57:21.080 [main] INFO FLUX 1 - request(1)
11:57:21.080 [main] INFO FLUX 1 - onNext(2)
11:57:21.080 [main] INFO Flux 2 - request(1)
11:57:21.080 [main] INFO Flux 2 - onNext(11)
11:57:21.080 [main] INFO zipped - onNext([2,11])
11:57:21.080 [main] INFO FLUX 1 - request(1)
11:57:21.080 [main] INFO FLUX 1 - onNext(3)
11:57:21.080 [main] INFO Flux 2 - request(1)
11:57:21.080 [main] INFO Flux 2 - onComplete()
11:57:21.081 [main] INFO FLUX 1 - cancel() <----- HERE
11:57:21.081 [main] INFO Flux 2 - cancel()
11:57:21.081 [main] INFO zipped - onComplete()
这是"until any of the sources completes"
部分。