BackgroundService 中的 BlockingCollection<T> 导致高 CPU 使用率
BlockingCollection<T> in a BackgroundService causes high CPU usage
我有一个 .NET BackgroundService
用于使用 BlockingCollection<Notification>
来管理通知。
我的实现导致 CPU 使用率很高,即使 BlockingCollection
没有那么多工作要处理。
我收集了一些转储,看来我 运行 陷入线程池匮乏。
我不确定应该如何重构以避免这种情况。
private readonly BlockingCollection<Notification> _notifications;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
Task.Run(async () =>
{
await _notificationsContext.Database.MigrateAsync(stoppingToken);
while (!stoppingToken.IsCancellationRequested)
{
foreach (var notification in _notifications.GetConsumingEnumerable(stoppingToken))
{
// process notification
}
}
}, stoppingToken);
}
我也尝试删除 while 循环,但问题仍然存在。
编辑:添加了制作人
public abstract class CommandHandlerBase
{
private readonly BlockingCollection<Notification> _notifications;
public CommandHandlerBase(BlockingCollection<Notification> notifications)
{
_notifications = notifications;
}
protected void EnqueueNotification(AlertImapact alertImapact,
AlertUrgency alertUrgency,
AlertSeverity alertServerity,
string accountName,
string summary,
string details,
bool isEnabled,
Exception exception,
CancellationToken cancellationToken = default)
{
var notification = new Notification(accountName, summary, details, DateTime.UtcNow, exception.GetType().ToString())
{
Imapact = alertImapact,
Urgency = alertUrgency,
Severity = alertServerity,
IsSilenced = !isEnabled,
};
_notifications.Add(notification, cancellationToken);
}
}
阻塞是昂贵的,但让线程休眠和重新调度更昂贵。为了避免这种情况,.NET 通常在实际阻塞线程之前使用 SpinWait 开始阻塞操作。 spinwait 使用内核一段时间不做任何事情,这会导致您观察到 CPU 用法。
要解决此问题,请使用异步集合,例如 Channels。
- 频道允许您异步 post 或向其读取消息,同时保留它们的顺序。
- 它是线程安全的,这意味着多个读者和作者可以同时写入。
- 您可以创建一个有界频道,以防止发布者在频道已满时post。
- 最后,你可以通过一个
IAsyncEnumerable
读取一个Channel中的所有消息,让处理代码更简单。
避免阻塞频道
对于您的情况,代码可以更改为:
private readonly Channel<Notification> _notifications=Channel.CreateUnbounded<Notification>();
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _notificationsContext.Database.MigrateAsync(stoppingToken);
await foreach(var notification in _notifications.Reader.ReadAllAsync(stoppingToken))
{
// process notification
}
}
频道有意使用单独的接口进行读取和写入。要读取,您可以使用 ChannelReader class returned by Channel.Reader. To write, you use the ChannelWriter class returned by Channel.Writer. A Channel can be implicitly cast 来任一类型,从而可以轻松编写仅 accept/produce ChannelReader 或 ChannelWriter 的发布者和订阅者方法。
要写入频道,您可以使用 ChannelWriter 的 WriteAsync 方法:
await _notifications.Writer.WriteAsync(someNotification);
当你写完想要关闭频道时,你需要在作者上调用Complete():
await _notification.Writer.Complete();
处理循环将读取任何剩余的消息。要等待直到它完成,您需要等待 ChannelReader.Completion 任务:
await _notification.Reader.Completion;
来自其他 classes
当您使用 BackgroundService 时,通知通常来自其他 classes。这意味着发布者和服务都需要以某种方式访问同一个频道。一种方法是使用助手 class 并将其同时注入发布者和服务。
MessageChannel<T>
class 执行此操作并通过关闭编写器来处理应用程序终止:
public class MessageChannel<T>:IDisposable
{
private readonly Channel<Envelope<T>> _channel;
public ChannelReader<Envelope<T>> Reader => _channel;
public ChannelWriter<Envelope<T>> Writer => _channel;
public MessageChannel(IHostApplicationLifetime lifetime)
{
_channel = Channel.CreateBounded<Envelope<T>>(1);
lifetime.ApplicationStopping.Register(() => Writer.TryComplete());
}
private readonly CancellationTokenSource _cts = new();
public CancellationToken CancellationToken => _cts.Token;
public void Stop()
{
_cts.Cancel();
}
public void Dispose()
{
_cts.Dispose();
}
}
这个可以在后台服务注入:
MessageChannel<Notification> _notifications;
ChannelReader<Notification> _reader;
public MyService(MessageChannel<Notification> notifications)
{
_notifications=notifications;
_reader=notifications.Reader;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _notificationsContext.Database.MigrateAsync(stoppingToken);
await foreach(var notification in _reader.ReadAllAsync(stoppingToken))
{
// process notification
}
}
虽然我认为提议的频道解决方案可能存在争议,就像之前提出的那样,但我会投票支持更简单的解决方案,如果您愿意,频道是为大量消息而设计的,所以请考虑它留言很多
我怀疑你的高 CPU 发生是因为你的通知队列是空的,你没有等待。
public class Worker : BackgroundService
{
private readonly ConcurrentQueue _messages = new ConcurrentQueue();
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await Task.Factory.StartNew(() =>
{
while (!stoppingToken.IsCancellationRequested)
{
await _notificationsContext.Database.MigrateAsync(stoppingToken);
while (_messages.TryDequeue(out var notification) && !stoppingToken.IsCancellationRequested)
{
//ProcessNotificaiton
}
//Explicit delay for the cases when you have no notification you do not want to enter frantic looping which is what i suspect is happening
Task.Delay(1000, stoppingToken).GetAwaiter().GetResult();
}
});
}
}
原来这个问题与另一个 BackgroundService
有关,它正在等待计算错误的 TimeSpan
导致线程池饥饿。
我有一个 .NET BackgroundService
用于使用 BlockingCollection<Notification>
来管理通知。
我的实现导致 CPU 使用率很高,即使 BlockingCollection
没有那么多工作要处理。
我收集了一些转储,看来我 运行 陷入线程池匮乏。
我不确定应该如何重构以避免这种情况。
private readonly BlockingCollection<Notification> _notifications;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
Task.Run(async () =>
{
await _notificationsContext.Database.MigrateAsync(stoppingToken);
while (!stoppingToken.IsCancellationRequested)
{
foreach (var notification in _notifications.GetConsumingEnumerable(stoppingToken))
{
// process notification
}
}
}, stoppingToken);
}
我也尝试删除 while 循环,但问题仍然存在。
编辑:添加了制作人
public abstract class CommandHandlerBase
{
private readonly BlockingCollection<Notification> _notifications;
public CommandHandlerBase(BlockingCollection<Notification> notifications)
{
_notifications = notifications;
}
protected void EnqueueNotification(AlertImapact alertImapact,
AlertUrgency alertUrgency,
AlertSeverity alertServerity,
string accountName,
string summary,
string details,
bool isEnabled,
Exception exception,
CancellationToken cancellationToken = default)
{
var notification = new Notification(accountName, summary, details, DateTime.UtcNow, exception.GetType().ToString())
{
Imapact = alertImapact,
Urgency = alertUrgency,
Severity = alertServerity,
IsSilenced = !isEnabled,
};
_notifications.Add(notification, cancellationToken);
}
}
阻塞是昂贵的,但让线程休眠和重新调度更昂贵。为了避免这种情况,.NET 通常在实际阻塞线程之前使用 SpinWait 开始阻塞操作。 spinwait 使用内核一段时间不做任何事情,这会导致您观察到 CPU 用法。
要解决此问题,请使用异步集合,例如 Channels。
- 频道允许您异步 post 或向其读取消息,同时保留它们的顺序。
- 它是线程安全的,这意味着多个读者和作者可以同时写入。
- 您可以创建一个有界频道,以防止发布者在频道已满时post。
- 最后,你可以通过一个
IAsyncEnumerable
读取一个Channel中的所有消息,让处理代码更简单。
避免阻塞频道
对于您的情况,代码可以更改为:
private readonly Channel<Notification> _notifications=Channel.CreateUnbounded<Notification>();
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _notificationsContext.Database.MigrateAsync(stoppingToken);
await foreach(var notification in _notifications.Reader.ReadAllAsync(stoppingToken))
{
// process notification
}
}
频道有意使用单独的接口进行读取和写入。要读取,您可以使用 ChannelReader class returned by Channel.Reader. To write, you use the ChannelWriter class returned by Channel.Writer. A Channel can be implicitly cast 来任一类型,从而可以轻松编写仅 accept/produce ChannelReader 或 ChannelWriter 的发布者和订阅者方法。
要写入频道,您可以使用 ChannelWriter 的 WriteAsync 方法:
await _notifications.Writer.WriteAsync(someNotification);
当你写完想要关闭频道时,你需要在作者上调用Complete():
await _notification.Writer.Complete();
处理循环将读取任何剩余的消息。要等待直到它完成,您需要等待 ChannelReader.Completion 任务:
await _notification.Reader.Completion;
来自其他 classes
当您使用 BackgroundService 时,通知通常来自其他 classes。这意味着发布者和服务都需要以某种方式访问同一个频道。一种方法是使用助手 class 并将其同时注入发布者和服务。
MessageChannel<T>
class 执行此操作并通过关闭编写器来处理应用程序终止:
public class MessageChannel<T>:IDisposable
{
private readonly Channel<Envelope<T>> _channel;
public ChannelReader<Envelope<T>> Reader => _channel;
public ChannelWriter<Envelope<T>> Writer => _channel;
public MessageChannel(IHostApplicationLifetime lifetime)
{
_channel = Channel.CreateBounded<Envelope<T>>(1);
lifetime.ApplicationStopping.Register(() => Writer.TryComplete());
}
private readonly CancellationTokenSource _cts = new();
public CancellationToken CancellationToken => _cts.Token;
public void Stop()
{
_cts.Cancel();
}
public void Dispose()
{
_cts.Dispose();
}
}
这个可以在后台服务注入:
MessageChannel<Notification> _notifications;
ChannelReader<Notification> _reader;
public MyService(MessageChannel<Notification> notifications)
{
_notifications=notifications;
_reader=notifications.Reader;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _notificationsContext.Database.MigrateAsync(stoppingToken);
await foreach(var notification in _reader.ReadAllAsync(stoppingToken))
{
// process notification
}
}
虽然我认为提议的频道解决方案可能存在争议,就像之前提出的那样,但我会投票支持更简单的解决方案,如果您愿意,频道是为大量消息而设计的,所以请考虑它留言很多
我怀疑你的高 CPU 发生是因为你的通知队列是空的,你没有等待。
public class Worker : BackgroundService { private readonly ConcurrentQueue _messages = new ConcurrentQueue(); protected override async Task ExecuteAsync(CancellationToken stoppingToken) { await Task.Factory.StartNew(() => { while (!stoppingToken.IsCancellationRequested) { await _notificationsContext.Database.MigrateAsync(stoppingToken); while (_messages.TryDequeue(out var notification) && !stoppingToken.IsCancellationRequested) { //ProcessNotificaiton } //Explicit delay for the cases when you have no notification you do not want to enter frantic looping which is what i suspect is happening Task.Delay(1000, stoppingToken).GetAwaiter().GetResult(); } }); } }
原来这个问题与另一个 BackgroundService
有关,它正在等待计算错误的 TimeSpan
导致线程池饥饿。