How to use callbackFlow within a flow?

I'm trying to wrap a callbackFlow within an outer flow - there are items I'd like to emit from the outer flow, but I've also got an old callback interface that I'd like to adapt to Kotlin Flow. I've looked at several examples, but I can't figure out how to trigger it correctly from within another flow.

Here is an example:

class Processor {
    fun start(processProgress: ProcessProgressListener) {
        processProgress.onFinished() //finishes as soon as it starts!
    }
}

interface ProcessProgressListener {
    fun onFinished()
}

// main method here:
fun startProcess(processor: Processor): Flow<String> {
    val mainFlow = flow {
        emit("STARTED")
        emit("IN_PROGRESS")
    }

    return merge(processProgressFlow(processor), mainFlow)
}

fun processProgressFlow(processor: Processor) = callbackFlow {
    val listener = object : ProcessProgressListener {
        override fun onFinished() {
            trySend("FINISHED")
        }
    }

    processor.start(listener)
}

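Processor takes a listener, which is triggered when the process has finished. When that happens, I'd like to emit the final item, FINISHED.

This is how I invoke the whole flow: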

     runBlocking {
         startProcess(Processor()).collect {
             print(it)
         }
     }

However, I don't get any output. If I drop merge and just return mainFlow, I do get the STARTED and IN_PROGRESS items.

What am I doing wrong?

You forgot to call awaitClose at the end of the callbackFlow block:

fun processProgressFlow(processor: Processor) = callbackFlow<String> {
    val listener = object : ProcessProgressListener {
        override fun onFinished() {
            trySend("FINISHED")
            channel.close()
        }
    }

    processor.start(listener)

    /*
     * Suspends until 'channel.close() or cancel()' is invoked
     * or flow collector is cancelled (e.g. by 'take(1)' or because a collector's coroutine was cancelled).
     * In both cases, callback will be properly unregistered.
     */
    awaitClose { /* unregister listener here */ }
}
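
To see the fix end to end, here is a minimal sketch that puts the question's Processor, the corrected processProgressFlow and the merged flow into one file. The helper name collectAll is my own addition, and the result is sorted before printing because, as far as I know, merge makes no promise about how items from the two flows interleave.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

interface ProcessProgressListener {
    fun onFinished()
}

class Processor {
    fun start(processProgress: ProcessProgressListener) {
        processProgress.onFinished() // finishes as soon as it starts
    }
}

fun processProgressFlow(processor: Processor): Flow<String> = callbackFlow {
    val listener = object : ProcessProgressListener {
        override fun onFinished() {
            trySend("FINISHED") // buffered, so it is delivered before the close
            channel.close()     // lets awaitClose return and the flow complete
        }
    }
    processor.start(listener)
    awaitClose { /* nothing to unregister for this Processor */ }
}

fun startProcess(processor: Processor): Flow<String> {
    val mainFlow = flow {
        emit("STARTED")
        emit("IN_PROGRESS")
    }
    return merge(processProgressFlow(processor), mainFlow)
}

// Hypothetical helper (not in the question): collects everything into a list.
fun collectAll(): List<String> = runBlocking {
    startProcess(Processor()).toList()
}

fun main() {
    // Sorted because the interleaving of the merged flows is not relied upon here.
    println(collectAll().sorted())
}
```

Note that channel.close() in onFinished is what lets the merged flow complete; without it, awaitClose would keep suspending and collect would never return.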

awaitClose {} should be used at the end of the callbackFlow block. Otherwise, the callback/listener may leak if the flow is cancelled from the outside.

According to the callbackFlow docs:
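
The Processor in the question has nothing to unregister, so here is a rough sketch of the leak-prevention part using a made-up callback source. TickSource, its register/unregister/fire methods, ticks() and demo() are all hypothetical names invented for this illustration; only callbackFlow, awaitClose, trySend and first come from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical callback-based API (not thread-safe, single-threaded demo only).
class TickSource {
    private val listeners = mutableSetOf<(Int) -> Unit>()
    val listenerCount: Int get() = listeners.size
    fun register(listener: (Int) -> Unit) { listeners += listener }
    fun unregister(listener: (Int) -> Unit) { listeners -= listener }
    fun fire(value: Int) { listeners.toList().forEach { it(value) } }
}

fun TickSource.ticks(): Flow<Int> = callbackFlow {
    val listener: (Int) -> Unit = { value -> trySend(value) }
    register(listener)
    // Runs when the channel is closed or the collector goes away.
    awaitClose { unregister(listener) }
}

// Takes one tick, then reports that tick and how many listeners remain.
fun demo(): Pair<Int, Int> = runBlocking {
    val source = TickSource()
    val firstTick = async { source.ticks().first() }
    while (source.listenerCount == 0) yield() // wait until the listener is registered
    source.fire(42)
    val value = firstTick.await()
    value to source.listenerCount
}

fun main() {
    val (value, remaining) = demo()
    println("tick=$value, listeners left=$remaining")
}
```

Here first() cancels the collection after one item, which is the "external cancellation" case: the awaitClose block is what removes the listener, so the remaining listener count is expected to drop back to 0.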

awaitClose should be used to keep the flow running, otherwise the channel will be closed immediately when block completes. awaitClose argument is called either when a flow consumer cancels the flow collection or when a callback-based API invokes SendChannel.close manually and is typically used to cleanup the resources after the completion, e.g. unregister a callback. Using awaitClose is mandatory in order to prevent memory leaks when the flow collection is cancelled, otherwise the callback may keep running even when the flow collector is already completed. To avoid such leaks, this method throws IllegalStateException if block returns, but the channel is not closed yet.
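
The last sentence of that quote can be checked with a small experiment. This is only a sketch, assuming a kotlinx.coroutines version that has trySend; collectWithoutAwaitClose is a hypothetical helper name. It builds a callbackFlow whose block returns without awaitClose or close, like the one in the question, and returns whatever failure the collection produces.

```kotlin
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects a callbackFlow that lacks awaitClose
// and returns the exception thrown by the collection, if any.
fun collectWithoutAwaitClose(): Throwable? = runBlocking {
    val broken = callbackFlow<String> {
        trySend("FINISHED")
        // No awaitClose and no close(): the block returns with the channel still open.
    }
    runCatching { broken.toList() }.exceptionOrNull()
}

fun main() {
    val failure = collectWithoutAwaitClose()
    println(failure?.let { it::class.simpleName + ": " + it.message })
}
```

If the docs hold, the failure should be an IllegalStateException whose message points at awaitClose, which would be what goes wrong in the original processProgressFlow.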