Execute Parallel.ForEach based on a List that needs to receive more items at execution time

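I'm facing a challenge here, and it's giving me a headache because I can't find a solution.

I have a List of something, and I execute a Parallel.ForEach based on it: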

List<Customer> customers = GetNotProcessedCostumer(); 

Parallel.ForEach(customers, new ParallelOptions {MaxDegreeOfParallelism = 2},
cust=> 
{
     ExecuteSomething(cust);
});

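The problem here is that I need to call GetNotProcessedCostumer again, to check whether new unprocessed items are available in the database, while this parallel loop is still running. Calling the method again is OK, but how do I insert the new items into the List that the parallel loop is already using?

In other words, the List<Customer> is alive: I need to keep inserting items into it all the time, and try to use the threads already available in the existing Parallel. Take a look: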

List<Customer> customers = GetNotProcessCustomer(); // get not processed customers from database

Parallel.ForEach(customers) // ...... Start the parallel ... 

customers.AddRange(GetNotProcessCustomer()); // Read database again..

"Hey Parallel, do you have threads available?" If yes, use them.

I'm open to other approaches and ideas, like Threads, ThreadPool......

Can someone help me, please?

There are probably better ways to do this job than the Parallel class, with the ActionBlock<Customer> from the TPL Dataflow library being the most promising candidate. But if you want to do your job using the knowledge you already have, you could feed the parallel loop with a deferred IEnumerable<Customer> sequence instead of a materialized List<Customer>. This sequence will be querying the database and yielding the not-processed customers in a never ending loop. It might be a good idea to add a Task.Delay into the mix, to ensure that the database will not be queried more frequently than every X seconds.

IEnumerable<Customer> GetNotProcessedCustomersNonStop(
    CancellationToken cancellationToken = default)
{
    while (true)
    {
        var delayTask = Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
        foreach (var customer in GetNotProcessedCustomers())
            yield return customer;
        delayTask.GetAwaiter().GetResult();
    }
}

Adding a CancellationToken into the mix might be a good idea too, because eventually you will want to stop the loop, won't you?
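To make the stopping behavior a bit more concrete, here is a minimal sketch. When the token is canceled, the pending Task.Delay becomes canceled and GetAwaiter().GetResult() throws a TaskCanceledException (which derives from OperationCanceledException), and that ends the iterator. The FakeSource method, the integers it yields and the millisecond timings are made up for illustration; they only stand in for the database-polling iterator above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationDemo
{
    // Hypothetical stand-in for the polling iterator: yields numbers instead of customers.
    public static IEnumerable<int> FakeSource(CancellationToken cancellationToken)
    {
        int i = 0;
        while (true)
        {
            var delayTask = Task.Delay(TimeSpan.FromMilliseconds(50), cancellationToken);
            yield return i++;
            delayTask.GetAwaiter().GetResult(); // throws once the token is canceled
        }
    }

    // Returns true if the enumeration was ended by the cancellation.
    public static bool RunUntilCanceled()
    {
        using var cts = new CancellationTokenSource();
        cts.CancelAfter(TimeSpan.FromMilliseconds(200)); // request a stop after ~200 ms
        try
        {
            foreach (var item in FakeSource(cts.Token))
                Console.WriteLine($"Processing {item}");
            return false; // not expected, the sequence never completes by itself
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Stopped.");
            return true;
        }
    }

    public static void Main() => RunUntilCanceled();
}
```

So whoever enumerates the sequence should be prepared to catch an OperationCanceledException when the token is canceled.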

In case you are not familiar with deferred enumerable sequences and the yield statement, you could take a look at this document: Iterators

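One final important detail is to tell the Parallel class that you don't want it to do fancy things, like enumerating the enumerable greedily and caching its items. You want it to grab the next customer only when it is ready to process it. You can do this by throwing a Partitioner.Create into the mix. Putting everything together: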

var cts = new CancellationTokenSource();

var source = Partitioner.Create(GetNotProcessedCustomersNonStop(cts.Token),
    EnumerablePartitionerOptions.NoBuffering);

var parallelOptions = new ParallelOptions()
{
    MaxDegreeOfParallelism = 2,
    CancellationToken = cts.Token,
};

Parallel.ForEach(source, parallelOptions, customer =>
{
    ProcessCustomer(customer);
});

//cts.Cancel(); // eventually...
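For comparison, here is a rough sketch of the ActionBlock<Customer> alternative mentioned at the start of this answer. It assumes that the System.Threading.Tasks.Dataflow library is available (on .NET Framework it comes as a NuGet package). The Customer class, the GetNotProcessedCustomers and ProcessCustomer stubs, the BoundedCapacity value and the fixed number of polls are all placeholders that I made up for the demo; replace them with your own code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class Customer { public int Id { get; set; } }

public static class DataflowSketch
{
    private static int _nextId;

    // Placeholder for the database query: returns two "new" customers per call.
    static List<Customer> GetNotProcessedCustomers() => new List<Customer>
    {
        new Customer { Id = Interlocked.Increment(ref _nextId) },
        new Customer { Id = Interlocked.Increment(ref _nextId) },
    };

    // Placeholder for the real work.
    static void ProcessCustomer(Customer c) => Console.WriteLine($"Processed {c.Id}");

    public static async Task<int> RunAsync(int polls, TimeSpan pollInterval)
    {
        int processed = 0;
        var block = new ActionBlock<Customer>(customer =>
        {
            ProcessCustomer(customer);
            Interlocked.Increment(ref processed);
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 2,
            BoundedCapacity = 10, // SendAsync waits while the block is full
        });

        for (int i = 0; i < polls; i++)
        {
            foreach (var customer in GetNotProcessedCustomers())
                await block.SendAsync(customer);
            await Task.Delay(pollInterval); // don't query the database too often
        }

        block.Complete();       // no more items will be sent
        await block.Completion; // wait until the queued items are processed
        return processed;
    }

    public static async Task Main()
    {
        int count = await RunAsync(polls: 3, pollInterval: TimeSpan.FromMilliseconds(100));
        Console.WriteLine($"Total processed: {count}");
    }
}
```

The difference from the Parallel.ForEach approach is that here the block is fed from the outside, so new customers can be sent to it at any time while it is running, and the polling loop doesn't block a thread while waiting.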