将回调流转换为共享流

Convert callbackflow to sharedflow

我刚开始使用 coroutines/flow(通常是 kotlin),我正在努力将 callbackFlow 转换为 sharedFlow。

我将下面的简单示例放在一起只是为了展示我尝试过的方法,但没有成功。我的代码比较复杂,但我相信这个例子反映了我正在努力实现的问题。

fun main() = runBlocking {

    getMySharedFlow().collect{
        println("collector 1 value: $it")
    }

    getMySharedFlow().collect{
        println("collector 2 value: $it")
    }

}

val sharedFlow = MutableSharedFlow<Int>()

suspend fun getMySharedFlow(): SharedFlow<Int> {
    println("inside sharedflow")
    getMyCallbackFlow().collect{
        println("emitting to sharedflow value: $it")
        sharedFlow.emit(it)
    }
    return sharedFlow
}

fun getMyCallbackFlow(): Flow<Int> = callbackFlow<Int> {
    println("inside callbackflow producer")
    fetchSomethingContinuously {
        println("fetched something")
        offer(1)
        offer(2)
        offer(3)
    }
    awaitClose()
}

fun fetchSomethingContinuously(myCallBack: ()->Unit) {
    println("fetching something...")
    myCallBack()
}

想法是 fetchSomethingContinuously 只调用一次,与 sharedFlow 的收集器数量无关。但是正如您从输出中看到的那样,收集器永远不会获取值:

inside sharedflow
inside callbackflow producer
fetching something...
fetched something
emitting to sharedflow value: 1
emitting to sharedflow value: 2
emitting to sharedflow value: 3

我查看了 shareIn 运算符,但不确定如何使用它。

我怎样才能实现这样的目标?任何提示将不胜感激。

所以你在这里缺少的是对 collectemit()awaitClose() 的调用正在暂停,并且只有在相应的操作完成后才会完成。

函数 getMySharedFlow() 甚至 return 都没有对其应用收集,因为它正在收集 callbackFlowcallbackFlow 卡在调用处到 awaitClose() 又没有完成,因为 fetchSomethingContinuously 没有用 close() 函数结束回调。

你需要意识到你必须在这里创建一些明确的并行性,而不是生活在暂停的世界中。您的示例代码的一个工作变体是:

val sharedFlow = MutableSharedFlow<Int>()

suspend fun startSharedFlow() {
    println("Starting Shared Flow callback collection")

    getMyCallbackFlow().collect {
        println("emitting to sharedflow value: $it")
        sharedFlow.emit(it)
    }
}

fun main() = runBlocking<Unit> {

    launch {
        startSharedFlow()
    }

    launch {
        sharedFlow.collect {
            println("collector 1 value: $it")
        }
    }

    launch {
        sharedFlow.collect {
            println("collector 2 value: $it")
        }
    }

}


fun getMyCallbackFlow(): Flow<Int> = callbackFlow<Int> {
    println("inside callbackflow producer")
    fetchSomethingContinuously {
        println("fetched something")
        offer(1)
        offer(2)
        offer(3)
        //close() - call close here if you need to signal that this callback is done sending elements
    }
    awaitClose()
}

fun fetchSomethingContinuously(myCallBack: () -> Unit) {
    println("fetching something...")
    myCallBack()
}

调用 launch 允许异步执行发射和收集值。

此外,关于 shareIn() 运算符,它只是从指定的上游创建一个 SharedFlow,就像您想要的那样。此外,您还可以使用 started 参数指定何时开始共享。关于此的更多信息 here.

这就是您在示例中使用它的方式:

fun main() = runBlocking<Unit> {

    val sharedFlow = getMyCallbackFlow().shareIn(this, started = SharingStarted.Eagerly)

    launch {
        sharedFlow.collect {
            println("collector 1 value: $it")
        }
    }

    launch {
        sharedFlow.collect {
            println("collector 2 value: $it")
        }
    }

}


fun getMyCallbackFlow(): Flow<Int> = callbackFlow<Int> {
    println("inside callbackflow producer")
    fetchSomethingContinuously {
        println("fetched something")
        offer(1)
        offer(2)
        offer(3)
        //close() - call close here if you need to signal that this callback is done sending elements
    }
    awaitClose()
}

fun fetchSomethingContinuously(myCallBack: () -> Unit) {
    println("fetching something...")
    myCallBack()
}