批量消费 BlockingCollection 时丢失的消息
Messages lost when consuming from BlockingCollection in batches
我试图发明一种方法来消耗来自 BlockingCollection
的批次,但遇到了麻烦。
这是一个最小的重现:
internal class Program
{
private static readonly BlockingCollection<string> _bc = new BlockingCollection<string>(1000);
private static int _consumed;
static void Main()
{
Task.Run(() => Producer());
Task.Run(() => Consumer());
Console.WriteLine("press [ENTER] to check");
while (true)
{
Console.ReadLine();
Console.WriteLine("consumed: " + _consumed);
}
}
private static void Producer()
{
for (var i = 0; i < 5000; i++)
_bc.Add("msg");
}
private static void Consumer()
{
foreach (var s in _bc.GetConsumingEnumerable())
{
var batchSize = _bc.Count + 1;
var batch = new List<string>(batchSize) { s };
while (_bc.TryTake(out var additionalResult) && batch.Count < batchSize)
batch.Add(additionalResult);
_consumed = _consumed + batch.Count;
}
}
}
很少有消息丢失(但不总是相同的数量)。如果无法重现,请尝试增加生成消息的数量。
我想要实现的是在消费者中使用 GetConsumingEnumerable
方法(一段时间后我会调用 CompleteAdding
)并能够收集一些消息的批处理大小,如果它们已经存在。
丢失消息的原因是什么,如何正确使用?
哇。这是一个错误。这行
while (_bc.TryTake(out var additionalResult) && batch.Count < batchSize)
应该是
while (batch.Count < batchSize && _bc.TryTake(out var additionalResult))
因为第一个条件具有从集合中删除项目的副作用。
[__DynamicallyInvokable]
public IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken)
{
...
while (!this.IsCompleted)
{
T obj;
if (this.TryTakeWithNoTimeValidation(out obj, -1, cancellationToken, linkedTokenSource))
yield return obj;
}
...
}
和
public bool TryTake(out T item)
{
...
return this.TryTakeWithNoTimeValidation(out item, (int) timeout.TotalMilliseconds, CancellationToken.None, (CancellationTokenSource) null);
}
TryTake 和 GetConsumingEnumerable 都使用方法 TryTakeWithNoTimeValidation 。我假设缺少的元素已通过 GetConsumingEnumerable 从集合中删除。考虑以下示例:
private static void Producer()
{
Console.WriteLine($"begin produce isCompleted:{_bc.IsCompleted}");
for (var i = 0; i < 5000; i++)
_bc.Add($"msg:{i}");
_bc.CompleteAdding();
Console.WriteLine($"end produce isCompleted:{_bc.IsCompleted}");
}
var batch = new List<string>();
foreach (var s in _bc.GetConsumingEnumerable())
{
batch.Add(s);
if (_bc.IsCompleted && _bc.Count == 0)
{
break;
}
}
Console.WriteLine($"first:{batch.First()}, last:{batch.Last()}");
Console.WriteLine($"consumed:{batch.Count}");
_bc 为空。
有几种方法可以实现您的算法,其中一种我建议使用 Take 并在生产者之前调用消费者(这会阻止调用线程)。
我试图发明一种方法来消耗来自 BlockingCollection
的批次,但遇到了麻烦。
这是一个最小的重现:
internal class Program
{
private static readonly BlockingCollection<string> _bc = new BlockingCollection<string>(1000);
private static int _consumed;
static void Main()
{
Task.Run(() => Producer());
Task.Run(() => Consumer());
Console.WriteLine("press [ENTER] to check");
while (true)
{
Console.ReadLine();
Console.WriteLine("consumed: " + _consumed);
}
}
private static void Producer()
{
for (var i = 0; i < 5000; i++)
_bc.Add("msg");
}
private static void Consumer()
{
foreach (var s in _bc.GetConsumingEnumerable())
{
var batchSize = _bc.Count + 1;
var batch = new List<string>(batchSize) { s };
while (_bc.TryTake(out var additionalResult) && batch.Count < batchSize)
batch.Add(additionalResult);
_consumed = _consumed + batch.Count;
}
}
}
很少有消息丢失(但不总是相同的数量)。如果无法重现,请尝试增加生成消息的数量。
我想要实现的是在消费者中使用 GetConsumingEnumerable
方法(一段时间后我会调用 CompleteAdding
)并能够收集一些消息的批处理大小,如果它们已经存在。
丢失消息的原因是什么,如何正确使用?
哇。这是一个错误。这行
while (_bc.TryTake(out var additionalResult) && batch.Count < batchSize)
应该是
while (batch.Count < batchSize && _bc.TryTake(out var additionalResult))
因为第一个条件具有从集合中删除项目的副作用。
[__DynamicallyInvokable]
public IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken)
{
...
while (!this.IsCompleted)
{
T obj;
if (this.TryTakeWithNoTimeValidation(out obj, -1, cancellationToken, linkedTokenSource))
yield return obj;
}
...
}
和
public bool TryTake(out T item)
{
...
return this.TryTakeWithNoTimeValidation(out item, (int) timeout.TotalMilliseconds, CancellationToken.None, (CancellationTokenSource) null);
}
TryTake 和 GetConsumingEnumerable 都使用方法 TryTakeWithNoTimeValidation 。我假设缺少的元素已通过 GetConsumingEnumerable 从集合中删除。考虑以下示例:
private static void Producer()
{
Console.WriteLine($"begin produce isCompleted:{_bc.IsCompleted}");
for (var i = 0; i < 5000; i++)
_bc.Add($"msg:{i}");
_bc.CompleteAdding();
Console.WriteLine($"end produce isCompleted:{_bc.IsCompleted}");
}
var batch = new List<string>();
foreach (var s in _bc.GetConsumingEnumerable())
{
batch.Add(s);
if (_bc.IsCompleted && _bc.Count == 0)
{
break;
}
}
Console.WriteLine($"first:{batch.First()}, last:{batch.Last()}");
Console.WriteLine($"consumed:{batch.Count}");
_bc 为空。 有几种方法可以实现您的算法,其中一种我建议使用 Take 并在生产者之前调用消费者(这会阻止调用线程)。