单元测试 Kotlin 的 ConflatedBroadcastChannel 行为

Unit testing Kotlin's ConflatedBroadcastChannel behavior

在我目前正在处理的新项目中,我完全没有 RxJava 依赖项,因为直到现在我都不需要它 - 协程非常优雅地解决了线程问题。

在这一点上,我偶然发现了一个要求 BehaviorSubject-alike behavior, where one can subscribe to a stream of data and receive the latest value upon subscription. As I've learned, Channels 在 Kotlin 中提供非常相似的行为,所以我决定试一试。

From this article I've learned, that ConflatedBroadcastChannel 是模仿 BehaviorSubject 的频道类型,所以我声明如下:

class ChannelSender {

    val channel = ConflatedBroadcastChannel<String>()

    fun sendToChannel(someString: String) {
         GlobalScope.launch(Dispatchers.Main) { channel.send(someString) }
    }
}

为了收听我这样做的频道:


    class ChannelListener(val channelSender: ChannelSender) {
        fun listenToChannel() {
            channelSender.channel.consumeEach { someString ->
                if (someString == "A") foo.perform() 
                else bar.perform()
            }
        }
    }

这按预期工作,但此时我很难理解如何进行单元测试 ChannelListener

我试图找到与 here 相关的内容,但是 example-channel-**.kt 类 的 none 很有帮助。

感谢任何与我的错误假设相关的帮助、建议或更正。谢谢。

使用 我可以设法得到以下代码,它回答了问题:

class ChannelListenerTest {

  private val val channelSender: ChannelSender = mock()

  private val sut = ChannelListener(channelSender)
  private val broadcastChannel = ConflatedBroadcastChannel<String>()

  private val timeLimit = 1_000L
  private val endMarker = "end"

  @Test
  fun `some description here`() = runBlocking {
    whenever(channelSender.channel).thenReturn(broadcastChannel)

    val sender = launch(Dispatchers.Default) {
      broadcastChannel.offer("A")
      yield()
    }

    val receiver = launch(Dispatchers.Default) {
      while (isActive) {
        val i = waitForEvent()
        if (i == endMarker) break
        yield()
      }
    }

    try {
      withTimeout(timeLimit) {
        sut.listenToChannel()
        sender.join()
        broadcastChannel.offer(endMarker) // last event to signal receivers termination
        receiver.join()
      }
      verify(foo).perform()
    } catch (e: CancellationException) {
      println("Test timed out $e")
    }
  }

  private suspend fun waitForEvent(): String =
      with(broadcastChannel.openSubscription()) {
        val value = receive()
        cancel()
        value
      }

}