"bounded" 批处理块 => 动作块。如何完成正确的方法?
"bounded" BatchBlock => ActionBlock. How to complete the proper way?
我正在尝试使用链接到操作块的有界批处理块。
我知道 batchblock 中的项目何时结束,我想触发一个完成链。
问题是:如果我的 BatchBlock<T>
是给定的 BoundedCapacity
,我将不会在操作块中触发所有项目。
这是我的问题示例,它应该(根据我对 TPL 数据流的理解...)打印 0 到 124 但它最终打印 0 到 99。
一定是我遗漏了什么...也许 BoundedCapacity
意味着 "drop items when queue count is over xxx..." 如果是这样,我怎样才能达到保证的最大内存消耗?
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;
namespace ConsoleApplication
{
class Program
{
static void Main(string[] args)
{
int itemsCount = 125;
List<int> ints = new List<int>(itemsCount);
for (int i = 0; i < itemsCount; i++)
ints.Add(i);
BatchBlock<int> batchBlock = new BatchBlock<int>(50,new GroupingDataflowBlockOptions(){BoundedCapacity = 100});
ActionBlock<int[]> actionBlock = new ActionBlock<int[]>(intsBatch =>
{
Thread.Sleep(1000);
foreach (int i in intsBatch)
Console.WriteLine(i);
});
batchBlock.LinkTo(actionBlock, new DataflowLinkOptions() { PropagateCompletion = true });
// feed the batch block
foreach (int i in ints)
batchBlock.Post(i);
// Don't know how to end the proper way... Meaning it should display 0 to 124 and not 0 to 99
batchBlock.Complete();
batchBlock.TriggerBatch();
actionBlock.Completion.Wait();
}
}
}
Post
上的块并不总是成功。它尝试 post 向块发送消息,但如果达到 BoundedCapacity
它将失败并且 return false
.
您可以使用 SendAsync
而不是 return 等待任务。如果该块有空间容纳您的消息,它将异步完成。如果没有,那么块 return 是一个任务,当它有空间接受新消息时将完成。您可以等待该任务并限制您的插入:
async Task MainAsync()
{
var ints = Enumerable.Range(0, 125).ToList();
var batchBlock = new BatchBlock<int>(50, new GroupingDataflowBlockOptions { BoundedCapacity = 100 });
var actionBlock = new ActionBlock<int[]>(intsBatch =>
{
Thread.Sleep(1000);
foreach (var i in intsBatch)
Console.WriteLine(i);
});
batchBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });
foreach (var i in ints)
await batchBlock.SendAsync(i); // wait synchronously for the block to accept.
batchBlock.Complete();
await actionBlock.Completion;
}
我正在尝试使用链接到操作块的有界批处理块。 我知道 batchblock 中的项目何时结束,我想触发一个完成链。
问题是:如果我的 BatchBlock<T>
是给定的 BoundedCapacity
,我将不会在操作块中触发所有项目。
这是我的问题示例,它应该(根据我对 TPL 数据流的理解...)打印 0 到 124 但它最终打印 0 到 99。
一定是我遗漏了什么...也许 BoundedCapacity
意味着 "drop items when queue count is over xxx..." 如果是这样,我怎样才能达到保证的最大内存消耗?
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;
namespace ConsoleApplication
{
class Program
{
static void Main(string[] args)
{
int itemsCount = 125;
List<int> ints = new List<int>(itemsCount);
for (int i = 0; i < itemsCount; i++)
ints.Add(i);
BatchBlock<int> batchBlock = new BatchBlock<int>(50,new GroupingDataflowBlockOptions(){BoundedCapacity = 100});
ActionBlock<int[]> actionBlock = new ActionBlock<int[]>(intsBatch =>
{
Thread.Sleep(1000);
foreach (int i in intsBatch)
Console.WriteLine(i);
});
batchBlock.LinkTo(actionBlock, new DataflowLinkOptions() { PropagateCompletion = true });
// feed the batch block
foreach (int i in ints)
batchBlock.Post(i);
// Don't know how to end the proper way... Meaning it should display 0 to 124 and not 0 to 99
batchBlock.Complete();
batchBlock.TriggerBatch();
actionBlock.Completion.Wait();
}
}
}
Post
上的块并不总是成功。它尝试 post 向块发送消息,但如果达到 BoundedCapacity
它将失败并且 return false
.
您可以使用 SendAsync
而不是 return 等待任务。如果该块有空间容纳您的消息,它将异步完成。如果没有,那么块 return 是一个任务,当它有空间接受新消息时将完成。您可以等待该任务并限制您的插入:
async Task MainAsync()
{
var ints = Enumerable.Range(0, 125).ToList();
var batchBlock = new BatchBlock<int>(50, new GroupingDataflowBlockOptions { BoundedCapacity = 100 });
var actionBlock = new ActionBlock<int[]>(intsBatch =>
{
Thread.Sleep(1000);
foreach (var i in intsBatch)
Console.WriteLine(i);
});
batchBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });
foreach (var i in ints)
await batchBlock.SendAsync(i); // wait synchronously for the block to accept.
batchBlock.Complete();
await actionBlock.Completion;
}