为什么订阅者在不同情况下请求不同数量的元素?
Why is Subscriber requesting different number of elements in different cases?
我正在学习反应流和发布-订阅实用程序,我正在使用发布者(在我的例子中是 Flux)和订阅者的默认行为。
我有两个场景,都在 Flux 中有相同数量的元素。但是当我分析日志时,onSubscribe 方法请求不同数量的元素(比如在一种情况下请求无限元素,在另一种情况下请求 32 个元素)。
以下是两个案例和日志:
System.out.println("*********Calling MapData************");
List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribe(elements::add);
//printElements(elements);
System.out.println("-------------------------------------");
System.out.println("Inside Combine Streams");
List<Integer> elems = new ArrayList<>();
Flux.just(10,20,30,40)
.log()
.map(x -> x * 2)
.zipWith(Flux.range(0, Integer.MAX_VALUE),
(two, one) -> String.format("First : %d, Second : %d \n", one, two))
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) {
}
});
System.out.println("-------------------------------------");
这是日志:
*********Calling MapData************
[warn] LoggerFactory has not been explicitly initialized. Default system-logger will be used. Please invoke StaticLoggerBinder#setLog(org.apache.maven.plugin.logging.Log) with Mojo's Log instance at the early start of your Mojo
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(unbounded)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------
Inside Combine Streams
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(32)
[info] | onNext(10)
[info] | onNext(20)
[info] | onNext(30)
[info] | onNext(40)
[info] | onComplete()
[info] | cancel()
-------------------------------------
因为我没有使用任何定制的订阅者实现,那么为什么在 "MapData" 的情况下,它正在记录 "[info] | request(unbounded)" 并且在""Inside Combine Streams"" 记录 "[info] | request(32)" ?
求推荐。
首先,您应该知道这是预期的行为。
根据您使用的运算符,Reactor 将应用不同的预取策略:
- 有些运算符会使用默认值,例如
32
或 256
- 如果您添加了具有特定值的缓冲运算符,某些安排将使用您提供的值
- Reactor 可以猜测值流是有限的,并且会请求一个无界的值
如果您使用带有 int prefetch
方法参数的运算符变体,或者如果您使用 BaseSubscriber
实现自己的 Subscriber
(它为那个)。
最重要的是,您通常不需要注意那个特定的值;只有当你想为特定数据源优化预取策略时它才有用。
我正在学习反应流和发布-订阅实用程序,我正在使用发布者(在我的例子中是 Flux)和订阅者的默认行为。
我有两个场景,都在 Flux 中有相同数量的元素。但是当我分析日志时,onSubscribe 方法请求不同数量的元素(比如在一种情况下请求无限元素,在另一种情况下请求 32 个元素)。
以下是两个案例和日志:
System.out.println("*********Calling MapData************");
List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribe(elements::add);
//printElements(elements);
System.out.println("-------------------------------------");
System.out.println("Inside Combine Streams");
List<Integer> elems = new ArrayList<>();
Flux.just(10,20,30,40)
.log()
.map(x -> x * 2)
.zipWith(Flux.range(0, Integer.MAX_VALUE),
(two, one) -> String.format("First : %d, Second : %d \n", one, two))
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) {
}
});
System.out.println("-------------------------------------");
这是日志:
*********Calling MapData************
[warn] LoggerFactory has not been explicitly initialized. Default system-logger will be used. Please invoke StaticLoggerBinder#setLog(org.apache.maven.plugin.logging.Log) with Mojo's Log instance at the early start of your Mojo
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(unbounded)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------
Inside Combine Streams
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(32)
[info] | onNext(10)
[info] | onNext(20)
[info] | onNext(30)
[info] | onNext(40)
[info] | onComplete()
[info] | cancel()
-------------------------------------
因为我没有使用任何定制的订阅者实现,那么为什么在 "MapData" 的情况下,它正在记录 "[info] | request(unbounded)" 并且在""Inside Combine Streams"" 记录 "[info] | request(32)" ?
求推荐。
首先,您应该知道这是预期的行为。
根据您使用的运算符,Reactor 将应用不同的预取策略:
- 有些运算符会使用默认值,例如
32
或256
- 如果您添加了具有特定值的缓冲运算符,某些安排将使用您提供的值
- Reactor 可以猜测值流是有限的,并且会请求一个无界的值
如果您使用带有 int prefetch
方法参数的运算符变体,或者如果您使用 BaseSubscriber
实现自己的 Subscriber
(它为那个)。
最重要的是,您通常不需要注意那个特定的值;只有当你想为特定数据源优化预取策略时它才有用。