在 Entity Framework 的交易中通过 MassTransit 发送消息
Sending message via MassTransit in a transaction with Entity Framework
背景:
我们正在使用 Quartz.NET to schedule when a message should be sent to a RabbitMQ queue using MassTransit。所有消息也使用 Entity Framework 保存到数据库中,既用于审计原因,也用于恢复;在应用程序启动期间重新安排未发送的消息,因为 Quartz 将计划的作业(要发送的消息)保存在内存中。发送消息时,数据库会更新,在消息实体上设置一个标志,说明已发送,以便在下次应用程序启动时不会重新发送。
问题:
应用程序可能会在数据库更新后、消息发送前终止,导致消息永远无法发送。如果我们翻转操作顺序并首先发送消息,应用程序可能会在数据库更新之前崩溃,从而在下次应用程序启动时重新发送消息。我们希望将这两个操作作为一个事务来执行。
我们成功地在消费者中使用了一个事务,从 ConsumeContext<T>.GetPayload<TransactionContext>()
获取了一个 TransactionContext
并将其传递给 DbContext.EnlistTransaction()
。使用 ISendEndpointProvider
是否有任何类似的选项可用?
以下是 Quartz IJob 实现的(简化)版本。我们必须为 EF 创建一个范围,因为 Quartz 是 运行 作为一个单例。如果要解决上述问题,我们有什么选择?
我们正在使用 MassTransit 7.2.3、Quartz 3.3.3 和 Entity Framework Core 5.0.11。
public sealed class ScheduleMessageJob : IJob
{
private readonly IServiceProvider _serviceProvider;
public ScheduleMessageJob(IServiceProvider serviceProvider) =>
_serviceProvider = serviceProvider;
public async Task Execute(IJobExecutionContext context)
{
using var scope = _serviceProvider.CreateScope();
var scopedServiceProvider = scope.ServiceProvider;
await UpdateScheduledMessage(scopedServiceProvider);
await Send(scopedServiceProvider);
}
private static async Task UpdateScheduledMessage(IServiceProvider scopedServiceProvider)
{
var dbContext = scopedServiceProvider.GetRequiredService<IDbContext>();
var scheduledMessage = await dbContext.Get<ScheduledMessage>(id: 1);
scheduledMessage.IsQueued = true;
dbContext.Update(scheduledMessage);
await dbContext.SaveChangesAsync();
}
public static async Task Send(IServiceProvider scopedServiceProvider)
{
var endpoint = await GetEndpoint(scopedServiceProvider);
var triggerExecuted = new TriggerExecuted("Some data");
await endpoint.Send(triggerExecuted);
}
private static async Task<ISendEndpoint> GetEndpoint(IServiceProvider serviceProvider) =>
await serviceProvider
.GetRequiredService<ISendEndpointProvider>()
.GetSendEndpoint(new Uri("queue:SomeQueue"));
}
大多数消息代理不是事务性的,也不参与事务。在您上面描述的情况下,预期的分布式事务一致性是一个错误。或者那至少是一个非常非常糟糕的主意。
您可以选择依赖消息代理(我的偏好)或数据库。您还可以在 消息被消耗后将消息写入数据库,使用审计功能(观察所有 sent/published/consumed 消息并将它们写入数据库)或通过该消息类型的单独消费者。
Quartz 具有触发器重新触发功能,因此您可以重试失败的触发器。我会首先使用持久的 Quartz 数据存储,然后写入代理,然后最后处理您的数据库。
背景:
我们正在使用 Quartz.NET to schedule when a message should be sent to a RabbitMQ queue using MassTransit。所有消息也使用 Entity Framework 保存到数据库中,既用于审计原因,也用于恢复;在应用程序启动期间重新安排未发送的消息,因为 Quartz 将计划的作业(要发送的消息)保存在内存中。发送消息时,数据库会更新,在消息实体上设置一个标志,说明已发送,以便在下次应用程序启动时不会重新发送。
问题:
应用程序可能会在数据库更新后、消息发送前终止,导致消息永远无法发送。如果我们翻转操作顺序并首先发送消息,应用程序可能会在数据库更新之前崩溃,从而在下次应用程序启动时重新发送消息。我们希望将这两个操作作为一个事务来执行。
我们成功地在消费者中使用了一个事务,从 ConsumeContext<T>.GetPayload<TransactionContext>()
获取了一个 TransactionContext
并将其传递给 DbContext.EnlistTransaction()
。使用 ISendEndpointProvider
是否有任何类似的选项可用?
以下是 Quartz IJob 实现的(简化)版本。我们必须为 EF 创建一个范围,因为 Quartz 是 运行 作为一个单例。如果要解决上述问题,我们有什么选择?
我们正在使用 MassTransit 7.2.3、Quartz 3.3.3 和 Entity Framework Core 5.0.11。
public sealed class ScheduleMessageJob : IJob
{
private readonly IServiceProvider _serviceProvider;
public ScheduleMessageJob(IServiceProvider serviceProvider) =>
_serviceProvider = serviceProvider;
public async Task Execute(IJobExecutionContext context)
{
using var scope = _serviceProvider.CreateScope();
var scopedServiceProvider = scope.ServiceProvider;
await UpdateScheduledMessage(scopedServiceProvider);
await Send(scopedServiceProvider);
}
private static async Task UpdateScheduledMessage(IServiceProvider scopedServiceProvider)
{
var dbContext = scopedServiceProvider.GetRequiredService<IDbContext>();
var scheduledMessage = await dbContext.Get<ScheduledMessage>(id: 1);
scheduledMessage.IsQueued = true;
dbContext.Update(scheduledMessage);
await dbContext.SaveChangesAsync();
}
public static async Task Send(IServiceProvider scopedServiceProvider)
{
var endpoint = await GetEndpoint(scopedServiceProvider);
var triggerExecuted = new TriggerExecuted("Some data");
await endpoint.Send(triggerExecuted);
}
private static async Task<ISendEndpoint> GetEndpoint(IServiceProvider serviceProvider) =>
await serviceProvider
.GetRequiredService<ISendEndpointProvider>()
.GetSendEndpoint(new Uri("queue:SomeQueue"));
}
大多数消息代理不是事务性的,也不参与事务。在您上面描述的情况下,预期的分布式事务一致性是一个错误。或者那至少是一个非常非常糟糕的主意。
您可以选择依赖消息代理(我的偏好)或数据库。您还可以在 消息被消耗后将消息写入数据库,使用审计功能(观察所有 sent/published/consumed 消息并将它们写入数据库)或通过该消息类型的单独消费者。
Quartz 具有触发器重新触发功能,因此您可以重试失败的触发器。我会首先使用持久的 Quartz 数据存储,然后写入代理,然后最后处理您的数据库。