等待多个输入时如何使用 TPL TransformBlock?
How do I use a TPL TransformBlock when waiting for multiple inputs?
如果我有多个数据源(比如来自数据库),然后执行一些 CPU 绑定工作,我如何使用 TPL DataFlow 表示它?
我注意到 TransformBlock 采用一个输入源,但输入来自多个源,我想充分利用并行性来实现这一点。
最好的方法是使用常规 TPL 或并行扩展对数据库执行 IO 绑定工作,然后将此数据合并到 TransformBlock 的一个点吗?
看看JoinBlock,它可能是你需要的。
您还可以创建一个 Custom Dataflow Block Type,可以实现您想要的效果。
例如,如果您希望在 "process" 它们之前到达 5 个对象,并且 return 单个对象(我在这里使用 expando 来说明...),到单个接收器(也应该异步等待):
public sealed class YourCustomBlock : IPropagatorBlock<ExpandoObject, ExpandoObject>
{
// The target part of the block. coming in
private readonly ITargetBlock<ExpandoObject> m_target;
// The source part of the block. going out
private readonly ISourceBlock<ExpandoObject> m_source;
// dependency count
private readonly int _size ;
// temporary holding area of incoming stuff
private Queue<ExpandoObject> _queue;
public YourCustomBlock(int inputs)
{
_size = inputs;
_queue = new Queue<ExpandoObject>(_size);
var mainWorker= new TransformBlock<ExpandoObject[], ExpandoObject> (async expandoArray =>
{
// Do Your Stuff with expandoArray and then return something
// ExpandoObject in this example
await Task.Delay(1000).ConfigureAwait(false);
return /*Some ExpandoObject */;
});
var head = new ActionBlock<ExpandoObject>(async item =>
{
_queue.Enqueue(item);
if (_queue.Count > _size)
{
_queue.Dequeue();
}
// Post when you reach
// the size
if (_queue.Count == _size)
{
await mainWorker.SendAsync(_queue.ToArray());
_queue.Clear();
}
});
// expose as a block
m_source = mainWorker;
m_target = head;
}
}
示例使用:
var myBlock = new YourCustomBlock(5);
Task.Run(async () => {
for (var i=0;i<5;i++) {
await myBlock.SendAsync(/*SomeExpandoObject*/).ConfigureAwait(false);
}
});
var results = await myBlock.ReceiveAsync().ConfigureAwait(false);
注意:这还没有经过编译检查,只是一个想法的说明。
如果我有多个数据源(比如来自数据库),然后执行一些 CPU 绑定工作,我如何使用 TPL DataFlow 表示它?
我注意到 TransformBlock 采用一个输入源,但输入来自多个源,我想充分利用并行性来实现这一点。
最好的方法是使用常规 TPL 或并行扩展对数据库执行 IO 绑定工作,然后将此数据合并到 TransformBlock 的一个点吗?
看看JoinBlock,它可能是你需要的。
您还可以创建一个 Custom Dataflow Block Type,可以实现您想要的效果。
例如,如果您希望在 "process" 它们之前到达 5 个对象,并且 return 单个对象(我在这里使用 expando 来说明...),到单个接收器(也应该异步等待):
public sealed class YourCustomBlock : IPropagatorBlock<ExpandoObject, ExpandoObject>
{
// The target part of the block. coming in
private readonly ITargetBlock<ExpandoObject> m_target;
// The source part of the block. going out
private readonly ISourceBlock<ExpandoObject> m_source;
// dependency count
private readonly int _size ;
// temporary holding area of incoming stuff
private Queue<ExpandoObject> _queue;
public YourCustomBlock(int inputs)
{
_size = inputs;
_queue = new Queue<ExpandoObject>(_size);
var mainWorker= new TransformBlock<ExpandoObject[], ExpandoObject> (async expandoArray =>
{
// Do Your Stuff with expandoArray and then return something
// ExpandoObject in this example
await Task.Delay(1000).ConfigureAwait(false);
return /*Some ExpandoObject */;
});
var head = new ActionBlock<ExpandoObject>(async item =>
{
_queue.Enqueue(item);
if (_queue.Count > _size)
{
_queue.Dequeue();
}
// Post when you reach
// the size
if (_queue.Count == _size)
{
await mainWorker.SendAsync(_queue.ToArray());
_queue.Clear();
}
});
// expose as a block
m_source = mainWorker;
m_target = head;
}
}
示例使用:
var myBlock = new YourCustomBlock(5);
Task.Run(async () => {
for (var i=0;i<5;i++) {
await myBlock.SendAsync(/*SomeExpandoObject*/).ConfigureAwait(false);
}
});
var results = await myBlock.ReceiveAsync().ConfigureAwait(false);
注意:这还没有经过编译检查,只是一个想法的说明。