有没有办法对值进行分组,然后使用 RxJava 计算总和

Is there a way to group values and then calculate sum using RxJava

我有一个 Flowable 形式的对象列表。 示例 -

values count
A      2
B      3
C      4
A      5
C      1

我想根据值对可流动对象进行分组,然后计算总和。有更好的方法吗?

我已经尝试生成多图了。然后在订阅者中定义一个函数来聚合结果。但是,我觉得我没有有效地使用 JavaRx 在流中进行聚合。

 Flowable<Response> responseFlowable = Flowable.fromIterable(generateList());
        replayResponseFlowable.toMultimap(response -> response.getValues(), response -> response.getCount()).subscribe(groups-> calculateSum(groups));

    }

    private static void calculateSum(Map<String,Collection<Integer>> groups) 
   {
      //iterate over the map and calculate sum for each of the groups.  
   }        

预期结果是:

A      7
B      3
C      5

我希望使用 JavaRx 在流中执行此计算,而不是定义自定义方法。我该怎么做?

您可以将 calculateSum 放入流中以使其看起来更好。

replayResponseFlowable.toMultiMap(response -> response.getValues())
    .map(groups -> calculateSum(groups))
    .subscribe(result -> {
        print(result);
    })


由于您只是汇总计数,因此不需要跟踪所有项目。相反,您可以只保留每个值的计数总和。 这可以使用 .collect 运算符来完成:

replayResponseFlowable
    .collectInto(new HashMap<String, Integer>(), (group, response) -> {
         group.merge(response.values, response.count, Integer::sum);
    })
    .subscribe(result -> {
        print(result);
    });

这将打印

emitted=[{value=A, count=7}, {value=B, count=3}, {value=C, count=5}]


如果你想获得可流动物品发出的结果,使用.scan:

    replayResponseFlowable
            .scan(new HashMap<String, Integer>(), (group, response) -> {
                group.merge(response.values, response.count, Integer::sum);
                return group;
            })
            .subscribe(result -> {
                print(result);
            });

这将打印:

emitted=[]
emitted=[{value=A, count=2}]
emitted=[{value=A, count=2}, {value=B, count=3}]
emitted=[{value=A, count=2}, {value=B, count=3}, {value=C, count=4}]
emitted=[{value=A, count=7}, {value=B, count=3}, {value=C, count=4}]
emitted=[{value=A, count=7}, {value=B, count=3}, {value=C, count=5}]