如何在基于数据库的多线程 notification/email 发送器中减少 CPU 使用
How to reduce CPU usage in a db-based multi threaded notification/email sender
我正在尝试开发一个 windows 服务来向订阅发送通知。
数据保存在 SQL 服务器数据库中。
通知是通过向 REST API 端点发出 Web POST 请求创建的,并保存在数据库中 table。
该服务启动一个任务,该任务不断从该数据库读取通知 table 并将它们添加到队列中。
该服务还启动了几个任务,这些任务不断从队列中读取并执行实际的发送过程。
代码运行良好并且完成了所需的工作,但问题是 CPU 使用率是 100% 运行 服务。
我尝试使用 Thread.Sleep 或 Task.Delay,但都没有帮助我减少 CPU 的使用。
我已阅读此代码项目 page,我需要使用等待处理程序并且应该在某些情况下等待。我无法让它正常工作。
所以谁能告诉我怎样做才能减少 EnqueueTask
和 DequeueTask
的 CPU 使用率?
这是发件人代码:
static class NotificationSender
{
static ConcurrentQueue<NotificationDelivery> deliveryQueue = null;
static Task enqueueTask = null;
static Task[] dequeueTasks = null;
public static void StartSending(ServiceState serviceState)
{
PushService.InitServices();
enqueueTask = Task.Factory.StartNew(EnqueueTask, serviceState);
deliveryQueue = new ConcurrentQueue<NotificationDelivery>();
int dequeueTasksCount = 10;
dequeueTasks = new Task[dequeueTasksCount];
for (int i = 0; i < dequeueTasksCount; i++)
{
dequeueTasks[i] = Task.Factory.StartNew(DequeueTask, serviceState);
}
}
public static void EnqueueTask(object state)
{
ServiceState serviceState = (ServiceState)state;
using (DSTeckWebPushNotificationsContext db = new DSTeckWebPushNotificationsContext())
{
while (!serviceState.CancellationTokenSource.Token.IsCancellationRequested)
{
int toEnqueue = 100 - deliveryQueue.Count;
if (toEnqueue > 0)
{
// fetch some records from db to be enqueued
NotificationDelivery[] deliveries = db.NotificationDeliveries
.Include("Subscription")
.Include("Notification")
.Include("Notification.NotificationLanguages")
.Include("Notification.NotificationLanguages.Language")
.Where(nd => nd.Status == NotificationDeliveryStatus.Pending && DateTime.Now >= nd.StartSendingAt)
.OrderBy(nd => nd.StartSendingAt)
.Take(toEnqueue)
.ToArray();
foreach (NotificationDelivery delivery in deliveries)
{
delivery.Status = NotificationDeliveryStatus.Queued;
deliveryQueue.Enqueue(delivery);
}
if (deliveries.Length > 0)
{
db.SaveChanges(); // save Queued state, so not fetched again the next loop
}
}
// save any changes made by the DequeueTask
// an event may be used here to know if any changes made
db.SaveChanges();
}
Task.WaitAll(dequeueTasks);
db.SaveChanges();
}
}
public async static void DequeueTask(object state)
{
ServiceState serviceState = (ServiceState)state;
while (!serviceState.CancellationTokenSource.Token.IsCancellationRequested)
{
NotificationDelivery delivery = null;
if (deliveryQueue.TryDequeue(out delivery))
{
NotificationDeliveryStatus ns = NotificationDeliveryStatus.Pending;
if (delivery.Subscription.Status == SubscriptionStatus.Subscribed)
{
PushResult result = await PushService.DoPushAsync(delivery);
switch (result)
{
case PushResult.Pushed:
ns = NotificationDeliveryStatus.Delivered;
break;
case PushResult.Error:
ns = NotificationDeliveryStatus.FailureError;
break;
case PushResult.NotSupported:
ns = NotificationDeliveryStatus.FailureNotSupported;
break;
case PushResult.UnSubscribed:
ns = NotificationDeliveryStatus.FailureUnSubscribed;
delivery.Subscription.Status = SubscriptionStatus.UnSubscribed;
break;
}
}
else
{
ns = NotificationDeliveryStatus.FailureUnSubscribed;
}
delivery.Status = ns;
delivery.DeliveredAt = DateTime.Now;
}
}
}
public static void Wait()
{
Task.WaitAll(enqueueTask);
Task.WaitAll(dequeueTasks);
enqueueTask.Dispose();
for(int i = 0; i < dequeueTasks.Length; i++)
{
dequeueTasks[i].Dispose();
}
}
}
一个ServiceState
类型的对象用于维护服务的启动和停止,下面是该类型的代码:
class ServiceState
{
public CancellationTokenSource CancellationTokenSource { get; set; }
public void Start()
{
CancellationTokenSource = new CancellationTokenSource();
NotificationSender.StartSending(this);
}
public void Stop()
{
CancellationTokenSource.Cancel();
NotificationSender.Wait();
CancellationTokenSource.Dispose();
}
}
这里是服务启动和停止代码:
protected override void OnStart(string[] args)
{
_serviceState = new ServiceState();
_serviceState.Start();
}
protected override void OnStop()
{
_serviceState.Stop();
}
我想我终于可以做一些好的改变来使用等待处理程序和计时器来维持 CPU 的使用。
EnqueueTask
将等待 5 秒,然后再尝试从通知中获取数据 table 如果没有获取任何通知。如果没有获取通知,它将启动计时器并重置等待句柄。计时器经过回调将设置等待句柄。
另外 DequeueTask
现在正在使用等待句柄。如果队列中没有更多项目,它将重置等待句柄以停止对空队列进行出列。 EnqueueTask
将项目添加到队列时设置此等待句柄。
CPU 使用率现在 <= 10%
这里是更新后的 NotificationSender
代码:
static class NotificationSender
{
static ConcurrentQueue<NotificationDelivery> deliveryQueue = null;
static Task enqueueTask = null;
static Task[] dequeueTasks = null;
static ManualResetEvent enqueueSignal = null;
static ManualResetEvent dequeueSignal = null;
static System.Timers.Timer enqueueTimer = null;
public static void StartSending(CancellationToken token)
{
PushService.InitServices();
using (DSTeckWebPushNotificationsContext db = new DSTeckWebPushNotificationsContext())
{
NotificationDelivery[] queuedDeliveries = db.NotificationDeliveries
.Where(nd => nd.Status == NotificationDeliveryStatus.Queued)
.ToArray();
foreach (NotificationDelivery delivery in queuedDeliveries)
{
delivery.Status = NotificationDeliveryStatus.Pending;
}
db.SaveChanges();
}
enqueueSignal = new ManualResetEvent(true);
dequeueSignal = new ManualResetEvent(false);
enqueueTimer = new System.Timers.Timer();
enqueueTimer.Elapsed += EnqueueTimerCallback;
enqueueTimer.Interval = 5000;
enqueueTimer.AutoReset = false;
enqueueTimer.Stop();
enqueueTask = new Task(EnqueueTask, token, TaskCreationOptions.LongRunning);
enqueueTask.Start();
deliveryQueue = new ConcurrentQueue<NotificationDelivery>();
int dequeueTasksCount = 10;
dequeueTasks = new Task[dequeueTasksCount];
for (int i = 0; i < dequeueTasksCount; i++)
{
dequeueTasks[i] = new Task(DequeueTask, token, TaskCreationOptions.LongRunning);
dequeueTasks[i].Start();
}
}
public static void EnqueueTimerCallback(Object source, ElapsedEventArgs e)
{
enqueueSignal.Set();
enqueueTimer.Stop();
}
public static void EnqueueTask(object state)
{
CancellationToken token = (CancellationToken)state;
using (DSTeckWebPushNotificationsContext db = new DSTeckWebPushNotificationsContext())
{
while (!token.IsCancellationRequested)
{
if (enqueueSignal.WaitOne())
{
int toEnqueue = 100 - deliveryQueue.Count;
if (toEnqueue > 0)
{
// fetch some records from db to be enqueued
NotificationDelivery[] deliveries = db.NotificationDeliveries
.Include("Subscription")
.Include("Notification")
.Include("Notification.NotificationLanguages")
.Include("Notification.NotificationLanguages.Language")
.Where(nd => nd.Status == NotificationDeliveryStatus.Pending && DateTime.Now >= nd.StartSendingAt)
.OrderBy(nd => nd.StartSendingAt)
.Take(toEnqueue)
.ToArray();
foreach (NotificationDelivery delivery in deliveries)
{
delivery.Status = NotificationDeliveryStatus.Queued;
deliveryQueue.Enqueue(delivery);
}
if (deliveries.Length > 0)
{
// save Queued state, so not fetched again the next loop
db.SaveChanges();
// signal the DequeueTask
dequeueSignal.Set();
}
else
{
// no more notifications, wait 5 seconds before try fetching again
enqueueSignal.Reset();
enqueueTimer.Start();
}
}
// save any changes made by the DequeueTask
// an event may be used here to know if any changes made
db.SaveChanges();
}
}
Task.WaitAll(dequeueTasks);
db.SaveChanges();
}
}
public async static void DequeueTask(object state)
{
CancellationToken token = (CancellationToken)state;
while (!token.IsCancellationRequested)
{
if (dequeueSignal.WaitOne()) // block untill we have items in the queue
{
NotificationDelivery delivery = null;
if (deliveryQueue.TryDequeue(out delivery))
{
NotificationDeliveryStatus ns = NotificationDeliveryStatus.Pending;
if (delivery.Subscription.Status == SubscriptionStatus.Subscribed)
{
PushResult result = await PushService.DoPushAsync(delivery);
switch (result)
{
case PushResult.Pushed:
ns = NotificationDeliveryStatus.Delivered;
break;
case PushResult.Error:
ns = NotificationDeliveryStatus.FailureError;
break;
case PushResult.NotSupported:
ns = NotificationDeliveryStatus.FailureNotSupported;
break;
case PushResult.UnSubscribed:
ns = NotificationDeliveryStatus.FailureUnSubscribed;
delivery.Subscription.Status = SubscriptionStatus.UnSubscribed;
break;
}
}
else
{
ns = NotificationDeliveryStatus.FailureUnSubscribed;
}
delivery.Status = ns;
delivery.DeliveredAt = DateTime.Now;
}
else
{
// empty queue, no more items
// stop dequeueing untill new items added by EnqueueTask
dequeueSignal.Reset();
}
}
}
}
public static void Wait()
{
Task.WaitAll(enqueueTask);
Task.WaitAll(dequeueTasks);
enqueueTask.Dispose();
for(int i = 0; i < dequeueTasks.Length; i++)
{
dequeueTasks[i].Dispose();
}
}
}
我正在尝试开发一个 windows 服务来向订阅发送通知。 数据保存在 SQL 服务器数据库中。
通知是通过向 REST API 端点发出 Web POST 请求创建的,并保存在数据库中 table。
该服务启动一个任务,该任务不断从该数据库读取通知 table 并将它们添加到队列中。
该服务还启动了几个任务,这些任务不断从队列中读取并执行实际的发送过程。
代码运行良好并且完成了所需的工作,但问题是 CPU 使用率是 100% 运行 服务。
我尝试使用 Thread.Sleep 或 Task.Delay,但都没有帮助我减少 CPU 的使用。
我已阅读此代码项目 page,我需要使用等待处理程序并且应该在某些情况下等待。我无法让它正常工作。
所以谁能告诉我怎样做才能减少 EnqueueTask
和 DequeueTask
的 CPU 使用率?
这是发件人代码:
static class NotificationSender
{
static ConcurrentQueue<NotificationDelivery> deliveryQueue = null;
static Task enqueueTask = null;
static Task[] dequeueTasks = null;
public static void StartSending(ServiceState serviceState)
{
PushService.InitServices();
enqueueTask = Task.Factory.StartNew(EnqueueTask, serviceState);
deliveryQueue = new ConcurrentQueue<NotificationDelivery>();
int dequeueTasksCount = 10;
dequeueTasks = new Task[dequeueTasksCount];
for (int i = 0; i < dequeueTasksCount; i++)
{
dequeueTasks[i] = Task.Factory.StartNew(DequeueTask, serviceState);
}
}
public static void EnqueueTask(object state)
{
ServiceState serviceState = (ServiceState)state;
using (DSTeckWebPushNotificationsContext db = new DSTeckWebPushNotificationsContext())
{
while (!serviceState.CancellationTokenSource.Token.IsCancellationRequested)
{
int toEnqueue = 100 - deliveryQueue.Count;
if (toEnqueue > 0)
{
// fetch some records from db to be enqueued
NotificationDelivery[] deliveries = db.NotificationDeliveries
.Include("Subscription")
.Include("Notification")
.Include("Notification.NotificationLanguages")
.Include("Notification.NotificationLanguages.Language")
.Where(nd => nd.Status == NotificationDeliveryStatus.Pending && DateTime.Now >= nd.StartSendingAt)
.OrderBy(nd => nd.StartSendingAt)
.Take(toEnqueue)
.ToArray();
foreach (NotificationDelivery delivery in deliveries)
{
delivery.Status = NotificationDeliveryStatus.Queued;
deliveryQueue.Enqueue(delivery);
}
if (deliveries.Length > 0)
{
db.SaveChanges(); // save Queued state, so not fetched again the next loop
}
}
// save any changes made by the DequeueTask
// an event may be used here to know if any changes made
db.SaveChanges();
}
Task.WaitAll(dequeueTasks);
db.SaveChanges();
}
}
public async static void DequeueTask(object state)
{
ServiceState serviceState = (ServiceState)state;
while (!serviceState.CancellationTokenSource.Token.IsCancellationRequested)
{
NotificationDelivery delivery = null;
if (deliveryQueue.TryDequeue(out delivery))
{
NotificationDeliveryStatus ns = NotificationDeliveryStatus.Pending;
if (delivery.Subscription.Status == SubscriptionStatus.Subscribed)
{
PushResult result = await PushService.DoPushAsync(delivery);
switch (result)
{
case PushResult.Pushed:
ns = NotificationDeliveryStatus.Delivered;
break;
case PushResult.Error:
ns = NotificationDeliveryStatus.FailureError;
break;
case PushResult.NotSupported:
ns = NotificationDeliveryStatus.FailureNotSupported;
break;
case PushResult.UnSubscribed:
ns = NotificationDeliveryStatus.FailureUnSubscribed;
delivery.Subscription.Status = SubscriptionStatus.UnSubscribed;
break;
}
}
else
{
ns = NotificationDeliveryStatus.FailureUnSubscribed;
}
delivery.Status = ns;
delivery.DeliveredAt = DateTime.Now;
}
}
}
public static void Wait()
{
Task.WaitAll(enqueueTask);
Task.WaitAll(dequeueTasks);
enqueueTask.Dispose();
for(int i = 0; i < dequeueTasks.Length; i++)
{
dequeueTasks[i].Dispose();
}
}
}
一个ServiceState
类型的对象用于维护服务的启动和停止,下面是该类型的代码:
class ServiceState
{
public CancellationTokenSource CancellationTokenSource { get; set; }
public void Start()
{
CancellationTokenSource = new CancellationTokenSource();
NotificationSender.StartSending(this);
}
public void Stop()
{
CancellationTokenSource.Cancel();
NotificationSender.Wait();
CancellationTokenSource.Dispose();
}
}
这里是服务启动和停止代码:
protected override void OnStart(string[] args)
{
_serviceState = new ServiceState();
_serviceState.Start();
}
protected override void OnStop()
{
_serviceState.Stop();
}
我想我终于可以做一些好的改变来使用等待处理程序和计时器来维持 CPU 的使用。
EnqueueTask
将等待 5 秒,然后再尝试从通知中获取数据 table 如果没有获取任何通知。如果没有获取通知,它将启动计时器并重置等待句柄。计时器经过回调将设置等待句柄。
另外 DequeueTask
现在正在使用等待句柄。如果队列中没有更多项目,它将重置等待句柄以停止对空队列进行出列。 EnqueueTask
将项目添加到队列时设置此等待句柄。
CPU 使用率现在 <= 10%
这里是更新后的 NotificationSender
代码:
static class NotificationSender
{
static ConcurrentQueue<NotificationDelivery> deliveryQueue = null;
static Task enqueueTask = null;
static Task[] dequeueTasks = null;
static ManualResetEvent enqueueSignal = null;
static ManualResetEvent dequeueSignal = null;
static System.Timers.Timer enqueueTimer = null;
public static void StartSending(CancellationToken token)
{
PushService.InitServices();
using (DSTeckWebPushNotificationsContext db = new DSTeckWebPushNotificationsContext())
{
NotificationDelivery[] queuedDeliveries = db.NotificationDeliveries
.Where(nd => nd.Status == NotificationDeliveryStatus.Queued)
.ToArray();
foreach (NotificationDelivery delivery in queuedDeliveries)
{
delivery.Status = NotificationDeliveryStatus.Pending;
}
db.SaveChanges();
}
enqueueSignal = new ManualResetEvent(true);
dequeueSignal = new ManualResetEvent(false);
enqueueTimer = new System.Timers.Timer();
enqueueTimer.Elapsed += EnqueueTimerCallback;
enqueueTimer.Interval = 5000;
enqueueTimer.AutoReset = false;
enqueueTimer.Stop();
enqueueTask = new Task(EnqueueTask, token, TaskCreationOptions.LongRunning);
enqueueTask.Start();
deliveryQueue = new ConcurrentQueue<NotificationDelivery>();
int dequeueTasksCount = 10;
dequeueTasks = new Task[dequeueTasksCount];
for (int i = 0; i < dequeueTasksCount; i++)
{
dequeueTasks[i] = new Task(DequeueTask, token, TaskCreationOptions.LongRunning);
dequeueTasks[i].Start();
}
}
public static void EnqueueTimerCallback(Object source, ElapsedEventArgs e)
{
enqueueSignal.Set();
enqueueTimer.Stop();
}
public static void EnqueueTask(object state)
{
CancellationToken token = (CancellationToken)state;
using (DSTeckWebPushNotificationsContext db = new DSTeckWebPushNotificationsContext())
{
while (!token.IsCancellationRequested)
{
if (enqueueSignal.WaitOne())
{
int toEnqueue = 100 - deliveryQueue.Count;
if (toEnqueue > 0)
{
// fetch some records from db to be enqueued
NotificationDelivery[] deliveries = db.NotificationDeliveries
.Include("Subscription")
.Include("Notification")
.Include("Notification.NotificationLanguages")
.Include("Notification.NotificationLanguages.Language")
.Where(nd => nd.Status == NotificationDeliveryStatus.Pending && DateTime.Now >= nd.StartSendingAt)
.OrderBy(nd => nd.StartSendingAt)
.Take(toEnqueue)
.ToArray();
foreach (NotificationDelivery delivery in deliveries)
{
delivery.Status = NotificationDeliveryStatus.Queued;
deliveryQueue.Enqueue(delivery);
}
if (deliveries.Length > 0)
{
// save Queued state, so not fetched again the next loop
db.SaveChanges();
// signal the DequeueTask
dequeueSignal.Set();
}
else
{
// no more notifications, wait 5 seconds before try fetching again
enqueueSignal.Reset();
enqueueTimer.Start();
}
}
// save any changes made by the DequeueTask
// an event may be used here to know if any changes made
db.SaveChanges();
}
}
Task.WaitAll(dequeueTasks);
db.SaveChanges();
}
}
public async static void DequeueTask(object state)
{
CancellationToken token = (CancellationToken)state;
while (!token.IsCancellationRequested)
{
if (dequeueSignal.WaitOne()) // block untill we have items in the queue
{
NotificationDelivery delivery = null;
if (deliveryQueue.TryDequeue(out delivery))
{
NotificationDeliveryStatus ns = NotificationDeliveryStatus.Pending;
if (delivery.Subscription.Status == SubscriptionStatus.Subscribed)
{
PushResult result = await PushService.DoPushAsync(delivery);
switch (result)
{
case PushResult.Pushed:
ns = NotificationDeliveryStatus.Delivered;
break;
case PushResult.Error:
ns = NotificationDeliveryStatus.FailureError;
break;
case PushResult.NotSupported:
ns = NotificationDeliveryStatus.FailureNotSupported;
break;
case PushResult.UnSubscribed:
ns = NotificationDeliveryStatus.FailureUnSubscribed;
delivery.Subscription.Status = SubscriptionStatus.UnSubscribed;
break;
}
}
else
{
ns = NotificationDeliveryStatus.FailureUnSubscribed;
}
delivery.Status = ns;
delivery.DeliveredAt = DateTime.Now;
}
else
{
// empty queue, no more items
// stop dequeueing untill new items added by EnqueueTask
dequeueSignal.Reset();
}
}
}
}
public static void Wait()
{
Task.WaitAll(enqueueTask);
Task.WaitAll(dequeueTasks);
enqueueTask.Dispose();
for(int i = 0; i < dequeueTasks.Length; i++)
{
dequeueTasks[i].Dispose();
}
}
}