如何在 C# ASP NET Core 的后台服务中使用 Mediator?
How to use Mediator inside Background Service in C# ASP NET Core?
我正在尝试实现一个使用 Mediator 的队列后台服务(Queued Background Service)。
到目前为止,我能够实现队列,但无法执行 Mediator。
接口
/// <summary>
/// Contract for a queue of background work items: producers add items, a consumer
/// takes them out asynchronously.
/// </summary>
public interface IBackgroundTaskQueueService
{
    /// <summary>Adds a work item, together with a cancellation token, to the queue.</summary>
    /// <param name="workItem">The item to enqueue.</param>
    /// <param name="token">Cancellation token supplied alongside the item.</param>
    void QueueBackgroundWorkItem(object workItem, CancellationToken token);

    /// <summary>Asynchronously obtains a work item from the queue.</summary>
    /// <param name="cancellationToken">Token passed to the dequeue operation.</param>
    /// <returns>A task that produces the dequeued item.</returns>
    Task<object> DequeueAsync(CancellationToken cancellationToken);
}
实现
/// <summary>
/// In-memory implementation of <see cref="IBackgroundTaskQueueService"/> built on a
/// <see cref="ConcurrentQueue{T}"/> and a <see cref="SemaphoreSlim"/> that is released
/// once for every queued item.
/// </summary>
public class BackgroundTaskQueueService : IBackgroundTaskQueueService
{
    // Each entry keeps the work item together with the token supplied when it was queued.
    private readonly ConcurrentQueue<(object, CancellationToken)> _workItems =
        new ConcurrentQueue<(object, CancellationToken)>();

    // Starts at zero; DequeueAsync waits on it instead of polling the queue.
    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);

    /// <summary>Adds a work item to the queue and signals a waiting consumer.</summary>
    /// <param name="workItem">The item to enqueue; must not be null.</param>
    /// <param name="token">Cancellation token stored alongside the item.</param>
    /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is null.</exception>
    public void QueueBackgroundWorkItem(object workItem, CancellationToken token)
    {
        if (workItem is null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }

        // Enqueue before releasing so a consumer woken by the semaphore finds the item.
        _workItems.Enqueue((workItem, token));
        _signal.Release();
    }

    /// <summary>Waits until an item has been queued, then removes and returns it.</summary>
    /// <param name="cancellationToken">Token used to cancel the wait.</param>
    /// <returns>The dequeued work item.</returns>
    /// <exception cref="InvalidOperationException">
    /// The semaphore was signaled but the queue held no item.
    /// </exception>
    public async Task<object> DequeueAsync(CancellationToken cancellationToken)
    {
        await _signal.WaitAsync(cancellationToken);

        // Do not ignore the TryDequeue result: a failed dequeue would otherwise
        // hand a null work item to the caller.
        if (!_workItems.TryDequeue(out var entry))
        {
            throw new InvalidOperationException("The work item queue was signaled but contained no item.");
        }

        // Only the work item is returned; the token stored with it is not exposed here.
        return entry.Item1;
    }
}
后台服务
/// <summary>
/// Background service that repeatedly takes an item from <see cref="TaskQueue"/> and sends it
/// through the constructor-injected <see cref="IMediator"/>.
/// </summary>
/// <remarks>
/// NOTE(review): the mediator instance is received once in the constructor and reused for every
/// item. The accompanying write-up reports a "Handler not registered" failure with this setup.
/// </remarks>
public class QueuedHostedService : BackgroundService
{
    private readonly IMediator _mediator;
    private readonly ILogger _logger;

    /// <summary>Stores the queue and mediator and creates a logger for this service.</summary>
    public QueuedHostedService(IBackgroundTaskQueueService taskQueue, ILoggerFactory loggerFactory, IMediator mediator)
    {
        _logger = loggerFactory.CreateLogger<QueuedHostedService>();
        _mediator = mediator;
        TaskQueue = taskQueue;
    }

    /// <summary>The queue that work items are read from.</summary>
    public IBackgroundTaskQueueService TaskQueue { get; }

    /// <summary>
    /// Loops until <paramref name="stoppingToken"/> is cancelled, dispatching one item per
    /// iteration; any exception is logged and the loop continues.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await DispatchNextAsync(stoppingToken);
            }
            catch (Exception failure)
            {
                _logger.LogError(failure, $"Error occurred executing Work item.");
            }
        }
    }

    // Dequeues a single item and forwards it to the mediator.
    private async Task DispatchNextAsync(CancellationToken stoppingToken)
    {
        var request = await TaskQueue.DequeueAsync(stoppingToken);
        await _mediator.Send(request, stoppingToken);
    }
}
用法
// Build the request first, then hand it to the queue with CancellationToken.None.
var costUpdateRequest = new UpdateProductCostByMaterialRequestModel
{
    Id = request.ProductId
};
_queueService.QueueBackgroundWorkItem(costUpdateRequest, CancellationToken.None);
使用上面的代码,我可以从队列中取到请求对象,但是把它传给 Mediator 时,会抛出 InvalidOperationException(Handler not registered,即处理程序未注册)。
我很困惑。
好的,我找到问题了
我必须使用 IServiceScopeFactory 接口来获取 Mediator,而不是通过构造函数直接注入它。
我的解决方案
BackgroundService 是一个单例。您不能将 Scoped 注入到 Singleton 中。
/// <summary>
/// Background service that takes work items from <see cref="TaskQueue"/> and dispatches
/// them through MediatR.
/// </summary>
/// <remarks>
/// The service depends on <see cref="IServiceScopeFactory"/> instead of <see cref="IMediator"/>
/// and resolves the mediator from a fresh scope for every work item, so scoped services used
/// while handling an item are created and disposed per item rather than once for the whole
/// lifetime of the service.
/// </remarks>
public class QueuedHostedService : BackgroundService
{
    private readonly ILogger _logger;
    private readonly IServiceScopeFactory _serviceScopeFactory;

    /// <summary>Stores the queue and scope factory and creates a logger for this service.</summary>
    /// <param name="taskQueue">Queue that work items are read from.</param>
    /// <param name="loggerFactory">Factory used to create this service's logger.</param>
    /// <param name="serviceScopeFactory">Factory used to create one DI scope per work item.</param>
    public QueuedHostedService(IBackgroundTaskQueueService taskQueue, ILoggerFactory loggerFactory, IServiceScopeFactory serviceScopeFactory)
    {
        TaskQueue = taskQueue;
        _serviceScopeFactory = serviceScopeFactory;
        _logger = loggerFactory.CreateLogger<QueuedHostedService>();
    }

    /// <summary>The queue that work items are read from.</summary>
    public IBackgroundTaskQueueService TaskQueue { get; }

    /// <summary>
    /// Dequeues and dispatches work items until <paramref name="stoppingToken"/> is cancelled.
    /// A failure while handling one item is logged and does not stop the loop.
    /// </summary>
    /// <param name="stoppingToken">Token that signals the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var workItem = await TaskQueue.DequeueAsync(stoppingToken);

                // IBaseRequest is the non-generic base of every MediatR request. Testing against
                // IRequest<object> would miss requests whose response is a value type (e.g. Unit),
                // because generic variance does not apply to value types.
                if (!(workItem is IBaseRequest))
                {
                    _logger.LogWarning(
                        "Skipping work item of type {WorkItemType}: it is not a MediatR request.",
                        workItem?.GetType());
                    continue;
                }

                // One scope per item; disposed when this try block is left.
                using var scope = _serviceScopeFactory.CreateScope();
                var mediator = scope.ServiceProvider.GetRequiredService<IMediator>();
                await mediator.Send(workItem, stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Stop was requested: leave quietly instead of logging the cancellation as an error.
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error occurred executing Work item.");
            }
        }
    }
}
我正在尝试实现一个使用 Mediator 的队列后台服务(Queued Background Service)。
到目前为止,我能够实现队列,但无法执行 Mediator。
接口
/// <summary>
/// Contract for a queue of background work items: producers add items, a consumer
/// takes them out asynchronously.
/// </summary>
public interface IBackgroundTaskQueueService
{
    /// <summary>Adds a work item, together with a cancellation token, to the queue.</summary>
    /// <param name="workItem">The item to enqueue.</param>
    /// <param name="token">Cancellation token supplied alongside the item.</param>
    void QueueBackgroundWorkItem(object workItem, CancellationToken token);

    /// <summary>Asynchronously obtains a work item from the queue.</summary>
    /// <param name="cancellationToken">Token passed to the dequeue operation.</param>
    /// <returns>A task that produces the dequeued item.</returns>
    Task<object> DequeueAsync(CancellationToken cancellationToken);
}
实现
/// <summary>
/// In-memory implementation of <see cref="IBackgroundTaskQueueService"/> built on a
/// <see cref="ConcurrentQueue{T}"/> and a <see cref="SemaphoreSlim"/> that is released
/// once for every queued item.
/// </summary>
public class BackgroundTaskQueueService : IBackgroundTaskQueueService
{
    // Each entry keeps the work item together with the token supplied when it was queued.
    private readonly ConcurrentQueue<(object, CancellationToken)> _workItems =
        new ConcurrentQueue<(object, CancellationToken)>();

    // Starts at zero; DequeueAsync waits on it instead of polling the queue.
    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);

    /// <summary>Adds a work item to the queue and signals a waiting consumer.</summary>
    /// <param name="workItem">The item to enqueue; must not be null.</param>
    /// <param name="token">Cancellation token stored alongside the item.</param>
    /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is null.</exception>
    public void QueueBackgroundWorkItem(object workItem, CancellationToken token)
    {
        if (workItem is null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }

        // Enqueue before releasing so a consumer woken by the semaphore finds the item.
        _workItems.Enqueue((workItem, token));
        _signal.Release();
    }

    /// <summary>Waits until an item has been queued, then removes and returns it.</summary>
    /// <param name="cancellationToken">Token used to cancel the wait.</param>
    /// <returns>The dequeued work item.</returns>
    /// <exception cref="InvalidOperationException">
    /// The semaphore was signaled but the queue held no item.
    /// </exception>
    public async Task<object> DequeueAsync(CancellationToken cancellationToken)
    {
        await _signal.WaitAsync(cancellationToken);

        // Do not ignore the TryDequeue result: a failed dequeue would otherwise
        // hand a null work item to the caller.
        if (!_workItems.TryDequeue(out var entry))
        {
            throw new InvalidOperationException("The work item queue was signaled but contained no item.");
        }

        // Only the work item is returned; the token stored with it is not exposed here.
        return entry.Item1;
    }
}
后台服务
/// <summary>
/// Background service that repeatedly takes an item from <see cref="TaskQueue"/> and sends it
/// through the constructor-injected <see cref="IMediator"/>.
/// </summary>
/// <remarks>
/// NOTE(review): the mediator instance is received once in the constructor and reused for every
/// item. The accompanying write-up reports a "Handler not registered" failure with this setup.
/// </remarks>
public class QueuedHostedService : BackgroundService
{
    private readonly IMediator _mediator;
    private readonly ILogger _logger;

    /// <summary>Stores the queue and mediator and creates a logger for this service.</summary>
    public QueuedHostedService(IBackgroundTaskQueueService taskQueue, ILoggerFactory loggerFactory, IMediator mediator)
    {
        _logger = loggerFactory.CreateLogger<QueuedHostedService>();
        _mediator = mediator;
        TaskQueue = taskQueue;
    }

    /// <summary>The queue that work items are read from.</summary>
    public IBackgroundTaskQueueService TaskQueue { get; }

    /// <summary>
    /// Loops until <paramref name="stoppingToken"/> is cancelled, dispatching one item per
    /// iteration; any exception is logged and the loop continues.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await DispatchNextAsync(stoppingToken);
            }
            catch (Exception failure)
            {
                _logger.LogError(failure, $"Error occurred executing Work item.");
            }
        }
    }

    // Dequeues a single item and forwards it to the mediator.
    private async Task DispatchNextAsync(CancellationToken stoppingToken)
    {
        var request = await TaskQueue.DequeueAsync(stoppingToken);
        await _mediator.Send(request, stoppingToken);
    }
}
用法
// Build the request first, then hand it to the queue with CancellationToken.None.
var costUpdateRequest = new UpdateProductCostByMaterialRequestModel
{
    Id = request.ProductId
};
_queueService.QueueBackgroundWorkItem(costUpdateRequest, CancellationToken.None);
使用上面的代码,我可以从队列中取到请求对象,但是把它传给 Mediator 时,会抛出 InvalidOperationException(Handler not registered,即处理程序未注册)。
我很困惑。
好的,我找到问题了
我必须使用 IServiceScopeFactory 接口来获取 Mediator,而不是通过构造函数直接注入它。
我的解决方案:BackgroundService 是一个单例。您不能将 Scoped 服务注入到 Singleton 中。
/// <summary>
/// Background service that takes work items from <see cref="TaskQueue"/> and dispatches
/// them through MediatR.
/// </summary>
/// <remarks>
/// The service depends on <see cref="IServiceScopeFactory"/> instead of <see cref="IMediator"/>
/// and resolves the mediator from a fresh scope for every work item, so scoped services used
/// while handling an item are created and disposed per item rather than once for the whole
/// lifetime of the service.
/// </remarks>
public class QueuedHostedService : BackgroundService
{
    private readonly ILogger _logger;
    private readonly IServiceScopeFactory _serviceScopeFactory;

    /// <summary>Stores the queue and scope factory and creates a logger for this service.</summary>
    /// <param name="taskQueue">Queue that work items are read from.</param>
    /// <param name="loggerFactory">Factory used to create this service's logger.</param>
    /// <param name="serviceScopeFactory">Factory used to create one DI scope per work item.</param>
    public QueuedHostedService(IBackgroundTaskQueueService taskQueue, ILoggerFactory loggerFactory, IServiceScopeFactory serviceScopeFactory)
    {
        TaskQueue = taskQueue;
        _serviceScopeFactory = serviceScopeFactory;
        _logger = loggerFactory.CreateLogger<QueuedHostedService>();
    }

    /// <summary>The queue that work items are read from.</summary>
    public IBackgroundTaskQueueService TaskQueue { get; }

    /// <summary>
    /// Dequeues and dispatches work items until <paramref name="stoppingToken"/> is cancelled.
    /// A failure while handling one item is logged and does not stop the loop.
    /// </summary>
    /// <param name="stoppingToken">Token that signals the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var workItem = await TaskQueue.DequeueAsync(stoppingToken);

                // IBaseRequest is the non-generic base of every MediatR request. Testing against
                // IRequest<object> would miss requests whose response is a value type (e.g. Unit),
                // because generic variance does not apply to value types.
                if (!(workItem is IBaseRequest))
                {
                    _logger.LogWarning(
                        "Skipping work item of type {WorkItemType}: it is not a MediatR request.",
                        workItem?.GetType());
                    continue;
                }

                // One scope per item; disposed when this try block is left.
                using var scope = _serviceScopeFactory.CreateScope();
                var mediator = scope.ServiceProvider.GetRequiredService<IMediator>();
                await mediator.Send(workItem, stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Stop was requested: leave quietly instead of logging the cancellation as an error.
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error occurred executing Work item.");
            }
        }
    }
}