如何将 Kotlin 流转换为可变流?

How can you convert a kotlin flow to a mutable flow?

我试图在我的 class 中保持可变状态流,但是当我对其应用任何方法时,它将转换为不可变状态 Flow<T>:

class MyClass : Listener<String> {

       private val source = Source()

       val flow: Flow<String?>
          get() = _flow

       // region listener
        override fun onUpdate(value: String?) {
            if (value!= null) {
                // emit object changes to the flow
               // not possible, because the builder operators on the flow below convert it to a `Flow` and it doesn't stay as a `MutableSharedFlow` :(
                _flow.tryEmit(value) 
            }
        }
        // end-region

        @OptIn(ExperimentalCoroutinesApi::class)
        private val _flow by lazy {
            MutableStateFlow<String?>(null).onStart {
                emitAll(
                    flow<String?> {
                        val initialValue = source.getInitialValue()
                        emit(initialValue)
                    }.flowOn(MyDispatchers.background)
                )
            }.onCompletion { error ->
                // when the flow is cancelled, stop listening to changes
                if (error is CancellationException) {
                    // is was cancelled
                    source.removeListener(this@MyClass)
                }
            }.apply {
                // listen to changes and send them to the flow
                source.addListener(this@MyClass)
            }
        }
}

有没有办法在我应用 onCompletion/onStart 方法后仍保持 MutableStateFlow 的流程?

如果将转换应用于可变状态流,则生成的流将变为只读,因为原始流充当其源。如果您想手动发出事件,则需要将它们发出到初始源流。

话虽如此,您在这里想要实现的目标似乎很简单:将基于 API 的回调桥接到 Flow API。 Kotlin 协程中有一个内置函数可以执行此操作,称为 callbackFlow.

我不确定你的来源 API 如何处理背压,但它看起来像这样:

@OptIn(ExperimentalCoroutinesApi::class)
fun Source.asFlow(): Flow<String?> = callbackFlow {
    send(getInitialValue())

    val listener = object : Listener<String> {
        override fun onUpdate(value: String?) {
            if (value != null) {
                trySend(value)
            }
        }
    }
    addListener(listener)
    awaitClose {
        removeListener(listener)
    }
}

或者可能使用 runBlocking { send(value) } 而不是 trySend(),这取决于 Source 如何处理背压和阻塞在它自己的线程池中。

请注意,flowOn 可能会在此流程之上使用,但它只对 getInitialValue() 真正重要,因为执行回调的线程由 Source 控制无论如何。

如果 Source 添加许多侦听器的成本很高,您还可以考虑使用 shareIn() 运算符共享此流程,以便多个订阅者共享相同的侦听器订阅。