如何使用 BlockingCollection 通知生产者-消费者模式中的失败?
How to notify failure in producer-consumer pattern using BlockingCollection?
我正在尝试创建一个生命周期进程来为数据库批量插入批处理传入消息。
新消息以不规则的间隔一次传入 1 条。
我对此的解决方案类似于使用 BlockingCollection
的生产者-消费者模式。
消息通过各种事件自由添加到 BlockingCollection
中,并以固定间隔(5 秒)从 BlockingCollection
中批量取出以进行数据库插入。
但是,当前的解决方案是即发即弃。如果批量插入因任何原因失败,我需要一种方法让处理器通知失败的原始来源,因为来源包含恢复和重试的逻辑。
是否有我应该使用的特定模式来实现我想要实现的目标?
非常感谢任何建议或帮助!
private BlockingCollection<Message> _messageCollection;
public async Task<bool> InsertMessage(Message message)
{
if (!_messageCollection.TryAdd(message)) return false;
// TODO: check message has been successfully processed, if not return false
// return false;
return true;
}
private void BulkInsertProcess()
{
Task consumerThread = Task.Factory.StartNew(async () =>
{
while (!_messageCollection.IsCompleted)
{
List<Message> messages = new List<Message>();
for (int i = 0; i < 50; i++)
{
if (_messageCollection.Any())
{
messages.Add(_messageCollection.Take());
}
else
{
break;
}
}
bool insertResult = await _database.BulkInsertMessages(messages);
// TODO: check result and inform the consumer if insert failed
await Task.Delay(5000);
}
});
}
您必须以某种方式将每个 Message
与专用的 TaskCompletionSource<bool>
相关联。您可能希望将第二个设为第一个的 属性:
public class Message
{
public TaskCompletionSource<bool> TCS { get; } = new();
}
...或者使第一个成为第二个的 属性:
private class Entry : TaskCompletionSource<bool>
{
public Message Message { get; init; }
}
...或创建包含两者的自定义 class,或使用我在以下示例中选择的 ValueTuple<Message, TaskCompletionSource<bool>>
:
private BlockingCollection<(Message, TaskCompletionSource<bool>)> _queue;
public Task<bool> InsertMessage(Message message)
{
var tcs = new TaskCompletionSource<bool>(
TaskCreationOptions.RunContinuationsAsynchronously);
if (!_queue.TryAdd((message, tcs)))
return Task.FromResult(false);
return tcs.Task;
}
private void BulkInsertProcess()
{
Task consumerTask = Task.Run(async () =>
{
while (!_queue.IsCompleted)
{
var delayTask = Task.Delay(5000);
var batch = new List<(Message, TaskCompletionSource<bool>)>();
while (batch.Count < 50 && _queue.TryTake(out var entry))
batch.Add(entry);
if (batch.Count > 0)
{
var messages = batch.Select(e => e.Item1).ToList();
bool insertResult = await _database.BulkInsertMessages(messages);
foreach (var (message, tcs) in batch)
tcs.SetResult(insertResult);
}
await delayTask;
}
});
}
我对您的代码进行了一些改进,使其运行更流畅:
Task.Run
而不是 Task.Factory.StartNew
。前者了解异步委托。后面的doesn't.
TryTake
instead of Any
. The Any
is an extension method on the IEnumerable<T>
interface, and these are not guaranteed to be thread-safe。很可能是 thread-safe,但使用 BlockingCollection<T>
class 的 public 成员更安全、更高效。
- 在执行大容量插入操作之前创建
Task.Delay
,然后再创建 await
。这样您就可以在后续的批量插入操作之间获得一个稳定的间隔,这不取决于操作本身的持续时间。
如果您在一批中收到 50 条消息,您可能会考虑完全跳过 await delayTask
,因为这表明您的服务处于压力之下,消息正在队列中堆积。
我正在尝试创建一个生命周期进程来为数据库批量插入批处理传入消息。
新消息以不规则的间隔一次传入 1 条。
我对此的解决方案类似于使用 BlockingCollection
的生产者-消费者模式。
消息通过各种事件自由添加到 BlockingCollection
中,并以固定间隔(5 秒)从 BlockingCollection
中批量取出以进行数据库插入。
但是,当前的解决方案是即发即弃。如果批量插入因任何原因失败,我需要一种方法让处理器通知失败的原始来源,因为来源包含恢复和重试的逻辑。
是否有我应该使用的特定模式来实现我想要实现的目标? 非常感谢任何建议或帮助!
private BlockingCollection<Message> _messageCollection;
public async Task<bool> InsertMessage(Message message)
{
if (!_messageCollection.TryAdd(message)) return false;
// TODO: check message has been successfully processed, if not return false
// return false;
return true;
}
private void BulkInsertProcess()
{
Task consumerThread = Task.Factory.StartNew(async () =>
{
while (!_messageCollection.IsCompleted)
{
List<Message> messages = new List<Message>();
for (int i = 0; i < 50; i++)
{
if (_messageCollection.Any())
{
messages.Add(_messageCollection.Take());
}
else
{
break;
}
}
bool insertResult = await _database.BulkInsertMessages(messages);
// TODO: check result and inform the consumer if insert failed
await Task.Delay(5000);
}
});
}
您必须以某种方式将每个 Message
与专用的 TaskCompletionSource<bool>
相关联。您可能希望将第二个设为第一个的 属性:
public class Message
{
public TaskCompletionSource<bool> TCS { get; } = new();
}
...或者使第一个成为第二个的 属性:
private class Entry : TaskCompletionSource<bool>
{
public Message Message { get; init; }
}
...或创建包含两者的自定义 class,或使用我在以下示例中选择的 ValueTuple<Message, TaskCompletionSource<bool>>
:
private BlockingCollection<(Message, TaskCompletionSource<bool>)> _queue;
public Task<bool> InsertMessage(Message message)
{
var tcs = new TaskCompletionSource<bool>(
TaskCreationOptions.RunContinuationsAsynchronously);
if (!_queue.TryAdd((message, tcs)))
return Task.FromResult(false);
return tcs.Task;
}
private void BulkInsertProcess()
{
Task consumerTask = Task.Run(async () =>
{
while (!_queue.IsCompleted)
{
var delayTask = Task.Delay(5000);
var batch = new List<(Message, TaskCompletionSource<bool>)>();
while (batch.Count < 50 && _queue.TryTake(out var entry))
batch.Add(entry);
if (batch.Count > 0)
{
var messages = batch.Select(e => e.Item1).ToList();
bool insertResult = await _database.BulkInsertMessages(messages);
foreach (var (message, tcs) in batch)
tcs.SetResult(insertResult);
}
await delayTask;
}
});
}
我对您的代码进行了一些改进,使其运行更流畅:
Task.Run
而不是Task.Factory.StartNew
。前者了解异步委托。后面的doesn't.TryTake
instead ofAny
. TheAny
is an extension method on theIEnumerable<T>
interface, and these are not guaranteed to be thread-safe。很可能是 thread-safe,但使用BlockingCollection<T>
class 的 public 成员更安全、更高效。- 在执行大容量插入操作之前创建
Task.Delay
,然后再创建await
。这样您就可以在后续的批量插入操作之间获得一个稳定的间隔,这不取决于操作本身的持续时间。
如果您在一批中收到 50 条消息,您可能会考虑完全跳过 await delayTask
,因为这表明您的服务处于压力之下,消息正在队列中堆积。