使用无休止的流程测试 Kotlin Flow.combine

Testing Kotlin Flow.combine with unending flows

我有以下测试

@Test
fun combineUnendingFlows() = runBlockingTest {

    val source1 = flow {
        emit("a")
        emit("b")
        suspendCancellableCoroutine { /* Never complete. */ }
    }

    val source2 = flowOf(1, 2, 3)

    val combinations = mutableListOf<String>()

    combine(source1, source2) { first, second -> "$first$second" }
        .onEach(::println)
        .onEach(combinations::add)
        .launchIn(this)

    advanceUntilIdle()

    assertThat(combinations).containsExactly("a1", "b1", "b2", "b3")
}

断言成功,但测试失败,异常:

kotlinx.coroutines.test.UncompletedCoroutinesError: Test finished with active jobs

我知道这是人为设计的,我们可以通过确保 source1 完成来轻松通过,但我想知道为什么它会失败? runBlockingTest 测试永无止境的流程的方法是错误的吗?

(这是协程1.4.0)。

问题的关键似乎是 runBlockingTest(可能是正确的),预计在它完成时不会有 运行 个工作。

如果我们可以显式取消 runBlockingTest 提供的范围,这会很好,但是尝试在该范围上调用 .cancel() 会引发异常。

this GitHub issue 中有更多关于此问题的讨论。

在这个人为设计的示例中,获取对 launchIn() 创建的 Job 的引用并在测试结束前取消它就足够简单了。

更复杂的场景可以捕获 UncompletedCoroutinesError 并忽略它,因为他们希望有未完成的协程,或者他们可以创建一个新的子 CoroutineScope 来显式取消。在这个例子中,它看起来像

@Test
fun combineUnendingFlows() = runBlockingTest {
    val job = Job()
    val childScope = CoroutineScope(this.coroutineContext + job)

    val source1 = flow {
        emit("a")
        emit("b")
        suspendCancellableCoroutine { /* Never complete. */ }
    }

    val source2 = flowOf(1, 2, 3)

    val combinations = mutableListOf<String>()

    combine(source1, source2) { first, second -> "$first$second" }
        .onEach(::println)
        .onEach(combinations::add)
        .launchIn(childScope)

    advanceUntilIdle()
    assertThat(combinations).containsExactly("a1", "b1", "b2", "b3")
    job.cancel()
}