Parallel.For SendAsync 到 BufferBlock 到异步转换?
Parallel.For SendAsync to BufferBlock to Async Transform?
我正在了解 TPL Dataflow
图书馆。到目前为止,这正是我要找的。
我创建了一个简单的 class(下图),它执行以下功能
- 执行
ImportPropertiesForBranch
后,我转到第 3 方 api 并获取属性列表
- 一个xml列表被返回并反序列化为一个属性数据数组(id,api端点,lastupdated)。大约有 400 多个属性(如房屋)。
- 然后我使用
Parallel.For
将 SendAsync
的 属性 数据放入我的 propertyBufferBlock
propertyBufferBlock
链接到 propertyXmlBlock
(它本身是 TransformBlock
)。
propertyXmlBlock
然后(异步地)返回到 API(使用 属性 数据中提供的 api 端点)并获取 属性 xml 用于反序列化。
- 一旦 xml 返回并可用,我们就可以反序列化
- 稍后,我将添加更多
TransformBlock
以将其保存到数据存储中。
所以我的问题是;
- 代码中是否存在任何潜在的瓶颈或区域可能会带来麻烦?我知道我没有包括任何错误处理或取消(即将到来)。
- 在
TransformBlock
中 await
异步调用是否可以,或者这是一个
瓶颈?
- 尽管代码有效,但我担心
Parallel.For
、BufferBlock
和 TransformBlock
中的异步的缓冲和异步性。我不确定这是最好的方法,我可能混淆了一些概念。
欢迎任何指导、改进和陷阱建议。
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using My.Interfaces;
using My.XmlService.Models;
namespace My.ImportService
{
public class ImportService
{
private readonly IApiService _apiService;
private readonly IXmlService _xmlService;
private readonly IRepositoryService _repositoryService;
public ImportService(IApiService apiService,
IXmlService xmlService,
IRepositoryService repositoryService)
{
_apiService = apiService;
_xmlService = xmlService;
_repositoryService = repositoryService;
ConstructPipeline();
}
private BufferBlock<propertiesProperty> propertyBufferBlock;
private TransformBlock<propertiesProperty, string> propertyXmlBlock;
private TransformBlock<string, propertyType> propertyDeserializeBlock;
private ActionBlock<propertyType> propertyCompleteBlock;
public async Task<bool> ImportPropertiesForBranch(string branchName, int branchUrlId)
{
var propertyListXml = await _apiService.GetPropertyListAsync(branchUrlId);
if (string.IsNullOrEmpty(propertyListXml))
return false;
var properties = _xmlService.DeserializePropertyList(propertyListXml);
if (properties?.property == null || properties.property.Length == 0)
return false;
// limited to the first 20 for testing
Parallel.For(0, 20,
new ParallelOptions {MaxDegreeOfParallelism = 3},
i => propertyBufferBlock.SendAsync(properties.property[i]));
propertyBufferBlock.Complete();
await propertyCompleteBlock.Completion;
return true;
}
private void ConstructPipeline()
{
propertyBufferBlock = GetPropertyBuffer();
propertyXmlBlock = GetPropertyXmlBlock();
propertyDeserializeBlock = GetPropertyDeserializeBlock();
propertyCompleteBlock = GetPropertyCompleteBlock();
propertyBufferBlock.LinkTo(
propertyXmlBlock,
new DataflowLinkOptions {PropagateCompletion = true});
propertyXmlBlock.LinkTo(
propertyDeserializeBlock,
new DataflowLinkOptions {PropagateCompletion = true});
propertyDeserializeBlock.LinkTo(
propertyCompleteBlock,
new DataflowLinkOptions {PropagateCompletion = true});
}
private BufferBlock<propertiesProperty> GetPropertyBuffer()
{
return new BufferBlock<propertiesProperty>();
}
private TransformBlock<propertiesProperty, string> GetPropertyXmlBlock()
{
return new TransformBlock<propertiesProperty, string>(async propertiesProperty =>
{
Debug.WriteLine($"getting xml {propertiesProperty.prop_id}");
var propertyXml = await _apiService.GetXmlAsStringAsync(propertiesProperty.url);
return propertyXml;
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
BoundedCapacity = 2
});
}
private TransformBlock<string, propertyType> GetPropertyDeserializeBlock()
{
return new TransformBlock<string, propertyType>(xmlAsString =>
{
Debug.WriteLine($"deserializing");
var propertyType = _xmlService.DeserializeProperty(xmlAsString);
return propertyType;
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
BoundedCapacity = 2
});
}
private ActionBlock<propertyType> GetPropertyCompleteBlock()
{
return new ActionBlock<propertyType>(propertyType =>
{
Debug.WriteLine($"complete {propertyType.id}");
Debug.WriteLine(propertyType.address.display);
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
BoundedCapacity = 2
});
}
}
}
Are there any potential bottlenecks or areas of the code that could be troublesome?
总的来说,您的方法看起来不错,潜在的瓶颈是您使用 MaxDegreeOfParallelism = 1
限制了块的并行处理。根据问题的描述,每个项目都可以独立于其他项目进行处理,这就是为什么您可以一次处理多个项目。
Is it ok to await async calls inside a TransformBlock
or is this a bottleneck?
非常好,因为 TPL DataFlow 支持异步操作。
Although the code works , I am worried about the buffering and asyncronsity of the Parallel.For
, BufferBlock
and async in the TransformBlock
. I'm not sure its the best way and I maybe mixing up some concepts.
第一,代码中可能让您搬起石头砸自己脚的潜在问题是在 Parallel.For
中调用异步方法,然后调用 propertyBufferBlock.Complete();
。这里的问题是 Parallel.For
不支持异步操作,您调用它的方式将调用 propertyBufferBlock.SendAsync
并在返回的任务完成之前继续。这意味着到 Parallel.For
退出时,某些操作可能仍处于 运行 状态,并且项目尚未添加到缓冲区块。如果您随后调用 propertyBufferBlock.Complete();
,那些待处理的项目将抛出异常并且项目不会被添加到处理中。你会得到未观察到的异常。
您可以使用 ForEachAsync
形式 this blog post 来确保在完成块之前将所有项目添加到块中。但是,如果您仍然将处理限制为 1 个操作,则只需一次添加一个项目即可。我不确定 propertyBufferBlock.SendAsync
是如何实现的,但可能会在内部限制一次添加一项,因此并行添加没有任何意义。
你确实做错了一些事情:
i => propertyBufferBlock.SendAsync(properties.property[i])
您需要 await
方法,否则您会同时创建太多任务。
还有这一行:
MaxDegreeOfParallelism = 1
会将块的执行限制为后续执行,这会降低性能。
正如您在评论中所说,您已切换到 同步 方法 Post
并通过设置 BoundedCapacity
限制了块的容量。应谨慎使用此变体,因为您需要检查它的 return 值,其中说明消息是否已被接受。
至于您担心在块内等待 async
方法 - 绝对没问题,应该像在其他 async
方法使用情况下一样完成。
我正在了解 TPL Dataflow
图书馆。到目前为止,这正是我要找的。
我创建了一个简单的 class(下图),它执行以下功能
- 执行
ImportPropertiesForBranch
后,我转到第 3 方 api 并获取属性列表 - 一个xml列表被返回并反序列化为一个属性数据数组(id,api端点,lastupdated)。大约有 400 多个属性(如房屋)。
- 然后我使用
Parallel.For
将SendAsync
的 属性 数据放入我的propertyBufferBlock
propertyBufferBlock
链接到propertyXmlBlock
(它本身是TransformBlock
)。propertyXmlBlock
然后(异步地)返回到 API(使用 属性 数据中提供的 api 端点)并获取 属性 xml 用于反序列化。- 一旦 xml 返回并可用,我们就可以反序列化
- 稍后,我将添加更多
TransformBlock
以将其保存到数据存储中。
所以我的问题是;
- 代码中是否存在任何潜在的瓶颈或区域可能会带来麻烦?我知道我没有包括任何错误处理或取消(即将到来)。
- 在
TransformBlock
中await
异步调用是否可以,或者这是一个 瓶颈? - 尽管代码有效,但我担心
Parallel.For
、BufferBlock
和TransformBlock
中的异步的缓冲和异步性。我不确定这是最好的方法,我可能混淆了一些概念。
欢迎任何指导、改进和陷阱建议。
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using My.Interfaces;
using My.XmlService.Models;
namespace My.ImportService
{
public class ImportService
{
private readonly IApiService _apiService;
private readonly IXmlService _xmlService;
private readonly IRepositoryService _repositoryService;
public ImportService(IApiService apiService,
IXmlService xmlService,
IRepositoryService repositoryService)
{
_apiService = apiService;
_xmlService = xmlService;
_repositoryService = repositoryService;
ConstructPipeline();
}
private BufferBlock<propertiesProperty> propertyBufferBlock;
private TransformBlock<propertiesProperty, string> propertyXmlBlock;
private TransformBlock<string, propertyType> propertyDeserializeBlock;
private ActionBlock<propertyType> propertyCompleteBlock;
public async Task<bool> ImportPropertiesForBranch(string branchName, int branchUrlId)
{
var propertyListXml = await _apiService.GetPropertyListAsync(branchUrlId);
if (string.IsNullOrEmpty(propertyListXml))
return false;
var properties = _xmlService.DeserializePropertyList(propertyListXml);
if (properties?.property == null || properties.property.Length == 0)
return false;
// limited to the first 20 for testing
Parallel.For(0, 20,
new ParallelOptions {MaxDegreeOfParallelism = 3},
i => propertyBufferBlock.SendAsync(properties.property[i]));
propertyBufferBlock.Complete();
await propertyCompleteBlock.Completion;
return true;
}
private void ConstructPipeline()
{
propertyBufferBlock = GetPropertyBuffer();
propertyXmlBlock = GetPropertyXmlBlock();
propertyDeserializeBlock = GetPropertyDeserializeBlock();
propertyCompleteBlock = GetPropertyCompleteBlock();
propertyBufferBlock.LinkTo(
propertyXmlBlock,
new DataflowLinkOptions {PropagateCompletion = true});
propertyXmlBlock.LinkTo(
propertyDeserializeBlock,
new DataflowLinkOptions {PropagateCompletion = true});
propertyDeserializeBlock.LinkTo(
propertyCompleteBlock,
new DataflowLinkOptions {PropagateCompletion = true});
}
private BufferBlock<propertiesProperty> GetPropertyBuffer()
{
return new BufferBlock<propertiesProperty>();
}
private TransformBlock<propertiesProperty, string> GetPropertyXmlBlock()
{
return new TransformBlock<propertiesProperty, string>(async propertiesProperty =>
{
Debug.WriteLine($"getting xml {propertiesProperty.prop_id}");
var propertyXml = await _apiService.GetXmlAsStringAsync(propertiesProperty.url);
return propertyXml;
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
BoundedCapacity = 2
});
}
private TransformBlock<string, propertyType> GetPropertyDeserializeBlock()
{
return new TransformBlock<string, propertyType>(xmlAsString =>
{
Debug.WriteLine($"deserializing");
var propertyType = _xmlService.DeserializeProperty(xmlAsString);
return propertyType;
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
BoundedCapacity = 2
});
}
private ActionBlock<propertyType> GetPropertyCompleteBlock()
{
return new ActionBlock<propertyType>(propertyType =>
{
Debug.WriteLine($"complete {propertyType.id}");
Debug.WriteLine(propertyType.address.display);
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
BoundedCapacity = 2
});
}
}
}
Are there any potential bottlenecks or areas of the code that could be troublesome?
总的来说,您的方法看起来不错,潜在的瓶颈是您使用 MaxDegreeOfParallelism = 1
限制了块的并行处理。根据问题的描述,每个项目都可以独立于其他项目进行处理,这就是为什么您可以一次处理多个项目。
Is it ok to await async calls inside a
TransformBlock
or is this a bottleneck?
非常好,因为 TPL DataFlow 支持异步操作。
Although the code works , I am worried about the buffering and asyncronsity of the
Parallel.For
,BufferBlock
and async in theTransformBlock
. I'm not sure its the best way and I maybe mixing up some concepts.
第一,代码中可能让您搬起石头砸自己脚的潜在问题是在 Parallel.For
中调用异步方法,然后调用 propertyBufferBlock.Complete();
。这里的问题是 Parallel.For
不支持异步操作,您调用它的方式将调用 propertyBufferBlock.SendAsync
并在返回的任务完成之前继续。这意味着到 Parallel.For
退出时,某些操作可能仍处于 运行 状态,并且项目尚未添加到缓冲区块。如果您随后调用 propertyBufferBlock.Complete();
,那些待处理的项目将抛出异常并且项目不会被添加到处理中。你会得到未观察到的异常。
您可以使用 ForEachAsync
形式 this blog post 来确保在完成块之前将所有项目添加到块中。但是,如果您仍然将处理限制为 1 个操作,则只需一次添加一个项目即可。我不确定 propertyBufferBlock.SendAsync
是如何实现的,但可能会在内部限制一次添加一项,因此并行添加没有任何意义。
你确实做错了一些事情:
i => propertyBufferBlock.SendAsync(properties.property[i])
您需要 await
方法,否则您会同时创建太多任务。
还有这一行:
MaxDegreeOfParallelism = 1
会将块的执行限制为后续执行,这会降低性能。
正如您在评论中所说,您已切换到 同步 方法 Post
并通过设置 BoundedCapacity
限制了块的容量。应谨慎使用此变体,因为您需要检查它的 return 值,其中说明消息是否已被接受。
至于您担心在块内等待 async
方法 - 绝对没问题,应该像在其他 async
方法使用情况下一样完成。