Spring 集成 JMS 使用 DSL 确保消息传递
Spring Integration JMS assured message delivery using DSL
我正在尝试创建一个流 (1),其中从可以是客户端或服务器的 TCP 适配器接收消息并将消息发送到 ActiveMQ 代理。
我的另一个流程(2) 从所需队列中挑选消息并发送到目的地
TCP(client/server) ==(1)==> ActiveMQ 代理 ==(2)==> HTTP 出站适配器
我想确保万一我的邮件没有送达所需的目的地,它会重新尝试再次发送邮件。
我目前流向经纪人的流量 (1) 是:
IntegrationFlow flow = IntegrationFlows
.from(Tcp
.inboundAdapter(Tcp.netServer(Integer.parseInt(1234))
.serializer(customSerializer).deserializer(customSerializer)
.id("server").soTimeout(5000))
.id(hostConnection.getConnectionNumber() + "adapter"))).channel(directChannel())
.wireTap("tcpInboundMessageLogChannel").channel(directChannel())
.handle(Jms.outboundAdapter(activeMQConnectionFactory)
.destination("jmsInbound"))
.get();
this.flowContext.registration(flow).id("outflow").register();
我的流程(2) 从代理到 http 出站:
flow = IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(activeMQConnectionFactory)
.destination("jmsInbound"))
.channel(directChannel())
.handle(Http.outboundChannelAdapter(hostConnection.getUrl()).httpMethod(HttpMethod.POST)
.expectedResponseType(String.class)
.mappedRequestHeaders("abc"))
.get();
this.flowContext.registration(flow).id("inflow").register();
问题:
如果在发送过程中出现任何异常,例如我的目的地 URL 无法正常工作,则会重新尝试发送消息。
尝试失败后重试 7 次即 max attempt to 7
如果尝试仍然不成功,则它将消息发送到 ActiveMQ.DLQ
(死信队列)并且不会再次尝试,因为消息从实际队列中出列并发送到ActiveMQ.DLQ
.
所以,我想要没有消息丢失并且消息将按顺序处理的场景。
第一:我相信你可以为无限重试配置jmsInbound
:
/**
* Configuration options for a messageConsumer used to control how messages are re-delivered when they
* are rolled back.
* May be used server side on a per destination basis via the Broker RedeliveryPlugin
*
* @org.apache.xbean.XBean element="redeliveryPolicy"
*
*/
public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable, Serializable {
另一方面,您可以为 RequestHandlerRetryAdvice
配置 .handle(Http.outboundChannelAdapter(
以实现类似的重试行为,但在应用程序内部无需往返 JMS 和返回:https://docs.spring.io/spring-integration/docs/5.0.6.RELEASE/reference/html/messaging-endpoints-chapter.html#retry-advice
这里是一些如何从 Java DSL 角度进行配置的示例:
@Bean
public IntegrationFlow errorRecovererFlow() {
return IntegrationFlows.from(Function.class, "errorRecovererFunction")
.handle((GenericHandler<?>) (p, h) -> {
throw new RuntimeException("intentional");
}, e -> e.advice(retryAdvice()))
.get();
}
@Bean
public RequestHandlerRetryAdvice retryAdvice() {
RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
requestHandlerRetryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(recoveryChannel()));
return requestHandlerRetryAdvice;
}
@Bean
public MessageChannel recoveryChannel() {
return new DirectChannel();
}
RequestHandlerRetryAdvice
可以配置 RetryTemplate
以应用类似 AlwaysRetryPolicy
的内容。有关详细信息,请参阅 Spring 重试项目:https://github.com/spring-projects/spring-retry
我正在尝试创建一个流 (1),其中从可以是客户端或服务器的 TCP 适配器接收消息并将消息发送到 ActiveMQ 代理。
我的另一个流程(2) 从所需队列中挑选消息并发送到目的地
TCP(client/server) ==(1)==> ActiveMQ 代理 ==(2)==> HTTP 出站适配器
我想确保万一我的邮件没有送达所需的目的地,它会重新尝试再次发送邮件。
我目前流向经纪人的流量 (1) 是:
IntegrationFlow flow = IntegrationFlows
.from(Tcp
.inboundAdapter(Tcp.netServer(Integer.parseInt(1234))
.serializer(customSerializer).deserializer(customSerializer)
.id("server").soTimeout(5000))
.id(hostConnection.getConnectionNumber() + "adapter"))).channel(directChannel())
.wireTap("tcpInboundMessageLogChannel").channel(directChannel())
.handle(Jms.outboundAdapter(activeMQConnectionFactory)
.destination("jmsInbound"))
.get();
this.flowContext.registration(flow).id("outflow").register();
我的流程(2) 从代理到 http 出站:
flow = IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(activeMQConnectionFactory)
.destination("jmsInbound"))
.channel(directChannel())
.handle(Http.outboundChannelAdapter(hostConnection.getUrl()).httpMethod(HttpMethod.POST)
.expectedResponseType(String.class)
.mappedRequestHeaders("abc"))
.get();
this.flowContext.registration(flow).id("inflow").register();
问题:
如果在发送过程中出现任何异常,例如我的目的地 URL 无法正常工作,则会重新尝试发送消息。
尝试失败后重试 7 次即
max attempt to 7
如果尝试仍然不成功,则它将消息发送到
ActiveMQ.DLQ
(死信队列)并且不会再次尝试,因为消息从实际队列中出列并发送到ActiveMQ.DLQ
.
所以,我想要没有消息丢失并且消息将按顺序处理的场景。
第一:我相信你可以为无限重试配置jmsInbound
:
/**
* Configuration options for a messageConsumer used to control how messages are re-delivered when they
* are rolled back.
* May be used server side on a per destination basis via the Broker RedeliveryPlugin
*
* @org.apache.xbean.XBean element="redeliveryPolicy"
*
*/
public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable, Serializable {
另一方面,您可以为 RequestHandlerRetryAdvice
配置 .handle(Http.outboundChannelAdapter(
以实现类似的重试行为,但在应用程序内部无需往返 JMS 和返回:https://docs.spring.io/spring-integration/docs/5.0.6.RELEASE/reference/html/messaging-endpoints-chapter.html#retry-advice
这里是一些如何从 Java DSL 角度进行配置的示例:
@Bean
public IntegrationFlow errorRecovererFlow() {
return IntegrationFlows.from(Function.class, "errorRecovererFunction")
.handle((GenericHandler<?>) (p, h) -> {
throw new RuntimeException("intentional");
}, e -> e.advice(retryAdvice()))
.get();
}
@Bean
public RequestHandlerRetryAdvice retryAdvice() {
RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
requestHandlerRetryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(recoveryChannel()));
return requestHandlerRetryAdvice;
}
@Bean
public MessageChannel recoveryChannel() {
return new DirectChannel();
}
RequestHandlerRetryAdvice
可以配置 RetryTemplate
以应用类似 AlwaysRetryPolicy
的内容。有关详细信息,请参阅 Spring 重试项目:https://github.com/spring-projects/spring-retry