运行 在 OpenShift 中部署的容器中的多个协程
Run several coroutines in a container deployed in OpenShift
我将以下 Kotlin 代码部署为 OpenShift 中的容器:
fun main(args: Array<String>): Unit = io.ktor.server.netty.EngineMain.main(args)
@kotlin.jvm.JvmOverloads
fun Application.module() {
launch { consumeProductionGeneratingUnits1hTopic() }
launch { consumeProductionLargeGeneratingUnits1hTopic() }
launch { consumeProductionAggregateProdType1hTopic() }
}
每个协程都只是在无限循环中从 kafka 主题中消费:
fun runCoroutine() {
val consumer = buildConsumer("topic")
while (true){
val record = consumer.poll(Duration.ofSeconds(30))
println(record.toString())
}
}
当我在本地运行这段代码时,三个协程都启动了。但是,当我在 OpenShift 中部署和 运行 代码作为容器时,只有前两个协程启动。看起来 OpenShift 最多支持两个协程。
有没有人经历过类似的事情?我尝试为 运行ning pod 预留更多 cpu,但这并不影响协程的行为方式。
首先要注意的是,您没有使用挂起函数,因此这些协程将通过 while(true)
循环永远阻塞它们 运行 开启的线程。协程被设计成协作的,所以你需要挂起点来允许线程切换。
对于当前的实现,如果您在只有 2 个线程的线程池上调度,前 2 个协程将阻塞它们,而第三个协程永远不会 运行。一些协程调度程序使用多个线程,这些线程取决于可用内核的数量,这可以解释本地机器(可能超过 2 个内核)和容器(可能有 2 个内核)之间的行为差异。
我无法判断您是否在具有 2 个以上线程的线程池上分派这些协程,因为您没有显示 launch
它们所在的协程范围(您的代码原样应该'编译,除非您使用的是非常旧版本的协程,其顶层 launch
没有 CoroutineScope
接收器?)。
解决方案
当然,您可以为您的 pod 分配更多核心,但这只是推动问题。
另一种选择是使用具有更多线程的线程池,但这也只是推动问题。
IMO 的正确解决方法是实际使用转换为 suspend
函数的异步 API。但更简单(快速)的修复方法是保持代码不变,但只需在循环中添加对 yield() 的调用,以确保不时为其他协程释放线程:
suspend fun runCoroutine() {
val consumer = buildConsumer("topic")
while (true){
val record = consumer.poll(Duration.ofSeconds(30))
println(record.toString())
yield() // ensures we suspend to free the thread
}
}
我将以下 Kotlin 代码部署为 OpenShift 中的容器:
fun main(args: Array<String>): Unit = io.ktor.server.netty.EngineMain.main(args)
@kotlin.jvm.JvmOverloads
fun Application.module() {
launch { consumeProductionGeneratingUnits1hTopic() }
launch { consumeProductionLargeGeneratingUnits1hTopic() }
launch { consumeProductionAggregateProdType1hTopic() }
}
每个协程都只是在无限循环中从 kafka 主题中消费:
fun runCoroutine() {
val consumer = buildConsumer("topic")
while (true){
val record = consumer.poll(Duration.ofSeconds(30))
println(record.toString())
}
}
当我在本地运行这段代码时,三个协程都启动了。但是,当我在 OpenShift 中部署和 运行 代码作为容器时,只有前两个协程启动。看起来 OpenShift 最多支持两个协程。
有没有人经历过类似的事情?我尝试为 运行ning pod 预留更多 cpu,但这并不影响协程的行为方式。
首先要注意的是,您没有使用挂起函数,因此这些协程将通过 while(true)
循环永远阻塞它们 运行 开启的线程。协程被设计成协作的,所以你需要挂起点来允许线程切换。
对于当前的实现,如果您在只有 2 个线程的线程池上调度,前 2 个协程将阻塞它们,而第三个协程永远不会 运行。一些协程调度程序使用多个线程,这些线程取决于可用内核的数量,这可以解释本地机器(可能超过 2 个内核)和容器(可能有 2 个内核)之间的行为差异。
我无法判断您是否在具有 2 个以上线程的线程池上分派这些协程,因为您没有显示 launch
它们所在的协程范围(您的代码原样应该'编译,除非您使用的是非常旧版本的协程,其顶层 launch
没有 CoroutineScope
接收器?)。
解决方案
当然,您可以为您的 pod 分配更多核心,但这只是推动问题。
另一种选择是使用具有更多线程的线程池,但这也只是推动问题。
IMO 的正确解决方法是实际使用转换为 suspend
函数的异步 API。但更简单(快速)的修复方法是保持代码不变,但只需在循环中添加对 yield() 的调用,以确保不时为其他协程释放线程:
suspend fun runCoroutine() {
val consumer = buildConsumer("topic")
while (true){
val record = consumer.poll(Duration.ofSeconds(30))
println(record.toString())
yield() // ensures we suspend to free the thread
}
}