SQS - all messages go in flight even when maxNumberOfMessages is set to 1

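I have defined pod scaling based on the number of messages in SQS. I want each pod to process 1 message.

So if I have 3 messages, I will have 3 pods, each processing 1 message.

This is how I retrieve messages from SQS, using withMaxNumberOfMessages(1):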

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl)
        .withMaxNumberOfMessages(1)
        .withWaitTimeSeconds(10);
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
System.out.println("Number of messages - "+messages.size());

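I can see that the number of messages it picks up is 1.

The problem I'm facing is that when 1 pod is running, all messages in the queue go in flight. The remaining pods read zero messages.

Why does this happen? Even though I set maxNumberOfMessages to 1, why do all messages go in flight? I want it to pick up only 1 message, so that only that message goes in flight and the rest stay in the queue, available to the other pods.

This is the code that runs when the pod starts: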

@EventListener(ApplicationReadyEvent.class)
    public void init() throws InterruptedException {
        SQSS3Event message = sqsRepository.getMessage(queueUrl);
        while(message != null){
            System.out.println(message.getBucketName());
            System.out.println(message.getFileName());
            System.out.println(message.getReceiptHandle());
            sqsRepository.changeMessageVisibility(queueUrl, message.getReceiptHandle(), 70);
            Thread.sleep(60000);
            sqsRepository.changeMessageVisibility(queueUrl, message.getReceiptHandle(), 10);
            sqsRepository.deleteMessage(queueUrl,message.getReceiptHandle());

            message = sqsRepository.getMessage(queueUrl);
        }
        System.out.println("No more messages to process");
    }

This is the helper method that retrieves a message from SQS:

public SQSS3Event getMessage(String queueUrl){
    ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(1).withWaitTimeSeconds(10);
    List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
    System.out.println("Number of messages - "+messages.size());
    if(messages.size()>0) {
        S3EventNotification notification = S3EventNotification.parseJson(messages.get(0).getBody());
        return SQSS3Event.builder()
                .bucketName(notification.getRecords().get(0).getS3().getBucket().getName())
                .fileName(notification.getRecords().get(0).getS3().getObject().getKey())
                .receiptHandle(messages.get(0).getReceiptHandle())
                .build();
    }
    else {
        return null;
    }
}

I added print statements that read the queue attributes before and after receiving the message:

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest().withQueueUrl(queueUrl).withMaxNumberOfMessages(1).withWaitTimeSeconds(5);
GetQueueAttributesResult att = sqs.getQueueAttributes(queueUrl, List.of("ApproximateNumberOfMessages", "ApproximateNumberOfMessagesNotVisible"));
System.out.println("##########################Before reading##########################");
System.out.println("No of Messages - "+ att.getAttributes().get("ApproximateNumberOfMessages"));
System.out.println("No of Messages on Flight - "+ att.getAttributes().get("ApproximateNumberOfMessagesNotVisible"));
System.out.println("##################################################################");
ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
List<Message> messages = result.getMessages();

System.out.println("No of messages in the result - "+messages.size());

S3EventNotification notification = S3EventNotification.parseJson(messages.get(0).getBody());
att = sqs.getQueueAttributes(queueUrl, List.of("ApproximateNumberOfMessages", "ApproximateNumberOfMessagesNotVisible"));
System.out.println("##########################After reading##########################");
System.out.println("No of Messages - "+ att.getAttributes().get("ApproximateNumberOfMessages"));
System.out.println("No of Messages on Flight - "+ att.getAttributes().get("ApproximateNumberOfMessagesNotVisible"));
System.out.println("##################################################################");

Output

##########################Before reading##########################
No of Messages - 4
No of Messages on Flight - 0
##################################################################
No of messages in the result- 1
##########################After reading##########################
No of Messages - 0
No of Messages on Flight - 4
##################################################################

With trace logging enabled

 ##########################Before reading##########################
No of Messages - 2
No of Messages on Flight - 0
##################################################################
2022-04-07 21:59:24.036 TRACE 21096 --- [           main] c.a.s.sqs.buffered.ReceiveQueueBuffer    : Spawned receive batch #1 (1 of 10 inflight) for queue http://localhost:4576/queue/upload-notifications
2022-04-07 21:59:24.166 DEBUG 21096 --- [rWorkerThread-1] c.a.s.sqs.MessageMD5ChecksumHandler      : Message body: {"Records": [{"eventVersion": "2.0", "eventName": "ObjectCreated:Put", "eventTime": "2022-04-07T19:14:39.050963Z", "userIdentity": {"principalId": "AIDAJDPLRKLG7UEXAMPLE"}, "eventSource": "aws:s3", "requestParameters": {"sourceIPAddress": "127.0.0.1"}, "s3": {"configurationId": "testConfigRule", "object": {"versionId": "096fKKXTRTtl3on89fVO.nfljtsv6qko", "eTag": "d41d8cd98f00b204e9800998ecf8427e", "sequencer": "0055AED6DCD90281E5", "key": "3.txt", "size": 1024}, "bucket": {"arn": "arn:aws:s3:::uploads-toprocess", "name": "uploads-toprocess", "ownerIdentity": {"principalId": "A3NL1KOZZKExample"}}, "s3SchemaVersion": "1.0"}, "responseElements": {"x-amz-id-2": "eftixk72aD6Ap51TnqcoF8eFidJG9Z/2", "x-amz-request-id": "2e30a6e9"}, "awsRegion": "us-east-1"}]}
2022-04-07 21:59:24.167 DEBUG 21096 --- [rWorkerThread-1] c.a.s.sqs.MessageMD5ChecksumHandler      : Expected  MD5 of message body: 9428bd5190b9d47af3368b3f67c62d02
2022-04-07 21:59:24.167 DEBUG 21096 --- [rWorkerThread-1] c.a.s.sqs.MessageMD5ChecksumHandler      : Message body: {"Records": [{"eventVersion": "2.0", "eventName": "ObjectCreated:Put", "eventTime": "2022-04-07T19:15:50.375772Z", "userIdentity": {"principalId": "AIDAJDPLRKLG7UEXAMPLE"}, "eventSource": "aws:s3", "requestParameters": {"sourceIPAddress": "127.0.0.1"}, "s3": {"configurationId": "testConfigRule", "object": {"versionId": "096fKKXTRTtl3on89fVO.nfljtsv6qko", "eTag": "d41d8cd98f00b204e9800998ecf8427e", "sequencer": "0055AED6DCD90281E5", "key": "3.txt", "size": 1024}, "bucket": {"arn": "arn:aws:s3:::uploads-toprocess", "name": "uploads-toprocess", "ownerIdentity": {"principalId": "A3NL1KOZZKExample"}}, "s3SchemaVersion": "1.0"}, "responseElements": {"x-amz-id-2": "eftixk72aD6Ap51TnqcoF8eFidJG9Z/2", "x-amz-request-id": "a8ba5ab3"}, "awsRegion": "us-east-1"}]}
2022-04-07 21:59:24.167 DEBUG 21096 --- [rWorkerThread-1] c.a.s.sqs.MessageMD5ChecksumHandler      : Expected  MD5 of message body: 18ce2856addac8a08c394ce8fbd7d315
2022-04-07 21:59:24.167 TRACE 21096 --- [rWorkerThread-1] c.a.s.sqs.buffered.ReceiveQueueBuffer    : Queue http://localhost:4576/queue/upload-notifications now has 1 receive results cached 
2022-04-07 21:59:24.168 TRACE 21096 --- [rWorkerThread-1] c.a.s.sqs.buffered.ReceiveQueueBuffer    : Spawned receive batch #2 (1 of 10 inflight) for queue http://localhost:4576/queue/upload-notifications
No of messages in the result- 1
##########################After reading##########################
No of Messages - 0
No of Messages on Flight - 2
##################################################################

Solution (there is a workaround, although I don't fully understand it)

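I am using the Spring Cloud AWS dependency and an @Autowired AmazonSQS instance to interact with the queue. By default, that client pulls up to 10 messages into some kind of buffer queue (the trace output shows a ReceiveQueueBuffer at work) and then hands them out one at a time. That is why all the messages go in flight at once and are then processed one by one.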
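To make the buffering effect concrete, here is a minimal toy model in plain Java. It is only a sketch of the behavior described above, not the AWS SDK's actual implementation: ToyQueue, ToyBufferedClient and the fixed prefetch size of 10 are hypothetical names and values chosen for illustration. It compares a direct receive of 1 message with a receive of 1 message through a prefetching buffer, using a queue of 4 messages as in the output above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class PrefetchDemo {

    /** Toy stand-in for an SQS queue: it only tracks visible vs in-flight messages. */
    public static class ToyQueue {
        private final Deque<String> visible = new ArrayDeque<>();
        private int inFlight = 0;

        public ToyQueue(int messageCount) {
            for (int i = 1; i <= messageCount; i++) {
                visible.add("msg-" + i);
            }
        }

        /** Hands out up to max messages; every message handed out becomes in flight. */
        public List<String> receive(int max) {
            List<String> batch = new ArrayList<>();
            while (batch.size() < max && !visible.isEmpty()) {
                batch.add(visible.poll());
                inFlight++;
            }
            return batch;
        }

        public int visibleCount() { return visible.size(); }

        public int inFlightCount() { return inFlight; }
    }

    /** Toy buffered client (hypothetical): fetches a full batch, then serves from its cache. */
    public static class ToyBufferedClient {
        // Assumed batch size, mirroring the "10 messages" mentioned in the answer.
        private static final int PREFETCH_BATCH_SIZE = 10;
        private final ToyQueue queue;
        private final Deque<String> cache = new ArrayDeque<>();

        public ToyBufferedClient(ToyQueue queue) {
            this.queue = queue;
        }

        public List<String> receive(int max) {
            if (cache.isEmpty()) {
                // The caller's max is not used for the fetch: the whole batch goes in flight.
                cache.addAll(queue.receive(PREFETCH_BATCH_SIZE));
            }
            List<String> result = new ArrayList<>();
            while (result.size() < max && !cache.isEmpty()) {
                result.add(cache.poll());
            }
            return result;
        }
    }

    public static void main(String[] args) {
        // Direct receive: only the requested message leaves the visible set.
        ToyQueue direct = new ToyQueue(4);
        int got = direct.receive(1).size();
        System.out.println("direct:   returned=" + got
                + " visible=" + direct.visibleCount()
                + " inFlight=" + direct.inFlightCount());

        // Buffered receive: the caller still gets 1 message, but all 4 are in flight.
        ToyQueue buffered = new ToyQueue(4);
        got = new ToyBufferedClient(buffered).receive(1).size();
        System.out.println("buffered: returned=" + got
                + " visible=" + buffered.visibleCount()
                + " inFlight=" + buffered.inFlightCount());
    }
}
```

In this model the buffered call still returns a single message to the caller, yet the queue ends up with 0 visible and 4 in flight, which matches the pattern in the printed queue attributes. The other pods then see an empty queue until those messages are deleted or their visibility timeout expires.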

I was able to override that by defining a simple bean:

@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
    SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
    simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
    simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
    simpleMessageListenerContainer.setMaxNumberOfMessages(1);
    simpleMessageListenerContainer.setTaskExecutor(threadPoolTaskExecutor());
    return simpleMessageListenerContainer;
}

Now it works as expected, in line with withMaxNumberOfMessages(n).
