通道流发送挂起行为
Channel flow send suspend behavior
使用以下代码示例:
val scope = CoroutineScope(Dispatchers.IO)
val flow = channelFlow {
println("1")
send("1")
println("2")
send("2")
}.buffer(0)
scope.launch {
flow.collect {
println("collect $it")
delay(5000)
}
}
输出如下:
1
2 // should be printed after collect 1
collect 1
collect 2 // after 5000ms
预计:
打印 1,然后打印收集 1,等待 5 秒,然后打印 2
发送功能似乎没有挂起,缓冲区设置为 0 或 RENDEZVOUS,使用标准流程并按预期发出挂起工作,是否有另一个操作员,或者通道流是否可以挂起(有一个容量为 0/1 的缓冲区)?
正如 Flow.buffer() 的文档所述,此函数创建了两个协程:一个生产者和一个消费者。这些协程同时工作。这意味着在启动 collect()
块来处理一个项目时,另一侧的 send()
已经恢复。我相信 2
和 collect 1
之间存在竞争条件,但实际上顺序可能是确定性的。
“正常”流程与 emit()
的工作方式不同。它按顺序工作,因此生产者和消费者不会同时运行。 emit()
暂停,直到消费者完成对前一项的处理并请求另一个。
使用以下代码示例:
val scope = CoroutineScope(Dispatchers.IO)
val flow = channelFlow {
println("1")
send("1")
println("2")
send("2")
}.buffer(0)
scope.launch {
flow.collect {
println("collect $it")
delay(5000)
}
}
输出如下:
1
2 // should be printed after collect 1
collect 1
collect 2 // after 5000ms
预计: 打印 1,然后打印收集 1,等待 5 秒,然后打印 2
发送功能似乎没有挂起,缓冲区设置为 0 或 RENDEZVOUS,使用标准流程并按预期发出挂起工作,是否有另一个操作员,或者通道流是否可以挂起(有一个容量为 0/1 的缓冲区)?
正如 Flow.buffer() 的文档所述,此函数创建了两个协程:一个生产者和一个消费者。这些协程同时工作。这意味着在启动 collect()
块来处理一个项目时,另一侧的 send()
已经恢复。我相信 2
和 collect 1
之间存在竞争条件,但实际上顺序可能是确定性的。
“正常”流程与 emit()
的工作方式不同。它按顺序工作,因此生产者和消费者不会同时运行。 emit()
暂停,直到消费者完成对前一项的处理并请求另一个。