MassTransit:为多个 API 个实例配置队列

MassTransit: Configure queue for multiple API instances

我正在使用 MassTransitRabbitMQ 在本地主机上使用 MassTransitAWS 在实时环境中,使用 .NET 5 Web APIs.

我有一个 API (#API-1) 生成事件,例如 EntityCreated , EntityUpdated, EntityDeleted, 等等

然后我有另一个 API (#API-2),它存储实体的内存副本。此 #API-2 必须订阅这些事件才能更新其本地缓存。 #API-2 将有多个 运行 实例,比方说 3.

生产者配置(#API-1):

services.AddMassTransit(x =>
{
    x.UsingRabbitMq();
});
services.AddMassTransitHostedService();

消费者配置(#API-2):

services.AddMassTransit(x =>
{
    x.AddConsumer<EntityCreatedConsumer>();
    x.AddConsumer<EntityUpdatedConsumer>();
    x.AddConsumer<EntityDeletedConsumer>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});
services.AddMassTransitHostedService();

您需要使用不同的端点名称。每个端点都有自己的队列和绑定,因此如果您使用相同的端点名称,您将获得一个队列并且所有实例都开始竞争消息。

这是一个常见的错误,这就是文档中 mentioned 的原因。

要为您的消费者正确配置特定于实例的端点,您可以在添加消费者时使用 Endpoint 配置。我已经更新了您的代码以展示如何使用它。

string instanceId = "SomeUniqueValue";

services.AddMassTransit(x =>
{
    x.AddConsumer<EntityCreatedConsumer>()        
        .Endpoint(e => e.InstanceId = instanceId);
    x.AddConsumer<EntityUpdatedConsumer>()
        .Endpoint(e => e.InstanceId = instanceId);
    x.AddConsumer<EntityDeletedConsumer>()
        .Endpoint(e => e.InstanceId = instanceId);

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});
services.AddMassTransitHostedService();

instanceId可以从环境中拉取,甚至是随机的。如果希望在进程退出时删除队列,可以指定:

.Endpoint(e =>
{
    e.InstanceId = instanceId;
    e.Temporary = true;
});

This was added in v7.0.4