Kotlin 协程未来等待超时(无取消)

Kotlin coroutine future await with timeout (no cancellation)

鉴于我们有一个 CompletableFuture f,在 kotlin 可挂起范围内我们可以调用 f.await() 我们将挂起直到它完成。

我在实现带有签名 f.await(t) 的类似功能时遇到问题,该功能必须暂停最多 t 毫秒,或者如果 future 在该持续时间内完成(以先发生者为准)则更快 return ).

这是我试过的。

/**
 * Suspend current method until future is done or specified duration expires,
 * whichever happens first without cancelling the future.
 * Returns true if its done, false otherwise.
 */
suspend fun <T> ListenableFuture<T>.await(duration: Long): Boolean {
   val future = this
   try {
      withTimeout(duration) {
         withContext(NonCancellable) { // this does not help either
            future.await() // i do not expect the future itself to be cancelled
         }
      }
   } catch (t: TimeoutCancellationException) {
      // we expected this
   } catch (e: Throwable) {
      e.printStackTrace()
   }

   return future.isDone

}

fun main(args: Array<String>) = runBlocking<Unit> {
   val future = GlobalScope.future {
      try {
         repeat(5) {
            println("computing")
            delay(500)
         }
         println("complete")
      } finally {
         withContext(NonCancellable) {
            println("cancelling")
            delay(500)
            println("cancelled")
         }
      }
   }

   for (i in 0..10) {
      if (future.await(2000)) {
         println("checking : done")
      } else {
         println("checking : not done")
      }
   }
}

我的工作也需要类似的功能。但也许这个解决方案也会帮助我...

此输出是

computing
computing
computing
computing
checking : done
checking : done
checking : done
checking : done
cancelling
checking : done
checking : done
checking : done
checking : done
checking : done
checking : done
checking : done

我写了一些测试代码:

fun main(args: Array<String>) = runBlocking {
    val future = calculateAsync()
    val result = future.await(2000)
    println("result=$result")
}

suspend fun <T> CompletableFuture<T>.await(duration: Long): T? {
    val future = this
    var result: T? = null
    try {
        withTimeout(duration) {
            result = future.await()
        }
    } catch (t: TimeoutCancellationException) {
        println("timeout exception")
    } catch (e: Throwable) {
        e.printStackTrace()
    }

    return result
}

@Throws(InterruptedException::class)
fun calculateAsync(): CompletableFuture<String> {
    val completableFuture = CompletableFuture<String>()

    Executors.newCachedThreadPool().submit {
        Thread.sleep(3000)
        println("after sleep")
        completableFuture.complete("Completed")
    }

    return completableFuture
}

在我们 运行 这段代码之后,我们将得到一个输出:

timeout exception
result=null
after sleep

我们看到我们的扩展函数 await returns null 因为我们将超时设置为 2000 毫秒,但 CompletableFuture 在 3000 毫秒后完成。在这种情况下 CompletableFuture 被取消了(它的 isCancelled 属性 returns true),但是我们 运行 在 calculateAsync 中的线程函数继续执行(我们在日志中看到它 after sleep)。

如果我们在 main 函数中将超时持续时间设置为 4000 毫秒 future.await(4000),我们将看到下一个输出:

after sleep
result=Completed

现在我们有了一些结果,因为 CompletableFuture 的执行速度快于 4000 毫秒。

这是我想到的,我认为这不是一个好的解决方案,因为我很可能会为相当原始的任务创建大量垃圾。


suspend fun <T> CompletableFuture<T>.await(duration: Millis): Boolean {
   val timeout = CompletableFuture<Unit>()

   GlobalScope.launch {
      delay(duration)
      timeout.complete(Unit)
   }

   val anyOfTwo = CompletableFuture.anyOf(this, timeout)
   anyOfTwo.await()
   return this.isDone
}


fun main() = runBlocking {
   val future = CompletableFuture<String>()

   GlobalScope.launch {
      delay(2000)
      println("setting the result (future now ${future.isDone})")
      future.complete("something")
   }

   while (future.isNotDone()) {
      println("waiting for the future to complete for the next 500ms")
      val isDone = future.await(500)

      if (isDone) {
         println("future is done")
         break
      } else {

         println("future not done")
      }
   }

   Unit
}

这将给出

的输出
waiting for the future to complete for the next 500ms
future not done
waiting for the future to complete for the next 500ms
future not done
waiting for the future to complete for the next 500ms
future not done
waiting for the future to complete for the next 500ms
setting the result (future now false)
future is done

这就是我们想要的...