SNS FIFO topic doesn't fan out messages to SQS FIFO queue

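I am exploring SNS FIFO topics with SQS FIFO queues, and this is the simple setup I tried. I created an SNS FIFO topic and an SQS FIFO queue, and subscribed the FIFO queue to the FIFO topic. According to the documentation, with this setup every message published to the SNS FIFO topic should be fanned out to the SQS queue, but that does not happen. PublishResult#getMessageId() returns a value, which means the publish step succeeds, yet no messages arrive in the queue. Since SNS FIFO topics do not support email protocol subscriptions, the only way I can verify this publish-subscribe setup is to poll messages from the queue. Because no fan-out occurs, the queue always appears empty.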

Complete code block:

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.CreateTopicRequest;
import com.amazonaws.services.sns.model.CreateTopicResult;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.PublishResult;
import com.amazonaws.services.sns.model.SubscribeRequest;
import com.amazonaws.services.sns.model.SubscribeResult;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.CreateQueueResult;
import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.UUID;

class FifoTopicsITest {

    @Test
    void test() {
        final String topicName = UUID.randomUUID().toString().substring(15);
        //creating sns client
        AmazonSNS amazonSNS = AmazonSNSClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(
                        "<accessKey>", "<secretKey>")))
                .withEndpointConfiguration(new AwsClientBuilder
                        .EndpointConfiguration("https://sns.us-west-1.amazonaws.com",
                        "us-west-1")).build();
        //creating sqs client
        AmazonSQS amazonSQS = AmazonSQSClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(
                "<accessKey>", "<secretKey>")))
                .withEndpointConfiguration(new AwsClientBuilder
                        .EndpointConfiguration("https://sqs.us-west-1.amazonaws.com",
                        "us-west-1")).build();

        //creating SNS topic
        CreateTopicRequest createTopicRequest = new CreateTopicRequest().withName(topicName + ".fifo");
        createTopicRequest
                .addAttributesEntry("FifoTopic", "true")
                .addAttributesEntry("ContentBasedDeduplication", "false");
        CreateTopicResult topicResult = amazonSNS.createTopic(createTopicRequest);
        String topicArn = topicResult.getTopicArn();

        //creating dead-letter sqs queue
        CreateQueueRequest createDLQQueueRequest = new CreateQueueRequest();
        createDLQQueueRequest.addAttributesEntry("FifoQueue", "true");
        createDLQQueueRequest.addAttributesEntry("ContentBasedDeduplication", "false");
        createDLQQueueRequest.withQueueName(topicName + "_DLQ_" + ".fifo");
        CreateQueueResult createDeadLetterQueueResult = amazonSQS.createQueue(createDLQQueueRequest);

        //getting ARN value of dead-letter queue
        GetQueueAttributesResult getQueueAttributesResult = amazonSQS.getQueueAttributes(
                new GetQueueAttributesRequest(createDeadLetterQueueResult.getQueueUrl())
                        .withAttributeNames("QueueArn"));
        String deadLetterQueueArn = getQueueAttributesResult.getAttributes().get("QueueArn");

        //creating sqs queue
        CreateQueueRequest createQueueRequest = new CreateQueueRequest();
        createQueueRequest.addAttributesEntry("FifoQueue", "true");
        createQueueRequest.addAttributesEntry("ContentBasedDeduplication", "false");
        createQueueRequest.withQueueName(topicName + ".fifo");
        String reDrivePolicy = "{\"maxReceiveCount\":\"5\", \"deadLetterTargetArn\":\""
                + deadLetterQueueArn + "\"}";
        createQueueRequest.addAttributesEntry("RedrivePolicy", reDrivePolicy);
        CreateQueueResult createQueueResult = amazonSQS.createQueue(createQueueRequest);
        String queueUrl = createQueueResult.getQueueUrl();

        //getting ARN value of queue
        getQueueAttributesResult = amazonSQS.getQueueAttributes(
                new GetQueueAttributesRequest(queueUrl)
                        .withAttributeNames("QueueArn"));
        String queueArn = getQueueAttributesResult.getAttributes().get("QueueArn");

        //Subscribe FIFO queue to FIFO Topic
        SubscribeRequest subscribeRequest = new SubscribeRequest();
        subscribeRequest.withProtocol("sqs")
                .withTopicArn(topicArn)
                .withEndpoint(queueArn);
        SubscribeResult subscribeResult = amazonSNS.subscribe(subscribeRequest);
        Assertions.assertNotNull(subscribeResult.getSubscriptionArn());

        //Publishing 4 sample messages to FIFO SNS Topic
        for (int i = 0; i < 4; i++) {
            PublishRequest publishRequest = new PublishRequest()
                    .withTopicArn(topicArn)
                    .withMessage("Test Message" + i)
                    .withMessageGroupId(topicName)
                    .withMessageDeduplicationId(UUID.randomUUID().toString());
            PublishResult publishResult = amazonSNS.publish(publishRequest);
            Assertions.assertNotNull(publishResult.getMessageId());
        }

        //Getting the ApproximateNumberOfMessages attribute of the FIFO queue
        getQueueAttributesResult = amazonSQS.getQueueAttributes(
                new GetQueueAttributesRequest(queueUrl)
                        .withAttributeNames("All"));
        String approximateNumberOfMessages = getQueueAttributesResult.getAttributes()
                                                     .get("ApproximateNumberOfMessages");

        //My expectation here is that the SNS FIFO topic has fanned out the 4 published messages to the SQS FIFO queue
        Assertions.assertEquals(4, Integer.valueOf(approximateNumberOfMessages));
    }
}
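Separately from the permissions question, the test reads ApproximateNumberOfMessages immediately after publishing. That attribute is approximate and may lag behind deliveries, so a single read can report fewer messages than were sent even when fan-out works. Below is a minimal sketch of a retry loop, using only the JDK. The class name QueueCountPoller and the method awaitCount are hypothetical, and the counter passed in is a stand-in. In the test above it would wrap the getQueueAttributes call and parse the returned string.

```java
import java.time.Duration;
import java.util.function.IntSupplier;

public class QueueCountPoller {

    /**
     * Calls the counter until it reports at least `expected` or the timeout
     * elapses, and returns the last value observed.
     */
    public static int awaitCount(IntSupplier counter, int expected,
                                 Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        int last = counter.getAsInt();
        while (last < expected && System.nanoTime() < deadline) {
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                return last;
            }
            last = counter.getAsInt();
        }
        return last;
    }

    public static void main(String[] args) {
        // Stand-in counter for illustration: reports 0 twice, then 4.
        int[] calls = {0};
        IntSupplier fakeCounter = () -> calls[0]++ < 2 ? 0 : 4;
        int seen = awaitCount(fakeCounter, 4,
                Duration.ofSeconds(5), Duration.ofMillis(10));
        System.out.println(seen);
    }
}
```

The assertion in the test could then compare the expected count against the value returned by awaitCount instead of a single attribute read.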

SNS access policy (permissions):

{
  "Version": "2008-10-17",
  "Id": "__default_policy_ID",
  "Statement": [
    {
      "Sid": "__default_statement_ID",
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": [
        "SNS:GetTopicAttributes",
        "SNS:SetTopicAttributes",
        "SNS:AddPermission",
        "SNS:RemovePermission",
        "SNS:DeleteTopic",
        "SNS:Subscribe",
        "SNS:ListSubscriptionsByTopic",
        "SNS:Publish",
        "SNS:Receive"
      ],
      "Resource": "arn:aws:sns:us-west-1:<account>:<topicName>.fifo",
      "Condition": {
        "StringEquals": {
          "AWS:SourceOwner": "<account>"
        }
      }
    }
  ]
}

SQS access policy (permissions):


{
  "Version": "2012-10-17",
  "Id": "arn:aws:sqs:us-west-1:<account>:<topicName>.fifo/SQSDefaultPolicy"
}

What am I missing? Why are the messages not in the SQS queue? Should I apply the following to the SQS queue permissions?

{
  "Id": "Policy1611770719125",
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Stmt1611770707743",
      "Action": [
        "sqs:GetQueueAttributes",
        "sqs:GetQueueUrl",
        "sqs:ListQueueTags",
        "sqs:ListQueues",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:SendMessageBatch",
        "sqs:SetQueueAttributes"
      ],
      "Effect": "Allow",
      "Resource": "arn:aws:sqs:us-west-1:<account>:<queueName>.fifo",
      "Principal": {
        "AWS": "*"
      }
    }
  ]
}

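Sharing my answer for posterity. I suspected the actual problem was related to the access policy. When we create an SQS FIFO queue and subscribe it to an SNS FIFO topic using AWS SDK v1, the queue's default access policy is the following: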

{
  "Version": "2012-10-17",
  "Id": "arn:aws:sqs:us-west-1:<account>:<topicName>.fifo/SQSDefaultPolicy"
}

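The access policy is the same even when I create the SQS FIFO queue with AWS SDK v2. When I manually changed the access policy as shown below, the problem was resolved and the FIFO SNS topic fanned out messages as expected: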

{
  "Statement": [
    {
      "Action": [
        "sqs:*"
      ],
      "Effect": "Allow",
      "Resource": "arn:aws:sqs:us-west-1:<account>:<queueName>.fifo",
      "Principal": {
        "AWS": "*"
      }
    }
  ]
}

Code block that adds the above access policy to each FIFO queue:

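// Policy, Statement, Principal and Resource come from com.amazonaws.auth.policy,
// SQSActions from com.amazonaws.auth.policy.actions, and QueueAttributeName and
// SetQueueAttributesRequest from com.amazonaws.services.sqs.model.
// Allow every principal to perform every SQS action on this queue.
Policy policy = new Policy().withStatements(
        new Statement(Statement.Effect.Allow)
                .withPrincipals(Principal.AllUsers)
                .withResources(new Resource(queueArn))
                .withActions(SQSActions.AllSQSActions));

// Attach the policy to the queue through its "Policy" attribute.
Map<String, String> policyQueueAttributes = new HashMap<>();
policyQueueAttributes.put(QueueAttributeName.Policy.toString(), policy.toJson());
amazonSQS.setQueueAttributes(new SetQueueAttributesRequest(queueUrl, policyQueueAttributes));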

Adding the code block above after creating the SQS FIFO queue finally solved the problem.

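If you are looking for a non-programmatic solution and would rather use the AWS console, see the 'How to Use SNS FIFO Topics' section of the official AWS post listed under Resource below. It explains that you need to add a statement to the access policy of each FIFO SQS queue you want to fan out to. The added statement grants the FIFO SNS topic permission to send messages to the queue.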

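Using their example: suppose you have a FIFO SNS topic named updates.fifo and, after subscribing the queues to the topic, you want to fan out messages to two queues, customer.fifo and loyalty.fifo. Navigate to the customer.fifo queue in the console and edit its access policy by adding the following statement: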

{
    "Effect": "Allow",
    "Principal": {
        "Service": "sns.amazonaws.com"
    },
    "Action": "SQS:SendMessage",
    "Resource": "arn:aws:sqs:us-east-2:123412341234:customer.fifo",
    "Condition": {
        "ArnLike": {
            "aws:SourceArn": "arn:aws:sns:us-east-2:123412341234:updates.fifo"
        }
    }
}

The same applies to the loyalty.fifo queue:

{
    "Effect": "Allow",
    "Principal": {
        "Service": "sns.amazonaws.com"
    },
    "Action": "SQS:SendMessage",
    "Resource": "arn:aws:sqs:us-east-2:123412341234:loyalty.fifo",
    "Condition": {
        "ArnLike": {
            "aws:SourceArn": "arn:aws:sns:us-east-2:123412341234:updates.fifo"
        }
    }
}
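The two statements differ only in the queue ARN, so for anyone who prefers to set them from code, the policy text can be generated per queue. The following is a minimal sketch using only the JDK. The class name SnsToSqsPolicy and the method build are hypothetical. Wrapping the statement in a full policy document with a "Version" field is an addition here, since the snippets above show only the statement. Plain string concatenation assumes the ARNs contain no characters that need JSON escaping.

```java
public class SnsToSqsPolicy {

    /**
     * Builds a queue policy document that allows the SNS service to call
     * SQS:SendMessage on queueArn, only for messages coming from topicArn.
     */
    public static String build(String queueArn, String topicArn) {
        return "{"
                + "\"Version\":\"2012-10-17\","
                + "\"Statement\":[{"
                + "\"Effect\":\"Allow\","
                + "\"Principal\":{\"Service\":\"sns.amazonaws.com\"},"
                + "\"Action\":\"SQS:SendMessage\","
                + "\"Resource\":\"" + queueArn + "\","
                + "\"Condition\":{\"ArnLike\":{\"aws:SourceArn\":\"" + topicArn + "\"}}"
                + "}]}";
    }

    public static void main(String[] args) {
        // Example ARNs taken from the statements above.
        System.out.println(build(
                "arn:aws:sqs:us-east-2:123412341234:customer.fifo",
                "arn:aws:sns:us-east-2:123412341234:updates.fifo"));
    }
}
```

The resulting string could be supplied as the value of the queue's Policy attribute in the setQueueAttributes call shown earlier, in place of policy.toJson(). Compared with the sqs:* policy open to all principals, this variant grants only what the fan-out needs.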

Resource: Introducing Amazon SNS FIFO