AWS SQS and DLQ in Spring Boot
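How do I add a DLQ configuration to my SQS configuration? I'm not sure how to integrate a DLQ with an existing queue. I'm using AWS messaging rather than JMS, so my listener methods are annotated with @SqsListener. My configuration class contains the following: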
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
        SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
        factory.setAmazonSqs(amazonSQSAsync);
        factory.setMaxNumberOfMessages(10);
        SimpleMessageListenerContainer simpleMessageListenerContainer = factory.createSimpleMessageListenerContainer();
        simpleMessageListenerContainer.setQueueStopTimeout(queueStopTimeout * 1000);
        simpleMessageListenerContainer.setMessageHandler(messageHandler(amazonSQSAsync));
        return simpleMessageListenerContainer;
    }

    @Bean
    public QueueMessageHandler messageHandler(AmazonSQSAsync amazonSQSAsync) {
        QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
        queueMessageHandlerFactory.setAmazonSqs(amazonSQSAsync);
        QueueMessageHandler messageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
        return messageHandler;
    }

    @Bean
    public AmazonSQSAsync awsSqsAsync() {
        AmazonSQSAsyncClient amazonSQSAsyncClient = new AmazonSQSAsyncClient(new DefaultAWSCredentialsProviderChain());
        amazonSQSAsyncClient.setRegion(Region.getRegion(Regions.fromName(region)));
        return new AmazonSQSBufferedAsyncClient(amazonSQSAsyncClient);
    }
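I couldn't find any proper documentation on how to set up retries so that a message goes to the dead-letter queue once the retries exceed a threshold.
If I remember correctly, the maximum number of retries and the associated DLQ are set on the broker side (on the SQS queue itself) and cannot be configured as part of the listener.
Then in your code you would do something like this: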
    @SqsListener(value = "MainQueue", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
    public void receive(String message, @Header("SenderId") String senderId, Acknowledgment ack) throws IOException {
        // Process the message first; acknowledging deletes it from the queue.
        ack.acknowledge();
    }

    @SqsListener(value = "DLQ-AssociatedWithMain")
    public void receiveDlq(String message) throws IOException {
        // Handle messages that exhausted their retries on the main queue.
    }
If a message is not acknowledged, it is redelivered up to the configured maximum number of times and then sent to the DLQ.
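To make that redelivery rule concrete, here is a small dependency-free sketch that only models the counting: a message that is never acknowledged becomes visible again, and once its receive count exceeds maxReceiveCount it lands in the DLQ. The class RedriveSimulation and its methods are hypothetical names for illustration. Nothing here calls SQS, and it assumes message bodies are unique so they can serve as keys.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical in-memory model of a source queue with a redrive policy.
final class RedriveSimulation {
    final Deque<String> mainQueue = new ArrayDeque<>();
    final List<String> deadLetterQueue = new ArrayList<>();
    final List<String> processed = new ArrayList<>();
    private final Map<String, Integer> receiveCounts = new HashMap<>();
    private final int maxReceiveCount;

    RedriveSimulation(int maxReceiveCount) {
        this.maxReceiveCount = maxReceiveCount;
    }

    void send(String body) {
        mainQueue.addLast(body);
    }

    // Polls until the main queue is empty. The handler returns true to
    // acknowledge (delete) a message and false to leave it unacknowledged.
    void drain(Predicate<String> handler) {
        while (!mainQueue.isEmpty()) {
            String body = mainQueue.pollFirst();
            int count = receiveCounts.merge(body, 1, Integer::sum);
            if (count > maxReceiveCount) {
                // Receive count exceeded the limit: move to the DLQ.
                deadLetterQueue.add(body);
                continue;
            }
            if (handler.test(body)) {
                processed.add(body);
            } else {
                // Not acknowledged: the message becomes visible again.
                mainQueue.addLast(body);
            }
        }
    }

    public static void main(String[] args) {
        RedriveSimulation sim = new RedriveSimulation(3);
        sim.send("ok");
        sim.send("poison");
        sim.drain(body -> !body.equals("poison"));
        System.out.println("processed: " + sim.processed);
        System.out.println("dead-lettered: " + sim.deadLetterQueue);
    }
}
```

With maxReceiveCount set to 3, the handler sees the failing message three times before it is dead-lettered, which is why a very high value effectively disables the DLQ for anything but the most persistent failures.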
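=== Edited ===
The following LocalStack part is a suggestion (never tested), but LocalStack (free edition) should currently support the AWS CLI.
So if you look at the AWS CLI: you create a queue with aws sqs create-queue, and if you want to specify DLQ information you use --attributes, but I believe you have to create the DLQ first, before you can reference its ARN.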
create-queue
--queue-name <value>
[--attributes <value>]
[--tags <value>]
[--cli-input-json <value>]
[--generate-cli-skeleton <value>]
DLQ attribute details:
The following attributes apply only to dead-letter queues:
RedrivePolicy – The string that includes the parameters for the dead-letter queue functionality of the source queue as a JSON object. The parameters are as follows:
    deadLetterTargetArn – The Amazon Resource Name (ARN) of the dead-letter queue to which Amazon SQS moves messages after the value of maxReceiveCount is exceeded.
    maxReceiveCount – The number of times a message is delivered to the source queue before being moved to the dead-letter queue. When the ReceiveCount for a message exceeds the maxReceiveCount for a queue, Amazon SQS moves the message to the dead-letter-queue.
RedriveAllowPolicy – The string that includes the parameters for the permissions for the dead-letter queue redrive permission and which source queues can specify dead-letter queues as a JSON object. The parameters are as follows:
    redrivePermission – The permission type that defines which source queues can specify the current queue as the dead-letter queue. Valid values are:
        allowAll – (Default) Any source queues in this Amazon Web Services account in the same Region can specify this queue as the dead-letter queue.
        denyAll – No source queues can specify this queue as the dead-letter queue.
        byQueue – Only queues specified by the sourceQueueArns parameter can specify this queue as the dead-letter queue.
    sourceQueueArns – The Amazon Resource Names (ARN)s of the source queues that can specify this queue as the dead-letter queue and redrive messages. You can specify this parameter only when the redrivePermission parameter is set to byQueue. You can specify up to 10 source queue ARNs. To allow more than 10 source queues to specify dead-letter queues, set the redrivePermission parameter to allowAll.
https://docs.aws.amazon.com/cli/latest/reference/sqs/create-queue.html
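One thing that trips people up is that RedrivePolicy is a string that itself contains JSON, so its quotes have to be escaped when it is embedded in a JSON attributes document. The sketch below shows that nesting in plain Java. The class RedrivePolicySketch and its methods are hypothetical helpers, not part of any AWS library; the 1 to 1000 range check reflects my understanding of what SQS accepts for maxReceiveCount, and the ARN is assumed to contain no characters that need escaping.

```java
// Hypothetical helper that builds the RedrivePolicy attribute by hand.
final class RedrivePolicySketch {

    // Builds the inner JSON object that SQS expects as the RedrivePolicy value.
    static String redrivePolicy(String deadLetterTargetArn, int maxReceiveCount) {
        if (maxReceiveCount < 1 || maxReceiveCount > 1000) {
            // Assumed valid range; check the SQS documentation for your setup.
            throw new IllegalArgumentException("maxReceiveCount must be between 1 and 1000");
        }
        return "{\"deadLetterTargetArn\":\"" + deadLetterTargetArn + "\","
                + "\"maxReceiveCount\":\"" + maxReceiveCount + "\"}";
    }

    // Quotes and escapes a raw string so it can be used as a JSON string value.
    static String jsonString(String raw) {
        return "\"" + raw.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Wraps the policy in an attributes document suitable for --attributes file://...
    static String attributesJson(String deadLetterTargetArn, int maxReceiveCount) {
        return "{\"RedrivePolicy\":"
                + jsonString(redrivePolicy(deadLetterTargetArn, maxReceiveCount)) + "}";
    }

    public static void main(String[] args) {
        System.out.println(attributesJson("arn:aws:sqs:eu-west-1:000000000000:MyDLQ", 5));
    }
}
```

In a real project a JSON library would be the safer choice for the escaping; the manual version is only here to make the two layers visible.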
The LocalStack SQS documentation has an example of creating an SQS queue:
awslocal sqs create-queue --queue-name sample-queue
{
"QueueUrl": "http://localhost:4566/000000000000/sample-queue"
}
So, following that example, create your DLQ first, and then create your main queue with --attributes pointing to the DLQ's ARN.
https://docs.localstack.cloud/aws/sqs/
Hope this helps point you in the right direction.
==== Edited ===
Creating the queue and the DLQ with LocalStack:
First create the DLQ:
aws --endpoint-url=http://localhost:4566 sqs create-queue --queue-name MyDLQ --region eu-west-1
Response:
{
"QueueUrl": "http://localhost:4566/000000000000/MyDLQ"
}
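The redrive policy needs the DLQ's ARN rather than its URL. The reliable way to get it is aws sqs get-queue-attributes with --attribute-names QueueArn, but for a quick local setup the ARN can also be derived from the queue URL, since SQS ARNs have the form arn:aws:sqs:<region>:<account-id>:<queue-name>. The helper below is a hypothetical sketch that assumes the URL path is exactly /<account-id>/<queue-name>, as in the response above; other URL layouts would need different parsing.

```java
import java.net.URI;

// Hypothetical helper: derives an SQS queue ARN from a path-style queue URL.
final class QueueArnSketch {

    static String arnFromQueueUrl(String queueUrl, String region) {
        // For http://localhost:4566/000000000000/MyDLQ the path is
        // /000000000000/MyDLQ, which splits into ["", account, name].
        String[] segments = URI.create(queueUrl).getPath().split("/");
        if (segments.length != 3 || segments[1].isEmpty() || segments[2].isEmpty()) {
            throw new IllegalArgumentException("Unexpected queue URL layout: " + queueUrl);
        }
        return "arn:aws:sqs:" + region + ":" + segments[1] + ":" + segments[2];
    }

    public static void main(String[] args) {
        System.out.println(
                arnFromQueueUrl("http://localhost:4566/000000000000/MyDLQ", "eu-west-1"));
    }
}
```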
Create an attributes.json file with the following content:
{
"RedrivePolicy": "{\"deadLetterTargetArn\":\"arn:aws:sqs:eu-west-1:000000000000:MyDLQ\",\"maxReceiveCount\":\"1000\"}",
"MessageRetentionPeriod": "259200"
}
Create the main queue, pointing to the attributes.json file:
aws --endpoint-url=http://localhost:4566 sqs create-queue --queue-name MyMainQueue --attributes file://attributes.json --region eu-west-1
Response:
{
"QueueUrl": "http://localhost:4566/000000000000/MyMainQueue"
}