为什么当消费者在 MassTransit RouterSlip 中抛出异常时不调用 Compensate 方法
Why Compensate method doesn't Call when a consumer thrown exception in MassTransit RouterSlip
我在 saga 状态机中构建了一个路由器 slip :
var builder = new RoutingSlipBuilder(NewId.NextGuid());
var submitOrderUrl = QueueNames.GetActivityUri(nameof(SubmitOrderActivity));
builder.AddActivity("SubmitOrder", submitOrderUrl, new
{
context.Message.OrderId
});;
builder.AddActivity("Payment", QueueNames.GetActivityUri(nameof(PaymentActivity)), new {
context.Message.OrderId,
context.Message.CustomerId,
context.Message.Credit
});
builder.AddActivity("TakeProduct", QueueNames.GetActivityUri(nameof(TakeProductActivity)), new
{
context.Message.OrderId,
Baskets
});
builder.AddVariable("OrderId", context.Message.OrderId);
var routingSlip = builder.Build();
await context.Execute(routingSlip);
我有 TakeProductActivity activity:
public class TakeProductActivity : IActivity:
...
public async Task<ExecutionResult> Execute(ExecuteContext<TakeProductArgument> context)
{
logger.LogInformation($"Take Product Courier called for order {context.Arguments.OrderId}");
var uri = QueueNames.GetMessageUri(nameof(TakeProductTransactionMessage));
var sendEndpoint = await context.GetSendEndpoint(uri);
await sendEndpoint.Send<TakeProductTransactionMessage>(new
{
ProductBaskets = context.Arguments.Baskets
});
return context.Completed(new { Baskets = context.Arguments.Baskets, OrderId=context.Arguments.OrderId });
}
当我使用 sendEndpoint.Send() 方法(即发即弃)时,当服务发生异常时,补偿方法不会自动激活,
但是当我使用requestClient.GetResponse(request/reply)方法调用服务时,出现异常时自动调用Compensate方法。
在 PaymentConsumer 中,当抛出异常时,必须调用支付补偿方法,但它没有!
///this class has implemented in another micro-service hosted separate process:
public class TakeProductTransactionConsumer : IConsumer<TakeProductTransactionMessage>
....
public async Task Consume(ConsumeContext<TakeProductTransactionMessage> context)
{
if(context.Message.ProductBaskets.Count>0)
{
throw new Exception("Process Failed!");
}
logger.LogInformation($"Take product called ");
Dictionary<int, int> productCounts = new Dictionary<int, int>();
foreach (var item in context.Message.ProductBaskets)
{
productCounts.Add(item.ProductId, item.Count);
}
var products = await productService.TakeProducts(productCounts);
await publishEndpoint.Publish<ProductsUpdatedEvent>(new
{
ProductUpdatedEvents = products.Select(p =>new { ProductId = p.Id,p.Price,p.Count}).ToList()
});
}
问题是 MassTransit 无法从 rabbitMQ 获取异常并自动调用补偿方法。
当router slip 活动抛出异常时我应该如何告诉MassTransit 调用compensate
如果您的 Take Product activity 使用 Send to fire-and-forget to the take product 服务,并且该服务抛出异常,activity 永远不会知道它,因为它已经完成.即发即弃,在目标服务中没有观察到异常。
如果想在拍品服务抛出异常时拍品activity失败,需要使用request/response从服务中观察异常
我在 saga 状态机中构建了一个路由器 slip :
var builder = new RoutingSlipBuilder(NewId.NextGuid());
var submitOrderUrl = QueueNames.GetActivityUri(nameof(SubmitOrderActivity));
builder.AddActivity("SubmitOrder", submitOrderUrl, new
{
context.Message.OrderId
});;
builder.AddActivity("Payment", QueueNames.GetActivityUri(nameof(PaymentActivity)), new {
context.Message.OrderId,
context.Message.CustomerId,
context.Message.Credit
});
builder.AddActivity("TakeProduct", QueueNames.GetActivityUri(nameof(TakeProductActivity)), new
{
context.Message.OrderId,
Baskets
});
builder.AddVariable("OrderId", context.Message.OrderId);
var routingSlip = builder.Build();
await context.Execute(routingSlip);
我有 TakeProductActivity activity:
public class TakeProductActivity : IActivity
public async Task<ExecutionResult> Execute(ExecuteContext<TakeProductArgument> context)
{
logger.LogInformation($"Take Product Courier called for order {context.Arguments.OrderId}");
var uri = QueueNames.GetMessageUri(nameof(TakeProductTransactionMessage));
var sendEndpoint = await context.GetSendEndpoint(uri);
await sendEndpoint.Send<TakeProductTransactionMessage>(new
{
ProductBaskets = context.Arguments.Baskets
});
return context.Completed(new { Baskets = context.Arguments.Baskets, OrderId=context.Arguments.OrderId });
}
当我使用 sendEndpoint.Send() 方法(即发即弃)时,当服务发生异常时,补偿方法不会自动激活, 但是当我使用requestClient.GetResponse(request/reply)方法调用服务时,出现异常时自动调用Compensate方法。 在 PaymentConsumer 中,当抛出异常时,必须调用支付补偿方法,但它没有!
///this class has implemented in another micro-service hosted separate process:
public class TakeProductTransactionConsumer : IConsumer<TakeProductTransactionMessage>
....
public async Task Consume(ConsumeContext<TakeProductTransactionMessage> context)
{
if(context.Message.ProductBaskets.Count>0)
{
throw new Exception("Process Failed!");
}
logger.LogInformation($"Take product called ");
Dictionary<int, int> productCounts = new Dictionary<int, int>();
foreach (var item in context.Message.ProductBaskets)
{
productCounts.Add(item.ProductId, item.Count);
}
var products = await productService.TakeProducts(productCounts);
await publishEndpoint.Publish<ProductsUpdatedEvent>(new
{
ProductUpdatedEvents = products.Select(p =>new { ProductId = p.Id,p.Price,p.Count}).ToList()
});
}
问题是 MassTransit 无法从 rabbitMQ 获取异常并自动调用补偿方法。 当router slip 活动抛出异常时我应该如何告诉MassTransit 调用compensate
如果您的 Take Product activity 使用 Send to fire-and-forget to the take product 服务,并且该服务抛出异常,activity 永远不会知道它,因为它已经完成.即发即弃,在目标服务中没有观察到异常。
如果想在拍品服务抛出异常时拍品activity失败,需要使用request/response从服务中观察异常