运行 在 OpenShift 中部署的容器中的多个协程

Run several coroutines in a container deployed in OpenShift

我将以下 Kotlin 代码部署为 OpenShift 中的容器:

fun main(args: Array<String>): Unit = io.ktor.server.netty.EngineMain.main(args)

@kotlin.jvm.JvmOverloads
fun Application.module() {

    launch { consumeProductionGeneratingUnits1hTopic() }
    launch { consumeProductionLargeGeneratingUnits1hTopic() }
    launch { consumeProductionAggregateProdType1hTopic() }

}

每个协程都只是在无限循环中从 kafka 主题中消费:

fun runCoroutine() {
    val consumer = buildConsumer("topic")
    while (true){
        val record = consumer.poll(Duration.ofSeconds(30))
        println(record.toString())
    }
}

当我在本地运行这段代码时,三个协程都启动了。但是,当我在 OpenShift 中部署和 运行 代码作为容器时,只有前两个协程启动。看起来 OpenShift 最多支持两个协程。

有没有人经历过类似的事情?我尝试为 运行ning pod 预留更多 cpu,但这并不影响协程的行为方式。

首先要注意的是,您没有使用挂起函数,因此这些协程将通过 while(true) 循环永远阻塞它们 运行 开启的线程。协程被设计成协作的,所以你需要挂起点来允许线程切换。

对于当前的实现,如果您在只有 2 个线程的线程池上调度,前 2 个协程将阻塞它们,而第三个协程永远不会 运行。一些协程调度程序使用多个线程,这些线程取决于可用内核的数量,这可以解释本地机器(可能超过 2 个内核)和容器(可能有 2 个内核)之间的行为差​​异。

我无法判断您是否在具有 2 个以上线程的线程池上分派这些协程,因为您没有显示 launch 它们所在的协程范围(您的代码原样应该'编译,除非您使用的是非常旧版本的协程,其顶层 launch 没有 CoroutineScope 接收器?)。

解决方案

当然,您可以为您的 pod 分配更多核心,但这只是推动问题。

另一种选择是使用具有更多线程的线程池,但这也只是推动问题。

IMO 的正确解决方法是实际使用转换为 suspend 函数的异步 API。但更简单(快速)的修复方法是保持代码不变,但只需在循环中添加对 yield() 的调用,以确保不时为其他协程释放线程:

suspend fun runCoroutine() {
    val consumer = buildConsumer("topic")
    while (true){
        val record = consumer.poll(Duration.ofSeconds(30))
        println(record.toString())
        yield() // ensures we suspend to free the thread
    }
}