TPL 数据流 LinkTo TransformBlock 非常慢
TPL Dataflow LinkTo TransformBlock is very slow
我有两个 TransformBlocks,它们排列成一个循环。他们 link 互相分享他们的数据。 TransformBlock 1 是一个 I/O 块读取数据并且被限制为最多 50 个任务。它读取数据和一些元数据。然后它们被传递到第二个块。如果消息再次进入第一个块,第二个块决定元数据。因此,在元数据符合条件并稍等片刻后,数据应该再次返回到 I/O 块。第二块 MaxDegreeOfParallelism 可以是无限的。
现在我注意到当我向 I/O 块发送大量数据时,需要很长时间才能将消息 linked 到第二个块。 link 数据大约需要 10 分钟,而且它们都是成批发送的。几秒钟内就像 1000 个条目。
通常我会这样实现它:
public void Start()
{
_ioBlock = new TransformBlock<Data,Tuple<Data, MetaData>>(async data =>
{
var metaData = await ReadAsync(data).ConfigureAwait(false);
return new Tuple<Data, MetaData>(data, metaData);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 });
_waitBlock = new TransformBlock<Tuple<Data, MetaData>,Data>(async dataMetaData =>
{
var data = dataMetaData.Item1;
var metaData = dataMetaData.Item2;
if (!metaData.Repost)
{
return null;
}
await Task.Delay(TimeSpan.FromMinutes(1)).ConfigureAwait(false);
return data;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
_ioBlock.LinkTo(_waitBlock);
_waitBlock.LinkTo(_ioBlock, data => data != null);
_waitBlock.LinkTo(DataflowBlock.NullTarget<Data>());
foreach (var data in Enumerable.Range(0, 2000).Select(i => new Data(i)))
{
_ioBlock.Post(data);
}
}
但是由于所描述的问题,我必须这样实现它:
public void Start()
{
_ioBlock = new ActionBlock<Data>(async data =>
{
var metaData = await ReadAsync(data).ConfigureAwait(false);
var dataMetaData= new Tuple<Data, MetaData>(data, metaData);
_waitBlock.Post(dataMetaData);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 });
_waitBlock = new ActionBlock<Tuple<Data, MetaData>>(async dataMetaData =>
{
var data = dataMetaData.Item1;
var metaData = dataMetaData.Item2;
if (metaData.Repost)
{
await Task.Delay(TimeSpan.FromMinutes(1)).ConfigureAwait(false);
_ioBlock.Post(data);
}
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
foreach (var data in Enumerable.Range(0, 2000).Select(i => new Data(i)))
{
_ioBlock.Post(data);
}
}
当我使用第二种方法时,数据变得 linked/posted 更快(一个接一个)。但这对我来说更像是一种黑客攻击。有人知道如何解决这个问题吗?一些朋友推荐我使用 TPL 管道,但我觉得它更复杂。
问题已解决。您需要设置
ExecutionDataflowBlockOptions.EnsureOrdered
立即将数据转发到next/wait块。
更多信息:
我有两个 TransformBlocks,它们排列成一个循环。他们 link 互相分享他们的数据。 TransformBlock 1 是一个 I/O 块读取数据并且被限制为最多 50 个任务。它读取数据和一些元数据。然后它们被传递到第二个块。如果消息再次进入第一个块,第二个块决定元数据。因此,在元数据符合条件并稍等片刻后,数据应该再次返回到 I/O 块。第二块 MaxDegreeOfParallelism 可以是无限的。
现在我注意到当我向 I/O 块发送大量数据时,需要很长时间才能将消息 linked 到第二个块。 link 数据大约需要 10 分钟,而且它们都是成批发送的。几秒钟内就像 1000 个条目。 通常我会这样实现它:
public void Start()
{
_ioBlock = new TransformBlock<Data,Tuple<Data, MetaData>>(async data =>
{
var metaData = await ReadAsync(data).ConfigureAwait(false);
return new Tuple<Data, MetaData>(data, metaData);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 });
_waitBlock = new TransformBlock<Tuple<Data, MetaData>,Data>(async dataMetaData =>
{
var data = dataMetaData.Item1;
var metaData = dataMetaData.Item2;
if (!metaData.Repost)
{
return null;
}
await Task.Delay(TimeSpan.FromMinutes(1)).ConfigureAwait(false);
return data;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
_ioBlock.LinkTo(_waitBlock);
_waitBlock.LinkTo(_ioBlock, data => data != null);
_waitBlock.LinkTo(DataflowBlock.NullTarget<Data>());
foreach (var data in Enumerable.Range(0, 2000).Select(i => new Data(i)))
{
_ioBlock.Post(data);
}
}
但是由于所描述的问题,我必须这样实现它:
public void Start()
{
_ioBlock = new ActionBlock<Data>(async data =>
{
var metaData = await ReadAsync(data).ConfigureAwait(false);
var dataMetaData= new Tuple<Data, MetaData>(data, metaData);
_waitBlock.Post(dataMetaData);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 });
_waitBlock = new ActionBlock<Tuple<Data, MetaData>>(async dataMetaData =>
{
var data = dataMetaData.Item1;
var metaData = dataMetaData.Item2;
if (metaData.Repost)
{
await Task.Delay(TimeSpan.FromMinutes(1)).ConfigureAwait(false);
_ioBlock.Post(data);
}
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
foreach (var data in Enumerable.Range(0, 2000).Select(i => new Data(i)))
{
_ioBlock.Post(data);
}
}
当我使用第二种方法时,数据变得 linked/posted 更快(一个接一个)。但这对我来说更像是一种黑客攻击。有人知道如何解决这个问题吗?一些朋友推荐我使用 TPL 管道,但我觉得它更复杂。
问题已解决。您需要设置
ExecutionDataflowBlockOptions.EnsureOrdered
立即将数据转发到next/wait块。
更多信息: