Reactor 3.x - 限制一个groupBy Flux的时间
Reactor 3.x - limit the time of a groupBy Flux
是否有任何方法可以强制由 groupBy() 生成的 Flux 在一段时间后完成(或者类似地,限制 "open" 组的最大数量)而不管上游的完整性?我有类似以下内容:
Flux<Foo> someFastPublisher;
someFastPublisher
.groupBy(f -> f.getKey())
.delayElements(Duration.ofSeconds(1)) // rate limit each group
.flatMap(g -> g) // unwind the group
.subscribe()
;
和运行进入Flux挂起的情况,假设是因为组数大于flatMap
的并发。我可以增加 flatMap
并发性,但没有简单的方法来判断最大可能大小是多少。相反,我知道按 Foo.key
分组的 Foo
将按 time/publication 顺序彼此靠近,并且宁愿在 window 上使用某种时间groupBy Flux 与 flatMap 并发性(并以两个不同的组结束并具有相同的 key()
没什么大不了的)。
我猜 groupBy
Flux 在 someFastPubisher
onCompletes 之前不会 onComplete - 即 Flux 移交给 flatMap
只是保持 "open"(尽管他们'不太可能获得新活动)。
我可以通过在 groupBy 中预取 Integer.MAX
或 Integer.MAX
并发来解决这个问题 - 但是有没有办法控制组的 "life"?
是的:您可以对群组应用 take(Duration)
以确保它们提前关闭,之后将打开一个具有相同密钥的新群组:
source.groupBy(v -> v.intValue() % 2)
.flatMap(group -> group
.take(Duration.ofMillis(1000))
.count()
.map(c -> "group " + group.key() + " size = " + c)
)
.log()
.blockLast();
是否有任何方法可以强制由 groupBy() 生成的 Flux 在一段时间后完成(或者类似地,限制 "open" 组的最大数量)而不管上游的完整性?我有类似以下内容:
Flux<Foo> someFastPublisher;
someFastPublisher
.groupBy(f -> f.getKey())
.delayElements(Duration.ofSeconds(1)) // rate limit each group
.flatMap(g -> g) // unwind the group
.subscribe()
;
和运行进入Flux挂起的情况,假设是因为组数大于flatMap
的并发。我可以增加 flatMap
并发性,但没有简单的方法来判断最大可能大小是多少。相反,我知道按 Foo.key
分组的 Foo
将按 time/publication 顺序彼此靠近,并且宁愿在 window 上使用某种时间groupBy Flux 与 flatMap 并发性(并以两个不同的组结束并具有相同的 key()
没什么大不了的)。
我猜 groupBy
Flux 在 someFastPubisher
onCompletes 之前不会 onComplete - 即 Flux 移交给 flatMap
只是保持 "open"(尽管他们'不太可能获得新活动)。
我可以通过在 groupBy 中预取 Integer.MAX
或 Integer.MAX
并发来解决这个问题 - 但是有没有办法控制组的 "life"?
是的:您可以对群组应用 take(Duration)
以确保它们提前关闭,之后将打开一个具有相同密钥的新群组:
source.groupBy(v -> v.intValue() % 2)
.flatMap(group -> group
.take(Duration.ofMillis(1000))
.count()
.map(c -> "group " + group.key() + " size = " + c)
)
.log()
.blockLast();