使用 RSocket 和 Project Reactor 实现 202 ACCEPTED - Retry-After 行为
Implementing a 202 ACCEPTED - Retry-After behaviour with RSocket and Project Reactor
我正在实现一个典型用例,其中客户端请求将异步生成的资源。因此,会生成一个 resourceID 并立即 returned:
1. CLIENT ---(POST /request-resource)---> SERVER
2. SERVER (Generates resID, launches async process) ---(202 Accepted - resID)---> CLIENT
此时SERVER中有一个后台任务,它最终会产生一个结果并将其存储在与resID关联的数据库中。客户端会定期请求资源,重试直到资源可用:
3. CLIENT ---(/resource/resID)---> SERVER (checks in Postgres using reactive driver)
4. SERVER ---(404 - Retry-After 5)---> CLIENT
5. CLIENT ---(/resource/resID)---> SERVER (checks in Postgres using reactive driver)
6. SERVER ---(200 - JSON Payload)---> CLIENT
我认为 RSocket 是一个完美的选择,以避免这种无休止的客户端重试,直到资源可用(第 3 步)。
哪种交互模型更适合这个问题,我该如何实现?
考虑如下存储库:ResourceRepository.Mono<Result> getResult(String resID)
如果我选择 request/response 交互模型,我会遇到与以前相同的情况。除非有一种方法可以让 Mono 重试直到有结果。这可能吗?
使用 request/stream 我可以 return 结果像 Flux<Response>
和 response.status=PROCESSING 直到查询 Postgre returned 一个结果,然后 Flux 将有一个带有 response.status=OK 的元素并且 Flux 将完成。在配置的时间内没有结果的情况下完成 Flux 需要最长时间。在这种情况下,我怎么能或询问这个?
我需要创建一个 Flux,它会定期发出(具有最大超时时间),当存储库 return 是一个空的 Mono 时,有一个没有结果的元素,或者当 te 存储库时的实际值有了,完成通量
此问题的解决方案是使用 RSocket 和 RequestResponse 交互模型,等待资源在数据库中可用。关键是使用 repeatWhenEmpty
运算符:
@MessageMapping("request-resource")
fun getResourceWebSocket(resourceRequest: ResourceRequest): Mono<Resource> {
return resourceService.sendResourceRequestProcessing(resourceRequest)
}
override fun sendResourceRequestMessage(resourceRequest: ResourceRequest): Mono<Resource> {
val resourceId = randomUUID().toString()
return Mono.fromCallable {
sendKafkaResourceProcessingRequestMessage(resourceId, resourceRequest)
}.then(poolResourceResponse(resourceId))
}
private fun poolResourceResponse(resourceId: String): Mono<Resource> {
return resourceRepository.findByResourceId(resourceId)
.repeatWhenEmpty(30) { longFlux ->
longFlux.delayElements(Duration.ofSeconds(1))
.doOnNext { logger.info("Repeating {}", it) }
}
}
我正在实现一个典型用例,其中客户端请求将异步生成的资源。因此,会生成一个 resourceID 并立即 returned:
1. CLIENT ---(POST /request-resource)---> SERVER
2. SERVER (Generates resID, launches async process) ---(202 Accepted - resID)---> CLIENT
此时SERVER中有一个后台任务,它最终会产生一个结果并将其存储在与resID关联的数据库中。客户端会定期请求资源,重试直到资源可用:
3. CLIENT ---(/resource/resID)---> SERVER (checks in Postgres using reactive driver)
4. SERVER ---(404 - Retry-After 5)---> CLIENT
5. CLIENT ---(/resource/resID)---> SERVER (checks in Postgres using reactive driver)
6. SERVER ---(200 - JSON Payload)---> CLIENT
我认为 RSocket 是一个完美的选择,以避免这种无休止的客户端重试,直到资源可用(第 3 步)。
哪种交互模型更适合这个问题,我该如何实现?
考虑如下存储库:ResourceRepository.Mono<Result> getResult(String resID)
如果我选择 request/response 交互模型,我会遇到与以前相同的情况。除非有一种方法可以让 Mono 重试直到有结果。这可能吗?
使用 request/stream 我可以 return 结果像 Flux<Response>
和 response.status=PROCESSING 直到查询 Postgre returned 一个结果,然后 Flux 将有一个带有 response.status=OK 的元素并且 Flux 将完成。在配置的时间内没有结果的情况下完成 Flux 需要最长时间。在这种情况下,我怎么能或询问这个?
我需要创建一个 Flux,它会定期发出(具有最大超时时间),当存储库 return 是一个空的 Mono 时,有一个没有结果的元素,或者当 te 存储库时的实际值有了,完成通量
此问题的解决方案是使用 RSocket 和 RequestResponse 交互模型,等待资源在数据库中可用。关键是使用 repeatWhenEmpty
运算符:
@MessageMapping("request-resource")
fun getResourceWebSocket(resourceRequest: ResourceRequest): Mono<Resource> {
return resourceService.sendResourceRequestProcessing(resourceRequest)
}
override fun sendResourceRequestMessage(resourceRequest: ResourceRequest): Mono<Resource> {
val resourceId = randomUUID().toString()
return Mono.fromCallable {
sendKafkaResourceProcessingRequestMessage(resourceId, resourceRequest)
}.then(poolResourceResponse(resourceId))
}
private fun poolResourceResponse(resourceId: String): Mono<Resource> {
return resourceRepository.findByResourceId(resourceId)
.repeatWhenEmpty(30) { longFlux ->
longFlux.delayElements(Duration.ofSeconds(1))
.doOnNext { logger.info("Repeating {}", it) }
}
}