Spring 集成 WebFlux 错误处理

Spring Integration WebFlux Error Handling

SI 5+ 支持 WebFlux,这意味着我们现在可以构建反应式消息系统。然而,这也意味着设计已经经过深思熟虑,通常的错误处理方法不起作用。在反应流中,消息是 Publisher(Flux),它不会抛出异常,但会发出错误通知。因此,在消息上设置的错误通道 header 是无用的,因为 SI 不知道 Flux 导致了错误。 考虑以下代码:

.handle(WebFlux.outboundGateway(m -> m.getPayload().toString(), webClient)
    .expectedResponseType(YelpRecord.class)
    .httpMethod(GET)
    .mappedRequestHeaders(ACCEPT)
    .replyPayloadToFlux(true))
.handle((GenericHandler<Flux<YelpRecord>>) (flux, headers) ->
    flux
            .doOnError(t -> log.error(t.getMessage(), t))
            .doAfterTerminate(() ->
                    log.info("Completed streaming from: {}.", headers.get(DOWNLOAD_URI_HEADER))
            )
            .onBackpressureBuffer(
                    yelpArtifactoryProperties.getOnBackpressureBufferSize(),
                    BufferOverflowStrategy.ERROR)
)

上面的代码片段中缺少的是将异常发送到在来自 doOnError 的消息上配置的错误通道。我们该怎么做?

((MessageChannel) header.getErrorChannel()).send(...).doOnError() 那里为您工作吗?

关键是你是对的,消息负载中的这个 Flux 已经不受框架控制,如果你想处理它的错误,你必须自己做。那已经是带有 doOnError() 的代码,因此框架无法自动帮助您。