如何在 RabbitMQ 中设置多个队列并使用 MassTransit 3 进行连接?
How to setup multiple queues in RabbitMQ and connect using MassTransit 3?
问题
使用 MassTransit 我看到了连接到队列的示例,如下所示:
var host = x.Host(new Uri("rabbitmq://localhost/Dev_Queue"), h =>
{
h.Username("guest");
h.Password("guest");
});
然而,当我尝试这个时,我不断收到以下异常:
{"Connect failed: guest@localhost:5672/Dev_Queue"}
内心留言:
{"The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=541, text=\"意外异常\", classId=0, methodId=0, cause=System.IO.IOException: 无法从传输连接中读取数据: 一个现有的连接被远程强行关闭host. ---> System.Net.Sockets.SocketException: 一个现有连接被远程 host\r\n 在 System.Net.Sockets.Socket.Receive(Byte[] buffer, Int32 offset, Int32 size, SocketFlags socketFlags)\r\n at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 size)\r\n --- 内部异常堆栈跟踪结束 ---\r\n at RabbitMQ.Client.Impl.Frame.ReadFrom(NetworkBinaryReader reader)\r\n 在 RabbitMQ.Client.Impl.SocketFrameHandler.ReadFrame()\r\n 在 RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()\r\n 在 RabbitMQ.Client.Framing.Impl.Connection.MainLoop()"}
如果我不指定队列,那么它工作正常。
完整配置
这会起作用,因为我没有指定队列,只是 url。
var busControl = Bus.Factory.CreateUsingRabbitMq(x =>
{
x.AutoDelete = false;
x.UseJsonSerializer();
x.UseTransaction();
x.ExchangeType = "direct";
x.Durable = true;
var host = x.Host(new Uri("rabbitmq://localhost/"), h =>
{
h.Username("guest");
h.Password("guest");
});
x.UseRetry(Retry.Immediate(2));
});
背景
我们有两台服务器,我们想要 运行 集群以实现高可用性。我能够很好地创建集群并看到跨每个节点复制的队列。
问题是,这些机器 "environments" 是共享的。它们必须共享,因为我们进行 Blue/Green 部署并且我们不知道哪些服务器正在生产中。这就是我想要 Beta 和 Production 队列的原因,但是,在 MassTransit 中,我无法成功指向我希望消息进入的特定队列。
如能提供有关此架构的任何帮助或其他见解,我们将不胜感激。
使用MassTransit,主机只指定主机名和虚拟主机名。因此,在上面的示例中,需要存在一个名为 "Dev_Queue" 的虚拟主机。显然那不是你的意图。
在您的完整示例中,您需要添加用于接收消息的接收端点。
var busControl = Bus.Factory.CreateUsingRabbitMq(x =>
{
x.AutoDelete = false;
x.UseJsonSerializer();
x.UseTransaction();
x.ExchangeType = "direct";
x.Durable = true;
var host = x.Host(new Uri("rabbitmq://localhost/"), h =>
{
h.Username("guest");
h.Password("guest");
});
x.UseRetry(Retry.Immediate(2));
x.ReceiveEndpoint("Dev_Queue", e =>
{
e.Consumer(() => new MyConsumer());
})
});
另外,查看 v2 的迁移文档:
http://docs.masstransit-project.com/en/latest/migrating/index.html
问题
使用 MassTransit 我看到了连接到队列的示例,如下所示:
var host = x.Host(new Uri("rabbitmq://localhost/Dev_Queue"), h =>
{
h.Username("guest");
h.Password("guest");
});
然而,当我尝试这个时,我不断收到以下异常:
{"Connect failed: guest@localhost:5672/Dev_Queue"}
内心留言:
{"The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=541, text=\"意外异常\", classId=0, methodId=0, cause=System.IO.IOException: 无法从传输连接中读取数据: 一个现有的连接被远程强行关闭host. ---> System.Net.Sockets.SocketException: 一个现有连接被远程 host\r\n 在 System.Net.Sockets.Socket.Receive(Byte[] buffer, Int32 offset, Int32 size, SocketFlags socketFlags)\r\n at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 size)\r\n --- 内部异常堆栈跟踪结束 ---\r\n at RabbitMQ.Client.Impl.Frame.ReadFrom(NetworkBinaryReader reader)\r\n 在 RabbitMQ.Client.Impl.SocketFrameHandler.ReadFrame()\r\n 在 RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()\r\n 在 RabbitMQ.Client.Framing.Impl.Connection.MainLoop()"}
如果我不指定队列,那么它工作正常。
完整配置
这会起作用,因为我没有指定队列,只是 url。
var busControl = Bus.Factory.CreateUsingRabbitMq(x =>
{
x.AutoDelete = false;
x.UseJsonSerializer();
x.UseTransaction();
x.ExchangeType = "direct";
x.Durable = true;
var host = x.Host(new Uri("rabbitmq://localhost/"), h =>
{
h.Username("guest");
h.Password("guest");
});
x.UseRetry(Retry.Immediate(2));
});
背景
我们有两台服务器,我们想要 运行 集群以实现高可用性。我能够很好地创建集群并看到跨每个节点复制的队列。
问题是,这些机器 "environments" 是共享的。它们必须共享,因为我们进行 Blue/Green 部署并且我们不知道哪些服务器正在生产中。这就是我想要 Beta 和 Production 队列的原因,但是,在 MassTransit 中,我无法成功指向我希望消息进入的特定队列。
如能提供有关此架构的任何帮助或其他见解,我们将不胜感激。
使用MassTransit,主机只指定主机名和虚拟主机名。因此,在上面的示例中,需要存在一个名为 "Dev_Queue" 的虚拟主机。显然那不是你的意图。
在您的完整示例中,您需要添加用于接收消息的接收端点。
var busControl = Bus.Factory.CreateUsingRabbitMq(x =>
{
x.AutoDelete = false;
x.UseJsonSerializer();
x.UseTransaction();
x.ExchangeType = "direct";
x.Durable = true;
var host = x.Host(new Uri("rabbitmq://localhost/"), h =>
{
h.Username("guest");
h.Password("guest");
});
x.UseRetry(Retry.Immediate(2));
x.ReceiveEndpoint("Dev_Queue", e =>
{
e.Consumer(() => new MyConsumer());
})
});
另外,查看 v2 的迁移文档:
http://docs.masstransit-project.com/en/latest/migrating/index.html