spring 引导中的 AWS SQS、DLQ

AWS SQS, DLQ in spring boot

如何将 DLQ 配置添加到我的 SQS 配置?我不确定如何将 DLQ 与现有队列集成。我使用的是 aws 消息传递而不是 JMS,因此我的侦听器方法的注释将是 @SQSListener。我的配置 class 具有以下内容

@Bean
    public SimpleMessageListenerContainer messageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
        SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
        factory.setAmazonSqs(amazonSQSAsync);
        factory.setMaxNumberOfMessages(10);
        SimpleMessageListenerContainer simpleMessageListenerContainer = factory.createSimpleMessageListenerContainer();
        simpleMessageListenerContainer.setQueueStopTimeout(queueStopTimeout*1000);
        simpleMessageListenerContainer.setMessageHandler(messageHandler(amazonSQSAsync));
        return simpleMessageListenerContainer;
    }

    @Bean
    public QueueMessageHandler messageHandler(AmazonSQSAsync amazonSQSAsync) {
        QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
        queueMessageHandlerFactory.setAmazonSqs(amazonSQSAsync);
        QueueMessageHandler messageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
        return messageHandler;
    }

    @Bean
    public AmazonSQSAsync awsSqsAsync() {
        AmazonSQSAsyncClient amazonSQSAsyncClient  = new AmazonSQSAsyncClient(new DefaultAWSCredentialsProviderChain());
        amazonSQSAsyncClient.setRegion(Region.getRegion(Regions.fromName(region)));
        return new AmazonSQSBufferedAsyncClient(amazonSQSAsyncClient);
    }

我找不到任何正确的文档来正确设置重试,以便如果重试超过阈值,消息应该进入死信队列

如果我没记错的话,设置 maximum retriesassociated DLQ 是在 Broker 端完成的,并且不可配置为侦听器的一部分。

然后在您的代码中,您将执行如下操作:

@SqsListener(value = "MainQueue", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void receive(String message, @Header("SenderId") String senderId, Acknowledgment ack) throws IOException {

  ack.acknowledge();  

}

@SqsListener(value = "DLQ-AssociatedWithMain")
public void receiveDlq(String message) throws IOException {

}

如果消息未被确认,则将重试指定的最长期限,然后发送到 DLQ。

=== 已编辑 ===

以下 LocalStack 是建议(从未测试过),但是 LocalStack(免费版)目前应该支持 AWS CLI:

因此,如果您查看 AWS CLI:您使用 aws create-queue 创建队列,如果您想指定 DLQ 信息,则使用 --attributes,但我相信您必须在引用 RN 之前还要创建 DLQ 队列。

  create-queue
--queue-name <value>
[--attributes <value>]
[--tags <value>]
[--cli-input-json <value>]
[--generate-cli-skeleton <value>]

DLQ 属性详细信息:

以下属性仅适用于死信队列:

RedrivePolicy – The string that includes the parameters for the dead-letter queue functionality of the source queue as a JSON object. The parameters are as follows:
    deadLetterTargetArn – The Amazon Resource Name (ARN) of the dead-letter queue to which Amazon SQS moves messages after the value of maxReceiveCount is exceeded.
    maxReceiveCount – The number of times a message is delivered to the source queue before being moved to the dead-letter queue. When the ReceiveCount for a message exceeds the maxReceiveCount for a queue, Amazon SQS moves the message to the dead-letter-queue.
RedriveAllowPolicy – The string that includes the parameters for the permissions for the dead-letter queue redrive permission and which source queues can specify dead-letter queues as a JSON object. The parameters are as follows:
    redrivePermission – The permission type that defines which source queues can specify the current queue as the dead-letter queue. Valid values are:
        allowAll – (Default) Any source queues in this Amazon Web Services account in the same Region can specify this queue as the dead-letter queue.
        denyAll – No source queues can specify this queue as the dead-letter queue.
        byQueue – Only queues specified by the sourceQueueArns parameter can specify this queue as the dead-letter queue.
    sourceQueueArns – The Amazon Resource Names (ARN)s of the source queues that can specify this queue as the dead-letter queue and redrive messages. You can specify this parameter only when the redrivePermission parameter is set to byQueue . You can specify up to 10 source queue ARNs. To allow more than 10 source queues to specify dead-letter queues, set the redrivePermission parameter to allowAll .

https://docs.aws.amazon.com/cli/latest/reference/sqs/create-queue.html



LocalStack SQS 文档中,他们有一个创建 SQS Queue 的示例:

awslocal sqs create-queue --queue-name sample-queue
{
    "QueueUrl": "http://localhost:4566/000000000000/sample-queue"
}

所以就拿这个例子来说,创建你的 DLQ,然后用 --attributes 创建你的 Queue 指向 DLQ RN.

https://docs.localstack.cloud/aws/sqs/

希望这有助于引导您朝着正确的方向前进,

==== 已编辑 ===

使用 LocalStack 创建队列和 DLQ:

首先创建 DLQ:

aws --endpoint-url=http://localhost:4566 sqs create-queue --queue-name MyDLQ --region eu-west-1

回复:

{
    "QueueUrl": "http://localhost:4566/000000000000/MyDLQ"
}

创建一个包含以下内容的 attributes.json 文件:

{
  "RedrivePolicy": "{\"deadLetterTargetArn\":\"arn:aws:sqs:eu-west-1:000000000000:MyDLQ\",\"maxReceiveCount\":\"1000\"}",
  "MessageRetentionPeriod": "259200"
}

创建指向attributes.json 文件的主队列:

aws --endpoint-url=http://localhost:4566 sqs create-queue --queue-name MyMainQueue --attributes file://attributes.json --region eu-west-1

回复:

{
    "QueueUrl": "http://localhost:4566/000000000000/MyMainQueue"
}