如何将可流动列表转换为具有前一项累积和的新可流动列表

How to convert an flowable list to new flowable list with cummulative sum of previous item

这是非常基础的,但我是 RxJava 的新手,无法为我的查询找到合适的解决方案。

查询就像是,我正在获取一个可流动列表(单流),并希望再次将其转换为新的可流动列表(不是逐项可观察的),作为初始列表。在转换为新的可流动列表时,每个项目都应该有最后项目的累加和。

var cumulativeSum: Double = 0.0
var firstObservable: Flowable<List<Items>> = repo.getQuery()
var secondObservale: Flowable<List<Items>> = firstObservable.
.flatMapIterable{list -> list}
.map{ item ->
      cumulativeSum += it.qty
      it.totalQty = cumulativeSum
 }
.toList()

如果有人可以帮助我解决上述问题,为什么 secondObservable 不是一个 Flowable 项目列表?我将其作为单个单位列表获取。 或者这不是正确的转换方式。

请帮帮我,我怎样才能达到预期的效果。

如果您使用 flatMap 系列 api,它会使您的数据扁平化,您可能会丢失初始列表。

firstObservable.flatMapIterable{list -> list} // This will be converted to Flowable<Items>

此外,map() 运算符可以将您的对象转换为其他对象。如果您只想保留 Items 对象并且只想更改内部变量,请使用 'doOnXXX()' 运算符。

var secondObservale: Flowable<List<Items>> = firstObservable
    .doOnNext {
        it.fold(0.0) { acc, items ->
            items.totalQty = acc + items.qty
            items.totalQty
        }
    }

如果我有什么问题请告诉我。

更新

我认为 RxJava 的核心功能是并发性。我不确定这种情况是否是您想要的,但是如果您想 运行 同时执行每个任务,请尝试以下代码。

var secondObservale: Flowable<List<Items>> = firstObservable
    .parallel(4) // max concurrency: 4
    .runOn(Schedulers.computation())
    .doOnNext {
        it.fold(0.0) { acc, items ->
            items.totalQty = acc + items.qty
            items.totalQty
        }
    }
    .sequential()