AWS API 网关 websocket 接收消息不一致
AWS API gateway websocket receives messages inconsistently
我在 api 网关中有一个 websocket 连接到如下所示的 lambda:
const AWS = require('aws-sdk');
const amqp = require('amqplib');
const api = new AWS.ApiGatewayManagementApi({
endpoint: 'MY_ENDPOINT',
});
async function sendMsgToApp(response, connectionId) {
console.log('=========== posting reply');
const params = {
ConnectionId: connectionId,
Data: Buffer.from(response),
};
return api.postToConnection(params).promise();
}
let rmqServerUrl =
'MY_RMQ_SERVER_URL';
let rmqServerConn = null;
exports.handler = async event => {
console.log('websocket event:', event);
const { routeKey: route, connectionId } = event.requestContext;
switch (route) {
case '$connect':
console.log('user connected');
const creds = event.queryStringParameters.x;
console.log('============ x.length:', creds.length);
const decodedCreds = Buffer.from(creds, 'base64').toString('utf-8');
try {
const conn = await amqp.connect(
`amqps://${decodedCreds}@${rmqServerUrl}`
);
const channel = await conn.createChannel();
console.log('============ created channel successfully:');
rmqServerConn = conn;
const [userId] = decodedCreds.split(':');
const { queue } = await channel.assertQueue(userId, {
durable: true,
autoDelete: false,
});
console.log('============ userId:', userId, 'queue:', queue);
channel.consume(queue, msg => {
console.log('========== msg:', msg);
const { content } = msg;
const msgString = content.toString('utf-8');
console.log('========== msgString:', msgString);
sendMsgToApp(msgString, connectionId)
.then(res => {
console.log(
'================= sent queued message to the app, will ack, outcome:',
res
);
try {
channel.ack(msg);
} catch (e) {
console.log(
'================= error acking message:',
e
);
}
})
.catch(e => {
console.log(
'================= error sending queued message to the app, will not ack, error:',
e
);
});
});
} catch (e) {
console.log(
'=========== error initializing amqp connection',
e
);
if (rmqServerConn) {
await rmqServerConn.close();
}
const response = {
statusCode: 401,
body: JSON.stringify('failed auth!'),
};
return response;
}
break;
case '$disconnect':
console.log('user disconnected');
if (rmqServerConn) {
await rmqServerConn.close();
}
break;
case 'message':
console.log('message route');
await sendMsgToApp('test', connectionId);
break;
default:
console.log('unknown route', route);
break;
}
const response = {
statusCode: 200,
body: JSON.stringify('Hello from websocket Lambda!'),
};
return response;
};
amqp 连接用于由 amazonmq 提供的 rabbitmq 服务器。我遇到的问题是发布到队列的消息要么根本不显示在 .consume
回调中,要么只在 websocket 断开连接并重新连接后才显示。从本质上讲,它们会在很久以后的某个时间点丢失,然后它们会意外出现。那是在 websocket 中。即使它们确实出现了,它们也不会被发送到连接到 websocket 的客户端(在本例中为应用程序)。这可能是什么问题?
这里的问题是我对 API Gateway 的 websockets 的工作方式有错误的想法。 API 网关维护 websocket 连接但不维护 lambda 本身。我将我的 .consume
订阅逻辑放在 lambda 中,这不起作用,因为 lambda 运行并终止而不是保持活动状态。更好的方法是 make the queue an event source for the lambda。但是,这对我也不起作用,因为它要求您在设置 lambda 时了解您的队列。我的队列是动态创建的,因此违反了要求。我最终在 vps.
上建立了一个 rmq 服务器
我在 api 网关中有一个 websocket 连接到如下所示的 lambda:
const AWS = require('aws-sdk');
const amqp = require('amqplib');
const api = new AWS.ApiGatewayManagementApi({
endpoint: 'MY_ENDPOINT',
});
async function sendMsgToApp(response, connectionId) {
console.log('=========== posting reply');
const params = {
ConnectionId: connectionId,
Data: Buffer.from(response),
};
return api.postToConnection(params).promise();
}
let rmqServerUrl =
'MY_RMQ_SERVER_URL';
let rmqServerConn = null;
exports.handler = async event => {
console.log('websocket event:', event);
const { routeKey: route, connectionId } = event.requestContext;
switch (route) {
case '$connect':
console.log('user connected');
const creds = event.queryStringParameters.x;
console.log('============ x.length:', creds.length);
const decodedCreds = Buffer.from(creds, 'base64').toString('utf-8');
try {
const conn = await amqp.connect(
`amqps://${decodedCreds}@${rmqServerUrl}`
);
const channel = await conn.createChannel();
console.log('============ created channel successfully:');
rmqServerConn = conn;
const [userId] = decodedCreds.split(':');
const { queue } = await channel.assertQueue(userId, {
durable: true,
autoDelete: false,
});
console.log('============ userId:', userId, 'queue:', queue);
channel.consume(queue, msg => {
console.log('========== msg:', msg);
const { content } = msg;
const msgString = content.toString('utf-8');
console.log('========== msgString:', msgString);
sendMsgToApp(msgString, connectionId)
.then(res => {
console.log(
'================= sent queued message to the app, will ack, outcome:',
res
);
try {
channel.ack(msg);
} catch (e) {
console.log(
'================= error acking message:',
e
);
}
})
.catch(e => {
console.log(
'================= error sending queued message to the app, will not ack, error:',
e
);
});
});
} catch (e) {
console.log(
'=========== error initializing amqp connection',
e
);
if (rmqServerConn) {
await rmqServerConn.close();
}
const response = {
statusCode: 401,
body: JSON.stringify('failed auth!'),
};
return response;
}
break;
case '$disconnect':
console.log('user disconnected');
if (rmqServerConn) {
await rmqServerConn.close();
}
break;
case 'message':
console.log('message route');
await sendMsgToApp('test', connectionId);
break;
default:
console.log('unknown route', route);
break;
}
const response = {
statusCode: 200,
body: JSON.stringify('Hello from websocket Lambda!'),
};
return response;
};
amqp 连接用于由 amazonmq 提供的 rabbitmq 服务器。我遇到的问题是发布到队列的消息要么根本不显示在 .consume
回调中,要么只在 websocket 断开连接并重新连接后才显示。从本质上讲,它们会在很久以后的某个时间点丢失,然后它们会意外出现。那是在 websocket 中。即使它们确实出现了,它们也不会被发送到连接到 websocket 的客户端(在本例中为应用程序)。这可能是什么问题?
这里的问题是我对 API Gateway 的 websockets 的工作方式有错误的想法。 API 网关维护 websocket 连接但不维护 lambda 本身。我将我的 .consume
订阅逻辑放在 lambda 中,这不起作用,因为 lambda 运行并终止而不是保持活动状态。更好的方法是 make the queue an event source for the lambda。但是,这对我也不起作用,因为它要求您在设置 lambda 时了解您的队列。我的队列是动态创建的,因此违反了要求。我最终在 vps.