如何在 RabbitMQ 中设置多个队列并使用 MassTransit 3 进行连接?

How to setup multiple queues in RabbitMQ and connect using MassTransit 3?

问题

使用 MassTransit 我看到了连接到队列的示例,如下所示:

var host = x.Host(new Uri("rabbitmq://localhost/Dev_Queue"), h =>
{
    h.Username("guest");
    h.Password("guest");
});

然而,当我尝试这个时,我不断收到以下异常:

{"Connect failed: guest@localhost:5672/Dev_Queue"}

内心留言:

{"The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=541, text=\"意外异常\", classId=0, methodId=0, cause=System.IO.IOException: 无法从传输连接中读取数据: 一个现有的连接被远程强行关闭host. ---> System.Net.Sockets.SocketException: 一个现有连接被远程 host\r\n 在 System.Net.Sockets.Socket.Receive(Byte[] buffer, Int32 offset, Int32 size, SocketFlags socketFlags)\r\n at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 size)\r\n --- 内部异常堆栈跟踪结束 ---\r\n at RabbitMQ.Client.Impl.Frame.ReadFrom(NetworkBinaryReader reader)\r\n 在 RabbitMQ.Client.Impl.SocketFrameHandler.ReadFrame()\r\n 在 RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()\r\n 在 RabbitMQ.Client.Framing.Impl.Connection.MainLoop()"}

如果我不指定队列,那么它工作正常。

完整配置

这会起作用,因为我没有指定队列,只是 url。

var busControl = Bus.Factory.CreateUsingRabbitMq(x =>
{
    x.AutoDelete = false;
    x.UseJsonSerializer();
    x.UseTransaction();
    x.ExchangeType = "direct";
    x.Durable = true;
    var host = x.Host(new Uri("rabbitmq://localhost/"), h =>
    {
        h.Username("guest");
        h.Password("guest");
    });
    x.UseRetry(Retry.Immediate(2));
});

背景

我们有两台服务器,我们想要 运行 集群以实现高可用性。我能够很好地创建集群并看到跨每个节点复制的队列。

问题是,这些机器 "environments" 是共享的。它们必须共享,因为我们进行 Blue/Green 部署并且我们不知道哪些服务器正在生产中。这就是我想要 Beta 和 Production 队列的原因,但是,在 MassTransit 中,我无法成功指向我希望消息进入的特定队列。

如能提供有关此架构的任何帮助或其他见解,我们将不胜感激。

使用MassTransit,主机只指定主机名和虚拟主机名。因此,在上面的示例中,需要存在一个名为 "Dev_Queue" 的虚拟主机。显然那不是你的意图。

在您的完整示例中,您需要添加用于接收消息的接收端点。

var busControl = Bus.Factory.CreateUsingRabbitMq(x =>
{
    x.AutoDelete = false;
    x.UseJsonSerializer();
    x.UseTransaction();
    x.ExchangeType = "direct";
    x.Durable = true;
    var host = x.Host(new Uri("rabbitmq://localhost/"), h =>
    {
        h.Username("guest");
        h.Password("guest");
    });
    x.UseRetry(Retry.Immediate(2));

    x.ReceiveEndpoint("Dev_Queue", e =>
    {
        e.Consumer(() => new MyConsumer());
    })
});

另外,查看 v2 的迁移文档:

http://docs.masstransit-project.com/en/latest/migrating/index.html