使用 spring 在 Ibm Websphere MQ 中实现重试逻辑
Implementing retry logic in Ibm Websphere MQ with spring
我正在使用 Spring 和 Webphere MQ 进行消息传递的以下配置。
我需要为一个场景实现重试逻辑,在该场景中,如果搜索服务器关闭,我会从队列中接收一条消息并将消息数据放入 Elastic 搜索服务器(搜索服务器是非事务性的)我必须再次将消息回滚到队列中,并在一段时间后(例如:30 秒)处理消息。此重试必须进行 5 次。 5 次后必须将消息放入死信队列。我们使用 Tomcat 作为服务器。
我们正在使用 spring 集成 jms:message-driven-channel-adapter 来接收消息
如何使用 spring 和 Websphere MQ 实现此行为?
我已经爬过很多站点,我可以找到对 Active MQ 但不支持 IBM MQ 的支持。
<bean id="mqConnectionFactory" class="com.ibm.mq.jms.MQQueueConnectionFactory">
<property name="hostName">
<value>${queue_hostname}</value>
</property>
<property name="port">
<value>${queue_port}</value>
</property>
<property name="queueManager">
<value>${queue_manager}</value>
</property>
<property name="transportType">
<value>1</value>
</property>
</bean>
<!-- JMS Queue Connection Factory -->
<bean id="jmsQueueConnectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory102">
<property name="targetConnectionFactory">
<ref bean="mqConnectionFactory" />
</property>
<property name="pubSubDomain">
<value>false</value>
</property>
</bean>
<!-- JMS Destination Resolver -->
<bean id="jmsDestinationResolver"
class="org.springframework.jms.support.destination.DynamicDestinationResolver">
</bean>
<!-- JMS Queue Template -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate102">
<property name="connectionFactory">
<ref bean="jmsQueueConnectionFactory" />
</property>
<property name="destinationResolver">
<ref bean="jmsDestinationResolver" />
</property>
<property name="pubSubDomain">
<value>false</value>
</property>
<property name="receiveTimeout">
<value>20000</value>
</property>
</bean>
JMS 规范中没有关于延迟重新传递的内容。一些经纪人有一个自定义 mechanism/policy 来实现它;您必须查看经纪人文档。
如前所述,您可以使用递送计数 header 在重试一定次数后放弃。
编辑
回应您在下方的评论...
仅当您使用支持 JMS 2.0 的 MQ 版本时 - 看起来您使用的是需要 JmsTemplate102
的非常旧的版本。 1.0.2 是古老的; 1.1 已经推出多年; Spring 已经支持 JMS 2.0 将近 3 年了。如果您有 JMS 2.0 代理(和客户端库),请设置 deliveryDelay on a JmsTemplate bean。然后,配置出站通道适配器以通过 jms-template
属性.
使用该模板
为了获得重新投递计数的可见性,要么将整个消息传递到您的服务中,要么使用 POJO 方法对其进行配置以获取 header...
public MyReply process(@Payload MyObject foo,
@Header("JMSXRedeliveryCount") int redeliveryCOunt) {
...
}
同样,这是一个 JMS 2.0 功能(header),尽管一些代理在 1.1 中提供了它。
IBM MQ 没有延迟重新交付,但一种选择是考虑使用 Delayed Delivery 的 JMS2.0 概念。这不会设置 redelivered 标志,因此不会涉及任何退出逻辑,但它会实现定时行为。
例如,当无法处理消息时,应用程序可能会延迟重新排队。这将不会再看到 5 分钟。应用程序可能必须用计数器标记消息。两者都可以在事务会话下完成以实现
我正在使用 Spring 和 Webphere MQ 进行消息传递的以下配置。
我需要为一个场景实现重试逻辑,在该场景中,如果搜索服务器关闭,我会从队列中接收一条消息并将消息数据放入 Elastic 搜索服务器(搜索服务器是非事务性的)我必须再次将消息回滚到队列中,并在一段时间后(例如:30 秒)处理消息。此重试必须进行 5 次。 5 次后必须将消息放入死信队列。我们使用 Tomcat 作为服务器。
我们正在使用 spring 集成 jms:message-driven-channel-adapter 来接收消息
如何使用 spring 和 Websphere MQ 实现此行为?
我已经爬过很多站点,我可以找到对 Active MQ 但不支持 IBM MQ 的支持。
<bean id="mqConnectionFactory" class="com.ibm.mq.jms.MQQueueConnectionFactory">
<property name="hostName">
<value>${queue_hostname}</value>
</property>
<property name="port">
<value>${queue_port}</value>
</property>
<property name="queueManager">
<value>${queue_manager}</value>
</property>
<property name="transportType">
<value>1</value>
</property>
</bean>
<!-- JMS Queue Connection Factory -->
<bean id="jmsQueueConnectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory102">
<property name="targetConnectionFactory">
<ref bean="mqConnectionFactory" />
</property>
<property name="pubSubDomain">
<value>false</value>
</property>
</bean>
<!-- JMS Destination Resolver -->
<bean id="jmsDestinationResolver"
class="org.springframework.jms.support.destination.DynamicDestinationResolver">
</bean>
<!-- JMS Queue Template -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate102">
<property name="connectionFactory">
<ref bean="jmsQueueConnectionFactory" />
</property>
<property name="destinationResolver">
<ref bean="jmsDestinationResolver" />
</property>
<property name="pubSubDomain">
<value>false</value>
</property>
<property name="receiveTimeout">
<value>20000</value>
</property>
</bean>
JMS 规范中没有关于延迟重新传递的内容。一些经纪人有一个自定义 mechanism/policy 来实现它;您必须查看经纪人文档。
如前所述,您可以使用递送计数 header 在重试一定次数后放弃。
编辑
回应您在下方的评论...
仅当您使用支持 JMS 2.0 的 MQ 版本时 - 看起来您使用的是需要 JmsTemplate102
的非常旧的版本。 1.0.2 是古老的; 1.1 已经推出多年; Spring 已经支持 JMS 2.0 将近 3 年了。如果您有 JMS 2.0 代理(和客户端库),请设置 deliveryDelay on a JmsTemplate bean。然后,配置出站通道适配器以通过 jms-template
属性.
为了获得重新投递计数的可见性,要么将整个消息传递到您的服务中,要么使用 POJO 方法对其进行配置以获取 header...
public MyReply process(@Payload MyObject foo,
@Header("JMSXRedeliveryCount") int redeliveryCOunt) {
...
}
同样,这是一个 JMS 2.0 功能(header),尽管一些代理在 1.1 中提供了它。
IBM MQ 没有延迟重新交付,但一种选择是考虑使用 Delayed Delivery 的 JMS2.0 概念。这不会设置 redelivered 标志,因此不会涉及任何退出逻辑,但它会实现定时行为。
例如,当无法处理消息时,应用程序可能会延迟重新排队。这将不会再看到 5 分钟。应用程序可能必须用计数器标记消息。两者都可以在事务会话下完成以实现