Java 如何缓冲和分组 Reactor Flux 中的元素
How to buffer and group elements in Reactor Flux in Java
鉴于无限多的对象,其中每个对象都有一个 ID,我如何使用 flux 创建一个按 ID 属性 分组的缓冲更新列表(保留最后发出的值)?
谢谢
例子
Obj(ID=A, V=1)
Obj(ID=A, V=2)
Obj(ID=B, V=3)
--- buffer -> I want to subscribe with a list of [Obj(ID=A, V=2), Obj(ID=B, V=3)]
Obj(ID=A, V=1)
Obj(ID=B, V=4)
Obj(ID=B, V=6)
Obj(ID=A, V=2)
--- buffer -> I want to subscribe with a list of [Obj(ID=B, V=6), Obj(ID=A, V=2)]
Obj(ID=B, V=1)
--- buffer -> I want to subscribe with a list of [Obj(ID=B, V=1)]
像下面这样的东西是完美的,但它似乎在我的测试中等待通量结束而不是缓冲。
flux
.buffer(Duration.ofMillis(2000))
.groupBy(Obj::getId)
.flatMap(GroupedFlux::getLast)
.collectToList()
.subscribe(this::printList);
它与用于分组的缓冲区和自定义逻辑一起工作
public static void main(String[] args) {
flux.buffer(Duration.ofMillis(2000)).subscribe(this::groupList);
}
private void groupList(List<T> ts) {
Collection<T> values = ts.stream()
.collect(Collectors.toMap(T::getId, Function.identity(), (k, v) -> v))
.values();
System.out.println(values);
}
buffer
将发出 List<T>
,因此您可以只使用 non-reactive java 进行分组。例如,java 像您的示例中那样流式传输。假设您的流程功能是被动的,您可以继续流程
flux
.buffer(Duration.ofMillis(2000))
.map(list ->
list.stream()
.collect(Collectors.toMap(Entry::getId, Function.identity(), (k, v) -> v)))
.flatMapIterable(Map::values)
.flatMap(obj -> process(obj));
鉴于无限多的对象,其中每个对象都有一个 ID,我如何使用 flux 创建一个按 ID 属性 分组的缓冲更新列表(保留最后发出的值)? 谢谢
例子
Obj(ID=A, V=1)
Obj(ID=A, V=2)
Obj(ID=B, V=3)
--- buffer -> I want to subscribe with a list of [Obj(ID=A, V=2), Obj(ID=B, V=3)]
Obj(ID=A, V=1)
Obj(ID=B, V=4)
Obj(ID=B, V=6)
Obj(ID=A, V=2)
--- buffer -> I want to subscribe with a list of [Obj(ID=B, V=6), Obj(ID=A, V=2)]
Obj(ID=B, V=1)
--- buffer -> I want to subscribe with a list of [Obj(ID=B, V=1)]
像下面这样的东西是完美的,但它似乎在我的测试中等待通量结束而不是缓冲。
flux
.buffer(Duration.ofMillis(2000))
.groupBy(Obj::getId)
.flatMap(GroupedFlux::getLast)
.collectToList()
.subscribe(this::printList);
它与用于分组的缓冲区和自定义逻辑一起工作
public static void main(String[] args) {
flux.buffer(Duration.ofMillis(2000)).subscribe(this::groupList);
}
private void groupList(List<T> ts) {
Collection<T> values = ts.stream()
.collect(Collectors.toMap(T::getId, Function.identity(), (k, v) -> v))
.values();
System.out.println(values);
}
buffer
将发出 List<T>
,因此您可以只使用 non-reactive java 进行分组。例如,java 像您的示例中那样流式传输。假设您的流程功能是被动的,您可以继续流程
flux
.buffer(Duration.ofMillis(2000))
.map(list ->
list.stream()
.collect(Collectors.toMap(Entry::getId, Function.identity(), (k, v) -> v)))
.flatMapIterable(Map::values)
.flatMap(obj -> process(obj));