使用 Spring JMS 和 qpid 使用 Azure 服务总线消息时标记为延迟的消息

Messages marked as Deferred when consuming Azure Service Bus messages with Spring JMS and qpid

我可以很好地从我的服务总线订阅中接收消息,但是当我的侦听器中发生异常时,似乎最终将带有 state=Modified 和 undeliverableHere=true 的 Disposition 帧发送到服务总线。服务总线的 docs 表示它不支持 amqp 修改后的配置。

消息在服务总线中以延迟状态结束,我不知道如何将消息推回到 activity。

JMS 配置:

@Bean
public ConnectionFactory jmsConnectionFactory(MessageStoreProperties properties) throws UnsupportedEncodingException {
    JmsConnectionFactory connectionFactory = new JmsConnectionFactory(properties.getUrlString());
    connectionFactory.setClientID(clientId);
    connectionFactory.setUsername(properties.getUsername());
    connectionFactory.setPassword(properties.getPassword());
    connectionFactory.setRedeliveryPolicy(redeliveryPolicy());
    return new CachingConnectionFactory(connectionFactory);
}

@Bean
public JmsDefaultRedeliveryPolicy redeliveryPolicy() {
    JmsDefaultRedeliveryPolicy policy = new JmsDefaultRedeliveryPolicy();
    policy.setMaxRedeliveries(50);
    return policy;
}

@Bean
public JmsListenerContainerFactory topicContainerFactory(ConnectionFactory connectionFactory) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setSubscriptionDurable(true);
    factory.setPubSubDomain(true);
    factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    return factory;
}

听众:

@JmsListener(destination = "crm-customer-event/subscriptions/test-sub", containerFactory = "topicContainerFactory")
public void receiveCustomerEvent(@Payload ExecutionContextDTO dto) {
    logger.debug("Got payload: " + dto);
    throw new RuntimeException("Oops");
}

这是我在日志中看到的内容:

消息传输开始

[299309872:1] <- Transfer{handle=0, deliveryId=0, deliveryTag=\x97\x1c\x87\x04\xb7\xea\x86@\xb3n\xbd\x9fg\x81\x00\x11, messageFormat=0, settled=null, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=true} (16695) "\x00Sp\xc0\x0a\x05@@p.........

抛出异常,然后JMS在本地重新投递消息50次(为什么要这样做?)

之后我看到的下一个amqp协议帧是

[299309872:1] -> Disposition{role=RECEIVER, first=0, last=0, settled=true, state=Modified{deliveryFailed=true, undeliverableHere=true, messageAnnotations=null}, batchable=false}

在我看来,最后一个处置帧导致消息进入延迟状态。消息上还有一个锁定令牌。即使 TTL 通过,该消息仍停留在订阅中,并且通过 REST api 对其进行任何检查都无济于事。我试过解锁它(用 PUT)和删除它(用 DELETE)。我也试过用 REST api(PeekLock 和接收和删除变体)接收它,但看起来它们不存在。我在订阅上设置了选项,可以在过期后将消息自动移动到死信队列,并且它们永远不会移动。

来自 qpid-jms 的使 ack 发生的代码是 here,看来库的这一部分不打算扩展,否则我会自己实现 returns 不同的确认。

如何让 qpid/JMS 变成

  1. 使用 FAILED ack 而不是 MODIFIED_FAILED_UNDELIVERABLE ack
  2. 清除那些延迟消息。

首先,JMS 规范正确地指出,从 MessageListener 回调抛出的异常实际上是一个应用程序编程错误,您的应用程序应该自行处理这些错误。

其次,客户端使用正确的配置来指示消息未能传递到此客户端,远程应支持 AMQP 1.0 规范中概述的所有配置,我会联系 Microsoft 并请求他们实施规范更紧密。

要解决上述两个问题,您可以在客户端确认模式下接收消息,并且当您拦截抛出的异常时,您可以使用 this JIRA Issue 中描述的机制配置消息确认。

代码如下所示:

message.setIntProperty("JMS_AMQP_ACK_TYPE", 2);
message.acknowledge();

模式定义为:

ACCEPTED = 1;
REJECTED = 2;
RELEASED = 3;
MODIFIED_FAILED = 4;
MODIFIED_FAILED_UNDELIVERABLE = 5;