Kotlin:如果没有人在阅读,在 redezvous 频道上发送不会暂停

Kotlin: send on redezvous Channel does not suspend if nobody is reading

我不明白为什么这个程序没有结束:

object RedezvousExample {
    @JvmStatic
    fun main(args: Array<String>) {
        setOptionToShowCoroutineNames()
        runBlocking {
            val ioChannel = Channel<String>()
            val outputFunction: StringDestination = ChannelDestination(ioChannel, this)
            val producerJob = launch(Dispatchers.Default) { producer(outputFunction) }
            val producedValue = ioChannel.receive()
            logMsg("Received $producedValue")
            producerJob.cancel()
        }
    }
}

fun producer(output: StringDestination) {
    for(i in 1..Int.MAX_VALUE) {
        logMsg("Producing $i")
        output("Iteration $i")
    }
}

class ChannelDestination(val output: SendChannel<String>, val coroutineScope: CoroutineScope) : StringDestination {
    override fun invoke(line: String) {
        coroutineScope.async(Dispatchers.Default) {
            logMsg("Sending $line")
            output.send(line)
        }
    }
}

fun logMsg(msg: Any?) = println("${threadName()}$msg")

根据我对集合点通道的(可怜的)理解,我认为如果没有人在读取通道,对 output.send 的调用将被阻塞,而生产者似乎连续写入通道,即使有没有待处理的接收方法。

谢谢

每次要发送新值时都创建一个新协程。

operator fun invoke(line: String) {
    coroutineScope.async(Dispatchers.Default) {
        logMsg("Sending $line")
        output.send(line)
    }
}

这不会暂停,因为异步只是构建启动并忘记(但会延迟等待)。

显然 output.send(line) 行挂起,并释放将 Continuation(异步块内的协程)添加到暂停状态且不发送值的线程。但是 logMsg() 仍然是通过 for 循环调用的,因为 for 循环从未暂停。

为了解决这个问题,您的 invoke() 函数需要以某种方式挂起,直到 send() 调用恢复,这样 for 循环才会挂起。您不能启动新协程,因为它们无论如何都不会挂起。

// make producer suspend
suspend fun producer(output: ChannelDestination) {
    for (i in 1..Int.MAX_VALUE) {
        logMsg("Producing $i")
        output("Iteration $i")
    }
}

// make it suspend as well
suspend operator fun invoke(line: String) {
    logMsg("Sending $line")
    output.send(line)
}

您已经在 Dispatchers.Default 中启动了 producer() 功能,因此所有内容都会 运行 并在某人收到它之前暂停。

@Animesh 的回答非常好。我是这样阐述的:

object RedezvousExample01 {
    @JvmStatic
    fun main(args: Array<String>) {
        setOptionToShowCoroutineNames()
        runBlocking {
            val ioChannel = Channel<String>()
            val outputFunction: StringDestination = ChannelDestination(ioChannel)
            launch { producer(outputFunction) }
            val producedValue = ioChannel.receive()
            ioChannel.close()
            logMsg("Received $producedValue")
        }
    }
}

fun producer(output: StringDestination) {
    try {
        for (i in 1..Int.MAX_VALUE) {
            logMsg("Producing $i")
            output("Iteration $i")
        }
    } catch (e: ClosedSendChannelException) {}
}

class ChannelDestination(val output: SendChannel<String>) : StringDestination {
    override fun invoke(line: String) {
        runBlocking {
            logMsg("Sending $line")
            output.send(line)
        }
    }
}

无论如何我不得不关闭通道以结束协程,我猜为什么 job.cancel() 方法不起作用。