通过映射到新的 return 值而不删除任何元素来处理 Reactor 错误

Handle Reactor errors by mapping to new return value without dropping any elements

在将输入值映射到包含成功消息或失败消息以及错误消息的响应对象的情况下,如何正确处理错误而不从发布者删除任何元素?

我有一个遵循构建器模式的域对象,它在构建时执行验证,抛出一个包含对象 ID 的异常。

为了处理这个问题,我尝试了以下 2 次尝试:

public Flux<GenericResponse> handle(Publisher<DomainDto> input) {
    return Flux.from(input)
            .map(c -> c.toDomain()) // this is what throws the exception
            // some other processing here
            .map(c -> GenericResponse.accepted(c.getId()))
            .onErrorResume(e -> 
                Flux.just(GenericResponse.error(((BadRequestException)e).getId(), e.getMessage()))
            );
}
public Flux<GenericResponse> handle(Publisher<DomainDto> input) {
    return Flux.from(input)
            .map(c -> c.toDomain()) // this is what throws the exception
            // some other processing here
            .concatMap(c ->
                Flux.just(GenericResponse.accepted(c.getId()))
                    .onErrorResume(e -> 
                        Flux.just(GenericResponse.error(((BadRequestException)e).getId(), e.getMessage()))
                    )
            );
}

对于第一个,如果我发送了 5 个输入,而第三个预计会抛出错误,我将收到 2 条成功消息和一条预期的失败消息:

[
    { "id": 1, "code": "ACCEPTED" },
    { "id": 2, "code": "ACCEPTED" },
    { "id": 3, "code": "ERROR", "description": "Some error message" }
]

然而,第二次尝试使用相同的输入时,我得到了一个没有结果的实际堆栈跟踪:

{
    "timestamp": 1627880204616,
    "path": "/",
    "status": 500,
    "error": "Internal Server Error",
    "message": "Some error message",
    "requestId": "2917b3af-1",
    "trace": "com.example.BadRequestException: Some error message..."
}

我该怎么做才能获得所有输入的响应,如下所示:

[
    { "id": 1, "code": "ACCEPTED" },
    { "id": 2, "code": "ACCEPTED" },
    { "id": 3, "code": "ERROR", "description": "Some error message" },
    { "id": 4, "code": "ACCEPTED" },
    { "id": 5, "code": "ACCEPTED" }
]

如果您想管理每个元素的错误元素,最好的选择是使用中间 Mono 来管理错误,如下所示:

public Flux<GenericResponse> handle(Publisher<DomainDto> input) {
    return Flux.from(input)
            .flatMap(c -> Mono.fromCallable(() -> c.toDomain())
                              .map(c -> GenericResponse.accepted(c.getId()))
                              .onErrorResume(BadRequestException.class, e 
                                  -> Mono.just(GenericResponse.error(e.getId(), e.getMessage())));
}

另一种选择是使用 onErrorContinue 运算符。它非常接近您的原始代码。然而,正如文档所述,使用它并不总是安全的,因为如果错误破坏了上游管道,它就无法正确地“继续”流程操作。

使用 onErrorContinue 的示例:

public Flux<GenericResponse> handle(Publisher<DomainDto> input) {
    return Flux.from(input)
            .map(c -> c.toDomain()) // this is what throws the exception
            // some other processing here
            .map(c -> GenericResponse.accepted(c.getId()))
            .onErrorContinue(BadRequestException.class, e -> 
                Flux.just(GenericResponse.error(e.getId(), e.getMessage()))
            );
}