MqttClient 发送最后一条消息的前一条

MqttClient sends the one before last message

在我的处理程序代码(无服务器框架)中,我向 RabbitMQ 推送了一条消息,但问题是当我发送第一条消息时,订阅者没有收到任何东西,而第二条消息他们只收到第一条消息,其余消息也是如此(当我发送一条消息时,前一条消息已送达!)。有什么想法吗?

编辑:我用非常基本和简单的代码替换了实际代码,但结果仍然相同。

Lambda- create.ts

import { APIGatewayEvent, Context, Callback, Handler } from "aws-lambda";
import { config } from "../common/config";
import publish from "../common/publisher";

export const create: Handler = (event: APIGatewayEvent, context: Context, cb: Callback) => {
    console.log('test started');
    context.callbackWaitsForEmptyEventLoop = false;
    const topic = 'float/push';
    const num = Math.random();
    const message = JSON.stringify({ floatId: num });
    publish(config.PUSH_BROKER_UFRL, config.PUSH_USERNAME, config.PUSH_PASSWORD, topic, message, () => {
        console.log('calling the callback');
        cb(null, {
            statusCode: 200,
            headers: {
                'Access-Control-Allow-Origin': '*', // Required for CORS support to work
            },
            body: JSON.stringify({ id: num })
        });
    });
};

publisher.ts

import { Callback } from 'aws-lambda';
import { Client, connect, MqttClient, Packet } from 'mqtt';

function publish(brokerUrl: string, username: string, password: string, topic: string,
                 message: string, callback: (() => void)): void {
    console.log('publish started');
    const client: Client = connect(brokerUrl, {
        username,
        password
    });
    client.options.clientId = 'Cashmanager.portal';
    client.addListener('connect', () => {
        console.log('connected to the queue');
        console.log(`message to publish: ${JSON.stringify(message)}`);
        client.publish(topic, message, (err, packet) => {
            console.log(`err: ` + err);
            console.log(`packet: ${JSON.stringify(packet)}`);
            callback();
        });
    });
}

导出默认发布;

来自 cloudwatch 的示例日志:

START RequestId: ea63e6ca-318f-11e8-b766-b78fb7754d27 Version: $LATEST 2018-03-27T07:24:41.744Z ea63e6ca-318f-11e8-b766-b78fb7754d27 test started 2018-03-27T07:24:41.744Z ea63e6ca-318f-11e8-b766-b78fb7754d27 publish started 2018-03-27T07:24:41.767Z ea63e6ca-318f-11e8-b766-b78fb7754d27 connected to the queue 2018-03-27T07:24:41.767Z ea63e6ca-318f-11e8-b766-b78fb7754d27 message to publish: "{\"floatId\":0.24342369749799642}" 2018-03-27T07:24:41.767Z ea63e6ca-318f-11e8-b766-b78fb7754d27 err: undefined 2018-03-27T07:24:41.767Z ea63e6ca-318f-11e8-b766-b78fb7754d27 packet: undefined 2018-03-27T07:24:41.767Z ea63e6ca-318f-11e8-b766-b78fb7754d27 calling the callback END RequestId: ea63e6ca-318f-11e8-b766-b78fb7754d27

所以答案是您应该在发布这条消息后调用 client#end 方法。这可能看起来很麻烦,但如果您想通过对 Lambda 函数的调用一次又一次地发布,我认为这将减少您需要再次打开连接的机会。

client.addListener('connect', () => {
    console.log('connected to the queue');
    console.log(`message to publish: ${JSON.stringify(message)}`);
    client.publish(topic, message, (err, packet) => {
        console.log(`err: ` + err);
        console.log(`packet: ${JSON.stringify(packet)}`);
        client.end(false, () => callback()); //This line should be added then it works as expected

    });
});