为什么 mutableSharedFlow subscriptionCount 在使用 ShareIn 时会发生变化

why mutableSharedFlow subscriptionCount changes when use ShareIn

为什么会改变是否使用ShareIn func

val sharedf = MutableSharedFlow<Int>()

fun Flow<Int>.print() : Flow<Int> {
    return map {
          println("in print : $it")
          it
       }
    }

fun Flow<String>.printS() : Flow<Int> {
    return map {
        println("in print : $it")
        it.toInt()
    }
}

fun Flow<Int>.toNext() : Flow<Int> {
    return merge(
        filterIsInstance<Int>().print(),
        filterIsInstance<String>().printS(),
    )
}

当我这样使用主函数时

fun main() = runBlocking<Unit> {
    launch {

        sharedf.onSubscription{
            println("subsCount : ${sharedf.subscriptionCount.value}")
        }
        .shareIn(GlobalScope, SharingStarted.Lazily)
        .toNext()
        .collect()

    }

    launch {
        for (i in 0..3){
            delay(500)
            sharedf.emit(i)
        }
    }
}

订阅人数:1

但如果我删除 ShareIn

订阅人数:2

为什么我必须使用 ShareIn 即使 sharedf 已经是 MutableSharedFlow

查看 toNext,您可以看到它通过使用两次 filterIsInstance 从接收方 (this) 流构建了 2 个流,并合并了这 2 个流。因此,当你收集someFlow.toNext()时,它会收集两倍于原始源流someFlow

这就是为什么如果您不在主方法中放置 shareIn,您会在 sharedf 上看到 2 个订阅者。由于 toNext() 的实施,toNext().collect() 订阅了两次共享流,正如我们刚刚看到的那样。

现在,如果您在中间添加 shareIn(),则会创建 另一个独立的 共享流。我们称它为shared2,并将其提取到一个变量中以使其更清晰:

val shared2 = sharedf.onSubscription { ... }.shareIn(GlobalScope, SharingStarted.Lazily)
shared2.toNext().collect()

在某种程度上,新的共享流 shared2 “保护”源流 sharedf 免受收集器的侵害。当很多采集器采集shared2时,shared2只采集一次源流sharedf.

所以,即使 toNext() 收集了 shared2 两次,shared2 只收集了 sharedf 一次,这就是为什么您在 sharedf 上只看到 1 个订阅在你的日志中。

why I have to Use ShareIn even sharedf is already MutableSharedFlow

是什么让您认为您必须这样做?通过 shareIn 从已经是共享流的流中创建共享流确实出乎意料。


旁注:您可以在 print 函数中使用 onEach 而不是 map

shareIn 的工作方式非常简单,就是启动一个无限期运行的协同程序,并将源流收集到 re-emit 这些值。因此,与 mapmerge 运算符创建仅在收集时订阅源的冷流不同,shareIn 创建一个立即开始从源流收集的流,因此它立即被计为源流的订阅者。

下游 toNext() 包装第二个 SharedFlow,而不是第一个。第一个不知道下游订阅者。它仅从 shareIn 调用发送到单个 SharedFlow,因此它只有一个订阅者。

当您删除 shareIn 时,您的顶级 SharedFlow 将发送给在您的 toNext()merge() 调用中创建的两个订阅者,因此它会看到两个订阅者.请注意,在调用 collect 之前,它不会看到任何订阅者,因为它们是冷包装。