从不同的流程收集时,我应该从协程发出吗?

Should I emit from a coroutine when collecting from a different flow?

我有一个用例,我需要触发从流收集的特定事件并在它关闭时重新启动它。我还需要将所有事件发送到不同的流程。我当前的实现如下所示:

scope.launch {
    val flowToReturn = MutableSharedFlow<Event>()
    while (true) {
        client
            .connect()                                   // returns Flow<Event>
            .catch { ... }                               // ignore errors
            .onEach { launch { flowToReturn.emit(it) } } // problem here
            .filterIsInstance<Event.Some>()
            .collect { someEvent ->
                doStuff(someEvent)
            }
    }
}.start()

想法是在客户端断开连接时总是重新连接(collect 然后 returns 并且新的迭代开始)同时将外部流生命周期与内部(连接)生命周期分开。它是一个可能有多个订阅者的共享流是次要问题。

正如 emit 文档所述,它不是线程安全的。那我应该从一个新的协程调用它吗?我担心的是,如果没有外部流的订阅者,emit 将暂停,无论如何我都需要 运行 下游管道。

MutableSharedFlow.emit() 文档说它 is thread-safe. Maybe you were accidentally looking at FlowCollector.emit(),这不是线程安全的。 MutableSharedFlow 是 Fl​​owCollector 的子类型,但将 emit() 提升为线程安全的,因为它不像普通的 FlowCollector 那样用作 Flow 构建器接收器。没有理由启动协程只是为了发送到您的共享流。

没有理由在使用 launch 创建的协程作业上调用 start(),因为 launch 既创建作业又启动作业。

您需要在 launch 调用之前声明 flowToReturn 才能将其置于此外部函数的 return 范围内。