Spring 集成 WebFlux 错误处理
Spring Integration WebFlux Error Handling
SI 5+ 支持 WebFlux
,这意味着我们现在可以构建反应式消息系统。然而,这也意味着设计已经经过深思熟虑,通常的错误处理方法不起作用。在反应流中,消息是 Publisher
(Flux
),它不会抛出异常,但会发出错误通知。因此,在消息上设置的错误通道 header 是无用的,因为 SI 不知道 Flux
导致了错误。
考虑以下代码:
.handle(WebFlux.outboundGateway(m -> m.getPayload().toString(), webClient)
.expectedResponseType(YelpRecord.class)
.httpMethod(GET)
.mappedRequestHeaders(ACCEPT)
.replyPayloadToFlux(true))
.handle((GenericHandler<Flux<YelpRecord>>) (flux, headers) ->
flux
.doOnError(t -> log.error(t.getMessage(), t))
.doAfterTerminate(() ->
log.info("Completed streaming from: {}.", headers.get(DOWNLOAD_URI_HEADER))
)
.onBackpressureBuffer(
yelpArtifactoryProperties.getOnBackpressureBufferSize(),
BufferOverflowStrategy.ERROR)
)
上面的代码片段中缺少的是将异常发送到在来自 doOnError
的消息上配置的错误通道。我们该怎么做?
((MessageChannel) header.getErrorChannel()).send(...)
在 .doOnError()
那里为您工作吗?
关键是你是对的,消息负载中的这个 Flux
已经不受框架控制,如果你想处理它的错误,你必须自己做。那已经是带有 doOnError()
的代码,因此框架无法自动帮助您。
SI 5+ 支持 WebFlux
,这意味着我们现在可以构建反应式消息系统。然而,这也意味着设计已经经过深思熟虑,通常的错误处理方法不起作用。在反应流中,消息是 Publisher
(Flux
),它不会抛出异常,但会发出错误通知。因此,在消息上设置的错误通道 header 是无用的,因为 SI 不知道 Flux
导致了错误。
考虑以下代码:
.handle(WebFlux.outboundGateway(m -> m.getPayload().toString(), webClient)
.expectedResponseType(YelpRecord.class)
.httpMethod(GET)
.mappedRequestHeaders(ACCEPT)
.replyPayloadToFlux(true))
.handle((GenericHandler<Flux<YelpRecord>>) (flux, headers) ->
flux
.doOnError(t -> log.error(t.getMessage(), t))
.doAfterTerminate(() ->
log.info("Completed streaming from: {}.", headers.get(DOWNLOAD_URI_HEADER))
)
.onBackpressureBuffer(
yelpArtifactoryProperties.getOnBackpressureBufferSize(),
BufferOverflowStrategy.ERROR)
)
上面的代码片段中缺少的是将异常发送到在来自 doOnError
的消息上配置的错误通道。我们该怎么做?
((MessageChannel) header.getErrorChannel()).send(...)
在 .doOnError()
那里为您工作吗?
关键是你是对的,消息负载中的这个 Flux
已经不受框架控制,如果你想处理它的错误,你必须自己做。那已经是带有 doOnError()
的代码,因此框架无法自动帮助您。