使用 C# 和 RabbitMQ 在 MassTransit v3 中实现仅发布总线

Implementing Publish Only Bus in MassTransit v3 with C# and RabbitMQ

我正在尝试使用 C# 和 RabbitMQ 在 MassTransit v3 中实现一个仅发布总线,其中该总线没有消费者。这个概念是消息将被发布和排队,然后一个单独的微服务将从队列中消费消息。查看 , receive endpoints must be specified so that messages are actually queued. However this appears to contradict the common gotchas in the MassTransit docs,其中说明 If you need to only send or publish messages, don’t create any receive endpoints

下面是一些示例代码:

    public class Program
    {
        static void Main(string[] args)
        {
            var bus = BusConfigurator.ConfigureBus();

            bus.Start();

            bus.Publish<IItemToQueue>(new ItemToQueue { Text = "Hello World" }).Wait();

            Console.ReadKey();

            bus.Stop();
        }
    }

    public static class BusConfigurator
    {
        public static IBusControl ConfigureBus()
        {
            var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var host = cfg.Host(new Uri("rabbitmq://localhost/"), hst =>
                {
                    hst.Username("guest");
                    hst.Password("guest");
                });

                cfg.ReceiveEndpoint(host, "queuename", e =>
                {
                    e.Consumer<MyConsumer>();
                });
            });

            return bus;
        }
    }

    public interface IItemToQueue
    {
        string Text { get; set; }
    }

    public class ItemToQueue : IItemToQueue
    {
        public string Text { get; set; }
    }

    public class MyConsumer : IConsumer<IItemToQueue>
    {
        public async Task Consume(ConsumeContext<IItemToQueue> context)
        {
            await Console.Out.WriteLineAsync(context.Message.Text);
        }
    }

在此示例中,我按预期收到了 RabbitMQ 队列中的消息,这被 MyConsumer 使用,后者将 Hello World 写入控制台,然后从队列中删除消息。

但是,当我从上面删除以下代码并重新运行示例时:

cfg.ReceiveEndpoint(host, RabbitMqConstants.ValidationQueue, e =>
{
    e.Consumer<MyConsumer>();
}); 

创建了一个临时队列(使用生成的名称)并且邮件似乎从未被放入临时队列。当总线停止时,此队列将被删除。

我遇到的问题是指定了 ReceiveEndpoint,消息将在发布程序中被消耗并从队列中删除(这意味着消费者微服务不会处理排队的项目)。如果没有指定 RecieveEndpoint,将使用一个临时队列(消费者微服务不会知道这个临时队列的名称),消息似乎永远不会排队,并且在总线停止时删除队列,如果程序失败了。

an example of a send only bus in the MassTransit docs 但它非常基础,所以我想知道是否有人有任何建议?

接收端点应该在您的服务中,与仅发布应用程序分开。这样,该服务将具有接收端点并使用应用程序发布的消息。

如果应用程序中有接收端点,应用程序将使用消息,因为它具有在接收端点中指定的相同队列名称。

您需要做的是创建另一个具有相同配置(包括接收端点)的服务,并将接收端点从您的应用程序中取出。届时,该服务将具有接收端点并使用队列中的消息。即使服务停止,消息也会继续传送到队列,一旦服务启动,它们就会开始消费。