带有 RabbitMQ 的 MassTransit 3.0.1(预发布版)无法使用与 2.9.9 相同的 configuration/setup 来处理消息

MassTransit 3.0.1 (prerelease) with RabbitMQ fails to process messages using same configuration/setup as 2.9.9

这是可用的 MT 2.9.9 版本:

public sealed class DiagnosticConsumer : Consumes<DiagnosticMessage>.All
{
    public void Consume(DiagnosticMessage message)
    {
        Console.WriteLine("Got {0} with timestamp {1}", message.Message, message.Timestamp);
    }
}

总线设置:

var bus = ServiceBusFactory.New(sbc =>
{
    sbc.UseRabbitMq(r => r.ConfigureHost(new Uri("rabbitmq://localhost/notifications/tests"),
        c =>
        {
            c.SetUsername("test_user");
            c.SetPassword("testuser123");
        }));
    sbc.ReceiveFrom("rabbitmq://localhost/notifications/tests");
    sbc.Subscribe(s =>
                  {
                      s.Consumer<DiagnosticConsumer>();
                  });
});


using (bus)
{
    bus.Publish(new DiagnosticMessage { Message = "Test msg", Timestamp = DateTimeOffset.Now });
    Console.WriteLine("Published!");
    Console.ReadLine();
}   

这按预期工作并且在使用 RabbitMQ Management 时我可以看到以下内容:

现在,使用 MT 3.0.1-alpha 以及 MassTransit.RabbitMQ 3.0.1-alpha 和以下设置:

public sealed class DiagnosticConsumer : IConsumer<DiagnosticMessage>
{
    public async Task Consume(ConsumeContext<DiagnosticMessage> context)
    {
        Console.WriteLine("Got message: {0}", context.Message);
    }
}

总线配置:

public static async Task RunQueue()
{
    var bus = Bus.Factory
                 .CreateUsingRabbitMq(c =>
                 {
                     var host =
                         c.Host(
                             new Uri(
                                 "rabbitmq://localhost/notifications"),
                             conf =>
                             {
                                 conf.Username("test_user");
                                 conf.Password("testuser123");
                             });
                     c.ReceiveEndpoint(host, "tests", conf =>
                     {
                         conf.Consumer<DiagnosticConsumer>();
                     });
                 });

    using (var handle = await bus.Start())
    {
        await bus.Publish(new DiagnosticMessage{ Message = "Pinging!", Timestamp = DateTimeOffset.Now});
        Console.WriteLine("Waiting to finish...");
        Console.ReadLine();
        await handle.Stop();
    }
}

在这种情况下,实际上什么都没有发生,消费者永远不会收到消息,管理控制台也讲述了一个不同的故事:

在这两种情况下,队列和设置完全相同,这本身非常简单。我这边有什么问题需要修复才能使用最新版本的 MassTransit,还是只是 alpha 版本的错误?

我找到了我自己问题的一半答案。

如果通过 ISendEndpoint 完成,显然是发布作品:

var address = new Uri("rabbitmq://localhost/notifications/tests");
var endpoint = await bus.GetSendEndpoint(address);
await endpoint.Send(new DiagnosticMessage{ Message = "Pinging!", Timestamp = DateTimeOffset.Now});

在这种情况下,消息传递成功。

也就是说,我不确定 Publish 的当前形式有什么意义,或者为什么通过它推送的消息没有传递,尤其是在使用 ConsumeContext<> 时在消费者内部。如果您想安排消息,它似乎也会导致问题,因为它们是使用 Publish.

安排的

直到行为发生改变或其他人对为什么所有这一切发生有了更好的答案,我想我找到了解决我眼前问题的方法。

[编辑]

正如 Chris Patterson 在 中指出的那样,消息的交换类型未绑定到队列。通过管理控制台手动设置 Publish 工作。

已经是第二次报告此类问题了,请问您可以检查消息类型与队列的绑定情况吗?我相信 alpha 有一个问题,默认情况下发布的消息类型没有正确绑定到队列。另一个用户上周报告了同样的事情,它在我的名单上,要用一个干净的虚拟主机进行调查。

由于 Send 有效,而 Publish 无效,这可能是问题所在。