等待连续 UI 后台轮询任务
Waiting on a continuous UI background polling task
我对并行编程 C# 比较陌生(当我开始我的项目时,我研究了 TPL 的 MSDN 示例)并且希望对以下示例代码提供一些输入。
它是几个后台工作任务之一。此特定任务将状态消息推送到日志。
var uiCts = new CancellationTokenSource();
var globalMsgQueue = new ConcurrentQueue<string>();
var backgroundUiTask = new Task(
() =>
{
while (!uiCts.IsCancellationRequested)
{
while (globalMsgQueue.Count > 0)
ConsumeMsgQueue();
Thread.Sleep(backgroundUiTimeOut);
}
},
uiCts.Token);
// Somewhere else entirely
backgroundUiTask.Start();
Task.WaitAll(backgroundUiTask);
我在阅读了 Alternatives to using Thread.Sleep for waiting, , When to use Task.Delay, when to use Thread.Sleep?,
等多个主题后要求专业人士提供意见
这提示我使用 Task.Delay 而不是 Thread.Sleep 作为第一步并引入 TaskCreationOptions.LongRunning。
但我想知道我可能还遗漏了哪些其他注意事项?轮询 MsgQueue.Count 是一种代码味道吗?更好的版本会依赖于事件吗?
绝对 Task.Delay
优于 Thread.Sleep
,因为您不会阻塞池中的线程,并且在等待期间池中的线程将可用于处理其他任务。然后,你不需要让你的任务变得很长-运行ning。 long-运行ning任务是运行在一个专用线程里,然后Task.Delay
就没有意义了
相反,我会推荐一种不同的方法。只需使用 System.Threading.Timer,让您的生活变得简单。计时器是内核对象,将 运行 在线程池上进行回调,您不必担心延迟或休眠。
首先,没有理由使用 Task.Start
或使用 Task 构造函数。任务不是线程,它们本身不是 运行。它们是 承诺 某些事情将在 未来 完成并且可能会或可能不会产生任何结果。其中一些将 运行 在线程池线程上。使用 Task.Run
创建并 运行 在需要时一步完成任务。
我认为实际的问题是如何创建缓冲的后台工作者。 .NET 已经提供了可以执行此操作的 classes。
ActionBlock< T >
ActionBlock class 已经实现了这个以及更多 - 它允许您指定输入缓冲区有多大,同时处理传入消息的任务数,支持取消和 异步完成。
日志块可以像这样简单:
_logBlock=new ActionBlock<string>(msg=>File.AppendAllText("myLog.txt",msg));
ActionBlock class 本身负责缓冲输入,在新消息到达时将新消息提供给工作函数,如果缓冲区已满,则可能 阻塞 发送者等.不需要轮询
其他代码可以使用Post
或SendAsync
向区块发送消息:
_block.Post("some message");
完成后,我们可以告诉块 Complete()
并等待它处理任何剩余的消息:
_block.Complete();
await _block.Completion;
频道
一个更新的、较低级别的选项是使用 Channels。您可以将通道视为一种异步队列,尽管它们可用于实现复杂的处理管道。如果今天编写 ActionBlock,它将在内部使用 Channels。
使用频道,需要您自己提供"worker"任务。不过不需要轮询,因为 ChannelReader class 允许您异步读取消息,甚至可以使用 await foreach
.
writer 方法可能如下所示:
public ChannelWriter<string> LogIt(string path,CancellationToken token=default)
{
var channel=Channel.CreateUnbounded<string>();
var writer=channel.Writer;
_=Task.Run(async ()=>{
await foreach(var msg in channel.Reader.ReadAllAsync(token))
{
File.AppendAllText(path,msg);
}
},token).ContinueWith(t=>writer.TryComplete(t.Exception);
return writer;
}
....
_logWriter=LogIt(somePath);
其他代码可以使用WriteAsync
或TryWrite
发送消息,例如:
_logWriter.TryWrite(someMessage);
完成后,我们可以在编写器上调用 Complete()
或 TryComplete()
:
_logWriter.TryComplete();
行
.ContinueWith(t=>writer.TryComplete(t.Exception);
需要确保通道关闭,即使发生异常或发出取消令牌信号也是如此。
乍一看这似乎太麻烦了。通道允许我们轻松 运行 初始化代码或将状态从一条消息传递到下一条消息。我们可以在循环开始之前打开一个流并使用它而不是每次调用 File.AppendAllText
时都重新打开文件,例如:
public ChannelWriter<string> LogIt(string path,CancellationToken token=default)
{
var channel=Channel.CreateUnbounded<string>();
var writer=channel.Writer;
_=Task.Run(async ()=>{
//***** Can't do this with an ActionBlock ****
using(var writer=File.AppendText(somePath))
{
await foreach(var msg in channel.Reader.ReadAllAsync(token))
{
writer.WriteLine(msg);
//Or
//await writer.WriteLineAsync(msg);
}
}
},token).ContinueWith(t=>writer.TryComplete(t.Exception);
return writer;
}
TPL Dataflow library is the preferred tool for this kind of job. It allows building efficient producer-consumer pairs quite easily, and more complex pipelines as well, while offering a complete set of configuration options. In your case using a single ActionBlock
应该够了。
您可能会考虑使用一个更简单的解决方案 BlockingCollection
。它的优点是不需要安装任何包(因为它是内置的),而且也更容易学习。除了方法 Add
、CompleteAdding
和 GetConsumingEnumerable
,您不必学习更多内容。它还支持取消。缺点是它是一个阻塞集合,因此它会在等待新消息到达时阻塞消费者线程,并在等待内部缓冲区中的可用 space 时阻塞生产者线程(仅当您指定 boundedCapacity
在构造函数中)。
var uiCts = new CancellationTokenSource();
var globalMsgQueue = new BlockingCollection<string>();
var backgroundUiTask = new Task(() =>
{
foreach (var item in globalMsgQueue.GetConsumingEnumerable(uiCts.Token))
{
ConsumeMsgQueueItem(item);
}
}, uiCts.Token);
BlockingCollection
在内部使用 ConcurrentQueue
作为缓冲区。
我对并行编程 C# 比较陌生(当我开始我的项目时,我研究了 TPL 的 MSDN 示例)并且希望对以下示例代码提供一些输入。 它是几个后台工作任务之一。此特定任务将状态消息推送到日志。
var uiCts = new CancellationTokenSource();
var globalMsgQueue = new ConcurrentQueue<string>();
var backgroundUiTask = new Task(
() =>
{
while (!uiCts.IsCancellationRequested)
{
while (globalMsgQueue.Count > 0)
ConsumeMsgQueue();
Thread.Sleep(backgroundUiTimeOut);
}
},
uiCts.Token);
// Somewhere else entirely
backgroundUiTask.Start();
Task.WaitAll(backgroundUiTask);
我在阅读了 Alternatives to using Thread.Sleep for waiting,
这提示我使用 Task.Delay 而不是 Thread.Sleep 作为第一步并引入 TaskCreationOptions.LongRunning。
但我想知道我可能还遗漏了哪些其他注意事项?轮询 MsgQueue.Count 是一种代码味道吗?更好的版本会依赖于事件吗?
绝对 Task.Delay
优于 Thread.Sleep
,因为您不会阻塞池中的线程,并且在等待期间池中的线程将可用于处理其他任务。然后,你不需要让你的任务变得很长-运行ning。 long-运行ning任务是运行在一个专用线程里,然后Task.Delay
就没有意义了
相反,我会推荐一种不同的方法。只需使用 System.Threading.Timer,让您的生活变得简单。计时器是内核对象,将 运行 在线程池上进行回调,您不必担心延迟或休眠。
首先,没有理由使用 Task.Start
或使用 Task 构造函数。任务不是线程,它们本身不是 运行。它们是 承诺 某些事情将在 未来 完成并且可能会或可能不会产生任何结果。其中一些将 运行 在线程池线程上。使用 Task.Run
创建并 运行 在需要时一步完成任务。
我认为实际的问题是如何创建缓冲的后台工作者。 .NET 已经提供了可以执行此操作的 classes。
ActionBlock< T >
ActionBlock class 已经实现了这个以及更多 - 它允许您指定输入缓冲区有多大,同时处理传入消息的任务数,支持取消和 异步完成。
日志块可以像这样简单:
_logBlock=new ActionBlock<string>(msg=>File.AppendAllText("myLog.txt",msg));
ActionBlock class 本身负责缓冲输入,在新消息到达时将新消息提供给工作函数,如果缓冲区已满,则可能 阻塞 发送者等.不需要轮询
其他代码可以使用Post
或SendAsync
向区块发送消息:
_block.Post("some message");
完成后,我们可以告诉块 Complete()
并等待它处理任何剩余的消息:
_block.Complete();
await _block.Completion;
频道
一个更新的、较低级别的选项是使用 Channels。您可以将通道视为一种异步队列,尽管它们可用于实现复杂的处理管道。如果今天编写 ActionBlock,它将在内部使用 Channels。
使用频道,需要您自己提供"worker"任务。不过不需要轮询,因为 ChannelReader class 允许您异步读取消息,甚至可以使用 await foreach
.
writer 方法可能如下所示:
public ChannelWriter<string> LogIt(string path,CancellationToken token=default)
{
var channel=Channel.CreateUnbounded<string>();
var writer=channel.Writer;
_=Task.Run(async ()=>{
await foreach(var msg in channel.Reader.ReadAllAsync(token))
{
File.AppendAllText(path,msg);
}
},token).ContinueWith(t=>writer.TryComplete(t.Exception);
return writer;
}
....
_logWriter=LogIt(somePath);
其他代码可以使用WriteAsync
或TryWrite
发送消息,例如:
_logWriter.TryWrite(someMessage);
完成后,我们可以在编写器上调用 Complete()
或 TryComplete()
:
_logWriter.TryComplete();
行
.ContinueWith(t=>writer.TryComplete(t.Exception);
需要确保通道关闭,即使发生异常或发出取消令牌信号也是如此。
乍一看这似乎太麻烦了。通道允许我们轻松 运行 初始化代码或将状态从一条消息传递到下一条消息。我们可以在循环开始之前打开一个流并使用它而不是每次调用 File.AppendAllText
时都重新打开文件,例如:
public ChannelWriter<string> LogIt(string path,CancellationToken token=default)
{
var channel=Channel.CreateUnbounded<string>();
var writer=channel.Writer;
_=Task.Run(async ()=>{
//***** Can't do this with an ActionBlock ****
using(var writer=File.AppendText(somePath))
{
await foreach(var msg in channel.Reader.ReadAllAsync(token))
{
writer.WriteLine(msg);
//Or
//await writer.WriteLineAsync(msg);
}
}
},token).ContinueWith(t=>writer.TryComplete(t.Exception);
return writer;
}
TPL Dataflow library is the preferred tool for this kind of job. It allows building efficient producer-consumer pairs quite easily, and more complex pipelines as well, while offering a complete set of configuration options. In your case using a single ActionBlock
应该够了。
您可能会考虑使用一个更简单的解决方案 BlockingCollection
。它的优点是不需要安装任何包(因为它是内置的),而且也更容易学习。除了方法 Add
、CompleteAdding
和 GetConsumingEnumerable
,您不必学习更多内容。它还支持取消。缺点是它是一个阻塞集合,因此它会在等待新消息到达时阻塞消费者线程,并在等待内部缓冲区中的可用 space 时阻塞生产者线程(仅当您指定 boundedCapacity
在构造函数中)。
var uiCts = new CancellationTokenSource();
var globalMsgQueue = new BlockingCollection<string>();
var backgroundUiTask = new Task(() =>
{
foreach (var item in globalMsgQueue.GetConsumingEnumerable(uiCts.Token))
{
ConsumeMsgQueueItem(item);
}
}, uiCts.Token);
BlockingCollection
在内部使用 ConcurrentQueue
作为缓冲区。