Kotlin 协程流中 RxJava .toList() 的等价物

Equivalent of RxJava .toList() in Kotlin coroutines flow

我有一种情况需要观察 userId,然后使用这些 userId 观察用户。 userIds 或 users 都可以随时更改,我想让发出的用户保持最新状态。 这是我拥有的数据来源的示例:


data class User(val name: String)

fun observeBestUserIds(): Flow<List<String>> {
    return flow {
        emit(listOf("abc", "def"))
        delay(500)
        emit(listOf("123", "234"))
    }
}

fun observeUserForId(userId: String): Flow<User> {
    return flow {
        emit(User("${userId}_name"))
        delay(2000)
        emit(User("${userId}_name_updated"))
    }
}

在这种情况下,我希望排放量为:

[User(abc_name), User(def_name)],然后

[User(123_name), User(234_name)],然后

[User(123_name_updated), User(234_name_updated)]

我想我可以像这样在 RxJava 中实现这个:

observeBestUserIds.concatMapSingle { ids ->
    Observable.fromIterable(ids)
        .concatMap { id ->
            observeUserForId(id)
        }
        .toList()
}

我应该编写什么函数来制作一个发出那个的流?

您可以使用flatMapConcat

val users = observeBestUserIds()
        .flatMapConcat { ids ->
            flowOf(*ids.toTypedArray())
                .map { id ->
                    observeUserForId(id)
                }
        }
        .flattenConcat()
        .toList()

    observeBestUserIds()
        .flatMapConcat { ids ->
            flowOf(*ids.toTypedArray())
                .map { id ->
                    observeUserForId(id)
                }
        }
        .flattenConcat()
        .collect { user ->

        }

不幸的是,这对于 kotlin Flow 的当前状态来说非常重要,似乎缺少重要的运算符。但请注意,您不是在寻找 rxJavas toList()。如果你想在 rxjava 中使用 toListconcatMap 来做,你将不得不等到所有的 observbes 完成。 这不是你想要的。

不幸的是,我认为没有办法绕过自定义函数。

它必须汇总 return 由 observeUserForId 编辑的所有结果,用于您将传递给它的所有 ID。它也不是一个简单的窗口函数,因为实际上可以想象一个 observeUserForId 已经 return 编辑了两次而另一个调用仍然没有完成。因此,检查您是否已经拥有与将 id 传递给聚合函数时相同数量的用户是不够的,您还必须按用户 id 分组。

我会在今天晚些时候尝试添加代码。

编辑:正如我所承诺的,这是我的解决方案,我冒昧地稍微增加了要求。因此,每当所有 userIds 都有值并且底层用户发生变化时,流程就会发出。我认为这更有可能是您想要的,因为用户可能不会同步更改属性。

不过,如果这不是您想要的,请发表评论。

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking


data class User(val name: String)

fun observeBestUserIds(): Flow<List<String>> {
    return flow {
        emit(listOf("abc", "def"))
        delay(500)
        emit(listOf("123", "234"))
    }
}

fun observeUserForId(userId: String): Flow<User> {
    return flow {
        emit(User("${userId}_name"))
        delay(2000)
        emit(User("${userId}_name_updated"))
    }
}

inline fun <reified K, V> buildMap(keys: Set<K>, crossinline valueFunc: (K) -> Flow<V>): Flow<Map<K, V>> = flow {
    val keysSize = keys.size
    val valuesMap = HashMap<K, V>(keys.size)
    flowOf(*keys.toTypedArray())
            .flatMapMerge { key -> valueFunc(key).map {v -> Pair(key, v)} }
            .collect { (key, value) ->
                valuesMap[key] = value
                if (valuesMap.keys.size == keysSize) {
                    emit(valuesMap.toMap())
                }
            }
}

fun observeUsersForIds(): Flow<List<User>> {
    return observeBestUserIds().flatMapLatest { ids -> buildMap(ids.toSet(), ::observeUserForId as (String) -> Flow<User>) }
            .map { m -> m.values.toList() }
}


fun main() = runBlocking {
    observeUsersForIds()
        .collect { user ->
            println(user)
        }
}

这将 return

[User(name=def_name), User(name=abc_name)]
[User(name=123_name), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name_updated)]

您可以运行在线获取代码here

我相信您正在寻找 combine,它为您提供了一个可以轻松调用 toList() 的数组:

observeBestUserIds().collectLatest { ids ->
    combine(
        ids.map { id -> observeUserForId(id) }
    ) {
        it.toList()
    }.collect {
        println(it)
    } 
}

这里的内部部分具有更明确的参数名称,因为您在 Stack Overflow 上看不到 IDE 的类型提示:

combine(
    ids.map { id -> observeUserForId(id) }
) { arrayOfUsers: Array<User> ->
    arrayOfUsers.toList()
}.collect { listOfUsers: List<User> ->
    println(listOfUsers)
}

输出:

[User(name=abc_name), User(name=def_name)]
[User(name=123_name), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name_updated)]

Live demo(请注意,在演示中,所有输出都会同时出现,但这是演示站点的一个限制 - 当代码为 运行本地)

这避免了原始问题中讨论的 (abc_name_updated, def_name_updated)。但是,123_name_updated234_name 仍然存在中间发射,因为 123_name_updated 首先发射并且它立即发送组合版本,因为它们是每个流中的最新版本。

然而,这可以通过消除发射的抖动来避免(在我的机器上,小到 1 毫秒的超时就可以工作,但为了保守起见,我做了 20 毫秒):

observeBestUserIds().collectLatest { ids ->
    combine(
        ids.map { id -> observeUserForId(id) }
    ) {
        it.toList()
    }.debounce(timeoutMillis = 20).collect {
        println(it)
    }
}

这会得到您想要的确切输出:

[User(name=abc_name), User(name=def_name)]
[User(name=123_name), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name_updated)]

Live demo