Should I choose a simple Dictionary or a ConcurrentDictionary when working with the Task Parallel Library?
Here is a simplified scenario: a user wants to download and process some data:
private ConcurrentDictionary<int, (string path, string name)> _testDictionary =
    new ConcurrentDictionary<int, (string path, string name)>();
public async Task StartDownload(List<(int id, string path, string name)> properties)
{
    foreach (var (id, path, name) in properties)
    {
        _testDictionary.TryAdd(id, (path, name));
    }
    await CreatePipeline(properties);
    // After returning, I would like to check whether _testDictionary still contains any elements,
    // and what their status is
}
All incoming items are registered in the ConcurrentDictionary, and then a TPL Dataflow pipeline is called to download and process them:
public async Task CreatePipeline(List<(int id, string path, string name)> properties)
{
    // Placeholder for the real download step
    var downloadBlock = new TransformBlock<(int id, string path, string name), int>(
        (data) => { return data.id; },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
    var resultsBlock = new ActionBlock<int>((data) =>
    {
        _testDictionary.TryRemove(data, out _);
        //or
        //_testDictionary.AddOrUpdate(...);
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
    downloadBlock.LinkTo(resultsBlock,
        new DataflowLinkOptions { PropagateCompletion = true });
    foreach (var item in properties)
    {
        await downloadBlock.SendAsync(item);
    }
    // Complete the head of the pipeline; PropagateCompletion forwards it to resultsBlock.
    // Completing resultsBlock directly could drop items still inside downloadBlock.
    downloadBlock.Complete();
    await resultsBlock.Completion;
}
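At the end, the results block removes items from (or updates them in) _testDictionary depending on how they fared. My silly question is: if I set MaxDegreeOfParallelism = 1 for all the blocks that make up my pipeline, and make sure that no more than one pipeline is running at the same time, do I really need a ConcurrentDictionary for this, or is a simple Dictionary enough? I am worried that the pipeline may execute on a different thread, and that accessing a simple Dictionary from there could cause problems.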
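Yes, if the structure of your code guarantees that the dictionary cannot be accessed by multiple threads concurrently, then a plain Dictionary is sufficient. If you are worried about the visibility of the dictionary's internal state, i.e. the possibility that some thread sees stale state at some point, that is not a problem either, because: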
TPL includes the appropriate barriers when tasks are queued and at the beginning/end of task execution, so that values are appropriately made visible.
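To make this concrete, here is a minimal sketch of the question's pipeline using a plain Dictionary. It assumes, as the question states, that every block uses MaxDegreeOfParallelism = 1 and that only one pipeline runs at a time; the RunPipelineAsync helper and the placeholder download step are hypothetical. The dictionary is filled before the first SendAsync, changed only inside the sequential ActionBlock, and read again only after resultsBlock.Completion has been awaited, so no two threads ever touch it at the same moment.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var items = new List<(int id, string path, string name)>
{
    (1, "/a", "A"),
    (2, "/b", "B"),
    (3, "/c", "C"),
};

var leftover = await RunPipelineAsync(items);
Console.WriteLine($"Items left after the run: {leftover.Count}");

// Hypothetical helper: the question's pipeline, but with a plain Dictionary
static async Task<Dictionary<int, (string path, string name)>> RunPipelineAsync(
    List<(int id, string path, string name)> properties)
{
    // Filled in before the pipeline starts...
    var pending = new Dictionary<int, (string path, string name)>();
    foreach (var (id, path, name) in properties)
    {
        pending[id] = (path, name);
    }

    var sequential = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };

    // Placeholder for the real download: just passes the id along
    var downloadBlock = new TransformBlock<(int id, string path, string name), int>(
        data => data.id, sequential);

    // ...modified only here, by a block that never runs two invocations at once...
    var resultsBlock = new ActionBlock<int>(doneId => pending.Remove(doneId), sequential);
    downloadBlock.LinkTo(resultsBlock, new DataflowLinkOptions { PropagateCompletion = true });

    foreach (var item in properties)
    {
        await downloadBlock.SendAsync(item);
    }

    downloadBlock.Complete();        // completion flows to resultsBlock through the link
    await resultsBlock.Completion;

    // ...and read again only after the pipeline has finished
    return pending;
}
```

If either block were given a higher MaxDegreeOfParallelism, or StartDownload could run twice concurrently, this reasoning would no longer hold and a ConcurrentDictionary (or a lock) would be needed again.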
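Dataflow
As far as I can tell, from the perspective of _testDictionary your StartDownload tries to act as a producer while your CreatePipeline acts as a consumer. The Add and Remove calls are split across two different functions, which is why you had to make that variable class-level.
What if CreatePipeline contained both calls and returned all the unprocessed elements?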
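public async Task<Dictionary<int, (string path, string name)>> CreatePipeline(List<(int id, string path, string name)> properties)
{
    var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };
    // Registration ("Add") and removal now both happen inside this method
    var unprocessed = new ConcurrentDictionary<int, (string path, string name)>(
        properties.ToDictionary(
            prop => prop.id,
            prop => (prop.path, prop.name)));
    var downloadBlock = new TransformBlock<(int id, string path, string name), int>(
        (data) => data.id, options);
    var resultsBlock = new ActionBlock<int>(
        (data) => unprocessed.TryRemove(data, out _), options);
    downloadBlock.LinkTo(resultsBlock, new DataflowLinkOptions { PropagateCompletion = true });
    foreach (var item in properties)
    {
        await downloadBlock.SendAsync(item);
    }
    downloadBlock.Complete();
    await resultsBlock.Completion;
    // Whatever is still in the dictionary was never processed
    return unprocessed.ToDictionary(
        dict => dict.Key,
        dict => dict.Value);
}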
Ordering
If ordering does not matter, you could consider rewriting the logic that feeds the TransformBlock like this:
await Task.WhenAll(properties.Select(downloadBlock.SendAsync));
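A small sketch of what this trade-off looks like, assuming a bounded download block (BoundedCapacity = 1, which the original code does not use). With all SendAsync calls started at once, most offers are postponed; every item still gets through, but the order in which the postponed items are accepted is not something the code should rely on. The check therefore compares the received ids as a set rather than as a sequence.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var properties = Enumerable.Range(1, 20)
    .Select(i => (id: i, path: $"/file{i}", name: $"File {i}"))
    .ToList();

// Assumption: the receiving block is bounded, so most offers get postponed
var downloadBlock = new TransformBlock<(int id, string path, string name), int>(
    data => data.id,
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });

var received = new ConcurrentQueue<int>();
var resultsBlock = new ActionBlock<int>(id => received.Enqueue(id));
downloadBlock.LinkTo(resultsBlock, new DataflowLinkOptions { PropagateCompletion = true });

// All sends are started at once and awaited together
bool[] accepted = await Task.WhenAll(properties.Select(p => downloadBlock.SendAsync(p)));

downloadBlock.Complete();
await resultsBlock.Completion;

Console.WriteLine($"accepted: {accepted.Count(a => a)}, received: {received.Count}");
```

When the target is unbounded, as in the answer's code, each SendAsync is accepted immediately, so the two ways of feeding the block behave the same in practice.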
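Immutable dictionary
If you want to make sure that the returned unprocessed items cannot be modified by other threads, you can take advantage of ImmutableDictionary.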
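A quick sketch of the property being relied on here: an ImmutableDictionary cannot be changed in place. Methods such as Add return a new instance and leave the original untouched, so a snapshot handed back to the caller stays as it was, whichever thread holds it and whatever later happens to the live ConcurrentDictionary.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Immutable;

var live = new ConcurrentDictionary<int, (string path, string name)>();
live.TryAdd(1, ("/a", "A"));
live.TryAdd(2, ("/b", "B"));

// Take a snapshot of the unprocessed items
ImmutableDictionary<int, (string path, string name)> snapshot = live.ToImmutableDictionary();

// Later changes to the live dictionary do not affect the snapshot...
live.TryRemove(1, out _);

// ...and "modifying" the snapshot produces a new dictionary instead
var extended = snapshot.Add(3, ("/c", "C"));

Console.WriteLine($"live: {live.Count}, snapshot: {snapshot.Count}, extended: {extended.Count}");
```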
So, putting everything together, it might look like this:
public async Task StartDownload(List<(int id, string path, string name)> properties)
{
    var unprocessedProperties = await CreatePipeline(properties);
    foreach (var property in unprocessedProperties)
    {
        //TODO
    }
}
public async Task<ImmutableDictionary<int, (string path, string name)>> CreatePipeline(List<(int id, string path, string name)> properties)
{
    var options = new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 1};
    var unprocessed = new ConcurrentDictionary<int, (string path, string name)>(
        properties.ToDictionary(
            prop => prop.id,
            prop => (prop.path, prop.name)));
    var downloadBlock = new TransformBlock<(int id, string path, string name), int>(
        (data) => data.id, options);
    var resultsBlock = new ActionBlock<int>(
        (data) => unprocessed.TryRemove(data, out _), options);
    downloadBlock.LinkTo(resultsBlock, new DataflowLinkOptions { PropagateCompletion = true });
    await Task.WhenAll(properties.Select(downloadBlock.SendAsync));
    downloadBlock.Complete();
    await resultsBlock.Completion;
    return unprocessed.ToImmutableDictionary(
        dict => dict.Key,
        dict => dict.Value);
}
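To see the "unprocessed" result actually come back non-empty, here is a variation of the code above in which the download step can fail. It swaps the TransformBlock for a TransformManyBlock, so a failed item simply produces no output and therefore stays in the dictionary. The failure rule (an empty path) and the CreatePipelineAsync name are assumptions made up for this illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var items = new List<(int id, string path, string name)>
{
    (1, "/a", "A"),
    (2, "", "B"),     // hypothetical failure case: empty path
    (3, "/c", "C"),
};

var leftover = await CreatePipelineAsync(items);
foreach (var pair in leftover)
{
    Console.WriteLine($"Not processed: {pair.Key} ({pair.Value.name})");
}

static async Task<ImmutableDictionary<int, (string path, string name)>> CreatePipelineAsync(
    List<(int id, string path, string name)> properties)
{
    var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };
    var unprocessed = new ConcurrentDictionary<int, (string path, string name)>(
        properties.ToDictionary(prop => prop.id, prop => (prop.path, prop.name)));

    // Swapped in for the TransformBlock: a "failed download" yields no output at all,
    // so its id never reaches resultsBlock and stays in the dictionary
    var downloadBlock = new TransformManyBlock<(int id, string path, string name), int>(
        data => string.IsNullOrEmpty(data.path) ? Array.Empty<int>() : new[] { data.id },
        options);
    var resultsBlock = new ActionBlock<int>(
        doneId => unprocessed.TryRemove(doneId, out _), options);
    downloadBlock.LinkTo(resultsBlock, new DataflowLinkOptions { PropagateCompletion = true });

    await Task.WhenAll(properties.Select(p => downloadBlock.SendAsync(p)));
    downloadBlock.Complete();
    await resultsBlock.Completion;

    return unprocessed.ToImmutableDictionary();
}
```

Filtering inside the block keeps the pipeline alive; if the download delegate threw instead, the block would fault and resultsBlock.Completion would surface that exception.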
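Edit: reflecting the new requirements
As the OP pointed out, the main reason for the dictionary is to be able to extend the queue of pending items while processing is still in progress.
In other words, collecting and processing pending items is not a one-off operation but a continuous activity.
The good news is that you can get rid of _testDictionary and resultsBlock entirely. You just need to keep calling Post or SendAsync with new data on the TransformBlock, and await the processing in a separate method (StopDownload).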
private readonly ITargetBlock<(int id, string path, string name)> downloadBlock;
public MyAwesomeClass()
{
    var transform = new TransformBlock<(int id, string path, string name), int>(
        (data) => data.id,
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
    // A TransformBlock only completes once its output has been consumed, so link the
    // results somewhere (here they are discarded); otherwise StopDownload never returns
    transform.LinkTo(DataflowBlock.NullTarget<int>());
    downloadBlock = transform;
}
public void StartDownload(List<(int id, string path, string name)> properties)
{
    //Starts to send props, but does not await them
    _ = properties.Select(downloadBlock.SendAsync).ToList();
    //You can await the send operation if you wish
}
public async Task StopDownload()
{
    downloadBlock.Complete();
    await downloadBlock.Completion;
}
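A sketch of how the continuous version might be driven, written with local variables instead of the MyAwesomeClass fields so that it runs on its own. Items arrive in several batches while earlier ones are still being processed, and only the final completion step waits for everything. The counter, the batch sizes and the simulated delay are assumptions for the illustration; the sends are awaited here, which the original comment allows, to keep the run deterministic.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

int processed = 0;

var transform = new TransformBlock<(int id, string path, string name), int>(
    async data =>
    {
        await Task.Delay(10);   // stand-in for the real download
        return data.id;
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

// Count the results instead of discarding them
var results = new ActionBlock<int>(_ => Interlocked.Increment(ref processed));
transform.LinkTo(results, new DataflowLinkOptions { PropagateCompletion = true });
ITargetBlock<(int id, string path, string name)> downloadBlock = transform;

// "StartDownload" called several times while processing is already under way
for (int batch = 0; batch < 3; batch++)
{
    var properties = Enumerable.Range(batch * 5, 5)
        .Select(i => (id: i, path: $"/file{i}", name: $"File {i}"))
        .ToList();
    foreach (var p in properties)
    {
        await downloadBlock.SendAsync(p);
    }
}

// "StopDownload": no more input, then wait until everything has gone through
downloadBlock.Complete();
await results.Completion;

Console.WriteLine($"Processed {processed} items");
```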
This structure can easily be modified to put a BufferBlock in front of it to smooth out the load:
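private readonly ITargetBlock<(int id, string path, string name)> downloadBlock;
private readonly Task processingCompletion;
public MyAwesomeBufferedClass()
{
    var transform = new TransformBlock<(int id, string path, string name), int>(
        (data) => data.id,
        // Bounded as well; an unbounded transform would take everything from the buffer at once
        new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 1, BoundedCapacity = 1});
    var buffer = new BufferBlock<(int id, string path, string name)>(
        new DataflowBlockOptions() { BoundedCapacity = 100});
    buffer.LinkTo(transform, new DataflowLinkOptions {PropagateCompletion = true});
    // Consume (here: discard) the results so that the transform can complete
    transform.LinkTo(DataflowBlock.NullTarget<int>());
    downloadBlock = buffer;
    // The buffer completes as soon as it has handed its items over,
    // so processing is only finished when the transform completes
    processingCompletion = transform.Completion;
}
public void StartDownload(List<(int id, string path, string name)> properties)
{
    // With bounded blocks some sends may still be pending; any that are still pending
    // when StopDownload calls Complete() may be declined (SendAsync returns false)
    _ = properties.Select(downloadBlock.SendAsync).ToList();
}
public async Task StopDownload()
{
    downloadBlock.Complete();
    await processingCompletion;
}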
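A sketch that checks the two points the buffered variant depends on: SendAsync waits (rather than failing) while the bounded blocks are full, and only the tail block's Completion tells you that processing has finished. The capacities, the delay and the counter are assumptions chosen for the demo, not values from the answer.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

int processed = 0;

var buffer = new BufferBlock<(int id, string path, string name)>(
    new DataflowBlockOptions { BoundedCapacity = 10 });
var transform = new TransformBlock<(int id, string path, string name), int>(
    async data =>
    {
        await Task.Delay(5);   // stand-in for a slow download
        return data.id;
    },
    // Bounded as well; otherwise the buffer would hand everything over at once
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });
var results = new ActionBlock<int>(_ => Interlocked.Increment(ref processed));

var link = new DataflowLinkOptions { PropagateCompletion = true };
buffer.LinkTo(transform, link);
transform.LinkTo(results, link);

// 50 items into a pipeline that holds only a few at a time:
// each SendAsync completes once there is room for its item
var sends = Enumerable.Range(0, 50)
    .Select(i => buffer.SendAsync((id: i, path: $"/file{i}", name: $"File {i}")))
    .ToList();
bool[] accepted = await Task.WhenAll(sends);

buffer.Complete();
await results.Completion;   // the tail, not buffer.Completion

Console.WriteLine($"accepted: {accepted.Count(a => a)}, processed: {processed}");
```

Because every send is awaited before Complete() is called, none of them can be declined; the fire-and-forget StartDownload above gives no such guarantee.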