如何在 Kotlin 中合并流和通道?
How to merge flow and channel in Kotlin?
我需要创建一个API,应该是flow,收集事件。问题是这些事件可能来自一个通道(我需要一个 PublishSubject 的模拟)和一个流(它执行网络请求)。
我也不确定这是否是最佳解决方案,所以如果我可以改进它,请告诉我。
我在做什么:
我的api:
override val statusFlow = trackStatus()
private fun trackStatus(): Flow<State> = flow { ... }
private val deviceChannel = Channel<State>(CONFLATED)
所以 statusFlow 应该 return 一个流,我可以从中接收来自流和通道的数据。
我试过用consumeAsflow把channel转成flow,但是不行
我认为解决方案是
private fun trackStatus(): Flow<State> = flowOf(channel.toFlow(), flow).flattenMerge()
正确的做法是什么?
private fun trackStatus() = merge(deviceChannel.recieveAsFlow(), trackStatus)
协程库中 merge()
的定义是
/**
* Merges the given flows into a single flow without preserving an order of elements.
* All flows are merged concurrently, without limit on the number of simultaneously collected flows.
*
* ### Operator fusion
*
* Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
* its concurrent merging so that only one properly configured channel is used for execution of merging logic.
*/
@ExperimentalCoroutinesApi
public fun <T> merge(vararg flows: Flow<T>): Flow<T> = flows.asIterable().merge()
这个案例的解决方案是 merge(),正如在省答案中提到的那样,但它不会那样工作。
您应该使用 BroadcastChannel 而不是 Channel,因为最后一个在一生中只能提供一次订阅。此外,您应该使用 asFlow() 将此类通道转换为流。
我需要创建一个API,应该是flow,收集事件。问题是这些事件可能来自一个通道(我需要一个 PublishSubject 的模拟)和一个流(它执行网络请求)。
我也不确定这是否是最佳解决方案,所以如果我可以改进它,请告诉我。
我在做什么:
我的api:
override val statusFlow = trackStatus()
private fun trackStatus(): Flow<State> = flow { ... }
private val deviceChannel = Channel<State>(CONFLATED)
所以 statusFlow 应该 return 一个流,我可以从中接收来自流和通道的数据。
我试过用consumeAsflow把channel转成flow,但是不行
我认为解决方案是
private fun trackStatus(): Flow<State> = flowOf(channel.toFlow(), flow).flattenMerge()
正确的做法是什么?
private fun trackStatus() = merge(deviceChannel.recieveAsFlow(), trackStatus)
协程库中 merge()
的定义是
/**
* Merges the given flows into a single flow without preserving an order of elements.
* All flows are merged concurrently, without limit on the number of simultaneously collected flows.
*
* ### Operator fusion
*
* Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
* its concurrent merging so that only one properly configured channel is used for execution of merging logic.
*/
@ExperimentalCoroutinesApi
public fun <T> merge(vararg flows: Flow<T>): Flow<T> = flows.asIterable().merge()
这个案例的解决方案是 merge(),正如在省答案中提到的那样,但它不会那样工作。 您应该使用 BroadcastChannel 而不是 Channel,因为最后一个在一生中只能提供一次订阅。此外,您应该使用 asFlow() 将此类通道转换为流。