等待连续 UI 后台轮询任务

Waiting on a continuous UI background polling task

我对并行编程 C# 比较陌生(当我开始我的项目时,我研究了 TPL 的 MSDN 示例)并且希望对以下示例代码提供一些输入。 它是几个后台工作任务之一。此特定任务将状态消息推送到日志。

var uiCts = new CancellationTokenSource();
var globalMsgQueue = new ConcurrentQueue<string>();

var backgroundUiTask = new Task(
() =>
{
    while (!uiCts.IsCancellationRequested)
    {
        while (globalMsgQueue.Count > 0)
            ConsumeMsgQueue();
        Thread.Sleep(backgroundUiTimeOut);
    }
},
uiCts.Token);

// Somewhere else entirely
backgroundUiTask.Start();
Task.WaitAll(backgroundUiTask);

我在阅读了 Alternatives to using Thread.Sleep for waiting, , When to use Task.Delay, when to use Thread.Sleep?,

等多个主题后要求专业人士提供意见

这提示我使用 Task.Delay 而不是 Thread.Sleep 作为第一步并引入 TaskCreationOptions.LongRunning。

但我想知道我可能还遗漏了哪些其他注意事项?轮询 MsgQueue.Count 是一种代码味道吗?更好的版本会依赖于事件吗?

绝对 Task.Delay 优于 Thread.Sleep,因为您不会阻塞池中的线程,并且在等待期间池中的线程将可用于处理其他任务。然后,你不需要让你的任务变得很长-运行ning。 long-运行ning任务是运行在一个专用线程里,然后Task.Delay就没有意义了

相反,我会推荐一种不同的方法。只需使用 System.Threading.Timer,让您的生活变得简单。计时器是内核对象,将 运行 在线程池上进行回调,您不必担心延迟或休眠。

首先,没有理由使用 Task.Start 或使用 Task 构造函数。任务不是线程,它们本身不是 运行。它们是 承诺 某些事情将在 未来 完成并且可能会或可能不会产生任何结果。其中一些将 运行 在线程池线程上。使用 Task.Run 创建并 运行 在需要时一步完成任务。

我认为实际的问题是如何创建缓冲的后台工作者。 .NET 已经提供了可以执行此操作的 classes。

ActionBlock< T >

ActionBlock class 已经实现了这个以及更多 - 它允许您指定输入缓冲区有多大,同时处理传入消息的任务数,支持取消和 异步完成。

日志块可以像这样简单:

_logBlock=new ActionBlock<string>(msg=>File.AppendAllText("myLog.txt",msg));

ActionBlock class 本身负责缓冲输入,在新消息到达时将新消息提供给工作函数,如果缓冲区已满,则可能 阻塞 发送者等.不需要轮询

其他代码可以使用PostSendAsync向区块发送消息:

_block.Post("some message");

完成后,我们可以告诉块 Complete() 并等待它处理任何剩余的消息:

_block.Complete();
await _block.Completion;

频道

一个更新的、较低级别的选项是使用 Channels。您可以将通道视为一种异步队列,尽管它们可用于实现复杂的处理管道。如果今天编写 ActionBlock,它将在内部使用 Channels。

使用频道,需要您自己提供"worker"任务。不过不需要轮询,因为 ChannelReader class 允许您异步读取消息,甚至可以使用 await foreach.

writer 方法可能如下所示:

public ChannelWriter<string> LogIt(string path,CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<string>();
    var writer=channel.Writer;
    _=Task.Run(async ()=>{
        await foreach(var msg in channel.Reader.ReadAllAsync(token))
        {
            File.AppendAllText(path,msg);
        }
    },token).ContinueWith(t=>writer.TryComplete(t.Exception);
    return writer;
}

....

_logWriter=LogIt(somePath);

其他代码可以使用WriteAsyncTryWrite发送消息,例如:

_logWriter.TryWrite(someMessage);

完成后,我们可以在编写器上调用 Complete()TryComplete() :

_logWriter.TryComplete();

.ContinueWith(t=>writer.TryComplete(t.Exception);

需要确保通道关闭,即使发生异常或发出取消令牌信号也是如此。

乍一看这似乎太麻烦了。通道允许我们轻松 运行 初始化代码或将状态从一条消息传递到下一条消息。我们可以在循环开始之前打开一个流并使用它而不是每次调用 File.AppendAllText 时都重新打开文件,例如:

public ChannelWriter<string> LogIt(string path,CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<string>();
    var writer=channel.Writer;
    _=Task.Run(async ()=>{
       //***** Can't do this with an ActionBlock ****
        using(var writer=File.AppendText(somePath))
        {
            await foreach(var msg in channel.Reader.ReadAllAsync(token))
            {
                writer.WriteLine(msg);
                //Or
                //await writer.WriteLineAsync(msg);
            }
        }
    },token).ContinueWith(t=>writer.TryComplete(t.Exception);
    return writer;
}

TPL Dataflow library is the preferred tool for this kind of job. It allows building efficient producer-consumer pairs quite easily, and more complex pipelines as well, while offering a complete set of configuration options. In your case using a single ActionBlock应该够了。

您可能会考虑使用一个更简单的解决方案 BlockingCollection。它的优点是不需要安装任何包(因为它是内置的),而且也更容易学习。除了方法 AddCompleteAddingGetConsumingEnumerable,您不必学习更多内容。它还支持取消。缺点是它是一个阻塞集合,因此它会在等待新消息到达时阻塞消费者线程,并在等待内部缓冲区中的可用 space 时阻塞生产者线程(仅当您指定 boundedCapacity 在构造函数中)。

var uiCts = new CancellationTokenSource();
var globalMsgQueue = new BlockingCollection<string>();

var backgroundUiTask = new Task(() =>
{
    foreach (var item in globalMsgQueue.GetConsumingEnumerable(uiCts.Token))
    {
        ConsumeMsgQueueItem(item);
    }
}, uiCts.Token);

BlockingCollection 在内部使用 ConcurrentQueue 作为缓冲区。