使用 spring 在 Ibm Websphere MQ 中实现重试逻辑

Implementing retry logic in Ibm Websphere MQ with spring

我正在使用 Spring 和 Webphere MQ 进行消息传递的以下配置。

我需要为一个场景实现重试逻辑,在该场景中,如果搜索服务器关闭,我会从队列中接收一条消息并将消息数据放入 Elastic 搜索服务器(搜索服务器是非事务性的)我必须再次将消息回滚到队列中,并在一段时间后(例如:30 秒)处理消息。此重试必须进行 5 次。 5 次后必须将消息放入死信队列。我们使用 Tomcat 作为服务器。

我们正在使用 spring 集成 jms:message-driven-channel-adapter 来接收消息

如何使用 spring 和 Websphere MQ 实现此行为?

我已经爬过很多站点,我可以找到对 Active MQ 但不支持 IBM MQ 的支持。

<bean id="mqConnectionFactory" class="com.ibm.mq.jms.MQQueueConnectionFactory">
        <property name="hostName">
            <value>${queue_hostname}</value>
        </property>
        <property name="port">
            <value>${queue_port}</value>
        </property>
        <property name="queueManager">
            <value>${queue_manager}</value>
        </property>
        <property name="transportType">
            <value>1</value>
        </property>
    </bean>

    <!-- JMS Queue Connection Factory -->
    <bean id="jmsQueueConnectionFactory"
        class="org.springframework.jms.connection.SingleConnectionFactory102">
        <property name="targetConnectionFactory">
            <ref bean="mqConnectionFactory" />
        </property>
        <property name="pubSubDomain">
            <value>false</value>
        </property>
    </bean>

    <!-- JMS Destination Resolver -->
    <bean id="jmsDestinationResolver"
        class="org.springframework.jms.support.destination.DynamicDestinationResolver">
    </bean>

    <!-- JMS Queue Template -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate102">
        <property name="connectionFactory">
            <ref bean="jmsQueueConnectionFactory" />
        </property>
        <property name="destinationResolver">
            <ref bean="jmsDestinationResolver" />
        </property>
        <property name="pubSubDomain">
            <value>false</value>
        </property>
        <property name="receiveTimeout">
            <value>20000</value>
        </property>
    </bean>

JMS 规范中没有关于延迟重新传递的内容。一些经纪人有一个自定义 mechanism/policy 来实现它;您必须查看经纪人文档。

如前所述,您可以使用递送计数 header 在重试一定次数后放弃。

编辑

回应您在下方的评论...

仅当您使用支持 JMS 2.0 的 MQ 版本时 - 看起来您使用的是需要 JmsTemplate102 的非常旧的版本。 1.0.2 是古老的; 1.1 已经推出多年; Spring 已经支持 JMS 2.0 将近 3 年了。如果您有 JMS 2.0 代理(和客户端库),请设置 deliveryDelay on a JmsTemplate bean。然后,配置出站通道适配器以通过 jms-template 属性.

使用该模板

为了获得重新投递计数的可见性,要么将整个消息传递到您的服务中,要么使用 POJO 方法对其进行配置以获取 header...

public MyReply process(@Payload MyObject foo,
              @Header("JMSXRedeliveryCount") int redeliveryCOunt) {
    ...
}

同样,这是一个 JMS 2.0 功能(header),尽管一些代理在 1.1 中提供了它。

IBM MQ 没有延迟重新交付,但一种选择是考虑使用 Delayed Delivery 的 JMS2.0 概念。这不会设置 redelivered 标志,因此不会涉及任何退出逻辑,但它会实现定时行为。

例如,当无法处理消息时,应用程序可能会延迟重新排队。这将不会再看到 5 分钟。应用程序可能必须用计数器标记消息。两者都可以在事务会话下完成以实现