SQS/SNS 的轨道交通。发布到 SNS?

MassTransit with SQS/SNS. Publish into SNS?

有一个 official 个 MassT 的例子运行与 SQS 坐在一起。 “总线”配置为使用 SQS (x.UsingAmazonSqs)。接收端点是一个 SQS,它又订阅了一个 SNS 主题。但是没有示例如何发布到 SNS。

  1. 如何发布成SNS话题?
  2. 如何配置 SQS/SNS 以使用 http,因为我是针对 localstack 开发的?

AWS SDK 版本:

var cfg = new AmazonSimpleNotificationServiceConfig { ServiceURL = "http://localhost:4566", UseHttp = true };

更新:

在 Chris 的参考和配置实验之后,我为 'localstack' SQS/SNS 提出了以下建议。此配置执行无误,Worker 被调用,并将消息发布到总线。然而,消费者 class 没有被触发,并且消息似乎没有进入队列(或者更确切地说是主题)。

public static readonly AmazonSQSConfig AmazonSQSConfig = new AmazonSQSConfig { ServiceURL = "http://localhost:4566" };
public static AmazonSimpleNotificationServiceConfig AmazonSnsConfig = new AmazonSimpleNotificationServiceConfig {ServiceURL = "http://localhost:4566"};

...
services.AddMassTransit(x =>
{
    x.AddConsumer<MessageConsumer>();
    x.UsingAmazonSqs((context, cfg) =>
    {
        cfg.Host(new Uri("amazonsqs://localhost:4566"), h =>
        {
            h.Config(AmazonSQSConfig);
            h.Config(AmazonSnsConfig);

            h.EnableScopedTopics();
        });

        cfg.ReceiveEndpoint(queueName: "deal_queue", e =>
        {
            e.Subscribe("deal-topic", s =>
            {
            });
        });
    });
});
  
services.AddMassTransitHostedService(waitUntilStarted: true);
services.AddHostedService<Worker>();
 

更新 2:

当我查看 sns 订阅时,我发现第一个是通过 aws cli 手动创建和订阅的端点正确,而第二个是由 MassT运行sit 库创建的端点不正确。如何为SQS队列配置Endpoint?

$ aws --endpoint-url=http://localhost:4566 sns list-subscriptions-by-topic --topic-arn "arn:aws:sns:us-east-1:000000000000:deal-topic"
{
    "Subscriptions": [
        {
            "SubscriptionArn": "arn:aws:sns:us-east-1:000000000000:deal-topic:c804da4a-b12c-4203-83ec-78492a77b262",
            "Owner": "",
            "Protocol": "sqs",
            "Endpoint": "http://localhost:4566/000000000000/deal_queue",
            "TopicArn": "arn:aws:sns:us-east-1:000000000000:deal-topic"
        },
        {
            "SubscriptionArn": "arn:aws:sns:us-east-1:000000000000:deal-topic:b47d8361-0717-413a-92ee-738d14043a87",
            "Owner": "",
            "Protocol": "sqs",
            "Endpoint": "arn:aws:sqs:us-east-1:000000000000:deal_queue",
            "TopicArn": "arn:aws:sns:us-east-1:000000000000:deal-topic"
        }

更新 3:

我已经克隆了该项目,运行 对该项目进行了一些针对 AmazonSQS 总线配置的单元测试,消费者似乎无法正常工作。

当我在测试后列出订阅时 运行 我可以看出端点不正确。

...
{
    "SubscriptionArn": "arn:aws:sns:us-east-1:000000000000:MassTransit_TestFramework_Messages-PongMessage:e16799c2-9dd3-458d-bc28-52a16d646de3",
    "Owner": "",
    "Protocol": "sqs",
    "Endpoint": "arn:aws:sqs:us-east-1:000000000000:input_queue",
    "TopicArn": "arn:aws:sns:us-east-1:000000000000:MassTransit_TestFramework_Messages-PongMessage"
},
...

会不会是 AmazonSQS for localstack 存在重大错误?

尚不清楚如何将库与 'localstack' sqs 一起使用,如何指出 SQS 队列的实际端点 (QueueUrl)。

只要在 MassTransit 中调用 Publish,消息就会发布到 SNS。然后将这些消息路由到配置的接收端点。将 MassTransit 与亚马逊一起使用时,无需了解 SQS 或 SNS SQS/SNS。

在 MassTransit 中,您创建消费者,这些消费者使用消息类型,MassTransit 根据需要配置 topics/queues。使用 RabbitMQ、Azure 服务总线等的任何示例都可以通过将 UsingRabbitMq 更改为 UsingAmazonSqs(并添加适当的 NuGet 包)轻松转换为 SQS。

看起来您的配置设置正确,可以发布,但我能想到您收不到消息的原因至少有以下几个:

  1. 当前版本的 localstack 有问题。我不得不使用 0.11.2 - 请参阅
  2. 您正在发布到不同的主题。 Masstransit 将使用消息类型的名称创建主题。这可能与您在接收端点上配置的主题不匹配。您可以通过配置拓扑更改主题名称 - 请参阅
  3. 您的消费者未在接收端点上配置 - 请参阅下面的示例
public static readonly AmazonSQSConfig AmazonSQSConfig = new AmazonSQSConfig { ServiceURL = "http://localhost:4566" };
public static AmazonSimpleNotificationServiceConfig AmazonSnsConfig = new AmazonSimpleNotificationServiceConfig {ServiceURL = "http://localhost:4566"};

...
services.AddMassTransit(x =>
{
    x.UsingAmazonSqs((context, cfg) =>
    {
        cfg.Host(new Uri("amazonsqs://localhost:4566"), h =>
        {
            h.Config(AmazonSQSConfig);
            h.Config(AmazonSnsConfig);
        });

        cfg.ReceiveEndpoint(queueName: "deal_queue", e =>
        {
            e.Subscribe("deal-topic", s => {});
            e.Consumer<MessageConsumer>();
        });
    });
});
  
services.AddMassTransitHostedService(waitUntilStarted: true);
services.AddHostedService<Worker>();

根据我在有关 Consumers 的文档中看到的内容,您应该能够像您的原始示例一样将您的消费者添加到 AddMastTransit 配置中,但它对我不起作用。