MqttClient 发送最后一条消息的前一条
MqttClient sends the one before last message
在我的处理程序代码(无服务器框架)中,我向 RabbitMQ 推送了一条消息,但问题是当我发送第一条消息时,订阅者没有收到任何东西,而第二条消息他们只收到第一条消息,其余消息也是如此(当我发送一条消息时,前一条消息已送达!)。有什么想法吗?
编辑:我用非常基本和简单的代码替换了实际代码,但结果仍然相同。
Lambda- create.ts
import { APIGatewayEvent, Context, Callback, Handler } from "aws-lambda";
import { config } from "../common/config";
import publish from "../common/publisher";
export const create: Handler = (event: APIGatewayEvent, context: Context, cb: Callback) => {
console.log('test started');
context.callbackWaitsForEmptyEventLoop = false;
const topic = 'float/push';
const num = Math.random();
const message = JSON.stringify({ floatId: num });
publish(config.PUSH_BROKER_UFRL, config.PUSH_USERNAME, config.PUSH_PASSWORD, topic, message, () => {
console.log('calling the callback');
cb(null, {
statusCode: 200,
headers: {
'Access-Control-Allow-Origin': '*', // Required for CORS support to work
},
body: JSON.stringify({ id: num })
});
});
};
publisher.ts
import { Callback } from 'aws-lambda';
import { Client, connect, MqttClient, Packet } from 'mqtt';
function publish(brokerUrl: string, username: string, password: string, topic: string,
message: string, callback: (() => void)): void {
console.log('publish started');
const client: Client = connect(brokerUrl, {
username,
password
});
client.options.clientId = 'Cashmanager.portal';
client.addListener('connect', () => {
console.log('connected to the queue');
console.log(`message to publish: ${JSON.stringify(message)}`);
client.publish(topic, message, (err, packet) => {
console.log(`err: ` + err);
console.log(`packet: ${JSON.stringify(packet)}`);
callback();
});
});
}
导出默认发布;
来自 cloudwatch 的示例日志:
START RequestId: ea63e6ca-318f-11e8-b766-b78fb7754d27 Version: $LATEST
2018-03-27T07:24:41.744Z ea63e6ca-318f-11e8-b766-b78fb7754d27 test
started
2018-03-27T07:24:41.744Z ea63e6ca-318f-11e8-b766-b78fb7754d27 publish
started
2018-03-27T07:24:41.767Z ea63e6ca-318f-11e8-b766-b78fb7754d27 connected
to the queue
2018-03-27T07:24:41.767Z ea63e6ca-318f-11e8-b766-b78fb7754d27 message
to publish: "{\"floatId\":0.24342369749799642}"
2018-03-27T07:24:41.767Z ea63e6ca-318f-11e8-b766-b78fb7754d27 err:
undefined
2018-03-27T07:24:41.767Z ea63e6ca-318f-11e8-b766-b78fb7754d27 packet:
undefined
2018-03-27T07:24:41.767Z ea63e6ca-318f-11e8-b766-b78fb7754d27 calling
the callback END RequestId: ea63e6ca-318f-11e8-b766-b78fb7754d27
所以答案是您应该在发布这条消息后调用 client#end
方法。这可能看起来很麻烦,但如果您想通过对 Lambda 函数的调用一次又一次地发布,我认为这将减少您需要再次打开连接的机会。
client.addListener('connect', () => {
console.log('connected to the queue');
console.log(`message to publish: ${JSON.stringify(message)}`);
client.publish(topic, message, (err, packet) => {
console.log(`err: ` + err);
console.log(`packet: ${JSON.stringify(packet)}`);
client.end(false, () => callback()); //This line should be added then it works as expected
});
});
在我的处理程序代码(无服务器框架)中,我向 RabbitMQ 推送了一条消息,但问题是当我发送第一条消息时,订阅者没有收到任何东西,而第二条消息他们只收到第一条消息,其余消息也是如此(当我发送一条消息时,前一条消息已送达!)。有什么想法吗?
编辑:我用非常基本和简单的代码替换了实际代码,但结果仍然相同。
Lambda- create.ts
import { APIGatewayEvent, Context, Callback, Handler } from "aws-lambda";
import { config } from "../common/config";
import publish from "../common/publisher";
export const create: Handler = (event: APIGatewayEvent, context: Context, cb: Callback) => {
console.log('test started');
context.callbackWaitsForEmptyEventLoop = false;
const topic = 'float/push';
const num = Math.random();
const message = JSON.stringify({ floatId: num });
publish(config.PUSH_BROKER_UFRL, config.PUSH_USERNAME, config.PUSH_PASSWORD, topic, message, () => {
console.log('calling the callback');
cb(null, {
statusCode: 200,
headers: {
'Access-Control-Allow-Origin': '*', // Required for CORS support to work
},
body: JSON.stringify({ id: num })
});
});
};
publisher.ts
import { Callback } from 'aws-lambda';
import { Client, connect, MqttClient, Packet } from 'mqtt';
function publish(brokerUrl: string, username: string, password: string, topic: string,
message: string, callback: (() => void)): void {
console.log('publish started');
const client: Client = connect(brokerUrl, {
username,
password
});
client.options.clientId = 'Cashmanager.portal';
client.addListener('connect', () => {
console.log('connected to the queue');
console.log(`message to publish: ${JSON.stringify(message)}`);
client.publish(topic, message, (err, packet) => {
console.log(`err: ` + err);
console.log(`packet: ${JSON.stringify(packet)}`);
callback();
});
});
}
导出默认发布;
来自 cloudwatch 的示例日志:
START RequestId: ea63e6ca-318f-11e8-b766-b78fb7754d27 Version: $LATEST 2018-03-27T07:24:41.744Z ea63e6ca-318f-11e8-b766-b78fb7754d27 test started 2018-03-27T07:24:41.744Z ea63e6ca-318f-11e8-b766-b78fb7754d27 publish started 2018-03-27T07:24:41.767Z ea63e6ca-318f-11e8-b766-b78fb7754d27 connected to the queue 2018-03-27T07:24:41.767Z ea63e6ca-318f-11e8-b766-b78fb7754d27 message to publish: "{\"floatId\":0.24342369749799642}" 2018-03-27T07:24:41.767Z ea63e6ca-318f-11e8-b766-b78fb7754d27 err: undefined 2018-03-27T07:24:41.767Z ea63e6ca-318f-11e8-b766-b78fb7754d27 packet: undefined 2018-03-27T07:24:41.767Z ea63e6ca-318f-11e8-b766-b78fb7754d27 calling the callback END RequestId: ea63e6ca-318f-11e8-b766-b78fb7754d27
所以答案是您应该在发布这条消息后调用 client#end
方法。这可能看起来很麻烦,但如果您想通过对 Lambda 函数的调用一次又一次地发布,我认为这将减少您需要再次打开连接的机会。
client.addListener('connect', () => {
console.log('connected to the queue');
console.log(`message to publish: ${JSON.stringify(message)}`);
client.publish(topic, message, (err, packet) => {
console.log(`err: ` + err);
console.log(`packet: ${JSON.stringify(packet)}`);
client.end(false, () => callback()); //This line should be added then it works as expected
});
});