c# - DbContext 在 BackgroundService 中被释放
c# - DbContext gets disposed in BackgroundService
我有一个 WebAPI,它也应该接收来自 RabbitMQ 的消息。我使用了 this 教程,因为我知道有时 IIS 喜欢终止长 运行 任务(虽然还没有在服务器上测试它,也许它不会工作)。我有一项服务可以处理通过 RabbitMQ 接收的消息。我遇到的第一个问题——我无法将它注入 BackgroundService
class,所以我使用了 IServiceScopeFactory
。现在,我必须使用来自两个队列的消息,据我所知,最佳做法是为此使用两个通道。但是处理是在一项服务中完成的。后台服务:
public class ConsumeRabbitMQHostedService : BackgroundService
{
private IConnection _connection;
private IModel _firstChannel;
private IModel _secondChannel;
private RabbitConfigSection _rabbitConfig;
public IServiceScopeFactory _serviceScopeFactory;
public ConsumeRabbitMQHostedService(IOptions<RabbitConfigSection> rabbitConfig, IServiceScopeFactory serviceScopeFactory)
{
_rabbitConfig = rabbitConfig.Value;
_serviceScopeFactory = serviceScopeFactory;
InitRabbitMQ();
}
private void InitRabbitMQ()
{
var factory = new ConnectionFactory { HostName = _rabbitConfig.HostName, UserName = _rabbitConfig.UserName, Password = _rabbitConfig.Password };
_connection = factory.CreateConnection();
_firstChannel = _connection.CreateModel();
_firstChannel.ExchangeDeclare(_rabbitConfig.DefaultExchange, ExchangeType.Topic);
_firstChannel.QueueDeclare(_rabbitConfig.Queues.ConsumeQueues.FirstItemsConsumeQueue, true, false, false, null);
_firstChannel.QueueBind(_rabbitConfig.Queues.ConsumeQueues.FirstItemsConsumeQueue, _rabbitConfig.DefaultExchange, "*.test.queue", null);
_firstChannel.BasicQos(0, 1, false);
_secondChannel = _connection.CreateModel();
_secondChannel.ExchangeDeclare(_rabbitConfig.DefaultExchange, ExchangeType.Topic);
_secondChannel.QueueDeclare(_rabbitConfig.Queues.ConsumeQueues.SecondItemsConsumeQueue, true, false, false, null);
_secondChannel.QueueBind(_rabbitConfig.Queues.ConsumeQueues.SecondItemsConsumeQueue, _rabbitConfig.DefaultExchange, "*.test.queue", null);
_secondChannel.BasicQos(0, 1, false);
_connection.ConnectionShutdown += RabbitMQ_ConnectionShutdown;
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
stoppingToken.ThrowIfCancellationRequested();
var firstConsumer = new EventingBasicConsumer(_firstChannel);
var secondConsumer = new EventingBasicConsumer(_secondChannel);
using (var scope = _serviceScopeFactory.CreateScope())
{
IIntegrationService scoped = scope.ServiceProvider.GetRequiredService<IIntegrationService>();
firstConsumer.Received += (ch, ea) =>
{
// received message
var content = System.Text.Encoding.UTF8.GetString(ea.Body.ToArray());
// handle the received message
HandleFirstMessage(content, scoped);
_firstChannel.BasicAck(ea.DeliveryTag, false);
};
firstConsumer.Shutdown += OnConsumerShutdown;
firstConsumer.Registered += OnConsumerRegistered;
firstConsumer.Unregistered += OnConsumerUnregistered;
firstConsumer.ConsumerCancelled += OnConsumerConsumerCancelled;
_firstChannel.BasicConsume(_rabbitConfig.Queues.ConsumeQueues.FirstItemsConsumeQueue, false, firstConsumer);
}
using (var scope = _serviceScopeFactory.CreateScope())
{
IIntegrationService scoped = scope.ServiceProvider.GetRequiredService<IIntegrationService>();
secondConsumer.Received += (ch, ea) =>
{
// received message
var content = System.Text.Encoding.UTF8.GetString(ea.Body.ToArray());
// handle the received message
HandleSecondMessage(content, scoped);
_secondChannel.BasicAck(ea.DeliveryTag, false);
};
secondConsumer.Shutdown += OnConsumerShutdown;
secondConsumer.Registered += OnConsumerRegistered;
secondConsumer.Unregistered += OnConsumerUnregistered;
secondConsumer.ConsumerCancelled += OnConsumerConsumerCancelled;
_secondChannel.BasicConsume(_rabbitConfig.Queues.ConsumeQueues.SecondItemsConsumeQueue, false, secondConsumer);
}
return Task.CompletedTask;
}
private void HandleFirstMessage(string content, IIntegrationService integrationService)
{
List<StockImportDto> dataToImport = JsonConvert.DeserializeObject<List<StockImportDto>>(content);
integrationService.ImportFirst(dataToImport);
}
private void HandleSecondMessage(string content, IIntegrationService integrationService)
{
List<Import901Data> importData = JsonConvert.DeserializeObject<List<Import901Data>>(content);
integrationService.ImportSecond(importData);
}
private void OnConsumerConsumerCancelled(object sender, ConsumerEventArgs e) { }
private void OnConsumerUnregistered(object sender, ConsumerEventArgs e) { }
private void OnConsumerRegistered(object sender, ConsumerEventArgs e) { }
private void OnConsumerShutdown(object sender, ShutdownEventArgs e) { }
private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e) { }
public override void Dispose()
{
_firstChannel.Close();
_connection.Close();
base.Dispose();
}
}
在服务中我得到
System.ObjectDisposedException: 'Cannot access a disposed context instance. A common cause of this error is disposing a context instance that was resolved from dependency injection and then later trying to use the same context instance elsewhere in your application. This may occur if you are calling 'Dispose' on the context instance, or wrapping it in a using statement. If you are using dependency injection, you should let the dependency injection container take care of disposing context instances.
Object name: 'IntegrationDbContext'.'
DbContext
被注入IIntegrationService
。如果我明白发生了什么,服务的两个实例(甚至一个)共享 DbContext
,当其中一个完成时,它会释放 DbContext
。我尝试不创建两个实例(一个 using
中的所有代码),尝试使 IIntegrationService
瞬态,尝试异步执行所有操作(它是初始版本,使其与测试同步) - 仍然是同样的错误。我应该在这里做什么?这是正确的方法吗?
更新 1. ConfigureServices
在 Startup
:
public void ConfigureServices(IServiceCollection services)
{
var rabbitConfigSection =
Configuration.GetSection("Rabbit");
services.Configure<RabbitConfigSection>(rabbitConfigSection);
services.AddDbContext<SUNDbContext>(options =>
options.UseSqlServer(Configuration.GetConnectionString("DefaultConnection")));
services.AddCors();
services.AddSwaggerGen(c =>
{
c.SwaggerDoc("v1", new OpenApiInfo
{
Title = "My API",
Version = "v1"
});
});
services.AddRabbit(Configuration);
services.AddHostedService<ConsumeRabbitMQHostedService>();
services.AddControllers();
services.AddTransient<IIntegrationService, IntegrationService>();// it's transient now, same error with scoped
}
问题是由于 _serviceScopeFactory.CreateScope()
创建的外部 scope
在每个 using 语句之后被释放,而每条消息仍在尝试依赖现在释放的范围和附加的上下文处理消息。
解决方案是在消息处理程序中为每条消息创建一个新范围:
private void HandleFirstMessage(string content)
{
using (var scope = _serviceScopeFactory.CreateScope())
{
IIntegrationService integrationService = scope.ServiceProvider.GetRequiredService<IIntegrationService>();
List<StockImportDto> dataToImport = JsonConvert.DeserializeObject<List<StockImportDto>>(content);
integrationService.ImportFirst(dataToImport);
}
}
我有一个 WebAPI,它也应该接收来自 RabbitMQ 的消息。我使用了 this 教程,因为我知道有时 IIS 喜欢终止长 运行 任务(虽然还没有在服务器上测试它,也许它不会工作)。我有一项服务可以处理通过 RabbitMQ 接收的消息。我遇到的第一个问题——我无法将它注入 BackgroundService
class,所以我使用了 IServiceScopeFactory
。现在,我必须使用来自两个队列的消息,据我所知,最佳做法是为此使用两个通道。但是处理是在一项服务中完成的。后台服务:
public class ConsumeRabbitMQHostedService : BackgroundService
{
private IConnection _connection;
private IModel _firstChannel;
private IModel _secondChannel;
private RabbitConfigSection _rabbitConfig;
public IServiceScopeFactory _serviceScopeFactory;
public ConsumeRabbitMQHostedService(IOptions<RabbitConfigSection> rabbitConfig, IServiceScopeFactory serviceScopeFactory)
{
_rabbitConfig = rabbitConfig.Value;
_serviceScopeFactory = serviceScopeFactory;
InitRabbitMQ();
}
private void InitRabbitMQ()
{
var factory = new ConnectionFactory { HostName = _rabbitConfig.HostName, UserName = _rabbitConfig.UserName, Password = _rabbitConfig.Password };
_connection = factory.CreateConnection();
_firstChannel = _connection.CreateModel();
_firstChannel.ExchangeDeclare(_rabbitConfig.DefaultExchange, ExchangeType.Topic);
_firstChannel.QueueDeclare(_rabbitConfig.Queues.ConsumeQueues.FirstItemsConsumeQueue, true, false, false, null);
_firstChannel.QueueBind(_rabbitConfig.Queues.ConsumeQueues.FirstItemsConsumeQueue, _rabbitConfig.DefaultExchange, "*.test.queue", null);
_firstChannel.BasicQos(0, 1, false);
_secondChannel = _connection.CreateModel();
_secondChannel.ExchangeDeclare(_rabbitConfig.DefaultExchange, ExchangeType.Topic);
_secondChannel.QueueDeclare(_rabbitConfig.Queues.ConsumeQueues.SecondItemsConsumeQueue, true, false, false, null);
_secondChannel.QueueBind(_rabbitConfig.Queues.ConsumeQueues.SecondItemsConsumeQueue, _rabbitConfig.DefaultExchange, "*.test.queue", null);
_secondChannel.BasicQos(0, 1, false);
_connection.ConnectionShutdown += RabbitMQ_ConnectionShutdown;
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
stoppingToken.ThrowIfCancellationRequested();
var firstConsumer = new EventingBasicConsumer(_firstChannel);
var secondConsumer = new EventingBasicConsumer(_secondChannel);
using (var scope = _serviceScopeFactory.CreateScope())
{
IIntegrationService scoped = scope.ServiceProvider.GetRequiredService<IIntegrationService>();
firstConsumer.Received += (ch, ea) =>
{
// received message
var content = System.Text.Encoding.UTF8.GetString(ea.Body.ToArray());
// handle the received message
HandleFirstMessage(content, scoped);
_firstChannel.BasicAck(ea.DeliveryTag, false);
};
firstConsumer.Shutdown += OnConsumerShutdown;
firstConsumer.Registered += OnConsumerRegistered;
firstConsumer.Unregistered += OnConsumerUnregistered;
firstConsumer.ConsumerCancelled += OnConsumerConsumerCancelled;
_firstChannel.BasicConsume(_rabbitConfig.Queues.ConsumeQueues.FirstItemsConsumeQueue, false, firstConsumer);
}
using (var scope = _serviceScopeFactory.CreateScope())
{
IIntegrationService scoped = scope.ServiceProvider.GetRequiredService<IIntegrationService>();
secondConsumer.Received += (ch, ea) =>
{
// received message
var content = System.Text.Encoding.UTF8.GetString(ea.Body.ToArray());
// handle the received message
HandleSecondMessage(content, scoped);
_secondChannel.BasicAck(ea.DeliveryTag, false);
};
secondConsumer.Shutdown += OnConsumerShutdown;
secondConsumer.Registered += OnConsumerRegistered;
secondConsumer.Unregistered += OnConsumerUnregistered;
secondConsumer.ConsumerCancelled += OnConsumerConsumerCancelled;
_secondChannel.BasicConsume(_rabbitConfig.Queues.ConsumeQueues.SecondItemsConsumeQueue, false, secondConsumer);
}
return Task.CompletedTask;
}
private void HandleFirstMessage(string content, IIntegrationService integrationService)
{
List<StockImportDto> dataToImport = JsonConvert.DeserializeObject<List<StockImportDto>>(content);
integrationService.ImportFirst(dataToImport);
}
private void HandleSecondMessage(string content, IIntegrationService integrationService)
{
List<Import901Data> importData = JsonConvert.DeserializeObject<List<Import901Data>>(content);
integrationService.ImportSecond(importData);
}
private void OnConsumerConsumerCancelled(object sender, ConsumerEventArgs e) { }
private void OnConsumerUnregistered(object sender, ConsumerEventArgs e) { }
private void OnConsumerRegistered(object sender, ConsumerEventArgs e) { }
private void OnConsumerShutdown(object sender, ShutdownEventArgs e) { }
private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e) { }
public override void Dispose()
{
_firstChannel.Close();
_connection.Close();
base.Dispose();
}
}
在服务中我得到
System.ObjectDisposedException: 'Cannot access a disposed context instance. A common cause of this error is disposing a context instance that was resolved from dependency injection and then later trying to use the same context instance elsewhere in your application. This may occur if you are calling 'Dispose' on the context instance, or wrapping it in a using statement. If you are using dependency injection, you should let the dependency injection container take care of disposing context instances. Object name: 'IntegrationDbContext'.'
DbContext
被注入IIntegrationService
。如果我明白发生了什么,服务的两个实例(甚至一个)共享 DbContext
,当其中一个完成时,它会释放 DbContext
。我尝试不创建两个实例(一个 using
中的所有代码),尝试使 IIntegrationService
瞬态,尝试异步执行所有操作(它是初始版本,使其与测试同步) - 仍然是同样的错误。我应该在这里做什么?这是正确的方法吗?
更新 1. ConfigureServices
在 Startup
:
public void ConfigureServices(IServiceCollection services)
{
var rabbitConfigSection =
Configuration.GetSection("Rabbit");
services.Configure<RabbitConfigSection>(rabbitConfigSection);
services.AddDbContext<SUNDbContext>(options =>
options.UseSqlServer(Configuration.GetConnectionString("DefaultConnection")));
services.AddCors();
services.AddSwaggerGen(c =>
{
c.SwaggerDoc("v1", new OpenApiInfo
{
Title = "My API",
Version = "v1"
});
});
services.AddRabbit(Configuration);
services.AddHostedService<ConsumeRabbitMQHostedService>();
services.AddControllers();
services.AddTransient<IIntegrationService, IntegrationService>();// it's transient now, same error with scoped
}
问题是由于 _serviceScopeFactory.CreateScope()
创建的外部 scope
在每个 using 语句之后被释放,而每条消息仍在尝试依赖现在释放的范围和附加的上下文处理消息。
解决方案是在消息处理程序中为每条消息创建一个新范围:
private void HandleFirstMessage(string content)
{
using (var scope = _serviceScopeFactory.CreateScope())
{
IIntegrationService integrationService = scope.ServiceProvider.GetRequiredService<IIntegrationService>();
List<StockImportDto> dataToImport = JsonConvert.DeserializeObject<List<StockImportDto>>(content);
integrationService.ImportFirst(dataToImport);
}
}