Spring 集成 Java 使用 JMS 的 DSL retry/redlivery
Spring Integration Java DSL using JMS retry/redlivery
当消息处理抛出异常时,如何有效支持JMS重投?
我有一个流使用 JMS (ActiveMQ) 和一个 connectionFactory 配置为允许 n 次重新传递尝试。
我希望在处理 msg 时发生的任何错误都会导致 connectionFactory 配置允许将 msg 放回重新传递的次数,然后当最大重新传递尝试次数用尽时,传递给 DLQ。与 AMQ 一样。
对相关 SO 问题的回答暗示我可以有一个 errorChannel 重新抛出应该触发重新传递:Spring Integration DSL ErrorHandling
但是,没有发生以下情况:
/***
* Dispatch msgs from JMS queue to a handler using a rate-limit
* @param connectionFactory
* @return
*/
@Bean
public IntegrationFlow flow2(@Qualifier("spring-int-connection-factory") ConnectionFactory connectionFactory) {
IntegrationFlow flow = IntegrationFlows.from(
Jms.inboundAdapter(connectionFactory)
.configureJmsTemplate(t -> t.receiveTimeout(1000))
.destination(INPUT_DIRECT_QUEUE),
e -> e.poller(Pollers
.fixedDelay(5000)
.errorChannel("customErrorChannel")
//.errorHandler(this.msgHandler)
.maxMessagesPerPoll(2))
).handle(this.msgHandler).get();
return flow;
}
@Bean
public MessageChannel customErrorChannel() {
return MessageChannels.direct("customErrorChannel").get();
}
@Bean
public IntegrationFlow customErrorFlow() {
return IntegrationFlows.from(customErrorChannel())
.handle ("simpleMessageHandler","handleError")
.get();
}
errorChannel 方法实现:
public void handleError(Throwable t) throws Throwable {
log.warn("got error from customErrorChannel");
throw t;
}
当 flow2 中的处理程序抛出异常时,errorChannel 确实得到异常,但随后重新抛出导致 MessageHandlingException:
2018-08-13 09:00:34.221 WARN 98425 --- [ask-scheduler-5] c.v.m.i.jms.SimpleMessageHandler : got error from customErrorChannel
2018-08-13 09:00:34.224 WARN 98425 --- [ask-scheduler-5] o.s.i.c.MessagePublishingErrorHandler : Error message was not delivered.
org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.messaging.MessageHandlingException: error occurred in message handler [simpleMessageHandler]; nested exception is java.lang.IllegalArgumentException: dont want first try, failedMessage=GenericMessage [payload=Enter some text here for the message body..., headers={jms_redelivered=false, jms_destination=queue://_dev.directQueue, jms_correlationId=, jms_type=, id=c2dbffc8-8ab0-486f-f2e5-e8d613d62b6a, priority=0, jms_timestamp=1534176031021, jms_messageId=ID:che2-39670-1533047293479-4:9:1:1:8, timestamp=1534176034205}]
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:107) ~[spring-integration-core-5.0.7.RELEASE.jar:5.0.7.RELEASE]
at org.springframework.integration.handler.BeanNameMessageProcessor.processMessage(BeanNameMessageProcessor.java:61) ~[spring-integration-core-5.0.7.RELEASE.jar:5.0.7.RELEASE]
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93) ~[spring-integration-core-5.0.7.RELEASE.jar:5.0.7.RELEASE]
它可以与消息驱动的通道适配器一起使用,但我认为这不是您想要的,因为 。
由于轮询适配器使用 JmsTemplate.receive()
操作,因此在调用流程时消息已经确认。
您需要使用带有 JmsTransactionManager
的事务轮询器,以便错误流抛出的异常回滚事务并重新传递消息。
当消息处理抛出异常时,如何有效支持JMS重投?
我有一个流使用 JMS (ActiveMQ) 和一个 connectionFactory 配置为允许 n 次重新传递尝试。
我希望在处理 msg 时发生的任何错误都会导致 connectionFactory 配置允许将 msg 放回重新传递的次数,然后当最大重新传递尝试次数用尽时,传递给 DLQ。与 AMQ 一样。
对相关 SO 问题的回答暗示我可以有一个 errorChannel 重新抛出应该触发重新传递:Spring Integration DSL ErrorHandling
但是,没有发生以下情况:
/***
* Dispatch msgs from JMS queue to a handler using a rate-limit
* @param connectionFactory
* @return
*/
@Bean
public IntegrationFlow flow2(@Qualifier("spring-int-connection-factory") ConnectionFactory connectionFactory) {
IntegrationFlow flow = IntegrationFlows.from(
Jms.inboundAdapter(connectionFactory)
.configureJmsTemplate(t -> t.receiveTimeout(1000))
.destination(INPUT_DIRECT_QUEUE),
e -> e.poller(Pollers
.fixedDelay(5000)
.errorChannel("customErrorChannel")
//.errorHandler(this.msgHandler)
.maxMessagesPerPoll(2))
).handle(this.msgHandler).get();
return flow;
}
@Bean
public MessageChannel customErrorChannel() {
return MessageChannels.direct("customErrorChannel").get();
}
@Bean
public IntegrationFlow customErrorFlow() {
return IntegrationFlows.from(customErrorChannel())
.handle ("simpleMessageHandler","handleError")
.get();
}
errorChannel 方法实现:
public void handleError(Throwable t) throws Throwable {
log.warn("got error from customErrorChannel");
throw t;
}
当 flow2 中的处理程序抛出异常时,errorChannel 确实得到异常,但随后重新抛出导致 MessageHandlingException:
2018-08-13 09:00:34.221 WARN 98425 --- [ask-scheduler-5] c.v.m.i.jms.SimpleMessageHandler : got error from customErrorChannel
2018-08-13 09:00:34.224 WARN 98425 --- [ask-scheduler-5] o.s.i.c.MessagePublishingErrorHandler : Error message was not delivered.
org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.messaging.MessageHandlingException: error occurred in message handler [simpleMessageHandler]; nested exception is java.lang.IllegalArgumentException: dont want first try, failedMessage=GenericMessage [payload=Enter some text here for the message body..., headers={jms_redelivered=false, jms_destination=queue://_dev.directQueue, jms_correlationId=, jms_type=, id=c2dbffc8-8ab0-486f-f2e5-e8d613d62b6a, priority=0, jms_timestamp=1534176031021, jms_messageId=ID:che2-39670-1533047293479-4:9:1:1:8, timestamp=1534176034205}]
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:107) ~[spring-integration-core-5.0.7.RELEASE.jar:5.0.7.RELEASE]
at org.springframework.integration.handler.BeanNameMessageProcessor.processMessage(BeanNameMessageProcessor.java:61) ~[spring-integration-core-5.0.7.RELEASE.jar:5.0.7.RELEASE]
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93) ~[spring-integration-core-5.0.7.RELEASE.jar:5.0.7.RELEASE]
它可以与消息驱动的通道适配器一起使用,但我认为这不是您想要的,因为
由于轮询适配器使用 JmsTemplate.receive()
操作,因此在调用流程时消息已经确认。
您需要使用带有 JmsTransactionManager
的事务轮询器,以便错误流抛出的异常回滚事务并重新传递消息。