AWS SQS Java。并非所有消息都从 SQS 队列中检索

AWS SQS Java. Not all messages are retrieved from the SQS queue

我一直在尝试使用 AWS SDK for Java 从 SQS 队列中检索所有消息的几种方法都无济于事。我已经了解了 AWS SQS 的分布式特性,并且消息存储在不同的服务器上。但我不明白的是为什么这种架构没有对最终用户隐藏。我必须在 Java 代码中应用什么技巧来检索所有消息并 100% 确定没有遗漏任何人?

我用 "Long Polling" 试过了:

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
for (Message message : messages) {
System.out.println(" Message");
System.out.println(" MessageId: " + message.getMessageId());
System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
System.out.println(" MD5OfBody: " + message.getMD5OfBody());
System.out.println(" Body: " + message.getBody());
for (Entry<String, String> entry : message.getAttributes().entrySet()) {
System.out.println(" Attribute");
System.out.println(" Name: " + entry.getKey());
System.out.println(" Value: " + entry.getValue());
}
}
System.out.println();

这与请求批处理/客户端缓​​冲:

    // Create the basic Amazon SQS async client
    AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient();

    // Create the buffered client
    AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync);

    CreateQueueRequest createRequest = new CreateQueueRequest().withQueueName("MyTestQueue");

    CreateQueueResult res = bufferedSqs.createQueue(createRequest);

    SendMessageRequest request = new SendMessageRequest();
    String body = "test message_" + System.currentTimeMillis();
    request.setMessageBody( body );
    request.setQueueUrl(res.getQueueUrl());

    SendMessageResult sendResult = bufferedSqs.sendMessage(request);

    ReceiveMessageRequest receiveRq = new ReceiveMessageRequest()
    .withMaxNumberOfMessages(10)
    .withQueueUrl(queueUrl);
    ReceiveMessageResult rx = bufferedSqs.receiveMessage(receiveRq);

    List<Message> messages = rx.getMessages();
    for (Message message : messages) {
    System.out.println(" Message");
    System.out.println(" MessageId: " + message.getMessageId());
    System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
    System.out.println(" MD5OfBody: " + message.getMD5OfBody());
    System.out.println(" Body: " + message.getBody());
    for (Entry<String, String> entry : message.getAttributes().entrySet()) {
    System.out.println(" Attribute");
    System.out.println(" Name: " + entry.getKey());
    System.out.println(" Value: " + entry.getValue());
    }
    }

但我仍然无法检索所有消息。

有什么想法吗?

AWS 论坛对我的post保持沉默。

从SQS队列接收消息时,需要重复调​​用sqs:ReceiveMessage

每次调用 sqs:ReceiveMessage 时,您将从队列中获得 0 条或多条消息,您需要遍历这些消息。对于每条消息,您还需要在处理完每条消息后调用 sqs:DeleteMessage 将消息从队列中删除。

在上面的 "Long Polling" 示例周围添加一个循环以接收所有消息。

for (;;) {
    ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);
    List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
    for (Message message : messages) {
        System.out.println(" Message");
        System.out.println(" MessageId: " + message.getMessageId());
        System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
        System.out.println(" MD5OfBody: " + message.getMD5OfBody());
        System.out.println(" Body: " + message.getBody());
        for (Entry<String, String> entry : message.getAttributes().entrySet()) {
            System.out.println(" Attribute");
            System.out.println(" Name: " + entry.getKey());
            System.out.println(" Value: " + entry.getValue());
        }
    }
    System.out.println();
}

另请注意,您可能会多次收到同一条消息。因此,允许您的工作"reprocess" 相同的消息,或检测重复的消息。

如果队列中没有消息,则长轮询将等待。这意味着如果您在循环中使用长轮询调用 ReceiveMessage,则可以保证您将收到所有消息。当收到 0 条消息响应时,您已收到所有消息。

您提到您还使用了 Web 控制台。 Web 控制台的工作方式与使用 SDK 调用 API 的方式相同。这意味着当您在控制台中接收和查看消息时,消息对其他客户端是不可见的,直到可见性超时到期。这可能是您看不到消息的原因。

查看有关可见性超时的更多信息:

http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/AboutVT.html

SQS 队列不是数据库。您无法像您尝试的那样将所有消息读入列表。队列没有开始也没有结束。您轮询队列并询问一些消息,它 return 如果它们存在,它会为您提供一些消息。

如果您想要一种可以 return 整个数据集的方法,那么 sqs 不是正确的工具 - 在这种情况下,传统数据库可能更好。

我也遇到了同样的问题 - 只返回了一条消息,然后我尝试了 receiveMessageRequest.setMaxNumberOfMessages(10) ,这将帮助我循环检索 10 条消息,

因为我的队列有超过 500 条记录,所以我所做的是

    List<String> messagelist = new ArrayList<>();

    try
    {
        AmazonSQS sqs = new AmazonSQSClient(credentials);
        Region usWest2 = Region.getRegion(Regions.US_WEST_2);
        sqs.setRegion(usWest2);
        boolean flag = true;

        while(flag)
        {
            ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queuename);
            receiveMessageRequest.setMaxNumberOfMessages(number_of_message_);
            receiveMessageRequest.withMaxNumberOfMessages(number_of_message_).withWaitTimeSeconds(wait_time_second_);
            List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();

            for (Message message : messages) 
                  {
                    //   System.out.println("    Body:          " + message.getBody());
                       messagelist.add( message.getBody());

                       String messageReceiptHandle = message.getReceiptHandle();
                       sqs.deleteMessage(new DeleteMessageRequest().withQueueUrl(queuename).withReceiptHandle(messageReceiptHandle));
                    }
            if(messages.size()==0)
            {
                flag = false;
            }
        }         

    }
    catch (AmazonServiceException ase) {
        ase.printStackTrace();
    } catch (AmazonClientException ace) {
       ace.printStackTrace();
    }
    finally {
        return messagelist ;
    }

我正在从 SQS 读取记录,然后将其保存到字符串列表中,然后从队列中删除记录。

所以最后我会将队列中的所有数据放在一个列表中