Consuming messages in batches in Amazon SQS, alpine-sqs, Spring Boot

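I have configured an SQS listener to consume a list of messages, but I only receive one message at a time, and I get an error because model.StudentData cannot be converted to an instance of java.util.ArrayList<com.amazonaws.services.sqs.model.Message>.

My code is: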

    @SqsListener(value = "${queueName}", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
    public void receiveMessage(final StudentData studentData,
                               @Header("SenderId") final String senderId, final Acknowledgment acknowledgment) {

        // business logic
        acknowledgment.acknowledge();
    }

Any suggestions on how to configure the SQS listener to consume multiple messages at once?

Any help would be appreciated.

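The @SqsListener annotation provides the simplest configuration, and it consumes messages one at a time. This limitation comes directly from Spring's QueueMessagingTemplate.

To consume messages in batches, you can use the AmazonSQS client directly.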

    @Autowired AmazonSQSAsync amazonSqs;
    ...

    String queueUrl = amazonSqs.getQueueUrl("queueName").getQueueUrl();
    ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest();
    receiveMessageRequest.setQueueUrl(queueUrl);
    receiveMessageRequest.setWaitTimeSeconds(10); // long polling: wait up to 10 seconds for messages to arrive
    receiveMessageRequest.setMaxNumberOfMessages(10); // SQS returns at most 10 messages per call (valid range 1-10)
    ReceiveMessageResult receiveMessageResult = amazonSqs.receiveMessage(receiveMessageRequest);
    List<Message> messages = receiveMessageResult.getMessages(); // batch of up to 10 messages
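
Because a single receive call returns at most 10 messages, building a larger batch means calling receive several times. The following is only a sketch of that idea, not part of the AWS SDK or Spring Cloud AWS: BatchCollector, pollOnce, targetSize and maxPolls are hypothetical names, and pollOnce stands in for one receiveMessage call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper, not part of the AWS SDK or Spring Cloud AWS.
final class BatchCollector {

    private BatchCollector() {
    }

    /**
     * Calls pollOnce repeatedly until at least targetSize items are collected,
     * a poll returns nothing, or maxPolls calls have been made.
     * The result can exceed targetSize, because every received item is kept.
     */
    static <T> List<T> collect(Supplier<List<T>> pollOnce, int targetSize, int maxPolls) {
        List<T> batch = new ArrayList<>();
        for (int i = 0; i < maxPolls && batch.size() < targetSize; i++) {
            List<T> received = pollOnce.get();
            if (received == null || received.isEmpty()) {
                break; // nothing arrived on this poll, stop early
            }
            batch.addAll(received);
        }
        return batch;
    }
}
```

With the client from the snippet above, a call could look like BatchCollector.collect(() -> amazonSqs.receiveMessage(receiveMessageRequest).getMessages(), 50, 10). Keep the visibility timeout of the queue in mind: messages collected early must still be processed and deleted before it expires.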

The solution to the above problem is:

    // Poll on a dedicated background thread so the caller is not blocked.
    final ExecutorService executorService = Executors.newSingleThreadExecutor();
    executorService.execute(() -> {
        // Resolve the queue URL once instead of on every iteration.
        final String queueUrl = amazonSqs.getQueueUrl("enter your queue name").getQueueUrl();
        while (true) {
            final var receiveMessageRequest = new ReceiveMessageRequest(queueUrl)
                    .withMaxNumberOfMessages(10) // SQS maximum per call; the default is 1
                    .withWaitTimeSeconds(20);    // long polling; 20 seconds is the maximum

            final List<Message> messages = amazonSqs.receiveMessage(receiveMessageRequest).getMessages();

            for (final Message queueMessage : messages) {
                try {
                    String message = queueMessage.getBody();
                    // business logic on `message` goes here
                    amazonSqs.deleteMessage(new DeleteMessageRequest(queueUrl, queueMessage.getReceiptHandle()));
                } catch (Exception e) {
                    // Pass the exception as the last argument so the stack trace is logged.
                    log.error("Received message with errors", e);
                }
            }
        }
    });
    // No new tasks are accepted; the polling task already submitted keeps running.
    executorService.shutdown();
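
The loop above deletes each message with its own request. The SDK also offers deleteMessageBatch, which accepts at most 10 entries per request, so a larger list of receipt handles has to be split first. A minimal sketch of that splitting step follows; ReceiptHandleBatches and partition are hypothetical names used for illustration, and the SQS call itself is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; it does not call SQS itself.
final class ReceiptHandleBatches {

    // Assumed here as a constant: SQS batch requests accept at most 10 entries.
    static final int MAX_BATCH_ENTRIES = 10;

    private ReceiptHandleBatches() {
    }

    /** Splits items into consecutive chunks of at most chunkSize elements. */
    static <T> List<List<T>> partition(List<T> items, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, items.size());
            // Copy the view so each chunk is independent of the source list.
            chunks.add(new ArrayList<>(items.subList(start, end)));
        }
        return chunks;
    }
}
```

Each chunk returned by partition(receiptHandles, ReceiptHandleBatches.MAX_BATCH_ENTRIES) can then be turned into one DeleteMessageBatchRequest, with one DeleteMessageBatchRequestEntry per receipt handle.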