如何按组聚合通量上的元素/如何按组减少?
How can I aggregate elements on a flux by group / how to reduce groupwise?
假设您有大量具有以下结构的对象:
class Element {
String key;
int count;
}
现在想象一下,这些元素以预定义的排序顺序流动,总是以一个键为一组,比如
{ key = "firstKey", count=123}
{ key = "firstKey", count=1 }
{ key = "secondKey", count=4 }
{ key = "thirdKey", count=98 }
{ key = "thirdKey", count=5 }
.....
我想做的是创建一个通量,其中 returns 每个不同 key
的元素和每个键组的总和 count
。
所以基本上就像每个组的经典归约,但使用 reduce
运算符不起作用,因为它只有 returns 一个元素,我想为每个不同的键获得一个元素的通量。
使用 bufferUntil
可能有效,但缺点是我必须保持状态以检查 key
与前一个相比是否发生了变化。
使用 groupBy
是一种矫枉过正,因为我知道一旦找到新密钥,每个组就会结束,所以我不想在该事件之后保留任何缓存。
是否可以使用 Flux
进行这样的聚合,而无需将状态保持在流程之外?
如果不自己跟踪状态,目前(从 3.2.5 开始)这是不可能的。 distinctUntilChanged
可能符合最小状态的要求,但不发出状态,只是根据所述状态认为 "distinct" 的值。
解决此问题的最简单方法是使用 windowUntil
和 compose
+ 每个订阅者状态的 AtomicReference
:
Flux<Tuple2<T, Integer>> sourceFlux = ...; //assuming key/count represented as `Tuple2`
Flux<Tuple2<T, Integer>> aggregated = sourceFlux.compose(source -> {
//having this state inside a compose means it will not be shared by multiple subscribers
AtomicReference<T> last = new AtomicReference<>(null);
return source
//use "last seen" state so split into windows, much like a `groupBy` but with earlier closing
.windowUntil(i -> !i.getT1().equals(last.getAndSet(i.getT1())), true)
//reduce each window
.flatMap(window -> window.reduce((i1, i2) -> Tuples.of(i1.getT1(), i1.getT2() + i2.getT2()))
});
这对我来说真的很管用!感谢 post。
请注意,与此同时,“撰写”方法已重命名。您需要改用 transformDeferred
。
在我的例子中,我有一个“仪表板”对象,它有一个 id(存储为 UUID),我想在其上对源通量进行分组:
Flux<Dashboard> sourceFlux = ... // could be a DB query. The Flux must be sorted according the id.
sourceFlux.transformDeferred(dashboardFlux -> {
// this stores the dashboardId's as the Flux publishes. It is used to decide when to open a new window
// having this state inside a compose means it will not be shared by multiple subscribers
AtomicReference<UUID> last = new AtomicReference<>(null);
return dashboardFlux
//use "last seen" state so split into windows, much like a `groupBy` but with earlier closing
.windowUntil(i -> !i.getDashboardId().equals(last.getAndSet(i.getDashboardId())), true)
//reduce each window
.flatMap(window -> window.reduce(... /* reduce one window here */));
})
假设您有大量具有以下结构的对象:
class Element {
String key;
int count;
}
现在想象一下,这些元素以预定义的排序顺序流动,总是以一个键为一组,比如
{ key = "firstKey", count=123}
{ key = "firstKey", count=1 }
{ key = "secondKey", count=4 }
{ key = "thirdKey", count=98 }
{ key = "thirdKey", count=5 }
.....
我想做的是创建一个通量,其中 returns 每个不同 key
的元素和每个键组的总和 count
。
所以基本上就像每个组的经典归约,但使用 reduce
运算符不起作用,因为它只有 returns 一个元素,我想为每个不同的键获得一个元素的通量。
使用 bufferUntil
可能有效,但缺点是我必须保持状态以检查 key
与前一个相比是否发生了变化。
使用 groupBy
是一种矫枉过正,因为我知道一旦找到新密钥,每个组就会结束,所以我不想在该事件之后保留任何缓存。
是否可以使用 Flux
进行这样的聚合,而无需将状态保持在流程之外?
如果不自己跟踪状态,目前(从 3.2.5 开始)这是不可能的。 distinctUntilChanged
可能符合最小状态的要求,但不发出状态,只是根据所述状态认为 "distinct" 的值。
解决此问题的最简单方法是使用 windowUntil
和 compose
+ 每个订阅者状态的 AtomicReference
:
Flux<Tuple2<T, Integer>> sourceFlux = ...; //assuming key/count represented as `Tuple2`
Flux<Tuple2<T, Integer>> aggregated = sourceFlux.compose(source -> {
//having this state inside a compose means it will not be shared by multiple subscribers
AtomicReference<T> last = new AtomicReference<>(null);
return source
//use "last seen" state so split into windows, much like a `groupBy` but with earlier closing
.windowUntil(i -> !i.getT1().equals(last.getAndSet(i.getT1())), true)
//reduce each window
.flatMap(window -> window.reduce((i1, i2) -> Tuples.of(i1.getT1(), i1.getT2() + i2.getT2()))
});
这对我来说真的很管用!感谢 post。
请注意,与此同时,“撰写”方法已重命名。您需要改用 transformDeferred
。
在我的例子中,我有一个“仪表板”对象,它有一个 id(存储为 UUID),我想在其上对源通量进行分组:
Flux<Dashboard> sourceFlux = ... // could be a DB query. The Flux must be sorted according the id.
sourceFlux.transformDeferred(dashboardFlux -> {
// this stores the dashboardId's as the Flux publishes. It is used to decide when to open a new window
// having this state inside a compose means it will not be shared by multiple subscribers
AtomicReference<UUID> last = new AtomicReference<>(null);
return dashboardFlux
//use "last seen" state so split into windows, much like a `groupBy` but with earlier closing
.windowUntil(i -> !i.getDashboardId().equals(last.getAndSet(i.getDashboardId())), true)
//reduce each window
.flatMap(window -> window.reduce(... /* reduce one window here */));
})