如何停止 MassTransit 为错误消息创建交换绑定
How can I stop MassTransit creating exchange bindings for error messages
我正在尝试收听错误队列以处理失败的消息,但我似乎无法让 MassTransit 不对我希望它在配置中收听的消息设置绑定。配置如下,使用的是 MassTransit 的 v3:
var hostAddress = new Uri("rabbitmq://localhost/");
var username = "guest";
var password = "guest";
_busControl = MassTransit.Bus.Factory.CreateUsingRabbitMq(configurator =>
{
var host = configurator.Host(hostAddress, h =>
{
h.Username(username);
h.Password(password);
});
configurator.ReceiveEndpoint(host, "myqueue_error",
endpointConfigurator =>
{
endpointConfigurator.Handler<SomeMessage>(context =>
{
return Console.Out.WriteLineAsync("Woop");
});
});
});
在上面的示例中,它将为任何发布 SomeMessage
的内容设置绑定,并将它们定向到 myqueue_error
,我只希望消息进入从失败的服务。有没有办法使用队列中的消息但告诉 MassTransit 不要为它们绑定?
Update - Potential Solution
我似乎不需要设置 ReceiveEndpoint,但我可以重命名控制总线以接受我关心的消息,这样就可以处理这些消息,而无需创建消息的交换绑定。
下面是修改后的代码,不确定这是否是理想的方式,但它 有效
var hostAddress = new Uri("rabbitmq://localhost/");
var username = "guest";
var password = "guest";
_busControl = MassTransit.Bus.Factory.CreateUsingRabbitMq(configurator =>
{
configurator.Host(hostAddress, h =>
{
h.Username(username);
h.Password(password);
});
// We need to make the queue look like the error queue
configurator.BusQueueName = $"{_queue}_error";
configurator.Durable = true;
configurator.AutoDelete = false;
configurator.SetQueueArgument("x-expires", null);
});
var connectHandle = _busControl.ConnectHandler<SomeMessage>(context => Console.Out.WriteLineAsync("Woop"));
_busHandle = _busControl.Start();
_busHandle.Ready.Wait();
// Wait
// Clean up
connectHandle.Disconnect();
_busHandle.Stop
通过大量的挖掘,我找到了一个更好的解决方案,但我完全没有从文档中找到它。
看来我们可以通过订阅消费者来收听消息来收听 Fault 这非常适合我一直在努力实现的目标,我们还可以保持错误队列的正常运行。
http://docs.masstransit-project.com/en/mt3/usage/exceptions.html#handling-exceptions
所以我接受的最后一点配置如下:
var hostAddress = new Uri("rabbitmq://localhost/");
var username = "guest";
var password = "guest";
_busControl = MassTransit.Bus.Factory.CreateUsingRabbitMq(configurator =>
{
var host = configurator.Host(hostAddress, h =>
{
h.Username(username);
h.Password(password);
});
configurator.ReceiveEndpoint(host, "error_listener",
endpointConfigurator =>
{
endpointConfigurator.Handler<Fault<SomeMessage>>(context =>
{
return Console.Out.WriteLineAsync("Woop");
});
});
});
我正在尝试收听错误队列以处理失败的消息,但我似乎无法让 MassTransit 不对我希望它在配置中收听的消息设置绑定。配置如下,使用的是 MassTransit 的 v3:
var hostAddress = new Uri("rabbitmq://localhost/");
var username = "guest";
var password = "guest";
_busControl = MassTransit.Bus.Factory.CreateUsingRabbitMq(configurator =>
{
var host = configurator.Host(hostAddress, h =>
{
h.Username(username);
h.Password(password);
});
configurator.ReceiveEndpoint(host, "myqueue_error",
endpointConfigurator =>
{
endpointConfigurator.Handler<SomeMessage>(context =>
{
return Console.Out.WriteLineAsync("Woop");
});
});
});
在上面的示例中,它将为任何发布 SomeMessage
的内容设置绑定,并将它们定向到 myqueue_error
,我只希望消息进入从失败的服务。有没有办法使用队列中的消息但告诉 MassTransit 不要为它们绑定?
Update - Potential Solution
我似乎不需要设置 ReceiveEndpoint,但我可以重命名控制总线以接受我关心的消息,这样就可以处理这些消息,而无需创建消息的交换绑定。
下面是修改后的代码,不确定这是否是理想的方式,但它 有效
var hostAddress = new Uri("rabbitmq://localhost/");
var username = "guest";
var password = "guest";
_busControl = MassTransit.Bus.Factory.CreateUsingRabbitMq(configurator =>
{
configurator.Host(hostAddress, h =>
{
h.Username(username);
h.Password(password);
});
// We need to make the queue look like the error queue
configurator.BusQueueName = $"{_queue}_error";
configurator.Durable = true;
configurator.AutoDelete = false;
configurator.SetQueueArgument("x-expires", null);
});
var connectHandle = _busControl.ConnectHandler<SomeMessage>(context => Console.Out.WriteLineAsync("Woop"));
_busHandle = _busControl.Start();
_busHandle.Ready.Wait();
// Wait
// Clean up
connectHandle.Disconnect();
_busHandle.Stop
通过大量的挖掘,我找到了一个更好的解决方案,但我完全没有从文档中找到它。
看来我们可以通过订阅消费者来收听消息来收听 Fault 这非常适合我一直在努力实现的目标,我们还可以保持错误队列的正常运行。 http://docs.masstransit-project.com/en/mt3/usage/exceptions.html#handling-exceptions
所以我接受的最后一点配置如下:
var hostAddress = new Uri("rabbitmq://localhost/");
var username = "guest";
var password = "guest";
_busControl = MassTransit.Bus.Factory.CreateUsingRabbitMq(configurator =>
{
var host = configurator.Host(hostAddress, h =>
{
h.Username(username);
h.Password(password);
});
configurator.ReceiveEndpoint(host, "error_listener",
endpointConfigurator =>
{
endpointConfigurator.Handler<Fault<SomeMessage>>(context =>
{
return Console.Out.WriteLineAsync("Woop");
});
});
});