使用 rabbitmq 的 masstransit:为什么消息在发布时自动移动到 _skipped 队列
masstransit with rabbitmq: Why message auto moved to _skipped queue when it published
MassTransit.3.1.2 MassTransit.Autofac.3.1.1 MassTransit.RabbitMQ.3.1.1 RabbitMQ.Client.3.6.0 Topshelf.3.3.1
一个 Topshelf Windows 服务,像这样创建一个总线实例:
var builder = new ContainerBuilder();
builder.RegisterConsumers(Assembly.GetExecutingAssembly());
builder.Register<IBusControl>(context =>
{
return Bus.Factory.CreateUsingRabbitMq(rbmq =>
{
var host = rbmq.Host(new Uri("rabbitmq://" + BusConfig.Instance.Host + ":" + BusConfig.Instance.Port + "/" + BusConfig.Instance.VHost), h =>
{
h.Username(BusConfig.Instance.UserName);
h.Password(BusConfig.Instance.Password);
});
rbmq.UseJsonSerializer();
rbmq.UseNLog();
rbmq.ReceiveEndpoint(BusConfig.Instance.Queue, edp =>
{
edp.UseRetry(Retry.Incremental(5, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5)));
edp.LoadFrom(context);
});
});
}).SingleInstance().As<IBusControl>().As<IBus>();
return builder.Build().Resolve<IBusControl>();
一个这样的控制台应用程序:
var bus = Bus.Factory.CreateUsingRabbitMq(rbmq =>
{
var host = rbmq.Host(new Uri("rabbitmq://" + BusConfig.Instance.Host + ":" + BusConfig.Instance.Port + "/" + BusConfig.Instance.VHost), h =>
{
h.Username(BusConfig.Instance.UserName);
h.Password(BusConfig.Instance.Password);
});
});
bus.Start();
var msg = new OrderCreatedMessage() { OrderId = 10102 };
bus.Publish<OrderCreatedMessage>(msg);
当控制台应用程序发布消息时,名为“BusConfig.Instance.Queue”的Rabbitmq队列收到OrderCreatedMessage:
Rabbitmq queue view when message published
然后我启动了topshelf服务,OrderCreatedMessage自动移除到_skipped队列:
Rabbitmq queue view when topshelf service started
这样的公共交通日志:
MOVE:rabbitmq:\/\/192.168.12.217:5672\/zst\/zst.order.queue?prefetch=8:rabbitmq:\/\/192.168.12.217:5672\/zst\/zst.order.queue_skipped?bind=true&queue=zst.order.queue_skipped:N\/A:Moved
但是,当我发布消息并使用同一总线(Topshelf 服务)使用消息时,它有效!!!
如能提供有关此架构的任何帮助或额外见解,我们将不胜感激!
我猜它在为 OrderCreatedMessage
消息创建消费者时遇到问题。
您是否尝试过在构建器中自行解析消费者?
var test = builder.Resolve<OrderCreatedMessageConsumer>();
查看已发布消息类型的交换与服务端点输入队列的交换之间的绑定。确保正确绑定了正确的类型交换。由于消息正在传递,我猜那部分是正确的。
对于接收端点,消费者似乎在某一时刻是正确的(这解释了绑定存在的原因)但目前可能没有使用正确的消息类型。消息类型必须是消费者和发布者中相同的消息契约,消息才能被消费者消费。
当消息移动到 _skipped 时,该端点上没有消费者实际使用消息本身中的消息类型。我建议发布以下输出以供审核:
bus.GetProbeResult().ToJsonString()
这将显示正在注册的消费者和正在使用的消息类型。它还将极大地帮助解决您遇到的问题。
我刚刚开始使用 Masstransit 和 RabbitMQ。上面的答案确实让我朝着正确的方向前进,但供将来参考。我在 RabbitMQ 中得到了额外的队列,因为交换不正确(重复绑定),我没有注意到这一点,因为代码似乎是正确的,但 RabbitMQ 中的不正确设置仍然存在。删除队列并重新开始解决了它。
MassTransit.3.1.2 MassTransit.Autofac.3.1.1 MassTransit.RabbitMQ.3.1.1 RabbitMQ.Client.3.6.0 Topshelf.3.3.1
一个 Topshelf Windows 服务,像这样创建一个总线实例:
var builder = new ContainerBuilder();
builder.RegisterConsumers(Assembly.GetExecutingAssembly());
builder.Register<IBusControl>(context =>
{
return Bus.Factory.CreateUsingRabbitMq(rbmq =>
{
var host = rbmq.Host(new Uri("rabbitmq://" + BusConfig.Instance.Host + ":" + BusConfig.Instance.Port + "/" + BusConfig.Instance.VHost), h =>
{
h.Username(BusConfig.Instance.UserName);
h.Password(BusConfig.Instance.Password);
});
rbmq.UseJsonSerializer();
rbmq.UseNLog();
rbmq.ReceiveEndpoint(BusConfig.Instance.Queue, edp =>
{
edp.UseRetry(Retry.Incremental(5, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5)));
edp.LoadFrom(context);
});
});
}).SingleInstance().As<IBusControl>().As<IBus>();
return builder.Build().Resolve<IBusControl>();
一个这样的控制台应用程序:
var bus = Bus.Factory.CreateUsingRabbitMq(rbmq =>
{
var host = rbmq.Host(new Uri("rabbitmq://" + BusConfig.Instance.Host + ":" + BusConfig.Instance.Port + "/" + BusConfig.Instance.VHost), h =>
{
h.Username(BusConfig.Instance.UserName);
h.Password(BusConfig.Instance.Password);
});
});
bus.Start();
var msg = new OrderCreatedMessage() { OrderId = 10102 };
bus.Publish<OrderCreatedMessage>(msg);
当控制台应用程序发布消息时,名为“BusConfig.Instance.Queue”的Rabbitmq队列收到OrderCreatedMessage:
Rabbitmq queue view when message published
然后我启动了topshelf服务,OrderCreatedMessage自动移除到_skipped队列:
Rabbitmq queue view when topshelf service started
这样的公共交通日志:
MOVE:rabbitmq:\/\/192.168.12.217:5672\/zst\/zst.order.queue?prefetch=8:rabbitmq:\/\/192.168.12.217:5672\/zst\/zst.order.queue_skipped?bind=true&queue=zst.order.queue_skipped:N\/A:Moved
但是,当我发布消息并使用同一总线(Topshelf 服务)使用消息时,它有效!!!
如能提供有关此架构的任何帮助或额外见解,我们将不胜感激!
我猜它在为 OrderCreatedMessage
消息创建消费者时遇到问题。
您是否尝试过在构建器中自行解析消费者?
var test = builder.Resolve<OrderCreatedMessageConsumer>();
查看已发布消息类型的交换与服务端点输入队列的交换之间的绑定。确保正确绑定了正确的类型交换。由于消息正在传递,我猜那部分是正确的。
对于接收端点,消费者似乎在某一时刻是正确的(这解释了绑定存在的原因)但目前可能没有使用正确的消息类型。消息类型必须是消费者和发布者中相同的消息契约,消息才能被消费者消费。
当消息移动到 _skipped 时,该端点上没有消费者实际使用消息本身中的消息类型。我建议发布以下输出以供审核:
bus.GetProbeResult().ToJsonString()
这将显示正在注册的消费者和正在使用的消息类型。它还将极大地帮助解决您遇到的问题。
我刚刚开始使用 Masstransit 和 RabbitMQ。上面的答案确实让我朝着正确的方向前进,但供将来参考。我在 RabbitMQ 中得到了额外的队列,因为交换不正确(重复绑定),我没有注意到这一点,因为代码似乎是正确的,但 RabbitMQ 中的不正确设置仍然存在。删除队列并重新开始解决了它。