在amazon sqs, alpine sqs spring boot中批量消费消息
Consuming messages in batches in amazon sqs, alpine sqs spring boot
我已将 SQS 侦听器配置为使用消息列表中的消息,但我一次只收到一条消息并出现错误,因为无法将 model.StudentData 转换为 java.util.ArrayList<com.amazonaws.services.sqs.model.Message>
[= 的实例13=]
我的代码是:-
@SqsListener(value = "${queueName}", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void receiveMessage(final StudentData studentData,
@Header("SenderId") final String senderId, final Acknowledgment acknowledgment) {
// business logic
acknowledgment.acknowledge();
}
关于如何配置 sqs 侦听器以使用多条消息的任何建议
任何帮助将不胜感激
SQS监听注解提供了最简单的配置,它会一条一条的消费消息。此限制直接来自 spring's
QueueMessagingTemplate.
要使用批次,您可以直接使用 AmazonSQS 客户端。
@Autowire AmazonSQSAsync amazonSqs;
...
String queueUrl = amazonSqs.getQueueUrl("queueName").getQueueUrl();
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest();
receiveMessageRequest.setQueueUrl(queueUrl);
receiveMessageRequest.setWaitTimeSeconds(10); // Listener for messages in the next 10 seconds
receiveMessageRequest.setMaxNumberOfMessages(1000); // If 10000 messages are read stop listening
ReceiveMessageResult receiveMessageResult = amazonSqs.receiveMessage(receiveMessageRequest);
receiveMessageResult.getMessages(); // batch of messages
上述问题的解决方案是:-
final ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
while (true) {
final String queueUrl = amazonSqs.getQueueUrl("enter your queue name").getQueueUrl();
final var receiveMessageRequest = new ReceiveMessageRequest(queueUrl)
.withWaitTimeSeconds(20);
List<Message> messages = amazonSqs.receiveMessage(receiveMessageRequest).getMessages();
while (messages.size() > 0) {
for (final Message queueMessage : messages) {
try {
String message = queueMessage.getBody();
amazonSqs.deleteMessage(new DeleteMessageRequest(queueUrl, queueMessage
.getReceiptHandle()));
} catch (Exception e) {
log.error("Received message with errors " + e);
}
}
messages = amazonSqs.receiveMessage(new ReceiveMessageRequest(queueUrl)).getMessages();
}
}
});
executorService.shutdown();
我已将 SQS 侦听器配置为使用消息列表中的消息,但我一次只收到一条消息并出现错误,因为无法将 model.StudentData 转换为 java.util.ArrayList<com.amazonaws.services.sqs.model.Message>
[= 的实例13=]
我的代码是:-
@SqsListener(value = "${queueName}", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void receiveMessage(final StudentData studentData,
@Header("SenderId") final String senderId, final Acknowledgment acknowledgment) {
// business logic
acknowledgment.acknowledge();
}
关于如何配置 sqs 侦听器以使用多条消息的任何建议
任何帮助将不胜感激
SQS监听注解提供了最简单的配置,它会一条一条的消费消息。此限制直接来自 spring's QueueMessagingTemplate.
要使用批次,您可以直接使用 AmazonSQS 客户端。
@Autowire AmazonSQSAsync amazonSqs;
...
String queueUrl = amazonSqs.getQueueUrl("queueName").getQueueUrl();
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest();
receiveMessageRequest.setQueueUrl(queueUrl);
receiveMessageRequest.setWaitTimeSeconds(10); // Listener for messages in the next 10 seconds
receiveMessageRequest.setMaxNumberOfMessages(1000); // If 10000 messages are read stop listening
ReceiveMessageResult receiveMessageResult = amazonSqs.receiveMessage(receiveMessageRequest);
receiveMessageResult.getMessages(); // batch of messages
上述问题的解决方案是:-
final ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
while (true) {
final String queueUrl = amazonSqs.getQueueUrl("enter your queue name").getQueueUrl();
final var receiveMessageRequest = new ReceiveMessageRequest(queueUrl)
.withWaitTimeSeconds(20);
List<Message> messages = amazonSqs.receiveMessage(receiveMessageRequest).getMessages();
while (messages.size() > 0) {
for (final Message queueMessage : messages) {
try {
String message = queueMessage.getBody();
amazonSqs.deleteMessage(new DeleteMessageRequest(queueUrl, queueMessage
.getReceiptHandle()));
} catch (Exception e) {
log.error("Received message with errors " + e);
}
}
messages = amazonSqs.receiveMessage(new ReceiveMessageRequest(queueUrl)).getMessages();
}
}
});
executorService.shutdown();