消费者生产者-生产者线程从不执行分配的功能

Consumer Producer- Producer thread never executes assigned function

我有 .NET Core Web API 解决方案。在每次调用中,我需要执行一些数据库操作。
问题是一次打开和关闭多个数据库连接。所以为了避免它,我想实现要发送到数据库的对象队列,然后想要一个单独的线程来执行数据库操作。
我试过一些代码如下。但是在这里,消费者线程从不执行分配的功能。 Producer 没有单独的线程,我只是用对象来填充队列。
我应该做什么修改?需要一些指导,因为我是线程方面的新手。

  public static class BlockingQueue
{
    public static Queue<WebServiceLogModel> queue;
    static BlockingQueue()
    {
        queue = new Queue<WebServiceLogModel>();

    }

    public static object Dequeue()
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                Monitor.Wait(queue);
            }
            return queue.Dequeue();
        }
    }
    public static void Enqueue(WebServiceLogModel webServiceLog)
    {
        lock (queue)
        {
            queue.Enqueue(webServiceLog);
            Monitor.Pulse(queue);
        }
    }

    public static void ConsumerThread(IConfiguration configuration)
    {
        WebServiceLogModel webServiceLog = (WebServiceLogModel)Dequeue();
        webServiceLog.SaveWebServiceLog(configuration);
    }

   public static void ProducerThread(WebServiceLogModel webServiceLog)
    {
        Enqueue(webServiceLog);
         Thread.Sleep(100);
    }
}

我在 StartUp.cs 中创建并启动了线程:

    public Startup(IConfiguration configuration)
    {
        Thread t = new Thread(() => BlockingQueue.ConsumerThread(configuration));
        t.Start();
    }

在 Controller 中,我编写了代码来填充队列:

    [HttpGet]
    [Route("abc")]
    public IActionResult GetData()
    {
        BlockingQueue.ProducerThread(logModel);
        return StatusCode(HttpContext.Response.StatusCode = (int)HttpStatusCode.NotFound, ApplicationConstants.Message.NoBatchHistoryInfo);
    }

我认为如果队列中有内容,您的消费者线程只会执行一次,然后立即执行 returns。如果你想让一个线程在后台工作,它只启动一次,它不应该 return 并且应该捕获所有异常。来自 BlockingQueue.ConsumerThread 的线程在 Stratup 和 return 中被调用一次。

另外请注意,这样做是不安全的。如果没有请求进入,ASP.NET 不保证后台线程为 运行。您的应用程序池可以回收(默认情况下,它会在 20 分钟不活动后或每 27 小时回收一次),所以有有可能您的后台代码不会为某些队列项目执行。

此外,虽然它不能解决所有问题,但我建议使用 https://www.hangfire.io/ 在 ASP.NET 服务器中执行后台任务。它有持久层,可以重试作业并且有简单的 API。在您的请求处理程序中,您可以将新作业推送到 Hangfire,然后只有 1 个作业处理器线程。

首先,尽量避免staticclasses和方法。在这种情况下使用模式单例(如果你真的需要这个)。 其次,尽量避免 lockMonitor - 这些并发原语会显着降低您的性能。 在这种情况下,您可以使用上面提到的'Adam G' BlockingCollection<>,或者您可以开发自己的解决方案。

public class Service : IDisposable
{
    private readonly BlockingCollection<WebServiceLogModel> _packets =
        new BlockingCollection<WebServiceLogModel>();
    private Task _task;
    private volatile bool _active;
    private static readonly TimeSpan WaitTimeout = TimeSpan.FromSeconds(1);

    public Service()
    {
        _active = true;
        _task = ExecTaskInternal();
    }

    public void Enqueue(WebServiceLogModel model)
    {
        _packets.Add(model);
    }

    public void Dispose()
    {
        _active = false;
    }

    private async Task ExecTaskInternal()
    {
        while (_active)
        {
            if (_packets.TryTake(out WebServiceLogModel model))
            {
                // TODO: whatever you need
            }
            else
            {
                await Task.Delay(WaitTimeout);
            }
        }
    }
}

public class MyController : Controller
{
    [HttpGet]
    [Route("abc")]
    public IActionResult GetData([FromServices] Service service)
    {
        // receive model form somewhere
        WebServiceLogModel model = FetchModel();
        // enqueue model
        service.Enqueue(model);
        // TODO: return what you need
    }
}

在启动中:

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddSingleton<Service>();
        // TODO: other init staffs
    }
}

您甚至可以向服务添加 Start/Stop 方法而不是实施 IDisposable 并在方法 Configure(IApplicationBuilder app).[=17 的启动 class 中启动您的服务=]