Kotlin:如果没有人在阅读,在 redezvous 频道上发送不会暂停
Kotlin: send on redezvous Channel does not suspend if nobody is reading
我不明白为什么这个程序没有结束:
object RedezvousExample {
@JvmStatic
fun main(args: Array<String>) {
setOptionToShowCoroutineNames()
runBlocking {
val ioChannel = Channel<String>()
val outputFunction: StringDestination = ChannelDestination(ioChannel, this)
val producerJob = launch(Dispatchers.Default) { producer(outputFunction) }
val producedValue = ioChannel.receive()
logMsg("Received $producedValue")
producerJob.cancel()
}
}
}
fun producer(output: StringDestination) {
for(i in 1..Int.MAX_VALUE) {
logMsg("Producing $i")
output("Iteration $i")
}
}
class ChannelDestination(val output: SendChannel<String>, val coroutineScope: CoroutineScope) : StringDestination {
override fun invoke(line: String) {
coroutineScope.async(Dispatchers.Default) {
logMsg("Sending $line")
output.send(line)
}
}
}
fun logMsg(msg: Any?) = println("${threadName()}$msg")
根据我对集合点通道的(可怜的)理解,我认为如果没有人在读取通道,对 output.send 的调用将被阻塞,而生产者似乎连续写入通道,即使有没有待处理的接收方法。
谢谢
每次要发送新值时都创建一个新协程。
operator fun invoke(line: String) {
coroutineScope.async(Dispatchers.Default) {
logMsg("Sending $line")
output.send(line)
}
}
这不会暂停,因为异步只是构建启动并忘记(但会延迟等待)。
显然 output.send(line)
行挂起,并释放将 Continuation(异步块内的协程)添加到暂停状态且不发送值的线程。但是 logMsg()
仍然是通过 for 循环调用的,因为 for 循环从未暂停。
为了解决这个问题,您的 invoke()
函数需要以某种方式挂起,直到 send() 调用恢复,这样 for 循环才会挂起。您不能启动新协程,因为它们无论如何都不会挂起。
// make producer suspend
suspend fun producer(output: ChannelDestination) {
for (i in 1..Int.MAX_VALUE) {
logMsg("Producing $i")
output("Iteration $i")
}
}
// make it suspend as well
suspend operator fun invoke(line: String) {
logMsg("Sending $line")
output.send(line)
}
您已经在 Dispatchers.Default
中启动了 producer()
功能,因此所有内容都会 运行 并在某人收到它之前暂停。
@Animesh 的回答非常好。我是这样阐述的:
object RedezvousExample01 {
@JvmStatic
fun main(args: Array<String>) {
setOptionToShowCoroutineNames()
runBlocking {
val ioChannel = Channel<String>()
val outputFunction: StringDestination = ChannelDestination(ioChannel)
launch { producer(outputFunction) }
val producedValue = ioChannel.receive()
ioChannel.close()
logMsg("Received $producedValue")
}
}
}
fun producer(output: StringDestination) {
try {
for (i in 1..Int.MAX_VALUE) {
logMsg("Producing $i")
output("Iteration $i")
}
} catch (e: ClosedSendChannelException) {}
}
class ChannelDestination(val output: SendChannel<String>) : StringDestination {
override fun invoke(line: String) {
runBlocking {
logMsg("Sending $line")
output.send(line)
}
}
}
无论如何我不得不关闭通道以结束协程,我猜为什么 job.cancel() 方法不起作用。
我不明白为什么这个程序没有结束:
object RedezvousExample {
@JvmStatic
fun main(args: Array<String>) {
setOptionToShowCoroutineNames()
runBlocking {
val ioChannel = Channel<String>()
val outputFunction: StringDestination = ChannelDestination(ioChannel, this)
val producerJob = launch(Dispatchers.Default) { producer(outputFunction) }
val producedValue = ioChannel.receive()
logMsg("Received $producedValue")
producerJob.cancel()
}
}
}
fun producer(output: StringDestination) {
for(i in 1..Int.MAX_VALUE) {
logMsg("Producing $i")
output("Iteration $i")
}
}
class ChannelDestination(val output: SendChannel<String>, val coroutineScope: CoroutineScope) : StringDestination {
override fun invoke(line: String) {
coroutineScope.async(Dispatchers.Default) {
logMsg("Sending $line")
output.send(line)
}
}
}
fun logMsg(msg: Any?) = println("${threadName()}$msg")
根据我对集合点通道的(可怜的)理解,我认为如果没有人在读取通道,对 output.send 的调用将被阻塞,而生产者似乎连续写入通道,即使有没有待处理的接收方法。
谢谢
每次要发送新值时都创建一个新协程。
operator fun invoke(line: String) {
coroutineScope.async(Dispatchers.Default) {
logMsg("Sending $line")
output.send(line)
}
}
这不会暂停,因为异步只是构建启动并忘记(但会延迟等待)。
显然 output.send(line)
行挂起,并释放将 Continuation(异步块内的协程)添加到暂停状态且不发送值的线程。但是 logMsg()
仍然是通过 for 循环调用的,因为 for 循环从未暂停。
为了解决这个问题,您的 invoke()
函数需要以某种方式挂起,直到 send() 调用恢复,这样 for 循环才会挂起。您不能启动新协程,因为它们无论如何都不会挂起。
// make producer suspend
suspend fun producer(output: ChannelDestination) {
for (i in 1..Int.MAX_VALUE) {
logMsg("Producing $i")
output("Iteration $i")
}
}
// make it suspend as well
suspend operator fun invoke(line: String) {
logMsg("Sending $line")
output.send(line)
}
您已经在 Dispatchers.Default
中启动了 producer()
功能,因此所有内容都会 运行 并在某人收到它之前暂停。
@Animesh 的回答非常好。我是这样阐述的:
object RedezvousExample01 {
@JvmStatic
fun main(args: Array<String>) {
setOptionToShowCoroutineNames()
runBlocking {
val ioChannel = Channel<String>()
val outputFunction: StringDestination = ChannelDestination(ioChannel)
launch { producer(outputFunction) }
val producedValue = ioChannel.receive()
ioChannel.close()
logMsg("Received $producedValue")
}
}
}
fun producer(output: StringDestination) {
try {
for (i in 1..Int.MAX_VALUE) {
logMsg("Producing $i")
output("Iteration $i")
}
} catch (e: ClosedSendChannelException) {}
}
class ChannelDestination(val output: SendChannel<String>) : StringDestination {
override fun invoke(line: String) {
runBlocking {
logMsg("Sending $line")
output.send(line)
}
}
}
无论如何我不得不关闭通道以结束协程,我猜为什么 job.cancel() 方法不起作用。