为什么 mutableSharedFlow subscriptionCount 在使用 ShareIn 时会发生变化
why mutableSharedFlow subscriptionCount changes when use ShareIn
为什么会改变是否使用ShareIn func
val sharedf = MutableSharedFlow<Int>()
fun Flow<Int>.print() : Flow<Int> {
return map {
println("in print : $it")
it
}
}
fun Flow<String>.printS() : Flow<Int> {
return map {
println("in print : $it")
it.toInt()
}
}
fun Flow<Int>.toNext() : Flow<Int> {
return merge(
filterIsInstance<Int>().print(),
filterIsInstance<String>().printS(),
)
}
当我这样使用主函数时
fun main() = runBlocking<Unit> {
launch {
sharedf.onSubscription{
println("subsCount : ${sharedf.subscriptionCount.value}")
}
.shareIn(GlobalScope, SharingStarted.Lazily)
.toNext()
.collect()
}
launch {
for (i in 0..3){
delay(500)
sharedf.emit(i)
}
}
}
订阅人数:1
但如果我删除 ShareIn
订阅人数:2
为什么我必须使用 ShareIn 即使 sharedf 已经是 MutableSharedFlow
查看 toNext
,您可以看到它通过使用两次 filterIsInstance
从接收方 (this
) 流构建了 2 个流,并合并了这 2 个流。因此,当你收集someFlow.toNext()
时,它会收集两倍于原始源流someFlow
。
这就是为什么如果您不在主方法中放置 shareIn
,您会在 sharedf
上看到 2 个订阅者。由于 toNext()
的实施,toNext().collect()
订阅了两次共享流,正如我们刚刚看到的那样。
现在,如果您在中间添加 shareIn()
,则会创建 另一个独立的 共享流。我们称它为shared2
,并将其提取到一个变量中以使其更清晰:
val shared2 = sharedf.onSubscription { ... }.shareIn(GlobalScope, SharingStarted.Lazily)
shared2.toNext().collect()
在某种程度上,新的共享流 shared2
“保护”源流 sharedf
免受收集器的侵害。当很多采集器采集shared2
时,shared2
只采集一次源流sharedf
.
所以,即使 toNext()
收集了 shared2
两次,shared2
只收集了 sharedf
一次,这就是为什么您在 sharedf
上只看到 1 个订阅在你的日志中。
why I have to Use ShareIn even sharedf is already MutableSharedFlow
是什么让您认为您必须这样做?通过 shareIn
从已经是共享流的流中创建共享流确实出乎意料。
旁注:您可以在 print
函数中使用 onEach
而不是 map
shareIn
的工作方式非常简单,就是启动一个无限期运行的协同程序,并将源流收集到 re-emit 这些值。因此,与 map
和 merge
运算符创建仅在收集时订阅源的冷流不同,shareIn
创建一个立即开始从源流收集的流,因此它立即被计为源流的订阅者。
下游 toNext()
包装第二个 SharedFlow,而不是第一个。第一个不知道下游订阅者。它仅从 shareIn
调用发送到单个 SharedFlow,因此它只有一个订阅者。
当您删除 shareIn
时,您的顶级 SharedFlow 将发送给在您的 toNext()
的 merge()
调用中创建的两个订阅者,因此它会看到两个订阅者.请注意,在调用 collect
之前,它不会看到任何订阅者,因为它们是冷包装。
为什么会改变是否使用ShareIn func
val sharedf = MutableSharedFlow<Int>()
fun Flow<Int>.print() : Flow<Int> {
return map {
println("in print : $it")
it
}
}
fun Flow<String>.printS() : Flow<Int> {
return map {
println("in print : $it")
it.toInt()
}
}
fun Flow<Int>.toNext() : Flow<Int> {
return merge(
filterIsInstance<Int>().print(),
filterIsInstance<String>().printS(),
)
}
当我这样使用主函数时
fun main() = runBlocking<Unit> {
launch {
sharedf.onSubscription{
println("subsCount : ${sharedf.subscriptionCount.value}")
}
.shareIn(GlobalScope, SharingStarted.Lazily)
.toNext()
.collect()
}
launch {
for (i in 0..3){
delay(500)
sharedf.emit(i)
}
}
}
订阅人数:1
但如果我删除 ShareIn
订阅人数:2
为什么我必须使用 ShareIn 即使 sharedf 已经是 MutableSharedFlow
查看 toNext
,您可以看到它通过使用两次 filterIsInstance
从接收方 (this
) 流构建了 2 个流,并合并了这 2 个流。因此,当你收集someFlow.toNext()
时,它会收集两倍于原始源流someFlow
。
这就是为什么如果您不在主方法中放置 shareIn
,您会在 sharedf
上看到 2 个订阅者。由于 toNext()
的实施,toNext().collect()
订阅了两次共享流,正如我们刚刚看到的那样。
现在,如果您在中间添加 shareIn()
,则会创建 另一个独立的 共享流。我们称它为shared2
,并将其提取到一个变量中以使其更清晰:
val shared2 = sharedf.onSubscription { ... }.shareIn(GlobalScope, SharingStarted.Lazily)
shared2.toNext().collect()
在某种程度上,新的共享流 shared2
“保护”源流 sharedf
免受收集器的侵害。当很多采集器采集shared2
时,shared2
只采集一次源流sharedf
.
所以,即使 toNext()
收集了 shared2
两次,shared2
只收集了 sharedf
一次,这就是为什么您在 sharedf
上只看到 1 个订阅在你的日志中。
why I have to Use ShareIn even sharedf is already MutableSharedFlow
是什么让您认为您必须这样做?通过 shareIn
从已经是共享流的流中创建共享流确实出乎意料。
旁注:您可以在 print
函数中使用 onEach
而不是 map
shareIn
的工作方式非常简单,就是启动一个无限期运行的协同程序,并将源流收集到 re-emit 这些值。因此,与 map
和 merge
运算符创建仅在收集时订阅源的冷流不同,shareIn
创建一个立即开始从源流收集的流,因此它立即被计为源流的订阅者。
下游 toNext()
包装第二个 SharedFlow,而不是第一个。第一个不知道下游订阅者。它仅从 shareIn
调用发送到单个 SharedFlow,因此它只有一个订阅者。
当您删除 shareIn
时,您的顶级 SharedFlow 将发送给在您的 toNext()
的 merge()
调用中创建的两个订阅者,因此它会看到两个订阅者.请注意,在调用 collect
之前,它不会看到任何订阅者,因为它们是冷包装。