Flux.range 一旦达到 256 个元素,等待发出更多元素

Flux.range waits to emit more element once 256 elements are reached

我写了这段代码:

Flux.range(0, 300)
            .doOnNext(i -> System.out.println("i = " + i))
            .flatMap(i -> Mono.just(i)
                            .subscribeOn(Schedulers.elastic())
                            .delayElement(Duration.ofMillis(1000))
            )
            .doOnNext(i -> System.out.println("end " + i))
            .blockLast();

当运行时,第一个System.out.println表明Flux在第256个元素停止发射数字,然后等待旧元素完成再发射新元素。

为什么会这样?
为什么是 256?

为什么会这样?

flatMap 运算符可以描述为运算符(改写自 javadoc):

  1. 订阅其内部热切
  2. 不保留元素的顺序。
  3. 让来自不同内部的值交错。

对于这个问题,第一点很重要。 Project Reactor 限制了 in-flight 内部 序列数 concurrency 参数。

虽然 flatMap(mapper) uses the default parameter the flatMap(mapper, concurrency) 重载显式接受此参数。

flatMaps javadoc 将参数描述为:

The concurrency argument allows to control how many Publisher can be subscribed to and merged in parallel

使用 concurrency = 500

考虑以下代码
Flux.range(0, 300)
        .doOnNext(i -> System.out.println("i = " + i))
        .flatMap(i -> Mono.just(i)
                        .subscribeOn(Schedulers.elastic())
                        .delayElement(Duration.ofMillis(1000)),
                500
//         ^^^^^^^^^^
        )
        .doOnNext(i -> System.out.println("end " + i))
        .blockLast();

在这种情况下没有等待:

i = 297
i = 298
i = 299
end 0
end 1
end 2

相比之下,如果将 1 作为 concurrency 传递,输出将类似于:

i = 0
end 0
i = 1
end 1

在发出下一个元素之前等待一秒钟。

为什么是 256?

256 是 默认 并发值 flatMap

看看Queues.SMALL_BUFFER_SIZE:

public static final int SMALL_BUFFER_SIZE = Math.max(16,
        Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256")));