使用 RSocket 和 Project Reactor 实现 202 ACCEPTED - Retry-After 行为

Implementing a 202 ACCEPTED - Retry-After behaviour with RSocket and Project Reactor

我正在实现一个典型用例,其中客户端请求将异步生成的资源。因此,会生成一个 resourceID 并立即 returned:

1. CLIENT ---(POST /request-resource)--->  SERVER
2. SERVER (Generates resID, launches async process) ---(202 Accepted - resID)---> CLIENT

此时SERVER中有一个后台任务,它最终会产生一个结果并将其存储在与resID关联的数据库中。客户端会定期请求资源,重试直到资源可用:

3. CLIENT ---(/resource/resID)--->  SERVER (checks in Postgres using reactive driver)
4. SERVER ---(404 - Retry-After 5)---> CLIENT
5. CLIENT ---(/resource/resID)--->  SERVER (checks in Postgres using reactive driver)
6. SERVER ---(200 - JSON Payload)---> CLIENT

我认为 RSocket 是一个完美的选择,以避免这种无休止的客户端重试,直到资源可用(第 3 步)。

哪种交互模型更适合这个问题,我该如何实现?

考虑如下存储库:ResourceRepository.Mono<Result> getResult(String resID)

如果我选择 request/response 交互模型,我会遇到与以前相同的情况。除非有一种方法可以让 Mono 重试直到有结果。这可能吗?

使用 request/stream 我可以 return 结果像 Flux<Response> 和 response.status=PROCESSING 直到查询 Postgre returned 一个结果,然后 Flux 将有一个带有 response.status=OK 的元素并且 Flux 将完成。在配置的时间内没有结果的情况下完成 Flux 需要最长时间。在这种情况下,我怎么能或询问这个?

我需要创建一个 Flux,它会定期发出(具有最大超时时间),当存储库 return 是一个空的 Mono 时,有一个没有结果的元素,或者当 te 存储库时的实际值有了,完成通量

此问题的解决方案是使用 RSocket 和 RequestResponse 交互模型,等待资源在数据库中可用。关键是使用 repeatWhenEmpty 运算符:

    @MessageMapping("request-resource")
    fun getResourceWebSocket(resourceRequest: ResourceRequest): Mono<Resource> {
        return resourceService.sendResourceRequestProcessing(resourceRequest)
    }

    override fun sendResourceRequestMessage(resourceRequest: ResourceRequest): Mono<Resource> {
        val resourceId = randomUUID().toString()
        return Mono.fromCallable {
            sendKafkaResourceProcessingRequestMessage(resourceId, resourceRequest)
        }.then(poolResourceResponse(resourceId))
    }
    private fun poolResourceResponse(resourceId: String): Mono<Resource> {
        return resourceRepository.findByResourceId(resourceId)
                .repeatWhenEmpty(30) { longFlux ->
                    longFlux.delayElements(Duration.ofSeconds(1))
                            .doOnNext { logger.info("Repeating {}", it) }
                }
    }