从不同的流程收集时,我应该从协程发出吗?
Should I emit from a coroutine when collecting from a different flow?
我有一个用例,我需要触发从流收集的特定事件并在它关闭时重新启动它。我还需要将所有事件发送到不同的流程。我当前的实现如下所示:
scope.launch {
val flowToReturn = MutableSharedFlow<Event>()
while (true) {
client
.connect() // returns Flow<Event>
.catch { ... } // ignore errors
.onEach { launch { flowToReturn.emit(it) } } // problem here
.filterIsInstance<Event.Some>()
.collect { someEvent ->
doStuff(someEvent)
}
}
}.start()
想法是在客户端断开连接时总是重新连接(collect
然后 returns 并且新的迭代开始)同时将外部流生命周期与内部(连接)生命周期分开。它是一个可能有多个订阅者的共享流是次要问题。
正如 emit
文档所述,它不是线程安全的。那我应该从一个新的协程调用它吗?我担心的是,如果没有外部流的订阅者,emit
将暂停,无论如何我都需要 运行 下游管道。
MutableSharedFlow.emit()
文档说它 is thread-safe. Maybe you were accidentally looking at FlowCollector.emit()
,这不是线程安全的。 MutableSharedFlow 是 FlowCollector 的子类型,但将 emit()
提升为线程安全的,因为它不像普通的 FlowCollector 那样用作 Flow 构建器接收器。没有理由启动协程只是为了发送到您的共享流。
没有理由在使用 launch
创建的协程作业上调用 start()
,因为 launch
既创建作业又启动作业。
您需要在 launch
调用之前声明 flowToReturn
才能将其置于此外部函数的 return 范围内。
我有一个用例,我需要触发从流收集的特定事件并在它关闭时重新启动它。我还需要将所有事件发送到不同的流程。我当前的实现如下所示:
scope.launch {
val flowToReturn = MutableSharedFlow<Event>()
while (true) {
client
.connect() // returns Flow<Event>
.catch { ... } // ignore errors
.onEach { launch { flowToReturn.emit(it) } } // problem here
.filterIsInstance<Event.Some>()
.collect { someEvent ->
doStuff(someEvent)
}
}
}.start()
想法是在客户端断开连接时总是重新连接(collect
然后 returns 并且新的迭代开始)同时将外部流生命周期与内部(连接)生命周期分开。它是一个可能有多个订阅者的共享流是次要问题。
正如 emit
文档所述,它不是线程安全的。那我应该从一个新的协程调用它吗?我担心的是,如果没有外部流的订阅者,emit
将暂停,无论如何我都需要 运行 下游管道。
MutableSharedFlow.emit()
文档说它 is thread-safe. Maybe you were accidentally looking at FlowCollector.emit()
,这不是线程安全的。 MutableSharedFlow 是 FlowCollector 的子类型,但将 emit()
提升为线程安全的,因为它不像普通的 FlowCollector 那样用作 Flow 构建器接收器。没有理由启动协程只是为了发送到您的共享流。
没有理由在使用 launch
创建的协程作业上调用 start()
,因为 launch
既创建作业又启动作业。
您需要在 launch
调用之前声明 flowToReturn
才能将其置于此外部函数的 return 范围内。