BlockingCollection 在几次迭代后停止获取

BlockingCollection stops taking after few iterations

简而言之,我的 producer/consumer 模式如下所示。

public class Consumer<T>
{
    Task consumer;
    BlockingCollection<T> buffer;

    public Consumer()
    {
        buffer = new();
        consumer = Task.Factory.StartNew(
            x => ConsumerAction(),
            TaskCreationOptions.LongRunning);
    }

    public void ConsumerAction()
    {
        while(true)
        {
            // log 1
            var obj = buffer.Take();
            // log 2
            WriteToDisk(obj);
            // log 3
        }
    }

    public void Enqueue(T obj)
    {
        buffer.Add(obj);
    }
}

Consumer 类型按预期工作了一段时间,然后在 看似 随机点,它停止 Takeing,并且 buffer 继续增长。在同一个输入集合上,有时它在整个输入过程中都没有问题,有时会在输入开始时中断,有时会在输入结束时中断。

我的具体问题是:


根据评论中的要求,我将提供一个最小的可重现示例。然而,这有点挑战,因为我不确定程序的哪些部分是相关的,所以可能需要一些时间来缩小范围。同时,如果您对上述问题提出任何建议,我将不胜感激。

Is there a possibility that the garbage collector is collecting the thread?

不是,GC收集的是内存中的对象,不是线程。

Can while(true) be the source of errors?!

没有。但我建议使用 GetConsumingEnumerable 将其替换为 foreach 循环。这将允许您通过在缓冲区上调用 CompleteAdding 来干净、轻松地退出循环。

Any thoughts on how best I can debug this?

我肯定会添加一个 try/catch 以确保 WriteToDisk 不会以某种方式失败。您还应该在完成后检查任务,以确保没有发生其他故障。您可以考虑的另一件事是对缓冲区的大小添加限制。这应该会限制内存使用,有助于防止线程饥饿,并且如果项目没有从缓冲区中删除,应该会挂起您的程序。最后一点有助于调试,因为您可以在该点简单地中断进程并检查每个线程在做什么。

猜测可能存在某种死锁或其他问题,导致 ConsumerAction 线程在 WriteToDisk 内部阻塞。

Processing the entire input collection takes ~12h on a successful run, and over 1 billion items are added to the buffer (but if Take works, buffer contains only a handful number of items at a given time). Does this scale seem to be a corner-case for this pattern / not its intended use?

对我来说,这似乎是阻塞集合的完美用法。