Flow<T>.distinctUntilChanged() 不工作
Flow<T>.distinctUntilChanged() is not working
我尝试在SharedFlow处于活动状态(subscriptionCount
大于零)时有一个协程运行并在计数时被取消滴。但不知何故,即使像 distinctUntilChanged()
这样简单的东西也无法正常工作,我感到困惑。
为此,我正在制作这样的“onActive”扩展:
fun <T : Any> MutableSharedFlow<T>.onActive(
block: suspend CoroutineScope.() -> Unit
): Flow<T> {
val original = this
val isActiveFlow: Flow<Boolean> = subscriptionCount
.map {
println("Class: Count is $it")
it > 0
}
.distinctUntilChanged()
return isActiveFlow.flatMapLatest { isActive ->
println("Class: isActive is $isActive")
// here would be the code that calls `block`
// but just this exactly as is, already triggers the error
original // still emits the original flow,
// that is needed or else subscriptionCount never changes
}
}
这最初似乎可行,但是 运行对其添加多个订阅者的测试将连续多次打印“isActive is true”。为什么 distinctUntilChanged()
不起作用?此重复调用与编辑区域中的其余逻辑混淆。
测试是这样的:
@Test
fun `onActive is called only once with multiple subscribers`() = runBlocking {
val flow = MutableSharedFlow<Int>(
replay = 2,
onBufferOverflow = BufferOverflow.DROP_OLDEST
).apply {
repeat(5) { tryEmit(it) }
}.onActive {
}
val jobs = mutableListOf<Job>()
repeat(3) { count ->
jobs.add(flow.onEach {
println("Test: Listener $count received $it")
}.launchIn(this))
}
delay(100)
jobs.forEach { it.cancel() }
jobs.forEach { it.join() }
}
运行宁此输出是:
Class: Count is 0
Class: isActive is false
Class: Count is 1
Class: Count is 1
Class: isActive is true
Class: Count is 2
Class: Count is 2
Class: isActive is true
Class: Count is 3
Test: Listener 0 received 3
Test: Listener 0 received 4
Test: Listener 1 received 3
Test: Listener 1 received 4
Test: Listener 2 received 3
Test: Listener 2 received 4
Class: Count is 2
Class: isActive is true
Class: Count is 3
Class: Count is 3
Class: Count is 3
Test: Listener 0 received 3
Test: Listener 0 received 4
那么问题来了,为什么 distinctUntilChanged()
不起作用,我该如何解决?
就 distinctUntilChanged
而言,您看到的行为似乎是正确的:
- 第一个注册订阅者收集原始的 2 个具有起始
isActive=false
值的重播元素
- 然后
isActive
因为第一个订阅而变为真,所以第一个订阅者由于flatMapLatest
而重新收集原始流,从而再次获得重播元素
- 其他 2 个订阅者在
subscriptionCount
已经非 0 时到达,因此 isActive
对他们保持真实,直到他们被取消
如果您“在有订阅者的情况下”启动的协程旨在生成 SharedFlow
中的元素,我宁愿最初将流程定义为 channelFlow
/callbackFlow
,然后使用 shareIn 和 SharingStarted.WhileSubscribed
来获得这种“运行 当有 susbcribers 时”的行为。
如果它“就在旁边”,您可能需要一个外部范围并单独启动一个协程来收听 sharedFlow.subscribersCount
和 start/stop “sidecar”协程。
我尝试在SharedFlow处于活动状态(subscriptionCount
大于零)时有一个协程运行并在计数时被取消滴。但不知何故,即使像 distinctUntilChanged()
这样简单的东西也无法正常工作,我感到困惑。
为此,我正在制作这样的“onActive”扩展:
fun <T : Any> MutableSharedFlow<T>.onActive(
block: suspend CoroutineScope.() -> Unit
): Flow<T> {
val original = this
val isActiveFlow: Flow<Boolean> = subscriptionCount
.map {
println("Class: Count is $it")
it > 0
}
.distinctUntilChanged()
return isActiveFlow.flatMapLatest { isActive ->
println("Class: isActive is $isActive")
// here would be the code that calls `block`
// but just this exactly as is, already triggers the error
original // still emits the original flow,
// that is needed or else subscriptionCount never changes
}
}
这最初似乎可行,但是 运行对其添加多个订阅者的测试将连续多次打印“isActive is true”。为什么 distinctUntilChanged()
不起作用?此重复调用与编辑区域中的其余逻辑混淆。
测试是这样的:
@Test
fun `onActive is called only once with multiple subscribers`() = runBlocking {
val flow = MutableSharedFlow<Int>(
replay = 2,
onBufferOverflow = BufferOverflow.DROP_OLDEST
).apply {
repeat(5) { tryEmit(it) }
}.onActive {
}
val jobs = mutableListOf<Job>()
repeat(3) { count ->
jobs.add(flow.onEach {
println("Test: Listener $count received $it")
}.launchIn(this))
}
delay(100)
jobs.forEach { it.cancel() }
jobs.forEach { it.join() }
}
运行宁此输出是:
Class: Count is 0
Class: isActive is false
Class: Count is 1
Class: Count is 1
Class: isActive is true
Class: Count is 2
Class: Count is 2
Class: isActive is true
Class: Count is 3
Test: Listener 0 received 3
Test: Listener 0 received 4
Test: Listener 1 received 3
Test: Listener 1 received 4
Test: Listener 2 received 3
Test: Listener 2 received 4
Class: Count is 2
Class: isActive is true
Class: Count is 3
Class: Count is 3
Class: Count is 3
Test: Listener 0 received 3
Test: Listener 0 received 4
那么问题来了,为什么 distinctUntilChanged()
不起作用,我该如何解决?
就 distinctUntilChanged
而言,您看到的行为似乎是正确的:
- 第一个注册订阅者收集原始的 2 个具有起始
isActive=false
值的重播元素 - 然后
isActive
因为第一个订阅而变为真,所以第一个订阅者由于flatMapLatest
而重新收集原始流,从而再次获得重播元素 - 其他 2 个订阅者在
subscriptionCount
已经非 0 时到达,因此isActive
对他们保持真实,直到他们被取消
如果您“在有订阅者的情况下”启动的协程旨在生成 SharedFlow
中的元素,我宁愿最初将流程定义为 channelFlow
/callbackFlow
,然后使用 shareIn 和 SharingStarted.WhileSubscribed
来获得这种“运行 当有 susbcribers 时”的行为。
如果它“就在旁边”,您可能需要一个外部范围并单独启动一个协程来收听 sharedFlow.subscribersCount
和 start/stop “sidecar”协程。