Flow.take(ITEM_COUNT) 返回所有元素而不是指定数量的元素

Flow.take(ITEM_COUNT) returning all the elements rather then specified amount of elements

我有一个方法 X,它通过 pub sub 从服务器获取数据。这个方法returns一个流程。我有另一种方法,它通过方法 X 订阅流,但如果数据与以前的数据不同,则只想从流中获取前 3 个最大值。我写了下面的代码

fun subscribeToData() : Flow<List<MyData>> {
    ....
    //incoming data
    emit(list)
}

fun getUptoFirst3Items() {
   subscribeToData()
   .take(ITEM_COUNT) // ITEM_COUNT is 3
   .distinctUntilChange() //only proceed if the data is different from the previous top 3 items
   .mapIndex {
     //do transformation
   }
   .collect { transformedListOf3Elements ->
    
   }

}

问题:

在 collect{} 中,我没有获得 3 个元素,而是获得了流中的所有数据。

我不确定这里出了什么问题?有人可以帮助我吗?

这里有一个Flow<List<MyData>>,这意味着这个流程的每个元素本身都是一个列表。

take 运算符应用于流程,因此您将获取流程的前 3 个列表。每个单独的列表都不受限制,除非您在列表本身上使用 take

所以名称 transformedListOf3Elements 是不正确的,因为该列表包含未知数量的元素,除非您在 map.

中以某种方式过滤它

@Joffrey 的回答已经解释了为什么你会返回整个列表并建议你在列表本身上使用 take()

如果您只想从每个列表中获取前 ITEM_COUNT 个元素,即 emitted/observed,那么您必须映射结果并且每个列表中只获取 ITEM_COUNT 个项目时间,而不是从流程中获取 ITEM_COUNT 项。

fun getUptoFirst3Items() {
   subscribeToData()
   .map {
     // in Kotlin stdlib Iterable<T> has an extension method take(n: Int)
     // that will return a List<T> containing the first n element from the iterable
     it.take(ITEM_COUNT)
     // alternatively you can also use subList, but the semantics are not the same,
     // so check the subList documentation, before using it
     it.subList(0, ITEM_COUNT)
   }
   .distinctUntilChange() //only proceed if the data is different from the previous top 3 items
   .mapIndex {
     //do transformation
   }
   .collect { transformedListOf3Elements ->
    
   }

}