使用 Batchblock.Triggerbatch() 在 TPL 数据流管道中传播数据
Data Propagation in TPL Dataflow Pipeline with Batchblock.Triggerbatch()
在我的生产者-消费者场景中,我有多个消费者,每个消费者都会向外部硬件发送一个动作,这可能需要一些时间。我的管道看起来有点像这样:
BatchBlock --> TransformBlock --> BufferBlock --> (几个) ActionBlocks
我已将我的 ActionBlocks 的 BoundedCapacity 指定为 1。
理论上我想要的是,仅当我的一个 Actionblock 可用于操作时,我想触发 Batchblock 将一组项目发送到 Transformblock。到那时,Batchblock 应该只保留缓冲元素,而不是将它们传递给 Transformblock。我的批量大小是可变的。由于 Batchsize 是强制性的,我确实对 BatchBlock 批处理大小有一个非常高的上限,但是我真的不想达到这个限制,我想根据执行上述任务的 Actionblocks 的可用性来触发我的批处理.
我是借助 Triggerbatch() 方法实现的。有趣的是,经过几天的正常工作后,我将 Batchblock.Triggerbatch() 作为 ActionBlock.However 中的最后一个操作调用,管道出现故障。经过检查,我发现有时在 ActionBlocks 完成工作后,batchblock 的输入才会进来。在这种情况下,ActionBlocks 实际上会在其工作结束时调用 Triggerbatch,但是由于此时 Batchblock 根本没有输入,因此对 TriggerBatch 的调用没有结果。一段时间后,当输入确实流入 Batchblock 时,就没有人可以调用 TriggerBatch 并重新启动 Pipeline。我一直在寻找可以检查 Batchblock 的输入缓冲区中是否确实存在某些东西的东西,但是没有这样的功能可用,我也找不到检查 TriggerBatch 是否有效的方法。
任何人都可以为我的问题提出一个可能的解决方案。不幸的是,我不适合使用 Timer 来触发批处理。除了 Pipeline 的启动,节流应该仅由 ActionBlocks 之一的可用性来控制。
示例代码在这里:
static BatchBlock<int> _groupReadTags;
static void Main(string[] args)
{
_groupReadTags = new BatchBlock<int>(1000);
var bufferOptions = new DataflowBlockOptions{BoundedCapacity = 2};
BufferBlock<int> _frameBuffer = new BufferBlock<int>(bufferOptions);
var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1};
int batchNo = 1;
TransformBlock<int[], int> _workingBlock = new TransformBlock<int[], int>(list =>
{
Console.WriteLine("\n\nWorking on Batch Number {0}", batchNo);
//_groupReadTags.TriggerBatch();
int sum = 0;
foreach (int item in list)
{
Console.WriteLine("Elements in batch {0} :: {1}", batchNo, item);
sum += item;
}
batchNo++;
return sum;
});
ActionBlock<int> _worker1 = new ActionBlock<int>(async x =>
{
Console.WriteLine("Number from ONE :{0}",x);
await Task.Delay(500);
Console.WriteLine("BatchBlock Output Count : {0}", _groupReadTags.OutputCount);
_groupReadTags.TriggerBatch();
},consumerOptions);
ActionBlock<int> _worker2 = new ActionBlock<int>(async x =>
{
Console.WriteLine("Number from TWO :{0}", x);
await Task.Delay(2000);
_groupReadTags.TriggerBatch();
}, consumerOptions);
_groupReadTags.LinkTo(_workingBlock);
_workingBlock.LinkTo(_frameBuffer);
_frameBuffer.LinkTo(_worker1);
_frameBuffer.LinkTo(_worker2);
_groupReadTags.Post(10);
_groupReadTags.Post(20);
_groupReadTags.TriggerBatch();
Task postingTask = new Task(() => PostStuff());
postingTask.Start();
Console.ReadLine();
}
static void PostStuff()
{
for (int i = 0; i < 10; i++)
{
_groupReadTags.Post(i);
Thread.Sleep(100);
}
Parallel.Invoke(
() => _groupReadTags.Post(100),
() => _groupReadTags.Post(200),
() => _groupReadTags.Post(300),
() => _groupReadTags.Post(400),
() => _groupReadTags.Post(500),
() => _groupReadTags.Post(600),
() => _groupReadTags.Post(700),
() => _groupReadTags.Post(800)
);
}
我发现以这种方式使用 TriggerBatch
是不可靠的:
_groupReadTags.Post(10);
_groupReadTags.Post(20);
_groupReadTags.TriggerBatch();
显然 TriggerBatch
是为了在块内使用,而不是像这样在块外使用。我已经看到这会导致奇怪的计时问题,例如下一批批次中的项目被包含在当前批次中,即使首先调用了 TriggerBatch。
请参阅我对这个问题的回答,了解使用 DataflowBlock.Encapsulate
的替代方法:BatchBlock produces batch with elements sent after TriggerBatch()
这是具有一些额外功能的替代 BatchBlock
实现。它包括一个带有此签名的 TriggerBatch
方法:
public int TriggerBatch(int nextMinBatchSizeIfEmpty);
如果输入队列不为空,调用此方法将立即触发一个批处理,否则它会设置一个临时 MinBatchSize
,仅影响下一个批处理。您可以使用较小的 nextMinBatchSizeIfEmpty
值调用此方法,以确保在当前无法生成批次的情况下,下一批次将比块构造函数中配置的 BatchSize
发生得更快。
此方法returns生产的批次大小。它 returns 0
如果输入队列为空,或者输出队列已满,或者块已完成。
public class BatchBlockEx<T> : ITargetBlock<T>, ISourceBlock<T[]>
{
private readonly ITargetBlock<T> _input;
private readonly IPropagatorBlock<T[], T[]> _output;
private readonly Queue<T> _queue;
private readonly object _locker = new object();
private int _nextMinBatchSize = Int32.MaxValue;
public Task Completion { get; }
public int InputCount { get { lock (_locker) return _queue.Count; } }
public int OutputCount => ((BufferBlock<T[]>)_output).Count;
public int BatchSize { get; }
public BatchBlockEx(int batchSize, DataflowBlockOptions dataflowBlockOptions = null)
{
if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize));
dataflowBlockOptions = dataflowBlockOptions ?? new DataflowBlockOptions();
if (dataflowBlockOptions.BoundedCapacity != DataflowBlockOptions.Unbounded &&
dataflowBlockOptions.BoundedCapacity < batchSize)
throw new ArgumentOutOfRangeException(nameof(batchSize),
"Number must be no greater than the value specified in BoundedCapacity.");
this.BatchSize = batchSize;
_output = new BufferBlock<T[]>(dataflowBlockOptions);
_queue = new Queue<T>(batchSize);
_input = new ActionBlock<T>(async item =>
{
T[] batch = null;
lock (_locker)
{
_queue.Enqueue(item);
if (_queue.Count == batchSize || _queue.Count >= _nextMinBatchSize)
{
batch = _queue.ToArray(); _queue.Clear();
_nextMinBatchSize = Int32.MaxValue;
}
}
if (batch != null) await _output.SendAsync(batch).ConfigureAwait(false);
}, new ExecutionDataflowBlockOptions()
{
BoundedCapacity = 1,
CancellationToken = dataflowBlockOptions.CancellationToken
});
var inputContinuation = _input.Completion.ContinueWith(async t =>
{
try
{
T[] batch = null;
lock (_locker)
{
if (_queue.Count > 0)
{
batch = _queue.ToArray(); _queue.Clear();
}
}
if (batch != null) await _output.SendAsync(batch).ConfigureAwait(false);
}
finally
{
if (t.IsFaulted)
{
_output.Fault(t.Exception.InnerException);
}
else
{
_output.Complete();
}
}
}, TaskScheduler.Default).Unwrap();
this.Completion = Task.WhenAll(inputContinuation, _output.Completion);
}
public void Complete() => _input.Complete();
void IDataflowBlock.Fault(Exception ex) => _input.Fault(ex);
public int TriggerBatch(Func<T[], bool> condition, int nextMinBatchSizeIfEmpty)
{
if (nextMinBatchSizeIfEmpty < 1)
throw new ArgumentOutOfRangeException(nameof(nextMinBatchSizeIfEmpty));
int count = 0;
lock (_locker)
{
if (_queue.Count > 0)
{
T[] batch = _queue.ToArray();
if (condition == null || condition(batch))
{
bool accepted = _output.Post(batch);
if (accepted) { _queue.Clear(); count = batch.Length; }
}
_nextMinBatchSize = Int32.MaxValue;
}
else
{
_nextMinBatchSize = nextMinBatchSizeIfEmpty;
}
}
return count;
}
public int TriggerBatch(Func<T[], bool> condition)
=> TriggerBatch(condition, Int32.MaxValue);
public int TriggerBatch(int nextMinBatchSizeIfEmpty)
=> TriggerBatch(null, nextMinBatchSizeIfEmpty);
public int TriggerBatch() => TriggerBatch(null, Int32.MaxValue);
DataflowMessageStatus ITargetBlock<T>.OfferMessage(
DataflowMessageHeader messageHeader, T messageValue,
ISourceBlock<T> source, bool consumeToAccept)
{
return _input.OfferMessage(messageHeader, messageValue, source,
consumeToAccept);
}
T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader,
ITargetBlock<T[]> target, out bool messageConsumed)
{
return _output.ConsumeMessage(messageHeader, target, out messageConsumed);
}
bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader,
ITargetBlock<T[]> target)
{
return _output.ReserveMessage(messageHeader, target);
}
void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader,
ITargetBlock<T[]> target)
{
_output.ReleaseReservation(messageHeader, target);
}
IDisposable ISourceBlock<T[]>.LinkTo(ITargetBlock<T[]> target,
DataflowLinkOptions linkOptions)
{
return _output.LinkTo(target, linkOptions);
}
}
TriggerBatch
方法的另一个重载允许检查当前可以生产的批次,并决定是否应该触发它:
public int TriggerBatch(Func<T[], bool> condition);
BatchBlockEx
class不支持内置BatchBlock
的Greedy
and MaxNumberOfGroups
选项。
在我的生产者-消费者场景中,我有多个消费者,每个消费者都会向外部硬件发送一个动作,这可能需要一些时间。我的管道看起来有点像这样:
BatchBlock --> TransformBlock --> BufferBlock --> (几个) ActionBlocks
我已将我的 ActionBlocks 的 BoundedCapacity 指定为 1。 理论上我想要的是,仅当我的一个 Actionblock 可用于操作时,我想触发 Batchblock 将一组项目发送到 Transformblock。到那时,Batchblock 应该只保留缓冲元素,而不是将它们传递给 Transformblock。我的批量大小是可变的。由于 Batchsize 是强制性的,我确实对 BatchBlock 批处理大小有一个非常高的上限,但是我真的不想达到这个限制,我想根据执行上述任务的 Actionblocks 的可用性来触发我的批处理.
我是借助 Triggerbatch() 方法实现的。有趣的是,经过几天的正常工作后,我将 Batchblock.Triggerbatch() 作为 ActionBlock.However 中的最后一个操作调用,管道出现故障。经过检查,我发现有时在 ActionBlocks 完成工作后,batchblock 的输入才会进来。在这种情况下,ActionBlocks 实际上会在其工作结束时调用 Triggerbatch,但是由于此时 Batchblock 根本没有输入,因此对 TriggerBatch 的调用没有结果。一段时间后,当输入确实流入 Batchblock 时,就没有人可以调用 TriggerBatch 并重新启动 Pipeline。我一直在寻找可以检查 Batchblock 的输入缓冲区中是否确实存在某些东西的东西,但是没有这样的功能可用,我也找不到检查 TriggerBatch 是否有效的方法。
任何人都可以为我的问题提出一个可能的解决方案。不幸的是,我不适合使用 Timer 来触发批处理。除了 Pipeline 的启动,节流应该仅由 ActionBlocks 之一的可用性来控制。
示例代码在这里:
static BatchBlock<int> _groupReadTags;
static void Main(string[] args)
{
_groupReadTags = new BatchBlock<int>(1000);
var bufferOptions = new DataflowBlockOptions{BoundedCapacity = 2};
BufferBlock<int> _frameBuffer = new BufferBlock<int>(bufferOptions);
var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1};
int batchNo = 1;
TransformBlock<int[], int> _workingBlock = new TransformBlock<int[], int>(list =>
{
Console.WriteLine("\n\nWorking on Batch Number {0}", batchNo);
//_groupReadTags.TriggerBatch();
int sum = 0;
foreach (int item in list)
{
Console.WriteLine("Elements in batch {0} :: {1}", batchNo, item);
sum += item;
}
batchNo++;
return sum;
});
ActionBlock<int> _worker1 = new ActionBlock<int>(async x =>
{
Console.WriteLine("Number from ONE :{0}",x);
await Task.Delay(500);
Console.WriteLine("BatchBlock Output Count : {0}", _groupReadTags.OutputCount);
_groupReadTags.TriggerBatch();
},consumerOptions);
ActionBlock<int> _worker2 = new ActionBlock<int>(async x =>
{
Console.WriteLine("Number from TWO :{0}", x);
await Task.Delay(2000);
_groupReadTags.TriggerBatch();
}, consumerOptions);
_groupReadTags.LinkTo(_workingBlock);
_workingBlock.LinkTo(_frameBuffer);
_frameBuffer.LinkTo(_worker1);
_frameBuffer.LinkTo(_worker2);
_groupReadTags.Post(10);
_groupReadTags.Post(20);
_groupReadTags.TriggerBatch();
Task postingTask = new Task(() => PostStuff());
postingTask.Start();
Console.ReadLine();
}
static void PostStuff()
{
for (int i = 0; i < 10; i++)
{
_groupReadTags.Post(i);
Thread.Sleep(100);
}
Parallel.Invoke(
() => _groupReadTags.Post(100),
() => _groupReadTags.Post(200),
() => _groupReadTags.Post(300),
() => _groupReadTags.Post(400),
() => _groupReadTags.Post(500),
() => _groupReadTags.Post(600),
() => _groupReadTags.Post(700),
() => _groupReadTags.Post(800)
);
}
我发现以这种方式使用 TriggerBatch
是不可靠的:
_groupReadTags.Post(10);
_groupReadTags.Post(20);
_groupReadTags.TriggerBatch();
显然 TriggerBatch
是为了在块内使用,而不是像这样在块外使用。我已经看到这会导致奇怪的计时问题,例如下一批批次中的项目被包含在当前批次中,即使首先调用了 TriggerBatch。
请参阅我对这个问题的回答,了解使用 DataflowBlock.Encapsulate
的替代方法:BatchBlock produces batch with elements sent after TriggerBatch()
这是具有一些额外功能的替代 BatchBlock
实现。它包括一个带有此签名的 TriggerBatch
方法:
public int TriggerBatch(int nextMinBatchSizeIfEmpty);
如果输入队列不为空,调用此方法将立即触发一个批处理,否则它会设置一个临时 MinBatchSize
,仅影响下一个批处理。您可以使用较小的 nextMinBatchSizeIfEmpty
值调用此方法,以确保在当前无法生成批次的情况下,下一批次将比块构造函数中配置的 BatchSize
发生得更快。
此方法returns生产的批次大小。它 returns 0
如果输入队列为空,或者输出队列已满,或者块已完成。
public class BatchBlockEx<T> : ITargetBlock<T>, ISourceBlock<T[]>
{
private readonly ITargetBlock<T> _input;
private readonly IPropagatorBlock<T[], T[]> _output;
private readonly Queue<T> _queue;
private readonly object _locker = new object();
private int _nextMinBatchSize = Int32.MaxValue;
public Task Completion { get; }
public int InputCount { get { lock (_locker) return _queue.Count; } }
public int OutputCount => ((BufferBlock<T[]>)_output).Count;
public int BatchSize { get; }
public BatchBlockEx(int batchSize, DataflowBlockOptions dataflowBlockOptions = null)
{
if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize));
dataflowBlockOptions = dataflowBlockOptions ?? new DataflowBlockOptions();
if (dataflowBlockOptions.BoundedCapacity != DataflowBlockOptions.Unbounded &&
dataflowBlockOptions.BoundedCapacity < batchSize)
throw new ArgumentOutOfRangeException(nameof(batchSize),
"Number must be no greater than the value specified in BoundedCapacity.");
this.BatchSize = batchSize;
_output = new BufferBlock<T[]>(dataflowBlockOptions);
_queue = new Queue<T>(batchSize);
_input = new ActionBlock<T>(async item =>
{
T[] batch = null;
lock (_locker)
{
_queue.Enqueue(item);
if (_queue.Count == batchSize || _queue.Count >= _nextMinBatchSize)
{
batch = _queue.ToArray(); _queue.Clear();
_nextMinBatchSize = Int32.MaxValue;
}
}
if (batch != null) await _output.SendAsync(batch).ConfigureAwait(false);
}, new ExecutionDataflowBlockOptions()
{
BoundedCapacity = 1,
CancellationToken = dataflowBlockOptions.CancellationToken
});
var inputContinuation = _input.Completion.ContinueWith(async t =>
{
try
{
T[] batch = null;
lock (_locker)
{
if (_queue.Count > 0)
{
batch = _queue.ToArray(); _queue.Clear();
}
}
if (batch != null) await _output.SendAsync(batch).ConfigureAwait(false);
}
finally
{
if (t.IsFaulted)
{
_output.Fault(t.Exception.InnerException);
}
else
{
_output.Complete();
}
}
}, TaskScheduler.Default).Unwrap();
this.Completion = Task.WhenAll(inputContinuation, _output.Completion);
}
public void Complete() => _input.Complete();
void IDataflowBlock.Fault(Exception ex) => _input.Fault(ex);
public int TriggerBatch(Func<T[], bool> condition, int nextMinBatchSizeIfEmpty)
{
if (nextMinBatchSizeIfEmpty < 1)
throw new ArgumentOutOfRangeException(nameof(nextMinBatchSizeIfEmpty));
int count = 0;
lock (_locker)
{
if (_queue.Count > 0)
{
T[] batch = _queue.ToArray();
if (condition == null || condition(batch))
{
bool accepted = _output.Post(batch);
if (accepted) { _queue.Clear(); count = batch.Length; }
}
_nextMinBatchSize = Int32.MaxValue;
}
else
{
_nextMinBatchSize = nextMinBatchSizeIfEmpty;
}
}
return count;
}
public int TriggerBatch(Func<T[], bool> condition)
=> TriggerBatch(condition, Int32.MaxValue);
public int TriggerBatch(int nextMinBatchSizeIfEmpty)
=> TriggerBatch(null, nextMinBatchSizeIfEmpty);
public int TriggerBatch() => TriggerBatch(null, Int32.MaxValue);
DataflowMessageStatus ITargetBlock<T>.OfferMessage(
DataflowMessageHeader messageHeader, T messageValue,
ISourceBlock<T> source, bool consumeToAccept)
{
return _input.OfferMessage(messageHeader, messageValue, source,
consumeToAccept);
}
T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader,
ITargetBlock<T[]> target, out bool messageConsumed)
{
return _output.ConsumeMessage(messageHeader, target, out messageConsumed);
}
bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader,
ITargetBlock<T[]> target)
{
return _output.ReserveMessage(messageHeader, target);
}
void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader,
ITargetBlock<T[]> target)
{
_output.ReleaseReservation(messageHeader, target);
}
IDisposable ISourceBlock<T[]>.LinkTo(ITargetBlock<T[]> target,
DataflowLinkOptions linkOptions)
{
return _output.LinkTo(target, linkOptions);
}
}
TriggerBatch
方法的另一个重载允许检查当前可以生产的批次,并决定是否应该触发它:
public int TriggerBatch(Func<T[], bool> condition);
BatchBlockEx
class不支持内置BatchBlock
的Greedy
and MaxNumberOfGroups
选项。