通过映射到新的 return 值而不删除任何元素来处理 Reactor 错误

Handle Reactor errors by mapping to new return value without dropping any elements


我有一个遵循构建器模式的域对象,它在构建时执行验证,抛出一个包含对象 ID 的异常。

为了处理这个问题,我尝试了以下 2 次尝试:

public Flux<GenericResponse> handle(Publisher<DomainDto> input) {
    return Flux.from(input)
            .map(c -> c.toDomain()) // this is what throws the exception
            // some other processing here
            .map(c -> GenericResponse.accepted(c.getId()))
            .onErrorResume(e -> 
                Flux.just(GenericResponse.error(((BadRequestException)e).getId(), e.getMessage()))
public Flux<GenericResponse> handle(Publisher<DomainDto> input) {
    return Flux.from(input)
            .map(c -> c.toDomain()) // this is what throws the exception
            // some other processing here
            .concatMap(c ->
                    .onErrorResume(e -> 
                        Flux.just(GenericResponse.error(((BadRequestException)e).getId(), e.getMessage()))

对于第一个,如果我发送了 5 个输入,而第三个预计会抛出错误,我将收到 2 条成功消息和一条预期的失败消息:

    { "id": 1, "code": "ACCEPTED" },
    { "id": 2, "code": "ACCEPTED" },
    { "id": 3, "code": "ERROR", "description": "Some error message" }


    "timestamp": 1627880204616,
    "path": "/",
    "status": 500,
    "error": "Internal Server Error",
    "message": "Some error message",
    "requestId": "2917b3af-1",
    "trace": "com.example.BadRequestException: Some error message..."


    { "id": 1, "code": "ACCEPTED" },
    { "id": 2, "code": "ACCEPTED" },
    { "id": 3, "code": "ERROR", "description": "Some error message" },
    { "id": 4, "code": "ACCEPTED" },
    { "id": 5, "code": "ACCEPTED" }

如果您想管理每个元素的错误元素,最好的选择是使用中间 Mono 来管理错误,如下所示:

public Flux<GenericResponse> handle(Publisher<DomainDto> input) {
    return Flux.from(input)
            .flatMap(c -> Mono.fromCallable(() -> c.toDomain())
                              .map(c -> GenericResponse.accepted(c.getId()))
                              .onErrorResume(BadRequestException.class, e 
                                  -> Mono.just(GenericResponse.error(e.getId(), e.getMessage())));

另一种选择是使用 onErrorContinue 运算符。它非常接近您的原始代码。然而,正如文档所述,使用它并不总是安全的,因为如果错误破坏了上游管道,它就无法正确地“继续”流程操作。

使用 onErrorContinue 的示例:

public Flux<GenericResponse> handle(Publisher<DomainDto> input) {
    return Flux.from(input)
            .map(c -> c.toDomain()) // this is what throws the exception
            // some other processing here
            .map(c -> GenericResponse.accepted(c.getId()))
            .onErrorContinue(BadRequestException.class, e -> 
                Flux.just(GenericResponse.error(e.getId(), e.getMessage()))