节点应用程序:尝试从 firebird.attach 例程中发布 MQTT 消息
Node App: Trying to publish an MQTT message from within a firebird.attach routine
所以我有一个 firebird.attach 成功运行的例程
res.render('success');
client.publish('time', 'test');
console.log(`MQTT sent`)
但我正在尝试从其中发布上述 mqtt 消息。
client.on('connect', function () {
client.subscribe('presence', function (err) {
if (!err) {
client.publish('test', 'Hello')
}
})
})
以上消息有效并被发送和接收,它在 firebird.attach 代码块之外独立存在,所以我想知道是否有可能 运行 mqtt 代码在 firebird.attach 块内? console.log 消息(MQTT 发送)确实出现了,所以我很困惑。
谢谢
代码在这里:
require('dotenv').config()
// Express intialization
const express = require('express')
const app = express()
app.set('view engine', 'pug')
const port = 4010
var moment = require('moment'); // require
app.use(express.static('public'))
// Firebird initialization
var firebird = require('node-firebird')
var options = {}
options.host = process.env.HOST
options.port = process.env.PORT
options.database = process.env.DB_PATH
options.user = process.env.DB_USER
options.password = process.env.DB_PASSWORD
var mqtt = require('mqtt')
var client = mqtt.connect('mqtt://192.168.1.1')
client.on('connect', function () {
client.subscribe('presence', function (err) {
if (!err) {
client.publish('presence', 'Hello mqtt')
}
})
})
client.on('message', function (topic, message) {
// message is Buffer
console.log(message.toString())
client.end()
})
firebird.attach(options, function(err, db) {
if(err) throw err;
app.get('/', (req, res) => {
db.query('SELECT opid, name FROM PXOPS WHERE OPID NOT IN (1, -1, 10, 14)', function(err, rows) {
if(err) throw err;
res.render('index', { operators: rows });
});
})
app.get('/clock/:id', (req, res) => {
if(req.params.id) {
db.query('SELECT opid, name FROM PXOPS WHERE OPID = ' + req.params.id, function(err, operator_row = []) {
if(operator_row.length === 1)
{
var clockedIn = false;
db.query('SELECT FIRST 1 * FROM PXATTENDEPXNTS WHERE OPID = ' + req.params.id + ' AND DATETIME >= \'' + moment().startOf('day').format('YYYY-MM-DD HH:mm:ss') + '\' ORDER BY DATETIME DESC', function(err, last_row) {
console.log(last_row);
if(err) throw err;
if(last_row.length === 1 && last_row[0].EPXNTTYPE === 0) clockedIn = true;
if(clockedIn)
{
db.query('SELECT FIRST 10 * FROM PXATTENDEPXNTS WHERE OPID = ' + req.params.id + ' AND DATETIME >= \'' + moment().startOf('day').format('YYYY-MM-DD HH:mm:ss') + '\'', function(err, today_rows) {
totalMinutes = 0;
console.log(today_rows);
today_rows.push({ DATETIME: moment().format('YYYY-MM-DD HH:mm:ss')});
splitRows = today_rows.reduce(function(result, value, index, array) {
if (index % 2 === 0)
result.push(array.slice(index, index + 2));
return result;
}, []);
splitRows.forEach(pair => {
totalMinutes += Math.round(moment.duration(moment(pair[1].DATETIME).diff(moment(pair[0].DATETIME))).asMinutes());
});
console.log(totalMinutes);
minuteString = null;
hours = 0;
remainingMinutes = 0;
if(totalMinutes > 60)
{
hours = Math.floor(totalMinutes / 60);
remainingMinutes = totalMinutes - (hours * 60);
}
if(hours > 0 && remainingMinutes > 0)
{
minuteString = hours + ' hours and ' + remainingMinutes + ' minutes'
}
else if(hours > 0 && remainingMinutes === 0)
{
minuteString = hours + ' hours';
}
else {
minuteString = totalMinutes + ' minutes';
}
res.render('clock', { operator: operator_row[0], working_string: minuteString, clockedIn: clockedIn });
});
}
else
{
res.render('clock', {operator: operator_row[0], clockedIn: clockedIn})
}
});
}
});
}
})
app.get('/clock-in/:id', (req, res) => {
db.query('SELECT opid, name FROM PXOPS WHERE OPID = ' + req.params.id, function(err, rows) {
if(rows.length === 0)
{
res.render('error');
}
// Check if he's already clocked in
db.query('SELECT FIRST 1 OPID FROM PXATTENDEPXNTS WHERE OPID = ' + req.params.id + ' ORDER BY ATTENDEPXNTID DESC', function(err, rows) {
if(err) throw err;
if(rows.length === 1 && rows[0].EPXNTTYPE == 0)
{
res.render('error');
}
else
{
db.query('SELECT FIRST 1 ATTENDEPXNTID FROM PXATTENDEPXNTS ORDER BY ATTENDEPXNTID DESC', function(err, rows) {
lastID = 0;
if(rows.length === 1)
{
lastID = rows[0].ATTENDEPXNTID;
}
db.query('INSERT INTO PXATTENDEPXNTS (ATTENDEPXNTID, OPID, DATETIME, EPXNTTYPE, SCANNEDSTR, BRANCHID, CHANGED_FLAG, DEPTID) VALUES (' + (lastID + 1) + ','+ req.params.id +', \'' + moment().format('YYYY-MM-DD HH:mm:ss') + '\', 0, null, 1, 1, 0)', function(err, rows) {
if(err) throw err;
res.render('success');
client.publish('timeclock', 'clocked in');
console.log(`MQTT sent`)
});
});
}
});
});
})
app.get('/clock-out/:id', (req, res) => {
db.query('SELECT opid, name FROM PXOPS WHERE OPID = ' + req.params.id, function(err, rows) {
if(rows.length === 0)
{
res.render('error');
}
// Check if he's already clocked out
db.query('SELECT FIRST 1 OPID FROM PXATTENDEPXNTS WHERE OPID = ' + req.params.id + ' ORDER BY ATTENDEPXNTID DESC', function(err, rows) {
if(err) throw err;
if(rows.length === 1 && rows[0].EPXNTTYPE == 1)
{
res.render('error');
}
else
{
db.query('SELECT FIRST 1 ATTENDEPXNTID FROM PXATTENDEPXNTS ORDER BY ATTENDEPXNTID DESC', function(err, rows) {
lastID = 0;
if(rows.length === 1)
{
lastID = rows[0].ATTENDEPXNTID;
}
db.query('INSERT INTO PXATTENDEPXNTS (ATTENDEPXNTID, OPID, DATETIME, EPXNTTYPE, SCANNEDSTR, BRANCHID, CHANGED_FLAG, DEPTID) VALUES (' + (lastID + 1) + ','+ req.params.id +', \''+ moment().format('YYYY-MM-DD HH:mm:ss') +'\', 1, null, 1, 1, 0)', function(err, rows) {
if(err) throw err;
res.render('success')
});
});
}
});
});
})
})
app.listen(port, () => {
console.log(`App listening at http://localhost:${port}`)
})
on message
回调中的 client.end()
将关闭 MQTT 客户端。因此,您在 connect
回调中发布的消息将导致客户端立即退出。
(如果订阅主题,客户端会收到自己发布的消息,这可以使用 MQTT v5 iirc 进行更改,但需要明确设置)
所以我有一个 firebird.attach 成功运行的例程
res.render('success');
client.publish('time', 'test');
console.log(`MQTT sent`)
但我正在尝试从其中发布上述 mqtt 消息。
client.on('connect', function () {
client.subscribe('presence', function (err) {
if (!err) {
client.publish('test', 'Hello')
}
})
})
以上消息有效并被发送和接收,它在 firebird.attach 代码块之外独立存在,所以我想知道是否有可能 运行 mqtt 代码在 firebird.attach 块内? console.log 消息(MQTT 发送)确实出现了,所以我很困惑。
谢谢
代码在这里:
require('dotenv').config()
// Express intialization
const express = require('express')
const app = express()
app.set('view engine', 'pug')
const port = 4010
var moment = require('moment'); // require
app.use(express.static('public'))
// Firebird initialization
var firebird = require('node-firebird')
var options = {}
options.host = process.env.HOST
options.port = process.env.PORT
options.database = process.env.DB_PATH
options.user = process.env.DB_USER
options.password = process.env.DB_PASSWORD
var mqtt = require('mqtt')
var client = mqtt.connect('mqtt://192.168.1.1')
client.on('connect', function () {
client.subscribe('presence', function (err) {
if (!err) {
client.publish('presence', 'Hello mqtt')
}
})
})
client.on('message', function (topic, message) {
// message is Buffer
console.log(message.toString())
client.end()
})
firebird.attach(options, function(err, db) {
if(err) throw err;
app.get('/', (req, res) => {
db.query('SELECT opid, name FROM PXOPS WHERE OPID NOT IN (1, -1, 10, 14)', function(err, rows) {
if(err) throw err;
res.render('index', { operators: rows });
});
})
app.get('/clock/:id', (req, res) => {
if(req.params.id) {
db.query('SELECT opid, name FROM PXOPS WHERE OPID = ' + req.params.id, function(err, operator_row = []) {
if(operator_row.length === 1)
{
var clockedIn = false;
db.query('SELECT FIRST 1 * FROM PXATTENDEPXNTS WHERE OPID = ' + req.params.id + ' AND DATETIME >= \'' + moment().startOf('day').format('YYYY-MM-DD HH:mm:ss') + '\' ORDER BY DATETIME DESC', function(err, last_row) {
console.log(last_row);
if(err) throw err;
if(last_row.length === 1 && last_row[0].EPXNTTYPE === 0) clockedIn = true;
if(clockedIn)
{
db.query('SELECT FIRST 10 * FROM PXATTENDEPXNTS WHERE OPID = ' + req.params.id + ' AND DATETIME >= \'' + moment().startOf('day').format('YYYY-MM-DD HH:mm:ss') + '\'', function(err, today_rows) {
totalMinutes = 0;
console.log(today_rows);
today_rows.push({ DATETIME: moment().format('YYYY-MM-DD HH:mm:ss')});
splitRows = today_rows.reduce(function(result, value, index, array) {
if (index % 2 === 0)
result.push(array.slice(index, index + 2));
return result;
}, []);
splitRows.forEach(pair => {
totalMinutes += Math.round(moment.duration(moment(pair[1].DATETIME).diff(moment(pair[0].DATETIME))).asMinutes());
});
console.log(totalMinutes);
minuteString = null;
hours = 0;
remainingMinutes = 0;
if(totalMinutes > 60)
{
hours = Math.floor(totalMinutes / 60);
remainingMinutes = totalMinutes - (hours * 60);
}
if(hours > 0 && remainingMinutes > 0)
{
minuteString = hours + ' hours and ' + remainingMinutes + ' minutes'
}
else if(hours > 0 && remainingMinutes === 0)
{
minuteString = hours + ' hours';
}
else {
minuteString = totalMinutes + ' minutes';
}
res.render('clock', { operator: operator_row[0], working_string: minuteString, clockedIn: clockedIn });
});
}
else
{
res.render('clock', {operator: operator_row[0], clockedIn: clockedIn})
}
});
}
});
}
})
app.get('/clock-in/:id', (req, res) => {
db.query('SELECT opid, name FROM PXOPS WHERE OPID = ' + req.params.id, function(err, rows) {
if(rows.length === 0)
{
res.render('error');
}
// Check if he's already clocked in
db.query('SELECT FIRST 1 OPID FROM PXATTENDEPXNTS WHERE OPID = ' + req.params.id + ' ORDER BY ATTENDEPXNTID DESC', function(err, rows) {
if(err) throw err;
if(rows.length === 1 && rows[0].EPXNTTYPE == 0)
{
res.render('error');
}
else
{
db.query('SELECT FIRST 1 ATTENDEPXNTID FROM PXATTENDEPXNTS ORDER BY ATTENDEPXNTID DESC', function(err, rows) {
lastID = 0;
if(rows.length === 1)
{
lastID = rows[0].ATTENDEPXNTID;
}
db.query('INSERT INTO PXATTENDEPXNTS (ATTENDEPXNTID, OPID, DATETIME, EPXNTTYPE, SCANNEDSTR, BRANCHID, CHANGED_FLAG, DEPTID) VALUES (' + (lastID + 1) + ','+ req.params.id +', \'' + moment().format('YYYY-MM-DD HH:mm:ss') + '\', 0, null, 1, 1, 0)', function(err, rows) {
if(err) throw err;
res.render('success');
client.publish('timeclock', 'clocked in');
console.log(`MQTT sent`)
});
});
}
});
});
})
app.get('/clock-out/:id', (req, res) => {
db.query('SELECT opid, name FROM PXOPS WHERE OPID = ' + req.params.id, function(err, rows) {
if(rows.length === 0)
{
res.render('error');
}
// Check if he's already clocked out
db.query('SELECT FIRST 1 OPID FROM PXATTENDEPXNTS WHERE OPID = ' + req.params.id + ' ORDER BY ATTENDEPXNTID DESC', function(err, rows) {
if(err) throw err;
if(rows.length === 1 && rows[0].EPXNTTYPE == 1)
{
res.render('error');
}
else
{
db.query('SELECT FIRST 1 ATTENDEPXNTID FROM PXATTENDEPXNTS ORDER BY ATTENDEPXNTID DESC', function(err, rows) {
lastID = 0;
if(rows.length === 1)
{
lastID = rows[0].ATTENDEPXNTID;
}
db.query('INSERT INTO PXATTENDEPXNTS (ATTENDEPXNTID, OPID, DATETIME, EPXNTTYPE, SCANNEDSTR, BRANCHID, CHANGED_FLAG, DEPTID) VALUES (' + (lastID + 1) + ','+ req.params.id +', \''+ moment().format('YYYY-MM-DD HH:mm:ss') +'\', 1, null, 1, 1, 0)', function(err, rows) {
if(err) throw err;
res.render('success')
});
});
}
});
});
})
})
app.listen(port, () => {
console.log(`App listening at http://localhost:${port}`)
})
message
回调中的 client.end()
将关闭 MQTT 客户端。因此,您在 connect
回调中发布的消息将导致客户端立即退出。
(如果订阅主题,客户端会收到自己发布的消息,这可以使用 MQTT v5 iirc 进行更改,但需要明确设置)