BlockingCollection 在几次迭代后停止获取
BlockingCollection stops taking after few iterations
简而言之,我的 producer/consumer 模式如下所示。
public class Consumer<T>
{
Task consumer;
BlockingCollection<T> buffer;
public Consumer()
{
buffer = new();
consumer = Task.Factory.StartNew(
x => ConsumerAction(),
TaskCreationOptions.LongRunning);
}
public void ConsumerAction()
{
while(true)
{
// log 1
var obj = buffer.Take();
// log 2
WriteToDisk(obj);
// log 3
}
}
public void Enqueue(T obj)
{
buffer.Add(obj);
}
}
Consumer
类型按预期工作了一段时间,然后在 看似 随机点,它停止 Take
ing,并且 buffer
继续增长。在同一个输入集合上,有时它在整个输入过程中都没有问题,有时会在输入开始时中断,有时会在输入结束时中断。
- 我尝试在执行路径中涉及的任何方法中捕获任何可能的异常,但没有引发异常;
- 我检查了我的应用程序中的日志,相应地,最后一个
obj
上的所有业务逻辑都已成功执行,因此调用返回到 var obj = buffer.Take();
并正在等待新项目将添加到 buffer
;
- 我试过将
while(true)
包含在 try-catch
块中,没有发现异常;
- 代码示例中的注释日志,在日志中按以下顺序出现:
1
、2
、3
、... 3
、1
.
我的具体问题是:
- 是否有可能是垃圾收集器正在收集线程?
while(true)
会不会是错误的来源?!
- 关于如何最好地调试它有什么想法吗?
- 成功处理整个输入集合需要约 12 小时 运行,超过 10 亿个项目被添加到
buffer
(但如果 Take
有效,buffer
在给定时间仅包含少数项目)。这个比例似乎是这个模式的一个极端情况/不是它的预期用途吗?
根据评论中的要求,我将提供一个最小的可重现示例。然而,这有点挑战,因为我不确定程序的哪些部分是相关的,所以可能需要一些时间来缩小范围。同时,如果您对上述问题提出任何建议,我将不胜感激。
Is there a possibility that the garbage collector is collecting the thread?
不是,GC收集的是内存中的对象,不是线程。
Can while(true) be the source of errors?!
没有。但我建议使用 GetConsumingEnumerable 将其替换为 foreach 循环。这将允许您通过在缓冲区上调用 CompleteAdding 来干净、轻松地退出循环。
Any thoughts on how best I can debug this?
我肯定会添加一个 try/catch 以确保 WriteToDisk
不会以某种方式失败。您还应该在完成后检查任务,以确保没有发生其他故障。您可以考虑的另一件事是对缓冲区的大小添加限制。这应该会限制内存使用,有助于防止线程饥饿,并且如果项目没有从缓冲区中删除,应该会挂起您的程序。最后一点有助于调试,因为您可以在该点简单地中断进程并检查每个线程在做什么。
猜测可能存在某种死锁或其他问题,导致 ConsumerAction 线程在 WriteToDisk
内部阻塞。
Processing the entire input collection takes ~12h on a successful run, and over 1 billion items are added to the buffer (but if Take works, buffer contains only a handful number of items at a given time). Does this scale seem to be a corner-case for this pattern / not its intended use?
对我来说,这似乎是阻塞集合的完美用法。
简而言之,我的 producer/consumer 模式如下所示。
public class Consumer<T>
{
Task consumer;
BlockingCollection<T> buffer;
public Consumer()
{
buffer = new();
consumer = Task.Factory.StartNew(
x => ConsumerAction(),
TaskCreationOptions.LongRunning);
}
public void ConsumerAction()
{
while(true)
{
// log 1
var obj = buffer.Take();
// log 2
WriteToDisk(obj);
// log 3
}
}
public void Enqueue(T obj)
{
buffer.Add(obj);
}
}
Consumer
类型按预期工作了一段时间,然后在 看似 随机点,它停止 Take
ing,并且 buffer
继续增长。在同一个输入集合上,有时它在整个输入过程中都没有问题,有时会在输入开始时中断,有时会在输入结束时中断。
- 我尝试在执行路径中涉及的任何方法中捕获任何可能的异常,但没有引发异常;
- 我检查了我的应用程序中的日志,相应地,最后一个
obj
上的所有业务逻辑都已成功执行,因此调用返回到var obj = buffer.Take();
并正在等待新项目将添加到buffer
; - 我试过将
while(true)
包含在try-catch
块中,没有发现异常; - 代码示例中的注释日志,在日志中按以下顺序出现:
1
、2
、3
、...3
、1
.
我的具体问题是:
- 是否有可能是垃圾收集器正在收集线程?
while(true)
会不会是错误的来源?!- 关于如何最好地调试它有什么想法吗?
- 成功处理整个输入集合需要约 12 小时 运行,超过 10 亿个项目被添加到
buffer
(但如果Take
有效,buffer
在给定时间仅包含少数项目)。这个比例似乎是这个模式的一个极端情况/不是它的预期用途吗?
根据评论中的要求,我将提供一个最小的可重现示例。然而,这有点挑战,因为我不确定程序的哪些部分是相关的,所以可能需要一些时间来缩小范围。同时,如果您对上述问题提出任何建议,我将不胜感激。
Is there a possibility that the garbage collector is collecting the thread?
不是,GC收集的是内存中的对象,不是线程。
Can while(true) be the source of errors?!
没有。但我建议使用 GetConsumingEnumerable 将其替换为 foreach 循环。这将允许您通过在缓冲区上调用 CompleteAdding 来干净、轻松地退出循环。
Any thoughts on how best I can debug this?
我肯定会添加一个 try/catch 以确保 WriteToDisk
不会以某种方式失败。您还应该在完成后检查任务,以确保没有发生其他故障。您可以考虑的另一件事是对缓冲区的大小添加限制。这应该会限制内存使用,有助于防止线程饥饿,并且如果项目没有从缓冲区中删除,应该会挂起您的程序。最后一点有助于调试,因为您可以在该点简单地中断进程并检查每个线程在做什么。
猜测可能存在某种死锁或其他问题,导致 ConsumerAction 线程在 WriteToDisk
内部阻塞。
Processing the entire input collection takes ~12h on a successful run, and over 1 billion items are added to the buffer (but if Take works, buffer contains only a handful number of items at a given time). Does this scale seem to be a corner-case for this pattern / not its intended use?
对我来说,这似乎是阻塞集合的完美用法。