MassTransit 故障处理配置
MassTransit fault handling configuration
我在我的应用程序中配置故障使用者时遇到问题。问题是消耗的消息被传递到 *_error_skipped
队列并且它并没有完全消失。
下面是一个非常简单的例子。客户端应用程序收到失败的消息,它从 test_error
队列中消失,但它仍然存在于 test_error_skipped
队列中。
服务项目
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using GreenPipes;
using MassTransit;
using MassTransit.Util;
namespace MassTransitTest.Service
{
public class RequestModel
{
public DateTime RequestTime { get; set; }
}
class MassTransitService : IDisposable
{
private readonly IBusControl _busControl;
public MassTransitService()
{
_busControl = Bus.Factory.CreateUsingRabbitMq(configure =>
{
var host = configure.Host(new Uri("rabbitmq://localhost/mt_test"), h =>
{
h.Username("guest");
h.Password("guest");
});
configure.ReceiveEndpoint(host, "test", c =>
{
c.UseRetry(r => r.None());
c.Consumer<RequestConsumer>();
});
});
TaskUtil.Await(_busControl.StartAsync());
Console.WriteLine("bus started");
}
public void Dispose()
{
_busControl?.StopAsync().Wait();
}
}
class RequestConsumer : IConsumer<RequestModel>
{
public Task Consume(ConsumeContext<RequestModel> context)
{
Console.WriteLine($"request with message id {context.MessageId} received: {context.Message.RequestTime}");
throw new NotImplementedException();
}
}
}
客户项目
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using GreenPipes;
using MassTransit;
using MassTransit.Util;
using MassTransitTest.Service;
namespace MassTransitTest.Client
{
class MassTransitClient
{
private readonly IBusControl _busControl;
public MassTransitClient()
{
_busControl = Bus.Factory.CreateUsingRabbitMq(configure =>
{
var host = configure.Host(new Uri("rabbitmq://localhost/mt_test"), h =>
{
h.Username("guest");
h.Password("guest");
}); enter code here
configure.ReceiveEndpoint(host, "test_error", c =>
{
c.Consumer<ErrorConsumer>();
});
});
TaskUtil.Await(_busControl.StartAsync());
Console.WriteLine("bus started");
}
public async Task Send()
{
Console.WriteLine("sending request");
await (await _busControl.GetSendEndpoint(new Uri("rabbitmq://localhost/mt_test/test"))).Send(new RequestModel()
{
RequestTime = DateTime.Now
});
Console.WriteLine("request sent");
}
}
class ErrorConsumer : IConsumer<Fault<RequestModel>>
{
public Task Consume(ConsumeContext<Fault<RequestModel>> context)
{
Console.WriteLine($"request with message id {context.Message.FaultedMessageId} failed. requested time: {context.Message.Message.RequestTime}");
return Task.CompletedTask;
}
}
}
我正在使用 .net core 2.1.1 和 MassTransit 5.1.3
要回答你的问题,它有几个部分,首先:
MassTransit 接收端点在消费者抛出异常时将消息移动到 _error
queue。不建议在 _error
queue 上创建接收端点,也不应这样做。
如果您只是想观察消费者是否发生故障,您可以创建一个单独的接收端点(例如fault-queue)并注册您的Fault<T>
消费者。 MassTransit 将发布一条实现 Fault<T>
的消息,代理将通过接收端点将该消息路由到您的消费者。
但是,根据您上面的示例,您正在发送请求并期望客户端知道是否发生了错误。为此,我建议使用请求客户端 - 它将消息 headers 设置为 return 错误返回给请求发起者。它还允许发送响应。如果你不想等待响应,或者等着看故障是否发生,上面的故障观察器是你最好的选择。
您可以在documentation.
中查看如何使用请求客户端
我在我的应用程序中配置故障使用者时遇到问题。问题是消耗的消息被传递到 *_error_skipped
队列并且它并没有完全消失。
下面是一个非常简单的例子。客户端应用程序收到失败的消息,它从 test_error
队列中消失,但它仍然存在于 test_error_skipped
队列中。
服务项目
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using GreenPipes;
using MassTransit;
using MassTransit.Util;
namespace MassTransitTest.Service
{
public class RequestModel
{
public DateTime RequestTime { get; set; }
}
class MassTransitService : IDisposable
{
private readonly IBusControl _busControl;
public MassTransitService()
{
_busControl = Bus.Factory.CreateUsingRabbitMq(configure =>
{
var host = configure.Host(new Uri("rabbitmq://localhost/mt_test"), h =>
{
h.Username("guest");
h.Password("guest");
});
configure.ReceiveEndpoint(host, "test", c =>
{
c.UseRetry(r => r.None());
c.Consumer<RequestConsumer>();
});
});
TaskUtil.Await(_busControl.StartAsync());
Console.WriteLine("bus started");
}
public void Dispose()
{
_busControl?.StopAsync().Wait();
}
}
class RequestConsumer : IConsumer<RequestModel>
{
public Task Consume(ConsumeContext<RequestModel> context)
{
Console.WriteLine($"request with message id {context.MessageId} received: {context.Message.RequestTime}");
throw new NotImplementedException();
}
}
}
客户项目
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using GreenPipes;
using MassTransit;
using MassTransit.Util;
using MassTransitTest.Service;
namespace MassTransitTest.Client
{
class MassTransitClient
{
private readonly IBusControl _busControl;
public MassTransitClient()
{
_busControl = Bus.Factory.CreateUsingRabbitMq(configure =>
{
var host = configure.Host(new Uri("rabbitmq://localhost/mt_test"), h =>
{
h.Username("guest");
h.Password("guest");
}); enter code here
configure.ReceiveEndpoint(host, "test_error", c =>
{
c.Consumer<ErrorConsumer>();
});
});
TaskUtil.Await(_busControl.StartAsync());
Console.WriteLine("bus started");
}
public async Task Send()
{
Console.WriteLine("sending request");
await (await _busControl.GetSendEndpoint(new Uri("rabbitmq://localhost/mt_test/test"))).Send(new RequestModel()
{
RequestTime = DateTime.Now
});
Console.WriteLine("request sent");
}
}
class ErrorConsumer : IConsumer<Fault<RequestModel>>
{
public Task Consume(ConsumeContext<Fault<RequestModel>> context)
{
Console.WriteLine($"request with message id {context.Message.FaultedMessageId} failed. requested time: {context.Message.Message.RequestTime}");
return Task.CompletedTask;
}
}
}
我正在使用 .net core 2.1.1 和 MassTransit 5.1.3
要回答你的问题,它有几个部分,首先:
MassTransit 接收端点在消费者抛出异常时将消息移动到 _error
queue。不建议在 _error
queue 上创建接收端点,也不应这样做。
如果您只是想观察消费者是否发生故障,您可以创建一个单独的接收端点(例如fault-queue)并注册您的Fault<T>
消费者。 MassTransit 将发布一条实现 Fault<T>
的消息,代理将通过接收端点将该消息路由到您的消费者。
但是,根据您上面的示例,您正在发送请求并期望客户端知道是否发生了错误。为此,我建议使用请求客户端 - 它将消息 headers 设置为 return 错误返回给请求发起者。它还允许发送响应。如果你不想等待响应,或者等着看故障是否发生,上面的故障观察器是你最好的选择。
您可以在documentation.
中查看如何使用请求客户端