如何确保清除 ktor websocket 客户端创建的所有 Kotlin 协程?
How can I make sure all Kotlin coroutines created by a ktor websocket client are cleared up?
我正在研究 Kotlin 协程以及 Ktor 的 WebSocket 支持。我的理解是,runBlocking 会创建一个作用域,
只要该作用域(或其子作用域)内还有协程在运行,它就会一直阻塞;
但在下面的测试中,runBlocking 返回时仍有两个协程存活。
为什么我在这里泄漏协程?
package dummy
import io.ktor.client.HttpClient
import io.ktor.client.features.websocket.WebSockets
import io.ktor.client.features.websocket.wss
import io.ktor.http.HttpMethod
import io.ktor.http.cio.websocket.Frame
import io.ktor.http.cio.websocket.readBytes
import io.ktor.http.cio.websocket.readText
import io.ktor.util.KtorExperimentalAPI
import kotlinx.coroutines.*
import kotlinx.coroutines.debug.DebugProbes
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
/**
 * Checks that no coroutine started on behalf of the Ktor websocket client is
 * still reported by [DebugProbes] once the test body has finished.
 */
@ExperimentalCoroutinesApi
@KtorExperimentalAPI
class WebsocketTest {
    /**
     * Starts the websocket client, cancels it after 3.5 seconds, waits for the
     * cancellation to finish and then asserts that [DebugProbes] sees no
     * remaining coroutines.
     */
    @Test
    fun tidy() {
        DebugProbes.install()
        try {
            runBlocking {
                val socketJob = Job()
                // The explicit Job in the context replaces the parent inherited from
                // runBlocking: this coroutine is a child of socketJob, so runBlocking
                // does not wait for it on its own.
                launch(CoroutineName("Websocket") + socketJob) {
                    println("Connecting to websocket")
                    connectWebsocket(socketJob)
                    println("Websocket dead?")
                }
                launch(CoroutineName("Ninja socket killer")) {
                    delay(3500)
                    println("Killing websocket client")
                    socketJob.cancel(message = "Time to die..")
                    // cancel() only requests cancellation. Joining here keeps this
                    // child of runBlocking alive until socketJob and the websocket
                    // coroutine have actually completed, so runBlocking cannot return
                    // while they are still winding down.
                    socketJob.join()
                }
            }
            println("\n\n-------")
            DebugProbes.dumpCoroutines(System.err)
            Assertions.assertEquals(
                0,
                DebugProbes.dumpCoroutinesInfo().size,
                "It would be nice if all coroutines had been cleared up by now.."
            )
        } finally {
            // Always remove the probes so they do not stay installed for later tests.
            DebugProbes.uninstall()
        }
    }
}
/**
 * Opens a secure websocket session to echo.websocket.org, sends "Hello World",
 * then prints every received text or binary frame and sends "Hello World" again
 * one second after each frame, until the incoming channel ends or the caller is
 * cancelled.
 *
 * The HTTP client created here is closed exactly once: either when [socketJob]
 * completes or when this function exits (normally, with an exception, or by
 * cancellation), whichever happens first.
 *
 * @param socketJob job whose completion also shuts the HTTP client down.
 */
@KtorExperimentalAPI
private suspend fun connectWebsocket(socketJob: CompletableJob) {
    val client = HttpClient {
        install(WebSockets)
    }
    // Guards the shutdown so the completion handler and the finally block below
    // cannot both close the client.
    val clientClosed = java.util.concurrent.atomic.AtomicBoolean(false)

    fun shutDownClient() {
        if (clientClosed.compareAndSet(false, true)) {
            println("Shutting down ktor http client")
            client.close()
        }
    }

    val completionHandle = socketJob.invokeOnCompletion { shutDownClient() }
    try {
        client.wss(
            method = HttpMethod.Get,
            host = "echo.websocket.org",
            port = 443,
            path = "/"
        ) {
            send(Frame.Text("Hello World"))
            for (frame in incoming) {
                when (frame) {
                    is Frame.Text -> println(frame.readText())
                    // Print the bytes themselves; println on a ByteArray would only
                    // show its identity string.
                    is Frame.Binary -> println(frame.readBytes().contentToString())
                    else -> Unit // other frame types are intentionally ignored
                }
                delay(1000)
                send(Frame.Text("Hello World"))
            }
        }
    } finally {
        // Without this the client would stay open whenever the session ends while
        // socketJob is still active (the job may never complete).
        completionHandle.dispose()
        shutDownClient()
    }
}
build.gradle.kts
import org.gradle.api.tasks.testing.logging.TestExceptionFormat
import org.gradle.api.tasks.testing.logging.TestLogEvent
// Kotlin/JVM build; plugins declared here are applied to this project by default.
plugins {
    kotlin("jvm") version "1.3.41"
}

repositories {
    mavenCentral()
}

// Versions shared by the dependency coordinates below.
val ktorVersion = "1.2.3"
val junitVersion = "5.5.1"

dependencies {
    implementation(kotlin("stdlib-jdk8"))

    // Ktor websocket client and the OkHttp engine artifact.
    implementation("io.ktor:ktor-client-websockets:$ktorVersion")
    implementation("io.ktor:ktor-client-okhttp:$ktorVersion")

    // Provides DebugProbes for inspecting live coroutines.
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-debug:1.3.0-RC2")

    // JUnit 5 API at compile time, engine only at test runtime.
    testImplementation("org.junit.jupiter:junit-jupiter-api:$junitVersion")
    testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:$junitVersion")
}

// Run every Test task on the JUnit Platform with verbose failure reporting.
tasks.withType<Test> {
    useJUnitPlatform()
    testLogging {
        events = setOf(TestLogEvent.PASSED, TestLogEvent.SKIPPED, TestLogEvent.FAILED)
        exceptionFormat = TestExceptionFormat.FULL
        showExceptions = true
        showStackTraces = true
    }
}
看来我已经弄明白了(当然,是在抓耳挠腮到不得不先发这个帖子之后)。我发帖时泄漏了两个协程,其中一个后来 "solved itself"(自行消失了;我对此并不满意,但无论怎么尝试都无法再重现)。
第二个协程之所以泄漏,是因为 Ktor 的 Nonce.kt 显式地在 GlobalScope 中启动了一个协程:
https://github.com/ktorio/ktor/blob/master/ktor-utils/jvm/src/io/ktor/util/Nonce.kt#L30
// Excerpt quoted from Ktor's Nonce.kt; the coroutine body is elided in the post.
// The job is launched in GlobalScope with NonCancellable in its context and a
// LAZY start, so it is not tied to the scope of whichever code triggers it.
private val nonceGeneratorJob =
GlobalScope.launch(
context = Dispatchers.IO + NonCancellable + NonceGeneratorCoroutineName,
start = CoroutineStart.LAZY
) { ....
我正在研究 Kotlin 协程以及 Ktor 的 WebSocket 支持。我的理解是,runBlocking 会创建一个作用域,
只要该作用域(或其子作用域)内还有协程在运行,它就会一直阻塞;
但在下面的测试中,runBlocking 返回时仍有两个协程存活。
为什么我在这里泄漏协程?
package dummy
import io.ktor.client.HttpClient
import io.ktor.client.features.websocket.WebSockets
import io.ktor.client.features.websocket.wss
import io.ktor.http.HttpMethod
import io.ktor.http.cio.websocket.Frame
import io.ktor.http.cio.websocket.readBytes
import io.ktor.http.cio.websocket.readText
import io.ktor.util.KtorExperimentalAPI
import kotlinx.coroutines.*
import kotlinx.coroutines.debug.DebugProbes
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
/**
 * Checks that no coroutine started on behalf of the Ktor websocket client is
 * still reported by [DebugProbes] once the test body has finished.
 */
@ExperimentalCoroutinesApi
@KtorExperimentalAPI
class WebsocketTest {
    /**
     * Starts the websocket client, cancels it after 3.5 seconds, waits for the
     * cancellation to finish and then asserts that [DebugProbes] sees no
     * remaining coroutines.
     */
    @Test
    fun tidy() {
        DebugProbes.install()
        try {
            runBlocking {
                val socketJob = Job()
                // The explicit Job in the context replaces the parent inherited from
                // runBlocking: this coroutine is a child of socketJob, so runBlocking
                // does not wait for it on its own.
                launch(CoroutineName("Websocket") + socketJob) {
                    println("Connecting to websocket")
                    connectWebsocket(socketJob)
                    println("Websocket dead?")
                }
                launch(CoroutineName("Ninja socket killer")) {
                    delay(3500)
                    println("Killing websocket client")
                    socketJob.cancel(message = "Time to die..")
                    // cancel() only requests cancellation. Joining here keeps this
                    // child of runBlocking alive until socketJob and the websocket
                    // coroutine have actually completed, so runBlocking cannot return
                    // while they are still winding down.
                    socketJob.join()
                }
            }
            println("\n\n-------")
            DebugProbes.dumpCoroutines(System.err)
            Assertions.assertEquals(
                0,
                DebugProbes.dumpCoroutinesInfo().size,
                "It would be nice if all coroutines had been cleared up by now.."
            )
        } finally {
            // Always remove the probes so they do not stay installed for later tests.
            DebugProbes.uninstall()
        }
    }
}
/**
 * Opens a secure websocket session to echo.websocket.org, sends "Hello World",
 * then prints every received text or binary frame and sends "Hello World" again
 * one second after each frame, until the incoming channel ends or the caller is
 * cancelled.
 *
 * The HTTP client created here is closed exactly once: either when [socketJob]
 * completes or when this function exits (normally, with an exception, or by
 * cancellation), whichever happens first.
 *
 * @param socketJob job whose completion also shuts the HTTP client down.
 */
@KtorExperimentalAPI
private suspend fun connectWebsocket(socketJob: CompletableJob) {
    val client = HttpClient {
        install(WebSockets)
    }
    // Guards the shutdown so the completion handler and the finally block below
    // cannot both close the client.
    val clientClosed = java.util.concurrent.atomic.AtomicBoolean(false)

    fun shutDownClient() {
        if (clientClosed.compareAndSet(false, true)) {
            println("Shutting down ktor http client")
            client.close()
        }
    }

    val completionHandle = socketJob.invokeOnCompletion { shutDownClient() }
    try {
        client.wss(
            method = HttpMethod.Get,
            host = "echo.websocket.org",
            port = 443,
            path = "/"
        ) {
            send(Frame.Text("Hello World"))
            for (frame in incoming) {
                when (frame) {
                    is Frame.Text -> println(frame.readText())
                    // Print the bytes themselves; println on a ByteArray would only
                    // show its identity string.
                    is Frame.Binary -> println(frame.readBytes().contentToString())
                    else -> Unit // other frame types are intentionally ignored
                }
                delay(1000)
                send(Frame.Text("Hello World"))
            }
        }
    } finally {
        // Without this the client would stay open whenever the session ends while
        // socketJob is still active (the job may never complete).
        completionHandle.dispose()
        shutDownClient()
    }
}
build.gradle.kts
import org.gradle.api.tasks.testing.logging.TestExceptionFormat
import org.gradle.api.tasks.testing.logging.TestLogEvent
// Kotlin/JVM build; plugins declared here are applied to this project by default.
plugins {
    kotlin("jvm") version "1.3.41"
}

repositories {
    mavenCentral()
}

// Versions shared by the dependency coordinates below.
val ktorVersion = "1.2.3"
val junitVersion = "5.5.1"

dependencies {
    implementation(kotlin("stdlib-jdk8"))

    // Ktor websocket client and the OkHttp engine artifact.
    implementation("io.ktor:ktor-client-websockets:$ktorVersion")
    implementation("io.ktor:ktor-client-okhttp:$ktorVersion")

    // Provides DebugProbes for inspecting live coroutines.
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-debug:1.3.0-RC2")

    // JUnit 5 API at compile time, engine only at test runtime.
    testImplementation("org.junit.jupiter:junit-jupiter-api:$junitVersion")
    testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:$junitVersion")
}

// Run every Test task on the JUnit Platform with verbose failure reporting.
tasks.withType<Test> {
    useJUnitPlatform()
    testLogging {
        events = setOf(TestLogEvent.PASSED, TestLogEvent.SKIPPED, TestLogEvent.FAILED)
        exceptionFormat = TestExceptionFormat.FULL
        showExceptions = true
        showStackTraces = true
    }
}
看来我已经弄明白了(当然,是在抓耳挠腮到不得不先发这个帖子之后)。我发帖时泄漏了两个协程,其中一个后来 "solved itself"(自行消失了;我对此并不满意,但无论怎么尝试都无法再重现)。
第二个协程之所以泄漏,是因为 Ktor 的 Nonce.kt 显式地在 GlobalScope 中启动了一个协程:
https://github.com/ktorio/ktor/blob/master/ktor-utils/jvm/src/io/ktor/util/Nonce.kt#L30
// Excerpt quoted from Ktor's Nonce.kt; the coroutine body is elided in the post.
// The job is launched in GlobalScope with NonCancellable in its context and a
// LAZY start, so it is not tied to the scope of whichever code triggers it.
private val nonceGeneratorJob =
GlobalScope.launch(
context = Dispatchers.IO + NonCancellable + NonceGeneratorCoroutineName,
start = CoroutineStart.LAZY
) { ....