使用 Spring JMS 和 qpid 使用 Azure 服务总线消息时标记为延迟的消息
Messages marked as Deferred when consuming Azure Service Bus messages with Spring JMS and qpid
我可以很好地从我的服务总线订阅中接收消息,但是当我的侦听器中发生异常时,似乎最终将带有 state=Modified 和 undeliverableHere=true 的 Disposition 帧发送到服务总线。服务总线的 docs 表示它不支持 amqp 修改后的配置。
消息在服务总线中以延迟状态结束,我不知道如何将消息推回到 activity。
JMS 配置:
@Bean
public ConnectionFactory jmsConnectionFactory(MessageStoreProperties properties) throws UnsupportedEncodingException {
JmsConnectionFactory connectionFactory = new JmsConnectionFactory(properties.getUrlString());
connectionFactory.setClientID(clientId);
connectionFactory.setUsername(properties.getUsername());
connectionFactory.setPassword(properties.getPassword());
connectionFactory.setRedeliveryPolicy(redeliveryPolicy());
return new CachingConnectionFactory(connectionFactory);
}
@Bean
public JmsDefaultRedeliveryPolicy redeliveryPolicy() {
JmsDefaultRedeliveryPolicy policy = new JmsDefaultRedeliveryPolicy();
policy.setMaxRedeliveries(50);
return policy;
}
@Bean
public JmsListenerContainerFactory topicContainerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setSubscriptionDurable(true);
factory.setPubSubDomain(true);
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return factory;
}
听众:
@JmsListener(destination = "crm-customer-event/subscriptions/test-sub", containerFactory = "topicContainerFactory")
public void receiveCustomerEvent(@Payload ExecutionContextDTO dto) {
logger.debug("Got payload: " + dto);
throw new RuntimeException("Oops");
}
这是我在日志中看到的内容:
消息传输开始
[299309872:1] <- Transfer{handle=0, deliveryId=0, deliveryTag=\x97\x1c\x87\x04\xb7\xea\x86@\xb3n\xbd\x9fg\x81\x00\x11, messageFormat=0, settled=null, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=true} (16695) "\x00Sp\xc0\x0a\x05@@p.........
抛出异常,然后JMS在本地重新投递消息50次(为什么要这样做?)
之后我看到的下一个amqp协议帧是
[299309872:1] -> Disposition{role=RECEIVER, first=0, last=0, settled=true, state=Modified{deliveryFailed=true, undeliverableHere=true, messageAnnotations=null}, batchable=false}
在我看来,最后一个处置帧导致消息进入延迟状态。消息上还有一个锁定令牌。即使 TTL 通过,该消息仍停留在订阅中,并且通过 REST api 对其进行任何检查都无济于事。我试过解锁它(用 PUT)和删除它(用 DELETE)。我也试过用 REST api(PeekLock 和接收和删除变体)接收它,但看起来它们不存在。我在订阅上设置了选项,可以在过期后将消息自动移动到死信队列,并且它们永远不会移动。
来自 qpid-jms 的使 ack 发生的代码是 here,看来库的这一部分不打算扩展,否则我会自己实现 returns 不同的确认。
如何让 qpid/JMS 变成
- 使用 FAILED ack 而不是 MODIFIED_FAILED_UNDELIVERABLE ack
- 清除那些延迟消息。
首先,JMS 规范正确地指出,从 MessageListener 回调抛出的异常实际上是一个应用程序编程错误,您的应用程序应该自行处理这些错误。
其次,客户端使用正确的配置来指示消息未能传递到此客户端,远程应支持 AMQP 1.0 规范中概述的所有配置,我会联系 Microsoft 并请求他们实施规范更紧密。
要解决上述两个问题,您可以在客户端确认模式下接收消息,并且当您拦截抛出的异常时,您可以使用 this JIRA Issue 中描述的机制配置消息确认。
代码如下所示:
message.setIntProperty("JMS_AMQP_ACK_TYPE", 2);
message.acknowledge();
模式定义为:
ACCEPTED = 1;
REJECTED = 2;
RELEASED = 3;
MODIFIED_FAILED = 4;
MODIFIED_FAILED_UNDELIVERABLE = 5;
我可以很好地从我的服务总线订阅中接收消息,但是当我的侦听器中发生异常时,似乎最终将带有 state=Modified 和 undeliverableHere=true 的 Disposition 帧发送到服务总线。服务总线的 docs 表示它不支持 amqp 修改后的配置。
消息在服务总线中以延迟状态结束,我不知道如何将消息推回到 activity。
JMS 配置:
@Bean
public ConnectionFactory jmsConnectionFactory(MessageStoreProperties properties) throws UnsupportedEncodingException {
JmsConnectionFactory connectionFactory = new JmsConnectionFactory(properties.getUrlString());
connectionFactory.setClientID(clientId);
connectionFactory.setUsername(properties.getUsername());
connectionFactory.setPassword(properties.getPassword());
connectionFactory.setRedeliveryPolicy(redeliveryPolicy());
return new CachingConnectionFactory(connectionFactory);
}
@Bean
public JmsDefaultRedeliveryPolicy redeliveryPolicy() {
JmsDefaultRedeliveryPolicy policy = new JmsDefaultRedeliveryPolicy();
policy.setMaxRedeliveries(50);
return policy;
}
@Bean
public JmsListenerContainerFactory topicContainerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setSubscriptionDurable(true);
factory.setPubSubDomain(true);
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return factory;
}
听众:
@JmsListener(destination = "crm-customer-event/subscriptions/test-sub", containerFactory = "topicContainerFactory")
public void receiveCustomerEvent(@Payload ExecutionContextDTO dto) {
logger.debug("Got payload: " + dto);
throw new RuntimeException("Oops");
}
这是我在日志中看到的内容:
消息传输开始
[299309872:1] <- Transfer{handle=0, deliveryId=0, deliveryTag=\x97\x1c\x87\x04\xb7\xea\x86@\xb3n\xbd\x9fg\x81\x00\x11, messageFormat=0, settled=null, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=true} (16695) "\x00Sp\xc0\x0a\x05@@p.........
抛出异常,然后JMS在本地重新投递消息50次(为什么要这样做?)
之后我看到的下一个amqp协议帧是
[299309872:1] -> Disposition{role=RECEIVER, first=0, last=0, settled=true, state=Modified{deliveryFailed=true, undeliverableHere=true, messageAnnotations=null}, batchable=false}
在我看来,最后一个处置帧导致消息进入延迟状态。消息上还有一个锁定令牌。即使 TTL 通过,该消息仍停留在订阅中,并且通过 REST api 对其进行任何检查都无济于事。我试过解锁它(用 PUT)和删除它(用 DELETE)。我也试过用 REST api(PeekLock 和接收和删除变体)接收它,但看起来它们不存在。我在订阅上设置了选项,可以在过期后将消息自动移动到死信队列,并且它们永远不会移动。
来自 qpid-jms 的使 ack 发生的代码是 here,看来库的这一部分不打算扩展,否则我会自己实现 returns 不同的确认。
如何让 qpid/JMS 变成
- 使用 FAILED ack 而不是 MODIFIED_FAILED_UNDELIVERABLE ack
- 清除那些延迟消息。
首先,JMS 规范正确地指出,从 MessageListener 回调抛出的异常实际上是一个应用程序编程错误,您的应用程序应该自行处理这些错误。
其次,客户端使用正确的配置来指示消息未能传递到此客户端,远程应支持 AMQP 1.0 规范中概述的所有配置,我会联系 Microsoft 并请求他们实施规范更紧密。
要解决上述两个问题,您可以在客户端确认模式下接收消息,并且当您拦截抛出的异常时,您可以使用 this JIRA Issue 中描述的机制配置消息确认。
代码如下所示:
message.setIntProperty("JMS_AMQP_ACK_TYPE", 2);
message.acknowledge();
模式定义为:
ACCEPTED = 1;
REJECTED = 2;
RELEASED = 3;
MODIFIED_FAILED = 4;
MODIFIED_FAILED_UNDELIVERABLE = 5;