SQS - all messages go in flight even when maxNumberOfMessages is set to 1
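I have defined pod scaling based on the number of messages in SQS. I want each pod to process 1 message.
So if I have 3 messages, I will have 3 pods, each processing 1 message.
This is how I retrieve messages from SQS, using withMaxNumberOfMessages(1):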
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(1).withWaitTimeSeconds(10);
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
System.out.println("Number of messages - "+messages.size());
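I can see that the number of messages it picks up is 1.
The problem I am facing is that when 1 pod is running, all the messages in the queue go in flight. The remaining pods read zero messages.
Why is this happening? Even though I specify maxNumberOfMessages as 1, why do all the messages go in flight? I want it to pick up only 1 message, have only that message go in flight, and leave the rest in the queue, available to the other pods.
This is the code I run when the pod starts: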
@EventListener(ApplicationReadyEvent.class)
public void init() throws InterruptedException {
    SQSS3Event message = sqsRepository.getMessage(queueUrl);
    while (message != null) {
        System.out.println(message.getBucketName());
        System.out.println(message.getFileName());
        System.out.println(message.getReceiptHandle());
        // Keep the message hidden from other consumers while it is processed
        sqsRepository.changeMessageVisibility(queueUrl, message.getReceiptHandle(), 70);
        // Simulate 60 seconds of processing
        Thread.sleep(60000);
        sqsRepository.changeMessageVisibility(queueUrl, message.getReceiptHandle(), 10);
        // Processing is done, so remove the message from the queue
        sqsRepository.deleteMessage(queueUrl, message.getReceiptHandle());
        message = sqsRepository.getMessage(queueUrl);
    }
    System.out.println("No more messages to process");
}
This is the helper method that retrieves a message from SQS:
public SQSS3Event getMessage(String queueUrl) {
    ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(1).withWaitTimeSeconds(10);
    List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
    System.out.println("Number of messages - " + messages.size());
    if (!messages.isEmpty()) {
        S3EventNotification notification = S3EventNotification.parseJson(messages.get(0).getBody());
        return SQSS3Event.builder()
                .bucketName(notification.getRecords().get(0).getS3().getBucket().getName())
                .fileName(notification.getRecords().get(0).getS3().getObject().getKey())
                .receiptHandle(messages.get(0).getReceiptHandle())
                .build();
    } else {
        return null;
    }
}
I added print statements that read the queue attributes before and after reading the message:
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest().withQueueUrl(queueUrl).withMaxNumberOfMessages(1).withWaitTimeSeconds(5);
GetQueueAttributesResult att = sqs.getQueueAttributes(queueUrl, List.of("ApproximateNumberOfMessages", "ApproximateNumberOfMessagesNotVisible"));
System.out.println("##########################Before reading##########################");
System.out.println("No of Messages - "+ att.getAttributes().get("ApproximateNumberOfMessages"));
System.out.println("No of Messages on Flight - "+ att.getAttributes().get("ApproximateNumberOfMessagesNotVisible"));
System.out.println("##################################################################");
ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
List<Message> messages = result.getMessages();
System.out.println("No of messages in the result - "+messages.size());
S3EventNotification notification = S3EventNotification.parseJson(messages.get(0).getBody());
att = sqs.getQueueAttributes(queueUrl, List.of("ApproximateNumberOfMessages", "ApproximateNumberOfMessagesNotVisible"));
System.out.println("##########################After reading##########################");
System.out.println("No of Messages - "+ att.getAttributes().get("ApproximateNumberOfMessages"));
System.out.println("No of Messages on Flight - "+ att.getAttributes().get("ApproximateNumberOfMessagesNotVisible"));
System.out.println("##################################################################");
Output
##########################Before reading##########################
No of Messages - 4
No of Messages on Flight - 0
##################################################################
No of messages in the result- 1
##########################After reading##########################
No of Messages - 0
No of Messages on Flight - 4
##################################################################
With trace logging:
##########################Before reading##########################
No of Messages - 2
No of Messages on Flight - 0
##################################################################
2022-04-07 21:59:24.036 TRACE 21096 --- [ main] c.a.s.sqs.buffered.ReceiveQueueBuffer : Spawned receive batch #1 (1 of 10 inflight) for queue http://localhost:4576/queue/upload-notifications
2022-04-07 21:59:24.166 DEBUG 21096 --- [rWorkerThread-1] c.a.s.sqs.MessageMD5ChecksumHandler : Message body: {"Records": [{"eventVersion": "2.0", "eventName": "ObjectCreated:Put", "eventTime": "2022-04-07T19:14:39.050963Z", "userIdentity": {"principalId": "AIDAJDPLRKLG7UEXAMPLE"}, "eventSource": "aws:s3", "requestParameters": {"sourceIPAddress": "127.0.0.1"}, "s3": {"configurationId": "testConfigRule", "object": {"versionId": "096fKKXTRTtl3on89fVO.nfljtsv6qko", "eTag": "d41d8cd98f00b204e9800998ecf8427e", "sequencer": "0055AED6DCD90281E5", "key": "3.txt", "size": 1024}, "bucket": {"arn": "arn:aws:s3:::uploads-toprocess", "name": "uploads-toprocess", "ownerIdentity": {"principalId": "A3NL1KOZZKExample"}}, "s3SchemaVersion": "1.0"}, "responseElements": {"x-amz-id-2": "eftixk72aD6Ap51TnqcoF8eFidJG9Z/2", "x-amz-request-id": "2e30a6e9"}, "awsRegion": "us-east-1"}]}
2022-04-07 21:59:24.167 DEBUG 21096 --- [rWorkerThread-1] c.a.s.sqs.MessageMD5ChecksumHandler : Expected MD5 of message body: 9428bd5190b9d47af3368b3f67c62d02
2022-04-07 21:59:24.167 DEBUG 21096 --- [rWorkerThread-1] c.a.s.sqs.MessageMD5ChecksumHandler : Message body: {"Records": [{"eventVersion": "2.0", "eventName": "ObjectCreated:Put", "eventTime": "2022-04-07T19:15:50.375772Z", "userIdentity": {"principalId": "AIDAJDPLRKLG7UEXAMPLE"}, "eventSource": "aws:s3", "requestParameters": {"sourceIPAddress": "127.0.0.1"}, "s3": {"configurationId": "testConfigRule", "object": {"versionId": "096fKKXTRTtl3on89fVO.nfljtsv6qko", "eTag": "d41d8cd98f00b204e9800998ecf8427e", "sequencer": "0055AED6DCD90281E5", "key": "3.txt", "size": 1024}, "bucket": {"arn": "arn:aws:s3:::uploads-toprocess", "name": "uploads-toprocess", "ownerIdentity": {"principalId": "A3NL1KOZZKExample"}}, "s3SchemaVersion": "1.0"}, "responseElements": {"x-amz-id-2": "eftixk72aD6Ap51TnqcoF8eFidJG9Z/2", "x-amz-request-id": "a8ba5ab3"}, "awsRegion": "us-east-1"}]}
2022-04-07 21:59:24.167 DEBUG 21096 --- [rWorkerThread-1] c.a.s.sqs.MessageMD5ChecksumHandler : Expected MD5 of message body: 18ce2856addac8a08c394ce8fbd7d315
2022-04-07 21:59:24.167 TRACE 21096 --- [rWorkerThread-1] c.a.s.sqs.buffered.ReceiveQueueBuffer : Queue http://localhost:4576/queue/upload-notifications now has 1 receive results cached
2022-04-07 21:59:24.168 TRACE 21096 --- [rWorkerThread-1] c.a.s.sqs.buffered.ReceiveQueueBuffer : Spawned receive batch #2 (1 of 10 inflight) for queue http://localhost:4576/queue/upload-notifications
No of messages in the result- 1
##########################After reading##########################
No of Messages - 0
No of Messages on Flight - 2
##################################################################
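Solution (there is a workaround, although I don't fully understand it)
I am using the Spring Cloud AWS dependency and interacting with the queue through an @Autowired AmazonSQS instance. By default, it fetches 10 messages into some kind of buffer queue (the ReceiveQueueBuffer visible in the trace above) and then serves them one at a time. That is why all the messages go in flight at once and are then processed one by one.
I was able to override that by defining a simple bean: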
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
    SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
    simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
    simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
    simpleMessageListenerContainer.setMaxNumberOfMessages(1);
    simpleMessageListenerContainer.setTaskExecutor(threadPoolTaskExecutor());
    return simpleMessageListenerContainer;
}
Now it works synchronously with withMaxNumberOfMessages(n), as expected.
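For anyone trying to picture why a prefetching client produces the numbers in the output above, here is a toy sketch. It does not use the AWS SDK at all: ToyQueue, ToyBufferedClient, PrefetchDemo and the PREFETCH_BATCH_SIZE constant are hypothetical names made up for this illustration, and the batch size of 10 is only assumed from the "10 messages" mentioned in the answer. The point is just that the caller asks for 1 message, but the wrapper has already pulled everything it could from the queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy in-memory stand-in for an SQS queue (hypothetical, not the AWS API). */
class ToyQueue {
    private final Deque<String> visible = new ArrayDeque<>();
    private int inFlight = 0;

    void send(String body) {
        visible.addLast(body);
    }

    /** Hands out up to max messages and counts each one as in flight. */
    List<String> receive(int max) {
        List<String> out = new ArrayList<>();
        while (out.size() < max && !visible.isEmpty()) {
            out.add(visible.pollFirst());
        }
        inFlight += out.size();
        return out;
    }

    int visibleCount() {
        return visible.size();
    }

    int inFlightCount() {
        return inFlight;
    }
}

/** Toy prefetching wrapper: fetches a full batch, then serves callers from a local cache. */
class ToyBufferedClient {
    // Assumed batch size, taken from the "10 messages" mentioned in the answer
    private static final int PREFETCH_BATCH_SIZE = 10;
    private final ToyQueue queue;
    private final Deque<String> cache = new ArrayDeque<>();

    ToyBufferedClient(ToyQueue queue) {
        this.queue = queue;
    }

    List<String> receive(int max) {
        if (cache.isEmpty()) {
            // The caller's max is ignored here: the whole batch goes in flight
            cache.addAll(queue.receive(PREFETCH_BATCH_SIZE));
        }
        List<String> out = new ArrayList<>();
        while (out.size() < max && !cache.isEmpty()) {
            out.add(cache.pollFirst());
        }
        return out;
    }
}

class PrefetchDemo {
    /** Returns {received, visible, inFlight} after one receive(1) on a queue of 4 messages. */
    static int[] run(boolean buffered) {
        ToyQueue queue = new ToyQueue();
        for (int i = 1; i <= 4; i++) {
            queue.send("msg-" + i);
        }
        List<String> got = buffered
                ? new ToyBufferedClient(queue).receive(1)
                : queue.receive(1);
        return new int[] { got.size(), queue.visibleCount(), queue.inFlightCount() };
    }

    public static void main(String[] args) {
        int[] b = run(true);
        int[] d = run(false);
        System.out.println("buffered: received=" + b[0] + " visible=" + b[1] + " inFlight=" + b[2]);
        System.out.println("direct:   received=" + d[0] + " visible=" + d[1] + " inFlight=" + d[2]);
    }
}
```

With 4 messages in the toy queue, the buffered path returns 1 message but leaves 0 visible and 4 in flight, which is the same shape as the "Before reading" / "After reading" output in the question. The direct path leaves 3 visible and 1 in flight, which is what the other pods would need.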