Flux.range 一旦达到 256 个元素,等待发出更多元素
Flux.range waits to emit more element once 256 elements are reached
我写了这段代码:
Flux.range(0, 300)
.doOnNext(i -> System.out.println("i = " + i))
.flatMap(i -> Mono.just(i)
.subscribeOn(Schedulers.elastic())
.delayElement(Duration.ofMillis(1000))
)
.doOnNext(i -> System.out.println("end " + i))
.blockLast();
当运行时,第一个System.out.println
表明Flux在第256个元素停止发射数字,然后等待旧元素完成再发射新元素。
为什么会这样?
为什么是 256?
为什么会这样?
flatMap
运算符可以描述为运算符(改写自 javadoc):
- 订阅其内部热切
- 不保留元素的顺序。
- 让来自不同内部的值交错。
对于这个问题,第一点很重要。 Project Reactor 限制了
in-flight 内部 序列数 concurrency
参数。
虽然 flatMap(mapper)
uses the default parameter the flatMap(mapper, concurrency)
重载显式接受此参数。
flatMap
s javadoc 将参数描述为:
The concurrency argument allows to control how many Publisher can be subscribed to and merged in parallel
使用 concurrency
= 500
考虑以下代码
Flux.range(0, 300)
.doOnNext(i -> System.out.println("i = " + i))
.flatMap(i -> Mono.just(i)
.subscribeOn(Schedulers.elastic())
.delayElement(Duration.ofMillis(1000)),
500
// ^^^^^^^^^^
)
.doOnNext(i -> System.out.println("end " + i))
.blockLast();
在这种情况下没有等待:
i = 297
i = 298
i = 299
end 0
end 1
end 2
相比之下,如果将 1
作为 concurrency
传递,输出将类似于:
i = 0
end 0
i = 1
end 1
在发出下一个元素之前等待一秒钟。
为什么是 256?
256 是 默认 并发值 flatMap
。
public static final int SMALL_BUFFER_SIZE = Math.max(16,
Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256")));
我写了这段代码:
Flux.range(0, 300)
.doOnNext(i -> System.out.println("i = " + i))
.flatMap(i -> Mono.just(i)
.subscribeOn(Schedulers.elastic())
.delayElement(Duration.ofMillis(1000))
)
.doOnNext(i -> System.out.println("end " + i))
.blockLast();
当运行时,第一个System.out.println
表明Flux在第256个元素停止发射数字,然后等待旧元素完成再发射新元素。
为什么会这样?
为什么是 256?
为什么会这样?
flatMap
运算符可以描述为运算符(改写自 javadoc):
- 订阅其内部热切
- 不保留元素的顺序。
- 让来自不同内部的值交错。
对于这个问题,第一点很重要。 Project Reactor 限制了
in-flight 内部 序列数 concurrency
参数。
虽然 flatMap(mapper)
uses the default parameter the flatMap(mapper, concurrency)
重载显式接受此参数。
flatMap
s javadoc 将参数描述为:
The concurrency argument allows to control how many Publisher can be subscribed to and merged in parallel
使用 concurrency
= 500
Flux.range(0, 300)
.doOnNext(i -> System.out.println("i = " + i))
.flatMap(i -> Mono.just(i)
.subscribeOn(Schedulers.elastic())
.delayElement(Duration.ofMillis(1000)),
500
// ^^^^^^^^^^
)
.doOnNext(i -> System.out.println("end " + i))
.blockLast();
在这种情况下没有等待:
i = 297
i = 298
i = 299
end 0
end 1
end 2
相比之下,如果将 1
作为 concurrency
传递,输出将类似于:
i = 0
end 0
i = 1
end 1
在发出下一个元素之前等待一秒钟。
为什么是 256?
256 是 默认 并发值 flatMap
。
public static final int SMALL_BUFFER_SIZE = Math.max(16,
Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256")));