我可以使用哪个 Task<T> 扩展来递归地等待其他任务?
Which Task<T> extension can I utilize for waiting on other tasks recursively?
我不太确定执行此操作的有效方法。我有文件,其中文件的内容指向其他文件,例如:
A
|-- B
| |-- C
| |-- D
| |-- E
|
|-- F
|-- C
G
|-- H
| |-- I
|
|-- D
| |-- E
|
|-- J
这种情况持续了数十万个文件;幸运的是,依赖的深度很浅,但为了论证,它可能是 N 级深度,不可能有循环。我的目标是了解每个文件的完整依赖关系(扁平化)。例如:
- A: (B, C, D, E, F) -- 注意 'C' 只列出一次。
- B: (C, D, E)
- C:()
- D: (E)
- E: ()
- F: (C)
- G: (D, E, H, I, J)
- 等
我首先创建了一些模型来跟踪这些信息:
public class FileData
{
public string FilePath { get; set; }
public ISet<FileInfo> DependentUpon { get; set; }
}
当然,然后我创建了一个List<FileData>
来存储处理后的文件。同步扫描文件内容以构建此依赖关系树(然后将其展平)会花费太长时间,因此我探索了查看 async/await,这有助于加快速度,但我希望它变得更均匀在将其释放到生产环境之前更快。
我的 async/await 尝试要快得多,但仍然不够有效。
public async Task<ICollection<FileData>> ProcessAsync(IEnumerable<FileInfo> files)
{
var mappings = new Dictionary<FileInfo, FileData>();
foreach (var file in files)
{
// Static Method that constructs an instance of the class
// and utilizes async I/O to read the file line-by-line
// to build any first level dependencies.
var info = await FileData.CreateAsync(file);
// Update progress + Other Properties
mappings.Add(file, info);
}
// Go through the list and recursively add to the dependencies
foreach (var item in list)
{
foreach (var dependency in GetAllDependencies(item, mappings))
{
file.DependentUpon.Add(dependency);
}
}
}
IEnumerable<FileInfo> GetAllDependencies(FileData data, IDictionary<FileInfo, FileData> mappings)
{
foreach (var file in info.DependentUpon)
{
yield return file;
foreach (var child in GetAllDependencies(mappings[file], mappings))
{
yield return child;
}
}
}
当然,这在某种程度上是不错的异步,但是当我尝试获取层次结构(扁平化)时,它仍然非常同步且速度很慢。我正在尝试重构解决方案,以便利用 async/await 在分层搜索中更快地工作。到目前为止,我只有伪描述,我不知道这是否可行或如何正确实施:
创建 FileInfo
和 Task<FileData>
的字典(所以我不再等待构建 class 实例)。在扫描第一级 DependentUpon 的文件后,我找到匹配的任务,并且只有在这些任务完成后才继续我当前的任务。当然,这些任务具有相同的指令,因此它们只有在它们的依赖关系完成时才会被标记为已完成。我想同时开始所有任务。例如(只是一个例子,我无法预测什么任务在什么时候完成):
- 开始任务 A
- 开始任务 B
- 扫描文件 A,DependentUpon (B, F)
- 开始任务 C
- 扫描文件 B,DependentUpon (C, D)
- 任务 A 等到任务 B 和 F 完成。
- 开始任务 D
- 扫描文件 C
- ...
- 任务 D 等到任务 E 完成。
- 扫描文件E,DependentUpon()
- 任务 E 已完成
- 任务 D 已完成
- 任务 C 已完成
- 任务 B 已完成。
- 开始任务 J
- 任务 F 已完成。
- 任务 A 已完成。
- ...
- 所有任务完成
考虑使用 Task.WhenAll<> 并发等待加载(递归)根项目的任务。您还可以推迟依赖列表扩展,这会减少函数的 运行 时间并更有效地使用内存。
public class FileData
{
public string FilePath { get; set; }
public ISet<FileInfo> DependentUpon { get; set; }
public IEnumerable<FileInfo> Dependencies {get; set;}
}
新 属性 Dependencies
提供所有依赖项的列表。 DependentUpon
现在仅包含直接依赖项,无需更改。
public async Task<ICollection<FileData>> ProcessAsync(IEnumerable<FileInfo> files)
{
var map = new Dictionary<FileInfo, Task<FileData>>();
var tasks = files.Select(it => LoadFileDataAsync(it, map));
return await Task.WhenAll(tasks);
}
static async Task<FileData> LoadFileDataAsync(FileInfo fileInfo, Dictionary<FileInfo, Task<FileData>> map)
{
// Load recursively FileData elements for all children
// storing the result in the map.
Task<FileData> pendingTask;
bool isNew;
lock (map)
{
isNew = !map.TryGetValue(fileInfo, out pendingTask);
if (isNew)
{
pendingTask = FileData.CreateAsync(fileInfo);
map.Add(fileInfo, pendingTask);
}
}
var data = await pendingTask;
if (isNew)
{
// Assign an iterator traversing through the dependency graph
// Note: parameters are captured by reference.
data.Dependencies = ExpandDependencies(data.DependsUpon, map);
if (data.DependsUpon.Count > 0)
{
// Recursively load child items
var tasks = data.DependsUpon.Select(it => (Task)LoadFileDataAsync(it, map));
await Task.WhenAll(tasks);
}
}
return data;
}
static IEnumerable<FileInfo> ExpandDependencies(ISet<FileInfo> directDependencies, Dictionary<FileInfo, Task<FileData>> map)
{
if (directDependencies.Count == 0)
{
yield break;
}
// Depth-first graph traversal
var visited = new HashSet<FileInfo>(map.Comparer); // check for duplicates
var stack = new Stack<FileInfo>();
foreach(var item in directDependencies)
{
stack.Push(item);
}
while(stack.Count > 0)
{
var info = stack.Pop();
if (visited.Add(info))
{
yield return info;
var data = map[info].Result;
foreach (var child in data.DependsUpon)
{
stack.Push(child);
}
}
}
}
我不太确定执行此操作的有效方法。我有文件,其中文件的内容指向其他文件,例如:
A
|-- B
| |-- C
| |-- D
| |-- E
|
|-- F
|-- C
G
|-- H
| |-- I
|
|-- D
| |-- E
|
|-- J
这种情况持续了数十万个文件;幸运的是,依赖的深度很浅,但为了论证,它可能是 N 级深度,不可能有循环。我的目标是了解每个文件的完整依赖关系(扁平化)。例如:
- A: (B, C, D, E, F) -- 注意 'C' 只列出一次。
- B: (C, D, E)
- C:()
- D: (E)
- E: ()
- F: (C)
- G: (D, E, H, I, J)
- 等
我首先创建了一些模型来跟踪这些信息:
public class FileData
{
public string FilePath { get; set; }
public ISet<FileInfo> DependentUpon { get; set; }
}
当然,然后我创建了一个List<FileData>
来存储处理后的文件。同步扫描文件内容以构建此依赖关系树(然后将其展平)会花费太长时间,因此我探索了查看 async/await,这有助于加快速度,但我希望它变得更均匀在将其释放到生产环境之前更快。
我的 async/await 尝试要快得多,但仍然不够有效。
public async Task<ICollection<FileData>> ProcessAsync(IEnumerable<FileInfo> files)
{
var mappings = new Dictionary<FileInfo, FileData>();
foreach (var file in files)
{
// Static Method that constructs an instance of the class
// and utilizes async I/O to read the file line-by-line
// to build any first level dependencies.
var info = await FileData.CreateAsync(file);
// Update progress + Other Properties
mappings.Add(file, info);
}
// Go through the list and recursively add to the dependencies
foreach (var item in list)
{
foreach (var dependency in GetAllDependencies(item, mappings))
{
file.DependentUpon.Add(dependency);
}
}
}
IEnumerable<FileInfo> GetAllDependencies(FileData data, IDictionary<FileInfo, FileData> mappings)
{
foreach (var file in info.DependentUpon)
{
yield return file;
foreach (var child in GetAllDependencies(mappings[file], mappings))
{
yield return child;
}
}
}
当然,这在某种程度上是不错的异步,但是当我尝试获取层次结构(扁平化)时,它仍然非常同步且速度很慢。我正在尝试重构解决方案,以便利用 async/await 在分层搜索中更快地工作。到目前为止,我只有伪描述,我不知道这是否可行或如何正确实施:
创建 FileInfo
和 Task<FileData>
的字典(所以我不再等待构建 class 实例)。在扫描第一级 DependentUpon 的文件后,我找到匹配的任务,并且只有在这些任务完成后才继续我当前的任务。当然,这些任务具有相同的指令,因此它们只有在它们的依赖关系完成时才会被标记为已完成。我想同时开始所有任务。例如(只是一个例子,我无法预测什么任务在什么时候完成):
- 开始任务 A
- 开始任务 B
- 扫描文件 A,DependentUpon (B, F)
- 开始任务 C
- 扫描文件 B,DependentUpon (C, D)
- 任务 A 等到任务 B 和 F 完成。
- 开始任务 D
- 扫描文件 C
- ...
- 任务 D 等到任务 E 完成。
- 扫描文件E,DependentUpon()
- 任务 E 已完成
- 任务 D 已完成
- 任务 C 已完成
- 任务 B 已完成。
- 开始任务 J
- 任务 F 已完成。
- 任务 A 已完成。
- ...
- 所有任务完成
考虑使用 Task.WhenAll<> 并发等待加载(递归)根项目的任务。您还可以推迟依赖列表扩展,这会减少函数的 运行 时间并更有效地使用内存。
public class FileData
{
public string FilePath { get; set; }
public ISet<FileInfo> DependentUpon { get; set; }
public IEnumerable<FileInfo> Dependencies {get; set;}
}
新 属性 Dependencies
提供所有依赖项的列表。 DependentUpon
现在仅包含直接依赖项,无需更改。
public async Task<ICollection<FileData>> ProcessAsync(IEnumerable<FileInfo> files)
{
var map = new Dictionary<FileInfo, Task<FileData>>();
var tasks = files.Select(it => LoadFileDataAsync(it, map));
return await Task.WhenAll(tasks);
}
static async Task<FileData> LoadFileDataAsync(FileInfo fileInfo, Dictionary<FileInfo, Task<FileData>> map)
{
// Load recursively FileData elements for all children
// storing the result in the map.
Task<FileData> pendingTask;
bool isNew;
lock (map)
{
isNew = !map.TryGetValue(fileInfo, out pendingTask);
if (isNew)
{
pendingTask = FileData.CreateAsync(fileInfo);
map.Add(fileInfo, pendingTask);
}
}
var data = await pendingTask;
if (isNew)
{
// Assign an iterator traversing through the dependency graph
// Note: parameters are captured by reference.
data.Dependencies = ExpandDependencies(data.DependsUpon, map);
if (data.DependsUpon.Count > 0)
{
// Recursively load child items
var tasks = data.DependsUpon.Select(it => (Task)LoadFileDataAsync(it, map));
await Task.WhenAll(tasks);
}
}
return data;
}
static IEnumerable<FileInfo> ExpandDependencies(ISet<FileInfo> directDependencies, Dictionary<FileInfo, Task<FileData>> map)
{
if (directDependencies.Count == 0)
{
yield break;
}
// Depth-first graph traversal
var visited = new HashSet<FileInfo>(map.Comparer); // check for duplicates
var stack = new Stack<FileInfo>();
foreach(var item in directDependencies)
{
stack.Push(item);
}
while(stack.Count > 0)
{
var info = stack.Pop();
if (visited.Add(info))
{
yield return info;
var data = map[info].Result;
foreach (var child in data.DependsUpon)
{
stack.Push(child);
}
}
}
}