C# Concurrent dictionary - lock on value

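I'm working on a service that is responsible for logging requests sent to our service. The service works offline (it is fired and forgotten). We save the requests to different databases based on an input parameter (the product ID). We don't want to write to the database every time someone makes a request; we would rather build a "batch" insert and run InsertMany every N amount of time (let's say 10 seconds). I've started implementing it, and now I'm struggling with two things: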

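  1. Do I need to use ConcurrentDictionary at all? It seems I would achieve the same with a normal Dictionary.
  2. If the answer to the above is "no, in your case there are no benefits from ConcurrentDictionary", is there a way to rewrite my code to use ConcurrentDictionary "properly", so that I can avoid the lock and make sure AddOrUpdate does not "collide" with clearing the batch?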

Let me paste the snippet and explain further:

    // dictionary where key is ProductId and value is a list of items to insert to that product database
    ConcurrentDictionary<string, List<QuoteDetails>> _productDetails;
    public SaverService(StatelessServiceContext context)
        : base(context)
    {
        _productDetails = new ConcurrentDictionary<string, List<QuoteDetails>>();
    }

    // this function will be fired and forgotten by the external service
    public async Task SaveRecentRequest(RequestOptions requestData, Response responseData)
    {
        await Task.Run(() => {
            foreach (var token in requestData.ProductAccessTokens)
            {
                // this function will extract the specific product request ( one request can contain multiple products )
                var details = SplitQuoteByProduct(requestData, responseData, token);
                _productDetails.AddOrUpdate(token, new List<QuoteDetails>() { details }, (productId, list) =>
                {
                    list.Add(details);
                    return list;
                });
            }
        });
    }

    // this function will be executed by a timer every N amount of time
    public void SaveRequestsToDatabase()
    {
        lock (_productDetails)
        {
            foreach (var item in _productDetails)
            {
                // copy current items and start a task which will process them
                SaveProductRequests(item.Key, item.Value.ToList());
                // clear current items
                item.Value.Clear();
            }
        }
    }

    public async Task SaveProductRequests(string productId, List<QuoteDetails> productRequests)
    {
        // save received items to database
        /// ...
    }

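My main concern is that, without the lock, the following could happen:

  1. SaveRequestsToDatabase is fired and starts processing the data.
  2. Just before item.Value.Clear(); is called in SaveRequestsToDatabase, the external service fires another SaveRecentRequest call, which runs AddOrUpdate with the same key and adds a new object to the collection.
  3. SaveRequestsToDatabase finishes and clears the collection, but the object added in step 2 was not in the copy passed to SaveProductRequests, so it is never processed.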
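
    ConcurrentDictionary<string, List<QuoteDetails>> _productDetails;

is not thread-safe, because List<T> is not thread-safe. ConcurrentDictionary only protects the dictionary itself: the update delegate you pass to AddOrUpdate runs outside its internal lock, so two threads can call list.Add on the same list at the same time, and one thread can add to a list while another is copying or clearing it. Sooner or later this breaks, either with an exception or with silently lost entries.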

I suggest using:

    ConcurrentDictionary<string, ConcurrentQueue<QuoteDetails>> _productDetails;

or:

    ConcurrentDictionary<string, BlockingCollection<QuoteDetails>> _productDetails;

You could also drop the ConcurrentDictionary entirely.
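A minimal sketch of the `ConcurrentDictionary<string, ConcurrentQueue<QuoteDetails>>` suggestion, pulled out of the service class. The class name `QueuePerProductBuffer`, its `Add`/`Drain` methods and the `QuoteDetails` record are hypothetical stand-ins, not code from the question. The key property is that `TryDequeue` removes each item atomically, so the flush never clears anything it did not also hand over, which rules out the lost-item scenario described in the question.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical stand-in for the question's QuoteDetails type.
public record QuoteDetails(string Payload);

// Hypothetical helper: one ConcurrentQueue per product ID, no explicit lock.
public class QueuePerProductBuffer
{
    private readonly ConcurrentDictionary<string, ConcurrentQueue<QuoteDetails>> _productDetails = new();

    // Producer side: safe to call from many threads at once.
    public void Add(string productId, QuoteDetails details)
    {
        // Under contention GetOrAdd may run the factory more than once, but every
        // caller gets back the single queue that actually ended up in the dictionary.
        _productDetails
            .GetOrAdd(productId, _ => new ConcurrentQueue<QuoteDetails>())
            .Enqueue(details);
    }

    // Consumer side (the timer): dequeues items one by one, so it only removes
    // items it also returns. Anything enqueued after a queue has been emptied
    // stays there for the next call.
    public Dictionary<string, List<QuoteDetails>> Drain()
    {
        var batches = new Dictionary<string, List<QuoteDetails>>();
        // Enumerating a ConcurrentDictionary while other threads modify it is safe.
        foreach (var pair in _productDetails)
        {
            var batch = new List<QuoteDetails>();
            while (pair.Value.TryDequeue(out var details))
            {
                batch.Add(details);
            }
            if (batch.Count > 0)
            {
                batches[pair.Key] = batch;
            }
        }
        return batches;
    }
}
```

The queues are deliberately never removed from the dictionary: removing one with TryRemove could race with a producer that has already obtained that queue from GetOrAdd and is about to enqueue into it. That is acceptable as long as the set of product IDs is bounded.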

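Concurrency problems usually come from not choosing the right data structure in the first place.

In your case you have two workflows:

  • n producers, enqueuing events concurrently and continuously
  • 1 consumer, dequeuing and processing the events at given times

Your problem is that you try to sort the events by product right away, even though that isn't necessary. Keep the events as a simple, unsorted stream in the concurrent part, and group them only in the consumer, where there is no concurrency.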

    ConcurrentQueue<(string token, QuoteDetails details)> _productDetails;

    public SaverService(StatelessServiceContext context)
        : base(context)
    {
        _productDetails = new ConcurrentQueue<(string, QuoteDetails)>();
    }

    // this function will be fired and forgotten by the external service
    public async Task SaveRecentRequest(RequestOptions requestData, Response responseData)
    {
        await Task.Run(() => {
            foreach (var token in requestData.ProductAccessTokens)
            {
                // this function will extract the specific product request ( one request can contain multiple products )
                var details = SplitQuoteByProduct(requestData, responseData, token);
                _productDetails.Enqueue((token, details));
            }
        });
    }

    // this function will be executed by a timer every N amount of time
    public void SaveRequestsToDatabase()
    {
        var products = new List<(string token, QuoteDetails details)>();

        // drain everything currently queued; items enqueued after the loop ends wait for the next tick
        while (_productDetails.TryDequeue(out var item))
        {
            products.Add(item);
        }

        // group by product on the consumer side, where there is no concurrency
        foreach (var group in products.GroupBy(i => i.token, i => i.details))
        {
            // fire-and-forget, as in the question; the discard makes that explicit
            _ = SaveProductRequests(group.Key, group);
        }
    }

    public async Task SaveProductRequests(string productId, IEnumerable<QuoteDetails> productRequests)
    {
        // save received items to database
        /// ...
    }
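The same single-queue idea as a self-contained sketch, without the Service Fabric types, so it can be exercised directly. `SingleQueueBatcher`, `Add`, `Flush` and the `QuoteDetails` record are hypothetical names; `Flush` plays the role of `SaveRequestsToDatabase` but returns the grouped batches instead of saving them.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the question's QuoteDetails type.
public record QuoteDetails(string Payload);

public class SingleQueueBatcher
{
    // All producers append to one unsorted stream of (token, details) pairs.
    private readonly ConcurrentQueue<(string token, QuoteDetails details)> _queue = new();

    // Producer side: Enqueue is thread-safe, no lock needed.
    public void Add(string token, QuoteDetails details) => _queue.Enqueue((token, details));

    // Consumer side: drain whatever is queued (including items that arrive while
    // draining), then group by token on a private list that no other thread sees.
    public Dictionary<string, List<QuoteDetails>> Flush()
    {
        var drained = new List<(string token, QuoteDetails details)>();
        while (_queue.TryDequeue(out var item))
        {
            drained.Add(item);
        }
        return drained
            .GroupBy(i => i.token, i => i.details)
            .ToDictionary(g => g.Key, g => g.ToList());
    }
}
```

Calling `Flush` from a timer while other threads keep calling `Add` is safe: each item is dequeued exactly once, so it lands either in the current batch or in the next one.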

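You need to lock the dictionary whenever you add to, remove from, or read from it. Your current code locks only inside SaveRequestsToDatabase, so SaveRecentRequest can still add items to the dictionary while you are busy processing the items already in it. I suggest the following approach: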

    // dictionary where key is ProductId and value is a list of items to insert to that product database
    Dictionary<string, List<QuoteDetails>> _productDetails;
    public SaverService(StatelessServiceContext context)
        : base(context)
    {
        _productDetails = new Dictionary<string, List<QuoteDetails>>();
    }

    // this function will be fired and forgotten by the external service
    public async Task SaveRecentRequest(RequestOptions requestData, Response responseData)
    {
        await Task.Run(() => {
            foreach (var token in requestData.ProductAccessTokens)
            {
                // this function will extract the specific product request ( one request can contain multiple products )
                var details = SplitQuoteByProduct(requestData, responseData, token);
                lock (_padlock)
                {
                    // Dictionary<TKey, TValue> has no AddOrUpdate, so create the list on first use
                    if (!_productDetails.TryGetValue(token, out var list))
                    {
                        list = new List<QuoteDetails>();
                        _productDetails[token] = list;
                    }
                    list.Add(details);
                }
            }
        });
    }

    // this function will be executed by a timer every N amount of time
    public void SaveRequestsToDatabase()
    {
        Dictionary<string, List<QuoteDetails>> offboardingDictionary;
        lock (_padlock)
        {
            // swap in an empty dictionary; from now on producers only see the new one
            offboardingDictionary = _productDetails;
            _productDetails = new Dictionary<string, List<QuoteDetails>>();
        }

        foreach (var item in offboardingDictionary)
        {
            // no other thread can reach the old lists any more, so no copy or Clear() is needed
            _ = SaveProductRequests(item.Key, item.Value);
        }
    }

    public async Task SaveProductRequests(string productId, List<QuoteDetails> productRequests)
    {
        // save received items to database
        /// ...
    }

    private readonly object _padlock = new object();

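With this, you lock whenever you add items to the dictionary. To keep saving fast, the timer copies the reference to the current dictionary into a local variable and then replaces the field with a new instance. This keeps the time spent inside the lock to a minimum: new entries go into the new dictionary while the saving thread offloads the entries from the previous dictionary to the database.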
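A stripped-down sketch of this swap-under-lock pattern, assuming a hypothetical `SwappingBatcher` class with `Add`/`Swap` methods and a stand-in `QuoteDetails` record. The per-key update uses `TryGetValue` plus the indexer, since `Dictionary<TKey, TValue>` has no `AddOrUpdate` method.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the question's QuoteDetails type.
public record QuoteDetails(string Payload);

public class SwappingBatcher
{
    private readonly object _padlock = new object();

    // Only read or replaced while holding _padlock.
    private Dictionary<string, List<QuoteDetails>> _productDetails = new();

    // Producer side: one short critical section per item.
    public void Add(string productId, QuoteDetails details)
    {
        lock (_padlock)
        {
            if (!_productDetails.TryGetValue(productId, out var list))
            {
                list = new List<QuoteDetails>();
                _productDetails[productId] = list;
            }
            list.Add(details);
        }
    }

    // Consumer side: swaps in an empty dictionary under the lock and returns the
    // old one. After the swap no producer can reach the old dictionary, so the
    // caller can read and save it without holding any lock.
    public Dictionary<string, List<QuoteDetails>> Swap()
    {
        lock (_padlock)
        {
            var offboarding = _productDetails;
            _productDetails = new Dictionary<string, List<QuoteDetails>>();
            return offboarding;
        }
    }
}
```

The lock is held only for a single dictionary update or a single reference swap, never for the database write, so producers never wait on a slow flush.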

I don't think you need a concurrent dictionary for this task; a normal Dictionary is fine as long as you lock every access to it.