Masstransit :: IConsumer<Fault<T>> 无法使用故障消息并创建额外的新故障主题
Mass Transit :: IConsumer<Fault<T>> not able to consume the fault message and additional new Fault Topic is created
我是 Masstransit 的新手,我已经开始使用 AWS SQS 创建 pub/Sub 的小测试用例,并在出现任何异常时消耗故障并执行可恢复的操作。
Registration and Receive End point configuration
public void ConfigureServices(IServiceCollection 服务)
{
services.AddControllers();
services.AddMassTransit(config => {
config.AddConsumer<OrderConsumer>();
config.AddConsumer<OrderFaultConsumer>();
//config.AddConsumer<OrderAccepted>();
config.UsingAmazonSqs((ctx, cfg) => {
cfg.Host("ap-southeast-2",
h =>
{
h.AccessKey("AccessKey");
h.SecretKey("SecretKey");
});
cfg.ReceiveEndpoint("order-queue", c => {
//Don't create a new topic
c.ConfigureConsumeTopology = false;
c.Subscribe("Order", s =>
{
});
c.UseMessageRetry(r=>r.Immediate(3));
c.ConfigureConsumer<OrderConsumer>(ctx);
c.ConfigureConsumer<OrderFaultConsumer>(ctx);
});
services.AddMassTransitHostedService();
示例发布代码 :
public async Task<IActionResult> Post([FromBody] OrderDto order)
{
await publishEndpoint.Publish<IOrder>(new Order()
{
Message="Fault",
order=order
});
return Ok();
}
OrderConsumer
public class OrderConsumer : IConsumer<IOrder>
{
private readonly ILogger<OrderConsumer> logger;
public OrderConsumer(ILogger<OrderConsumer> logger)
{
this.logger = logger;
}
/*
MassTransit delivers messages to consumers by calling the Consume method */
public async Task Consume(ConsumeContext<IOrder> context)
{
if(context.Message.Message=="Fault")
throw new Exception("Very bad things happened");
await Console.Out.WriteLineAsync(context.Message.Message);
}
}
OrderFaultConsumer
public class OrderFaultConsumer : IConsumer<Fault<IOrder>>
{
private readonly ILogger<OrderFaultConsumer> _logger;
public OrderFaultConsumer(ILogger<OrderFaultConsumer> logger)
{
_logger = logger;
}
public async Task Consume(ConsumeContext<Fault<IOrder>> context)
{
var originalMessage = context.Message.Message.order;
var exceptions = context.Message.Exceptions;
//Do something interesting
await Console.Out.WriteLineAsync($"discarding message:{JsonConvert.SerializeObject(originalMessage)}");
}
}
预期是:
- 应创建“订单”主题
- "Order_Queue" 应创建订阅订单主题。
- 如果发布成功且消息(输入)!=Fault,则应调用“OrderConsumer”。
- 如果Message(input)=="Fault"则调用"OrderConsumer",立即重试3次后抛异常,调用OrderFaultConsumer处理fault
实际:
- 已创建主题
- 队列已创建和订阅
- 成功场景已通过。
- 场景失败,创建了新的 FaultTopic(不是预期的)并且从未调用或使用 OrderFaultConsumer。
5.Order_Queue_error 已创建并推送故障消息(不是预期的)。
请指教
这里有很多东西要打开,所以和我在一起。
对,发布消息时,会根据发布的消息类型创建主题。在这种情况下,名称是根据 IOrder
类型生成的。如果要更改主题的实体名称,可以将 [EntityName("order")]
添加到消息类型,或在总线拓扑中指定自定义实体名称格式化程序。
消费者服务启动时,会创建order-queue
队列。到目前为止,还不错。
是的,发布了确切的消息类型,消费者得到了它的副本。这实际上是一个难题,因为:
指定c.ConfigureConsumeTopology = false;
,启动消费者服务既不会创建主题,也不会将主题绑定到队列。所以这必须是在添加此行之前或手动完成的。
由于使用了Subscribe("Order", ...)
,特别是Order的名称,主题名称与发布的消息类型不匹配。由于 SNS 不支持多态消息路由,发布 IOrder
不会使用 Order
主题,因此您要么使用速记,要么将该类型的实体名称更改为 Order
.
当消费者抛出异常时,MassTransit will generate一个Fault<IOrder>
将被发布到匹配的主题。通用类型将始终匹配使用的消息类型(而不是发布的消息类型)。
这会导致创建一个主题(如果它尚不存在)。
至于您的消费者,由于您指定了 c.ConfigureConsumeTopology = false;
,接收端点既不会在启动时创建主题,也不会创建对该主题的订阅。结果是您的 OrderFaultConsumer 将不会收到已发布的故障消息。
不鼓励在同一个接收端点上同时配置消费者和错误消费者。。我建议为故障消费者使用单独的队列。
创建order-queue_error队列是因为MassTransit将所有故障消息移至错误队列以防止消息丢失,以便消息可以问题解决后移回队列。
我是 Masstransit 的新手,我已经开始使用 AWS SQS 创建 pub/Sub 的小测试用例,并在出现任何异常时消耗故障并执行可恢复的操作。
Registration and Receive End point configuration
public void ConfigureServices(IServiceCollection 服务) { services.AddControllers();
services.AddMassTransit(config => {
config.AddConsumer<OrderConsumer>();
config.AddConsumer<OrderFaultConsumer>();
//config.AddConsumer<OrderAccepted>();
config.UsingAmazonSqs((ctx, cfg) => {
cfg.Host("ap-southeast-2",
h =>
{
h.AccessKey("AccessKey");
h.SecretKey("SecretKey");
});
cfg.ReceiveEndpoint("order-queue", c => {
//Don't create a new topic
c.ConfigureConsumeTopology = false;
c.Subscribe("Order", s =>
{
});
c.UseMessageRetry(r=>r.Immediate(3));
c.ConfigureConsumer<OrderConsumer>(ctx);
c.ConfigureConsumer<OrderFaultConsumer>(ctx);
});
services.AddMassTransitHostedService();
示例发布代码 :
public async Task<IActionResult> Post([FromBody] OrderDto order)
{
await publishEndpoint.Publish<IOrder>(new Order()
{
Message="Fault",
order=order
});
return Ok();
}
OrderConsumer
public class OrderConsumer : IConsumer<IOrder>
{
private readonly ILogger<OrderConsumer> logger;
public OrderConsumer(ILogger<OrderConsumer> logger)
{
this.logger = logger;
}
/*
MassTransit delivers messages to consumers by calling the Consume method */
public async Task Consume(ConsumeContext<IOrder> context)
{
if(context.Message.Message=="Fault")
throw new Exception("Very bad things happened");
await Console.Out.WriteLineAsync(context.Message.Message);
}
}
OrderFaultConsumer
public class OrderFaultConsumer : IConsumer<Fault<IOrder>>
{
private readonly ILogger<OrderFaultConsumer> _logger;
public OrderFaultConsumer(ILogger<OrderFaultConsumer> logger)
{
_logger = logger;
}
public async Task Consume(ConsumeContext<Fault<IOrder>> context)
{
var originalMessage = context.Message.Message.order;
var exceptions = context.Message.Exceptions;
//Do something interesting
await Console.Out.WriteLineAsync($"discarding message:{JsonConvert.SerializeObject(originalMessage)}");
}
}
预期是:
- 应创建“订单”主题
- "Order_Queue" 应创建订阅订单主题。
- 如果发布成功且消息(输入)!=Fault,则应调用“OrderConsumer”。
- 如果Message(input)=="Fault"则调用"OrderConsumer",立即重试3次后抛异常,调用OrderFaultConsumer处理fault
实际:
- 已创建主题
- 队列已创建和订阅
- 成功场景已通过。
- 场景失败,创建了新的 FaultTopic(不是预期的)并且从未调用或使用 OrderFaultConsumer。 5.Order_Queue_error 已创建并推送故障消息(不是预期的)。
请指教
这里有很多东西要打开,所以和我在一起。
对,发布消息时,会根据发布的消息类型创建主题。在这种情况下,名称是根据
IOrder
类型生成的。如果要更改主题的实体名称,可以将[EntityName("order")]
添加到消息类型,或在总线拓扑中指定自定义实体名称格式化程序。消费者服务启动时,会创建
order-queue
队列。到目前为止,还不错。是的,发布了确切的消息类型,消费者得到了它的副本。这实际上是一个难题,因为:
指定
c.ConfigureConsumeTopology = false;
,启动消费者服务既不会创建主题,也不会将主题绑定到队列。所以这必须是在添加此行之前或手动完成的。由于使用了
Subscribe("Order", ...)
,特别是Order的名称,主题名称与发布的消息类型不匹配。由于 SNS 不支持多态消息路由,发布IOrder
不会使用Order
主题,因此您要么使用速记,要么将该类型的实体名称更改为Order
.
当消费者抛出异常时,MassTransit will generate一个
Fault<IOrder>
将被发布到匹配的主题。通用类型将始终匹配使用的消息类型(而不是发布的消息类型)。这会导致创建一个主题(如果它尚不存在)。
至于您的消费者,由于您指定了
c.ConfigureConsumeTopology = false;
,接收端点既不会在启动时创建主题,也不会创建对该主题的订阅。结果是您的 OrderFaultConsumer 将不会收到已发布的故障消息。不鼓励在同一个接收端点上同时配置消费者和错误消费者。。我建议为故障消费者使用单独的队列。
创建order-queue_error队列是因为MassTransit将所有故障消息移至错误队列以防止消息丢失,以便消息可以问题解决后移回队列。