Should I choose a simple Dictionary or a ConcurrentDictionary when working with the Task Parallel Library?

Here is a simplified scenario in which the user wants to download and process some data:

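// initialized here so that TryAdd below does not hit a null reference
private readonly ConcurrentDictionary<int, (string path, string name)> _testDictionary =
    new ConcurrentDictionary<int, (string path, string name)>();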
public async Task StartDownload(List<(int id, string path, string name)> properties)
{
    foreach (var (id, path, name) in properties)
    {
        _testDictionary.TryAdd(id, (path, name));
    }
    await CreatePipeline(properties);
    //after returning I would like to check if _testDictionary contains any elements,
    //and what is their status
}

All incoming items are registered in the ConcurrentDictionary, and then a TPL Dataflow pipeline is called to download and process them:

public async Task CreatePipeline(List<(int id, string path, string name)> properties)
{
    var downloadBlock = new TransformBlock<(int id, string path, string name), int>(
        (data) => { return data.id; },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
    var resultsBlock = new ActionBlock<int>((data) =>
    {
        _testDictionary.TryRemove(data, out _);
        //or
        //_testDictionary.AddOrUpdate(...);
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
    downloadBlock.LinkTo(resultsBlock,
        new DataflowLinkOptions { PropagateCompletion = true });

    foreach (var item in properties)
    {
        await downloadBlock.SendAsync(item);
    }
    // Complete the head of the pipeline; PropagateCompletion forwards it to resultsBlock
    downloadBlock.Complete();
    await resultsBlock.Completion;
}

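At the end, the results block removes (or updates) items in _testDictionary depending on how their processing went. My silly question is: if I set MaxDegreeOfParallelism = 1 for all the blocks that make up my pipeline, and make sure that no more than one pipeline runs at the same time, do I really need a ConcurrentDictionary for this, or is a plain Dictionary enough? I'm worried that the pipeline may execute on different threads, and that accessing a plain Dictionary from there could cause problems.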

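Yes, if the structure of your code guarantees that the dictionary can never be accessed concurrently by multiple threads, then a plain Dictionary is enough. If you are worried about the visibility of the dictionary's internal state, i.e. the possibility that some thread could at some point see a stale state, that is not a problem either, because: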

TPL includes the appropriate barriers when tasks are queued and at the beginning/end of task execution, so that values are appropriately made visible.
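
To illustrate the point above without Dataflow, here is a minimal sketch in which a plain Dictionary is modified from thread-pool threads, but strictly one step at a time, because each Task.Run is awaited before the next one starts. The integer keys and "fileN" values are hypothetical stand-ins for the (id, path, name) tuples, and the code assumes C# 9 top-level statements.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's bookkeeping dictionary.
var pending = new Dictionary<int, string>();

// Every Task.Run executes on a thread-pool thread, possibly a different one each
// time, but each task is awaited before the next starts, so the Dictionary is never
// accessed by two threads at once. The barriers the TPL inserts when a task is
// queued, starts and finishes make each step's writes visible to the next one.
for (int id = 0; id < 100; id++)
{
    int current = id;
    await Task.Run(() => pending[current] = $"file{current}");
}

// Remove the even ids, again strictly one at a time.
for (int id = 0; id < 100; id += 2)
{
    int current = id;
    await Task.Run(() => pending.Remove(current));
}

Console.WriteLine(pending.Count); // 50
```

As soon as two of these steps could overlap (for example two StartDownload calls running at the same time), the guarantee is gone and the ConcurrentDictionary becomes necessary again.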

Dataflow

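As far as I can tell, from the _testDictionary's point of view your StartDownload tries to act as a producer while your CreatePipeline acts as a consumer. The Add and Remove calls are split across two different functions, which is why you had to make that variable class-level.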

What if CreatePipeline contained both calls and returned all the unprocessed elements?

public async Task<Dictionary<int, (string path, string name)>> CreatePipeline(List<(int id, string path, string name)> properties)
{
    var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };

    var unprocessed = new ConcurrentDictionary<int, (string path, string name)>(
        properties.ToDictionary(
            prop => prop.id, 
            prop => (prop.path, prop.name)));

    // var downloadBlock = ...;

    var resultsBlock = new ActionBlock<int>(
        (data) => unprocessed.TryRemove(data, out _), options);

    //...

    downloadBlock.Complete();
    await resultsBlock.Completion;

    return unprocessed.ToDictionary(
        dict => dict.Key,
        dict => dict.Value);
}

Ordering

If ordering does not matter, you could consider rewriting the logic that feeds the TransformBlock like this:

await Task.WhenAll(properties.Select(downloadBlock.SendAsync));

Immutable dictionary

If you want to make sure that the returned unprocessed items cannot be modified by other threads, you can take advantage of ImmutableDictionary.
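
A small sketch of what ImmutableDictionary buys you here, assuming C# 9 top-level statements and hypothetical keys and file names: the snapshot is unaffected by later changes to the ConcurrentDictionary it was copied from, and Add returns a new dictionary instead of modifying the snapshot.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Immutable;

// Hypothetical stand-in for the pipeline's "unprocessed" dictionary.
var unprocessed = new ConcurrentDictionary<int, string>();
unprocessed[1] = "a.txt";
unprocessed[2] = "b.txt";

// Copy the current contents into an immutable snapshot for the caller.
ImmutableDictionary<int, string> snapshot = unprocessed.ToImmutableDictionary();

// Later changes to the source do not affect the snapshot...
unprocessed.TryRemove(1, out _);

// ...and "modifying" the snapshot produces a new instance instead.
ImmutableDictionary<int, string> extended = snapshot.Add(3, "c.txt");

Console.WriteLine($"{unprocessed.Count} {snapshot.Count} {extended.Count}"); // 1 2 3
```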

So, putting everything together, it could look like this:

public async Task StartDownload(List<(int id, string path, string name)> properties)
{
    var unprocessedProperties = await CreatePipeline(properties);
    foreach (var property in unprocessedProperties)
    {
        //TODO
    }
}

public async Task<ImmutableDictionary<int, (string path, string name)>> CreatePipeline(List<(int id, string path, string name)> properties)
{
    var options = new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 1};

    var unprocessed = new ConcurrentDictionary<int, (string path, string name)>(
        properties.ToDictionary(
            prop => prop.id, 
            prop => (prop.path, prop.name)));

    var downloadBlock = new TransformBlock<(int id, string path, string name), int>(
        (data) => data.id, options);

    var resultsBlock = new ActionBlock<int>(
        (data) => unprocessed.TryRemove(data, out _), options);

    downloadBlock.LinkTo(resultsBlock, new DataflowLinkOptions { PropagateCompletion = true });
    await Task.WhenAll(properties.Select(downloadBlock.SendAsync));

    downloadBlock.Complete();
    await resultsBlock.Completion;

    return unprocessed.ToImmutableDictionary(
        dict => dict.Key, 
        dict => dict.Value); 
}
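
Following the reasoning of the first answer, the ConcurrentDictionary inside CreatePipeline could also be a plain Dictionary: it is a local variable, it is filled before the blocks exist, it is modified only by the single-threaded resultsBlock, and the method reads it again only after resultsBlock.Completion has finished. The sketch below assumes a hypothetical download step that succeeds for even ids and fails for odd ones (so the odd ids are reported as unprocessed), C# 9 top-level statements, and that System.Threading.Tasks.Dataflow is available to the project (on some target frameworks it has to be added as a NuGet package).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical input: ids 1..6 with made-up paths and names.
var items = Enumerable.Range(1, 6)
    .Select(i => (id: i, path: $"/tmp/{i}", name: $"file{i}"))
    .ToList();

var remaining = await CreatePipeline(items);
Console.WriteLine(string.Join(",", remaining.Keys.OrderBy(k => k))); // 1,3,5

static async Task<Dictionary<int, (string path, string name)>> CreatePipeline(
    List<(int id, string path, string name)> properties)
{
    var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };

    // Plain Dictionary: filled before the blocks exist, modified only by the
    // single-threaded resultsBlock, read again only after its Completion.
    var unprocessed = properties.ToDictionary(p => p.id, p => (p.path, p.name));

    // Hypothetical "download": reports success for even ids, failure for odd ids.
    var downloadBlock = new TransformBlock<(int id, string path, string name), (int id, bool ok)>(
        data => (data.id, data.id % 2 == 0), options);

    // Only successfully processed items are removed from the dictionary.
    var resultsBlock = new ActionBlock<(int id, bool ok)>(result =>
    {
        if (result.ok) unprocessed.Remove(result.id);
    }, options);

    downloadBlock.LinkTo(resultsBlock, new DataflowLinkOptions { PropagateCompletion = true });

    foreach (var item in properties)
        await downloadBlock.SendAsync(item);

    downloadBlock.Complete();      // propagates to resultsBlock
    await resultsBlock.Completion; // every result has been handled after this
    return unprocessed;
}
```

Whether to keep the ConcurrentDictionary anyway is a judgment call: it costs little and stays safe if someone later raises MaxDegreeOfParallelism on resultsBlock.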

EDIT: reflecting the new requirements

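As the OP pointed out, the main reason for the dictionary is to be able to add to the queue of pending items while processing is still in progress.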

In other words, processing and collecting the pending items is not a one-off operation but a continuous activity.

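The good news is that you can get rid of both _testDictionary and resultsBlock entirely. All you need to do is keep calling Post or SendAsync with new data on the TransformBlock, and await the processing in a separate method (StopDownload).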

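private readonly ITargetBlock<(int id, string path, string name)> downloadBlock;

public MyAwesomeClass()
{
    var transform = new TransformBlock<(int id, string path, string name), int>(
        (data) => data.id, 
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

    // A TransformBlock only completes once its outputs have been consumed,
    // so discard them; otherwise StopDownload would wait forever
    transform.LinkTo(DataflowBlock.NullTarget<int>());
    downloadBlock = transform;
}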

public void StartDownload(List<(int id, string path, string name)> properties)
{
    // Starts sending the properties, but does not await the sends
    _ = properties.Select(downloadBlock.SendAsync).ToList();

    // You can await the send operations if you wish
}

public async Task StopDownload()
{
    downloadBlock.Complete();
    await downloadBlock.Completion;
}  
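
The continuous design relies on Completion eventually finishing. One property of TransformBlock worth keeping in mind: its Completion task does not finish until every item in its output buffer has been consumed, so if nothing is linked to the last block in the chain, awaiting its Completion can wait forever. Linking the outputs to DataflowBlock.NullTarget<T>() discards them and lets the block complete. The sketch below shows both cases side by side; the 5-second and 200 ms delays are arbitrary, and it assumes C# 9 top-level statements with System.Threading.Tasks.Dataflow referenced.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Two identical blocks; each receives one item and is then completed.
var unlinked = new TransformBlock<int, int>(x => x * 2);
var drained = new TransformBlock<int, int>(x => x * 2);

// Only the second block has a consumer: NullTarget accepts and discards every output.
drained.LinkTo(DataflowBlock.NullTarget<int>());

foreach (var block in new[] { unlinked, drained })
{
    block.Post(21);
    block.Complete();
}

// Wait (up to 5 s) for the drained block, then give the unlinked one extra time.
await Task.WhenAny(drained.Completion, Task.Delay(TimeSpan.FromSeconds(5)));
await Task.Delay(TimeSpan.FromMilliseconds(200));

// The unlinked block still holds its output (42), so it cannot complete.
Console.WriteLine($"drained: {drained.Completion.IsCompleted}, unlinked: {unlinked.Completion.IsCompleted}");
// drained: True, unlinked: False
```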

This structure can easily be modified to inject a BufferBlock that smooths out the load:

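private readonly ITargetBlock<(int id, string path, string name)> downloadBlock;
// Completion of the last block; it finishes once every item has been processed
private readonly Task processingCompletion;

public MyAwesomeBufferedClass()
{
    var transform = new TransformBlock<(int id, string path, string name), int>(
        (data) => data.id,
        new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 1});

    // Discard the outputs so that the TransformBlock is able to complete
    transform.LinkTo(DataflowBlock.NullTarget<int>());

    var buffer = new BufferBlock<(int id, string path, string name)>(
        new DataflowBlockOptions() { BoundedCapacity = 100});

    buffer.LinkTo(transform, new DataflowLinkOptions {PropagateCompletion = true});
    downloadBlock = buffer;
    processingCompletion = transform.Completion;
}

public void StartDownload(List<(int id, string path, string name)> properties)
{
    // Sends are not awaited: with a bounded buffer some of them may still be pending
    _ = properties.Select(downloadBlock.SendAsync).ToList(); 
}

public async Task StopDownload()
{
    // Completing the buffer propagates to the transform block; awaiting only
    // the buffer's Completion would return before processing has finished
    downloadBlock.Complete();
    await processingCompletion;
}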
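
The difference between Post and SendAsync mentioned above becomes visible once a block is bounded. A minimal sketch, using a BoundedCapacity of 1 instead of 100 so the effect shows up with three items (C# 9 top-level statements, System.Threading.Tasks.Dataflow referenced): Post gives up immediately when the buffer is full, while SendAsync returns a task that completes once the buffer has room again.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Capacity of 1 so the buffer is full after a single item.
var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });

bool firstPost = buffer.Post(1);       // accepted: the buffer was empty
bool secondPost = buffer.Post(2);      // declined: full, and Post does not wait
Task<bool> send = buffer.SendAsync(3); // postponed until there is room

bool waitingBeforeReceive = !send.IsCompleted;
int received = await buffer.ReceiveAsync(); // takes item 1 and frees the slot
bool sent = await send;                     // the postponed item 3 is now accepted
int next = await buffer.ReceiveAsync();     // item 3; item 2 was never stored

Console.WriteLine($"{firstPost} {secondPost} {waitingBeforeReceive} {received} {sent} {next}");
// True False True 1 True 3
```

This is also why, in StartDownload, some fire-and-forget SendAsync calls may still be pending when StopDownload completes the buffer; awaiting them first (for example with Task.WhenAll) avoids that race.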