使用 JMS 为 Azure 服务总线安排消息

Schedule message for Azure Service Bus with JMS

我想使用 JMS 将预定消息发送到 Azure 服务总线。 我的代码基于 org.apache.qpid.jms.message.JmsMessage。我找到了针对给定问题的一种解决方案,但它使用 org.apache.qpid.proton.message.Message.getMessageAnnotations(),它允许编辑消息注释并添加一些 Azure 服务总线正确识别和处理的属性。我的消息 impl 缺少该方法。

我在 node.js 的官方文档和实现中找到的内容,要使用 Azure 服务总线安排消息,您需要发送 header BrokerProperties/brokerProperties 有效 json。 其他 headers/properties 将被标记为 Customer properties 并被 Azure 服务总线忽略。

official azure docs about JMS 表示 JMS API 未正式支持设置 ScheduledEnqueueTimeUtc。但是可以通过设置属性.

手动实现

所以当我向 queue 发送消息时,我可以 post 在 lambda 中处理它并设置一些属性:

jmsTemplate.convertAndSend(queue, payload, message -> {
    var date = Date.from(ZonedDateTime.now(ZoneId.of("UTC")).plus(delay, ChronoUnit.MILLIS).toInstant());
    var brokerProps = Map.of("ScheduledEnqueueTimeUtc", date.toGMTString());
    message.setStringProperty(
        "brokerProperties",
        objectMapper.writeValueAsString(brokerProps)
    );
    return message;
});

而且它不起作用。消息到达 queue,但当我尝试在 Azure 的服务总线资源管理器中查看它时,它会在浏览器控制台中抛出错误,并且操作会一直持续下去。我想设置 属性 brokerProperties 会对服务总线产生一些影响。 我还尝试发送一个带有日期作为字符串的地图(使用 Azure 使用的日期格式),例如 "ScheduledEnqueueTimeUtc", "Thu, 25 Mar 2021 12:54:00 GMT",但它也被服务总线识别为错误(偷看永远持续并且错误在抛出浏览器控制台)。

我尝试设置字符串属性,例如 x-opt-scheduled-enqueue-timex-ms-scheduled-enqueue-time,我在 SO 的其他线程中找到了这些属性,但其中 none 适用于我的示例。

我看到 Microsoft 为 Java 提供了一些库来与 Azure 服务总线通信,但我需要在我的代码中保持与云提供商的独立性,并且不包含任何额外的库。

是否有使用包 org.apache.qpid.jms.message.JmsMessage 中的 JMS 消息实现来为 Azure 服务总线设置 BrokerProperties 的示例?

我的团队目前正面临同样的问题。

我们发现 MessageAnnotationsMap 中设置了 ScheduledEnqueueTimeUtc 属性。不幸的是,JMS 使用的 org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFacade 已将 getter 和 setter 设置为包私有。但我们发现您可以使用 setTracingAnnotation(String key, Object value) 方法。

示例:

public void sendDelayedMessage() {
    final var now = ZonedDateTime.now();
    jmsTemplate.send("test-queue", session -> {
        final var tenMinutesFromNow = now.plusMinutes(10);
        final var textMessage = session.createTextMessage("Hello Service Bus!");
        ((JmsTextMessage) textMessage).getFacade().setTracingAnnotation("x-opt-scheduled-enqueue-time", Date.from(tenMinutesFromNow.toInstant()));
        return textMessage;
    });
    log.info("Sent at: " + now);
}

证明:

非常感谢我的 teammate!!