使用 GetConsumingEnumerable() 在 C# BlockingCollection 的某处丢失项目

Losing items somewhere in C# BlockingCollection with GetConsumingEnumerable()

我正在尝试通过 WAN 对多个目标执行并行 SqlBulkCopy,其中许多目标可能连接速度较慢 and/or 连接中断;它们的连接速度从 2 到 50 兆位下载不等,我正在通过 1000 兆位上传的连接发送;许多目标需要多次重试才能正确完成。

我目前在 BlockingCollection (queue) 的 GetConsumingEnumerable() 上使用 Parallel.ForEach;但是我要么偶然发现了一些错误,要么我在完全理解它的目的时遇到了问题,或者只是出了点问题.. 代码从不调用 blockingcollection 的 CompleteAdding() 方法, 似乎在 parallel-foreach-loop 的某个地方丢失了一些目标。 即使对此有不同的方法,并且忽略它在循环中所做的工作类型,blockingcollection 不应该像它在这个例子中那样表现,不是吗?

在 foreach 循环中,我完成工作,并在成功完成的情况下将目标添加到 results-collection,或者在出现错误时将目标重新添加到 BlockingCollection 直到目标达到最大重试阈值;那时我将它添加到 results-collection.

在另一个任务中,我循环直到 results-collection 的计数等于目标的初始计数;然后我在阻塞集合上执行 CompleteAdding()

我已经尝试使用锁定对象来对 results 集合(使用 List<int> 和队列进行操作,但没有成功,但无论如何都没有必要.我还尝试将重试添加到单独的集合中,然后将它们重新添加到不同任务中的 BlockingCollection 而不是 parallel.foreach。 只是为了好玩,我还尝试使用 .NET 从 4.5 到 4.8 以及不同的 C# 语言版本进行编译。

这是一个简化的例子:

List<int> targets = new List<int>();
for (int i = 0; i < 200; i++)
{
    targets.Add(0);
}

BlockingCollection<int> queue = new BlockingCollection<int>(new ConcurrentQueue<int>());
ConcurrentBag<int> results = new ConcurrentBag<int>();
targets.ForEach(f => queue.Add(f));

// Bulkcopy in die Filialen:
Task.Run(() =>
    {
        while (results.Count < targets.Count)
        {
            Thread.Sleep(2000);
            Console.WriteLine($"Completed: {results.Count} / {targets.Count} | queue: {queue.Count}");
        }
        queue.CompleteAdding();
    });

int MAX_RETRIES = 10;
ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = 50 };

Parallel.ForEach(queue.GetConsumingEnumerable(), options, target =>
    {
        try
        {
            // simulate a problem with the bulkcopy:
            throw new Exception();
            results.Add(target);
        }
        catch (Exception)
        {
            if (target < MAX_RETRIES)
            {
                target++;
                if (!queue.TryAdd(target))
                    Console.WriteLine($"{target.ToString("D3")}: Error, can't add to queue!");
            }
            else
            {
                results.Add(target);
                Console.WriteLine($"Aborted after {target + 1} tries | {results.Count} / {targets.Count} items finished.");
            }

        }
    });

我希望 results-collection 的计数最终与 targets-list 的精确计数相同,但它似乎永远不会达到那个数字,这导致了 BlockingCollection永远不会被标记为已完成,因此代码永远不会完成。

我真的不明白为什么最终没有将所有目标都添加到 results-集合中!添加的计数总是变化的,并且大多只是低于预期的最终计数。

编辑:我删除了重试部分,并用一个简单的 int 计数器替换了 ConcurrentBag,但大部分时间它仍然不起作用:

List<int> targets = new List<int>();
for (int i = 0; i < 500; i++)
    targets.Add(0);

BlockingCollection<int> queue = new BlockingCollection<int>(new ConcurrentQueue<int>());
//ConcurrentBag<int> results = new ConcurrentBag<int>();
int completed = 0;
targets.ForEach(f => queue.Add(f));

var thread = new Thread(() =>
{
    while (completed < targets.Count)
    {
        Thread.Sleep(2000);
        Console.WriteLine($"Completed: {completed} / {targets.Count} | queue: {queue.Count}");
    }
    queue.CompleteAdding();
});
thread.Start();

ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
Parallel.ForEach(queue.GetConsumingEnumerable(), options, target =>
{
    Interlocked.Increment(ref completed);
});

抱歉,找到了答案:blockingcollection 和并行 foreach 使用的默认分区器是分块和缓冲,这导致 foreach 循环永远等待下一个块的足够项目。对我来说,它在那里等待整整一个晚上,没有处理最后几条!

所以,而不是:

ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
Parallel.ForEach(queue.GetConsumingEnumerable(), options, target =>
{
    Interlocked.Increment(ref completed);
});

你必须使用:

var partitioner = Partitioner.Create(queue.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);
ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
Parallel.ForEach(partitioner, options, target =>
{
    Interlocked.Increment(ref completed);
});

Parallel.ForEach 用于数据并行性(即使用所有 8 个内核处理 100K 行),而不是并发操作。如果不是管道问题,这本质上是 pub/sub 和 async 问题。 CPU 在这种情况下无需执行任何操作,只需启动异步操作并等待它们完成即可。

.NET 从 .NET 4.5 开始通过数据流 classes 处理这个问题,最近,较低级别的 System.Threading.Channel 命名空间。

在最简单的形式中,您可以创建一个 ActionBlock<>,它采用缓冲区和目标连接并发布数据。假设您使用此方法将数据发送到服务器:

async Task MyBulkCopyMethod(string connectionString,DataTable data)
{
    using(var bcp=new SqlBulkCopy(connectionString))
    {
        //Set up mappings etc.
        //....
        await bcp.WriteToServerAsync(data);   
    }
}

您可以将其与具有已配置并行度的 ActionBlock class 一起使用。像 ActionBlock 这样的数据流 class 有自己的输入,并在适当的情况下有输出缓冲区,因此无需创建单独的队列:

class DataMessage
{
    public string Connection{get;set;}
    public DataTable Data {get;set;} 
}

...

var options=new ExecutionDataflowBlockOptions { 
                    MaxDegreeOfParallelism = 50,
                    BoundedCapacity = 8
            };
var block=new ActionBlock<DataMessage>(msg=>MyBulkCopyMethod(msg.Connection,msg.Data, options);

我们现在可以开始 post 发送消息到区块。通过将容量设置为 8,我们确保输入缓冲区不会在块太慢时被大消息填满。 MaxDegreeOfParallelism 控制并发操作 运行 的可能性。假设我们要将相同的数据发送到许多服务器:

var data=.....;
var servers=new[]{connString1, connString2,....};
var messages= from sv in servers
              select new DataMessage{ ConnectionString=sv,Table=data};

foreach(var msg in messages)
{
    await block.SendAsync(msg);
}
//Tell the block we are done
block.Complete();
//Await for all messages to finish processing
await block.Completion;

重试

重试的一种可能性是在工作函数中使用重试循环。一个更好的主意是使用 different 块和 post 失败的消息。

var block=new ActionBlock<DataMessage>(async msg=> {
    try {
        await MyBulkCopyMethod(msg.Connection,msg.Data, options);
    }
    catch(SqlException exc) when (some retry condition)
    {
        //Post without awaiting
        retryBlock.Post(msg);
    });

当原始块完成时,我们想告诉重试块也完成,无论如何:

block.Completion.ContinueWith(_=>retryBlock.Complete());

现在我们可以等待 retryBlock 完成。

该块可能有更小的 DOP,并且尝试之间可能有延迟:

var retryOptions=new ExecutionDataflowBlockOptions { 
                MaxDegreeOfParallelism = 5
        };
var retryBlock=new ActionBlock<DataMessage>(async msg=>{
    await Task.Delay(1000);
    try {
        await MyBulkCopyMethod(msg.Connection,msg.Data, options);
    }
    catch (Exception ....)
    {
        ...
    }
});

可以重复此模式以创建多个级别的重试或不同的条件。它还可以用于通过为高优先级工作人员提供更大的 DOP,或为低优先级工作人员提供更大的延迟来创建不同优先级的工作人员