尝试调用 运行 阻塞时,Kotlin Vertx with Coroutines 阻塞

Kotlin Vertx with Coroutines blocks when trying to call run blocking

我正在使用公开回调函数的第三方库。回调函数将在成功时被调用。回调函数不是挂起函数,但是当我尝试在非挂起函数内部调用 return 挂起函数的结果时,该挂起函数使用 aysnc 和 await 进行 IO 调用,调用永远不会被强制执行。下面我想出了一个简单的代码片段来演示这个问题。

open class TestVerticle: CoroutineVerticle() {

  override suspend fun start() {

    awaitBlockingExample()

  }

 fun awaitBlockingExample():String {

    val future= async(vertx.dispatcher()) {

        makeSuspendFunCall()
     }
     val result:String= runBlocking(vertx.dispatcher()){future.await()}
     println(" The final Result is $result")
     return result
   }

  suspend fun makeSuspendFunCall():String{
    println("Comming here 3")
    delay(500)
    val resp="Test"
    return resp
  }

}
fun main(args: Array<String>) = runBlocking {
    Vertx.vertx().deployVerticle("TestVerticle")
}

如果我在makeSuspendFunCall 中去掉延迟函数,程序运行s 可以正常运行,但是如果我添加延迟函数,它就会挂起。我实际上是在这里使用延迟函数模拟挂起函数网络调用。在这种情况下如何从 awaitBlockingExample 获得结果?我清楚地知道,通过将 awaitBlockingExample 设置为挂起函数,我可以完成这项工作并删除内部的异步和 运行 阻塞调用。但是这里的 awaitBlockingExample(非挂起函数)表示由本方库提供的实现,它在我们的实现中被覆盖了。例如guava cache提供了reload功能,我想覆盖reload功能(非suspend功能),在reload方法中调用协程函数,从数据库或网络调用刷新缓存值。

问题在于 vertx.dispatcher() 使用单个线程作为事件循环并且 runBlocking 阻塞了该线程。

详情:

您的 awaitBlockingExample() 函数在此 Vertx 事件循环线程上是 运行,因为它是由 suspend start() 函数触发的。如果你调用 runBlocking() 这个 Vertx 线程被阻塞并且永远不会被释放。但是你的其他协程,例如async(),现在没有线程可以做他们的工作。

解决方案:

我假设从 start 函数调用 awaitBlockingExample 只发生在这个例子中。实际上我会假设外部回调使用它自己的线程。那么就完全没有问题了,因为现在外线程被阻塞了:

override suspend fun start() {

    //simulate own thread for external callback
    thread {
        awaitBlockingExample()
    }
}

fun awaitBlockingExample():String {

    val future= async(vertx.dispatcher()) {

        makeSuspendFunCall()
    }
    val result:String= runBlocking(vertx.dispatcher()){future.await()}
    println(" The final Result is $result")
    return result
}

顺便说一句:你不需要 async() 块,你可以直接从 runBlocking()

调用 makeSuspendFunCall()
fun awaitBlockingExample():String = runBlocking(vertx.dispatcher()){
    val result = makeSuspendFunCall()
    println(" The final Result is $result")
    result
}

尝试下一种方法:

override fun start() {
    GlobalScope.launch {
        val result = awaitBlockingExample()
    }
}

suspend fun awaitBlockingExample(): String {
    val response =  makeSuspendFunCall()
    println(" The final Result is $response")
    return response
}

suspend fun makeSuspendFunCall():String{
    println("Comming here 3")
    return suspendCoroutine {
        delay(500)
        val resp="Test"
        it.resume(resp)
    }
}

对于 Kotlin 1.3.0 及更高版本

private val mainScope = CoroutineScope(Dispatchers.Main)

fun start(){
        mainScope.launch { 
            val data = withContext(Dispatchers.IO){
                //This function will return the result. Return type of the below function will be type of data variable above.
                awaitBlockingExample()
            }
            //use your data result from async call. Result will be available here as soon as awaitBlockingExample() return it.
        }
        //Your function will continue execution without waiting for async call to finish.
    }

 fun awaitBlockingExample():String {
    //Your Logic
   }

希望这会有所帮助。