条件匹配时 TPL 数据流完成管道
TPL Dataflow Complete Pipeline when condition matches
我认为这是非常基本的方法,但我还没有找到任何示例。
我有一个生产者和一个消费者,我想在至少处理 x 个对象时完成管道。此外,我需要知道收到了哪些对象。
我就是这样做的:
public class BlockTester
{
private static TransformBlock<int, int> _worker;
public static async Task StartAsync()
{
_worker = new TransformBlock<int, int>(s => s + s);
var buffer = new BufferBlock<int>();
var consumeTask = Consume(buffer);
_worker.LinkTo(buffer, new DataflowLinkOptions{PropagateCompletion = true});
foreach (var value in Enumerable.Range(0,100))
{
_worker.Post(value);
}
_worker.Complete();
await buffer.Completion;
if(buffer.TryReceiveAll(out var received))
{
Console.WriteLine(string.Join(", ", received));
}
}
public static async Task<IReadOnlyCollection<int>> Consume(ISourceBlock<int> buffer)
{
var received = new List<int>();
while (await buffer.OutputAvailableAsync())
{
var current = buffer.Receive();
received.Add(current);
if (current > 25)
{
_worker.Complete();
}
}
return received;
}
}
我对 buffer.TryReceiveAll 有点困惑。等待消费任务和 TryReceiveAll 有什么区别?为什么 TryReceiveAll 在我的场景中是错误的?我想我实现目标的方法还是有问题。
您的 Consume
方法应该是 ActionBlock
。无需使用 OutputAvailableAsync
或 TryRecieveAll
。将 BufferBlock
替换为 ActionBlock
并在 ActionBlock
中进行处理。目前还不清楚为什么你需要 TransformBlock
除非你在这个过程中有不止一个步骤。
public class BlockTester
{
//Could be removed
private static TransformBlock<int, int> _worker;
public static async Task StartAsync()
{
//Could be removed
_worker = new TransformBlock<int, int>(s => s + s);
var processor = new ActionBlock<int>(x => ProcessMessage(x));
_worker.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });
foreach (var value in Enumerable.Range(0, 100))
{
_worker.Post(value);
}
//_worker.Complete();
await processor.Completion;
}
private static int itemsRecieved = 0;
public static void ProcessMessage(int x)
{
Interlocked.Increment(ref itemsRecieved);
if (itemsRecieved > 25) _worker.Complete();
//process the message
//log the message etc.
}
}
或者使用复杂的消息对象:
public class Message { }
public class BlockTester
{
//Could be removed
private static TransformBlock<Message, Message> _worker;
public static async Task StartAsync()
{
//Could be removed
_worker = new TransformBlock<Message, Message>(s => s);
var processor = new ActionBlock<Message>(x => ProcessMessage(x));
_worker.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });
foreach (var value in Enumerable.Range(0, 100).Select(_ => new Message()))
{
_worker.Post(value);
}
//_worker.Complete();
await processor.Completion;
}
private static ConcurrentBag<Message> itemsRecieved = new ConcurrentBag<Message>();
public static void ProcessMessage(Message x)
{
itemsRecieved.Add(x);
if (itemsRecieved.Count > 25) _worker.Complete();
//process the message
//log the message etc.
}
}
编辑
回答原始问题:
Why does TryReceiveAll
return false:
因为 TryReceiveAll
是 运行 时 BufferBlock
已经 "completed"。对于要完成的块,它的输出缓冲区中必须包含 0 个项目。 Consume
方法在块被允许完成之前将所有项目拉出,你最终会在一个空块上调用 TryRecieveAll
。
我认为这是非常基本的方法,但我还没有找到任何示例。 我有一个生产者和一个消费者,我想在至少处理 x 个对象时完成管道。此外,我需要知道收到了哪些对象。
我就是这样做的:
public class BlockTester
{
private static TransformBlock<int, int> _worker;
public static async Task StartAsync()
{
_worker = new TransformBlock<int, int>(s => s + s);
var buffer = new BufferBlock<int>();
var consumeTask = Consume(buffer);
_worker.LinkTo(buffer, new DataflowLinkOptions{PropagateCompletion = true});
foreach (var value in Enumerable.Range(0,100))
{
_worker.Post(value);
}
_worker.Complete();
await buffer.Completion;
if(buffer.TryReceiveAll(out var received))
{
Console.WriteLine(string.Join(", ", received));
}
}
public static async Task<IReadOnlyCollection<int>> Consume(ISourceBlock<int> buffer)
{
var received = new List<int>();
while (await buffer.OutputAvailableAsync())
{
var current = buffer.Receive();
received.Add(current);
if (current > 25)
{
_worker.Complete();
}
}
return received;
}
}
我对 buffer.TryReceiveAll 有点困惑。等待消费任务和 TryReceiveAll 有什么区别?为什么 TryReceiveAll 在我的场景中是错误的?我想我实现目标的方法还是有问题。
您的 Consume
方法应该是 ActionBlock
。无需使用 OutputAvailableAsync
或 TryRecieveAll
。将 BufferBlock
替换为 ActionBlock
并在 ActionBlock
中进行处理。目前还不清楚为什么你需要 TransformBlock
除非你在这个过程中有不止一个步骤。
public class BlockTester
{
//Could be removed
private static TransformBlock<int, int> _worker;
public static async Task StartAsync()
{
//Could be removed
_worker = new TransformBlock<int, int>(s => s + s);
var processor = new ActionBlock<int>(x => ProcessMessage(x));
_worker.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });
foreach (var value in Enumerable.Range(0, 100))
{
_worker.Post(value);
}
//_worker.Complete();
await processor.Completion;
}
private static int itemsRecieved = 0;
public static void ProcessMessage(int x)
{
Interlocked.Increment(ref itemsRecieved);
if (itemsRecieved > 25) _worker.Complete();
//process the message
//log the message etc.
}
}
或者使用复杂的消息对象:
public class Message { }
public class BlockTester
{
//Could be removed
private static TransformBlock<Message, Message> _worker;
public static async Task StartAsync()
{
//Could be removed
_worker = new TransformBlock<Message, Message>(s => s);
var processor = new ActionBlock<Message>(x => ProcessMessage(x));
_worker.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });
foreach (var value in Enumerable.Range(0, 100).Select(_ => new Message()))
{
_worker.Post(value);
}
//_worker.Complete();
await processor.Completion;
}
private static ConcurrentBag<Message> itemsRecieved = new ConcurrentBag<Message>();
public static void ProcessMessage(Message x)
{
itemsRecieved.Add(x);
if (itemsRecieved.Count > 25) _worker.Complete();
//process the message
//log the message etc.
}
}
编辑 回答原始问题:
Why does
TryReceiveAll
return false:
因为 TryReceiveAll
是 运行 时 BufferBlock
已经 "completed"。对于要完成的块,它的输出缓冲区中必须包含 0 个项目。 Consume
方法在块被允许完成之前将所有项目拉出,你最终会在一个空块上调用 TryRecieveAll
。