Emqx : 遗漏了很多mqtt协议推送的消息
Emqx : Miss a lot message that pushed by mqtt protocol
我正在使用 mqtt 协议推送一些消息,我的代理是 emqx。我用 nodejs 编写这个脚本来推送特定主题的 200,000 条消息:
const mqtt = require('mqtt');
const options = {
clientId: "tazikpush",
clean: true
}
const client = mqtt.connect("mqtt://xxxxxxxxxx", options);
var topic = "/ApplyData/";
var pushOptions = {
retain: false,
qos: 2
};
const snooze = ms => new Promise(resolve => setTimeout(resolve, ms));
const example = async () => {
console.log('Waiting 5 sec and then start');
await snooze(5000);
for (var i = 0; i < 200000; i++) {
// await snooze(250);
client.publish(topic, message, pushOptions);
console.log(`done! ${i}`);
}
};
example();
我通过 nodejs 写了一个订阅者来收听这个主题,然后将数据存储到 redis 数据库中。
但我有一个问题:
Why listener should stop till push client push all 200000 message?
Why subscriber just receive 100 message? and other message dropped.
在我的订阅者上我创建了一个 js file.In 我创建了一个客户端并使用 qos 2 订阅了我的主题
mqttClient.js
const mqtt = require('mqtt');
const log4js = require('log4js');
const config = require('config');
const topic_sub = "/ApplyData/";
log4js.configure(JSON.parse(JSON.stringify(config.get('Logger'))));
var logger = log4js.getLogger('app');
logger.level = 'debug';
const options = {
clientId: "mqttjs01",
clean: true
}
const client = mqtt.connect("mqtt://xxxxxxx", options);
client.on("connect", () => {
console.log("connected " + client.connected);
client.subscribe(topic_sub, { qos: 2 });
});
client.on("error", (error) => {
console.log("Can't connect" + error);
logger.debug(`Client Error : `, error);
});
module.exports = client;
我在控制器上使用客户端事件。我的订阅者实际上是一名工作人员,这意味着它的工作只是订阅消息并将这些消息存储到数据库中。
在App.js我要求:
const client = require('./mqttClient');
const controller = require('./controller/mainController');
在主控制器中,我通过调用 client.on
:
订阅了消息
client.on('message', async (topic, message, packet) => {
console.log(topic);
if (topic === '/ApplyData/') {
var jobject = JSON.parse(message);
jobject.nid = uuid()
try {
let res = await redis_cache.cache(jobject);
} catch (err) {
console.log(err);
}
我在推送 200,000 条消息后从控制台使用 运行 调试代理
2019-09-25 11:41:50.885 [warning] tazikpush [Session] Dropped qos2 packet 36998 for too many awaiting_rel
2019-09-25 11:41:50.900 [warning] tazikpush [Session] Dropped qos2 packet 36999 for too many awaiting_rel
2019-09-25 11:41:50.900 [warning] tazikpush [Session] Dropped qos2 packet 37000 for too many awaiting_rel
2019-09-25 11:41:50.900 [warning] tazikpush [Session] Dropped qos2 packet 37001 for too many awaiting_rel
2019-09-25 11:41:50.900 [warning] tazikpush [Session] Dropped qos2 packet 37002 for too many awaiting_rel....
2019-09-25 11:49:57.544 [warning] tazikpush [Session] Dropped qos2 packet 40292 for too many awaiting_rel
2019-09-25 11:49:57.544 [warning] tazikpush [Session] The PUBREL PacketId 36898 is not found
2019-09-25 11:49:57.544 [warning] tazikpush [Session] The PUBREL PacketId 36899 is not found
2019-09-25 11:49:57.544 [warning] tazikpush [Session] The PUBREL PacketId 36900 is not found
2019-09-25 11:49:57.544 [warning] tazikpush [Session] The PUBREL PacketId 36901 is not found ...
日志
您的发布速度太快,使用单个客户端处理消费太慢。
这可能与您的配置有关:
etc/emqx.conf
## Maximum QoS2 packets (Client -> Broker) awaiting PUBREL, 0 means no limit.
##
## Value: Number
zone.external.max_awaiting_rel = 100
## The QoS2 messages (Client -> Broker) will be dropped if awaiting PUBREL timeout.
##
## Value: Duration
zone.external.await_rel_timeout = 300s
最好的方法是使用 EMQ X Enterprise 或共享订阅来添加更多客户端:
https://docs.emqx.io/tutorial/v3/en/advanced/share_subscribe.html
我正在使用 mqtt 协议推送一些消息,我的代理是 emqx。我用 nodejs 编写这个脚本来推送特定主题的 200,000 条消息:
const mqtt = require('mqtt');
const options = {
clientId: "tazikpush",
clean: true
}
const client = mqtt.connect("mqtt://xxxxxxxxxx", options);
var topic = "/ApplyData/";
var pushOptions = {
retain: false,
qos: 2
};
const snooze = ms => new Promise(resolve => setTimeout(resolve, ms));
const example = async () => {
console.log('Waiting 5 sec and then start');
await snooze(5000);
for (var i = 0; i < 200000; i++) {
// await snooze(250);
client.publish(topic, message, pushOptions);
console.log(`done! ${i}`);
}
};
example();
我通过 nodejs 写了一个订阅者来收听这个主题,然后将数据存储到 redis 数据库中。 但我有一个问题:
Why listener should stop till push client push all 200000 message?
Why subscriber just receive 100 message? and other message dropped.
在我的订阅者上我创建了一个 js file.In 我创建了一个客户端并使用 qos 2 订阅了我的主题 mqttClient.js
const mqtt = require('mqtt');
const log4js = require('log4js');
const config = require('config');
const topic_sub = "/ApplyData/";
log4js.configure(JSON.parse(JSON.stringify(config.get('Logger'))));
var logger = log4js.getLogger('app');
logger.level = 'debug';
const options = {
clientId: "mqttjs01",
clean: true
}
const client = mqtt.connect("mqtt://xxxxxxx", options);
client.on("connect", () => {
console.log("connected " + client.connected);
client.subscribe(topic_sub, { qos: 2 });
});
client.on("error", (error) => {
console.log("Can't connect" + error);
logger.debug(`Client Error : `, error);
});
module.exports = client;
我在控制器上使用客户端事件。我的订阅者实际上是一名工作人员,这意味着它的工作只是订阅消息并将这些消息存储到数据库中。
在App.js我要求:
const client = require('./mqttClient');
const controller = require('./controller/mainController');
在主控制器中,我通过调用 client.on
:
client.on('message', async (topic, message, packet) => {
console.log(topic);
if (topic === '/ApplyData/') {
var jobject = JSON.parse(message);
jobject.nid = uuid()
try {
let res = await redis_cache.cache(jobject);
} catch (err) {
console.log(err);
}
我在推送 200,000 条消息后从控制台使用 运行 调试代理
2019-09-25 11:41:50.885 [warning] tazikpush [Session] Dropped qos2 packet 36998 for too many awaiting_rel
2019-09-25 11:41:50.900 [warning] tazikpush [Session] Dropped qos2 packet 36999 for too many awaiting_rel
2019-09-25 11:41:50.900 [warning] tazikpush [Session] Dropped qos2 packet 37000 for too many awaiting_rel
2019-09-25 11:41:50.900 [warning] tazikpush [Session] Dropped qos2 packet 37001 for too many awaiting_rel
2019-09-25 11:41:50.900 [warning] tazikpush [Session] Dropped qos2 packet 37002 for too many awaiting_rel....
2019-09-25 11:49:57.544 [warning] tazikpush [Session] Dropped qos2 packet 40292 for too many awaiting_rel
2019-09-25 11:49:57.544 [warning] tazikpush [Session] The PUBREL PacketId 36898 is not found
2019-09-25 11:49:57.544 [warning] tazikpush [Session] The PUBREL PacketId 36899 is not found
2019-09-25 11:49:57.544 [warning] tazikpush [Session] The PUBREL PacketId 36900 is not found
2019-09-25 11:49:57.544 [warning] tazikpush [Session] The PUBREL PacketId 36901 is not found ...
日志
您的发布速度太快,使用单个客户端处理消费太慢。
这可能与您的配置有关:
etc/emqx.conf
## Maximum QoS2 packets (Client -> Broker) awaiting PUBREL, 0 means no limit.
##
## Value: Number
zone.external.max_awaiting_rel = 100
## The QoS2 messages (Client -> Broker) will be dropped if awaiting PUBREL timeout.
##
## Value: Duration
zone.external.await_rel_timeout = 300s
最好的方法是使用 EMQ X Enterprise 或共享订阅来添加更多客户端:
https://docs.emqx.io/tutorial/v3/en/advanced/share_subscribe.html