将回调流转换为共享流
Convert callbackflow to sharedflow
我刚开始使用 coroutines/flow(通常是 kotlin),我正在努力将 callbackFlow 转换为 sharedFlow。
我将下面的简单示例放在一起只是为了展示我尝试过的方法,但没有成功。我的代码比较复杂,但我相信这个例子反映了我正在努力实现的问题。
fun main() = runBlocking {
getMySharedFlow().collect{
println("collector 1 value: $it")
}
getMySharedFlow().collect{
println("collector 2 value: $it")
}
}
val sharedFlow = MutableSharedFlow<Int>()
suspend fun getMySharedFlow(): SharedFlow<Int> {
println("inside sharedflow")
getMyCallbackFlow().collect{
println("emitting to sharedflow value: $it")
sharedFlow.emit(it)
}
return sharedFlow
}
fun getMyCallbackFlow(): Flow<Int> = callbackFlow<Int> {
println("inside callbackflow producer")
fetchSomethingContinuously {
println("fetched something")
offer(1)
offer(2)
offer(3)
}
awaitClose()
}
fun fetchSomethingContinuously(myCallBack: ()->Unit) {
println("fetching something...")
myCallBack()
}
想法是 fetchSomethingContinuously
只调用一次,与 sharedFlow 的收集器数量无关。但是正如您从输出中看到的那样,收集器永远不会获取值:
inside sharedflow
inside callbackflow producer
fetching something...
fetched something
emitting to sharedflow value: 1
emitting to sharedflow value: 2
emitting to sharedflow value: 3
我查看了 shareIn 运算符,但不确定如何使用它。
我怎样才能实现这样的目标?任何提示将不胜感激。
所以你在这里缺少的是对 collect
、emit()
和 awaitClose()
的调用正在暂停,并且只有在相应的操作完成后才会完成。
函数 getMySharedFlow()
甚至 return 都没有对其应用收集,因为它正在收集 callbackFlow
,callbackFlow
卡在调用处到 awaitClose()
又没有完成,因为 fetchSomethingContinuously
没有用 close()
函数结束回调。
你需要意识到你必须在这里创建一些明确的并行性,而不是生活在暂停的世界中。您的示例代码的一个工作变体是:
val sharedFlow = MutableSharedFlow<Int>()
suspend fun startSharedFlow() {
println("Starting Shared Flow callback collection")
getMyCallbackFlow().collect {
println("emitting to sharedflow value: $it")
sharedFlow.emit(it)
}
}
fun main() = runBlocking<Unit> {
launch {
startSharedFlow()
}
launch {
sharedFlow.collect {
println("collector 1 value: $it")
}
}
launch {
sharedFlow.collect {
println("collector 2 value: $it")
}
}
}
fun getMyCallbackFlow(): Flow<Int> = callbackFlow<Int> {
println("inside callbackflow producer")
fetchSomethingContinuously {
println("fetched something")
offer(1)
offer(2)
offer(3)
//close() - call close here if you need to signal that this callback is done sending elements
}
awaitClose()
}
fun fetchSomethingContinuously(myCallBack: () -> Unit) {
println("fetching something...")
myCallBack()
}
调用 launch
允许异步执行发射和收集值。
此外,关于 shareIn()
运算符,它只是从指定的上游创建一个 SharedFlow,就像您想要的那样。此外,您还可以使用 started
参数指定何时开始共享。关于此的更多信息 here.
这就是您在示例中使用它的方式:
fun main() = runBlocking<Unit> {
val sharedFlow = getMyCallbackFlow().shareIn(this, started = SharingStarted.Eagerly)
launch {
sharedFlow.collect {
println("collector 1 value: $it")
}
}
launch {
sharedFlow.collect {
println("collector 2 value: $it")
}
}
}
fun getMyCallbackFlow(): Flow<Int> = callbackFlow<Int> {
println("inside callbackflow producer")
fetchSomethingContinuously {
println("fetched something")
offer(1)
offer(2)
offer(3)
//close() - call close here if you need to signal that this callback is done sending elements
}
awaitClose()
}
fun fetchSomethingContinuously(myCallBack: () -> Unit) {
println("fetching something...")
myCallBack()
}
我刚开始使用 coroutines/flow(通常是 kotlin),我正在努力将 callbackFlow 转换为 sharedFlow。
我将下面的简单示例放在一起只是为了展示我尝试过的方法,但没有成功。我的代码比较复杂,但我相信这个例子反映了我正在努力实现的问题。
fun main() = runBlocking {
getMySharedFlow().collect{
println("collector 1 value: $it")
}
getMySharedFlow().collect{
println("collector 2 value: $it")
}
}
val sharedFlow = MutableSharedFlow<Int>()
suspend fun getMySharedFlow(): SharedFlow<Int> {
println("inside sharedflow")
getMyCallbackFlow().collect{
println("emitting to sharedflow value: $it")
sharedFlow.emit(it)
}
return sharedFlow
}
fun getMyCallbackFlow(): Flow<Int> = callbackFlow<Int> {
println("inside callbackflow producer")
fetchSomethingContinuously {
println("fetched something")
offer(1)
offer(2)
offer(3)
}
awaitClose()
}
fun fetchSomethingContinuously(myCallBack: ()->Unit) {
println("fetching something...")
myCallBack()
}
想法是 fetchSomethingContinuously
只调用一次,与 sharedFlow 的收集器数量无关。但是正如您从输出中看到的那样,收集器永远不会获取值:
inside sharedflow
inside callbackflow producer
fetching something...
fetched something
emitting to sharedflow value: 1
emitting to sharedflow value: 2
emitting to sharedflow value: 3
我查看了 shareIn 运算符,但不确定如何使用它。
我怎样才能实现这样的目标?任何提示将不胜感激。
所以你在这里缺少的是对 collect
、emit()
和 awaitClose()
的调用正在暂停,并且只有在相应的操作完成后才会完成。
函数 getMySharedFlow()
甚至 return 都没有对其应用收集,因为它正在收集 callbackFlow
,callbackFlow
卡在调用处到 awaitClose()
又没有完成,因为 fetchSomethingContinuously
没有用 close()
函数结束回调。
你需要意识到你必须在这里创建一些明确的并行性,而不是生活在暂停的世界中。您的示例代码的一个工作变体是:
val sharedFlow = MutableSharedFlow<Int>()
suspend fun startSharedFlow() {
println("Starting Shared Flow callback collection")
getMyCallbackFlow().collect {
println("emitting to sharedflow value: $it")
sharedFlow.emit(it)
}
}
fun main() = runBlocking<Unit> {
launch {
startSharedFlow()
}
launch {
sharedFlow.collect {
println("collector 1 value: $it")
}
}
launch {
sharedFlow.collect {
println("collector 2 value: $it")
}
}
}
fun getMyCallbackFlow(): Flow<Int> = callbackFlow<Int> {
println("inside callbackflow producer")
fetchSomethingContinuously {
println("fetched something")
offer(1)
offer(2)
offer(3)
//close() - call close here if you need to signal that this callback is done sending elements
}
awaitClose()
}
fun fetchSomethingContinuously(myCallBack: () -> Unit) {
println("fetching something...")
myCallBack()
}
调用 launch
允许异步执行发射和收集值。
此外,关于 shareIn()
运算符,它只是从指定的上游创建一个 SharedFlow,就像您想要的那样。此外,您还可以使用 started
参数指定何时开始共享。关于此的更多信息 here.
这就是您在示例中使用它的方式:
fun main() = runBlocking<Unit> {
val sharedFlow = getMyCallbackFlow().shareIn(this, started = SharingStarted.Eagerly)
launch {
sharedFlow.collect {
println("collector 1 value: $it")
}
}
launch {
sharedFlow.collect {
println("collector 2 value: $it")
}
}
}
fun getMyCallbackFlow(): Flow<Int> = callbackFlow<Int> {
println("inside callbackflow producer")
fetchSomethingContinuously {
println("fetched something")
offer(1)
offer(2)
offer(3)
//close() - call close here if you need to signal that this callback is done sending elements
}
awaitClose()
}
fun fetchSomethingContinuously(myCallBack: () -> Unit) {
println("fetching something...")
myCallBack()
}