AWS SQS Java。并非所有消息都从 SQS 队列中检索
AWS SQS Java. Not all messages are retrieved from the SQS queue
我一直在尝试使用 AWS SDK for Java 从 SQS 队列中检索所有消息的几种方法都无济于事。我已经了解了 AWS SQS 的分布式特性,并且消息存储在不同的服务器上。但我不明白的是为什么这种架构没有对最终用户隐藏。我必须在 Java 代码中应用什么技巧来检索所有消息并 100% 确定没有遗漏任何人?
我用 "Long Polling" 试过了:
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
for (Message message : messages) {
System.out.println(" Message");
System.out.println(" MessageId: " + message.getMessageId());
System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
System.out.println(" MD5OfBody: " + message.getMD5OfBody());
System.out.println(" Body: " + message.getBody());
for (Entry<String, String> entry : message.getAttributes().entrySet()) {
System.out.println(" Attribute");
System.out.println(" Name: " + entry.getKey());
System.out.println(" Value: " + entry.getValue());
}
}
System.out.println();
这与请求批处理/客户端缓冲:
// Create the basic Amazon SQS async client
AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient();
// Create the buffered client
AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync);
CreateQueueRequest createRequest = new CreateQueueRequest().withQueueName("MyTestQueue");
CreateQueueResult res = bufferedSqs.createQueue(createRequest);
SendMessageRequest request = new SendMessageRequest();
String body = "test message_" + System.currentTimeMillis();
request.setMessageBody( body );
request.setQueueUrl(res.getQueueUrl());
SendMessageResult sendResult = bufferedSqs.sendMessage(request);
ReceiveMessageRequest receiveRq = new ReceiveMessageRequest()
.withMaxNumberOfMessages(10)
.withQueueUrl(queueUrl);
ReceiveMessageResult rx = bufferedSqs.receiveMessage(receiveRq);
List<Message> messages = rx.getMessages();
for (Message message : messages) {
System.out.println(" Message");
System.out.println(" MessageId: " + message.getMessageId());
System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
System.out.println(" MD5OfBody: " + message.getMD5OfBody());
System.out.println(" Body: " + message.getBody());
for (Entry<String, String> entry : message.getAttributes().entrySet()) {
System.out.println(" Attribute");
System.out.println(" Name: " + entry.getKey());
System.out.println(" Value: " + entry.getValue());
}
}
但我仍然无法检索所有消息。
有什么想法吗?
AWS 论坛对我的post保持沉默。
从SQS队列接收消息时,需要重复调用sqs:ReceiveMessage
。
每次调用 sqs:ReceiveMessage
时,您将从队列中获得 0 条或多条消息,您需要遍历这些消息。对于每条消息,您还需要在处理完每条消息后调用 sqs:DeleteMessage
将消息从队列中删除。
在上面的 "Long Polling" 示例周围添加一个循环以接收所有消息。
for (;;) {
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
for (Message message : messages) {
System.out.println(" Message");
System.out.println(" MessageId: " + message.getMessageId());
System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
System.out.println(" MD5OfBody: " + message.getMD5OfBody());
System.out.println(" Body: " + message.getBody());
for (Entry<String, String> entry : message.getAttributes().entrySet()) {
System.out.println(" Attribute");
System.out.println(" Name: " + entry.getKey());
System.out.println(" Value: " + entry.getValue());
}
}
System.out.println();
}
另请注意,您可能会多次收到同一条消息。因此,允许您的工作"reprocess" 相同的消息,或检测重复的消息。
如果队列中没有消息,则长轮询将等待。这意味着如果您在循环中使用长轮询调用 ReceiveMessage,则可以保证您将收到所有消息。当收到 0 条消息响应时,您已收到所有消息。
您提到您还使用了 Web 控制台。 Web 控制台的工作方式与使用 SDK 调用 API 的方式相同。这意味着当您在控制台中接收和查看消息时,消息对其他客户端是不可见的,直到可见性超时到期。这可能是您看不到消息的原因。
查看有关可见性超时的更多信息:
http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/AboutVT.html
SQS 队列不是数据库。您无法像您尝试的那样将所有消息读入列表。队列没有开始也没有结束。您轮询队列并询问一些消息,它 return 如果它们存在,它会为您提供一些消息。
如果您想要一种可以 return 整个数据集的方法,那么 sqs 不是正确的工具 - 在这种情况下,传统数据库可能更好。
我也遇到了同样的问题 - 只返回了一条消息,然后我尝试了
receiveMessageRequest.setMaxNumberOfMessages(10) ,这将帮助我循环检索 10 条消息,
因为我的队列有超过 500 条记录,所以我所做的是
List<String> messagelist = new ArrayList<>();
try
{
AmazonSQS sqs = new AmazonSQSClient(credentials);
Region usWest2 = Region.getRegion(Regions.US_WEST_2);
sqs.setRegion(usWest2);
boolean flag = true;
while(flag)
{
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queuename);
receiveMessageRequest.setMaxNumberOfMessages(number_of_message_);
receiveMessageRequest.withMaxNumberOfMessages(number_of_message_).withWaitTimeSeconds(wait_time_second_);
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
for (Message message : messages)
{
// System.out.println(" Body: " + message.getBody());
messagelist.add( message.getBody());
String messageReceiptHandle = message.getReceiptHandle();
sqs.deleteMessage(new DeleteMessageRequest().withQueueUrl(queuename).withReceiptHandle(messageReceiptHandle));
}
if(messages.size()==0)
{
flag = false;
}
}
}
catch (AmazonServiceException ase) {
ase.printStackTrace();
} catch (AmazonClientException ace) {
ace.printStackTrace();
}
finally {
return messagelist ;
}
我正在从 SQS 读取记录,然后将其保存到字符串列表中,然后从队列中删除记录。
所以最后我会将队列中的所有数据放在一个列表中
我一直在尝试使用 AWS SDK for Java 从 SQS 队列中检索所有消息的几种方法都无济于事。我已经了解了 AWS SQS 的分布式特性,并且消息存储在不同的服务器上。但我不明白的是为什么这种架构没有对最终用户隐藏。我必须在 Java 代码中应用什么技巧来检索所有消息并 100% 确定没有遗漏任何人?
我用 "Long Polling" 试过了:
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
for (Message message : messages) {
System.out.println(" Message");
System.out.println(" MessageId: " + message.getMessageId());
System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
System.out.println(" MD5OfBody: " + message.getMD5OfBody());
System.out.println(" Body: " + message.getBody());
for (Entry<String, String> entry : message.getAttributes().entrySet()) {
System.out.println(" Attribute");
System.out.println(" Name: " + entry.getKey());
System.out.println(" Value: " + entry.getValue());
}
}
System.out.println();
这与请求批处理/客户端缓冲:
// Create the basic Amazon SQS async client
AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient();
// Create the buffered client
AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync);
CreateQueueRequest createRequest = new CreateQueueRequest().withQueueName("MyTestQueue");
CreateQueueResult res = bufferedSqs.createQueue(createRequest);
SendMessageRequest request = new SendMessageRequest();
String body = "test message_" + System.currentTimeMillis();
request.setMessageBody( body );
request.setQueueUrl(res.getQueueUrl());
SendMessageResult sendResult = bufferedSqs.sendMessage(request);
ReceiveMessageRequest receiveRq = new ReceiveMessageRequest()
.withMaxNumberOfMessages(10)
.withQueueUrl(queueUrl);
ReceiveMessageResult rx = bufferedSqs.receiveMessage(receiveRq);
List<Message> messages = rx.getMessages();
for (Message message : messages) {
System.out.println(" Message");
System.out.println(" MessageId: " + message.getMessageId());
System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
System.out.println(" MD5OfBody: " + message.getMD5OfBody());
System.out.println(" Body: " + message.getBody());
for (Entry<String, String> entry : message.getAttributes().entrySet()) {
System.out.println(" Attribute");
System.out.println(" Name: " + entry.getKey());
System.out.println(" Value: " + entry.getValue());
}
}
但我仍然无法检索所有消息。
有什么想法吗?
AWS 论坛对我的post保持沉默。
从SQS队列接收消息时,需要重复调用sqs:ReceiveMessage
。
每次调用 sqs:ReceiveMessage
时,您将从队列中获得 0 条或多条消息,您需要遍历这些消息。对于每条消息,您还需要在处理完每条消息后调用 sqs:DeleteMessage
将消息从队列中删除。
在上面的 "Long Polling" 示例周围添加一个循环以接收所有消息。
for (;;) {
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
for (Message message : messages) {
System.out.println(" Message");
System.out.println(" MessageId: " + message.getMessageId());
System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
System.out.println(" MD5OfBody: " + message.getMD5OfBody());
System.out.println(" Body: " + message.getBody());
for (Entry<String, String> entry : message.getAttributes().entrySet()) {
System.out.println(" Attribute");
System.out.println(" Name: " + entry.getKey());
System.out.println(" Value: " + entry.getValue());
}
}
System.out.println();
}
另请注意,您可能会多次收到同一条消息。因此,允许您的工作"reprocess" 相同的消息,或检测重复的消息。
如果队列中没有消息,则长轮询将等待。这意味着如果您在循环中使用长轮询调用 ReceiveMessage,则可以保证您将收到所有消息。当收到 0 条消息响应时,您已收到所有消息。
您提到您还使用了 Web 控制台。 Web 控制台的工作方式与使用 SDK 调用 API 的方式相同。这意味着当您在控制台中接收和查看消息时,消息对其他客户端是不可见的,直到可见性超时到期。这可能是您看不到消息的原因。
查看有关可见性超时的更多信息:
http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/AboutVT.html
SQS 队列不是数据库。您无法像您尝试的那样将所有消息读入列表。队列没有开始也没有结束。您轮询队列并询问一些消息,它 return 如果它们存在,它会为您提供一些消息。
如果您想要一种可以 return 整个数据集的方法,那么 sqs 不是正确的工具 - 在这种情况下,传统数据库可能更好。
我也遇到了同样的问题 - 只返回了一条消息,然后我尝试了 receiveMessageRequest.setMaxNumberOfMessages(10) ,这将帮助我循环检索 10 条消息,
因为我的队列有超过 500 条记录,所以我所做的是
List<String> messagelist = new ArrayList<>();
try
{
AmazonSQS sqs = new AmazonSQSClient(credentials);
Region usWest2 = Region.getRegion(Regions.US_WEST_2);
sqs.setRegion(usWest2);
boolean flag = true;
while(flag)
{
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queuename);
receiveMessageRequest.setMaxNumberOfMessages(number_of_message_);
receiveMessageRequest.withMaxNumberOfMessages(number_of_message_).withWaitTimeSeconds(wait_time_second_);
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
for (Message message : messages)
{
// System.out.println(" Body: " + message.getBody());
messagelist.add( message.getBody());
String messageReceiptHandle = message.getReceiptHandle();
sqs.deleteMessage(new DeleteMessageRequest().withQueueUrl(queuename).withReceiptHandle(messageReceiptHandle));
}
if(messages.size()==0)
{
flag = false;
}
}
}
catch (AmazonServiceException ase) {
ase.printStackTrace();
} catch (AmazonClientException ace) {
ace.printStackTrace();
}
finally {
return messagelist ;
}
我正在从 SQS 读取记录,然后将其保存到字符串列表中,然后从队列中删除记录。
所以最后我会将队列中的所有数据放在一个列表中