Spring 集成捕获 JMSTemplate 生存时间超时

Spring Integration Capturing JMSTemplate Time To Live Time Outs

我有发送到 IBM MQ 队列的 JMS 消息,如果远程客户端(我无法控制远程客户端)在给定时间(比如 1 分钟)内没有使用该消息,该消息应该过期(我有过期部分工作,"MQ deletes the message" 在 JMSTemplate 上使用 setExplicitQosEnabled 和 setTimeToLive)并且应该通知消息发送者(我的 SI 代码)消息没有传递到远程客户端,以便过期的消息可以重新路由到另一个队列。

我不确定如何使用 Spring 集成来实现此模式,尤其是因为消息是异步发送的并且仅在 return 上相关(MessageID -> CorrelationID)。

我想我可以有某种形式的:

1) ErrorMessageExceptionTypeRouter,但我需要有效载荷,以便我可以重新发送消息,但我不确定如何实现它(How to call back from MQ to JMSTemplate or Route to another Queue on Time out, and have a监听该队列并重新路由的辅助路由)

2) WireTap,但我认为这意味着在发件人(Request/Response 模型)上使用计时器阻塞线程,该计时器监视远程客户端删除的消息。我再次不确定如何实现它。

非常感谢任何有关如何最好地实施上述内容的帮助。

进度报告: 我尝试使用 Artem Bilan 建议的方法 JMSTemplate.receiveSelected(destination, messageSelector); 来查询 MQ。计划是手动查找客户端在给定时间内未使用的消息。 (提供者必须跟踪所有消息并尝试在计时器过期后使用其 messageID 检索每条消息,而不是使用 TimeToLive 消息到期)此解决方案让提供者有责任跟踪每条消息并为其计时并尝试检索每条消息(大多数不应该可用)使其效率低下但可行的解决方案。不幸的是,IBM MQ 在我调用时不喜欢它:JMSTemplate.receiveSelected(destination, messageSelector); 我收到以下错误:

org.springframework.jms.InvalidSelectorException: JMSWMQ2008: Failed to open MQ queue ‘MY.TEST.IN'.; nested exception is com.ibm.msg.client.jms.DetailedInvalidSelectorException: JMSWMQ2008: Failed to open MQ queue 'MY.TEST.IN'. JMS attempted to perform an MQOPEN, but WebSphere MQ reported an error. Use the linked exception to determine the cause of this error. Check that the specified queue and queue manager are defined correctly.; nested exception is com.ibm.mq.MQException: JMSCMQ0001: WebSphere MQ call failed with compcode '2' ('MQCC_FAILED') reason '2459' ('MQRC_SELECTOR_SYNTAX_ERROR').

然而,使用具有相同目的地的 JMSTemplate.receive(destination); 确实会从队列中读取消息。

您所解释的内容完全适用于 DLQ 场景:https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_8.0.0/com.ibm.mq.adm.doc/q020730_.htm

您需要为您的原始队列配置 DLQ,过期消息将移至 DLQ。

您确实可以在您的应用程序中为该 DLQ 配置一个侦听器,并通过 Spring 集成执行适当的逻辑:http://docs.spring.io/spring-integration/reference/html/jms.html#jms-message-driven-channel-adapter

以下代码导致 IBM MQ V8 自动将过期消息重新路由到 JMSReplyTo 队列,(感谢 JoshMc 在上面的评论中提出的建议)

jmsTemplate.setTimeToLive(3000l);
jmsTemplate.setExplicitQosEnabled(true);
jmsTemplate.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
jmsTemplate.setPriority(Message.DEFAULT_PRIORITY);
jmsTemplate.send(destinationName, session -> {
    TextMessage toSend = session.createTextMessage(message)
    toSend.setIntProperty("JMS_IBM_Report_Expiration", 14680064);
    Queue queue = session.createQueue(“TEST.EXPIRE.REPORT”);
    toSend.setJMSReplyTo(queue);

    return toSend;
});

我在这里找到的14680064的JMS_IBM_Report_Expiration值:https://www.ibm.com/support/knowledgecenter/en/SS7K4U_8.0.0/com.ibm.websphere.javadoc.doc/web/apidocs/constant-values.html#com.ibm.websphere.sib.api.jms.ApiJmsConstants.MQRO_EXPIRATION_WITH_FULL_DATA

我需要找到这个 JAR 并将其添加到我的项目中,但是 MQ 删除了过期的消息并将其存放在我指定的回复队列中。