用更好的东西替换 GlobalScope.launch
Replacing GlobalScope.launch with something better
我正在重构以下代码,将 CompletableFuture API 包装成可以与协程一起使用的东西,但它使用的是 GlobalScope.launch { ... }
,这是不鼓励的:
suspend fun <T> transaction(f: suspend (Connection) -> T): T {
val cf = CompletableFuture<T>()
try {
this.connectionPool.inTransaction { connection ->
GlobalScope.launch {
try {
cf.complete(f(connection))
} catch (e: Throwable) {
cf.completeExceptionally(e)
}
}
cf
}
} catch (e: Throwable) {
log.error(e.message ?: "", e)
cf.completeExceptionally(e)
}
return cf.await()
}
摆脱 CompletableFuture 并将其替换为 CompletableDeferred 是比较容易的部分:
suspend fun <T> transaction(f: suspend (Connection) -> T): T {
val cdf = CompletableDeferred<T>()
try {
connectionPool.inTransaction { connection ->
GlobalScope.launch {
try {
cdf.complete(f(connection))
} catch (e: Throwable) {
cdf.completeExceptionally(e)
}
}
cdf.asCompletableFuture()
}
} catch (e: Throwable) {
log.error(e.message ?: "", e)
cdf.completeExceptionally(e)
}
return cdf.await()
}
inTransaction API 需要 CompletableFuture,我假设这是为了向后兼容 Java
override fun <A> inTransaction(f: (Connection) -> CompletableFuture<A>):
CompletableFuture<A> =
objectPool.use(configuration.executionContext) { it.inTransaction(f) }
因为我在 CoroutineScope 之外,所以我不能只调用 launch { ... }
并且因为 f
是一个挂起函数,所以该部分需要在 CoroutineScope
内
将 connectionPool.inTransaction
包装在 coroutineScope
中以替换 GlobalScope 在运行时锁定 ...
try {
coroutineScope {
connectionPool.inTransaction { connection ->
launch {
try {
cdf.complete(f(connection))
} catch (e: Throwable) {
cdf.completeExceptionally(e)
}
}
cdf.asCompletableFuture()
}
}
} catch (e: Throwable) {
与
类似
try {
coroutineScope {
connectionPool.inTransaction { connection ->
async {
try {
cdf.complete(f(connection))
} catch (e: Throwable) {
cdf.completeExceptionally(e)
}
}
cdf.asCompletableFuture()
}
}
} catch (e: Throwable) {
添加一些好的 ol'println 调试:
suspend fun <T> transaction(f: suspend (Connection) -> T): T {
val cdf = CompletableDeferred<T>()
println("1")
try {
println("2")
coroutineScope {
println("3")
connectionPool.inTransaction { connection ->
println("4")
launch {
println("5")
try {
println("6")
cdf.complete(f(connection))
println("7")
} catch (e: Throwable) {
println("8")
cdf.completeExceptionally(e)
println("9")
}
}
println("10")
cdf.asCompletableFuture()
}
}
} catch (e: Throwable) {
log.error(e.message ?: "", e)
cdf.completeExceptionally(e)
}
println("11")
return cdf.await()
}
输出:
1
2
3
11
4
10
后跟查询超时的堆栈跟踪
timeout query item <postgres-connection-2> after 73751 ms and was not cleaned by connection as it should, will destroy it - timeout is 30000
这意味着 launch
中的代码永远不会执行,与 async
类似,我假设某些线程在某处被阻塞。
用 CoroutineScope(Dispatchers.IO).launch { ... }
替换 GlobalScope.launch 可行(与 Dispatchers.Unconfined)
类似),但这是正确的解决方案吗?
如果 CompletableFuture 是线程阻塞的,那么这可能比 GlobalScope.launch 解决方案更好...
suspend fun <T> transaction(f: suspend (Connection) -> T): T {
val cdf = CompletableDeferred<T>()
try {
connectionPool.inTransaction { connection ->
CoroutineScope(Dispatchers.IO).launch {
try {
cdf.complete(f(connection))
} catch (e: Throwable) {
cdf.completeExceptionally(e)
}
}
cdf.asCompletableFuture()
}
} catch (e: Throwable) {
log.error(e.message ?: "", e)
cdf.completeExceptionally(e)
}
return cdf.await()
}
有什么关于摆脱它的正确方法的建议吗GlobalScope.launch
?
这很复杂,但我相信 launch()
/async()
没有执行的原因是他们的父 coroutineScope()
在这个时间点已经完成了。请注意“11”出现在“4”之前,这意味着您在退出 coroutineScope()
之后调用了 launch()
。这是有道理的,因为 inTransaction()
启动异步操作,所以它立即 returns,无需等待内部代码。要解决此问题,您只需将 cdf.await()
移动到 coroutineScope()
.
内
另一件让我担心的事情是,您等待您自己创建的可完成项,而不是 return 从 inTransaction()
编辑的。请注意,它可能是一个完全不同的 CompletableFuture
,在这种情况下,您实际上 return 在操作完成之前。
此外,我不确定是否真的有必要对可完成的手动异常处理。 async()
已经执行异常处理并将结果包装为 CompleteableDeferred
,然后将其转换为 CompletableFuture
也包装异常。我们唯一要做的就是将 coroutineScope()
替换为 supervisorScope()
。否则,async()
会自动通知 coroutineScope()
失败,因此异常处理将完全绕过 inTransaction()
函数。
试试这个代码:
suspend fun <T> transaction(f: suspend (Connection) -> T): T {
return supervisorScope {
connectionPool.inTransaction { connection ->
async { f(connection) }.asCompletableFuture()
}.await()
}
}
我正在重构以下代码,将 CompletableFuture API 包装成可以与协程一起使用的东西,但它使用的是 GlobalScope.launch { ... }
,这是不鼓励的:
suspend fun <T> transaction(f: suspend (Connection) -> T): T {
val cf = CompletableFuture<T>()
try {
this.connectionPool.inTransaction { connection ->
GlobalScope.launch {
try {
cf.complete(f(connection))
} catch (e: Throwable) {
cf.completeExceptionally(e)
}
}
cf
}
} catch (e: Throwable) {
log.error(e.message ?: "", e)
cf.completeExceptionally(e)
}
return cf.await()
}
摆脱 CompletableFuture 并将其替换为 CompletableDeferred 是比较容易的部分:
suspend fun <T> transaction(f: suspend (Connection) -> T): T {
val cdf = CompletableDeferred<T>()
try {
connectionPool.inTransaction { connection ->
GlobalScope.launch {
try {
cdf.complete(f(connection))
} catch (e: Throwable) {
cdf.completeExceptionally(e)
}
}
cdf.asCompletableFuture()
}
} catch (e: Throwable) {
log.error(e.message ?: "", e)
cdf.completeExceptionally(e)
}
return cdf.await()
}
inTransaction API 需要 CompletableFuture,我假设这是为了向后兼容 Java
override fun <A> inTransaction(f: (Connection) -> CompletableFuture<A>):
CompletableFuture<A> =
objectPool.use(configuration.executionContext) { it.inTransaction(f) }
因为我在 CoroutineScope 之外,所以我不能只调用 launch { ... }
并且因为 f
是一个挂起函数,所以该部分需要在 CoroutineScope
将 connectionPool.inTransaction
包装在 coroutineScope
中以替换 GlobalScope 在运行时锁定 ...
try {
coroutineScope {
connectionPool.inTransaction { connection ->
launch {
try {
cdf.complete(f(connection))
} catch (e: Throwable) {
cdf.completeExceptionally(e)
}
}
cdf.asCompletableFuture()
}
}
} catch (e: Throwable) {
与
类似 try {
coroutineScope {
connectionPool.inTransaction { connection ->
async {
try {
cdf.complete(f(connection))
} catch (e: Throwable) {
cdf.completeExceptionally(e)
}
}
cdf.asCompletableFuture()
}
}
} catch (e: Throwable) {
添加一些好的 ol'println 调试:
suspend fun <T> transaction(f: suspend (Connection) -> T): T {
val cdf = CompletableDeferred<T>()
println("1")
try {
println("2")
coroutineScope {
println("3")
connectionPool.inTransaction { connection ->
println("4")
launch {
println("5")
try {
println("6")
cdf.complete(f(connection))
println("7")
} catch (e: Throwable) {
println("8")
cdf.completeExceptionally(e)
println("9")
}
}
println("10")
cdf.asCompletableFuture()
}
}
} catch (e: Throwable) {
log.error(e.message ?: "", e)
cdf.completeExceptionally(e)
}
println("11")
return cdf.await()
}
输出:
1
2
3
11
4
10
后跟查询超时的堆栈跟踪
timeout query item <postgres-connection-2> after 73751 ms and was not cleaned by connection as it should, will destroy it - timeout is 30000
这意味着 launch
中的代码永远不会执行,与 async
类似,我假设某些线程在某处被阻塞。
用 CoroutineScope(Dispatchers.IO).launch { ... }
替换 GlobalScope.launch 可行(与 Dispatchers.Unconfined)
类似),但这是正确的解决方案吗?
如果 CompletableFuture 是线程阻塞的,那么这可能比 GlobalScope.launch 解决方案更好...
suspend fun <T> transaction(f: suspend (Connection) -> T): T {
val cdf = CompletableDeferred<T>()
try {
connectionPool.inTransaction { connection ->
CoroutineScope(Dispatchers.IO).launch {
try {
cdf.complete(f(connection))
} catch (e: Throwable) {
cdf.completeExceptionally(e)
}
}
cdf.asCompletableFuture()
}
} catch (e: Throwable) {
log.error(e.message ?: "", e)
cdf.completeExceptionally(e)
}
return cdf.await()
}
有什么关于摆脱它的正确方法的建议吗GlobalScope.launch
?
这很复杂,但我相信 launch()
/async()
没有执行的原因是他们的父 coroutineScope()
在这个时间点已经完成了。请注意“11”出现在“4”之前,这意味着您在退出 coroutineScope()
之后调用了 launch()
。这是有道理的,因为 inTransaction()
启动异步操作,所以它立即 returns,无需等待内部代码。要解决此问题,您只需将 cdf.await()
移动到 coroutineScope()
.
另一件让我担心的事情是,您等待您自己创建的可完成项,而不是 return 从 inTransaction()
编辑的。请注意,它可能是一个完全不同的 CompletableFuture
,在这种情况下,您实际上 return 在操作完成之前。
此外,我不确定是否真的有必要对可完成的手动异常处理。 async()
已经执行异常处理并将结果包装为 CompleteableDeferred
,然后将其转换为 CompletableFuture
也包装异常。我们唯一要做的就是将 coroutineScope()
替换为 supervisorScope()
。否则,async()
会自动通知 coroutineScope()
失败,因此异常处理将完全绕过 inTransaction()
函数。
试试这个代码:
suspend fun <T> transaction(f: suspend (Connection) -> T): T {
return supervisorScope {
connectionPool.inTransaction { connection ->
async { f(connection) }.asCompletableFuture()
}.await()
}
}