C# Concurent 字典 - 锁定值
C# Concurent dictionary - lock on value
我正在开发一项服务,该服务负责记录发送到我们服务的请求。该服务正在离线工作(正在被解雇并忘记)。
我们根据一些输入参数(产品 ID)将请求保存到不同的数据库。我们不想每次有人发出请求时都保存到数据库中 - 我们宁愿构建一些 "batch" 插入并执行 InsertMany
每 N
时间量(让我们说 10 秒)。我已经开始实施它,现在我正在为两件事而苦苦挣扎:
- 我需要使用
ConcurrentDictionary
吗?看来我会用普通的 Dictionary 达到同样的效果
- 如果上面的答案是 "no, in your case there are no benefits from
ConcurrentDictionary
" - 有没有办法 re-write 我的代码 "properly" 使用 ConcurrentDictionary
这样我就可以避免使用锁并确保 AddOrUpdate
没有 "collisions" 清除批次 ?
让我粘贴片段并进一步解释:
// dictionary where key is ProductId and value is a list of items to insert to that product database
ConcurrentDictionary<string, List<QuoteDetails>> _productDetails;
public SaverService(StatelessServiceContext context)
: base(context)
{
_productDetails = new ConcurrentDictionary<string, List<QuoteDetails>>();
}
// this function will be fired and forgotten by the external service
public async Task SaveRecentRequest(RequestOptions requestData, Response responseData)
{
await Task.Run(() => {
foreach (var token in requestData.ProductAccessTokens)
{
// this function will extract the specific product request ( one request can contain multiple products )
var details = SplitQuoteByProduct(requestData, responseData, token);
_productDetails.AddOrUpdate(token, new List<QuoteDetails>() { details }, (productId, list) =>
{
list.Add(details);
return list;
});
}
});
}
// this function will be executed by a timer every N amount of time
public void SaveRequestsToDatabase()
{
lock (_productDetails)
{
foreach (var item in _productDetails)
{
// copy curent items and start a task which will process them
SaveProductRequests(item.Key, item.Value.ToList());
// clear curent items
item.Value.Clear();
}
}
}
public async Task SaveProductRequests(string productId, List<QuoteDetails> productRequests)
{
// save received items to database
/// ...
}
我主要担心的是没有锁会发生以下情况:
SaveRequestsToDatabase
已被解雇 - 并开始处理数据
- 就在
SaveRequestsToDatabase
函数中调用 item.Value.Clear();
之前,外部服务触发另一个 SaveRecentRequest
函数,该函数使用相同的键执行 AddOrUpdate
- 这将向collection
SaveRequestsToDatabase
正在完成并因此清除 collection - 但最初由 2 添加的 object 不在 collection 中,因此未被处理
ConcurrentDictionary<string, List<QuoteDetails>> _productDetails;
不会是线程安全的,因为 List 不是线程安全的。当一个线程正在向列表中添加条目时,另一个线程可能正在对其进行迭代。这最终会失败。
我建议使用:
ConcurrentDictionary<string, ConcurrentQueue<QuoteDetails>> _productDetails;
或:
ConcurrentDictionary<string, BlockingCollection<QuoteDetails>> _productDetails;
您也可以完全删除 ConcurrentDictionary
。
通常,并发问题来自于没有首先选择正确的数据结构。
对于您的情况,您有两个工作流程:
- n 个生产者,并发且连续地排队事件
- 1 个消费者,在给定时间出队并处理事件
您的问题是您试图立即对事件进行分类,即使这不是必需的。将事件作为并发部分的简单流保留,并仅在消费者部分对它们进行排序,因为那里没有并发。
ConcurrentQueue<(string token, QuoteDetails details)> _productDetails;
public SaverService(StatelessServiceContext context)
: base(context)
{
_productDetails = new ConcurrentQueue<(string, QuoteDetails)>();
}
// this function will be fired and forgotten by the external service
public async Task SaveRecentRequest(RequestOptions requestData, Response responseData)
{
await Task.Run(() => {
foreach (var token in requestData.ProductAccessTokens)
{
// this function will extract the specific product request ( one request can contain multiple products )
var details = SplitQuoteByProduct(requestData, responseData, token);
_productDetails.Enqueue((token, details));
}
});
}
// this function will be executed by a timer every N amount of time
public void SaveRequestsToDatabase()
{
var products = new List<(string token, QuoteDetails details)>();
while (_productDetails.TryDequeue(out var item))
{
products.Add(item);
}
foreach (var group in products.GroupBy(i => i.token, i => i.Details))
{
SaveProductRequests(group.Key, group);
}
}
public async Task SaveProductRequests(string productId, IEnumerable<QuoteDetails> productRequests)
{
// save received items to database
/// ...
}
每当您 add/remove/read 进出词典时都需要锁定词典。您当前的代码将允许 SaveRecentRequest 将项目添加到字典中,即使您正忙于处理字典中的项目。我建议采用以下方法
// dictionary where key is ProductId and value is a list of items to insert to that product database
Dictionary<string, List<QuoteDetails>> _productDetails;
public SaverService(StatelessServiceContext context)
: base(context)
{
_productDetails = new Dictionary<string, List<QuoteDetails>>();
}
// this function will be fired and forgotten by the external service
public async Task SaveRecentRequest(RequestOptions requestData, Response responseData)
{
await Task.Run(() => {
foreach (var token in requestData.ProductAccessTokens)
{
// this function will extract the specific product request ( one request can contain multiple products )
var details = SplitQuoteByProduct(requestData, responseData, token);
lock(_padlock)
{
_productDetails.AddOrUpdate(token, new List<QuoteDetails>() { details }, (productId, list) =>
{
list.Add(details);
return list;
});
}
}
});
}
// this function will be executed by a timer every N amount of time
public void SaveRequestsToDatabase()
{
Dictionary<string, List<QuoteDetails>> offboardingDictionary;
lock (_padlock)
{
offboardingDictionary = _productDetails;
_productDetails = new Dictionary<string, List<QuoteDetails>>();
}
foreach (var item in offboardingDictionary)
{
// copy curent items and start a task which will process them
SaveProductRequests(item.Key, item.Value.ToList());
// clear curent items
item.Value.Clear();
}
}
public async Task SaveProductRequests(string productId, List<QuoteDetails> productRequests)
{
// save received items to database
/// ...
}
private readonly object _padlock = new object();
有了这个,你就可以在向字典中添加项目时锁定。为了提高保存性能,我们向字典添加了一个新的引用,然后用一个新的实例替换原来的引用。通过这种方式,我们最大限度地减少了锁中的时间,因此新条目可以保存在新字典中,同时我们的保存线程将条目从以前的字典卸载到数据库中。
我认为您不需要并发字典来完成这项任务,只要您锁定访问权限,普通字典就可以了
我正在开发一项服务,该服务负责记录发送到我们服务的请求。该服务正在离线工作(正在被解雇并忘记)。
我们根据一些输入参数(产品 ID)将请求保存到不同的数据库。我们不想每次有人发出请求时都保存到数据库中 - 我们宁愿构建一些 "batch" 插入并执行 InsertMany
每 N
时间量(让我们说 10 秒)。我已经开始实施它,现在我正在为两件事而苦苦挣扎:
- 我需要使用
ConcurrentDictionary
吗?看来我会用普通的 Dictionary 达到同样的效果
- 如果上面的答案是 "no, in your case there are no benefits from
ConcurrentDictionary
" - 有没有办法 re-write 我的代码 "properly" 使用ConcurrentDictionary
这样我就可以避免使用锁并确保AddOrUpdate
没有 "collisions" 清除批次 ?
让我粘贴片段并进一步解释:
// dictionary where key is ProductId and value is a list of items to insert to that product database
ConcurrentDictionary<string, List<QuoteDetails>> _productDetails;
public SaverService(StatelessServiceContext context)
: base(context)
{
_productDetails = new ConcurrentDictionary<string, List<QuoteDetails>>();
}
// this function will be fired and forgotten by the external service
public async Task SaveRecentRequest(RequestOptions requestData, Response responseData)
{
await Task.Run(() => {
foreach (var token in requestData.ProductAccessTokens)
{
// this function will extract the specific product request ( one request can contain multiple products )
var details = SplitQuoteByProduct(requestData, responseData, token);
_productDetails.AddOrUpdate(token, new List<QuoteDetails>() { details }, (productId, list) =>
{
list.Add(details);
return list;
});
}
});
}
// this function will be executed by a timer every N amount of time
public void SaveRequestsToDatabase()
{
lock (_productDetails)
{
foreach (var item in _productDetails)
{
// copy curent items and start a task which will process them
SaveProductRequests(item.Key, item.Value.ToList());
// clear curent items
item.Value.Clear();
}
}
}
public async Task SaveProductRequests(string productId, List<QuoteDetails> productRequests)
{
// save received items to database
/// ...
}
我主要担心的是没有锁会发生以下情况:
SaveRequestsToDatabase
已被解雇 - 并开始处理数据- 就在
SaveRequestsToDatabase
函数中调用item.Value.Clear();
之前,外部服务触发另一个SaveRecentRequest
函数,该函数使用相同的键执行AddOrUpdate
- 这将向collection SaveRequestsToDatabase
正在完成并因此清除 collection - 但最初由 2 添加的 object 不在 collection 中,因此未被处理
ConcurrentDictionary<string, List<QuoteDetails>> _productDetails;
不会是线程安全的,因为 List 不是线程安全的。当一个线程正在向列表中添加条目时,另一个线程可能正在对其进行迭代。这最终会失败。
我建议使用:
ConcurrentDictionary<string, ConcurrentQueue<QuoteDetails>> _productDetails;
或:
ConcurrentDictionary<string, BlockingCollection<QuoteDetails>> _productDetails;
您也可以完全删除 ConcurrentDictionary
。
通常,并发问题来自于没有首先选择正确的数据结构。
对于您的情况,您有两个工作流程:
- n 个生产者,并发且连续地排队事件
- 1 个消费者,在给定时间出队并处理事件
您的问题是您试图立即对事件进行分类,即使这不是必需的。将事件作为并发部分的简单流保留,并仅在消费者部分对它们进行排序,因为那里没有并发。
ConcurrentQueue<(string token, QuoteDetails details)> _productDetails;
public SaverService(StatelessServiceContext context)
: base(context)
{
_productDetails = new ConcurrentQueue<(string, QuoteDetails)>();
}
// this function will be fired and forgotten by the external service
public async Task SaveRecentRequest(RequestOptions requestData, Response responseData)
{
await Task.Run(() => {
foreach (var token in requestData.ProductAccessTokens)
{
// this function will extract the specific product request ( one request can contain multiple products )
var details = SplitQuoteByProduct(requestData, responseData, token);
_productDetails.Enqueue((token, details));
}
});
}
// this function will be executed by a timer every N amount of time
public void SaveRequestsToDatabase()
{
var products = new List<(string token, QuoteDetails details)>();
while (_productDetails.TryDequeue(out var item))
{
products.Add(item);
}
foreach (var group in products.GroupBy(i => i.token, i => i.Details))
{
SaveProductRequests(group.Key, group);
}
}
public async Task SaveProductRequests(string productId, IEnumerable<QuoteDetails> productRequests)
{
// save received items to database
/// ...
}
每当您 add/remove/read 进出词典时都需要锁定词典。您当前的代码将允许 SaveRecentRequest 将项目添加到字典中,即使您正忙于处理字典中的项目。我建议采用以下方法
// dictionary where key is ProductId and value is a list of items to insert to that product database
Dictionary<string, List<QuoteDetails>> _productDetails;
public SaverService(StatelessServiceContext context)
: base(context)
{
_productDetails = new Dictionary<string, List<QuoteDetails>>();
}
// this function will be fired and forgotten by the external service
public async Task SaveRecentRequest(RequestOptions requestData, Response responseData)
{
await Task.Run(() => {
foreach (var token in requestData.ProductAccessTokens)
{
// this function will extract the specific product request ( one request can contain multiple products )
var details = SplitQuoteByProduct(requestData, responseData, token);
lock(_padlock)
{
_productDetails.AddOrUpdate(token, new List<QuoteDetails>() { details }, (productId, list) =>
{
list.Add(details);
return list;
});
}
}
});
}
// this function will be executed by a timer every N amount of time
public void SaveRequestsToDatabase()
{
Dictionary<string, List<QuoteDetails>> offboardingDictionary;
lock (_padlock)
{
offboardingDictionary = _productDetails;
_productDetails = new Dictionary<string, List<QuoteDetails>>();
}
foreach (var item in offboardingDictionary)
{
// copy curent items and start a task which will process them
SaveProductRequests(item.Key, item.Value.ToList());
// clear curent items
item.Value.Clear();
}
}
public async Task SaveProductRequests(string productId, List<QuoteDetails> productRequests)
{
// save received items to database
/// ...
}
private readonly object _padlock = new object();
有了这个,你就可以在向字典中添加项目时锁定。为了提高保存性能,我们向字典添加了一个新的引用,然后用一个新的实例替换原来的引用。通过这种方式,我们最大限度地减少了锁中的时间,因此新条目可以保存在新字典中,同时我们的保存线程将条目从以前的字典卸载到数据库中。
我认为您不需要并发字典来完成这项任务,只要您锁定访问权限,普通字典就可以了