如何聚合 AWS SQS ApproximateNumberOfMessages
How to aggregate AWS SQS ApproximateNumberOfMessages
给定 X 个 SQS 队列,我如何将 ApproximateNumberOfMessages 聚合到一个 CloudWatch 指标中?
我想要一个基于队列中有多少消息的自动缩放组规模。使用多个 CloudWatch 警报(每个队列一个)会导致问题,因为一个队列将为空,而其他队列为 "full".
我实现这一目标的方法是将 AWS Lambda 与 Node.js 结合使用。我每分钟向 运行 lambda 函数添加一个 CloudWatch 事件触发器。这会查询 sqs 队列,然后创建一个自定义 CloudWatch 指标,然后您可以将其用于自动扩展组进行扩展。
var AWS = require('aws-sdk');
var sqs = new AWS.SQS();
var cloudWatch = new AWS.CloudWatch();
var queueUrls = ['https://sqs.REGION.amazonaws.com/ACCOUNT-NUMBER/queueUrl1','https://sqs.REGION.amazonaws.com/ACCOUNT-NUMBER/queueUrl2'];
exports.handler = (event, context, callback) => {
var fn = function (url) {
return new Promise(resolve => {
var sqsParams = {
AttributeNames: ['ApproximateNumberOfMessages'],
QueueUrl: url
};
sqs.getQueueAttributes(sqsParams, function(err,data){
if(err)
{
console.log(err,err.stack);
context.fail(err);
}
else
{
resolve({name: url.split('/').pop(), messageCount: parseInt(data.Attributes.ApproximateNumberOfMessages)});
}
});
});
};
var actions = queueUrls.map(fn);
Promise.all(actions).then(function(queues) {
var messageCount = queues.map(function(m){return m.messageCount;});
var queueNames = queues.map(function(n){return n.name;}).join();
var metricParams = {
MetricData:[{
MetricName: 'ApproximateNumberOfMessages',
Dimensions:[{
Name: 'QueueName',
Value: queueNames
}],
Unit: 'Count',
StatisticValues: {
Maximum: Math.max.apply(Math, messageCount),
Minimum: Math.min.apply(Math, messageCount),
SampleCount: queues.length,
Sum: messageCount.reduce((pv, cv) => pv+cv, 0)
}
}],
Namespace: 'AWS/SQS'
};
cloudWatch.putMetricData(metricParams, function(err, metricData){
if(err) console.log(err,err.stack);
else console.log(metricData);
});
});
};
此代码显然可以优化以处理 2 个以上的队列,并且可能受益于异步瀑布。
编辑:更新为使用承诺。
EDIT2:连接 CloudWatch 指标的队列名称
给定 X 个 SQS 队列,我如何将 ApproximateNumberOfMessages 聚合到一个 CloudWatch 指标中?
我想要一个基于队列中有多少消息的自动缩放组规模。使用多个 CloudWatch 警报(每个队列一个)会导致问题,因为一个队列将为空,而其他队列为 "full".
我实现这一目标的方法是将 AWS Lambda 与 Node.js 结合使用。我每分钟向 运行 lambda 函数添加一个 CloudWatch 事件触发器。这会查询 sqs 队列,然后创建一个自定义 CloudWatch 指标,然后您可以将其用于自动扩展组进行扩展。
var AWS = require('aws-sdk');
var sqs = new AWS.SQS();
var cloudWatch = new AWS.CloudWatch();
var queueUrls = ['https://sqs.REGION.amazonaws.com/ACCOUNT-NUMBER/queueUrl1','https://sqs.REGION.amazonaws.com/ACCOUNT-NUMBER/queueUrl2'];
exports.handler = (event, context, callback) => {
var fn = function (url) {
return new Promise(resolve => {
var sqsParams = {
AttributeNames: ['ApproximateNumberOfMessages'],
QueueUrl: url
};
sqs.getQueueAttributes(sqsParams, function(err,data){
if(err)
{
console.log(err,err.stack);
context.fail(err);
}
else
{
resolve({name: url.split('/').pop(), messageCount: parseInt(data.Attributes.ApproximateNumberOfMessages)});
}
});
});
};
var actions = queueUrls.map(fn);
Promise.all(actions).then(function(queues) {
var messageCount = queues.map(function(m){return m.messageCount;});
var queueNames = queues.map(function(n){return n.name;}).join();
var metricParams = {
MetricData:[{
MetricName: 'ApproximateNumberOfMessages',
Dimensions:[{
Name: 'QueueName',
Value: queueNames
}],
Unit: 'Count',
StatisticValues: {
Maximum: Math.max.apply(Math, messageCount),
Minimum: Math.min.apply(Math, messageCount),
SampleCount: queues.length,
Sum: messageCount.reduce((pv, cv) => pv+cv, 0)
}
}],
Namespace: 'AWS/SQS'
};
cloudWatch.putMetricData(metricParams, function(err, metricData){
if(err) console.log(err,err.stack);
else console.log(metricData);
});
});
};
此代码显然可以优化以处理 2 个以上的队列,并且可能受益于异步瀑布。
编辑:更新为使用承诺。
EDIT2:连接 CloudWatch 指标的队列名称