Parallel.For SendAsync to BufferBlock to Async Transform?

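I am getting to know the TPL Dataflow library. So far it is exactly what I was looking for.

I have created a simple class (below) that performs the following functions

So my questions are:

Any guidance, improvements, and advice on pitfalls are welcome.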

using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using My.Interfaces;
using My.XmlService.Models;

namespace My.ImportService
{
    public class ImportService
    {

        private readonly IApiService _apiService;
        private readonly IXmlService _xmlService;
        private readonly IRepositoryService _repositoryService;

        public ImportService(IApiService apiService,
            IXmlService xmlService,
            IRepositoryService repositoryService)
        {
            _apiService = apiService;
            _xmlService = xmlService;
            _repositoryService = repositoryService;

            ConstructPipeline();
        }

        private BufferBlock<propertiesProperty> propertyBufferBlock;
        private TransformBlock<propertiesProperty, string> propertyXmlBlock;
        private TransformBlock<string, propertyType> propertyDeserializeBlock;
        private ActionBlock<propertyType> propertyCompleteBlock;

        public async Task<bool> ImportPropertiesForBranch(string branchName, int branchUrlId)
        {
            var propertyListXml = await _apiService.GetPropertyListAsync(branchUrlId);

            if (string.IsNullOrEmpty(propertyListXml))
                return false;

            var properties = _xmlService.DeserializePropertyList(propertyListXml);

            if (properties?.property == null || properties.property.Length == 0)
                return false;

            // limited to the first 20 for testing
            Parallel.For(0, 20,
                new ParallelOptions {MaxDegreeOfParallelism = 3},
                i => propertyBufferBlock.SendAsync(properties.property[i]));

            propertyBufferBlock.Complete();

            await propertyCompleteBlock.Completion;

            return true;
        }

        private void ConstructPipeline()
        {
            propertyBufferBlock = GetPropertyBuffer();
            propertyXmlBlock = GetPropertyXmlBlock();
            propertyDeserializeBlock = GetPropertyDeserializeBlock();
            propertyCompleteBlock = GetPropertyCompleteBlock();

            propertyBufferBlock.LinkTo(
                propertyXmlBlock,
                new DataflowLinkOptions {PropagateCompletion = true});

            propertyXmlBlock.LinkTo(
                propertyDeserializeBlock,
                new DataflowLinkOptions {PropagateCompletion = true});

            propertyDeserializeBlock.LinkTo(
                propertyCompleteBlock,
                new DataflowLinkOptions {PropagateCompletion = true});
        }

        private BufferBlock<propertiesProperty> GetPropertyBuffer()
        {
            return new BufferBlock<propertiesProperty>();
        }

        private TransformBlock<propertiesProperty, string> GetPropertyXmlBlock()
        {
            return new TransformBlock<propertiesProperty, string>(async propertiesProperty =>
                {
                    Debug.WriteLine($"getting xml {propertiesProperty.prop_id}");
                    var propertyXml = await _apiService.GetXmlAsStringAsync(propertiesProperty.url);
                    return propertyXml;
                },
                new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 1,
                    BoundedCapacity = 2
                });
        }

        private TransformBlock<string, propertyType> GetPropertyDeserializeBlock()
        {
            return new TransformBlock<string, propertyType>(xmlAsString =>
                {
                    Debug.WriteLine($"deserializing");
                    var propertyType = _xmlService.DeserializeProperty(xmlAsString);
                    return propertyType;
                },
                new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 1,
                    BoundedCapacity = 2
                });
        }

        private ActionBlock<propertyType> GetPropertyCompleteBlock()
        {
            return new ActionBlock<propertyType>(propertyType =>
                {
                    Debug.WriteLine($"complete {propertyType.id}");
                    Debug.WriteLine(propertyType.address.display);
                },
                new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 1,
                    BoundedCapacity = 2
                });
        }
    }
}

Are there any potential bottlenecks or areas of the code that could be troublesome?

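Overall, your approach looks fine. The potential bottleneck is that you restrict each block's parallelism with MaxDegreeOfParallelism = 1. From the description of the problem, each item can be processed independently of the others, so you could process several items at once.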
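To illustrate the effect of that setting, here is a minimal sketch rather than a drop-in replacement for your blocks. The class name ParallelismSketch, the method PeakConcurrencyAsync, and the Task.Delay that stands in for the real API call are all made up for the example. It records how many delegate invocations were in flight at the same time for a given MaxDegreeOfParallelism.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ParallelismSketch
{
    // Hypothetical helper: returns the highest number of delegate
    // invocations that were observed running at the same time.
    public static async Task<int> PeakConcurrencyAsync(int degree, int itemCount)
    {
        var gate = new object();
        int running = 0, peak = 0;

        var block = new ActionBlock<int>(
            async _ =>
            {
                lock (gate)
                {
                    running++;
                    if (running > peak) peak = running;
                }

                await Task.Delay(50); // stands in for the real async work

                lock (gate) { running--; }
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = degree });

        // The block is unbounded here, so Post always accepts the item.
        for (int i = 0; i < itemCount; i++)
            block.Post(i);

        block.Complete();
        await block.Completion;
        return peak;
    }
}
```

With degree set to 1 the peak is always 1; with a higher degree it can rise up to that degree but never above it. If the order of results matters downstream, note that a TransformBlock keeps output in input order by default even when it processes items in parallel.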

Is it ok to await async calls inside a TransformBlock or is this a bottleneck?

That is perfectly fine, because TPL Dataflow supports async operations.

Although the code works, I am worried about the buffering and asynchronicity of the Parallel.For, BufferBlock and async in the TransformBlock. I'm not sure it's the best way and I may be mixing up some concepts.

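First, the part of your code that could let you shoot yourself in the foot is calling an async method inside Parallel.For and then calling propertyBufferBlock.Complete();. The problem is that Parallel.For does not support async operations: the way you call it, it invokes propertyBufferBlock.SendAsync and moves on without waiting for the returned task to finish. This means that by the time Parallel.For exits, some of those operations may still be running and their items may not yet have been added to the buffer block. If you then call propertyBufferBlock.Complete();, those pending sends will fail and their items will never be processed. Because nothing observes the returned tasks, the failures go unnoticed.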

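You can use ForEachAsync from this blog post to make sure that all items have been added to the block before you complete it. However, if you still limit processing to one operation at a time, you can simply add the items one by one. I am not sure how propertyBufferBlock.SendAsync is implemented, but it may well restrict adding to one item at a time internally, in which case adding in parallel gains nothing.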
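The "add items one by one" variant could look roughly like the sketch below. It is a simplified stand-in, not your pipeline: the class SequentialFeedSketch, the integer payload, and the Task.Delay that replaces the API call are all hypothetical. The point is only that every SendAsync is awaited before Complete() is called.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class SequentialFeedSketch
{
    // Feeds every item into a bounded pipeline, awaiting each send,
    // and only then completes the head block.
    public static async Task<List<int>> RunAsync(IEnumerable<int> items)
    {
        var results = new List<int>();

        var transform = new TransformBlock<int, int>(
            async x =>
            {
                await Task.Delay(10); // stands in for the real async API call
                return x * 2;
            },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });

        // MaxDegreeOfParallelism defaults to 1, so the list needs no lock.
        var sink = new ActionBlock<int>(
            x => results.Add(x),
            new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });

        transform.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var item in items)
        {
            // Awaiting gives back-pressure: the loop pauses while the block is full.
            bool accepted = await transform.SendAsync(item);
            if (!accepted)
                throw new InvalidOperationException($"Item {item} was declined.");
        }

        // Safe now: every send has finished.
        transform.Complete();
        await sink.Completion;
        return results;
    }
}
```

Since the head block already has a bounded input queue, this sketch leaves out the separate BufferBlock; whether you need one in the real pipeline depends on how much buffering you want in front of the first transform.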

You are indeed doing a few things wrong:

i => propertyBufferBlock.SendAsync(properties.property[i])

You need to await the method, otherwise you create too many tasks at the same time.
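If you do want several senders at once, one way to get awaited sends is Parallel.ForEachAsync, which ships with .NET 6 and later. This is a substitute for the Parallel.For call, and it assumes you can target that runtime. The class ParallelFeedSketch and the integer payload below are hypothetical; the sketch only shows that the loop does not finish until every SendAsync has completed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ParallelFeedSketch
{
    // Sends items from up to three concurrent workers and returns
    // how many items reached the final block.
    public static async Task<int> RunAsync(IReadOnlyList<int> items)
    {
        int processed = 0;

        var buffer = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = 2 });

        // Runs one item at a time by default, so a plain increment is safe.
        var sink = new ActionBlock<int>(
            _ => processed++,
            new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });

        buffer.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        await Parallel.ForEachAsync(
            items,
            new ParallelOptions { MaxDegreeOfParallelism = 3 },
            async (item, cancellationToken) =>
            {
                // Each worker waits here while the buffer is full.
                await buffer.SendAsync(item, cancellationToken);
            });

        // Every send has completed by the time ForEachAsync returns.
        buffer.Complete();
        await sink.Completion;
        return processed;
    }
}
```

With concurrent senders the arrival order in the buffer is not guaranteed, so use this form only when the order of items does not matter.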

And this line:

MaxDegreeOfParallelism = 1

restricts the block to sequential execution, which hurts performance.

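As you said in the comments, you have switched to the synchronous method Post and limited the capacity of the blocks by setting BoundedCapacity. This variant should be used with care, because you need to check its return value, which tells you whether the message was accepted.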
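A small sketch of why the return value matters: the class PostReturnValueSketch is made up for the example, and the buffer deliberately has no consumer linked so that it fills up immediately.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

public static class PostReturnValueSketch
{
    // Posts two items to a block that has room for only one.
    public static (bool First, bool Second) Run()
    {
        var buffer = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = 1 });

        bool first = buffer.Post(1);  // fits into the single free slot
        bool second = buffer.Post(2); // block is full, so the item is declined

        return (first, second);
    }
}
```

Here the first Post returns true and the second returns false, and the declined item is simply dropped unless the caller retries it or falls back to awaiting SendAsync.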

As for your concern about awaiting async methods inside a block: that is absolutely fine, and should be done just as in any other use of async methods.