Reactor 3.x - 限制一个groupBy Flux的时间

Reactor 3.x - limit the time of a groupBy Flux

是否有任何方法可以强制由 groupBy() 生成的 Flux 在一段时间后完成(或者类似地,限制 "open" 组的最大数量)而不管上游的完整性?我有类似以下内容:

Flux<Foo> someFastPublisher;

someFastPublisher
  .groupBy(f -> f.getKey())
  .delayElements(Duration.ofSeconds(1)) // rate limit each group
  .flatMap(g -> g) // unwind the group
  .subscribe()
;

和运行进入Flux挂起的情况,假设是因为组数大于flatMap的并发。我可以增加 flatMap 并发性,但没有简单的方法来判断最大可能大小是多少。相反,我知道按 Foo.key 分组的 Foo 将按 time/publication 顺序彼此靠近,并且宁愿在 window 上使用某种时间groupBy Flux 与 flatMap 并发性(并以两个不同的组结束并具有相同的 key() 没什么大不了的)。

我猜 groupBy Flux 在 someFastPubisher onCompletes 之前不会 onComplete - 即 Flux 移交给 flatMap 只是保持 "open"(尽管他们'不太可能获得新活动)。

我可以通过在 groupBy 中预取 Integer.MAXInteger.MAX 并发来解决这个问题 - 但是有没有办法控制组的 "life"?

是的:您可以对群组应用 take(Duration) 以确保它们提前关闭,之后将打开一个具有相同密钥的新群组:

source.groupBy(v -> v.intValue() % 2)
      .flatMap(group -> group
              .take(Duration.ofMillis(1000))
              .count()
              .map(c -> "group " + group.key() + " size = " + c)
      )
      .log()
      .blockLast();