AWS FIFO SQS queue message is disappearing when I repost the same message even after successfully deleting it
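I'm running into a strange issue with SQS. To simplify my use case: my FIFO queue holds 7 messages, and for my business case my standalone application should keep polling them indefinitely, always in the same order. For example, the application reads message 1, does some business processing, then deletes it and reposts the same message to the same queue (at the tail). These steps repeat endlessly for the following messages. I expect the application to keep polling and act on the messages in the same order, but here is the problem. When a message is read from the queue for the first time, deleted, and then reposted to the same queue, the reposted message is not in the queue, even though the send returned a successful result.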
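I've included the code below to reproduce the issue. In short, it creates the Test_Queue.fifo queue with Test_Queue_DLQ.fifo configured as its dead-letter queue through RedrivePolicy. Right after the queues are created, the message "Test_Message" is sent to Test_Queue.fifo (a MessageId comes back in the response). The code then long-polls the queue, iterates over ReceiveMessageResult#getMessages, and deletes the message. After the delete succeeds, the same message is reposted to the tail of the same queue (again a MessageId comes back in the response). However, the reposted message is not in the queue. In the AWS Management Console, the counts under Messages available and Messages in flight are both 0, and the reposted message is not in the Test_Queue_DLQ.fifo queue either. According to the SQS documentation, deleting a message removes it even while it is in flight, so reposting the same message should not be a problem. I suspect SQS performs some kind of equality comparison and skips identical messages during the VisibilityTimeout interval to avoid duplicate processing in a distributed environment, but I can't get a clear picture.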
Code snippet that reproduces the issue:
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.CreateQueueResult;
import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageResult;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class SQSIssue {

    @Test
    void sqsMessageAbsenceIssueTest() {
        AmazonSQS amazonSQS = AmazonSQSClientBuilder.standard()
                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
                        "https://sqs.us-east-2.amazonaws.com", "us-east-2"))
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(
                        "<accessKey>", "<secretKey>")))
                .build();
        // Create the queue (and its dead-letter queue)
        String queueUrl = createQueues(amazonSQS);
        String message = "Test_Message";
        String groupId = "Group1";
        // Send the message "Test_Message"
        sendMessage(amazonSQS, queueUrl, message, groupId);
        // Read the message and delete it using message.getReceiptHandle()
        readAndDeleteMessage(amazonSQS, queueUrl);
        // Repost the same message "Test_Message" to the queue
        sendMessage(amazonSQS, queueUrl, message, groupId);
        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest()
                .withQueueUrl(queueUrl)
                .withWaitTimeSeconds(5)
                .withMessageAttributeNames("All")
                .withVisibilityTimeout(30)
                .withMaxNumberOfMessages(10);
        ReceiveMessageResult receiveMessageResult = amazonSQS.receiveMessage(receiveMessageRequest);
        // I expect the message to be present, since I just reposted it after deleting it
        Assertions.assertFalse(receiveMessageResult.getMessages().isEmpty());
    }

    private void readAndDeleteMessage(AmazonSQS amazonSQS, String queueUrl) {
        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest()
                .withQueueUrl(queueUrl)
                .withWaitTimeSeconds(5)
                .withMessageAttributeNames("All")
                .withVisibilityTimeout(30)
                .withMaxNumberOfMessages(10);
        ReceiveMessageResult receiveMessageResult = amazonSQS.receiveMessage(receiveMessageRequest);
        receiveMessageResult.getMessages()
                .forEach(message -> amazonSQS.deleteMessage(queueUrl, message.getReceiptHandle()));
    }

    private String createQueues(AmazonSQS amazonSQS) {
        String queueName = "Test_Queue.fifo";
        String deadLetterQueueName = "Test_Queue_DLQ.fifo";
        // Create the dead-letter queue
        CreateQueueRequest createDeadLetterQueueRequest = new CreateQueueRequest()
                .addAttributesEntry("FifoQueue", "true")
                .addAttributesEntry("ContentBasedDeduplication", "true")
                .addAttributesEntry("VisibilityTimeout", "600")
                .addAttributesEntry("MessageRetentionPeriod", "262144");
        createDeadLetterQueueRequest.withQueueName(deadLetterQueueName);
        CreateQueueResult createDeadLetterQueueResult = amazonSQS.createQueue(createDeadLetterQueueRequest);
        GetQueueAttributesResult getQueueAttributesResult = amazonSQS.getQueueAttributes(
                new GetQueueAttributesRequest(createDeadLetterQueueResult.getQueueUrl())
                        .withAttributeNames("QueueArn"));
        String deadLetterQueueArn = getQueueAttributesResult.getAttributes().get("QueueArn");
        // Create the actual queue with the dead-letter queue configured
        CreateQueueRequest createQueueRequest = new CreateQueueRequest()
                .addAttributesEntry("FifoQueue", "true")
                .addAttributesEntry("ContentBasedDeduplication", "true")
                .addAttributesEntry("VisibilityTimeout", "600")
                .addAttributesEntry("MessageRetentionPeriod", "262144");
        createQueueRequest.withQueueName(queueName);
        String reDrivePolicy = "{\"maxReceiveCount\":\"5\", \"deadLetterTargetArn\":\""
                + deadLetterQueueArn + "\"}";
        createQueueRequest.addAttributesEntry("RedrivePolicy", reDrivePolicy);
        CreateQueueResult createQueueResult = amazonSQS.createQueue(createQueueRequest);
        return createQueueResult.getQueueUrl();
    }

    private void sendMessage(AmazonSQS amazonSQS, String queueUrl, String message, String groupId) {
        SendMessageRequest sendMessageRequest = new SendMessageRequest()
                .withQueueUrl(queueUrl)
                .withMessageBody(message)
                .withMessageGroupId(groupId);
        SendMessageResult sendMessageResult = amazonSQS.sendMessage(sendMessageRequest);
        Assertions.assertNotNull(sendMessageResult.getMessageId());
    }
}
From the Amazon SQS documentation page "Using the Amazon SQS message deduplication ID":
The message deduplication ID is the token used for deduplication of sent messages. If a message with a particular message deduplication ID is sent successfully, any messages sent with the same message deduplication ID are accepted successfully but aren't delivered during the 5-minute deduplication interval.
So you should provide a different deduplication ID every time you put the message back on the queue.
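To make the quoted rule concrete, here is a small in-memory model of what the question ran into. It is plain Java with no AWS SDK, and the class DedupWindowQueue is hypothetical. A send whose deduplication ID was accepted less than 5 minutes earlier is acknowledged but never enqueued, and deleting the first copy does not reset that record. This only mimics the documented behavior under those assumptions; it is not how SQS is implemented. Timestamps are passed in explicitly so the example is deterministic.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the SQS FIFO deduplication window (NOT the real service).
// Rule mimicked: if a dedup ID was accepted within the last 5 minutes, a new send
// with the same ID is acknowledged but not enqueued.
public class DedupWindowQueue {
    static final long DEDUP_INTERVAL_MS = 5 * 60 * 1000L;

    // dedup ID -> time (ms) at which a send with that ID was last accepted
    private final Map<String, Long> seen = new HashMap<>();
    private final Deque<String> messages = new ArrayDeque<>();

    // Returns true if the message was actually enqueued. In real SQS the
    // caller would get a MessageId back in both cases.
    public boolean send(String body, String dedupId, long nowMs) {
        Long acceptedAt = seen.get(dedupId);
        if (acceptedAt != null && nowMs - acceptedAt < DEDUP_INTERVAL_MS) {
            return false; // treated as a duplicate and silently dropped
        }
        seen.put(dedupId, nowMs);
        messages.addLast(body);
        return true;
    }

    // Receive and delete in one step; note that deleting does NOT clear `seen`.
    public String receiveAndDelete() {
        return messages.pollFirst();
    }

    public int size() {
        return messages.size();
    }

    public static void main(String[] args) {
        DedupWindowQueue q = new DedupWindowQueue();
        q.send("Test_Message", "same-id", 0L);
        q.receiveAndDelete();                                   // read and delete
        boolean reposted = q.send("Test_Message", "same-id", 1_000L);
        System.out.println(reposted + " " + q.size());          // false 0
        boolean fresh = q.send("Test_Message", "new-id", 2_000L);
        System.out.println(fresh + " " + q.size());             // true 1
    }
}
```

In the question's test the repost happens seconds after the first send, so it lands inside the window exactly like the second `send()` above. Waiting out the interval also makes the repost visible in the model, but that would stall a continuous polling loop, so a fresh ID per repost is the practical fix.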
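That's right, I should add SendMessageRequest#withMessageDeduplicationId, but I'd like to add a few more points to the answer. The technical reason the message disappeared is that I had enabled ContentBasedDeduplication on the queue. If no MessageDeduplicationId is given explicitly when a message is sent, Amazon SQS generates one with a SHA-256 hash of the message body (not the message attributes). While ContentBasedDeduplication is in effect, messages with identical content sent within the deduplication interval are treated as duplicates, and only one copy is delivered. So adding different attributes to the same message before reposting it to the same queue does not help either. Adding a MessageDeduplicationId solves the problem, because an explicit MessageDeduplicationId overrides the generated one even when ContentBasedDeduplication is enabled on the queue.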
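The content-based case can be sketched with the JDK alone. The hypothetical helper contentDedupId below hashes only the body with SHA-256 and ignores the attributes, so two sends of "Test_Message" with different attributes produce the same ID, and the second one is deduplicated. The lowercase hex encoding is an assumption for illustration; the exact string SQS derives internally may differ, and only the fact that the input is the body alone matters here.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;

// Hypothetical illustration of content-based deduplication: the ID depends on
// the message body only. The hex format is an assumption, not the SQS format.
public class ContentDedupId {

    static String contentDedupId(String body, Map<String, String> attributes) {
        // `attributes` is deliberately unused: attributes do not feed the hash
        try {
            MessageDigest sha256 = MessageDigest.getInstance("SHA-256");
            byte[] digest = sha256.digest(body.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // every Java platform is required to support SHA-256
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String first = contentDedupId("Test_Message", Map.of("attempt", "1"));
        String repost = contentDedupId("Test_Message", Map.of("attempt", "2"));
        // Same body, different attributes: same ID, so the repost is a duplicate
        System.out.println(first.equals(repost)); // true
    }
}
```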
Code snippet:
// Requires: import java.util.UUID;
SendMessageRequest sendMessageRequest = new SendMessageRequest()
        .withQueueUrl(queueUrl)
        .withMessageBody(message)
        .withMessageGroupId(groupId)
        // Explicit MessageDeduplicationId: overrides the content-based (SHA-256 of
        // the body) ID, so each repost is treated as a new message
        .withMessageDeduplicationId(UUID.randomUUID().toString());
SendMessageResult sendMessageResult = amazonSQS.sendMessage(sendMessageRequest);
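A random UUID per send, as in the snippet above, also turns off deduplication for application-level retries: if a send times out but actually succeeded, retrying it with a new UUID can enqueue the message twice. One possible alternative, sketched below, derives the ID from the body plus a cycle number, so a retry of the same repost reuses the ID while the next cycle gets a new one. This assumes the application tracks a cycle number per message (for example by carrying it in a message attribute); cycleDedupId and the cycle counter are hypothetical. The only SQS constraint the sketch relies on is the 128-character limit on MessageDeduplicationId.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: deterministic MessageDeduplicationId = SHA-256(body) + "-" + cycle.
// Retrying the SAME repost reuses the ID (SQS can still drop the duplicate);
// the next cycle gets a different ID (the repost is delivered).
public class CycleDedupId {

    static String cycleDedupId(String body, long cycle) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(body.getBytes(StandardCharsets.UTF_8));
            StringBuilder id = new StringBuilder();
            for (byte b : digest) {
                id.append(String.format("%02x", b));
            }
            // 64 hex chars + "-" + at most 20 chars for a long: under 128 characters
            return id.append('-').append(cycle).toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String retry1 = cycleDedupId("Test_Message", 1);
        String retry2 = cycleDedupId("Test_Message", 1);
        String nextCycle = cycleDedupId("Test_Message", 2);
        System.out.println(retry1.equals(retry2));    // true
        System.out.println(retry1.equals(nextCycle)); // false
    }
}
```

The trade-off is that the application must persist or propagate the cycle number correctly; if it cannot, the random UUID from the snippet above is simpler.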