如何制作 Mono/Flux 链并在 spring-webflux 中实现重试

How to make chain of Mono/Flux and implement Retry in sping-webflux

我有一个 reactive/async 调用的场景。我正在使用 spring-boot-starter-webflux 并使用 webclient 进行外部 HTTP 调用。 我的场景是我必须调用 callA() 然后检查它的响应 ResponseA。如果其 ResponseA 正常,则退出并 return ResponseA。 否则使用 ResponseA 创建第二个请求 requestB 并调用 callB()。然后检查其响应 ResponseB。 如果没问题,那么 return ResponseA 否则在 callA() 上重试。

public Mono<ResponseA> callA(Request1 requestA) {
    // calling service-A using webclient
}
public Flux<ResponseB> callB(Request2 requestB) { 
    // calling service-B using webclient but to create requestB, I need ResponseA.
}

你只需要在 flatMap 中做一些 if-statements。可能将其拆分为一些更好的函数名称等。没有订阅,没有阻塞。

callA(createNewRequest()).flatMap(response1 -> {

    // Validate response
    if(!isValidResponse(response)) {

        // if failed validation, create new request and do a new call
        var request = buildRequest(response);
        return callB(request).flatMap(response2 -> {

                // validate second response
                if(!isValidResponse(response2)) {

                     // failed validation return the first response.
                     return Mono.just(response1)
                }

                // otherwise recursively call again
                return callA(createNewRequest()); // Warning this can be an infinite loop
            }
    }

    // Validation passed
    return Mono.just(response);
}