消费者生产者-生产者线程从不执行分配的功能
Consumer Producer- Producer thread never executes assigned function
我有 .NET Core Web API 解决方案。在每次调用中,我需要执行一些数据库操作。
问题是一次打开和关闭多个数据库连接。所以为了避免它,我想实现要发送到数据库的对象队列,然后想要一个单独的线程来执行数据库操作。
我试过一些代码如下。但是在这里,消费者线程从不执行分配的功能。 Producer 没有单独的线程,我只是用对象来填充队列。
我应该做什么修改?需要一些指导,因为我是线程方面的新手。
public static class BlockingQueue
{
public static Queue<WebServiceLogModel> queue;
static BlockingQueue()
{
queue = new Queue<WebServiceLogModel>();
}
public static object Dequeue()
{
lock (queue)
{
while (queue.Count == 0)
{
Monitor.Wait(queue);
}
return queue.Dequeue();
}
}
public static void Enqueue(WebServiceLogModel webServiceLog)
{
lock (queue)
{
queue.Enqueue(webServiceLog);
Monitor.Pulse(queue);
}
}
public static void ConsumerThread(IConfiguration configuration)
{
WebServiceLogModel webServiceLog = (WebServiceLogModel)Dequeue();
webServiceLog.SaveWebServiceLog(configuration);
}
public static void ProducerThread(WebServiceLogModel webServiceLog)
{
Enqueue(webServiceLog);
Thread.Sleep(100);
}
}
我在 StartUp.cs 中创建并启动了线程:
public Startup(IConfiguration configuration)
{
Thread t = new Thread(() => BlockingQueue.ConsumerThread(configuration));
t.Start();
}
在 Controller 中,我编写了代码来填充队列:
[HttpGet]
[Route("abc")]
public IActionResult GetData()
{
BlockingQueue.ProducerThread(logModel);
return StatusCode(HttpContext.Response.StatusCode = (int)HttpStatusCode.NotFound, ApplicationConstants.Message.NoBatchHistoryInfo);
}
我认为如果队列中有内容,您的消费者线程只会执行一次,然后立即执行 returns。如果你想让一个线程在后台工作,它只启动一次,它不应该 return 并且应该捕获所有异常。来自 BlockingQueue.ConsumerThread
的线程在 Stratup
和 return 中被调用一次。
另外请注意,这样做是不安全的。如果没有请求进入,ASP.NET 不保证后台线程为 运行。您的应用程序池可以回收(默认情况下,它会在 20 分钟不活动后或每 27 小时回收一次),所以有有可能您的后台代码不会为某些队列项目执行。
此外,虽然它不能解决所有问题,但我建议使用 https://www.hangfire.io/ 在 ASP.NET 服务器中执行后台任务。它有持久层,可以重试作业并且有简单的 API。在您的请求处理程序中,您可以将新作业推送到 Hangfire,然后只有 1 个作业处理器线程。
首先,尽量避免static
classes和方法。在这种情况下使用模式单例(如果你真的需要这个)。
其次,尽量避免 lock
、Monitor
- 这些并发原语会显着降低您的性能。
在这种情况下,您可以使用上面提到的'Adam G' BlockingCollection<>,或者您可以开发自己的解决方案。
public class Service : IDisposable
{
private readonly BlockingCollection<WebServiceLogModel> _packets =
new BlockingCollection<WebServiceLogModel>();
private Task _task;
private volatile bool _active;
private static readonly TimeSpan WaitTimeout = TimeSpan.FromSeconds(1);
public Service()
{
_active = true;
_task = ExecTaskInternal();
}
public void Enqueue(WebServiceLogModel model)
{
_packets.Add(model);
}
public void Dispose()
{
_active = false;
}
private async Task ExecTaskInternal()
{
while (_active)
{
if (_packets.TryTake(out WebServiceLogModel model))
{
// TODO: whatever you need
}
else
{
await Task.Delay(WaitTimeout);
}
}
}
}
public class MyController : Controller
{
[HttpGet]
[Route("abc")]
public IActionResult GetData([FromServices] Service service)
{
// receive model form somewhere
WebServiceLogModel model = FetchModel();
// enqueue model
service.Enqueue(model);
// TODO: return what you need
}
}
在启动中:
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<Service>();
// TODO: other init staffs
}
}
您甚至可以向服务添加 Start/Stop 方法而不是实施 IDisposable
并在方法 Configure(IApplicationBuilder app)
.[=17 的启动 class 中启动您的服务=]
我有 .NET Core Web API 解决方案。在每次调用中,我需要执行一些数据库操作。
问题是一次打开和关闭多个数据库连接。所以为了避免它,我想实现要发送到数据库的对象队列,然后想要一个单独的线程来执行数据库操作。
我试过一些代码如下。但是在这里,消费者线程从不执行分配的功能。 Producer 没有单独的线程,我只是用对象来填充队列。
我应该做什么修改?需要一些指导,因为我是线程方面的新手。
public static class BlockingQueue
{
public static Queue<WebServiceLogModel> queue;
static BlockingQueue()
{
queue = new Queue<WebServiceLogModel>();
}
public static object Dequeue()
{
lock (queue)
{
while (queue.Count == 0)
{
Monitor.Wait(queue);
}
return queue.Dequeue();
}
}
public static void Enqueue(WebServiceLogModel webServiceLog)
{
lock (queue)
{
queue.Enqueue(webServiceLog);
Monitor.Pulse(queue);
}
}
public static void ConsumerThread(IConfiguration configuration)
{
WebServiceLogModel webServiceLog = (WebServiceLogModel)Dequeue();
webServiceLog.SaveWebServiceLog(configuration);
}
public static void ProducerThread(WebServiceLogModel webServiceLog)
{
Enqueue(webServiceLog);
Thread.Sleep(100);
}
}
我在 StartUp.cs 中创建并启动了线程:
public Startup(IConfiguration configuration)
{
Thread t = new Thread(() => BlockingQueue.ConsumerThread(configuration));
t.Start();
}
在 Controller 中,我编写了代码来填充队列:
[HttpGet]
[Route("abc")]
public IActionResult GetData()
{
BlockingQueue.ProducerThread(logModel);
return StatusCode(HttpContext.Response.StatusCode = (int)HttpStatusCode.NotFound, ApplicationConstants.Message.NoBatchHistoryInfo);
}
我认为如果队列中有内容,您的消费者线程只会执行一次,然后立即执行 returns。如果你想让一个线程在后台工作,它只启动一次,它不应该 return 并且应该捕获所有异常。来自 BlockingQueue.ConsumerThread
的线程在 Stratup
和 return 中被调用一次。
另外请注意,这样做是不安全的。如果没有请求进入,ASP.NET 不保证后台线程为 运行。您的应用程序池可以回收(默认情况下,它会在 20 分钟不活动后或每 27 小时回收一次),所以有有可能您的后台代码不会为某些队列项目执行。
此外,虽然它不能解决所有问题,但我建议使用 https://www.hangfire.io/ 在 ASP.NET 服务器中执行后台任务。它有持久层,可以重试作业并且有简单的 API。在您的请求处理程序中,您可以将新作业推送到 Hangfire,然后只有 1 个作业处理器线程。
首先,尽量避免static
classes和方法。在这种情况下使用模式单例(如果你真的需要这个)。
其次,尽量避免 lock
、Monitor
- 这些并发原语会显着降低您的性能。
在这种情况下,您可以使用上面提到的'Adam G' BlockingCollection<>,或者您可以开发自己的解决方案。
public class Service : IDisposable
{
private readonly BlockingCollection<WebServiceLogModel> _packets =
new BlockingCollection<WebServiceLogModel>();
private Task _task;
private volatile bool _active;
private static readonly TimeSpan WaitTimeout = TimeSpan.FromSeconds(1);
public Service()
{
_active = true;
_task = ExecTaskInternal();
}
public void Enqueue(WebServiceLogModel model)
{
_packets.Add(model);
}
public void Dispose()
{
_active = false;
}
private async Task ExecTaskInternal()
{
while (_active)
{
if (_packets.TryTake(out WebServiceLogModel model))
{
// TODO: whatever you need
}
else
{
await Task.Delay(WaitTimeout);
}
}
}
}
public class MyController : Controller
{
[HttpGet]
[Route("abc")]
public IActionResult GetData([FromServices] Service service)
{
// receive model form somewhere
WebServiceLogModel model = FetchModel();
// enqueue model
service.Enqueue(model);
// TODO: return what you need
}
}
在启动中:
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<Service>();
// TODO: other init staffs
}
}
您甚至可以向服务添加 Start/Stop 方法而不是实施 IDisposable
并在方法 Configure(IApplicationBuilder app)
.[=17 的启动 class 中启动您的服务=]