涉及 EntityFramework/SQL 服务器和 NServiceBus/MSMQ 的分布式事务中的非同时提交
Non-simultaneous commits in a distributed transaction involving EntityFramework/SQL Server and NServiceBus/MSMQ
有一个 .NET 4.7 WebAPI 应用程序与 SQL 服务器一起使用 Entity Framework 并通过 MSMQ 传输托管 NServiceBus 端点。
简化的工作流程可以通过控制器操作来描述:
[HttpPost]
public async Task<IHttpActionResult> SendDebugCommand()
{
var sample = new Sample
{
State = SampleState.Initial,
};
_dataContext.Set<Sample>().Add(sample);
await _dataContext.SaveChangesAsync();
sample.State = SampleState.Queueing;
var options = new TransactionOptions
{
IsolationLevel = IsolationLevel.ReadCommitted,
};
using (var scope = new TransactionScope(TransactionScopeOption.Required, options, TransactionScopeAsyncFlowOption.Enabled))
{
await _dataContext.SaveChangesAsync();
await _messageSession.Send(new DebugCommand {SampleId = sample.Id});
scope.Complete();
}
_logger.OnCreated(sample);
return Ok();
}
和 DebugCommand
处理程序,发送到相同的 NServiceBus 端点:
public async Task Handle(DebugCommand message, IMessageHandlerContext context)
{
var sample = await _dataContext.Set<Sample>().FindAsync(message.SampleId);
if (sample == null)
{
_logger.OnNotFound(message.SampleId);
return;
}
if (sample.State != SampleState.Queueing)
{
_logger.OnUnexpectedState(sample, SampleState.Queueing);
return;
}
// Some work being done
sample.State = SampleState.Processed;
await _dataContext.SaveChangesAsync();
_logger.OnHandled(sample);
}
有时,消息处理程序从数据库中检索 Sample
,但其状态仍为 Initial
,而不是预期的 Queueing
。这意味着在控制器动作中发起的分布式事务还没有完全完成。日志文件中的时间戳也证实了这一点。
'sometimes' 很少发生,在较重的负载和网络延迟下可能会影响。无法使用本地数据库重现问题,但使用远程数据库很容易。
我检查了 DTC 配置。我检查了肯定有升级到分布式事务。此外,如果 scope.Complete()
未被调用,则不会发生数据库更新和消息发送。
当事务作用域完成并释放后,直觉上我希望在执行一条进一步的指令之前,DB 和 MSMQ 都得到解决。
我找不到问题的明确答案:
- 这是 DTC 的工作方式吗?事务双方都进行提交,而完成没有报告给协调器,这是否正常?
- 如果是,是否意味着我应该通过改变程序逻辑来克服此类事件?
- 我是否以某种方式滥用了交易?正确的方法是什么?
除了 Evk 在 here's also an excerpt from the particular documentation page about transactions 中提到的评论:
A distributed transaction between the queueing system and the persistent storage guarantees atomic commits but guarantees only eventual consistency.
两个补充说明:
- NServiceBus 默认使用
IsolationLevel.ReadCommitted
作为用于消费消息的事务。 This can be configured 虽然我不确定将它设置为在消费者上序列化是否真的能解决这里的问题。
- 一般来说,不建议在服务之间使用共享数据库,因为这会大大增加耦合并为您在这里遇到的问题打开大门。尝试将相关数据作为消息的一部分传递,并将数据库作为一项服务的内部存储。特别是在使用 Web 服务器时,一种常见的模式是将所有相关数据添加到消息中并在向用户确认成功时触发它(因为消息不会丢失),而接收端点可以将数据存储到它的数据库中,如果必要的。要提供更具体的建议,这需要对您的领域和用例有更多了解。我可以推荐 particular discussion community 来讨论 design/architectural 这样的问题。
有一个 .NET 4.7 WebAPI 应用程序与 SQL 服务器一起使用 Entity Framework 并通过 MSMQ 传输托管 NServiceBus 端点。
简化的工作流程可以通过控制器操作来描述:
[HttpPost]
public async Task<IHttpActionResult> SendDebugCommand()
{
var sample = new Sample
{
State = SampleState.Initial,
};
_dataContext.Set<Sample>().Add(sample);
await _dataContext.SaveChangesAsync();
sample.State = SampleState.Queueing;
var options = new TransactionOptions
{
IsolationLevel = IsolationLevel.ReadCommitted,
};
using (var scope = new TransactionScope(TransactionScopeOption.Required, options, TransactionScopeAsyncFlowOption.Enabled))
{
await _dataContext.SaveChangesAsync();
await _messageSession.Send(new DebugCommand {SampleId = sample.Id});
scope.Complete();
}
_logger.OnCreated(sample);
return Ok();
}
和 DebugCommand
处理程序,发送到相同的 NServiceBus 端点:
public async Task Handle(DebugCommand message, IMessageHandlerContext context)
{
var sample = await _dataContext.Set<Sample>().FindAsync(message.SampleId);
if (sample == null)
{
_logger.OnNotFound(message.SampleId);
return;
}
if (sample.State != SampleState.Queueing)
{
_logger.OnUnexpectedState(sample, SampleState.Queueing);
return;
}
// Some work being done
sample.State = SampleState.Processed;
await _dataContext.SaveChangesAsync();
_logger.OnHandled(sample);
}
有时,消息处理程序从数据库中检索 Sample
,但其状态仍为 Initial
,而不是预期的 Queueing
。这意味着在控制器动作中发起的分布式事务还没有完全完成。日志文件中的时间戳也证实了这一点。
'sometimes' 很少发生,在较重的负载和网络延迟下可能会影响。无法使用本地数据库重现问题,但使用远程数据库很容易。
我检查了 DTC 配置。我检查了肯定有升级到分布式事务。此外,如果 scope.Complete()
未被调用,则不会发生数据库更新和消息发送。
当事务作用域完成并释放后,直觉上我希望在执行一条进一步的指令之前,DB 和 MSMQ 都得到解决。
我找不到问题的明确答案:
- 这是 DTC 的工作方式吗?事务双方都进行提交,而完成没有报告给协调器,这是否正常?
- 如果是,是否意味着我应该通过改变程序逻辑来克服此类事件?
- 我是否以某种方式滥用了交易?正确的方法是什么?
除了 Evk 在
A distributed transaction between the queueing system and the persistent storage guarantees atomic commits but guarantees only eventual consistency.
两个补充说明:
- NServiceBus 默认使用
IsolationLevel.ReadCommitted
作为用于消费消息的事务。 This can be configured 虽然我不确定将它设置为在消费者上序列化是否真的能解决这里的问题。 - 一般来说,不建议在服务之间使用共享数据库,因为这会大大增加耦合并为您在这里遇到的问题打开大门。尝试将相关数据作为消息的一部分传递,并将数据库作为一项服务的内部存储。特别是在使用 Web 服务器时,一种常见的模式是将所有相关数据添加到消息中并在向用户确认成功时触发它(因为消息不会丢失),而接收端点可以将数据存储到它的数据库中,如果必要的。要提供更具体的建议,这需要对您的领域和用例有更多了解。我可以推荐 particular discussion community 来讨论 design/architectural 这样的问题。