用更好的东西替换 GlobalScope.launch

Replacing GlobalScope.launch with something better

我正在重构以下代码,将 CompletableFuture API 包装成可以与协程一起使用的东西,但它使用的是 GlobalScope.launch { ... },这是不鼓励的:

suspend fun <T> transaction(f: suspend (Connection) -> T): T {
    val cf = CompletableFuture<T>()
    try {
        this.connectionPool.inTransaction { connection ->
            GlobalScope.launch {
                try {
                    cf.complete(f(connection))
                } catch (e: Throwable) {
                    cf.completeExceptionally(e)
                }
            }
            cf
        }
    } catch (e: Throwable) {
        log.error(e.message ?: "", e)
        cf.completeExceptionally(e)
    }
    return cf.await()
}

摆脱 CompletableFuture 并将其替换为 CompletableDeferred 是比较容易的部分:

suspend fun <T> transaction(f: suspend (Connection) -> T): T {
    val cdf = CompletableDeferred<T>()
    try {
        connectionPool.inTransaction { connection ->
            GlobalScope.launch {
                try {
                    cdf.complete(f(connection))
                } catch (e: Throwable) {
                    cdf.completeExceptionally(e)
                }
            }
            cdf.asCompletableFuture()
        }
    } catch (e: Throwable) {
        log.error(e.message ?: "", e)
        cdf.completeExceptionally(e)
    }
    return cdf.await()
}

inTransaction API 需要 CompletableFuture,我假设这是为了向后兼容 Java

override fun <A> inTransaction(f: (Connection) -> CompletableFuture<A>):
            CompletableFuture<A> =
        objectPool.use(configuration.executionContext) { it.inTransaction(f) }

因为我在 CoroutineScope 之外,所以我不能只调用 launch { ... } 并且因为 f 是一个挂起函数,所以该部分需要在 CoroutineScope

connectionPool.inTransaction 包装在 coroutineScope 中以替换 GlobalScope 在运行时锁定 ...

    try {
        coroutineScope {
            connectionPool.inTransaction { connection ->
                launch {
                    try {
                        cdf.complete(f(connection))
                    } catch (e: Throwable) {
                        cdf.completeExceptionally(e)
                    }
                }
                cdf.asCompletableFuture()
            }
        }
    } catch (e: Throwable) {

类似
    try {
        coroutineScope {
            connectionPool.inTransaction { connection ->
                async {
                    try {
                        cdf.complete(f(connection))
                    } catch (e: Throwable) {
                        cdf.completeExceptionally(e)
                    }
                }
                cdf.asCompletableFuture()
            }
        }
    } catch (e: Throwable) {

添加一些好的 ol'println 调试:

suspend fun <T> transaction(f: suspend (Connection) -> T): T {
        val cdf = CompletableDeferred<T>()
        println("1")
        try {
            println("2")
            coroutineScope {
                println("3")
                connectionPool.inTransaction { connection ->
                    println("4")
                    launch {
                        println("5")
                        try {
                            println("6")
                            cdf.complete(f(connection))
                            println("7")
                        } catch (e: Throwable) {
                            println("8")
                            cdf.completeExceptionally(e)
                            println("9")
                        }
                    }
                    println("10")
                    cdf.asCompletableFuture()
                }
            }
        } catch (e: Throwable) {
            log.error(e.message ?: "", e)
            cdf.completeExceptionally(e)
        }
        println("11")
        return cdf.await()
    }

输出:

1
2
3
11
4
10

后跟查询超时的堆栈跟踪

timeout query item <postgres-connection-2> after 73751 ms and was not cleaned by connection as it should, will destroy it - timeout is 30000

这意味着 launch 中的代码永远不会执行,与 async 类似,我假设某些线程在某处被阻塞。

CoroutineScope(Dispatchers.IO).launch { ... } 替换 GlobalScope.launch 可行(与 Dispatchers.Unconfined) 类似),但这是正确的解决方案吗? 如果 CompletableFuture 是线程阻塞的,那么这可能比 GlobalScope.launch 解决方案更好...

suspend fun <T> transaction(f: suspend (Connection) -> T): T {
        val cdf = CompletableDeferred<T>()
        try {
            connectionPool.inTransaction { connection ->
                CoroutineScope(Dispatchers.IO).launch {
                    try {
                        cdf.complete(f(connection))
                    } catch (e: Throwable) {
                        cdf.completeExceptionally(e)
                    }
                }
                cdf.asCompletableFuture()
            }
        } catch (e: Throwable) {
            log.error(e.message ?: "", e)
            cdf.completeExceptionally(e)
        }
        return cdf.await()
    }

有什么关于摆脱它的正确方法的建议吗GlobalScope.launch

这很复杂,但我相信 launch()/async() 没有执行的原因是他们的父 coroutineScope() 在这个时间点已经完成了。请注意“11”出现在“4”之前,这意味着您在退出 coroutineScope() 之后调用了 launch()。这是有道理的,因为 inTransaction() 启动异步操作,所以它立即 returns,无需等待内部代码。要解决此问题,您只需将 cdf.await() 移动到 coroutineScope().

另一件让我担心的事情是,您等待您自己创建的可完成项,而不是 return 从 inTransaction() 编辑的。请注意,它可能是一个完全不同的 CompletableFuture,在这种情况下,您实际上 return 在操作完成之前。

此外,我不确定是否真的有必要对可完成的手动异常处理。 async() 已经执行异常处理并将结果包装为 CompleteableDeferred,然后将其转换为 CompletableFuture 也包装异常。我们唯一要做的就是将 coroutineScope() 替换为 supervisorScope()。否则,async() 会自动通知 coroutineScope() 失败,因此异常处理将完全绕过 inTransaction() 函数。

试试这个代码:

suspend fun <T> transaction(f: suspend (Connection) -> T): T {
    return supervisorScope {
        connectionPool.inTransaction { connection ->
            async { f(connection) }.asCompletableFuture()
        }.await()
    }
}