互操作性 Azure 服务总线消息队列消息

Interoperability Azure Service Bus Message Queue Messages

我有一个 Java 应用程序和一个 NodeJS 应用程序,它们都使用单个 Azure 服务总线消息队列。

我在客户身上看到了一些奇怪的效果,如下所示。

JAVA MESSAGE PRODUCER(根据 Azure JMS 教程使用 QPID 库):

 TextMessage message = sendSession.createTextMessage();
        message.setText("Test AMQP message from JMS");
        long randomMessageID = randomGenerator.nextLong() >>>1;
        message.setJMSMessageID("ID:" + randomMessageID);
        sender.send(message);
        System.out.println("Sent message with JMSMessageID = " + message.getJMSMessageID());

输出: 已发送 JMSMessageID = ID:2414932965987073843

的消息

NodeJS 消息消费者:

serviceBus.receiveQueueMessage(queue, {timeoutIntervalInS: timeOut, isReceiveAndDelete: true}, function(err, message) {
if(message !==null)console.log(util.inspect(message, {showHidden: false, depth: null}));
});

输出:

{ body: '@\u0006string\b3http://schemas.microsoft.com/2003/10/Serialization/�\u001aTest AMQP message from JMS',
brokerProperties:
{ DeliveryCount: 1,
EnqueuedSequenceNumber: 5000004,
EnqueuedTimeUtc: 'Wed, 04 Nov 2015 21:28:21 GMT',
MessageId: '2414932965987073843',
PartitionKey: '89',
SequenceNumber: 59672695067659070,
State: 'Active',
TimeToLive: 1209600,
To: 'moequeue' },
contentType: 'application/xml; charset=utf-8' }

如果我将其与通过 serviceBus.sendQueueMessage() 插入队列的消息进行比较,则属性如下所示:

{ body: 'test message',
brokerProperties:
{ DeliveryCount: 1,
EnqueuedSequenceNumber: 0,
EnqueuedTimeUtc: 'Wed, 04 Nov 2015 21:44:03 GMT',
MessageId: 'bc0a3d4f-15ba-434f-9fb0-1a3789885f8c',
PartitionKey: '734',
SequenceNumber: 37436171906517256,
State: 'Active',
TimeToLive: 1209600 },
contentType: 'text/plain',
customProperties:
{ message_number: 0,
sent_date: Wed Nov 04 2015 21:44:03 GMT+0000 (UTC) } }

所以开始的内容类型不同 - 为什么? - 那么第一个消息有效载荷正文中的奇怪垃圾来自哪里:@\u0006string\b3http://schemas.microsoft.com/2003/10/Serialization/�\u001a 那是序列化的结果吗?如何缓解这种情况?

也可以在这里找到代码: http://pastebin.com/T9RTFRBk

Azure 服务总线支持两种不同的协议:AMQP 和 HTTP。使用 qpid 库的 Java/JMS 正在使用 ServiceBus 的 AMQP 协议。然而,ServiceBus REST APIs 通过 HTTP 协议封装在 NodeJS 中。

Service Bus对AMQP的支持详情,请参考https://azure.microsoft.com/en-us/documentation/articles/service-bus-amqp-overview/

而ServiceBus的REST API请参考https://msdn.microsoft.com/en-us/library/azure/hh780717.aspx

AMQP is a binary, application layer protocol, designed to efficiently support a wide variety of messaging applications and communication patterns. - from WikiPedia

但是 HTTP 是文本协议。

消息格式如下,请参考神器http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format. And the AMQP specification can be refered to http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-overview-v1.0-os.htmlMessage Format部分。

                                                     Bare Message
                                                            |
                                      .---------------------+--------------------.
                                      |                                          |
 +--------+-------------+-------------+------------+--------------+--------------+--------+
 | header | delivery-   | message-    | properties | application- | application- | footer |
 |        | annotations | annotations |            | properties   | data         |        |
 +--------+-------------+-------------+------------+--------------+--------------+--------+
 |                                                                                        |
 '-------------------------------------------+--------------------------------------------'
                                             |
                                      Annotated Message

因此在 Java 中发送的消息或在 NodeJS 中发送的消息被序列化为不同的结果。

来自AMQP的body内容中格式化的内容\uXXXX是Unicode字符。

Unicode字符\u0006是Acknowledge controll charater,请参考https://en.wikipedia.org/wiki/Acknowledge_character了解。

而Unicode字符\u001a是替代控制字符,请参考https://en.wikipedia.org/wiki/Substitute_character

它们限制了消息中元数据的开始和结束 header。

我们遇到了完全相同的问题,尽管在一个使用基于 Camel 的生产者的更复杂的示例中。由于我们环境的变化,我们开始遇到这些问题。

这里的问题是 REST 服务在对节点客户端的 HTTP 响应进行编码时如何解释 JMS 消息。

我们发现 JmsTextmessage 出于某种原因(不完全清楚)被假定为 "application/xml" 类型,内容将照此转发。因此,您在示例中得到的输出。

如果改为使用 JmsByteMessage,内容将被解释为 "application/octet-stream" 并且不会在传输中被破坏。

所以尝试以下方法:

BytesMessage message = sendSession.createBytesMessage();
String body = "Test AMQP message from JMS";
message.writeBytes(body.getBytes(StandardCharsets.UTF_8));
sender.send(message);

我们使用它来传输 JSON 编码数据以供 Node.js 客户端解释。

我遇到了同样的问题,消息正文的前缀是“@\u0006string\b3http://schemas.microsoft.com/2003/10/Serialization/\u0001”。

我使用 Nodejs,使用 azure-iot-device-mqttazure-iot-device 包,向 IoT 发送消息中心。我正在使用流分析作业从 IoT 中心接收消息并将它们发布到队列。我正在使用带有 amqp10 包的 Nodejs 从队列中接收事件。

问题不是由我发送或接收消息的方式引起的。相反,问题出在流分析兼容性级别上!兼容级别 1.0(至少在我部署时是默认级别)使用 DataContractSerializer 将消息序列化为 XML 流! Microsoft 使用兼容级别 1.1 更改(修复)了此问题。因此,您可能只需要将流分析作业的兼容级别(配置->兼容级别)更改为 1.1。

参见:https://docs.microsoft.com/en-us/azure/stream-analytics/stream-analytics-compatibility-level#major-changes-in-the-latest-compatibility-level-11: