Flow<T>.distinctUntilChanged() 不工作

Flow<T>.distinctUntilChanged() is not working

尝试在SharedFlow处于活动状态(subscriptionCount大于零)时有一个协程运行并在计数时被取消滴。但不知何故,即使像 distinctUntilChanged() 这样简单的东西也无法正常工作,我感到困惑。

为此,我正在制作这样的“onActive”扩展:

fun <T : Any> MutableSharedFlow<T>.onActive(
    block: suspend CoroutineScope.() -> Unit
): Flow<T> {

    val original = this

    val isActiveFlow: Flow<Boolean> = subscriptionCount
        .map {
            println("Class: Count is $it")
            it > 0
        }
        .distinctUntilChanged()

    return isActiveFlow.flatMapLatest { isActive ->
        println("Class: isActive is $isActive")
        // here would be the code that calls `block`
        // but just this exactly as is, already triggers the error

        original // still emits the original flow, 
                 // that is needed or else subscriptionCount never changes
    }
}

这最初似乎可行,但是 运行对其添加多个订阅者的测试将连续多次打印“isActive is true”。为什么 distinctUntilChanged() 不起作用?此重复调用与编辑区域中的其余逻辑混淆。

测试是这样的:

    @Test
    fun `onActive is called only once with multiple subscribers`() = runBlocking {

        val flow = MutableSharedFlow<Int>(
            replay = 2,
            onBufferOverflow = BufferOverflow.DROP_OLDEST
        ).apply {
            repeat(5) { tryEmit(it) }
        }.onActive {

        }

        val jobs = mutableListOf<Job>()
        repeat(3) { count ->
            jobs.add(flow.onEach {
                println("Test:  Listener $count received $it")
            }.launchIn(this))
        }
        delay(100)
        jobs.forEach { it.cancel() }
        jobs.forEach { it.join() }
    }

运行宁此输出是:

Class: Count is 0
Class: isActive is false
Class: Count is 1
Class: Count is 1
Class: isActive is true
Class: Count is 2
Class: Count is 2
Class: isActive is true
Class: Count is 3
Test:  Listener 0 received 3
Test:  Listener 0 received 4
Test:  Listener 1 received 3
Test:  Listener 1 received 4
Test:  Listener 2 received 3
Test:  Listener 2 received 4
Class: Count is 2
Class: isActive is true
Class: Count is 3
Class: Count is 3
Class: Count is 3
Test:  Listener 0 received 3
Test:  Listener 0 received 4

那么问题来了,为什么 distinctUntilChanged() 不起作用,我该如何解决?

distinctUntilChanged 而言,您看到的行为似乎是正确的:

  • 第一个注册订阅者收集原始的 2 个具有起始 isActive=false 值的重播元素
  • 然后isActive因为第一个订阅而变为真,所以第一个订阅者由于flatMapLatest而重新收集原始流,从而再次获得重播元素
  • 其他 2 个订阅者在 subscriptionCount 已经非 0 时到达,因此 isActive 对他们保持真实,直到他们被取消

如果您“在有订阅者的情况下”启动的协程旨在生成 SharedFlow 中的元素,我宁愿最初将流程定义为 channelFlow/callbackFlow,然后使用 shareInSharingStarted.WhileSubscribed 来获得这种“运行 当有 susbcribers 时”的行为。

如果它“就在旁边”,您可能需要一个外部范围并单独启动一个协程来收听 sharedFlow.subscribersCount 和 start/stop “sidecar”协程。