是否有限制数量的GroupedFlux<T>创建groupBy算子

Is there a limited number of GroupedFlux<T> created groupBy operator

根据 document reference,groupBy 运算符根据运算符的 keymapper 函数将给定的 Flux 拆分为多个 GroupedFlux。 如果我使用 257 个整数范围执行以下代码,它可以正常工作,但不能使用 258

    public void groupByTest() {
    Flux.range(1, 258)
            .groupBy(val -> val)
            .concatMap(g -> g.map(val -> val + "test"))
            .doOnNext(System.out::println)
            .blockLast();
}

这是否意味着 groupBy 运算符不能创建超过 257 个组?

groupBy javadoc 中所述:

The groups need to be drained and consumed downstream for groupBy to work correctly. Notably when the criteria produces a large amount of groups, it can lead to hanging if the groups are not suitably consumed downstream (eg. due to a flatMap with a maxConcurrency parameter that is set too low).

这意味着一旦发出一个组,groupBy 需要获得更多请求才能取得进展。默认情况下,它最多打开 256 个组,然后它需要更多请求或检测组是否完整。并且 groupBy 不能 "know" 如果一个组是完整的,直到:

  • A) 该组被取消(在这种情况下,如果稍​​后出现具有相同键的值,它将重新创建一个新组)
  • B) 源已被完全处理(仅当源小于 256 个元素时才会发生,默认的 groupBy prefetch,或者如果 groupBy 从中接收到 onComplete 信号来源)

val -> val 标准和 concatMap 均不符合这些要求。

groupBy 标准最终会生成与值一样多的组。这里有 258 个组,而 groupBy 的默认容量可以跟踪 256 个组。

Note: If the whole sequence starts less than 256 groups, it would work fine. Try setting the criteria to val -> val % 2 and see that it works. Then try to bump the range to range(1, 513) and see how it hangs again.

由于 concatMap 的工作方式,上次测试限制为 512 个元素。

concatMap 在我们的例子中尤其糟糕,因为它只会订阅下一组并在第一组完成时取得进展 。这与上面的条件 B) 冲突,造成 groupByconcatMap 都无法取得进展的情况。

Note: In the small example with 513, concatMap would start consuming group 1 and wait for it to complete before it consumes group 2. BUT groupBy stops emitting once it has fetched 256 elements for group 1 and then waits for downstream to start consuming group 2. As a result, it has just too few data to detect that the group is complete, concatMap waits for that completion signal and never subscribes to group 2, hanging the whole thing.

Using a flatMap would fix that, because flatMap will subscribe to multiple groups concurrently, and 2 groups is no trouble for it: it will consume both groups and make progress.