流消息未在单元测试中传递

Flow message not delivered in unit test

我有一个从 MutableSharedFlow(在我的应用程序中充当 EventBus)读取消息的消费者。我正在尝试编写一个单元测试来表明将消息传递到 Flow 会触发我的监听器。

这是我的流程定义:

class MessageBus {

    private val _messages = MutableSharedFlow<Message>()
    val messages = _messages.asSharedFlow()

    suspend fun send(message: Message) {
        _messages.emit(message)
    }
}

这是监听器:

class Listener(private val messageBus: MessageBus) {

    private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())

    init {
        scope.launch {
            messageBus.messages.collectLatest { message ->
                when (message) {
                    is CustomMessage -> handleCustomMessage(message)
                }
            }
        }
    }

最后是我的单元测试:

class CommandTest {
    @Test
    fun `should process CustomMessage`(): Unit = runBlocking {
        val messageBus = MessageBus()
        val listener = Listener(messageBus)
        messageBus.send(CustomMessage("test command"))
        //argumentCaptor...verify[removed for brevity]
    }
}

不幸的是,上面的代码没有触发我的监听器中的断点(触发了行 init 上的断点,但从未收到消息并且 collectLatest 块中没有触发断点)。

我什至尝试在验证语句前添加一个Thread.sleep(5_000),但结果是一样的。我是否遗漏了一些关于协程如何工作的明显信息?

编辑:如果重要的话,这不是一个 Android 项目。简单的 Kotlin + Ktor

首先,我建议阅读这篇 article 关于如何在 Android 中测试流程的文章。

其次,在您的示例中,问题是由 Listener 内的范围硬编码引起的。您应该将作用域作为参数传递并将其注入到测试中:

class Listener(private val messageBus: MessageBus, private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()))

class CommandTest {
    @Test
    fun `should process CustomMessage`(): Unit = runBlockingTest {
        val messageBus = MessageBus()
        val listener = Listener(messageBus, this)
        messageBus.send(CustomMessage("test command"))
        //argumentCaptor...verify[removed for brevity]
    }
}

我还建议使用 runBlockingTest 而不是 runBlocking,这样您的测试就不必实际等待。如果测试完成后遗留任何协程运行,它也会失败。

我想既然代码在 Listenerinit 块中,一旦你在测试中初始化 val listener = Listener(messageBus, this)它读取所有消息,此时你有 none 然后在下一行你发出一条消息 messageBus.send(CustomMessage("test command")) 但你的 launch 块到那时应该已经完成​​。您可以先发出消息,也可以将 launch 放在一个循环中,或者放在发出消息后可以调用的其他方法中

你可以使用这样的东西

class Emitter {
  private val emitter: MutableSharedFlow<String> = MutableSharedFlow()

  suspend fun publish(messages: Flow<String>) = messages.onEach {
      emitter.emit(it)
  }.collect()

  fun stream(): Flow<String> = emitter
}

onEach 末尾的 collect 最初将作为终端操作触发收集...我需要进一步了解 emit,因为它在所有情况下以及在本案例中使用时都无法像我预期的那样工作你最初的方式它不会post你的流程中的任何东西,除非你先收集来处理

然后在您的收集器中

class Collector {

suspend fun collect(emitter: Emitter): Unit = coroutineScope {
    println("Starting collection...")
    emitter.stream().collect { println("collecting message: $it") }
  }
}

那么你的主要(或测试)

fun main() = runBlocking {
  withContext(Dispatchers.Default + Job()) {
    val emitter = Emitter()
    val collector = Collector()

    launch {
        collector.collect(emitter)
    }

    emitter.publish(listOf("article#1", "article#2", "article#3", "article#4").asFlow())
  }
}

输出:

Starting collection...
collecting message: article#1
collecting message: article#2
collecting message: article#3
collecting message: article#4