MassTransit:为多个 API 个实例配置队列
MassTransit: Configure queue for multiple API instances
我正在使用 MassTransit 和 RabbitMQ 在本地主机上使用 MassTransit 和 AWS 在实时环境中,使用 .NET 5 Web APIs.
我有一个 API (#API-1) 生成事件,例如 EntityCreated , EntityUpdated, EntityDeleted, 等等
然后我有另一个 API (#API-2),它存储实体的内存副本。此 #API-2 必须订阅这些事件才能更新其本地缓存。
#API-2 将有多个 运行 实例,比方说 3.
- 我应该如何配置 #API-2 以便 所有实例都收到相同的消息?
生产者配置(#API-1):
services.AddMassTransit(x =>
{
x.UsingRabbitMq();
});
services.AddMassTransitHostedService();
消费者配置(#API-2):
services.AddMassTransit(x =>
{
x.AddConsumer<EntityCreatedConsumer>();
x.AddConsumer<EntityUpdatedConsumer>();
x.AddConsumer<EntityDeletedConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});
services.AddMassTransitHostedService();
您需要使用不同的端点名称。每个端点都有自己的队列和绑定,因此如果您使用相同的端点名称,您将获得一个队列并且所有实例都开始竞争消息。
这是一个常见的错误,这就是文档中 mentioned 的原因。
要为您的消费者正确配置特定于实例的端点,您可以在添加消费者时使用 Endpoint
配置。我已经更新了您的代码以展示如何使用它。
string instanceId = "SomeUniqueValue";
services.AddMassTransit(x =>
{
x.AddConsumer<EntityCreatedConsumer>()
.Endpoint(e => e.InstanceId = instanceId);
x.AddConsumer<EntityUpdatedConsumer>()
.Endpoint(e => e.InstanceId = instanceId);
x.AddConsumer<EntityDeletedConsumer>()
.Endpoint(e => e.InstanceId = instanceId);
x.UsingRabbitMq((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});
services.AddMassTransitHostedService();
instanceId可以从环境中拉取,甚至是随机的。如果希望在进程退出时删除队列,可以指定:
.Endpoint(e =>
{
e.InstanceId = instanceId;
e.Temporary = true;
});
This was added in v7.0.4
我正在使用 MassTransit 和 RabbitMQ 在本地主机上使用 MassTransit 和 AWS 在实时环境中,使用 .NET 5 Web APIs.
我有一个 API (#API-1) 生成事件,例如 EntityCreated , EntityUpdated, EntityDeleted, 等等
然后我有另一个 API (#API-2),它存储实体的内存副本。此 #API-2 必须订阅这些事件才能更新其本地缓存。 #API-2 将有多个 运行 实例,比方说 3.
- 我应该如何配置 #API-2 以便 所有实例都收到相同的消息?
生产者配置(#API-1):
services.AddMassTransit(x =>
{
x.UsingRabbitMq();
});
services.AddMassTransitHostedService();
消费者配置(#API-2):
services.AddMassTransit(x =>
{
x.AddConsumer<EntityCreatedConsumer>();
x.AddConsumer<EntityUpdatedConsumer>();
x.AddConsumer<EntityDeletedConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});
services.AddMassTransitHostedService();
您需要使用不同的端点名称。每个端点都有自己的队列和绑定,因此如果您使用相同的端点名称,您将获得一个队列并且所有实例都开始竞争消息。
这是一个常见的错误,这就是文档中 mentioned 的原因。
要为您的消费者正确配置特定于实例的端点,您可以在添加消费者时使用 Endpoint
配置。我已经更新了您的代码以展示如何使用它。
string instanceId = "SomeUniqueValue";
services.AddMassTransit(x =>
{
x.AddConsumer<EntityCreatedConsumer>()
.Endpoint(e => e.InstanceId = instanceId);
x.AddConsumer<EntityUpdatedConsumer>()
.Endpoint(e => e.InstanceId = instanceId);
x.AddConsumer<EntityDeletedConsumer>()
.Endpoint(e => e.InstanceId = instanceId);
x.UsingRabbitMq((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});
services.AddMassTransitHostedService();
instanceId可以从环境中拉取,甚至是随机的。如果希望在进程退出时删除队列,可以指定:
.Endpoint(e =>
{
e.InstanceId = instanceId;
e.Temporary = true;
});
This was added in v7.0.4