PublishSubscribeChannel 中的错误处理
Error handling in PublishSubscribeChannel
我正在使用 Spring 使用 DSL 的集成来处理 JMS 和 REST 服务之间的通信。要求是消息应该无限期地重新传递。在一种情况下,我必须顺序执行两个操作。如果第一个失败,则不应执行第二个,但万一它是 4xx 错误,我不应该尝试重新传递它。我的代码如下所示:
IntegrationFlows.from(Jms.messageDrivenChannelAdapter(Jms.container(connectionFactory, destinationn)).get())
.publishSubscribeChannel(c -> c
.subscribe(firstRestOperation ->
firstRestOperation
.transform(originalMessageToFirstRequestTransformer())
.handle(Http.outboundGateway(restApiBaseUri + "/first-endpoint", restTemplate)
.httpMethod(HttpMethod.POST).get()) //when this handler receives HTTP Status 4xx,
//second operation shouldn't be executed and
//and message shouldn't be redelievered
.subscribe(secondRestOperation->
secondRestOperation
.transform(originalMessageToSecondRequestTransformer())
.handle(Http.outboundGateway(restApiBaseUri + "/second-endpoint", restTemplate).httpMethod(HttpMethod.POST).get())
).get();
class MyErrorHandler extends DefaultResponseErrorHandler { //this is used in Option B
@Override
public void handleError(ClientHttpResponse response) throws IOException {
if(response.getStatusCode().is4xxClientError()){
log.warn(...);
}else{
super.handleError(response);
}
}
}
@Bean
public RestTemplate restTemplate() {
RestTemplate restTemplate = new RestTemplate();
restTemplate.setErrorHandler(myErrorHandler); //this is used in Option B
return restTemplate;
}
我怎样才能满足这些要求?我唯一的想法是在提交 JMS 会话时以某种方式中断 IntegrationFlow。
感谢您的任何建议。
更新
选项 A:目前:
- 操作 1 失败并出现任何错误
- 操作2没有执行
- 消息不确定地重新传送
选项B:我也可以处理4xx错误,那么:
- 操作 1 失败并显示 4xx,已处理异常
- 操作2被执行
- 集成流程正常完成,JMS 会话已提交且消息未重新传送
但这会导致操作 2 被执行
我需要的是:
操作 1 失败,返回 4xx
操作 2 未 执行
邮件未重新传送
更新 2
我想我可能有所进展。正如@gary-russel 建议的那样,我添加了错误通道,并处理了 4xx 错误:
@Bean
public MessageProducerSupport inputUpsertCustomerMessageProducerSupport() {
return Jms.messageDrivenChannelAdapter(Jms.container(connectionFactory, destination).messageSelector(jmsSelector)).errorChannel(errorHandlingChannel).get();
}
@Bean
public PublishSubscribeChannel errorHandlingChannel() {
return MessageChannels.publishSubscribe().get();
}
@Bean
public ErrorMessageExceptionTypeRouter errorMessageExceptionTypeRouter() {
ErrorMessageExceptionTypeRouter router = new ErrorMessageExceptionTypeRouter();
router.setChannelMapping(HttpClientErrorException.class.getName(), "clientErrorMessageChannel");
router.setDefaultOutputChannel(unhandledErrorsChannel());
return router;
}
@Bean
public IntegrationFlow errorHandlingFlow() {
return IntegrationFlows.from(customErrorChannel())
.log()
.route(errorMessageExceptionTypeRouter())
.get();
}
@Bean
public MessageChannel clientErrorMessageChannel(){
return MessageChannels
.direct()
.get();
}
@Bean
public IntegrationFlow clientErrorFlow() {
return IntegrationFlows.from(clientErrorMessageChannel())
.handle(message -> log.warn(...) //handle error here
.get();
}
@Bean
public MessageChannel unhandledErrorsChannel(){
return MessageChannels.direct().get();
}
@Bean
public IntegrationFlow unhandledErrorsFlow(){
//how should I implement it?
}
我只想处理 4xx 错误,应传播其余错误并导致 JMS 消息重新传递。我尝试不在 ErrorMessageExceptionTypeRouter
中设置 defaultOutputChannel
(比抛出另一个异常)或将 defaultOutputChannel
设置为默认值 errorChannel
(比处理所有错误)。
更新 3
在以下位置找到解决方案:
这是我的错误处理流程的代码:
@Bean
public MessageProducerSupport inputUpsertCustomerMessageProducerSupport() {
return Jms.messageDrivenChannelAdapter(Jms.container(connectionFactory, destination).messageSelector(jmsSelector)).errorChannel(customErrorChannel()).get();
}
@Bean
public PublishSubscribeChannel customErrorChannel() {
return MessageChannels.publishSubscribe().get();
}
@Bean
public ErrorMessageExceptionTypeRouter errorMessageExceptionTypeRouter() {
ErrorMessageExceptionTypeRouter router = new ErrorMessageExceptionTypeRouter();
router.setChannelMapping(HttpClientErrorException.class.getName(), "clientErrorMessageChannel");
router.setDefaultOutputChannel(unhandledErrorsChannel());
return router;
}
@Bean
public MessageChannel clientErrorMessageChannel(){
return MessageChannels
.direct()
.get();
}
@Bean
public MessageChannel unhandledErrorsChannel(){
return MessageChannels.direct().get();
}
@Bean
public IntegrationFlow unhandledErrorsFlow(){
return IntegrationFlows.from(unhandledErrorsChannel()).handle("thisBeanName", "handleError").get();
}
public void handleError(Throwable t) throws Throwable {
log.warn("Received unhandled exception");
throw t;
}
@Bean
public IntegrationFlow clientErrorFlow() {
return IntegrationFlows.from(clientErrorMessageChannel())
.handle(message -> log.warn("Received HTTP Status 4xx with message: " + ((MessageHandlingException)message.getPayload()).getCause().getMessage()))
.get();
}
@Bean
public IntegrationFlow errorHandlingFlow() {
return IntegrationFlows.from(customErrorChannel())
.log()
.route(errorMessageExceptionTypeRouter())
.get();
}
所以解决方案是将异常重定向到一个流程,该流程将通过重新抛出它们来处理它们。太糟糕了 BaseIntegrationFlow
没有接受和抛出的方法 Throwable
- 现在只能通过指定要调用的 bean 和方法名称来实现。
这是默认行为;除非 ignoreFailures
属性 是 true
(默认情况下是 false
),否则不会调用第二个订阅者。
您需要显示上游流,但要“捕获”异常,您需要向(大概)消息驱动的入站适配器添加错误通道并在那里处理异常。
我正在使用 Spring 使用 DSL 的集成来处理 JMS 和 REST 服务之间的通信。要求是消息应该无限期地重新传递。在一种情况下,我必须顺序执行两个操作。如果第一个失败,则不应执行第二个,但万一它是 4xx 错误,我不应该尝试重新传递它。我的代码如下所示:
IntegrationFlows.from(Jms.messageDrivenChannelAdapter(Jms.container(connectionFactory, destinationn)).get())
.publishSubscribeChannel(c -> c
.subscribe(firstRestOperation ->
firstRestOperation
.transform(originalMessageToFirstRequestTransformer())
.handle(Http.outboundGateway(restApiBaseUri + "/first-endpoint", restTemplate)
.httpMethod(HttpMethod.POST).get()) //when this handler receives HTTP Status 4xx,
//second operation shouldn't be executed and
//and message shouldn't be redelievered
.subscribe(secondRestOperation->
secondRestOperation
.transform(originalMessageToSecondRequestTransformer())
.handle(Http.outboundGateway(restApiBaseUri + "/second-endpoint", restTemplate).httpMethod(HttpMethod.POST).get())
).get();
class MyErrorHandler extends DefaultResponseErrorHandler { //this is used in Option B
@Override
public void handleError(ClientHttpResponse response) throws IOException {
if(response.getStatusCode().is4xxClientError()){
log.warn(...);
}else{
super.handleError(response);
}
}
}
@Bean
public RestTemplate restTemplate() {
RestTemplate restTemplate = new RestTemplate();
restTemplate.setErrorHandler(myErrorHandler); //this is used in Option B
return restTemplate;
}
我怎样才能满足这些要求?我唯一的想法是在提交 JMS 会话时以某种方式中断 IntegrationFlow。
感谢您的任何建议。
更新
选项 A:目前:
- 操作 1 失败并出现任何错误
- 操作2没有执行
- 消息不确定地重新传送
选项B:我也可以处理4xx错误,那么:
- 操作 1 失败并显示 4xx,已处理异常
- 操作2被执行
- 集成流程正常完成,JMS 会话已提交且消息未重新传送
但这会导致操作 2 被执行
我需要的是:
操作 1 失败,返回 4xx
操作 2 未 执行
邮件未重新传送
更新 2
我想我可能有所进展。正如@gary-russel 建议的那样,我添加了错误通道,并处理了 4xx 错误:
@Bean
public MessageProducerSupport inputUpsertCustomerMessageProducerSupport() {
return Jms.messageDrivenChannelAdapter(Jms.container(connectionFactory, destination).messageSelector(jmsSelector)).errorChannel(errorHandlingChannel).get();
}
@Bean
public PublishSubscribeChannel errorHandlingChannel() {
return MessageChannels.publishSubscribe().get();
}
@Bean
public ErrorMessageExceptionTypeRouter errorMessageExceptionTypeRouter() {
ErrorMessageExceptionTypeRouter router = new ErrorMessageExceptionTypeRouter();
router.setChannelMapping(HttpClientErrorException.class.getName(), "clientErrorMessageChannel");
router.setDefaultOutputChannel(unhandledErrorsChannel());
return router;
}
@Bean
public IntegrationFlow errorHandlingFlow() {
return IntegrationFlows.from(customErrorChannel())
.log()
.route(errorMessageExceptionTypeRouter())
.get();
}
@Bean
public MessageChannel clientErrorMessageChannel(){
return MessageChannels
.direct()
.get();
}
@Bean
public IntegrationFlow clientErrorFlow() {
return IntegrationFlows.from(clientErrorMessageChannel())
.handle(message -> log.warn(...) //handle error here
.get();
}
@Bean
public MessageChannel unhandledErrorsChannel(){
return MessageChannels.direct().get();
}
@Bean
public IntegrationFlow unhandledErrorsFlow(){
//how should I implement it?
}
我只想处理 4xx 错误,应传播其余错误并导致 JMS 消息重新传递。我尝试不在 ErrorMessageExceptionTypeRouter
中设置 defaultOutputChannel
(比抛出另一个异常)或将 defaultOutputChannel
设置为默认值 errorChannel
(比处理所有错误)。
更新 3
在以下位置找到解决方案:
这是我的错误处理流程的代码:
@Bean
public MessageProducerSupport inputUpsertCustomerMessageProducerSupport() {
return Jms.messageDrivenChannelAdapter(Jms.container(connectionFactory, destination).messageSelector(jmsSelector)).errorChannel(customErrorChannel()).get();
}
@Bean
public PublishSubscribeChannel customErrorChannel() {
return MessageChannels.publishSubscribe().get();
}
@Bean
public ErrorMessageExceptionTypeRouter errorMessageExceptionTypeRouter() {
ErrorMessageExceptionTypeRouter router = new ErrorMessageExceptionTypeRouter();
router.setChannelMapping(HttpClientErrorException.class.getName(), "clientErrorMessageChannel");
router.setDefaultOutputChannel(unhandledErrorsChannel());
return router;
}
@Bean
public MessageChannel clientErrorMessageChannel(){
return MessageChannels
.direct()
.get();
}
@Bean
public MessageChannel unhandledErrorsChannel(){
return MessageChannels.direct().get();
}
@Bean
public IntegrationFlow unhandledErrorsFlow(){
return IntegrationFlows.from(unhandledErrorsChannel()).handle("thisBeanName", "handleError").get();
}
public void handleError(Throwable t) throws Throwable {
log.warn("Received unhandled exception");
throw t;
}
@Bean
public IntegrationFlow clientErrorFlow() {
return IntegrationFlows.from(clientErrorMessageChannel())
.handle(message -> log.warn("Received HTTP Status 4xx with message: " + ((MessageHandlingException)message.getPayload()).getCause().getMessage()))
.get();
}
@Bean
public IntegrationFlow errorHandlingFlow() {
return IntegrationFlows.from(customErrorChannel())
.log()
.route(errorMessageExceptionTypeRouter())
.get();
}
所以解决方案是将异常重定向到一个流程,该流程将通过重新抛出它们来处理它们。太糟糕了 BaseIntegrationFlow
没有接受和抛出的方法 Throwable
- 现在只能通过指定要调用的 bean 和方法名称来实现。
这是默认行为;除非 ignoreFailures
属性 是 true
(默认情况下是 false
),否则不会调用第二个订阅者。
您需要显示上游流,但要“捕获”异常,您需要向(大概)消息驱动的入站适配器添加错误通道并在那里处理异常。