Is there a limit on the number of GroupedFlux&lt;T&gt; created by the groupBy operator?
According to the reference documentation, the groupBy operator splits a given Flux into multiple GroupedFlux according to the operator's keyMapper function.
If I run the following code with a range of 257 integers it works fine, but not with 258:
public void groupByTest() {
    Flux.range(1, 258)
        .groupBy(val -> val)
        .concatMap(g -> g.map(val -> val + "test"))
        .doOnNext(System.out::println)
        .blockLast();
}
Does this mean the groupBy operator cannot create more than 257 groups?
As stated in the groupBy javadoc:

The groups need to be drained and consumed downstream for groupBy to work correctly. Notably when the criteria produces a large amount of groups, it can lead to hanging if the groups are not suitably consumed downstream (eg. due to a flatMap with a maxConcurrency parameter that is set too low).
This means that once a group has been emitted, groupBy needs further requests from downstream in order to make progress. By default it opens at most 256 groups, after which it needs either more requests or to detect that a group is complete. And groupBy cannot "know" that a group is complete until either:
- A) the group is cancelled (in which case it will re-create a new group if a value with the same key shows up later), or
- B) the source has been fully processed (which only happens if the source is smaller than 256 elements, the default groupBy prefetch, or if groupBy has received the onComplete signal from the source)
Neither the val -> val criterion nor concatMap meets these requirements. The val -> val criterion ends up producing as many groups as there are values: 258 groups here, while groupBy's default capacity can only track 256 groups.
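One way to work around this, exercising condition B), is to raise groupBy's prefetch above the source length via the groupBy(keyMapper, prefetch) overload: groupBy then reads the whole source (and its onComplete) up front, so every group completes and concatMap can advance. A minimal sketch, assuming reactor-core is on the classpath; the class and method names are made up for illustration:

```java
import reactor.core.publisher.Flux;
import java.util.List;

public class GroupByPrefetchDemo {
    static List<String> run() {
        return Flux.range(1, 258)
                // prefetch 512 > 258: groupBy consumes the entire source,
                // including its onComplete, so each single-value group
                // completes and concatMap can move from group to group.
                .groupBy(val -> val, 512)
                .concatMap(g -> g.map(val -> val + "test"))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run().size());
    }
}
```

This only trades one limit for another (a source larger than the chosen prefetch would hang again), so it is a diagnostic aid rather than a general fix.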
Note: If the whole sequence opens fewer than 256 groups, it would work fine. Try setting the criteria to val -> val % 2 and see that it works. Then try to bump the range to range(1, 513) and see how it hangs again.
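The working variant of that experiment can be sketched as follows (assuming reactor-core on the classpath; collectList replaces the printing so the result can be inspected):

```java
import reactor.core.publisher.Flux;
import java.util.List;

public class GroupByModuloDemo {
    // val -> val % 2 opens only 2 groups, well under groupBy's default
    // capacity of 256 open groups, so concatMap can drain them in turn.
    static List<String> run(int count) {
        return Flux.range(1, count)
                .groupBy(val -> val % 2)
                .concatMap(g -> g.map(val -> val + "test"))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        // Completes with 512 elements; bumping to 513 hangs, as explained
        // in the text.
        System.out.println(run(512).size());
    }
}
```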
That last test is limited to 512 elements because of the way concatMap works. concatMap is especially bad in our case, because it only subscribes to the next group, and thus only makes progress, once the first group completes. This conflicts with condition B) above, creating a situation where neither groupBy nor concatMap can make progress.
Note: In the small example with 513, concatMap would start consuming group 1 and wait for it to complete before it consumes group 2. BUT groupBy stops emitting once it has fetched 256 elements for group 1 and then waits for downstream to start consuming group 2. As a result, it has too little data to detect that the group is complete; concatMap waits for that completion signal and never subscribes to group 2, hanging the whole thing.
Using a flatMap would fix that, because flatMap will subscribe to multiple groups concurrently, and 2 groups is no trouble for it: it will consume both groups and make progress.
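A minimal sketch of the flatMap fix for the 513-element variant (assuming reactor-core on the classpath; the class name is made up for illustration):

```java
import reactor.core.publisher.Flux;
import java.util.List;

public class GroupByFlatMapDemo {
    // flatMap subscribes to up to 256 inner publishers concurrently by
    // default, so it consumes both groups at once and groupBy always has
    // a consumer making requests, allowing it to progress to completion.
    static List<String> run(int count) {
        return Flux.range(1, count)
                .groupBy(val -> val % 2)
                .flatMap(g -> g.map(val -> val + "test"))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run(513).size());
    }
}
```

Note that unlike concatMap, flatMap interleaves the elements of the two groups, so relative ordering across groups is no longer guaranteed.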