Java 如何缓冲和分组 Reactor Flux 中的元素

How to buffer and group elements in Reactor Flux in Java

鉴于无限多的对象,其中每个对象都有一个 ID,我如何使用 flux 创建一个按 ID 属性 分组的缓冲更新列表(保留最后发出的值)? 谢谢

例子

    Obj(ID=A, V=1)
    Obj(ID=A, V=2)
    Obj(ID=B, V=3) 
    --- buffer -> I want to subscribe with a list of [Obj(ID=A, V=2), Obj(ID=B, V=3)]
    Obj(ID=A, V=1)
    Obj(ID=B, V=4)
    Obj(ID=B, V=6)
    Obj(ID=A, V=2)
    --- buffer -> I want to subscribe with a list of [Obj(ID=B, V=6), Obj(ID=A, V=2)]
    Obj(ID=B, V=1)
    --- buffer -> I want to subscribe with a list of [Obj(ID=B, V=1)]

像下面这样的东西是完美的,但它似乎在我的测试中等待通量结束而不是缓冲。

flux
    .buffer(Duration.ofMillis(2000))
    .groupBy(Obj::getId)
    .flatMap(GroupedFlux::getLast)
    .collectToList()
    .subscribe(this::printList);

它与用于分组的缓冲区和自定义逻辑一起工作

    public static void main(String[] args) {
        flux.buffer(Duration.ofMillis(2000)).subscribe(this::groupList);
    }

    private void groupList(List<T> ts) {
        Collection<T> values = ts.stream()
                .collect(Collectors.toMap(T::getId, Function.identity(), (k, v) -> v))
                .values();
        System.out.println(values);
    }

buffer 将发出 List<T>,因此您可以只使用 non-reactive java 进行分组。例如,java 像您的示例中那样流式传输。假设您的流程功能是被动的,您可以继续流程

flux
    .buffer(Duration.ofMillis(2000))
    .map(list ->
            list.stream()
                    .collect(Collectors.toMap(Entry::getId, Function.identity(), (k, v) -> v)))
    .flatMapIterable(Map::values)
    .flatMap(obj -> process(obj));