Kotlin 协程流中 RxJava .toList() 的等价物
Equivalent of RxJava .toList() in Kotlin coroutines flow
我有一种情况需要观察 userId,然后使用这些 userId 观察用户。 userIds 或 users 都可以随时更改,我想让发出的用户保持最新状态。
这是我拥有的数据来源的示例:
data class User(val name: String)
fun observeBestUserIds(): Flow<List<String>> {
return flow {
emit(listOf("abc", "def"))
delay(500)
emit(listOf("123", "234"))
}
}
fun observeUserForId(userId: String): Flow<User> {
return flow {
emit(User("${userId}_name"))
delay(2000)
emit(User("${userId}_name_updated"))
}
}
在这种情况下,我希望排放量为:
[User(abc_name), User(def_name)]
,然后
[User(123_name), User(234_name)]
,然后
[User(123_name_updated), User(234_name_updated)]
我想我可以像这样在 RxJava 中实现这个:
observeBestUserIds.concatMapSingle { ids ->
Observable.fromIterable(ids)
.concatMap { id ->
observeUserForId(id)
}
.toList()
}
我应该编写什么函数来制作一个发出那个的流?
您可以使用flatMapConcat
val users = observeBestUserIds()
.flatMapConcat { ids ->
flowOf(*ids.toTypedArray())
.map { id ->
observeUserForId(id)
}
}
.flattenConcat()
.toList()
或
observeBestUserIds()
.flatMapConcat { ids ->
flowOf(*ids.toTypedArray())
.map { id ->
observeUserForId(id)
}
}
.flattenConcat()
.collect { user ->
}
不幸的是,这对于 kotlin Flow 的当前状态来说非常重要,似乎缺少重要的运算符。但请注意,您不是在寻找 rxJavas toList()。如果你想在 rxjava 中使用 toList
和 concatMap
来做,你将不得不等到所有的 observbes 完成。
这不是你想要的。
不幸的是,我认为没有办法绕过自定义函数。
它必须汇总 return 由 observeUserForId
编辑的所有结果,用于您将传递给它的所有 ID。它也不是一个简单的窗口函数,因为实际上可以想象一个 observeUserForId
已经 return 编辑了两次而另一个调用仍然没有完成。因此,检查您是否已经拥有与将 id 传递给聚合函数时相同数量的用户是不够的,您还必须按用户 id 分组。
我会在今天晚些时候尝试添加代码。
编辑:正如我所承诺的,这是我的解决方案,我冒昧地稍微增加了要求。因此,每当所有 userIds 都有值并且底层用户发生变化时,流程就会发出。我认为这更有可能是您想要的,因为用户可能不会同步更改属性。
不过,如果这不是您想要的,请发表评论。
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
data class User(val name: String)
fun observeBestUserIds(): Flow<List<String>> {
return flow {
emit(listOf("abc", "def"))
delay(500)
emit(listOf("123", "234"))
}
}
fun observeUserForId(userId: String): Flow<User> {
return flow {
emit(User("${userId}_name"))
delay(2000)
emit(User("${userId}_name_updated"))
}
}
inline fun <reified K, V> buildMap(keys: Set<K>, crossinline valueFunc: (K) -> Flow<V>): Flow<Map<K, V>> = flow {
val keysSize = keys.size
val valuesMap = HashMap<K, V>(keys.size)
flowOf(*keys.toTypedArray())
.flatMapMerge { key -> valueFunc(key).map {v -> Pair(key, v)} }
.collect { (key, value) ->
valuesMap[key] = value
if (valuesMap.keys.size == keysSize) {
emit(valuesMap.toMap())
}
}
}
fun observeUsersForIds(): Flow<List<User>> {
return observeBestUserIds().flatMapLatest { ids -> buildMap(ids.toSet(), ::observeUserForId as (String) -> Flow<User>) }
.map { m -> m.values.toList() }
}
fun main() = runBlocking {
observeUsersForIds()
.collect { user ->
println(user)
}
}
这将 return
[User(name=def_name), User(name=abc_name)]
[User(name=123_name), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name_updated)]
您可以运行在线获取代码here
我相信您正在寻找 combine
,它为您提供了一个可以轻松调用 toList()
的数组:
observeBestUserIds().collectLatest { ids ->
combine(
ids.map { id -> observeUserForId(id) }
) {
it.toList()
}.collect {
println(it)
}
}
这里的内部部分具有更明确的参数名称,因为您在 Stack Overflow 上看不到 IDE 的类型提示:
combine(
ids.map { id -> observeUserForId(id) }
) { arrayOfUsers: Array<User> ->
arrayOfUsers.toList()
}.collect { listOfUsers: List<User> ->
println(listOfUsers)
}
输出:
[User(name=abc_name), User(name=def_name)]
[User(name=123_name), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name_updated)]
Live demo(请注意,在演示中,所有输出都会同时出现,但这是演示站点的一个限制 - 当代码为 运行本地)
这避免了原始问题中讨论的 (abc_name_updated
, def_name_updated
)。但是,123_name_updated
和 234_name
仍然存在中间发射,因为 123_name_updated
首先发射并且它立即发送组合版本,因为它们是每个流中的最新版本。
然而,这可以通过消除发射的抖动来避免(在我的机器上,小到 1 毫秒的超时就可以工作,但为了保守起见,我做了 20 毫秒):
observeBestUserIds().collectLatest { ids ->
combine(
ids.map { id -> observeUserForId(id) }
) {
it.toList()
}.debounce(timeoutMillis = 20).collect {
println(it)
}
}
这会得到您想要的确切输出:
[User(name=abc_name), User(name=def_name)]
[User(name=123_name), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name_updated)]
我有一种情况需要观察 userId,然后使用这些 userId 观察用户。 userIds 或 users 都可以随时更改,我想让发出的用户保持最新状态。 这是我拥有的数据来源的示例:
data class User(val name: String)
fun observeBestUserIds(): Flow<List<String>> {
return flow {
emit(listOf("abc", "def"))
delay(500)
emit(listOf("123", "234"))
}
}
fun observeUserForId(userId: String): Flow<User> {
return flow {
emit(User("${userId}_name"))
delay(2000)
emit(User("${userId}_name_updated"))
}
}
在这种情况下,我希望排放量为:
[User(abc_name), User(def_name)]
,然后
[User(123_name), User(234_name)]
,然后
[User(123_name_updated), User(234_name_updated)]
我想我可以像这样在 RxJava 中实现这个:
observeBestUserIds.concatMapSingle { ids ->
Observable.fromIterable(ids)
.concatMap { id ->
observeUserForId(id)
}
.toList()
}
我应该编写什么函数来制作一个发出那个的流?
您可以使用flatMapConcat
val users = observeBestUserIds()
.flatMapConcat { ids ->
flowOf(*ids.toTypedArray())
.map { id ->
observeUserForId(id)
}
}
.flattenConcat()
.toList()
或
observeBestUserIds()
.flatMapConcat { ids ->
flowOf(*ids.toTypedArray())
.map { id ->
observeUserForId(id)
}
}
.flattenConcat()
.collect { user ->
}
不幸的是,这对于 kotlin Flow 的当前状态来说非常重要,似乎缺少重要的运算符。但请注意,您不是在寻找 rxJavas toList()。如果你想在 rxjava 中使用 toList
和 concatMap
来做,你将不得不等到所有的 observbes 完成。
这不是你想要的。
不幸的是,我认为没有办法绕过自定义函数。
它必须汇总 return 由 observeUserForId
编辑的所有结果,用于您将传递给它的所有 ID。它也不是一个简单的窗口函数,因为实际上可以想象一个 observeUserForId
已经 return 编辑了两次而另一个调用仍然没有完成。因此,检查您是否已经拥有与将 id 传递给聚合函数时相同数量的用户是不够的,您还必须按用户 id 分组。
我会在今天晚些时候尝试添加代码。
编辑:正如我所承诺的,这是我的解决方案,我冒昧地稍微增加了要求。因此,每当所有 userIds 都有值并且底层用户发生变化时,流程就会发出。我认为这更有可能是您想要的,因为用户可能不会同步更改属性。
不过,如果这不是您想要的,请发表评论。
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
data class User(val name: String)
fun observeBestUserIds(): Flow<List<String>> {
return flow {
emit(listOf("abc", "def"))
delay(500)
emit(listOf("123", "234"))
}
}
fun observeUserForId(userId: String): Flow<User> {
return flow {
emit(User("${userId}_name"))
delay(2000)
emit(User("${userId}_name_updated"))
}
}
inline fun <reified K, V> buildMap(keys: Set<K>, crossinline valueFunc: (K) -> Flow<V>): Flow<Map<K, V>> = flow {
val keysSize = keys.size
val valuesMap = HashMap<K, V>(keys.size)
flowOf(*keys.toTypedArray())
.flatMapMerge { key -> valueFunc(key).map {v -> Pair(key, v)} }
.collect { (key, value) ->
valuesMap[key] = value
if (valuesMap.keys.size == keysSize) {
emit(valuesMap.toMap())
}
}
}
fun observeUsersForIds(): Flow<List<User>> {
return observeBestUserIds().flatMapLatest { ids -> buildMap(ids.toSet(), ::observeUserForId as (String) -> Flow<User>) }
.map { m -> m.values.toList() }
}
fun main() = runBlocking {
observeUsersForIds()
.collect { user ->
println(user)
}
}
这将 return
[User(name=def_name), User(name=abc_name)]
[User(name=123_name), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name_updated)]
您可以运行在线获取代码here
我相信您正在寻找 combine
,它为您提供了一个可以轻松调用 toList()
的数组:
observeBestUserIds().collectLatest { ids ->
combine(
ids.map { id -> observeUserForId(id) }
) {
it.toList()
}.collect {
println(it)
}
}
这里的内部部分具有更明确的参数名称,因为您在 Stack Overflow 上看不到 IDE 的类型提示:
combine(
ids.map { id -> observeUserForId(id) }
) { arrayOfUsers: Array<User> ->
arrayOfUsers.toList()
}.collect { listOfUsers: List<User> ->
println(listOfUsers)
}
输出:
[User(name=abc_name), User(name=def_name)]
[User(name=123_name), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name_updated)]
Live demo(请注意,在演示中,所有输出都会同时出现,但这是演示站点的一个限制 - 当代码为 运行本地)
这避免了原始问题中讨论的 (abc_name_updated
, def_name_updated
)。但是,123_name_updated
和 234_name
仍然存在中间发射,因为 123_name_updated
首先发射并且它立即发送组合版本,因为它们是每个流中的最新版本。
然而,这可以通过消除发射的抖动来避免(在我的机器上,小到 1 毫秒的超时就可以工作,但为了保守起见,我做了 20 毫秒):
observeBestUserIds().collectLatest { ids ->
combine(
ids.map { id -> observeUserForId(id) }
) {
it.toList()
}.debounce(timeoutMillis = 20).collect {
println(it)
}
}
这会得到您想要的确切输出:
[User(name=abc_name), User(name=def_name)]
[User(name=123_name), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name_updated)]