SQS/SNS 的轨道交通。发布到 SNS?
MassTransit with SQS/SNS. Publish into SNS?
有一个 official 个 MassT 的例子运行与 SQS 坐在一起。 “总线”配置为使用 SQS (x.UsingAmazonSqs)。接收端点是一个 SQS,它又订阅了一个 SNS 主题。但是没有示例如何发布到 SNS。
- 如何发布成SNS话题?
- 如何配置 SQS/SNS 以使用 http,因为我是针对 localstack 开发的?
AWS SDK 版本:
var cfg = new AmazonSimpleNotificationServiceConfig { ServiceURL = "http://localhost:4566", UseHttp = true };
更新:
在 Chris 的参考和配置实验之后,我为 'localstack' SQS/SNS 提出了以下建议。此配置执行无误,Worker 被调用,并将消息发布到总线。然而,消费者 class 没有被触发,并且消息似乎没有进入队列(或者更确切地说是主题)。
public static readonly AmazonSQSConfig AmazonSQSConfig = new AmazonSQSConfig { ServiceURL = "http://localhost:4566" };
public static AmazonSimpleNotificationServiceConfig AmazonSnsConfig = new AmazonSimpleNotificationServiceConfig {ServiceURL = "http://localhost:4566"};
...
services.AddMassTransit(x =>
{
x.AddConsumer<MessageConsumer>();
x.UsingAmazonSqs((context, cfg) =>
{
cfg.Host(new Uri("amazonsqs://localhost:4566"), h =>
{
h.Config(AmazonSQSConfig);
h.Config(AmazonSnsConfig);
h.EnableScopedTopics();
});
cfg.ReceiveEndpoint(queueName: "deal_queue", e =>
{
e.Subscribe("deal-topic", s =>
{
});
});
});
});
services.AddMassTransitHostedService(waitUntilStarted: true);
services.AddHostedService<Worker>();
更新 2:
当我查看 sns 订阅时,我发现第一个是通过 aws cli 手动创建和订阅的端点正确,而第二个是由 MassT运行sit 库创建的端点不正确。如何为SQS队列配置Endpoint?
$ aws --endpoint-url=http://localhost:4566 sns list-subscriptions-by-topic --topic-arn "arn:aws:sns:us-east-1:000000000000:deal-topic"
{
"Subscriptions": [
{
"SubscriptionArn": "arn:aws:sns:us-east-1:000000000000:deal-topic:c804da4a-b12c-4203-83ec-78492a77b262",
"Owner": "",
"Protocol": "sqs",
"Endpoint": "http://localhost:4566/000000000000/deal_queue",
"TopicArn": "arn:aws:sns:us-east-1:000000000000:deal-topic"
},
{
"SubscriptionArn": "arn:aws:sns:us-east-1:000000000000:deal-topic:b47d8361-0717-413a-92ee-738d14043a87",
"Owner": "",
"Protocol": "sqs",
"Endpoint": "arn:aws:sqs:us-east-1:000000000000:deal_queue",
"TopicArn": "arn:aws:sns:us-east-1:000000000000:deal-topic"
}
更新 3:
我已经克隆了该项目,运行 对该项目进行了一些针对 AmazonSQS 总线配置的单元测试,消费者似乎无法正常工作。
当我在测试后列出订阅时 运行 我可以看出端点不正确。
...
{
"SubscriptionArn": "arn:aws:sns:us-east-1:000000000000:MassTransit_TestFramework_Messages-PongMessage:e16799c2-9dd3-458d-bc28-52a16d646de3",
"Owner": "",
"Protocol": "sqs",
"Endpoint": "arn:aws:sqs:us-east-1:000000000000:input_queue",
"TopicArn": "arn:aws:sns:us-east-1:000000000000:MassTransit_TestFramework_Messages-PongMessage"
},
...
会不会是 AmazonSQS for localstack 存在重大错误?
尚不清楚如何将库与 'localstack' sqs 一起使用,如何指出 SQS 队列的实际端点 (QueueUrl)。
只要在 MassTransit 中调用 Publish,消息就会发布到 SNS。然后将这些消息路由到配置的接收端点。将 MassTransit 与亚马逊一起使用时,无需了解 SQS 或 SNS SQS/SNS。
在 MassTransit 中,您创建消费者,这些消费者使用消息类型,MassTransit 根据需要配置 topics/queues。使用 RabbitMQ、Azure 服务总线等的任何示例都可以通过将 UsingRabbitMq
更改为 UsingAmazonSqs
(并添加适当的 NuGet 包)轻松转换为 SQS。
看起来您的配置设置正确,可以发布,但我能想到您收不到消息的原因至少有以下几个:
- 当前版本的 localstack 有问题。我不得不使用 0.11.2 - 请参阅
- 您正在发布到不同的主题。 Masstransit 将使用消息类型的名称创建主题。这可能与您在接收端点上配置的主题不匹配。您可以通过配置拓扑更改主题名称 - 请参阅
- 您的消费者未在接收端点上配置 - 请参阅下面的示例
public static readonly AmazonSQSConfig AmazonSQSConfig = new AmazonSQSConfig { ServiceURL = "http://localhost:4566" };
public static AmazonSimpleNotificationServiceConfig AmazonSnsConfig = new AmazonSimpleNotificationServiceConfig {ServiceURL = "http://localhost:4566"};
...
services.AddMassTransit(x =>
{
x.UsingAmazonSqs((context, cfg) =>
{
cfg.Host(new Uri("amazonsqs://localhost:4566"), h =>
{
h.Config(AmazonSQSConfig);
h.Config(AmazonSnsConfig);
});
cfg.ReceiveEndpoint(queueName: "deal_queue", e =>
{
e.Subscribe("deal-topic", s => {});
e.Consumer<MessageConsumer>();
});
});
});
services.AddMassTransitHostedService(waitUntilStarted: true);
services.AddHostedService<Worker>();
根据我在有关 Consumers 的文档中看到的内容,您应该能够像您的原始示例一样将您的消费者添加到 AddMastTransit
配置中,但它对我不起作用。
有一个 official 个 MassT 的例子运行与 SQS 坐在一起。 “总线”配置为使用 SQS (x.UsingAmazonSqs)。接收端点是一个 SQS,它又订阅了一个 SNS 主题。但是没有示例如何发布到 SNS。
- 如何发布成SNS话题?
- 如何配置 SQS/SNS 以使用 http,因为我是针对 localstack 开发的?
AWS SDK 版本:
var cfg = new AmazonSimpleNotificationServiceConfig { ServiceURL = "http://localhost:4566", UseHttp = true };
更新:
在 Chris 的参考和配置实验之后,我为 'localstack' SQS/SNS 提出了以下建议。此配置执行无误,Worker 被调用,并将消息发布到总线。然而,消费者 class 没有被触发,并且消息似乎没有进入队列(或者更确切地说是主题)。
public static readonly AmazonSQSConfig AmazonSQSConfig = new AmazonSQSConfig { ServiceURL = "http://localhost:4566" };
public static AmazonSimpleNotificationServiceConfig AmazonSnsConfig = new AmazonSimpleNotificationServiceConfig {ServiceURL = "http://localhost:4566"};
...
services.AddMassTransit(x =>
{
x.AddConsumer<MessageConsumer>();
x.UsingAmazonSqs((context, cfg) =>
{
cfg.Host(new Uri("amazonsqs://localhost:4566"), h =>
{
h.Config(AmazonSQSConfig);
h.Config(AmazonSnsConfig);
h.EnableScopedTopics();
});
cfg.ReceiveEndpoint(queueName: "deal_queue", e =>
{
e.Subscribe("deal-topic", s =>
{
});
});
});
});
services.AddMassTransitHostedService(waitUntilStarted: true);
services.AddHostedService<Worker>();
更新 2:
当我查看 sns 订阅时,我发现第一个是通过 aws cli 手动创建和订阅的端点正确,而第二个是由 MassT运行sit 库创建的端点不正确。如何为SQS队列配置Endpoint?
$ aws --endpoint-url=http://localhost:4566 sns list-subscriptions-by-topic --topic-arn "arn:aws:sns:us-east-1:000000000000:deal-topic"
{
"Subscriptions": [
{
"SubscriptionArn": "arn:aws:sns:us-east-1:000000000000:deal-topic:c804da4a-b12c-4203-83ec-78492a77b262",
"Owner": "",
"Protocol": "sqs",
"Endpoint": "http://localhost:4566/000000000000/deal_queue",
"TopicArn": "arn:aws:sns:us-east-1:000000000000:deal-topic"
},
{
"SubscriptionArn": "arn:aws:sns:us-east-1:000000000000:deal-topic:b47d8361-0717-413a-92ee-738d14043a87",
"Owner": "",
"Protocol": "sqs",
"Endpoint": "arn:aws:sqs:us-east-1:000000000000:deal_queue",
"TopicArn": "arn:aws:sns:us-east-1:000000000000:deal-topic"
}
更新 3:
我已经克隆了该项目,运行 对该项目进行了一些针对 AmazonSQS 总线配置的单元测试,消费者似乎无法正常工作。
当我在测试后列出订阅时 运行 我可以看出端点不正确。
...
{
"SubscriptionArn": "arn:aws:sns:us-east-1:000000000000:MassTransit_TestFramework_Messages-PongMessage:e16799c2-9dd3-458d-bc28-52a16d646de3",
"Owner": "",
"Protocol": "sqs",
"Endpoint": "arn:aws:sqs:us-east-1:000000000000:input_queue",
"TopicArn": "arn:aws:sns:us-east-1:000000000000:MassTransit_TestFramework_Messages-PongMessage"
},
...
会不会是 AmazonSQS for localstack 存在重大错误?
尚不清楚如何将库与 'localstack' sqs 一起使用,如何指出 SQS 队列的实际端点 (QueueUrl)。
只要在 MassTransit 中调用 Publish,消息就会发布到 SNS。然后将这些消息路由到配置的接收端点。将 MassTransit 与亚马逊一起使用时,无需了解 SQS 或 SNS SQS/SNS。
在 MassTransit 中,您创建消费者,这些消费者使用消息类型,MassTransit 根据需要配置 topics/queues。使用 RabbitMQ、Azure 服务总线等的任何示例都可以通过将 UsingRabbitMq
更改为 UsingAmazonSqs
(并添加适当的 NuGet 包)轻松转换为 SQS。
看起来您的配置设置正确,可以发布,但我能想到您收不到消息的原因至少有以下几个:
- 当前版本的 localstack 有问题。我不得不使用 0.11.2 - 请参阅
- 您正在发布到不同的主题。 Masstransit 将使用消息类型的名称创建主题。这可能与您在接收端点上配置的主题不匹配。您可以通过配置拓扑更改主题名称 - 请参阅
- 您的消费者未在接收端点上配置 - 请参阅下面的示例
public static readonly AmazonSQSConfig AmazonSQSConfig = new AmazonSQSConfig { ServiceURL = "http://localhost:4566" };
public static AmazonSimpleNotificationServiceConfig AmazonSnsConfig = new AmazonSimpleNotificationServiceConfig {ServiceURL = "http://localhost:4566"};
...
services.AddMassTransit(x =>
{
x.UsingAmazonSqs((context, cfg) =>
{
cfg.Host(new Uri("amazonsqs://localhost:4566"), h =>
{
h.Config(AmazonSQSConfig);
h.Config(AmazonSnsConfig);
});
cfg.ReceiveEndpoint(queueName: "deal_queue", e =>
{
e.Subscribe("deal-topic", s => {});
e.Consumer<MessageConsumer>();
});
});
});
services.AddMassTransitHostedService(waitUntilStarted: true);
services.AddHostedService<Worker>();
根据我在有关 Consumers 的文档中看到的内容,您应该能够像您的原始示例一样将您的消费者添加到 AddMastTransit
配置中,但它对我不起作用。