MassTransit 在单个服务器上消耗

MassTransit consumed on single server

我正在努力使用 RabbitMQ 配置 MassTransit 以发布消息和订阅队列。我认为这是一个需要完成的简单配置更改。我连接了多个服务,但消息交替使用,永远不会在另一台服务器上传递/使用。

我希望每条消息都能传送到每个连接/订阅者。

我 运行 在 ASP.net 核心 6 上使用最新版本的 MassTransit。

 services.TryAddSingleton(KebabCaseEndpointNameFormatter.Instance);
        services.AddMassTransit(cfg =>
   {
       cfg.AddBus(context => Bus.Factory.CreateUsingRabbitMq(c =>
       {
            c.Host(connectionString, c =>
            {
                c.Heartbeat(10);
            });
            c.ConfigureEndpoints(
                 context, 
                 KebabCaseEndpointNameFormatter.Instance);

            c.Publish<VideoManagerResultEvent>(x =>
            {
                x.BindQueue("result", "video-msgs");
                x.ExchangeType = Fanout;
            });

            c.ReceiveEndpoint("result:video-msgs", e =>
            {
                e.Consumer<VideoManagerResultConsumer>();
            });
        }));

        // Request clients / DTO
        RegisterRequestClients(cfg);
    });

    services.AddMassTransitHostedService();
  }




 private static void RegisterRequestClients(IServiceCollectionBusConfigurator cfg)
 {
    cfg.AddRequestClient<VideoManagerResultEvent>();
 }




 // consumer
 public class VideoManagerResultConsumer : BaseConsumer<VideoManagerResultEvent>
 {
      public override async Task Consume(ConsumeContext<VideoManagerResultEvent> context)
     {
         Logger.Debug("Consumed video event");
         await context.RespondAsync(new GenericResponse());
      }
 }

我调用“SendMessage()”向 RabbitMQ 发布消息。

// constructor and DI
public EventBusV2(IPublishEndpoint publishEndpoint)
{
    _publishEndpoint = publishEndpoint;
}

public async Task SendMessage()
{
    await _publishEndpoint.Publish<VideoManagerResultEvent>(msg);
}

要添加到原始问题的图表显示当前正在发生的事情。

图 1 - 请求或消息被发布到事件总线,但只被传送到消费者的一个实例。

图 2 - 消息发布到消费者的两个实例所需的结果。

RaabitMQ 队列

您唯一需要的配置如下:

services.AddMassTransit(cfg =>
{
    cfg.AddConsumer<VideoManagerResultConsumer>()
        .Endpoint(e => e.InstanceId = "Web1"); // or 2, etc.

    cfg.SetKebabCaseEndpointNameFormatter();

    cfg.UsingRabbitMq((context, c) =>
    {
        c.Host(connectionString, c =>
        {
            c.Heartbeat(10);
        });

        c.ConfigureEndpoints(context);
    });

    // Request clients / DTO
    RegisterRequestClients(cfg);
});
services.AddMassTransitHostedService();

类型为 VideoManagerResultEvent 的任何已发布消息都将根据消费者名称在队列 video-manager-result 中结束。