任务 post - 处理完成 2 个任务后立即开始

Task post-processing to start soon after 2 tasks are done

我有一个核心任务检索一些核心数据和多个其他子任务获取额外数据。一旦核心任务和任何子任务准备就绪,希望 运行 对核心数据进行一些更丰富的处理。你知道怎么做吗?

考虑过类似的事情,但不确定是否是我想要的:

// Starting the tasks
var coreDataTask = new Task(...);
var extraDataTask1 = new Task(...);
var extraDataTask2 = new Task(...);

coreDataTask.Start();
extraDataTask1.Start();
extraDataTask2.Start();

// Enriching the results
Task.WaitAll(coreDataTask, extraDataTask1);
EnrichCore(coreDataTask.Results, extraDataTask1.Results);

Task.WaitAll(coreDataTask, extraDataTask2);
EnrichCore(coreDataTask.Results, extraDataTask2.Results);

另外考虑到丰富是在同一个核心对象上,我想我需要把它锁在某个地方吗?

提前致谢!

我认为您可能想要的是 "ContinueWith"(此处的文档:https://msdn.microsoft.com/en-us/library/dd270696(v=vs.110).aspx)。只要您的充实不需要按特定顺序进行即可。

代码如下所示:

var coreTask = new Task<object>(() => { return null; });
var enrichTask1 = new Task<object>(() => { return null; });
var enrichTask2 = new Task<object>(() => { return null; });

coreTask.Start();
coreTask.Wait();

//Create your continue tasks here with the data you want. 
enrichTask1.ContinueWith(task => {/*Do enriching here with task.Result*/});

//Start all enricher tasks here. 
enrichTask1.Start();

//Wait for all the tasks to complete here. 
Task.WaitAll(enrichTask1);

您仍然需要先 运行 您的 CoreTask,因为它需要在所有充实任务之前完成。但是您可以从那里开始所有任务,并在完成后告诉他们 "ContinueWith" 做其他事情。

您还应该快速浏览一下 "Enricher Pattern",它可能能够帮助您完成您想要实现的目标(在线程之外)。像这里的例子:http://www.enterpriseintegrationpatterns.com/DataEnricher.html

这是另一个利用 Task.WhenAny() 检测任务何时完成的想法。

对于这个最小的例子,我假设核心数据和额外数据都是字符串。但是你可以根据你的类型进行调整。

另外,我实际上并没有做任何处理。你将不得不插入你的处理。

此外,我正在做的一个假设(不是很清楚)是您主要尝试并行化数据收集,因为这是昂贵的部分,但丰富的部分实际上非常快。基于该假设,您会注意到任务 运行 并行收集核心数据和额外数据。但是随着数据变得可用,核心数据会同步丰富,以避免必须通过锁定使代码复杂化。

如果您复制粘贴下面的代码,您应该能够运行它是如何工作的。

public static void Main(string[] args)
{
    StartWork().Wait();
}

private async static Task StartWork()
{
    // start core and extra tasks
    Task<string> coreDataTask = Task.Run(() => "core data" /* do something more complicated here */);
    List<Task<string>> extraDataTaskList = new List<Task<string>>();
    for (int i = 0; i < 10; i++)
    {
        int x = i;
        extraDataTaskList.Add(Task.Run(() => "extra data " + x /* do something more complicated here */));
    }

    // wait for core data to be ready first.
    StringBuilder coreData = new StringBuilder(await coreDataTask);

    // enrich core as the extra data tasks complete.
    while (extraDataTaskList.Count != 0)
    {
        Task<string> completedExtraDataTask = await Task.WhenAny(extraDataTaskList);
        extraDataTaskList.Remove(completedExtraDataTask);
        EnrichCore(coreData, await completedExtraDataTask);
    }

    Console.WriteLine(coreData.ToString());
}

private static void EnrichCore(StringBuilder coreData, string extraData)
{
    coreData.Append(" enriched with ").Append(extraData);
}

编辑:.NET 4.0 版本

以下是我将如何针对 .NET 4.0 对其进行更改,同时仍保留相同的总体设计:

  • Task.Run() 变为 Task.Factory.StartNew()
  • 我没有对任务执行 await,而是调用 Result,这是一个等待任务完成的阻塞调用。
  • 使用Task.WaitAny代替Task.WhenAny,这也是阻塞调用。

设计仍然非常相似。两个版本代码之间的一大区别是,在 .NET 4.5 版本中,只要出现 await,当前线程就可以自由地执行其他工作。在 .NET 4.0 版本中,无论何时调用 Task.ResultTask.WaitAny,当前线程都会阻塞,直到任务完成。可能这种差异对您来说并不重要。但如果是,请确保将整个代码块 运行 包装在后台线程或任务中以释放主线程。

另一个区别是异常处理。使用 .NET 4.5 版本,如果您的任何任务因未处理的异常而失败,该异常会自动解包并以非常透明的方式传播。使用 .NET 4.0 版本,您将获得 AggregateException,您必须自行解包和处理。如果这是一个问题,请确保事先对此进行测试,以便知道会发生什么。

就个人而言,我尽量避免 Task.ContinueWith。它往往会使代码非常丑陋且难以阅读。

public static void Main(string[] args)
{
    // start core and extra tasks
    Task<string> coreDataTask = Task.Factory.StartNew(() => "core data" /* do something more complicated here */);
    List<Task<string>> extraDataTaskList = new List<Task<string>>();
    for (int i = 0; i < 10; i++)
    {
        int x = i;
        extraDataTaskList.Add(Task.Factory.StartNew(() => "extra data " + x /* do something more complicated here */));
    }

    // wait for core data to be ready first.
    StringBuilder coreData = new StringBuilder(coreDataTask.Result);

    // enrich core as the extra data tasks complete.
    while (extraDataTaskList.Count != 0)
    {
        int indexOfCompletedTask = Task.WaitAny(extraDataTaskList.ToArray());
        Task<string> completedExtraDataTask = extraDataTaskList[indexOfCompletedTask];
        extraDataTaskList.Remove(completedExtraDataTask);
        EnrichCore(coreData, completedExtraDataTask.Result);
    }

    Console.WriteLine(coreData.ToString());
}

private static void EnrichCore(StringBuilder coreData, string extraData)
{
    coreData.Append(" enriched with ").Append(extraData);
}