Hangfire、.Net Core 和 Entity Framework:并发异常
Hangfire, .Net Core and Entity Framework: concurrency exception
我正在使用 Hangfire 开发 .Net 核心应用程序并遇到以下异常
A second operation started on this context before a previous operation completed. Any instance members are not guaranteed to be thread safe.
我使用 Hangfire 以 1 小时的间隔安排作业。当新的 process/job 在较早的作业完成其过程之前启动时,我遇到了上述问题。
我们如何实现多个 Hangfire processes/jobs(多个 worker)工作(并行)来完成任务。 (现在已解决,使用默认的 AspNetCoreJobActivator)
var scopeFactory = serviceProvider.GetService<IServiceScopeFactory>();
if (scopeFactory != null)
GlobalConfiguration.Configuration.UseActivator(new AspNetCoreJobActivator(scopeFactory));
现在,我在 CreateOrderData.cs 中收到以下异常:-
/*System.InvalidOperationException: An exception has been raised that
is likely due to a transient failure. If you are connecting to a SQL
Azure database consider using SqlAzureExecutionStrategy. --->
Microsoft.EntityFrameworkCore.DbUpdateException: An error occurred
while updating the entries. See the inner exception for details. --->
System.Data.SqlClient.SqlException: Transaction (Process ID 103) was
deadlocked on lock resources with another process and has been chosen
as the deadlock victim. Rerun the transaction. */
我正在安排 hangfire cron 作业如下:-
RecurringJob.AddOrUpdate<IS2SScheduledJobs>(x => x.ProcessInputXML(), Cron.MinuteInterval(1));
Startup.cs
public void ConfigureServices(IServiceCollection services)
{
string hangFireConnection = Configuration["ConnectionStrings:HangFire"];
GlobalConfiguration.Configuration.UseSqlServerStorage(hangFireConnection);
var config = new AutoMapper.MapperConfiguration(cfg =>
{
cfg.AddProfile(new AutoMapperProfileConfiguration());
);
var mapper = config.CreateMapper();
services.AddSingleton(mapper);
services.AddScoped<IHangFireJob, HangFireJob>();
services.AddScoped<IScheduledJobs, ScheduledJobs>();
services.AddScoped<BusinessLogic>();
services.AddHangfire(opt =>
opt.UseSqlServerStorage(Configuration["ConnectionStrings:HangFire"]));
services.AddEntityFrameworkSqlServer().AddDbContext<ABCContext>(options =>
options.UseSqlServer(Configuration["ConnectionStrings:ABC"]));
}
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory, IServiceProvider serviceProvider)
{
GlobalConfiguration.Configuration.UseActivator(new HangFireActivator(serviceProvider));
//hangFireJob.Jobs();
// add NLog to ASP.NET Core
loggerFactory.AddConsole(Configuration.GetSection("Logging"));
loggerFactory.AddDebug();
loggerFactory.AddNLog();
// app.UseCors("AllowSpecificOrigin");
foreach (DatabaseTarget target in LogManager.Configuration.AllTargets.Where(t => t is DatabaseTarget))
{
target.ConnectionString = Configuration.GetConnectionString("Logging");
}
LogManager.ReconfigExistingLoggers();
}
Hangfire.cs
public class HangFireJob : IHangFireJob
{
private ABCContext _abcContext;
private IScheduledJobs scheduledJobs;
public HangFireJob(ABCContext abcContext, IScheduledJobs scheduledJobs)
{
_abcContext = abcContext;
this.scheduledJobs = scheduledJobs;
}
public void Jobs()
{
RecurringJob.AddOrUpdate<IScheduledJobs>(x => x.ProcessInputXML(), Cron.HourInterval(1));
}
}
ScheduledJobs.cs
public class S2SScheduledJobs : IS2SScheduledJobs
{
private BusinessLogic _businessLogic;
public ScheduledJobs(BusinessLogic businessLogic)
{
_businessLogic = businessLogic;
}
public async Task<string> ProcessInputXML()
{
await _businessLogic.ProcessXML();
}
}
BusinessLogic.cs
public class BusinessLogic
{
private ABCContext _abcContext;
public BusinessLogic(ABCContext abcContext) : base(abcContext)
{
_abcContext = abcContext;
}
public async Task ProcessXML()
{
var batchRepository = new BatchRepository(_abcContext);
var unprocessedBatchRecords = await BatchRepository.GetUnprocessedBatch();
foreach (var batchRecord in unprocessedBatchRecords)
{
try
{
int orderId = await LoadDataToOrderTable(batchRecord.BatchId);
await UpdateBatchProcessedStatus(batchRecord.BatchId);
if (orderId > 0)
{
await CreateOrderData(orderId);
}
}
catch(Exception ex)
{
}
}
}
CreateOrderData.cs
public async Task<int> CreateOrderData(int orderId)
{
try
{
await OrderRepo.InsertOrder(order);
await _abcContext.SaveChangesAsync();
}
catch(Exception ex)
{
/*System.InvalidOperationException: An exception has been raised that is likely due to a transient failure. If you are connecting to a SQL Azure database consider using SqlAzureExecutionStrategy. ---> Microsoft.EntityFrameworkCore.DbUpdateException: An error occurred while updating the entries. See the inner exception for details. ---> System.Data.SqlClient.SqlException: Transaction (Process ID 103) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction. */
}
}
InsertOrder.cs
public async Task InsertOrder(Order o)
{
// creation of large number of entites(more than 50) to be inserted in the database
woRepo.Insert(p);
poRepo.Insert(y);
//and many more like above
Insert(order);
}
Insert.cs
public virtual void Insert(TEntity entity)
{
entity.ObjectState = ObjectState.Added;
if (entity is IXYZEntity xyzEntity)
{
xyzEntity.CreatedDate = DateTime.Now;
xyzEntity.UpdatedDate = xyzEntity.CreatedDate;
xyzEntity.CreatedBy = _context.UserName ?? string.Empty;
xyzEntity.UpdatedBy = _context.UserName ?? string.Empty;
}
else if (entity is IxyzEntityNull xyzEntityNull)
{
xyzEntityNull.CreatedDate = DateTime.Now;
xyzEntityNull.UpdatedDate = xyzEntityNull.CreatedDate;
xyzEntityNull.CreatedBy = _context.UserName;
xyzEntityNull.UpdatedBy = _context.UserName;
}
_dbSet.Add(entity);
_context.SyncObjectState(entity);
}
LoadDataToOrder.cs
public async Task<int> LoadDataToOrder(int batchId)
{
// using (var unitOfWork = new UnitOfWork(_abcContext))
// {
var orderRepo = new OrderRepository(_abcContext);
Entities.Order order = new Entities.Order();
order.Guid = Guid.NewGuid();
order.BatchId = batchId;
order.VendorId = null;
orderRepo.Insert(order);
//unitOfWork.SaveChanges();
await _abcContext.SaveChangesAsync();
return order.OrderId;
//
}
}
HangfireActivator.cs
public class HangFireActivator : Hangfire.JobActivator
{
private readonly IServiceProvider _serviceProvider;
public HangFireActivator(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}
public override object ActivateJob(Type type)
{
return _serviceProvider.GetService(type);
}
}
请指教
谢谢。
以下解决方案适用于这 2 个问题:
实现多个Hangfire processes/jobs(多个worker)工作(并行)。
回答:当我使用内置 AspNetCoreJobActivator
而不是开箱即用时,这个问题得到了解决,即删除了 HangfireActivator class 并删除了对 UseActivator 方法的调用。
var scopeFactory = serviceProvider.GetService<IServiceScopeFactory>();
if (scopeFactory != null)
GlobalConfiguration.Configuration.UseActivator(new AspNetCoreJobActivator(scopeFactory));
SqlAzureExecutionStrategy
CreateOrder.cs
中的异常(交易陷入僵局)
回答:通过在发生死锁时自动重试查询解决了这个问题。
感谢 odinserj 的建议。
我正在使用 Hangfire 开发 .Net 核心应用程序并遇到以下异常
A second operation started on this context before a previous operation completed. Any instance members are not guaranteed to be thread safe.
我使用 Hangfire 以 1 小时的间隔安排作业。当新的 process/job 在较早的作业完成其过程之前启动时,我遇到了上述问题。
我们如何实现多个 Hangfire processes/jobs(多个 worker)工作(并行)来完成任务。 (现在已解决,使用默认的 AspNetCoreJobActivator)
var scopeFactory = serviceProvider.GetService<IServiceScopeFactory>();
if (scopeFactory != null)
GlobalConfiguration.Configuration.UseActivator(new AspNetCoreJobActivator(scopeFactory));
现在,我在 CreateOrderData.cs 中收到以下异常:-
/*System.InvalidOperationException: An exception has been raised that is likely due to a transient failure. If you are connecting to a SQL Azure database consider using SqlAzureExecutionStrategy. ---> Microsoft.EntityFrameworkCore.DbUpdateException: An error occurred while updating the entries. See the inner exception for details. ---> System.Data.SqlClient.SqlException: Transaction (Process ID 103) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction. */
我正在安排 hangfire cron 作业如下:-
RecurringJob.AddOrUpdate<IS2SScheduledJobs>(x => x.ProcessInputXML(), Cron.MinuteInterval(1));
Startup.cs
public void ConfigureServices(IServiceCollection services)
{
string hangFireConnection = Configuration["ConnectionStrings:HangFire"];
GlobalConfiguration.Configuration.UseSqlServerStorage(hangFireConnection);
var config = new AutoMapper.MapperConfiguration(cfg =>
{
cfg.AddProfile(new AutoMapperProfileConfiguration());
);
var mapper = config.CreateMapper();
services.AddSingleton(mapper);
services.AddScoped<IHangFireJob, HangFireJob>();
services.AddScoped<IScheduledJobs, ScheduledJobs>();
services.AddScoped<BusinessLogic>();
services.AddHangfire(opt =>
opt.UseSqlServerStorage(Configuration["ConnectionStrings:HangFire"]));
services.AddEntityFrameworkSqlServer().AddDbContext<ABCContext>(options =>
options.UseSqlServer(Configuration["ConnectionStrings:ABC"]));
}
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory, IServiceProvider serviceProvider)
{
GlobalConfiguration.Configuration.UseActivator(new HangFireActivator(serviceProvider));
//hangFireJob.Jobs();
// add NLog to ASP.NET Core
loggerFactory.AddConsole(Configuration.GetSection("Logging"));
loggerFactory.AddDebug();
loggerFactory.AddNLog();
// app.UseCors("AllowSpecificOrigin");
foreach (DatabaseTarget target in LogManager.Configuration.AllTargets.Where(t => t is DatabaseTarget))
{
target.ConnectionString = Configuration.GetConnectionString("Logging");
}
LogManager.ReconfigExistingLoggers();
}
Hangfire.cs
public class HangFireJob : IHangFireJob
{
private ABCContext _abcContext;
private IScheduledJobs scheduledJobs;
public HangFireJob(ABCContext abcContext, IScheduledJobs scheduledJobs)
{
_abcContext = abcContext;
this.scheduledJobs = scheduledJobs;
}
public void Jobs()
{
RecurringJob.AddOrUpdate<IScheduledJobs>(x => x.ProcessInputXML(), Cron.HourInterval(1));
}
}
ScheduledJobs.cs
public class S2SScheduledJobs : IS2SScheduledJobs
{
private BusinessLogic _businessLogic;
public ScheduledJobs(BusinessLogic businessLogic)
{
_businessLogic = businessLogic;
}
public async Task<string> ProcessInputXML()
{
await _businessLogic.ProcessXML();
}
}
BusinessLogic.cs
public class BusinessLogic
{
private ABCContext _abcContext;
public BusinessLogic(ABCContext abcContext) : base(abcContext)
{
_abcContext = abcContext;
}
public async Task ProcessXML()
{
var batchRepository = new BatchRepository(_abcContext);
var unprocessedBatchRecords = await BatchRepository.GetUnprocessedBatch();
foreach (var batchRecord in unprocessedBatchRecords)
{
try
{
int orderId = await LoadDataToOrderTable(batchRecord.BatchId);
await UpdateBatchProcessedStatus(batchRecord.BatchId);
if (orderId > 0)
{
await CreateOrderData(orderId);
}
}
catch(Exception ex)
{
}
}
}
CreateOrderData.cs
public async Task<int> CreateOrderData(int orderId)
{
try
{
await OrderRepo.InsertOrder(order);
await _abcContext.SaveChangesAsync();
}
catch(Exception ex)
{
/*System.InvalidOperationException: An exception has been raised that is likely due to a transient failure. If you are connecting to a SQL Azure database consider using SqlAzureExecutionStrategy. ---> Microsoft.EntityFrameworkCore.DbUpdateException: An error occurred while updating the entries. See the inner exception for details. ---> System.Data.SqlClient.SqlException: Transaction (Process ID 103) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction. */
}
}
InsertOrder.cs
public async Task InsertOrder(Order o)
{
// creation of large number of entites(more than 50) to be inserted in the database
woRepo.Insert(p);
poRepo.Insert(y);
//and many more like above
Insert(order);
}
Insert.cs
public virtual void Insert(TEntity entity)
{
entity.ObjectState = ObjectState.Added;
if (entity is IXYZEntity xyzEntity)
{
xyzEntity.CreatedDate = DateTime.Now;
xyzEntity.UpdatedDate = xyzEntity.CreatedDate;
xyzEntity.CreatedBy = _context.UserName ?? string.Empty;
xyzEntity.UpdatedBy = _context.UserName ?? string.Empty;
}
else if (entity is IxyzEntityNull xyzEntityNull)
{
xyzEntityNull.CreatedDate = DateTime.Now;
xyzEntityNull.UpdatedDate = xyzEntityNull.CreatedDate;
xyzEntityNull.CreatedBy = _context.UserName;
xyzEntityNull.UpdatedBy = _context.UserName;
}
_dbSet.Add(entity);
_context.SyncObjectState(entity);
}
LoadDataToOrder.cs
public async Task<int> LoadDataToOrder(int batchId)
{
// using (var unitOfWork = new UnitOfWork(_abcContext))
// {
var orderRepo = new OrderRepository(_abcContext);
Entities.Order order = new Entities.Order();
order.Guid = Guid.NewGuid();
order.BatchId = batchId;
order.VendorId = null;
orderRepo.Insert(order);
//unitOfWork.SaveChanges();
await _abcContext.SaveChangesAsync();
return order.OrderId;
//
}
}
HangfireActivator.cs
public class HangFireActivator : Hangfire.JobActivator
{
private readonly IServiceProvider _serviceProvider;
public HangFireActivator(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}
public override object ActivateJob(Type type)
{
return _serviceProvider.GetService(type);
}
}
请指教
谢谢。
以下解决方案适用于这 2 个问题:
实现多个Hangfire processes/jobs(多个worker)工作(并行)。 回答:当我使用内置
AspNetCoreJobActivator
而不是开箱即用时,这个问题得到了解决,即删除了 HangfireActivator class 并删除了对 UseActivator 方法的调用。var scopeFactory = serviceProvider.GetService<IServiceScopeFactory>(); if (scopeFactory != null) GlobalConfiguration.Configuration.UseActivator(new AspNetCoreJobActivator(scopeFactory));
SqlAzureExecutionStrategy
CreateOrder.cs
中的异常(交易陷入僵局)
回答:通过在发生死锁时自动重试查询解决了这个问题。
感谢 odinserj 的建议。