Node.js 在收到来自 mqtt 代理的消息后执行 mysql 查询

Node.js executing mysql query after receiving message from mqtt broker

我有一个订阅主题的 node.js 文件,并在收到已发布的消息后扫描本地 mysql 数据库以查找名为“command”的变量中的最新条目。命令值会触发各种响应,但由于我的问题在这之前,所以我省略了这部分。

我的 mysql 查询似乎出现错误。我正在尝试查找 command 列的最新条目,并将其值赋给变量 command。我认为这段代码可以解决问题:

// Query text: newest row (highest id) of motoron2, command column only.
var sql = 'SELECT command FROM motoron2 ORDER BY id DESC LIMIT 1';
        // `result` is a parameter of this callback, so it exists only inside it.
        // The callback rethrows a query error and otherwise discards the result.
        con.query(sql, function (err, result) {
            if (err) throw err;
        });
        // Outside the callback no `result` is declared in scope; this is the
        // statement reported as "ReferenceError: result is not defined".
        console.log(result);
        // NOTE(review): the query uses LIMIT 1, so presumably the only row would
        // be at index 0, not 1 - confirm against the mysql result format.
        var command = result[1];
        console.log(command);

但是我收到以下响应,这似乎表明 mysql 查询中存在错误:

user@server.domain [bin]# node motorlistener.js
Connected to MYSQL!
Connected to Broker!
{"pulse":1}
1
/home/user/etc/domain/bin/motorlistener.js:62
    console.log(result);
                ^

ReferenceError: result is not defined
    at MqttClient.<anonymous> (/home/user/etc/domain/bin/motorlistener.js:62:17)
    at MqttClient.emit (events.js:314:20)
    at MqttClient._handlePublish (/home/user/node_modules/mqtt/lib/client.js:1277:12)
    at MqttClient._handlePacket (/home/user/node_modules/mqtt/lib/client.js:410:12)
    at work (/home/user/node_modules/mqtt/lib/client.js:321:12)
    at Writable.writable._write (/home/user/node_modules/mqtt/lib/client.js:335:5)
    at doWrite (/home/user/node_modules/readable-stream/lib/_stream_writable.js:409:139)
    at writeOrBuffer (/home/user/node_modules/readable-stream/lib/_stream_writable.js:398:5)
    at Writable.write (/home/user/node_modules/readable-stream/lib/_stream_writable.js:307:11)
    at TLSSocket.ondata (_stream_readable.js:718:22)

下面是完整的代码,但是有人知道是什么导致了这个错误吗?

////////////////////////////////////////////////////////////////////////////////
// Script overview: subscribes to an MQTT topic and, for every message received,
// queries a MySQL table for the most recent `command` entry.
//setup
var mqtt    = require('mqtt'); //for client use
const fs = require('fs');
// CA certificate is read synchronously once at startup.
var caFile = fs.readFileSync("/home/user/etc/domain/bin/ca.crt");
var topic = "heartbeat";
var mysql      = require('mysql');

// MySQL connection object; the connection itself is opened by con.connect() below.
var con = mysql.createConnection({
  host     : 'localhost',
  user     : 'myuser',
  password : 'mypass',
  database : 'mydb'
});

// Options passed to mqtt.connect().
// NOTE(review): rejectUnauthorized:false presumably disables TLS certificate
// verification, which would make the `ca` entry ineffective - confirm.
// NOTE(review): `retain` looks like a publish option rather than a connect
// option - confirm against the MQTT.js documentation.
var options={
    port:8883,
    clientId:"yo",
    username:"myuser2",
    password:"mypassw",
    protocol: 'mqtts',
    clean:true,
    rejectUnauthorized: false,
    retain:false, 
    ca:caFile
};


// NOTE(review): the URL scheme is http while options.protocol is 'mqtts' -
// confirm which of the two the client actually uses.
var client  = mqtt.connect("http://dns.org",options);

//mqtt connection dialog
// Subscribes to `topic` with QoS 1 each time the "connect" event fires.
client.on("connect",function(){
    console.log("Connected to Broker!");
    client.subscribe(topic, {qos:1});
});

//mqtt connection handle errors
// Logs a fixed message (the `error` argument is not printed) and exits with code 1.
client.on("error",function(error){
    console.log("Broker Connection Error");
    process.exit(1);
});

//database connection
// A connection failure is rethrown from the callback.
con.connect(function(err) {
        if (err) throw err;
        console.log("Connected to MYSQL!");
});

//handle incoming messages from broker
client.on('message',function(topic, message, packet){
    // Coerce the payload to a string, then parse it as JSON.
    var raw = ""+message;
    console.log(raw);
    var obj = JSON.parse(raw);
    var pulse = obj.pulse; 
    console.log(pulse);
    
    var sql = 'SELECT command FROM motoron2 ORDER BY id DESC LIMIT 1';
    // `result` is a parameter of this callback and exists only inside it; the
    // callback rethrows a query error and otherwise discards the result.
    con.query(sql, function (err, result) {
        if (err) throw err;
    });
    // No `result` is declared in this scope: this statement matches the
    // "ReferenceError: result is not defined" shown in the reported trace.
    console.log(result);
    // NOTE(review): the query uses LIMIT 1, so presumably the only row would be
    // at index 0, not 1 - confirm against the mysql result format.
    var command = result[1];
    console.log(command);
    
    if (command == 1) {
        console.log("command=1");
    }
    else {
        // NOTE(review): the message mentions 0 although the condition tests
        // against 1 - confirm the intended wording.
        console.log("command not equal to 0");
    }

});

> 我收到以下响应,这似乎表明 mysql 查询中存在错误

这不是您的 MySQL 查询中的错误,而是一个 ReferenceError(引用错误):result 只是回调函数的参数,在回调之外并不存在。此外,查询是异步执行的,只有在回调被调用时结果才可用,因此必须在回调内部使用 result。

将您的代码更改为此将有效:

// Fetch the newest row of motoron2 and use its `command` column.
const sql = 'SELECT command FROM motoron2 ORDER BY id DESC LIMIT 1';
con.query(sql, function (err, result) {
   if (err) {
       throw err;
   }
   // `result` only exists inside this callback, so every use of it stays here.
   console.log(result);
   // A SELECT yields an array of rows; an empty table yields an empty array,
   // so guard before indexing into it.
   if (result.length === 0) {
       console.log('motoron2 has no rows yet');
       return;
   }
   // Each row is an object keyed by column name: read the column value itself
   // rather than the whole row, so it can be compared with a number later.
   const command = result[0].command;
   console.log(command);
});

根据您的运行环境,您还可以使用 Promise 和 async/await 重写代码,以减少回调的嵌套层级。

为此,您需要先将基于回调的查询包装成一个 Promise,然后就可以用 await 等待它,如下所示:

const sql = 'SELECT command FROM motoron2 ORDER BY id DESC LIMIT 1';

// 1 -- we turn the callback-style query into a promise
const queryPromise = new Promise((resolve, reject) => {
  con.query(sql, function (queryError, queryResult) {
    if (queryError) {
       // return so that resolve() is not also called after a failure
       return reject(queryError);
    }
    resolve(queryResult);
  });
});

try {
  // 2 -- we can now await the promise; note the await
  //      (await requires an async function or an ES module top level)
  const result = await queryPromise;

  // 3 -- now we can use the result as if it executed synchronously
  console.log(result);
  // rows are objects keyed by column name; an empty table gives an empty array
  const command = result.length > 0 ? result[0].command : undefined;
  console.log(command);
} catch(err) {
  // query errors end up here; report them instead of dropping them silently
  console.error('query failed:', err);
}

综上所述,您应该能够将 on message 事件处理程序更改为异步函数,以便利用 async/await 模式,如上所示:

// Declared async so that `await` is legal inside the handler body.
const handleMessage = async function (topic, message, packet) {

    /* .. you can use await here .. */

};

client.on('message', handleMessage);

以上来自@Mike Dinescu 的所有代码都运行良好。最后别忘了关闭连接!

否则运行程序将在测试完成后挂起。

完整解决方案:

/**
 * Runs one SQL statement on a short-lived connection pool and returns the
 * first element of its result.
 *
 * A new pool is created on every call and is always closed again in the
 * `finally` block, so open connections do not keep the process alive after
 * the query has finished.
 *
 * @param {string} dbquery - SQL text handed to `pool.query`.
 * @returns {Promise<*>} The first element of the query result, or `undefined`
 *   when the result is empty or the query failed (the failure is logged).
 */
async function mySqlConnect(dbquery) {
    // createPool takes a single configuration object, so every setting
    // belongs in this one literal.
    const pool = mysql.createPool({
      host: 'localhost',
      port: 3306,
      user: 'test',
      password: 'test',
      database: 'test'
    });

    try {
        // Adapt the callback-style query to a promise so it can be awaited.
        const result = await new Promise((resolve, reject) => {
            pool.query(dbquery, (queryError, queryResult) => {
                if (queryError) {
                    // Stop here so resolve() is not also called on failure.
                    reject(queryError);
                    return;
                }
                resolve(queryResult);
            });
        });

        // Already a plain value at this point; no further await is needed.
        return result[0];
    } catch (err) {
        // Keep the "undefined on failure" contract, but report the error
        // instead of dropping it silently.
        console.error('query failed:', err);
        return undefined;
    } finally {
        // Always release the pool, whether the query succeeded or failed.
        pool.end((endError) => {
            if (endError) {
                console.log('error:' + endError.message);
            }
        });
    }
}