WPF ConcurrentDictionary 或 SortedDictionary 存储多个异步任务的结果

WPF ConcurrentDictionary or SortedDictionary to store results of multiple async Tasks

以下示例代码片段实现了 async 多任务处理功能,即:在线读取股票价格列表(特别是对应道琼斯工业平均指数)并将它们异步存储在 3 个不同的 Collection.Generics:

Dictionary<string,double>

SortedDictionary<string,double>

ConcurrentDictionary<string,double>

该应用程序已经过测试,所有 3 个泛型似乎都正常执行,目前没有观察到任何并发(竞争条件)问题。

我的问题:尽管快速测试的结果都是积极的,但在这种类型中使用 Dictionary<string,double>SortedDictionary<string,double> 是否安全async/await 多任务 WPF 实现,或者它仍然需要使用 ConcurrentDictionary<string,double> 以避免可能的 concurrency/race 条件问题?

重要:为了更加清楚,请注意此问题特定于 async/await 多任务实现(不仅仅是一般的 TPL 内容)。在 await 实现中,它非常好,例如,直接访问任何 UI 控件(例如 TextBox),没有任何 concurrency/racing 问题的风险。我假设同样的考虑也适用于 Windows class 变量(特别是任何泛型数据结构,如本示例中使用的字典),从而可以安全地在相同的环境中使用它们不需要额外线程的方式 synchronization/interlocking(即使常规 Collections/Dictionary 本身不是线程安全的,但 await 可能会处理同步问题)。

清单 1. 通过多个 async 任务在线阅读股票报价的示例应用程序

using System;
using System.Windows;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Collections.Concurrent;

// sample stock quote project implementing multiple async Tasks
// and various Dictionary objects to store results
namespace QuoteServer
{
    public partial class MainWindow : Window
    {
        // quote web service root Url
        private const string _baseUrl = @"http://finance.yahoo.com/d/quotes.csv?f=l1&s=";

        /// <summary>
        /// Sample List of 30 Stock Symbols (Dow Jones Industrial Average) 
        /// </summary>
        List<string> ListDJIA = new List<string>
        {
            "AAPL", "AXP",  "BA",   "CAT",  "CSCO", "CVX",  "DD",   "DIS",  "GE",   "GS",
            "HD",   "IBM",  "INTC", "JNJ",  "JPM",  "KO",   "MCD",  "MMM",  "MRK",  "MSFT",
            "NKE",  "PFE",  "PG",   "TRV",  "UNH",  "UTX",  "V",    "VZ",   "WMT",  "XOM"
        };

        // store results (multiple stock Px) of asynchronous Tasks in various Dictionary objects
        private Dictionary<string, double> _dQuote = new Dictionary<string,double>();
        private ConcurrentDictionary<string, double> _cdQuote = new ConcurrentDictionary<string,double>();
        private SortedDictionary<string, double> _sdQuote = new SortedDictionary<string,double>();

        private CancellationTokenSource _CTS;

        public MainWindow(){ InitializeComponent();}

        /// <summary>start multiple async Tasks to get quote list</summary>
        private async void startButton_Click(object sender, RoutedEventArgs e) {
            _CTS = new CancellationTokenSource();
            try{
                await AccessWebAsync(_CTS.Token);
                resultsTextBox.Text += "\r\nDownloads complete.";
            }
            catch (OperationCanceledException){
                resultsTextBox.Text += "\r\nDownloads canceled.\r\n";
            }
            catch { resultsTextBox.Text += "\r\nDownloads failed.\r\n"; }
            finally { _CTS = null; }
        }

        /// <summary>cancel async Tasks</summary>
        private void cancelButton_Click(object sender, RoutedEventArgs e) {
            if (_CTS != null) _CTS.Cancel();
         }

        /// <summary>
        /// access web service in async mode to run multiple Tasks
        /// </summary>
        /// <param name="CT">CancellationToken</param>
        /// <returns>Task</returns>
        async Task AccessWebAsync(CancellationToken CT)
        {
            _dQuote.Clear();
            _cdQuote.Clear();
            _sdQuote.Clear();
            resultsTextBox.Clear();

            HttpClient _httpClient = new HttpClient();
            List<Task> _taskList = new List<Task>();
            foreach (string _symbol in ListDJIA)
            {
                _taskList.Add(GetQuoteAsync(_symbol, _httpClient, CT));
            }
            await Task.WhenAll(_taskList);

            // test if count is 30 (DJIA stocks)
            resultsTextBox.Text += String.Concat("Dictionary Items: ", _dQuote.Count, Environment.NewLine);
            resultsTextBox.Text += String.Concat("Dictionary Items: ", _cdQuote.Count, Environment.NewLine);
            resultsTextBox.Text += String.Concat("Dictionary Items: ", _sdQuote.Count, Environment.NewLine); 
        }

        /// <summary>
        /// Get quote from web in async mode
        /// </summary>
        /// <param name="Symbol">string</param>
        /// <param name="client">HttpClient</param>
        /// <param name="CT">CancellationToken</param>
        /// <returns>Task</returns>
        async Task GetQuoteAsync(string Symbol, HttpClient client, CancellationToken CT) {
            try {
                HttpResponseMessage _response = await client.GetAsync(_baseUrl + Symbol, CT);
                // bytes array
                byte[] _arrContentBytes = await _response.Content.ReadAsByteArrayAsync();
                // bytes array to string
                string quote = System.Text.Encoding.UTF8.GetString(_arrContentBytes);

                // try parse Stock Px as double
                double _dbl;
                if (double.TryParse(quote,out _dbl))
                {
                    // add item to dictionary (simplest option)
                    _dQuote.Add(Symbol, _dbl);
                    // add item to sorted dictionary (desirable option)
                    _sdQuote.Add(Symbol, _dbl);
                    // add item to concurrent dictionary (guaranteed thread-safe option)
                    _cdQuote.TryAdd(Symbol, _dbl);
                }
            }
            catch { throw; }
        }
    }
}

如果是并发使用我肯定会用ConcurrentDictionary。你可以测试无数次并获得积极的结果,然后 100 次或 1000 次它就会做一些不可预测的事情。它通常 工作实际上是一个问题,因为当它不工作时你将无法重现错误。

几天前。我做了一个并发操作的简单示例,有时我可以 运行 它一百多次而没有任何问题。场景略有不同(它涉及序列。)但基本概念仍然相同。我处理的生产应用程序大部分时间 运行 然后每隔一周就会因为这样的问题而锁定或行为不端。

DictionarySortedDictionary 不是线程安全的,除了只读访问。如果任何一个线程的实例都可以被另一个使用它的线程同时修改,那么你必须同步。否则,你不需要。

在您的示例中,await 的每次使用都在 UI 线程的上下文中,因此 await 之后的延续(即出现在每个 await) 也将出现在 UI 线程中。这意味着每个集合实例仅在 UI 线程中使用,因此修改和访问必然不能同时发生。它们通过使用 await 在单个线程中执行连续代码来序列化。

来自 MSDN

A Dictionary can support multiple readers concurrently, as long as the collection is not modified. Even so, enumerating through a collection is intrinsically not a thread-safe procedure. In the rare case where an enumeration contends with write accesses, the collection must be locked during the entire enumeration. To allow the collection to be accessed by multiple threads for reading and writing, you must implement your own synchronization.

ConcurrentDictionary<TKey,TValue> 是通用 Dictionary<TKey, TValue> 集合的线程安全对应物,两者都是为 O(1) – 基于键查找数据而设计的。

一般来说,当我们需要 concurrent 字典时,我们会交错读取和更新。它实现了它的 thread-safety 没有公共锁以提高效率。它实际上使用一系列锁来提供并发更新,并且有 lockless 次读取。

最后但同样重要的是,SortedDictionaryIDictionary 的排序实现,它们存储为有序树。虽然它们不如未排序的字典快——它们是 O(log2 n) 速度和排序的结合。