AWS API 网关 websocket 接收消息不一致

AWS API gateway websocket receives messages inconsistently

我在 api 网关中有一个 websocket 连接到如下所示的 lambda:

const AWS = require('aws-sdk');
const amqp = require('amqplib');

const api = new AWS.ApiGatewayManagementApi({
    endpoint: 'MY_ENDPOINT',
});

async function sendMsgToApp(response, connectionId) {
    console.log('=========== posting reply');
    const params = {
        ConnectionId: connectionId,
        Data: Buffer.from(response),
    };
    return api.postToConnection(params).promise();
}

let rmqServerUrl =
    'MY_RMQ_SERVER_URL';
let rmqServerConn = null;

exports.handler = async event => {
    console.log('websocket event:', event);
    const { routeKey: route, connectionId } = event.requestContext;

    switch (route) {
        case '$connect':
            console.log('user connected');
            const creds = event.queryStringParameters.x;
            console.log('============ x.length:', creds.length);
            const decodedCreds = Buffer.from(creds, 'base64').toString('utf-8');
            try {
                const conn = await amqp.connect(
                    `amqps://${decodedCreds}@${rmqServerUrl}`
                );
                const channel = await conn.createChannel();
                console.log('============ created channel successfully:');
                rmqServerConn = conn;
                const [userId] = decodedCreds.split(':');
                const { queue } = await channel.assertQueue(userId, {
                    durable: true,
                    autoDelete: false,
                });
                console.log('============ userId:', userId, 'queue:', queue);
                channel.consume(queue, msg => {
                    console.log('========== msg:', msg);
                    const { content } = msg;
                    const msgString = content.toString('utf-8');
                    console.log('========== msgString:', msgString);
                    sendMsgToApp(msgString, connectionId)
                        .then(res => {
                            console.log(
                                '================= sent queued message to the app, will ack, outcome:',
                                res
                            );
                            try {
                                channel.ack(msg);
                            } catch (e) {
                                console.log(
                                    '================= error acking message:',
                                    e
                                );
                            }
                        })
                        .catch(e => {
                            console.log(
                                '================= error sending queued message to the app, will not ack, error:',
                                e
                            );
                        });
                });
            } catch (e) {
                console.log(
                    '=========== error initializing amqp connection',
                    e
                );
                if (rmqServerConn) {
                    await rmqServerConn.close();
                }
                const response = {
                    statusCode: 401,
                    body: JSON.stringify('failed auth!'),
                };
                return response;
            }
            break;
        case '$disconnect':
            console.log('user disconnected');
            if (rmqServerConn) {
                await rmqServerConn.close();
            }
            break;
        case 'message':
            console.log('message route');
            await sendMsgToApp('test', connectionId);
            break;
        default:
            console.log('unknown route', route);
            break;
    }
    const response = {
        statusCode: 200,
        body: JSON.stringify('Hello from websocket Lambda!'),
    };
    return response;
};

amqp 连接用于由 amazonmq 提供的 rabbitmq 服务器。我遇到的问题是发布到队列的消息要么根本不显示在 .consume 回调中,要么只在 websocket 断开连接并重新连接后才显示。从本质上讲,它们会在很久以后的某个时间点丢失,然后它们会意外出现。那是在 websocket 中。即使它们确实出现了,它们也不会被发送到连接到 websocket 的客户端(在本例中为应用程序)。这可能是什么问题?

这里的问题是我对 API Gateway 的 websockets 的工作方式有错误的想法。 API 网关维护 websocket 连接但不维护 lambda 本身。我将我的 .consume 订阅逻辑放在 lambda 中,这不起作用,因为 lambda 运行并终止而不是保持活动状态。更好的方法是 make the queue an event source for the lambda。但是,这对我也不起作用,因为它要求您在设置 lambda 时了解您的队列。我的队列是动态创建的,因此违反了要求。我最终在 vps.

上建立了一个 rmq 服务器