单元测试 Kotlin 的 ConflatedBroadcastChannel 行为
Unit testing Kotlin's ConflatedBroadcastChannel behavior
在我目前正在处理的新项目中,我完全没有 RxJava 依赖项,因为直到现在我都不需要它 - 协程非常优雅地解决了线程问题。
在这一点上,我偶然发现了一个要求 BehaviorSubject
-alike behavior, where one can subscribe to a stream of data and receive the latest value upon subscription. As I've learned, Channel
s 在 Kotlin 中提供非常相似的行为,所以我决定试一试。
From this article I've learned, that ConflatedBroadcastChannel
是模仿 BehaviorSubject
的频道类型,所以我声明如下:
class ChannelSender {
val channel = ConflatedBroadcastChannel<String>()
fun sendToChannel(someString: String) {
GlobalScope.launch(Dispatchers.Main) { channel.send(someString) }
}
}
为了收听我这样做的频道:
class ChannelListener(val channelSender: ChannelSender) {
fun listenToChannel() {
channelSender.channel.consumeEach { someString ->
if (someString == "A") foo.perform()
else bar.perform()
}
}
}
这按预期工作,但此时我很难理解如何进行单元测试 ChannelListener
。
我试图找到与 here 相关的内容,但是 example-channel-**.kt
类 的 none 很有帮助。
感谢任何与我的错误假设相关的帮助、建议或更正。谢谢。
使用 我可以设法得到以下代码,它回答了问题:
class ChannelListenerTest {
private val val channelSender: ChannelSender = mock()
private val sut = ChannelListener(channelSender)
private val broadcastChannel = ConflatedBroadcastChannel<String>()
private val timeLimit = 1_000L
private val endMarker = "end"
@Test
fun `some description here`() = runBlocking {
whenever(channelSender.channel).thenReturn(broadcastChannel)
val sender = launch(Dispatchers.Default) {
broadcastChannel.offer("A")
yield()
}
val receiver = launch(Dispatchers.Default) {
while (isActive) {
val i = waitForEvent()
if (i == endMarker) break
yield()
}
}
try {
withTimeout(timeLimit) {
sut.listenToChannel()
sender.join()
broadcastChannel.offer(endMarker) // last event to signal receivers termination
receiver.join()
}
verify(foo).perform()
} catch (e: CancellationException) {
println("Test timed out $e")
}
}
private suspend fun waitForEvent(): String =
with(broadcastChannel.openSubscription()) {
val value = receive()
cancel()
value
}
}
在我目前正在处理的新项目中,我完全没有 RxJava 依赖项,因为直到现在我都不需要它 - 协程非常优雅地解决了线程问题。
在这一点上,我偶然发现了一个要求 BehaviorSubject
-alike behavior, where one can subscribe to a stream of data and receive the latest value upon subscription. As I've learned, Channel
s 在 Kotlin 中提供非常相似的行为,所以我决定试一试。
From this article I've learned, that ConflatedBroadcastChannel
是模仿 BehaviorSubject
的频道类型,所以我声明如下:
class ChannelSender {
val channel = ConflatedBroadcastChannel<String>()
fun sendToChannel(someString: String) {
GlobalScope.launch(Dispatchers.Main) { channel.send(someString) }
}
}
为了收听我这样做的频道:
class ChannelListener(val channelSender: ChannelSender) {
fun listenToChannel() {
channelSender.channel.consumeEach { someString ->
if (someString == "A") foo.perform()
else bar.perform()
}
}
}
这按预期工作,但此时我很难理解如何进行单元测试 ChannelListener
。
我试图找到与 here 相关的内容,但是 example-channel-**.kt
类 的 none 很有帮助。
感谢任何与我的错误假设相关的帮助、建议或更正。谢谢。
使用
class ChannelListenerTest {
private val val channelSender: ChannelSender = mock()
private val sut = ChannelListener(channelSender)
private val broadcastChannel = ConflatedBroadcastChannel<String>()
private val timeLimit = 1_000L
private val endMarker = "end"
@Test
fun `some description here`() = runBlocking {
whenever(channelSender.channel).thenReturn(broadcastChannel)
val sender = launch(Dispatchers.Default) {
broadcastChannel.offer("A")
yield()
}
val receiver = launch(Dispatchers.Default) {
while (isActive) {
val i = waitForEvent()
if (i == endMarker) break
yield()
}
}
try {
withTimeout(timeLimit) {
sut.listenToChannel()
sender.join()
broadcastChannel.offer(endMarker) // last event to signal receivers termination
receiver.join()
}
verify(foo).perform()
} catch (e: CancellationException) {
println("Test timed out $e")
}
}
private suspend fun waitForEvent(): String =
with(broadcastChannel.openSubscription()) {
val value = receive()
cancel()
value
}
}