如何改进这段代码并将其重写为 kotlin 协程

How this piece of code can be improved and rewritten to kotlin coroutines

我正在尝试实现功能:我有一个调用代码的休息端点,执行可能会花费很多时间。我现在改善体验的想法是将那段代码包装为一个新线程,等待完成或等待某个最大时间过去,然后 return 一条适当的消息。即使端点已经发回消息,包装代码也应该完成。当前的实现如下所示:

private const val N = 1000
private const val MAX_WAIT_TIME = 5000

@RestController
@RequestMapping("/long")
class SomeController(
    val service: SomeService,
) {
    private val executor = Executors.newFixedThreadPool(N)

    @PostMapping
    fun longEndpoint(@RequestParam("someParam") someParam: Long): ResponseEntity<String> {
        val submit = executor.submit {
            service.longExecution(someParam)
        }
        val start = System.currentTimeMillis()

        while (System.currentTimeMillis() - start < MAX_WAIT_TIME) {
            if (submit.isDone)
                return ResponseEntity.ok("Done")
        }
        return ResponseEntity.ok("Check later")
    }
}

第一个问题 - waiting while for time 似乎不对,我们不释放线程,是否可以改进?

更重要的问题 - 如何将其重写为 Kotlin 协程? 我的尝试很简单,任务完成后没有 returning,看起来像这样:

    @PostMapping
    fun longEndpoint(@RequestParam("someParam") someParam: Long): ResponseEntity<String> = runBlocking {

        val result = async {
            withContext(Dispatchers.Default) {
                service.longExecution(someParam)
            }
        }
        delay(MAX_WAIT_TIME)
        return@runBlocking ResponseEntity.ok(if(result.isCompleted) "Done" else "Check later")
    }

但即使 return 编辑了正确的字符串,答案也不会发送,直到 longExecution 完成。如何解决这个问题,我错过了什么?也许协程在这里应用不当?

您的实施总是等待 MAX_WAIT_TIME。这可能有效:

@PostMapping
fun longEndpoint(@RequestParam("someParam") someParam: Long): ResponseEntity<String> = runBlocking {

    try {
        withTimeout(MAX_WAIT_TIME) {
            async {
                withContext(Dispatchers.Default) {
                    service.longExecution(someParam)
                }
            }
        }
    } catch (ex: CancellationException) {
        return@runBlocking ResponseEntity.ok("Check later")
    }
    return@runBlocking ResponseEntity.ok("Done")
}

虽然我不确定是否会有任何不需要的副作用,因为这似乎会在达到 MAX_WAIT_TIME 时取消协程。在这里阅读更多相关信息:
取消和超时

您当前的协程尝试存在几个问题:

  1. 您正在 runBlocking 的范围内启动您的 async 计算,因此整体端点方法将等待子协程完成,尽管您之前尝试 return-ing那。
  2. delay() 将始终等待 MAX_WAIT_TIME,即使任务比
  3. 完成得更快
  4. (可选)如果您的框架支持异步控制器方法,则根本不必使用 runBlocking(Spring WebFlux 确实支持控制器中的 suspend 函数)

对于第一个问题,请记住每次启动一个应该比您的函数寿命更长的协程时,您都必须使用外部作用域。 coroutineScoperunBlocking 在这些情况下不合适,因为它们会等待您的子协程完成。

您可以使用 CoroutineScope() 工厂函数来创建作用域,但您需要考虑协程的生命周期以及何时取消它。如果 longExecution 函数有一个错误并永远挂起,你不想泄露调用它的协程并破坏你的内存,所以你应该以某种方式取消那些协程。这就是为什么你应该将范围作为变量存储在你的 class 中并在适当的时候取消它(当你想放弃这些操作时)。

对于第二个问题,使用 withTimeout 很常见,但它不适合您的用例,因为您希望任务即使在等待超时后仍继续运行。一种可能的解决方案是使用 select 子句等待作业完成,或者等待某个指定的最长时间:

// TODO call scope.cancel() somewhere appropriate (when this component is not needed anymore)
val scope = CoroutineScope(Job())

@PostMapping
fun longEndpoint(@RequestParam("someParam") someParam: Long): ResponseEntity<String> {
    val job = scope.launch {
        longExecution()
    }
    
    val resultText = runBlocking {
        select {
            job.onJoin() { "Done" }
            onTimeout(MAX_WAIT_TIME) { "Check later" } 
        }
    }
    return ResponseEntity.ok(resultText)
}

注意:我使用 launch 而不是 async,因为您似乎不需要 longExecution 的 return 值。


如果你也想解决问题 #3,你可以简单地声明你的处理程序 suspend 并删除 select:

周围的 runBlocking
// TODO call scope.cancel() somewhere appropriate (when this component is not needed anymore)
val scope = CoroutineScope(Job())

@PostMapping
suspend fun longEndpoint(@RequestParam("someParam") someParam: Long): ResponseEntity<String> {
    val job = scope.launch {
        longExecution()
    }
    
    val resultText = select {
        job.onJoin() { "Done" }
        onTimeout(MAX_WAIT_TIME) { "Check later" }
    }
    return ResponseEntity.ok(resultText)
}

请注意,这需要 spring-boot-starter-webflux 而不是 spring-boot-starter-web