使用来自一个 rabbitmq 主机的消息并使用 mass transit 和 .net core 发布到不同的 rabbitmq 主机

Consume a message from one rabbitmq host and publish to a different rabbitmq host using mass transit and .net core

我似乎在使用不同的 rabbitmq 主机发布/使用消息时遇到了问题。最初,发布/订阅发生在一台 rabbitmq 主机上。决定创建另一个主机(位于不同的服务器上),从那以后我 运行 遇到了一些问题。我已经阅读了多总线文档 (https://masstransit-project.com/usage/containers/multibus.html) 并尝试实现它,以便我可以有一个总线用于生成消息和一个总线用于使用消息。请在下面找到代码:

--- Startup.cs ---

services.AddMassTransit(x =>
{
    var section = configuration.GetSection("Rabbit1");

    x.AddConsumer<SomeConsumer>();

    x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        cfg.Host(section.GetValue<string>("Host"), host =>
        {
            host.Username(section.GetValue<string>("User"));
            host.Password(section.GetValue<string>("Password"));
        });

        cfg.ReceiveEndpoint(QUEUE_NAME, e =>
        {
           e.ConfigureConsumer<SomeConsumer>(context);

            EndpointConvention.Map<MessageObjectList>(e.InputAddress);
        });

        cfg.ConfigureEndpoints(context);
    }));
});

services.AddMassTransit<ISecondBus>(x =>
{
    var section = configuration.GetSection("Rabbit2");

    x.AddConsumer<SomeOtherConsumer>();

    x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        cfg.Host(section.GetValue<string>("Host"), host =>
        {
            host.Username(section.GetValue<string>("User"));
            host.Password(section.GetValue<string>("Password"));
        });

        cfg.ReceiveEndpoint(QUEUE_NAME, e =>
        {
            e.PrefetchCount = 4;
            e.UseMessageRetry(r => r.Interval(10, 100));

            e.ConfigureConsumer<SomeOtherConsumer>(context);

            EndpointConvention.Map<MessageObjectList>(e.InputAddress);
        });

        cfg.ConfigureEndpoints(context);
    }));
});

services.AddMassTransitHostedService();          

--- ISecondBus.cs ---

public interface ISecondBus : IBus
{
}

--- Consumer.cs ---

public class SomeConsumer : IConsumer<MessageObjectList>
{
   private readonly IBus _bus;

   public SomeConsumer(IBus bus)
   {
     _bus = bus;
   }

   public async Task Consume(ConsumeContext<MessageObjectList> context)
   {
       var message = context.Message;

       //Aggregate data

       var newList = new MessageObjectList
                    {
                        Message = message
                    };

      var endpoint = await _bus.GetSendEndpoint(new Uri(_configuration.GetConnectionString("Rabbit2") + "/" + QUEUE_NAME));
      await endpoint.Send(newList);
   }
}

以上代码运行良好,成功消费了来自rabbit1的消息。但是,当向Rabbit2发送消息时,我可以看到正在创建队列,但是没有数据被持久化到队列中。

见下方第二个消费者:

--- Startup.cs ---

services.AddMassTransit(x =>
{
    var section = configuration.GetSection("Rabbitmq2");

    x.AddConsumer<SomeOtherConsumer>();

    x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        cfg.Host(section.GetValue<string>("Host"), host =>
        {
            host.Username(section.GetValue<string>("User"));
            host.Password(section.GetValue<string>("Password"));
        });

        cfg.ReceiveEndpoint(QUEUE_NAME, e =>
        {
            e.ConfigureConsumer<SomeOtherConsumer>(context);

            EndpointConvention.Map<MessageObjectList>(e.InputAddress);
        });

        cfg.ConfigureEndpoints(context);
    }));
});

services.AddMassTransitHostedService();

--- 其他Consumer.cs ---

public class SomeOtherConsumer: IConsumer<MessageObjectList>
{
     public async Task Consume(ConsumeContext<MessageObjectList> context)
     {
        //Persist data to db
     }
}

这里的问题是 rabbit2 的队列永远不会被填充(但它确实被创建)所以没有消息被第二个消费者消费。

如有任何帮助,我们将不胜感激。

如果要从第一条总线发送到第二条总线,需要在第一条总线上的consumer中使用第二条总线接口。

public class SomeConsumer : IConsumer<MessageObjectList>
{
   private readonly ISecondBus _bus;

   public SomeConsumer(ISecondBus bus)
   {
     _bus = bus;
   }

   public async Task Consume(ConsumeContext<MessageObjectList> context)
   {
       var message = context.Message;

       //Aggregate data

       var newList = new MessageObjectList
                    {
                        Message = message
                    };

      var endpoint = await _bus.GetSendEndpoint(new Uri("queue:" + QUEUE_NAME));
      await endpoint.Send(newList);
   }
}