Paho javascript mqtt 压缩负载

Paho javascript mqtt compressed payload

我正在开发一个可以连接到 mqtt 代理并获取负载消息的网站。

我使用的库是(https://eclipse.org/paho/clients/js/)。

我的问题如下。当我尝试获取标准压缩消息(gzipped)时,库抛出异常“Error: AMQJS0009E Malformed UTF data:f5 3 -52.”

如何处理压缩邮件?

这是我的代码:

var selected_tags   = '';
var checked_tags    = [];

var hostname        = 'xxx';
var port            = 80;
var qos             = 1;

var user            = 'xxx';
var pass            = 'xxx';

var keepAlive       = 60;
var timeout         = 3;
var ssl             = false;
var cleanSession    = true;
var lastWillTopic   = '';
var lastWillQos     = 1;
var lastWillRetain  = false;
var lastWillMessage = '';
var g_topic;
var client = new Messaging.Client(hostname, port, "myclientid_" + parseInt(Math.random() * 100, 10));

var options = {
    timeout             : 3,
    userName            : user,
    password            : pass,
    keepAliveInterval   : keepAlive,
    cleanSession        : cleanSession,
    useSSL              : ssl,
    onSuccess           : function () {
        console_log('<span style="color:green">Connected</span>');
    },
    onFailure           : function (message) {
        console_log('<span style="color:red">Connection failed: ' + message.errorMessage + '</span>');
        client.connect(options);
    }
};
function _subscribe(){
    g_topic = $("#sel_reader").val();
    console_log('<span style="color:green">subscribe to: ' + g_topic + '</span>');
    client.subscribe(g_topic, {qos: qos});
    $("#_subscribe").addClass('hide');
    $("#_unsubscribe").removeClass('hide');
}
function _unsubscribe(){
    console_log('<span style="color:red">unsubscribe from: ' + g_topic + '</span>');
    client.unsubscribe(g_topic);
    $("#_subscribe").removeClass('hide');
    $("#_unsubscribe").addClass('hide');
}
client.onConnectionLost = function (responseObject) {
    console_log('<span style="color:red">Connection lost: ' + responseObject.errorMessage + '</span>');
    client.connect(options);
};
client.onMessageArrived = function (message) {
    // console.log("message arrived");

    var live_search = [];
    var live_found  = '';
    var json = JSON.parse(message.payloadString);

    console_log('<hr />Message Recieved: Topic: ' + message.destinationName + '<br />' 
                  +message.payloadString + 
                '. QoS: ' + message.qos
    );

};
var publish = function (payload, topic, qos) {
    var message = new Messaging.Message(payload);
    message.destinationName = topic;
    message.qos = qos;
    client.send(message);
}
function console_log(txt){
    $("#console").append('<b>'+txt + "</b><br />");
}
var prev_imei = '';
var find = ':';
var re   = new RegExp(find, 'g');
function SortByRssi(a, b){
    var arssi = a.rssi;
    var brssi = b.rssi; 
    return ((arssi > brssi) ? -1 : ((arssi < brssi) ? 1 : 0));
}
$(document).ready(function(){
    client.connect(options);
});

好的,所以在 MQTT 或 Paho Javascript 客户端中没有内置的 gzip 压缩负载处理。

这意味着当您的代码运行时,它会尝试将 gzip 流解析为 UTF-8 字符串。

client.onMessageArrived = function (message) {
    // console.log("message arrived");

    var live_search = [];
    var live_found  = '';
    var json = JSON.parse(message.payloadString);

    console_log('<hr />Message Recieved: Topic: ' + message.destinationName + '<br />' 
                  +message.payloadString + 
                '. QoS: ' + message.qos
    );
};

首先,您需要使用原始字节而不是消息的字符串形式。您可以从 message.payloadBytes 字段访问原始字节。

现在你遇到了实际解压的问题。这个旧的 question 看起来会有帮助。

client.onMessageArrived = function (message) {
    // console.log("message arrived");

    var live_search = [];
    var live_found  = '';
    var compressedJSON = message.payloadBytes;
    var json = pako.inflate(compressedJSON);
    console.log('<hr />Message Recieved: Topic: ' + message.destinationName + '<br />' 
              + json + 
            '. QoS: ' + message.qos
    );
};

如果有人需要 Python 的答案:

import zlib

def _on_message(self, mqttc, obj, msg):
    dec_msg = zlib.decompress(msg.payload)
    # do whatever you need with dec_msg
    print dec_msg