如何按组聚合通量上的元素/如何按组减少?

How can I aggregate elements on a flux by group / how to reduce groupwise?

假设您有大量具有以下结构的对象:

class Element {
  String key;
  int count;
}

现在想象一下,这些元素以预定义的排序顺序流动,总是以一个键为一组,比如

{ key = "firstKey",  count=123}
{ key = "firstKey",  count=1  }
{ key = "secondKey", count=4  }
{ key = "thirdKey",  count=98 }
{ key = "thirdKey",  count=5  }
 .....

我想做的是创建一个通量,其中 returns 每个不同 key 的元素和每个键组的总和 count。 所以基本上就像每个组的经典归约,但使用 reduce 运算符不起作用,因为它只有 returns 一个元素,我想为每个不同的键获得一个元素的通量。

使用 bufferUntil 可能有效,但缺点是我必须保持状态以检查 key 与前一个相比是否发生了变化。

使用 groupBy 是一种矫枉过正,因为我知道一旦找到新密钥,每个组就会结束,所以我不想在该事件之后保留任何缓存。

是否可以使用 Flux 进行这样的聚合,而无需将状态保持在流程之外?

如果不自己跟踪状态,目前(从 3.2.5 开始)这是不可能的。 distinctUntilChanged 可能符合最小状态的要求,但不发出状态,只是根据所述状态认为 "distinct" 的值。

解决此问题的最简单方法是使用 windowUntilcompose + 每个订阅者状态的 AtomicReference

Flux<Tuple2<T, Integer>> sourceFlux = ...; //assuming key/count represented as `Tuple2`
Flux<Tuple2<T, Integer>> aggregated = sourceFlux.compose(source -> {
    //having this state inside a compose means it will not be shared by multiple subscribers
    AtomicReference<T> last = new AtomicReference<>(null);

    return source
      //use "last seen" state so split into windows, much like a `groupBy` but with earlier closing
      .windowUntil(i -> !i.getT1().equals(last.getAndSet(i.getT1())), true)
      //reduce each window
      .flatMap(window -> window.reduce((i1, i2) -> Tuples.of(i1.getT1(), i1.getT2() + i2.getT2()))
});

这对我来说真的很管用!感谢 post。 请注意,与此同时,“撰写”方法已重命名。您需要改用 transformDeferred。 在我的例子中,我有一个“仪表板”对象,它有一个 id(存储为 UUID),我想在其上对源通量进行分组:

Flux<Dashboard> sourceFlux = ... // could be a DB query. The Flux must be sorted according the id.  

sourceFlux.transformDeferred(dashboardFlux -> {
    // this stores the dashboardId's as the Flux publishes. It is used to decide when to open a new window
    // having this state inside a compose means it will not be shared by multiple subscribers
    AtomicReference<UUID> last = new AtomicReference<>(null);
    
    return dashboardFlux
    //use "last seen" state so split into windows, much like a `groupBy` but with earlier closing
        .windowUntil(i -> !i.getDashboardId().equals(last.getAndSet(i.getDashboardId())), true)
        //reduce each window
        .flatMap(window -> window.reduce(... /* reduce one window here */));
})