TPL Dataflow BufferBlock 线程安全吗?
Is TPL Dataflow BufferBlock thread safe?
我有一个相当简单的生产者-消费者模式,其中(简化)我有两个生产者,他们生产的输出将由一个消费者消费。
为此我使用 System.Threading.Tasks.Dataflow.BufferBlock<T>
创建了一个 BufferBlock
对象。一个 Consumer
正在收听这个 BufferBlock
,并处理任何接收到的输入。
同时有两个“Producerssend data to the
BufferBlock”
简化版:
BufferBlock<int> bufferBlock = new BufferBlock<int>();
async Task Consume()
{
while(await bufferBlock.OutputAvailable())
{
int dataToProcess = await outputAvailable.ReceiveAsync();
Process(dataToProcess);
}
}
async Task Produce1()
{
IEnumerable<int> numbersToProcess = ...;
foreach (int numberToProcess in numbersToProcess)
{
await bufferBlock.SendAsync(numberToProcess);
// ignore result for this example
}
}
async Task Produce2()
{
IEnumerable<int> numbersToProcess = ...;
foreach (int numberToProcess in numbersToProcess)
{
await bufferBlock.SendAsync(numberToProcess);
// ignore result for this example
}
}
我想先启动消费者,然后将生产者作为单独的任务启动:
var taskConsumer = Consume(); // do not await yet
var taskProduce1 = Task.Run( () => Produce1());
var taskProduce2 = Task.Run( () => Produce2());
// await until both producers are finished:
await Task.WhenAll(new Task[] {taskProduce1, taskProduce2});
bufferBlock.Complete(); // signal that no more data is expected in bufferBlock
// await for the Consumer to finish:
await taskConsumer;
乍一看,这正是生产者-消费者的意思:多个生产者生产数据,而一个消费者消费生产的数据。
然而,BufferBlock about thread safety 说:
Any instance members are not guaranteed to be thread safe.
我还以为TPL里的P是Parallel的意思!
我应该担心吗?我的代码不是线程安全的吗?
我应该使用其他 TPL 数据流 class 吗?
我认为 ActionBlock<T>
更适合您的工作,因为它有一个内置缓冲区,许多生产者可以通过该缓冲区发送数据。默认块选项在单个后台任务上处理数据,但您可以为并行度和有限容量设置新值。使用 ActionBlock<T>
时,确保线程安全的主要关注领域将在您传递的处理每条消息的委托中。该函数的操作必须独立于每个消息,即不像任何 Parrallel...
函数那样修改共享状态。
public class ProducerConsumer
{
private ActionBlock<int> Consumer { get; }
public ProducerConsumer()
{
Consumer = new ActionBlock<int>(x => Process(x));
}
public async Task Start()
{
var producer1Tasks = Producer1();
var producer2Tasks = Producer2();
await Task.WhenAll(producer1Tasks.Concat(producer2Tasks));
Consumer.Complete();
await Consumer.Completion;
}
private void Process(int data)
{
// process
}
private IEnumerable<Task> Producer1() => Enumerable.Range(0, 100).Select(x => Consumer.SendAsync(x));
private IEnumerable<Task> Producer2() => Enumerable.Range(0, 100).Select(x => Consumer.SendAsync(x));
}
是的,BufferBlock
class 是线程安全的。我不能通过指向官方文档来支持此声明,因为“线程安全”部分已从文档中删除。但是我可以在源代码中看到 class 包含一个用于同步传入消息的锁定对象:
/// <summary>Gets the lock object used to synchronize incoming requests.</summary>
private object IncomingLock { get { return _source; } }
当 Post
extension method is called (source code), the explicitly implemented ITargetBlock.OfferMessage
method is invoked (source code)。以下是此方法的摘录:
DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader,
T messageValue, ISourceBlock<T> source, bool consumeToAccept)
{
//...
lock (IncomingLock)
{
//...
_source.AddMessage(messageValue);
//...
}
}
如果 class 或 TPL Dataflow 库中包含的任何其他 XxxBlock
class 不是线程安全的,那确实会很奇怪。这将严重妨碍这个伟大图书馆的易用性。
我有一个相当简单的生产者-消费者模式,其中(简化)我有两个生产者,他们生产的输出将由一个消费者消费。
为此我使用 System.Threading.Tasks.Dataflow.BufferBlock<T>
创建了一个 BufferBlock
对象。一个 Consumer
正在收听这个 BufferBlock
,并处理任何接收到的输入。
同时有两个“Producerssend data to the
BufferBlock”
简化版:
BufferBlock<int> bufferBlock = new BufferBlock<int>();
async Task Consume()
{
while(await bufferBlock.OutputAvailable())
{
int dataToProcess = await outputAvailable.ReceiveAsync();
Process(dataToProcess);
}
}
async Task Produce1()
{
IEnumerable<int> numbersToProcess = ...;
foreach (int numberToProcess in numbersToProcess)
{
await bufferBlock.SendAsync(numberToProcess);
// ignore result for this example
}
}
async Task Produce2()
{
IEnumerable<int> numbersToProcess = ...;
foreach (int numberToProcess in numbersToProcess)
{
await bufferBlock.SendAsync(numberToProcess);
// ignore result for this example
}
}
我想先启动消费者,然后将生产者作为单独的任务启动:
var taskConsumer = Consume(); // do not await yet
var taskProduce1 = Task.Run( () => Produce1());
var taskProduce2 = Task.Run( () => Produce2());
// await until both producers are finished:
await Task.WhenAll(new Task[] {taskProduce1, taskProduce2});
bufferBlock.Complete(); // signal that no more data is expected in bufferBlock
// await for the Consumer to finish:
await taskConsumer;
乍一看,这正是生产者-消费者的意思:多个生产者生产数据,而一个消费者消费生产的数据。
然而,BufferBlock about thread safety 说:
Any instance members are not guaranteed to be thread safe.
我还以为TPL里的P是Parallel的意思! 我应该担心吗?我的代码不是线程安全的吗? 我应该使用其他 TPL 数据流 class 吗?
我认为 ActionBlock<T>
更适合您的工作,因为它有一个内置缓冲区,许多生产者可以通过该缓冲区发送数据。默认块选项在单个后台任务上处理数据,但您可以为并行度和有限容量设置新值。使用 ActionBlock<T>
时,确保线程安全的主要关注领域将在您传递的处理每条消息的委托中。该函数的操作必须独立于每个消息,即不像任何 Parrallel...
函数那样修改共享状态。
public class ProducerConsumer
{
private ActionBlock<int> Consumer { get; }
public ProducerConsumer()
{
Consumer = new ActionBlock<int>(x => Process(x));
}
public async Task Start()
{
var producer1Tasks = Producer1();
var producer2Tasks = Producer2();
await Task.WhenAll(producer1Tasks.Concat(producer2Tasks));
Consumer.Complete();
await Consumer.Completion;
}
private void Process(int data)
{
// process
}
private IEnumerable<Task> Producer1() => Enumerable.Range(0, 100).Select(x => Consumer.SendAsync(x));
private IEnumerable<Task> Producer2() => Enumerable.Range(0, 100).Select(x => Consumer.SendAsync(x));
}
是的,BufferBlock
class 是线程安全的。我不能通过指向官方文档来支持此声明,因为“线程安全”部分已从文档中删除。但是我可以在源代码中看到 class 包含一个用于同步传入消息的锁定对象:
/// <summary>Gets the lock object used to synchronize incoming requests.</summary>
private object IncomingLock { get { return _source; } }
当 Post
extension method is called (source code), the explicitly implemented ITargetBlock.OfferMessage
method is invoked (source code)。以下是此方法的摘录:
DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader,
T messageValue, ISourceBlock<T> source, bool consumeToAccept)
{
//...
lock (IncomingLock)
{
//...
_source.AddMessage(messageValue);
//...
}
}
如果 class 或 TPL Dataflow 库中包含的任何其他 XxxBlock
class 不是线程安全的,那确实会很奇怪。这将严重妨碍这个伟大图书馆的易用性。