运行 并行任务并连接输出

Run tasks in parallel and concatenate the output

我想获取多个数据提供者,他们 return 他们的数据结构相同,但数据输出不同。最后需要附加数据源的输出,以便我可以使用总结果。为了提高性能,需要并行调用这些数据源。我现在有这个解决方案:

Task<List<Result>> dataSource1 = null;
Task<List<Result>> dataSource2 = null;
foreach (var dataSource in dataSourcesToBeFetched)
        {
            switch (dataSource)
            {
                case DataSource.DataSource1:
                    dataSource1 = DataSource1();
                    break;

                case DataSource.DataSource2:
                    dataSource2 =DataSource2();
                    break;
            }
        }
await Task.WhenAll(dataSource1, dataSource2);
var allData = dataSource1.Result.Append(dataSource2.Result)

但我对此并不满意。添加更多数据源时,我需要将新结果附加到列表中,这看起来很难看。除此之外,我想使用 switch 表达式,但我正在努力解决这个问题。

所有这些代码都可以替换为:

var results=await Task.WhenAll(DataSource1(),DataSource2());

Task.WhenAll< TResult>(Task< TResult>[])方法returns一个Task< TResult[]>所有异步操作的结果。

获得结果后,您可以将它们与 Enumerable.SelectMany 合并:

var flattened=results.SelectMany(r=>r).ToList();

虽然您可以将这两种操作结合​​起来,但最好避免这样做。这导致代码难以阅读、维护和调试。在调试期间,您通常希望在 await 之后中断以检查结果是否存在空值或其他意外值。

不同线程上的任务和扁平化 运行,这使得链式调用的调试更加困难。

如果确实需要,可以在 WhenAll 之后使用 ContinueWith 在返回结果之前在线程池线程中处理结果:

var flatten=await Task.WhenAll(DataSource1(),DataSource2())
                      .ContinueWith(t=>t.Results.SelectMany(r=>r)
                                        .ToList());

更新

要过滤源,一种快速而肮脏的方法是创建一个 Dictionary 将源 ID 映射到方法并使用 LINQ 的 Select 来选择它们:

//In a field
Dictionary<DataSource,Func<Task<List<Result>>>> map=new (){
    [DataSource.Source1]=DataSource1,
    [DataSource.Source1]=DataSource2
};

//In the method
DataSource[] fetchSources=new DataSource[0];
var tasks=fetchSources.Select(s=>map[s]());

但这与使用 函数 来完成相同的工作有点不同:

DataSource[] fetchSources=new DataSource[0];
var tasks=fetchSources.Select(s=>RunSource(s));
//or even 
//var tasks=fetchSources.Select(RunSource);
    
var results=await Task.WhenAll(tasks);
var flattened=results.SelectMany(r=>r).ToList();


public static Task<List<Result>> RunSource(DataSource source)
{
    return source switch {
            DataSource.Source1=> DataSource1(),
            DataSource.Source2=> DataSource2(),
            _=>throw new ArgumentOutOfRangeException(nameof(source))
    };
}

您的代码中的一个问题是,如果 dataSourcesToBeFetched 中不存在 DataSource.DataSource1,则您正在等待空任务。

我可能会去等待一系列任务。

类似于:

var dataSources = new List<Task<List<Result>>>();

// check if the DataSource1 is present in the dataSourcesToBeFetched
if(dataSourcesToBeFetched.Any(i => i == DataSource.DataSource1))
    dataSources.Add(DataSource1());

// check if the DataSource2 is present in the dataSourcesToBeFetched
if(dataSourcesToBeFetched.Any(i => i == DataSource.DataSource2))
    dataSources.Add(DataSource2());

// a list to hold all results
var allData = new List<Result>();

// if we need to fetch any, await all tasks.
if(dataSources.Count > 0)
{
    await Task.WhenAll(dataSources);

    // add the results to the list.
    foreach(var dataSource in dataSources)
        allData.AddRange(dataSource.Result);
}