Spring-Integration Webflux异常处理
Spring-Integration Webflux exception handling
如果在 spring-integration webflux 流中发生异常,异常本身(带有堆栈跟踪)将作为负载通过 MessagePublishingErrorHandler 发送回调用方,它使用来自 "errorChannel" header,不是默认的错误通道。
如何设置类似于 WebExceptionHandler 的错误处理程序?我想生成一个 Http 状态代码,可能还有一个 DefaultErrorAttributes
object 作为响应。
简单地定义一个从 errorChannel
开始的流程是行不通的,错误消息不会在那里结束。我试图定义我自己的 fluxErrorChannel
,但它似乎也没有用作错误通道,错误不会在我的 errorFlow
:
中结束
@Bean
public IntegrationFlow fooRestFlow() {
return IntegrationFlows.from(
WebFlux.inboundGateway("/foo")
.requestMapping(r -> r.methods(HttpMethod.POST))
.requestPayloadType(Map.class)
.errorChannel(fluxErrorChannel()))
.channel(bazFlow().getInputChannel())
.get();
}
@Bean
public MessageChannel fluxErrorChannel() {
return MessageChannels.flux().get();
}
@Bean
public IntegrationFlow errorFlow() {
return IntegrationFlows.from(fluxErrorChannel())
.transform(source -> source)
.enrichHeaders(h -> h.header(HttpHeaders.STATUS_CODE, HttpStatus.BAD_GATEWAY))
.get();
}
@Bean
public IntegrationFlow bazFlow() {
return f -> f.split(Map.class, map -> map.get("items"))
.channel(MessageChannels.flux())
.<String>handle((p, h) -> throw new RuntimeException())
.aggregate();
}
更新
在 MessagingGatewaySupport.doSendAndReceiveMessageReactive
中,我在 WebFlux.inboundGateway 上定义的错误通道从未用于设置错误通道,错误通道始终是正在创建的 replyChannel
here:
FutureReplyChannel replyChannel = new FutureReplyChannel();
Message<?> requestMessage = MutableMessageBuilder.fromMessage(message)
.setReplyChannel(replyChannel)
.setHeader(this.messagingTemplate.getSendTimeoutHeader(), null)
.setHeader(this.messagingTemplate.getReceiveTimeoutHeader(), null)
.setErrorChannel(replyChannel)
.build();
错误通道最终被重置为 Mono.fromFuture
中的 originalErrorChannelHandler
,但在我的例子中,错误通道是“null”。此外,永远不会调用 onErrorResume
lambda:
return Mono.fromFuture(replyChannel.messageFuture)
.doOnSubscribe(s -> {
if (!error && this.countsEnabled) {
this.messageCount.incrementAndGet();
}
})
.<Message<?>>map(replyMessage ->
MessageBuilder.fromMessage(replyMessage)
.setHeader(MessageHeaders.REPLY_CHANNEL, originalReplyChannelHeader)
.setHeader(MessageHeaders.ERROR_CHANNEL, originalErrorChannelHeader)
.build())
.onErrorResume(t -> error ? Mono.error(t) : handleSendError(requestMessage, t));
这是如何工作的?
这是一个错误;错误处理程序为异常创建的 ErrorMessage
被发送到 errorChannel
header(必须是 replyChannel
,以便网关获得结果)。然后网关应调用错误流(如果存在)和 return 结果。
如果在 spring-integration webflux 流中发生异常,异常本身(带有堆栈跟踪)将作为负载通过 MessagePublishingErrorHandler 发送回调用方,它使用来自 "errorChannel" header,不是默认的错误通道。
如何设置类似于 WebExceptionHandler 的错误处理程序?我想生成一个 Http 状态代码,可能还有一个 DefaultErrorAttributes
object 作为响应。
简单地定义一个从 errorChannel
开始的流程是行不通的,错误消息不会在那里结束。我试图定义我自己的 fluxErrorChannel
,但它似乎也没有用作错误通道,错误不会在我的 errorFlow
:
@Bean
public IntegrationFlow fooRestFlow() {
return IntegrationFlows.from(
WebFlux.inboundGateway("/foo")
.requestMapping(r -> r.methods(HttpMethod.POST))
.requestPayloadType(Map.class)
.errorChannel(fluxErrorChannel()))
.channel(bazFlow().getInputChannel())
.get();
}
@Bean
public MessageChannel fluxErrorChannel() {
return MessageChannels.flux().get();
}
@Bean
public IntegrationFlow errorFlow() {
return IntegrationFlows.from(fluxErrorChannel())
.transform(source -> source)
.enrichHeaders(h -> h.header(HttpHeaders.STATUS_CODE, HttpStatus.BAD_GATEWAY))
.get();
}
@Bean
public IntegrationFlow bazFlow() {
return f -> f.split(Map.class, map -> map.get("items"))
.channel(MessageChannels.flux())
.<String>handle((p, h) -> throw new RuntimeException())
.aggregate();
}
更新
在 MessagingGatewaySupport.doSendAndReceiveMessageReactive
中,我在 WebFlux.inboundGateway 上定义的错误通道从未用于设置错误通道,错误通道始终是正在创建的 replyChannel
here:
FutureReplyChannel replyChannel = new FutureReplyChannel();
Message<?> requestMessage = MutableMessageBuilder.fromMessage(message)
.setReplyChannel(replyChannel)
.setHeader(this.messagingTemplate.getSendTimeoutHeader(), null)
.setHeader(this.messagingTemplate.getReceiveTimeoutHeader(), null)
.setErrorChannel(replyChannel)
.build();
错误通道最终被重置为 Mono.fromFuture
中的 originalErrorChannelHandler
,但在我的例子中,错误通道是“null”。此外,永远不会调用 onErrorResume
lambda:
return Mono.fromFuture(replyChannel.messageFuture)
.doOnSubscribe(s -> {
if (!error && this.countsEnabled) {
this.messageCount.incrementAndGet();
}
})
.<Message<?>>map(replyMessage ->
MessageBuilder.fromMessage(replyMessage)
.setHeader(MessageHeaders.REPLY_CHANNEL, originalReplyChannelHeader)
.setHeader(MessageHeaders.ERROR_CHANNEL, originalErrorChannelHeader)
.build())
.onErrorResume(t -> error ? Mono.error(t) : handleSendError(requestMessage, t));
这是如何工作的?
这是一个错误;错误处理程序为异常创建的 ErrorMessage
被发送到 errorChannel
header(必须是 replyChannel
,以便网关获得结果)。然后网关应调用错误流(如果存在)和 return 结果。