WSO2 - DAS 消费 MQTT 消息

WSO2 - DAS consuming MQTT messages

我正在使用 Eclipse Kura 1.2.2、WSO2 DAS 3.0.0 和 ActiveMQ 5.12.1 在物联网领域进行一些试验。到目前为止,我设法将 DAS 设置为 M2M 中间件服务器,将 Raspberry PI2 上的 Kura 设置为物联网网关,并将 ActiveMQ 设置为 MQTT 服务器。

我还编写了一个非常基本的 MQTT 消息生成器,定期向 MQTT 服务器发送非常简单的 MQTT 消息,以模拟实际设备发送 MQTT 消息。这个想法是用一个蓝牙设备定期发送数据来代替这个应用程序。

当我使用 MQTTSpy 监控传入消息时,我注意到 MQTT 消息是二进制格式的。这在文档中有明确说明,因为 Kura 在使用 MQTT 发送数据时使用 Google 协议缓冲区。由于 DAS 不支持这种类型的 MQTT 消息,我认为这会导致服务器不响应任何传入消息。

我使用以下定义配置了 DAS 流:

{
  "streamId": "mqtt_sample_01:1.0.0",
  "name": "mqtt_sample_01",
  "version": "1.0.0",
  "nickName": "mqtt_sample_01",
  "description": "mqtt_sample_01",
  "metaData": [],
  "correlationData": [],
  "payloadData": [
    {
      "name": "temperature",
      "type": "FLOAT"
    }
  ]
}

我还使用以下代码为传入的 MQTT 消息创建了一个接收器:

<?xml version="1.0" encoding="UTF-8"?>
<eventReceiver name="mqtt_sample_receiver_protobuf" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
    <from eventAdapterType="mqtt-protobuf">
        <property name="topic">mqtt-sender-topic/mqtt-client-01/MQTT_APP_V1/mydata</property>
        <property name="clientId">mqtt-client-01</property>
        <property name="url">tcp://192.168.1.42:1883</property>
        <property name="cleanSession">false</property>
    </from>
    <mapping customMapping="disable" type="map"/>
    <to streamName="mqtt_sample_01" version="1.0.0"/>
</eventReceiver>

注意:我也尝试过 JSON 和 XML 作为映射类型。

为了在 DAS 控制台上显示所有内容,我使用以下方法添加了发布者:

<?xml version="1.0" encoding="UTF-8"?>
<eventPublisher name="mqtt_sample_logger_01" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventpublisher">
    <from streamName="mqtt_sample_01" version="1.0.0"/>
    <mapping customMapping="disable" type="text"/>
    <to eventAdapterType="logger">
        <property name="uniqueId">mqtt_sample_logger_01</property>
    </to>
</eventPublisher>  

Kura 使用 WSO2-DAS 无法理解的 Google 协议缓冲区来格式化 MQTT 消息。要解决此问题,存在以下几种可能性:

  1. Kura 可以更改 MQTT 消息格式,以不使用 Google 协议缓冲区进行编码。我发现 which is more or less similar to this approach 导致失去了 CloudClient class.
  2. 提供的所有优势
  3. 一种可能性是按照 or this article 中的描述编写您自己的 DAS 接收器。
  4. 第三个选项是浏览 Kura 代码并创建自己的 CloudService/CloudClient 实现。

以我个人的观点,最好的解决方案是选择第二个选项,编写一个自定义事件接收器来理解和解码 Kura 生成的 Google 协议缓冲区格式。其他甚至更好的解决方案也非常受欢迎。

Important notice:
ActiveMQ uses the dot notation for the topic name in the GUI (mqtt-sender-topic.mqtt-client-01.MQTT_APP_V1.mydata). But the real name of the topic uses the /-notation (mqtt-sender-topic/mqtt-client-01/MQTT_APP_V1/mydata).

为了构建自定义接收器,我决定从原始 MQTT 接收器复制现有代码并更改它以处理 protobuf 格式并将其转换为 XML(至少是这个想法)。在努力正确设置所有依赖项之后,我设法构建了一个可用的自定义接收器。

不幸的是,我们并不完全是我想去的地方。与 MQTT 代理的连接似乎有问题。接收器启动但似乎经常失去连接,在日志中写入以下消息。

DEBUG {test.wso2.mqtt.receiver.internal.util.MQTTProtobufAdapterListener} -  MQTT client subscribed to : mqtt-sender-topic/mqtt-client-01/MQTT_APP_V1/mydata
INFO {test.wso2.mqtt.receiver.internal.util.MQTTProtobufAdapterListener} -  MQTT Connection successful
WARN {test.wso2.mqtt.receiver.internal.util.MQTTProtobufAdapterListener} -  MQTT connection not reachable
Connection lost (32109) - java.io.EOFException
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:138)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
at java.io.DataInputStream.readByte(DataInputStream.java:267)
at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:56)
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:100)
... 1 more

对于它的价值,代理 (ActiveMQ) 抱怨并发出警告:

WARN Stealing link for clientId mqtt-client-01 From Connection Transport Connection to: tcp://192.168.1.42:4594

我的代码肯定是做错了什么导致连接断开。问题是什么。因此,我们再次欢迎任何建议、想法和解决方案!

HINT:
Start DAS with the -DosgiConsole option allowing you to investigate the status of your deployed bundle. After successful deployment of the receiver, the command diag [bundle_number] should output something like:
osgi> diag 473
reference:file:../dropins/test.wso2.mqtt.receiver.MqttProtobufReceiver->1.0.0.jar [473]
No unresolved constraints.

WSO2 产品(例如数据分析服务器)的输入接收器示例能够处理 Google 由 Eclipse Kura(KuraPayload 格式)创建的协议缓冲区格式的消息可以是 downloaded at Google Drive

Kura示例应用发送消息也可以是downloaded at Google Drive.

接收方接收到二进制格式的KuraPayload格式,并将其转换为XML。检查 XML 格式的示例应用程序。

请分享improvements/modifications你在接收器上做的事以帮助他人。