如何正确使用 ConcurrentQueue 中的块
How to consume chuncks from ConcurrentQueue correctly
我需要实现一个可以从多个线程填充的请求队列。当此队列大于 1000 个已完成请求时,应将此请求存储到数据库中。这是我的实现:
public class RequestQueue
{
private static BlockingCollection<VerificationRequest> _queue = new BlockingCollection<VerificationRequest>();
private static ConcurrentQueue<VerificationRequest> _storageQueue = new ConcurrentQueue<VerificationRequest>();
private static volatile bool isLoading = false;
private static object _lock = new object();
public static void Launch()
{
Task.Factory.StartNew(execute);
}
public static void Add(VerificationRequest request)
{
_queue.Add(request);
}
public static void AddRange(List<VerificationRequest> requests)
{
Parallel.ForEach(requests, new ParallelOptions() {MaxDegreeOfParallelism = 3},
(request) => { _queue.Add(request); });
}
private static void execute()
{
Parallel.ForEach(_queue.GetConsumingEnumerable(), new ParallelOptions {MaxDegreeOfParallelism = 5}, EnqueueSaveRequest );
}
private static void EnqueueSaveRequest(VerificationRequest request)
{
_storageQueue.Enqueue( new RequestExecuter().ExecuteVerificationRequest( request ) );
if (_storageQueue.Count > 1000 && !isLoading)
{
lock ( _lock )
{
if ( _storageQueue.Count > 1000 && !isLoading )
{
isLoading = true;
var requestChunck = new List<VerificationRequest>();
VerificationRequest req;
for (var i = 0; i < 1000; i++)
{
if( _storageQueue.TryDequeue(out req))
requestChunck.Add(req);
}
new VerificationRequestRepository().InsertRange(requestChunck);
isLoading = false;
}
}
}
}
}
有什么方法可以在没有锁和 isLoading 的情况下实现这个吗?
完成您要求的最简单方法是使用 TPL Dataflow 库中的块。例如
var batchBlock = new BatchBlock<VerificationRequest>(1000);
var exportBlock = new ActionBlock<VerificationRequest[]>(records=>{
new VerificationRequestRepository().InsertRange(records);
};
batchBlock.LinkTo(exportBlock , new DataflowLinkOptions { PropagateCompletion = true });
就是这样。
您可以使用
向起始块发送消息
batchBlock.Post(new VerificationRequest(...));
完成工作后,您可以关闭整个管道并通过调用 batchBlock.Complete();
清除所有剩余消息并等待最后一个块完成:
batchBlock.Complete();
await exportBlock.Completion;
BatchBlock batches up to 1000 records into arrays of 1000 items and passes them to the next block. An ActionBlock默认只使用1个任务,所以是线程安全的。您可以使用存储库的现有实例而不必担心跨线程访问:
var repository=new VerificationRequestRepository();
var exportBlock = new ActionBlock<VerificationRequest[]>(records=>{
repository.InsertRange(records);
};
几乎所有的块都有一个并发输入缓冲区。每个块在其自己的 TPL 任务上运行,因此每个步骤彼此同时运行。这意味着您将获得异步执行 "for free",如果您有多个链接步骤,这可能很重要,例如,您使用 TransformBlock 来修改流经管道的消息。
我使用此类管道创建调用外部服务、解析响应、生成最终记录、批处理它们并使用使用 SqlBulkCopy 的块将它们发送到数据库的管道。
我需要实现一个可以从多个线程填充的请求队列。当此队列大于 1000 个已完成请求时,应将此请求存储到数据库中。这是我的实现:
public class RequestQueue
{
private static BlockingCollection<VerificationRequest> _queue = new BlockingCollection<VerificationRequest>();
private static ConcurrentQueue<VerificationRequest> _storageQueue = new ConcurrentQueue<VerificationRequest>();
private static volatile bool isLoading = false;
private static object _lock = new object();
public static void Launch()
{
Task.Factory.StartNew(execute);
}
public static void Add(VerificationRequest request)
{
_queue.Add(request);
}
public static void AddRange(List<VerificationRequest> requests)
{
Parallel.ForEach(requests, new ParallelOptions() {MaxDegreeOfParallelism = 3},
(request) => { _queue.Add(request); });
}
private static void execute()
{
Parallel.ForEach(_queue.GetConsumingEnumerable(), new ParallelOptions {MaxDegreeOfParallelism = 5}, EnqueueSaveRequest );
}
private static void EnqueueSaveRequest(VerificationRequest request)
{
_storageQueue.Enqueue( new RequestExecuter().ExecuteVerificationRequest( request ) );
if (_storageQueue.Count > 1000 && !isLoading)
{
lock ( _lock )
{
if ( _storageQueue.Count > 1000 && !isLoading )
{
isLoading = true;
var requestChunck = new List<VerificationRequest>();
VerificationRequest req;
for (var i = 0; i < 1000; i++)
{
if( _storageQueue.TryDequeue(out req))
requestChunck.Add(req);
}
new VerificationRequestRepository().InsertRange(requestChunck);
isLoading = false;
}
}
}
}
}
有什么方法可以在没有锁和 isLoading 的情况下实现这个吗?
完成您要求的最简单方法是使用 TPL Dataflow 库中的块。例如
var batchBlock = new BatchBlock<VerificationRequest>(1000);
var exportBlock = new ActionBlock<VerificationRequest[]>(records=>{
new VerificationRequestRepository().InsertRange(records);
};
batchBlock.LinkTo(exportBlock , new DataflowLinkOptions { PropagateCompletion = true });
就是这样。
您可以使用
向起始块发送消息batchBlock.Post(new VerificationRequest(...));
完成工作后,您可以关闭整个管道并通过调用 batchBlock.Complete();
清除所有剩余消息并等待最后一个块完成:
batchBlock.Complete();
await exportBlock.Completion;
BatchBlock batches up to 1000 records into arrays of 1000 items and passes them to the next block. An ActionBlock默认只使用1个任务,所以是线程安全的。您可以使用存储库的现有实例而不必担心跨线程访问:
var repository=new VerificationRequestRepository();
var exportBlock = new ActionBlock<VerificationRequest[]>(records=>{
repository.InsertRange(records);
};
几乎所有的块都有一个并发输入缓冲区。每个块在其自己的 TPL 任务上运行,因此每个步骤彼此同时运行。这意味着您将获得异步执行 "for free",如果您有多个链接步骤,这可能很重要,例如,您使用 TransformBlock 来修改流经管道的消息。
我使用此类管道创建调用外部服务、解析响应、生成最终记录、批处理它们并使用使用 SqlBulkCopy 的块将它们发送到数据库的管道。