当其他任务完成时通知任务

Notify task when other tasks complete

.Net TPL 专家,

注意:不能使用DataFlow库;不允许加载项。

我有四个任务,如下图所示:

然后我等待任务完成,即:

Task.WhenAll( t1, t2, t3, t4 )

所以我有一个生产者任务、多个消费者任务和一个保存结果的任务。

我的问题是:

如何在任务 2 和 3 完成时通知任务 4,以便任务 4 也知道何时结束?

我发现许多示例 "move" 数据以线性 "pipeline" 方式从一个任务传输到另一个任务,但没有找到任何示例来说明上述情况;即,当任务 2 和 3 完成时如何通知任务 4,以便它也知道何时完成。

我最初的想法是 "register" 任务 2 和 3 与任务 4 并简单地监视每个已注册任务的状态——当任务 2 和 3 不再 运行 时,然后任务 4可以停止(如果结果队列也为空)。

提前致谢。

如果您还为 results_queue 使用 BlockingCollection,那么您可以使用属性 BlockingCollection.IsCompleted 和 BlockingCollection.IsAddingCompleted 来实现这些通知。 进程是:

  • 当输入文件中没有更多记录时,task1 在输入集合上调用方法 BlockingCollection.CompleteAdding()。
  • task2 和 task3 在输入集合上定期检查 属性 IsCompleted。当输入集合为空且生产者调用 CompleteAdding() 方法时,此 属性 为真。 属性 为真后,任务 2 和任务 3 完成,它们可以在结果队列上调用 CompleteAdding() 方法并完成它们的工作。
  • task4 可以在 result_queue 中的记录到达时进行处理,也可以等待结果队列 IsAddingCompleted 属性 变为 true 然后开始处理。当结果队列上的 IsCompleted 属性 为真时,task4 的工作完成。

编辑: 我不确定您是否熟悉这些 IsCompleted 和 IsAddingCompleted 属性。它们是不同的,并且非常适合您的情况。我认为除了 BlockingCollection 属性之外您不需要任何其他同步元素。请询问是否需要额外的解释!

    BlockingCollection<int> inputQueue;
    BlockingCollection<int> resultQueue;

    public void StartTasks()
    {
        inputQueue = new BlockingCollection<int>();
        resultQueue = new BlockingCollection<int>();

        Task task1 = Task.Run(() => Task1());
        Task task2 = Task.Run(() => Task2_3());
        Task task3 = Task.Run(() => Task2_3());
        Task[] tasksInTheMiddle = new Task[] { task2, task3 };
        Task waiting = Task.Run(() => Task.WhenAll(tasksInTheMiddle).ContinueWith(x => resultQueue.CompleteAdding()));
        Task task4 = Task.Run(() => Task4());

        //Waiting for tasks to finish
    }
    private void Task1()
    {
        while(true)
        {
            int? input = ReadFromInputFile();
            if (input != null)
            {
                inputQueue.Add((int)input);
            }
            else
            {
                inputQueue.CompleteAdding();
                break;
            }
        }
    }

    private void Task2_3()
    {
        while(inputQueue.IsCompleted)
        {
            int input = inputQueue.Take();
            resultQueue.Add(input);
        }
    }

    private void Task4()
    {
        while(resultQueue.IsCompleted)
        {
            int result = resultQueue.Take();
            WriteToOutputFile(result);
        }
    }

您描述的任务很适合 TPL Dataflow library, small add-on for a TPL itself (it can be included in project via nuget package, .NET 4.5 is supported), you just easily introduce the flow something like this (code updated based on comments with BroadcastBlock):

var buffer = new BroadcastBlock<string>();
var consumer1 = new TransformBlock<string, string>(s => { /* your action here for a string */});
var consumer2 = new TransformBlock<string, string>(s => { /* your action here for a string */});
var resultsProcessor = new ActionBlock<string>(s => { /* your logging logic here */ });

不确定你的解法逻辑,所以我认为你只是简单地操作这里的字符串。你应该asynchronously send all incoming data for a first block (if you Post你的数据,如果缓冲区过载,消息将被丢弃),并且link彼此之间的块,像这样:

buffer.LinkTo(consumer1, new DataflowLinkOptions { PropagateCompletion = true });
buffer.LinkTo(consumer2, new DataflowLinkOptions { PropagateCompletion = true });
consumer1.LinkTo(resultsProcessor, new DataflowLinkOptions { PropagateCompletion = true });
consumer2.LinkTo(resultsProcessor, new DataflowLinkOptions { PropagateCompletion = true });

foreach (var s in IncomingData)
{
    await buffer.SendAsync(s);
}
buffer.Complete();

如果你的消费者应该同时处理所有项,那么你应该使用BroadcastBlock(可能会出现一些),其他选项是按消费者过滤您的消息(可能是消息 id 的剩余部分按消费者数量),但在这种情况下,您应该 link 给另一个消费者,它将 "catch" 所有出于某种原因没有的消息被消耗了。

如您所见,块之间的 links 是通过完整传播创建的,因此在此之后您可以简单地附加到 .Completion 任务 属性 以获得 resultsProcessor:

resultsProcessor.Completion.ContinueWith(t => { /* Processing is complete */ });

这是对 已经说过的内容的一些扩展。

通过使用 BlockingCollection,您可以对其调用 GetConsumingEnumerable(),并将其视为普通的 foreach 循环。这将使您的任务结束 "natually"。您唯一需要做的就是添加一个额外的任务来监视任务 2 和 3 以查看它们何时结束并调用它们的完整添加。

private BlockingCollection<Stage1> _stageOneBlockingCollection = new BlockingCollection<Stage1>();
private BlockingCollection<Stage2> _stageTwoBlockingCollection = new BlockingCollection<Stage2>();

Task RunProcess()
{
    Task1Start();
    var t2 = Stage2Start();
    var t3 = Stage2Start();
    Stage2MonitorStart(t2,t3);
    retrun Task4Start();
}

public void Task1Start()
{
    Task.Run(()=>
    {
        foreach(var item in GetFileSource())
        {
            var processedItem = Process(item);
            _stageOneBlockingCollection.Add(processedItem);
        }
        _stageOneBlockingCollection.CompleteAdding();
    }
}

public Task Stage2Start()
{
    return Task.Run(()=>
    {
        foreach(var item in _stageOneBlockingCollection.GetConsumingEnumerable())
        {
            var processedItem = ProcessStage2(item);
            _stageTwoBlockingCollection.Add(processedItem);
        }
    }
}

void Stage2MonitorStart(params Task[] tasks)
{
    //Once all tasks complete mark the collection complete adding.
    Task.WhenAll(tasks).ContinueWith(t=>_stageTwoBlockingCollection.CompleteAdding());
}

public Task Stage4Start()
{
    return Task.Run(()=>
    {
        foreach(var item in _stageTwoBlockingCollection.GetConsumingEnumerable())
        {
            var processedItem = ProcessStage4(item);
            WriteToOutputFile(processedItem);
        }
    }
}