如何有效管理消息可见性超时?
How to manage message visibility timeout effectively?
根据事件的不同,我们的流程需要 30 秒到 5 分钟不等。我们的基本架构如下所示(python 3.9,boto3 到 send_message 到 sqs):
- lambda 函数 A 查找数据库并决定事件
- 事件被推入标准 SQS 队列(VisibilityTimeout:600)
- lambda 函数 B 由点 (2) 的 SQS 队列触发 - 由于我们希望一次处理一条消息,因此触发器的 BatchSize 设置为 1,函数的 ReservedConcurentExecutions 也设置为 1。函数超时是 600。
VPC和NAT网关配置完成。
我们发现后续消息总是以 10 分钟为间隔进行处理,尽管函数调用早先已成功完成。
我们打算更有效地管理时间,并尝试将消息可见性超时默认缩短为 1 分钟,并在函数 B 执行期间每 30 秒延长 1 分钟(我们使用 boto3 client.change_message_visibility)。它似乎不起作用 - 几乎所有消息都在我们的死信队列中结束(CloudWatch 日志没有提示任何错误)。
- 是否应该在成功执行 lambda 后删除消息导致立即处理下一条消息(我们的基本场景)?
- 我们在尝试主动管理消息可见性超时时做错了什么?
我们的代码:
- 基本代码(在我们尝试优化整个过程的时间之前;至少它的工作方式是成功处理所有消息;但是它花费了太多时间,因为消息以 10 分钟为间隔处理即使有些处理在例如 2 分钟内完成):
Template.yaml 关于 SQS 和 lambda 函数的摘录 B:
Globals:
Function:
Timeout: 30
Environment:
Variables:
SQS_QUEUE_URL: !Ref SqsQueue
Resources:
SqsQueue:
Type: AWS::SQS::Queue
Properties:
VisibilityTimeout: 600
RedrivePolicy:
deadLetterTargetArn:
Fn::GetAtt:
- DeadLetterQueue
- Arn
maxReceiveCount: 10
DeadLetterQueue:
Type: AWS::SQS::Queue
LambdaFunctionBRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- sts:AssumeRole
Policies:
- PolicyName: receiveAndDeleteFromQueue
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action: [
sqs:ReceiveMessage,
sqs:ChangeMessageVisibility,
sqs:DeleteMessage,
sqs:GetQueueAttributes
]
Resource: !GetAtt SqsQueue.Arn
- PolicyName: accessVpc
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action: [
logs:CreateLogGroup,
logs:CreateLogStream,
logs:PutLogEvents,
ec2:CreateNetworkInterface,
ec2:DescribeNetworkInterfaces,
ec2:DeleteNetworkInterface,
ec2:AssignPrivateIpAddresses,
ec2:UnassignPrivateIpAddresses
]
Resource: "*"
LambdaFunctionB:
Type: AWS::Serverless::Function
Properties:
CodeUri: src/function_b/
Handler: app.lambda_handler
Runtime: python3.9
Role: !GetAtt LambdaFunctionBExecutionRole.Arn
Architectures:
- x86_64
VpcConfig:
SecurityGroupIds:
- "xxx"
SubnetIds:
- "xxx"
- "xxx"
- "xxx"
FileSystemConfigs:
- Arn: !GetAtt AccessPoint.Arn
LocalMountPath: /mnt/lambda
ReservedConcurrentExecutions: 1
Events:
SqsEvent:
Type: SQS
Properties:
Queue: !GetAtt SqsQueue.Arn
BatchSize: 1
Enabled: True
Timeout: 600
MemorySize: 512
Lambda 函数 B 代码app.py:
import os
import boto3
<some other imports>
sqs_queue_url = os.environ.get("SQS_QUEUE_URL")
sqs_client = boto3.client("sqs")
def lambda_handler(event, context):
status = "ok"
record = event["Records"][0]
message = json.loads(record["body"])
receipt_handle = record["receiptHandle"]
<some business logic here>
return status
- 在我们决定尝试优化时间效率后,我们为 SqsQueue 更改了 VisibilityTimeout:60,并为 lambda 函数 B 更改了 app.py 文件,如下所示:
import os
import boto3
from threading import Timer
from time import sleep
<some other imports>
sqs_queue_url = os.environ.get("SQS_QUEUE_URL")
sqs_client = boto3.client("sqs")
class RepeatingTimer(Timer):
def run(self):
while not self.finished.is_set():
self.function(*self.args, **self.kwargs)
self.finished.wait(self.interval)
def increase_visibility_timeout(sqs_client, sqs_queue_url, receipt_handle, timeout):
sqs_client.change_message_visibility(
QueueUrl=sqs_queue_url,
ReceiptHandle=receipt_handle,
VisibilityTimeout=timeout
)
def lambda_handler(event, context):
status = "ok"
record = event["Records"][0]
message = json.loads(record["body"])
receipt_handle = record["receiptHandle"]
t = RepeatingTimer(30, increase_visibility_timeout, [sqs_client, sqs_queue_url, receipt_handle, 60])
t.start()
<some business logic here>
increase_visibility_timeout(sqs_client, sqs_queue_url, receipt_handle, 5)
t.cancel()
return status
处理来自 Amazon SQS 队列的消息的正常过程是:
- 进程向 SQS 队列发送消息
- 由于您有一个在 SQS 队列上带有触发器的 AWS Lambda 函数,并且它配置了 批量大小 1,因此只会将一条消息传递给 Lambda函数
- 此外,由于 Lambda 函数配置了 预留并发 1,因此不会有并行处理——所有消息都将由 Lambda 的一个实例处理函数
- 当消息传递给 Lambda 函数时,隐形超时开始
- 当Lambda函数成功退出后,Lambda服务会从SQS队列中删除消息
- 但是,如果在 之前 Lambda 函数完成处理消息,则消息将重新出现在队列中(期望消息无法处理)。这将导致将来再次处理该消息。
- 也可以向 Amazon SQS 发送一个 heartbeat 来告诉它消息仍在处理中并且 延长隐身超时
一般来说,隐身超时应该设置得足够高,以确保它仅在处理出现问题时触发。例如,如果处理一条消息通常需要 1 分钟,您可以将隐身期设置为 4 分钟。这允许处理可能需要更长时间的异常情况,而不会在实际成功处理消息时意外地重新处理消息。
你的情况
您提到 “lambda 会等待 SQS 队列超时过去以开始处理下一条消息”。这表明您的过程中发生了一些奇怪的事情。通常,当 Lambda 函数无错退出时,Lambda 服务将使用下一条消息再次调用 Lambda 函数。在消息不可见期和执行 Lambda 函数之间没有直接的 link——也就是说,根据您的配置,当达到消息超时时,没有任何东西会 'kill' 现有的 Lambda 函数,也没有任何东西会延迟下一次 Lambda 调用,直到发生消息超时。但是,当达到 Lambda 函数超时时,函数 将 变为 'killed'。因此,听起来您的 Lambda 函数在完成处理消息 之前是 timing-out。您应该将 Lambda 函数的超时时间增加到比预期 运行 持续时间大几倍。 (这是一个独立于 SQS 队列隐身期的配置。)
根据事件的不同,我们的流程需要 30 秒到 5 分钟不等。我们的基本架构如下所示(python 3.9,boto3 到 send_message 到 sqs):
- lambda 函数 A 查找数据库并决定事件
- 事件被推入标准 SQS 队列(VisibilityTimeout:600)
- lambda 函数 B 由点 (2) 的 SQS 队列触发 - 由于我们希望一次处理一条消息,因此触发器的 BatchSize 设置为 1,函数的 ReservedConcurentExecutions 也设置为 1。函数超时是 600。
VPC和NAT网关配置完成。
我们发现后续消息总是以 10 分钟为间隔进行处理,尽管函数调用早先已成功完成。 我们打算更有效地管理时间,并尝试将消息可见性超时默认缩短为 1 分钟,并在函数 B 执行期间每 30 秒延长 1 分钟(我们使用 boto3 client.change_message_visibility)。它似乎不起作用 - 几乎所有消息都在我们的死信队列中结束(CloudWatch 日志没有提示任何错误)。
- 是否应该在成功执行 lambda 后删除消息导致立即处理下一条消息(我们的基本场景)?
- 我们在尝试主动管理消息可见性超时时做错了什么?
我们的代码:
- 基本代码(在我们尝试优化整个过程的时间之前;至少它的工作方式是成功处理所有消息;但是它花费了太多时间,因为消息以 10 分钟为间隔处理即使有些处理在例如 2 分钟内完成): Template.yaml 关于 SQS 和 lambda 函数的摘录 B:
Globals:
Function:
Timeout: 30
Environment:
Variables:
SQS_QUEUE_URL: !Ref SqsQueue
Resources:
SqsQueue:
Type: AWS::SQS::Queue
Properties:
VisibilityTimeout: 600
RedrivePolicy:
deadLetterTargetArn:
Fn::GetAtt:
- DeadLetterQueue
- Arn
maxReceiveCount: 10
DeadLetterQueue:
Type: AWS::SQS::Queue
LambdaFunctionBRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- sts:AssumeRole
Policies:
- PolicyName: receiveAndDeleteFromQueue
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action: [
sqs:ReceiveMessage,
sqs:ChangeMessageVisibility,
sqs:DeleteMessage,
sqs:GetQueueAttributes
]
Resource: !GetAtt SqsQueue.Arn
- PolicyName: accessVpc
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action: [
logs:CreateLogGroup,
logs:CreateLogStream,
logs:PutLogEvents,
ec2:CreateNetworkInterface,
ec2:DescribeNetworkInterfaces,
ec2:DeleteNetworkInterface,
ec2:AssignPrivateIpAddresses,
ec2:UnassignPrivateIpAddresses
]
Resource: "*"
LambdaFunctionB:
Type: AWS::Serverless::Function
Properties:
CodeUri: src/function_b/
Handler: app.lambda_handler
Runtime: python3.9
Role: !GetAtt LambdaFunctionBExecutionRole.Arn
Architectures:
- x86_64
VpcConfig:
SecurityGroupIds:
- "xxx"
SubnetIds:
- "xxx"
- "xxx"
- "xxx"
FileSystemConfigs:
- Arn: !GetAtt AccessPoint.Arn
LocalMountPath: /mnt/lambda
ReservedConcurrentExecutions: 1
Events:
SqsEvent:
Type: SQS
Properties:
Queue: !GetAtt SqsQueue.Arn
BatchSize: 1
Enabled: True
Timeout: 600
MemorySize: 512
Lambda 函数 B 代码app.py:
import os
import boto3
<some other imports>
sqs_queue_url = os.environ.get("SQS_QUEUE_URL")
sqs_client = boto3.client("sqs")
def lambda_handler(event, context):
status = "ok"
record = event["Records"][0]
message = json.loads(record["body"])
receipt_handle = record["receiptHandle"]
<some business logic here>
return status
- 在我们决定尝试优化时间效率后,我们为 SqsQueue 更改了 VisibilityTimeout:60,并为 lambda 函数 B 更改了 app.py 文件,如下所示:
import os
import boto3
from threading import Timer
from time import sleep
<some other imports>
sqs_queue_url = os.environ.get("SQS_QUEUE_URL")
sqs_client = boto3.client("sqs")
class RepeatingTimer(Timer):
def run(self):
while not self.finished.is_set():
self.function(*self.args, **self.kwargs)
self.finished.wait(self.interval)
def increase_visibility_timeout(sqs_client, sqs_queue_url, receipt_handle, timeout):
sqs_client.change_message_visibility(
QueueUrl=sqs_queue_url,
ReceiptHandle=receipt_handle,
VisibilityTimeout=timeout
)
def lambda_handler(event, context):
status = "ok"
record = event["Records"][0]
message = json.loads(record["body"])
receipt_handle = record["receiptHandle"]
t = RepeatingTimer(30, increase_visibility_timeout, [sqs_client, sqs_queue_url, receipt_handle, 60])
t.start()
<some business logic here>
increase_visibility_timeout(sqs_client, sqs_queue_url, receipt_handle, 5)
t.cancel()
return status
处理来自 Amazon SQS 队列的消息的正常过程是:
- 进程向 SQS 队列发送消息
- 由于您有一个在 SQS 队列上带有触发器的 AWS Lambda 函数,并且它配置了 批量大小 1,因此只会将一条消息传递给 Lambda函数
- 此外,由于 Lambda 函数配置了 预留并发 1,因此不会有并行处理——所有消息都将由 Lambda 的一个实例处理函数
- 当消息传递给 Lambda 函数时,隐形超时开始
- 当Lambda函数成功退出后,Lambda服务会从SQS队列中删除消息
- 但是,如果在 之前 Lambda 函数完成处理消息,则消息将重新出现在队列中(期望消息无法处理)。这将导致将来再次处理该消息。
- 也可以向 Amazon SQS 发送一个 heartbeat 来告诉它消息仍在处理中并且 延长隐身超时
一般来说,隐身超时应该设置得足够高,以确保它仅在处理出现问题时触发。例如,如果处理一条消息通常需要 1 分钟,您可以将隐身期设置为 4 分钟。这允许处理可能需要更长时间的异常情况,而不会在实际成功处理消息时意外地重新处理消息。
你的情况
您提到 “lambda 会等待 SQS 队列超时过去以开始处理下一条消息”。这表明您的过程中发生了一些奇怪的事情。通常,当 Lambda 函数无错退出时,Lambda 服务将使用下一条消息再次调用 Lambda 函数。在消息不可见期和执行 Lambda 函数之间没有直接的 link——也就是说,根据您的配置,当达到消息超时时,没有任何东西会 'kill' 现有的 Lambda 函数,也没有任何东西会延迟下一次 Lambda 调用,直到发生消息超时。但是,当达到 Lambda 函数超时时,函数 将 变为 'killed'。因此,听起来您的 Lambda 函数在完成处理消息 之前是 timing-out。您应该将 Lambda 函数的超时时间增加到比预期 运行 持续时间大几倍。 (这是一个独立于 SQS 队列隐身期的配置。)