如何改进这段代码并将其重写为 kotlin 协程
How this piece of code can be improved and rewritten to kotlin coroutines
我正在尝试实现功能:我有一个调用代码的休息端点,执行可能会花费很多时间。我现在改善体验的想法是将那段代码包装为一个新线程,等待完成或等待某个最大时间过去,然后 return 一条适当的消息。即使端点已经发回消息,包装代码也应该完成。当前的实现如下所示:
private const val N = 1000
private const val MAX_WAIT_TIME = 5000
@RestController
@RequestMapping("/long")
class SomeController(
val service: SomeService,
) {
private val executor = Executors.newFixedThreadPool(N)
@PostMapping
fun longEndpoint(@RequestParam("someParam") someParam: Long): ResponseEntity<String> {
val submit = executor.submit {
service.longExecution(someParam)
}
val start = System.currentTimeMillis()
while (System.currentTimeMillis() - start < MAX_WAIT_TIME) {
if (submit.isDone)
return ResponseEntity.ok("Done")
}
return ResponseEntity.ok("Check later")
}
}
第一个问题 - waiting while for time 似乎不对,我们不释放线程,是否可以改进?
更重要的问题 - 如何将其重写为 Kotlin 协程?
我的尝试很简单,任务完成后没有 returning,看起来像这样:
@PostMapping
fun longEndpoint(@RequestParam("someParam") someParam: Long): ResponseEntity<String> = runBlocking {
val result = async {
withContext(Dispatchers.Default) {
service.longExecution(someParam)
}
}
delay(MAX_WAIT_TIME)
return@runBlocking ResponseEntity.ok(if(result.isCompleted) "Done" else "Check later")
}
但即使 return 编辑了正确的字符串,答案也不会发送,直到 longExecution
完成。如何解决这个问题,我错过了什么?也许协程在这里应用不当?
您的实施总是等待 MAX_WAIT_TIME。这可能有效:
@PostMapping
fun longEndpoint(@RequestParam("someParam") someParam: Long): ResponseEntity<String> = runBlocking {
try {
withTimeout(MAX_WAIT_TIME) {
async {
withContext(Dispatchers.Default) {
service.longExecution(someParam)
}
}
}
} catch (ex: CancellationException) {
return@runBlocking ResponseEntity.ok("Check later")
}
return@runBlocking ResponseEntity.ok("Done")
}
虽然我不确定是否会有任何不需要的副作用,因为这似乎会在达到 MAX_WAIT_TIME 时取消协程。在这里阅读更多相关信息:
取消和超时
您当前的协程尝试存在几个问题:
- 您正在
runBlocking
的范围内启动您的 async
计算,因此整体端点方法将等待子协程完成,尽管您之前尝试 return
-ing那。
delay()
将始终等待 MAX_WAIT_TIME
,即使任务比 完成得更快
- (可选)如果您的框架支持异步控制器方法,则根本不必使用
runBlocking
(Spring WebFlux 确实支持控制器中的 suspend
函数)
对于第一个问题,请记住每次启动一个应该比您的函数寿命更长的协程时,您都必须使用外部作用域。 coroutineScope
或 runBlocking
在这些情况下不合适,因为它们会等待您的子协程完成。
您可以使用 CoroutineScope()
工厂函数来创建作用域,但您需要考虑协程的生命周期以及何时取消它。如果 longExecution
函数有一个错误并永远挂起,你不想泄露调用它的协程并破坏你的内存,所以你应该以某种方式取消那些协程。这就是为什么你应该将范围作为变量存储在你的 class 中并在适当的时候取消它(当你想放弃这些操作时)。
对于第二个问题,使用 withTimeout
很常见,但它不适合您的用例,因为您希望任务即使在等待超时后仍继续运行。一种可能的解决方案是使用 select 子句等待作业完成,或者等待某个指定的最长时间:
// TODO call scope.cancel() somewhere appropriate (when this component is not needed anymore)
val scope = CoroutineScope(Job())
@PostMapping
fun longEndpoint(@RequestParam("someParam") someParam: Long): ResponseEntity<String> {
val job = scope.launch {
longExecution()
}
val resultText = runBlocking {
select {
job.onJoin() { "Done" }
onTimeout(MAX_WAIT_TIME) { "Check later" }
}
}
return ResponseEntity.ok(resultText)
}
注意:我使用 launch
而不是 async
,因为您似乎不需要 longExecution
的 return 值。
如果你也想解决问题 #3,你可以简单地声明你的处理程序 suspend
并删除 select
:
周围的 runBlocking
// TODO call scope.cancel() somewhere appropriate (when this component is not needed anymore)
val scope = CoroutineScope(Job())
@PostMapping
suspend fun longEndpoint(@RequestParam("someParam") someParam: Long): ResponseEntity<String> {
val job = scope.launch {
longExecution()
}
val resultText = select {
job.onJoin() { "Done" }
onTimeout(MAX_WAIT_TIME) { "Check later" }
}
return ResponseEntity.ok(resultText)
}
请注意,这需要 spring-boot-starter-webflux
而不是 spring-boot-starter-web
。
我正在尝试实现功能:我有一个调用代码的休息端点,执行可能会花费很多时间。我现在改善体验的想法是将那段代码包装为一个新线程,等待完成或等待某个最大时间过去,然后 return 一条适当的消息。即使端点已经发回消息,包装代码也应该完成。当前的实现如下所示:
private const val N = 1000
private const val MAX_WAIT_TIME = 5000
@RestController
@RequestMapping("/long")
class SomeController(
val service: SomeService,
) {
private val executor = Executors.newFixedThreadPool(N)
@PostMapping
fun longEndpoint(@RequestParam("someParam") someParam: Long): ResponseEntity<String> {
val submit = executor.submit {
service.longExecution(someParam)
}
val start = System.currentTimeMillis()
while (System.currentTimeMillis() - start < MAX_WAIT_TIME) {
if (submit.isDone)
return ResponseEntity.ok("Done")
}
return ResponseEntity.ok("Check later")
}
}
第一个问题 - waiting while for time 似乎不对,我们不释放线程,是否可以改进?
更重要的问题 - 如何将其重写为 Kotlin 协程? 我的尝试很简单,任务完成后没有 returning,看起来像这样:
@PostMapping
fun longEndpoint(@RequestParam("someParam") someParam: Long): ResponseEntity<String> = runBlocking {
val result = async {
withContext(Dispatchers.Default) {
service.longExecution(someParam)
}
}
delay(MAX_WAIT_TIME)
return@runBlocking ResponseEntity.ok(if(result.isCompleted) "Done" else "Check later")
}
但即使 return 编辑了正确的字符串,答案也不会发送,直到 longExecution
完成。如何解决这个问题,我错过了什么?也许协程在这里应用不当?
您的实施总是等待 MAX_WAIT_TIME。这可能有效:
@PostMapping
fun longEndpoint(@RequestParam("someParam") someParam: Long): ResponseEntity<String> = runBlocking {
try {
withTimeout(MAX_WAIT_TIME) {
async {
withContext(Dispatchers.Default) {
service.longExecution(someParam)
}
}
}
} catch (ex: CancellationException) {
return@runBlocking ResponseEntity.ok("Check later")
}
return@runBlocking ResponseEntity.ok("Done")
}
虽然我不确定是否会有任何不需要的副作用,因为这似乎会在达到 MAX_WAIT_TIME 时取消协程。在这里阅读更多相关信息:
取消和超时
您当前的协程尝试存在几个问题:
- 您正在
runBlocking
的范围内启动您的async
计算,因此整体端点方法将等待子协程完成,尽管您之前尝试return
-ing那。 delay()
将始终等待MAX_WAIT_TIME
,即使任务比 完成得更快
- (可选)如果您的框架支持异步控制器方法,则根本不必使用
runBlocking
(Spring WebFlux 确实支持控制器中的suspend
函数)
对于第一个问题,请记住每次启动一个应该比您的函数寿命更长的协程时,您都必须使用外部作用域。 coroutineScope
或 runBlocking
在这些情况下不合适,因为它们会等待您的子协程完成。
您可以使用 CoroutineScope()
工厂函数来创建作用域,但您需要考虑协程的生命周期以及何时取消它。如果 longExecution
函数有一个错误并永远挂起,你不想泄露调用它的协程并破坏你的内存,所以你应该以某种方式取消那些协程。这就是为什么你应该将范围作为变量存储在你的 class 中并在适当的时候取消它(当你想放弃这些操作时)。
对于第二个问题,使用 withTimeout
很常见,但它不适合您的用例,因为您希望任务即使在等待超时后仍继续运行。一种可能的解决方案是使用 select 子句等待作业完成,或者等待某个指定的最长时间:
// TODO call scope.cancel() somewhere appropriate (when this component is not needed anymore)
val scope = CoroutineScope(Job())
@PostMapping
fun longEndpoint(@RequestParam("someParam") someParam: Long): ResponseEntity<String> {
val job = scope.launch {
longExecution()
}
val resultText = runBlocking {
select {
job.onJoin() { "Done" }
onTimeout(MAX_WAIT_TIME) { "Check later" }
}
}
return ResponseEntity.ok(resultText)
}
注意:我使用 launch
而不是 async
,因为您似乎不需要 longExecution
的 return 值。
如果你也想解决问题 #3,你可以简单地声明你的处理程序 suspend
并删除 select
:
runBlocking
// TODO call scope.cancel() somewhere appropriate (when this component is not needed anymore)
val scope = CoroutineScope(Job())
@PostMapping
suspend fun longEndpoint(@RequestParam("someParam") someParam: Long): ResponseEntity<String> {
val job = scope.launch {
longExecution()
}
val resultText = select {
job.onJoin() { "Done" }
onTimeout(MAX_WAIT_TIME) { "Check later" }
}
return ResponseEntity.ok(resultText)
}
请注意,这需要 spring-boot-starter-webflux
而不是 spring-boot-starter-web
。