基于 SQS 的自动缩放 Fargate 服务 ApproximateNumberOfMessagesVisible
Auto Scale Fargate Service Based On SQS ApproximateNumberOfMessagesVisible
我想根据 SQS 队列的大小扩展我的 aws fargate 容器。看来我只能根据容器的 CPU 或内存使用情况进行扩展。有没有一种方法可以创建根据队列大小向外扩展和向内扩展的策略?有没有人能够根据其他 cloudwatch 指标进行扩展?
是的,你可以做到。您必须使用步长扩展策略,并且需要为 SQS 队列深度 (ApproximateNumberOfMessagesVisible) 创建警报。
转到 CloudWatch,创建一个新警报。我们将此警报称为 sqs-queue-depth-high,并在可见消息的大约数量为 1000 时触发它。
完成后,转到 ECS 到您要自动缩放的服务。单击该服务的更新。添加缩放策略并选择步进跟踪品种。您会看到有一个选项可以创建新警报(只允许您在 CPU 或 MemoryUtilization 之间进行选择),或使用现有警报。
在“使用现有警报”字段中键入 sqs-queue-depth-high 并按回车键,您应该会看到一个绿色复选标记,让您知道该名称是有效的(即警报存在)。您会看到新的下拉菜单,您现在可以在其中调整步骤策略。
这适用于任何指标警报和 ECS 服务。如果您打算尝试扩展此设置,例如针对多个环境,或者使其比 2 步更复杂,请帮自己一个忙,使用 CloudFormation 或 Terraform 来帮助管理它。没有什么比必须在 10 项服务中调整 5 步警报更糟糕的了。
AWS提供基于SQS队列扩展的解决方案:https://docs.aws.amazon.com/autoscaling/ec2/userguide/as-using-sqs-queue.html
主要思想
- 使用公式创建 CloudWatch 自定义指标
sqs-backlog-per-task
:
sqs-backlog-per-task = sqs-messages-number / running-task-number
.
- 根据
backlogPerInstance
指标创建目标跟踪缩放策略。
实施细节
自定义指标
就我而言,所有基础设施(Fargate、SQS 和其他资源)都在 CloudFormation 堆栈中进行了描述。因此,为了计算和记录自定义指标,我决定使用 AWS Lambda 函数,该函数也在 CloudFormation 堆栈中进行了描述,并与整个基础设施一起部署。
您可以在下面找到用于记录以下自定义指标的 AWS Lambda 函数的代码片段:
sqs-backlog-per-task
- 用于缩放
running-task-number
- 用于缩放优化和调试
CloudFormation 堆栈中的 AWS SAM 语法中描述的 AWS Lambda 函数 (infrastructure.yml):
CustomMetricLoggerFunction:
Type: AWS::Serverless::Function
Properties:
FunctionName: custom-metric-logger
Handler: custom-metric-logger.handler
Runtime: nodejs8.10
MemorySize: 128
Timeout: 3
Role: !GetAtt CustomMetricLoggerFunctionRole.Arn
Environment:
Variables:
ECS_CLUSTER_NAME: !Ref Cluster
ECS_SERVICE_NAME: !GetAtt Service.Name
SQS_URL: !Ref Queue
Events:
Schedule:
Type: Schedule
Properties:
Schedule: 'cron(0/1 * * * ? *)' # every one minute
AWS Lambda Javascript 用于计算和记录的代码 (custom-metric-logger.js):
var AWS = require('aws-sdk');
exports.handler = async () => {
try {
var sqsMessagesNumber = await getSqsMessagesNumber();
var runningContainersNumber = await getRunningContainersNumber();
var backlogPerInstance = sqsMessagesNumber;
if (runningContainersNumber > 0) {
backlogPerInstance = parseInt(sqsMessagesNumber / runningContainersNumber);
}
await putRunningTaskNumberMetricData(runningContainersNumber);
await putSqsBacklogPerTaskMetricData(backlogPerInstance);
return {
statusCode: 200
};
} catch (err) {
console.log(err);
return {
statusCode: 500
};
}
};
function getSqsMessagesNumber() {
return new Promise((resolve, reject) => {
var data = {
QueueUrl: process.env.SQS_URL,
AttributeNames: ['ApproximateNumberOfMessages']
};
var sqs = new AWS.SQS();
sqs.getQueueAttributes(data, (err, data) => {
if (err) {
reject(err);
} else {
resolve(parseInt(data.Attributes.ApproximateNumberOfMessages));
}
});
});
}
function getRunningContainersNumber() {
return new Promise((resolve, reject) => {
var data = {
services: [
process.env.ECS_SERVICE_NAME
],
cluster: process.env.ECS_CLUSTER_NAME
};
var ecs = new AWS.ECS();
ecs.describeServices(data, (err, data) => {
if (err) {
reject(err);
} else {
resolve(data.services[0].runningCount);
}
});
});
}
function putRunningTaskNumberMetricData(value) {
return new Promise((resolve, reject) => {
var data = {
MetricData: [{
MetricName: 'running-task-number',
Value: value,
Unit: 'Count',
Timestamp: new Date()
}],
Namespace: 'fargate-sqs-service'
};
var cloudwatch = new AWS.CloudWatch();
cloudwatch.putMetricData(data, (err, data) => {
if (err) {
reject(err);
} else {
resolve(data);
}
});
});
}
function putSqsBacklogPerTaskMetricData(value) {
return new Promise((resolve, reject) => {
var data = {
MetricData: [{
MetricName: 'sqs-backlog-per-task',
Value: value,
Unit: 'Count',
Timestamp: new Date()
}],
Namespace: 'fargate-sqs-service'
};
var cloudwatch = new AWS.CloudWatch();
cloudwatch.putMetricData(data, (err, data) => {
if (err) {
reject(err);
} else {
resolve(data);
}
});
});
}
目标跟踪缩放策略
然后根据 sqs-backlog-per-task
指标,我在 Cloud Formation 模板中创建了目标跟踪缩放策略。
基于 sqs-backlog-per-task
指标 (infrastructure.yml) 的目标跟踪缩放策略:
ServiceScalingPolicy:
Type: AWS::ApplicationAutoScaling::ScalingPolicy
Properties:
PolicyName: service-scaling-policy
PolicyType: TargetTrackingScaling
ScalingTargetId: !Ref ServiceScalableTarget
TargetTrackingScalingPolicyConfiguration:
ScaleInCooldown: 60
ScaleOutCooldown: 60
CustomizedMetricSpecification:
Namespace: fargate-sqs-service
MetricName: sqs-backlog-per-task
Statistic: Average
Unit: Count
TargetValue: 2000
因此,AWS Application Auto Scaling 创建和管理触发扩展策略的 CloudWatch 警报,并根据指标和目标值计算扩展调整。扩展策略根据需要添加或删除容量,以将指标保持在或接近指定的目标值。除了使指标接近目标值外,目标跟踪扩展策略还会根据负载模式变化导致的指标变化进行调整。
我写了一篇关于这个主题的博客文章,包括 docker 到 运行 的容器。
该文章可在以下位置找到:
https://allaboutaws.com/how-to-auto-scale-aws-ecs-containers-sqs-queue-metrics
DockerHub 上提供了预构建容器:
https://hub.docker.com/r/sh39sxn/ecs-autoscaling-sqs-metrics
文件位于 GitHub:
https://github.com/sh39sxn/ecs-autoscaling-sqs-metrics
希望对你有所帮助
更新到 2021 年(也许之前...)
对于那些需要它但在 CDK 中的人
示例用例:
// Create the vpc and cluster used by the queue processing service
const vpc = new ec2.Vpc(stack, 'Vpc', { maxAzs: 2 });
const cluster = new ecs.Cluster(stack, 'FargateCluster', { vpc });
const queue = new sqs.Queue(stack, 'ProcessingQueue', {
QueueName: 'FargateEventQueue'
});
// Create the queue processing service
new QueueProcessingFargateService(stack, 'QueueProcessingFargateService', {
cluster,
image: ecs.ContainerImage.fromRegistry('amazon/amazon-ecs-sample'),
desiredTaskCount: 2,
maxScalingCapacity: 5,
queue
});
来自:
https://github.com/aws/aws-cdk/blob/master/design/aws-ecs/aws-ecs-autoscaling-queue-worker.md
我想根据 SQS 队列的大小扩展我的 aws fargate 容器。看来我只能根据容器的 CPU 或内存使用情况进行扩展。有没有一种方法可以创建根据队列大小向外扩展和向内扩展的策略?有没有人能够根据其他 cloudwatch 指标进行扩展?
是的,你可以做到。您必须使用步长扩展策略,并且需要为 SQS 队列深度 (ApproximateNumberOfMessagesVisible) 创建警报。
转到 CloudWatch,创建一个新警报。我们将此警报称为 sqs-queue-depth-high,并在可见消息的大约数量为 1000 时触发它。
完成后,转到 ECS 到您要自动缩放的服务。单击该服务的更新。添加缩放策略并选择步进跟踪品种。您会看到有一个选项可以创建新警报(只允许您在 CPU 或 MemoryUtilization 之间进行选择),或使用现有警报。
在“使用现有警报”字段中键入 sqs-queue-depth-high 并按回车键,您应该会看到一个绿色复选标记,让您知道该名称是有效的(即警报存在)。您会看到新的下拉菜单,您现在可以在其中调整步骤策略。
这适用于任何指标警报和 ECS 服务。如果您打算尝试扩展此设置,例如针对多个环境,或者使其比 2 步更复杂,请帮自己一个忙,使用 CloudFormation 或 Terraform 来帮助管理它。没有什么比必须在 10 项服务中调整 5 步警报更糟糕的了。
AWS提供基于SQS队列扩展的解决方案:https://docs.aws.amazon.com/autoscaling/ec2/userguide/as-using-sqs-queue.html
主要思想
- 使用公式创建 CloudWatch 自定义指标
sqs-backlog-per-task
:sqs-backlog-per-task = sqs-messages-number / running-task-number
. - 根据
backlogPerInstance
指标创建目标跟踪缩放策略。
实施细节
自定义指标
就我而言,所有基础设施(Fargate、SQS 和其他资源)都在 CloudFormation 堆栈中进行了描述。因此,为了计算和记录自定义指标,我决定使用 AWS Lambda 函数,该函数也在 CloudFormation 堆栈中进行了描述,并与整个基础设施一起部署。
您可以在下面找到用于记录以下自定义指标的 AWS Lambda 函数的代码片段:
sqs-backlog-per-task
- 用于缩放running-task-number
- 用于缩放优化和调试
CloudFormation 堆栈中的 AWS SAM 语法中描述的 AWS Lambda 函数 (infrastructure.yml):
CustomMetricLoggerFunction:
Type: AWS::Serverless::Function
Properties:
FunctionName: custom-metric-logger
Handler: custom-metric-logger.handler
Runtime: nodejs8.10
MemorySize: 128
Timeout: 3
Role: !GetAtt CustomMetricLoggerFunctionRole.Arn
Environment:
Variables:
ECS_CLUSTER_NAME: !Ref Cluster
ECS_SERVICE_NAME: !GetAtt Service.Name
SQS_URL: !Ref Queue
Events:
Schedule:
Type: Schedule
Properties:
Schedule: 'cron(0/1 * * * ? *)' # every one minute
AWS Lambda Javascript 用于计算和记录的代码 (custom-metric-logger.js):
var AWS = require('aws-sdk');
exports.handler = async () => {
try {
var sqsMessagesNumber = await getSqsMessagesNumber();
var runningContainersNumber = await getRunningContainersNumber();
var backlogPerInstance = sqsMessagesNumber;
if (runningContainersNumber > 0) {
backlogPerInstance = parseInt(sqsMessagesNumber / runningContainersNumber);
}
await putRunningTaskNumberMetricData(runningContainersNumber);
await putSqsBacklogPerTaskMetricData(backlogPerInstance);
return {
statusCode: 200
};
} catch (err) {
console.log(err);
return {
statusCode: 500
};
}
};
function getSqsMessagesNumber() {
return new Promise((resolve, reject) => {
var data = {
QueueUrl: process.env.SQS_URL,
AttributeNames: ['ApproximateNumberOfMessages']
};
var sqs = new AWS.SQS();
sqs.getQueueAttributes(data, (err, data) => {
if (err) {
reject(err);
} else {
resolve(parseInt(data.Attributes.ApproximateNumberOfMessages));
}
});
});
}
function getRunningContainersNumber() {
return new Promise((resolve, reject) => {
var data = {
services: [
process.env.ECS_SERVICE_NAME
],
cluster: process.env.ECS_CLUSTER_NAME
};
var ecs = new AWS.ECS();
ecs.describeServices(data, (err, data) => {
if (err) {
reject(err);
} else {
resolve(data.services[0].runningCount);
}
});
});
}
function putRunningTaskNumberMetricData(value) {
return new Promise((resolve, reject) => {
var data = {
MetricData: [{
MetricName: 'running-task-number',
Value: value,
Unit: 'Count',
Timestamp: new Date()
}],
Namespace: 'fargate-sqs-service'
};
var cloudwatch = new AWS.CloudWatch();
cloudwatch.putMetricData(data, (err, data) => {
if (err) {
reject(err);
} else {
resolve(data);
}
});
});
}
function putSqsBacklogPerTaskMetricData(value) {
return new Promise((resolve, reject) => {
var data = {
MetricData: [{
MetricName: 'sqs-backlog-per-task',
Value: value,
Unit: 'Count',
Timestamp: new Date()
}],
Namespace: 'fargate-sqs-service'
};
var cloudwatch = new AWS.CloudWatch();
cloudwatch.putMetricData(data, (err, data) => {
if (err) {
reject(err);
} else {
resolve(data);
}
});
});
}
目标跟踪缩放策略
然后根据 sqs-backlog-per-task
指标,我在 Cloud Formation 模板中创建了目标跟踪缩放策略。
基于 sqs-backlog-per-task
指标 (infrastructure.yml) 的目标跟踪缩放策略:
ServiceScalingPolicy:
Type: AWS::ApplicationAutoScaling::ScalingPolicy
Properties:
PolicyName: service-scaling-policy
PolicyType: TargetTrackingScaling
ScalingTargetId: !Ref ServiceScalableTarget
TargetTrackingScalingPolicyConfiguration:
ScaleInCooldown: 60
ScaleOutCooldown: 60
CustomizedMetricSpecification:
Namespace: fargate-sqs-service
MetricName: sqs-backlog-per-task
Statistic: Average
Unit: Count
TargetValue: 2000
因此,AWS Application Auto Scaling 创建和管理触发扩展策略的 CloudWatch 警报,并根据指标和目标值计算扩展调整。扩展策略根据需要添加或删除容量,以将指标保持在或接近指定的目标值。除了使指标接近目标值外,目标跟踪扩展策略还会根据负载模式变化导致的指标变化进行调整。
我写了一篇关于这个主题的博客文章,包括 docker 到 运行 的容器。 该文章可在以下位置找到: https://allaboutaws.com/how-to-auto-scale-aws-ecs-containers-sqs-queue-metrics
DockerHub 上提供了预构建容器: https://hub.docker.com/r/sh39sxn/ecs-autoscaling-sqs-metrics
文件位于 GitHub: https://github.com/sh39sxn/ecs-autoscaling-sqs-metrics
希望对你有所帮助
更新到 2021 年(也许之前...)
对于那些需要它但在 CDK 中的人
示例用例:
// Create the vpc and cluster used by the queue processing service
const vpc = new ec2.Vpc(stack, 'Vpc', { maxAzs: 2 });
const cluster = new ecs.Cluster(stack, 'FargateCluster', { vpc });
const queue = new sqs.Queue(stack, 'ProcessingQueue', {
QueueName: 'FargateEventQueue'
});
// Create the queue processing service
new QueueProcessingFargateService(stack, 'QueueProcessingFargateService', {
cluster,
image: ecs.ContainerImage.fromRegistry('amazon/amazon-ecs-sample'),
desiredTaskCount: 2,
maxScalingCapacity: 5,
queue
});
来自:
https://github.com/aws/aws-cdk/blob/master/design/aws-ecs/aws-ecs-autoscaling-queue-worker.md