如何在 Kotlin 中合并流和通道?

How to merge flow and channel in Kotlin?

我需要创建一个API,应该是flow,收集事件。问题是这些事件可能来自一个通道(我需要一个 PublishSubject 的模拟)和一个流(它执行网络请求)。

我也不确定这是否是最佳解决方案,所以如果我可以改进它,请告诉我。

我在做什么:

我的api:

override val statusFlow = trackStatus()

private fun trackStatus(): Flow<State> = flow { ... }

private val deviceChannel = Channel<State>(CONFLATED)

所以 statusFlow 应该 return 一个流,我可以从中接收来自流和通道的数据。

我试过用consumeAsflow把channel转成flow,但是不行

我认为解决方案是

private fun trackStatus(): Flow<State> = flowOf(channel.toFlow(), flow).flattenMerge()

正确的做法是什么?

private fun trackStatus() = merge(deviceChannel.recieveAsFlow(), trackStatus)

协程库中 merge() 的定义是

/**
 * Merges the given flows into a single flow without preserving an order of elements.
 * All flows are merged concurrently, without limit on the number of simultaneously collected flows.
 *
 * ### Operator fusion
 *
 * Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
 * its concurrent merging so that only one properly configured channel is used for execution of merging logic.
 */
@ExperimentalCoroutinesApi
public fun <T> merge(vararg flows: Flow<T>): Flow<T> = flows.asIterable().merge()

这个案例的解决方案是 merge(),正如在省答案中提到的那样,但它不会那样工作。 您应该使用 BroadcastChannel 而不是 Channel,因为最后一个在一生中只能提供一次订阅。此外,您应该使用 asFlow() 将此类通道转换为流。