具有延迟的 TPL 数据流队列

TPL DataFlow Queue with Postponement

我正在使用 ActionBlock 并发地处理一个队列。

问题在于:处理队列中的某一项时,我可能需要先等待它所依赖的另一项处理完成,才能继续处理它。

我认为应该可以借助 TPL DataFlow 库的链接(linking)、延迟(postponement)以及延迟消息(postponed messages)等机制来实现这一点,但不确定该使用哪种结构。

在伪代码中:

/// <summary>
/// A unit of work, identified by <see cref="Name"/>, that should only be
/// processed after the items listed in <see cref="DependsOn"/>.
/// </summary>
public class Item
{
    /// <summary>
    /// Names of the items this one depends on. Initialized to an empty list,
    /// so collection-initializer syntax (<c>DependsOn = { "Pear" }</c>) works.
    /// </summary>
    public List<string> DependsOn = new List<string>();

    /// <summary>Key of this item, referenced by other items' <see cref="DependsOn"/>.</summary>
    public string Name { get; set; }
}

// Declare the variable first and assign it in a separate statement, so that the
// lambda below is able to reference `block` (which the "enqueue" step would need).
// Re-declaring it with `var` on the second statement would not compile.
ActionBlock<Item> block = null;
block = new ActionBlock<Item>(o => {
    if (!HasActionBlockProcessedAllDependencies(o.DependsOn)) 
    {
       // enqueue a callback when ALL dependencies have been completed
    } 
    else 
    {
        DoWork(o);
    }
},
new ExecutionDataflowBlockOptions { 
    MaxDegreeOfParallelism = resourceProcessorOptions.MaximumProviderConcurrency
});

// Sample input: "Apple" depends on "Pear", so "Pear" must be processed first.
var items = new[] 
{
    new Item { Name = "Apple", DependsOn = { "Pear" } },
    new Item { Name = "Pear" }
};

我不确定这是否对您有帮助,但这里有一个自定义的 DependencyTransformBlock 类,它了解所接收项目之间的依赖关系,并且只有在某个项目的所有依赖项都已被接收并成功处理之后,才会处理该项目。除 EnsureOrdered 选项外,这个自定义块支持普通 TransformBlock 的所有内置功能。

该类的构造函数接受两个 lambda:一个 Func<TInput, TKey>,用于获取每个项目的键;另一个 Func<TInput, IReadOnlyCollection<TKey>>,用于获取该项目的依赖项。键必须唯一;如果发现重复的键,该块将以失败(faulted)状态完成。

如果项目之间存在循环依赖,受影响的项目将始终保持未处理状态。块完成后,可通过 TInput[] Unprocessed 属性获取这些未处理的项目。此外,如果某个项目的某个依赖项从未被发送到块中,该项目同样会保持未处理状态。

/// <summary>
/// A dataflow block that transforms each <typeparamref name="TInput"/> into a
/// <typeparamref name="TOutput"/> like a <see cref="TransformBlock{TInput, TOutput}"/>,
/// but transforms an item only after every item it depends on has been received
/// and transformed successfully.
/// </summary>
/// <typeparam name="TInput">Type of the received items.</typeparam>
/// <typeparam name="TKey">Type of the key that identifies an item and its dependencies.</typeparam>
/// <typeparam name="TOutput">Type of the produced items.</typeparam>
/// <remarks>
/// Items are handed to the inner transform in the order in which their
/// dependencies become satisfied, not in the order they were received.
/// Items that take part in a dependency cycle, or that depend on a key that is
/// never received, are never transformed; they can be retrieved through
/// <see cref="Unprocessed"/> after the block has completed.
/// Receiving two items with the same key faults the block.
/// </remarks>
public class DependencyTransformBlock<TInput, TKey, TOutput> :
    ITargetBlock<TInput>, ISourceBlock<TOutput>
{
    // Receives the raw input and registers every item in the dependency graph.
    private readonly ITargetBlock<TInput> _inputBlock;
    // Runs the user transform on items whose dependencies are all completed.
    private readonly IPropagatorBlock<Item, TOutput> _transformBlock;

    // Guards _items, _pendingCount and all mutable fields of Item.
    private readonly object _locker = new object();
    // Graph nodes by key. Also holds placeholder nodes for keys that were
    // referenced as dependencies but have not been received (yet).
    private readonly Dictionary<TKey, Item> _items;

    // Number of items handed to _transformBlock that have not finished yet.
    // The initial 1 represents the completion of the _inputBlock; when the
    // counter reaches zero no further item can become ready for processing.
    private int _pendingCount = 1;

    // A node of the dependency graph.
    private class Item
    {
        public TKey Key;
        public TInput Input;
        // False for a placeholder created only because another item depends on Key.
        public bool HasInput;
        // Set after the transform of this item has succeeded.
        public bool IsCompleted;
        // Not-yet-completed items this item waits for.
        public HashSet<Item> Dependencies;
        // Items waiting for this one.
        public HashSet<Item> Dependents;

        public Item(TKey key) => Key = key;
    }

    /// <summary>
    /// Creates the block with an asynchronous transform.
    /// </summary>
    /// <param name="transform">Transforms an item whose dependencies have all completed.</param>
    /// <param name="keySelector">Returns the unique key of an item.</param>
    /// <param name="dependenciesSelector">
    /// Returns the keys an item depends on; may return null or an empty collection.
    /// </param>
    /// <param name="dataflowBlockOptions">
    /// Options of the inner transform block. Its cancellation token is shared by
    /// all inner blocks. When null, default options are used.
    /// </param>
    /// <param name="keyComparer">Key comparer; defaults to <see cref="EqualityComparer{T}.Default"/>.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="transform"/>, <paramref name="keySelector"/> or
    /// <paramref name="dependenciesSelector"/> is null.
    /// </exception>
    public DependencyTransformBlock(
        Func<TInput, Task<TOutput>> transform,
        Func<TInput, TKey> keySelector,
        Func<TInput, IReadOnlyCollection<TKey>> dependenciesSelector,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null,
        IEqualityComparer<TKey> keyComparer = null)
    {
        if (transform == null)
            throw new ArgumentNullException(nameof(transform));
        if (keySelector == null)
            throw new ArgumentNullException(nameof(keySelector));
        if (dependenciesSelector == null)
            throw new ArgumentNullException(nameof(dependenciesSelector));

        dataflowBlockOptions =
            dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();
        keyComparer = keyComparer ?? EqualityComparer<TKey>.Default;

        _items = new Dictionary<TKey, Item>(keyComparer);

        // When the caller asked for a bounded block, keep the input side at
        // capacity 1 so that back-pressure from the bounded _transformBlock
        // (through SendAsync) reaches upstream producers. When unbounded, accept
        // every message like a plain TransformBlock does; otherwise Post would
        // return false whenever the input block happens to be busy.
        int inputCapacity =
            dataflowBlockOptions.BoundedCapacity == DataflowBlockOptions.Unbounded
                ? DataflowBlockOptions.Unbounded
                : 1;

        _inputBlock = new ActionBlock<TInput>(async input =>
        {
            var key = keySelector(input);
            var dependencyKeys = dependenciesSelector(input);
            bool isReadyForProcessing = true;
            Item item;
            lock (_locker)
            {
                // Reuse the placeholder if some earlier item already depends on this key.
                if (!_items.TryGetValue(key, out item))
                {
                    item = new Item(key);
                    _items.Add(key, item);
                }
                // Faults the input block, which in turn faults the whole pipeline.
                if (item.HasInput)
                    throw new InvalidOperationException($"Duplicate key ({key}).");
                item.Input = input;
                item.HasInput = true;

                if (dependencyKeys != null && dependencyKeys.Count > 0)
                {
                    item.Dependencies = new HashSet<Item>();
                    foreach (var dependencyKey in dependencyKeys)
                    {
                        if (!_items.TryGetValue(dependencyKey, out var dependency))
                        {
                            dependency = new Item(dependencyKey);
                            _items.Add(dependencyKey, dependency);
                        }
                        // Already-completed dependencies impose no wait.
                        if (!dependency.IsCompleted)
                        {
                            item.Dependencies.Add(dependency);
                            if (dependency.Dependents == null)
                                dependency.Dependents = new HashSet<Item>();
                            dependency.Dependents.Add(item);
                        }
                    }
                    isReadyForProcessing = item.Dependencies.Count == 0;
                }
                // Counted inside the lock so completion cannot be signaled early.
                if (isReadyForProcessing) _pendingCount++;
            }
            if (isReadyForProcessing)
            {
                await _transformBlock.SendAsync(item).ConfigureAwait(false);
            }
        }, new ExecutionDataflowBlockOptions()
        {
            CancellationToken = dataflowBlockOptions.CancellationToken,
            BoundedCapacity = inputCapacity
        });

        // Carries items whose last dependency was just completed by the transform.
        var middleBuffer = new BufferBlock<Item>(new DataflowBlockOptions()
        {
            CancellationToken = dataflowBlockOptions.CancellationToken,
            BoundedCapacity = DataflowBlockOptions.Unbounded
        });

        _transformBlock = new TransformBlock<Item, TOutput>(async item =>
        {
            try
            {
                TInput input;
                lock (_locker)
                {
                    Debug.Assert(item.HasInput && !item.IsCompleted);
                    input = item.Input;
                }
                var result = await transform(input).ConfigureAwait(false);
                lock (_locker)
                {
                    item.IsCompleted = true;
                    if (item.Dependents != null)
                    {
                        foreach (var dependent in item.Dependents)
                        {
                            Debug.Assert(dependent.Dependencies != null);
                            var removed = dependent.Dependencies.Remove(item);
                            Debug.Assert(removed);
                            // Release dependents that were received and now wait for nothing.
                            if (dependent.HasInput
                                && dependent.Dependencies.Count == 0)
                            {
                                middleBuffer.Post(dependent);
                                _pendingCount++;
                            }
                        }
                    }
                    item.Input = default; // Cleanup
                    item.Dependencies = null;
                    item.Dependents = null;
                }
                return result;
            }
            finally
            {
                lock (_locker)
                {
                    _pendingCount--;
                    if (_pendingCount == 0) middleBuffer.Complete();
                }
            }
        }, dataflowBlockOptions);

        middleBuffer.LinkTo(_transformBlock);

        // The input block's completion only releases its share of _pendingCount;
        // the middle buffer completes once no transform is pending either.
        PropagateCompletion(_inputBlock, middleBuffer,
            condition: () => { lock (_locker) return --_pendingCount == 0; });
        PropagateCompletion(middleBuffer, _transformBlock);
        PropagateFailure(_transformBlock, middleBuffer);
        PropagateFailure(_transformBlock, _inputBlock);
    }

    /// <summary>
    /// Creates the block with a synchronous transform.
    /// </summary>
    /// <param name="transform">Transforms an item whose dependencies have all completed.</param>
    /// <param name="keySelector">Returns the unique key of an item.</param>
    /// <param name="dependenciesSelector">
    /// Returns the keys an item depends on; may return null or an empty collection.
    /// </param>
    /// <param name="dataflowBlockOptions">Options of the inner transform block.</param>
    /// <param name="keyComparer">Key comparer; defaults to <see cref="EqualityComparer{T}.Default"/>.</param>
    /// <exception cref="ArgumentNullException">A required delegate is null.</exception>
    public DependencyTransformBlock(
        Func<TInput, TOutput> transform,
        Func<TInput, TKey> keySelector,
        Func<TInput, IReadOnlyCollection<TKey>> dependenciesSelector,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null,
        IEqualityComparer<TKey> keyComparer = null) : this(
            // Pass null through so the primary constructor rejects it before
            // building any inner block.
            transform == null
                ? (Func<TInput, Task<TOutput>>)null
                : input => Task.FromResult(transform(input)),
            keySelector, dependenciesSelector, dataflowBlockOptions, keyComparer)
    {
    }

    /// <summary>
    /// Items that were received but not (yet) transformed. It is meaningful after
    /// completion: it then returns the items stuck in dependency cycles or
    /// waiting for keys that were never received.
    /// </summary>
    public TInput[] Unprocessed
    {
        get
        {
            lock (_locker) return _items.Values
                .Where(item => item.HasInput && !item.IsCompleted)
                .Select(item => item.Input)
                .ToArray();
        }
    }

    public Task Completion => _transformBlock.Completion;
    public void Complete() => _inputBlock.Complete();
    void IDataflowBlock.Fault(Exception ex) => _inputBlock.Fault(ex);

    DataflowMessageStatus ITargetBlock<TInput>.OfferMessage(
        DataflowMessageHeader header, TInput value, ISourceBlock<TInput> source,
        bool consumeToAccept)
    {
        return _inputBlock.OfferMessage(header, value, source, consumeToAccept);
    }

    TOutput ISourceBlock<TOutput>.ConsumeMessage(DataflowMessageHeader header,
        ITargetBlock<TOutput> target, out bool messageConsumed)
    {
        return _transformBlock.ConsumeMessage(header, target, out messageConsumed);
    }

    bool ISourceBlock<TOutput>.ReserveMessage(DataflowMessageHeader header,
        ITargetBlock<TOutput> target)
    {
        return _transformBlock.ReserveMessage(header, target);
    }

    void ISourceBlock<TOutput>.ReleaseReservation(DataflowMessageHeader header,
        ITargetBlock<TOutput> target)
    {
        _transformBlock.ReleaseReservation(header, target);
    }

    public IDisposable LinkTo(ITargetBlock<TOutput> target,
        DataflowLinkOptions linkOptions)
    {
        return _transformBlock.LinkTo(target, linkOptions);
    }

    // Waits for `source` to finish. If it faulted, faults `target` with the first
    // inner exception; otherwise (completed or canceled) completes `target`
    // when `condition` is absent or returns true. The await swallows exceptions
    // on purpose: the outcome is read from source.Completion afterwards.
    private async void PropagateCompletion(IDataflowBlock source,
        IDataflowBlock target, Func<bool> condition = null)
    {
        try { await source.Completion.ConfigureAwait(false); } catch { }
        if (source.Completion.IsFaulted)
            target.Fault(source.Completion.Exception.InnerException);
        else
            if (condition == null || condition()) target.Complete();
    }

    // Waits for `source` to finish and faults `target` only if `source` faulted.
    private async void PropagateFailure(IDataflowBlock source,
        IDataflowBlock target)
    {
        try { await source.Completion.ConfigureAwait(false); } catch { }
        if (source.Completion.IsFaulted)
            target.Fault(source.Completion.Exception.InnerException);
    }
}

用法示例:

// Items are keyed by Name, depend on the names listed in DependsOn, and keys
// are compared case-insensitively. Naming every argument after the first
// positional one keeps the call valid on compilers older than C# 7.2.
var block = new DependencyTransformBlock<Item, string, Item>(item =>
{
    DoWork(item);
    return item;
},
keySelector: item => item.Name,
dependenciesSelector: item => item.DependsOn,
dataflowBlockOptions: new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
},
keyComparer: StringComparer.OrdinalIgnoreCase);

// Discard the output. Link before feeding the block: outputs that nobody
// consumes stay in the output buffer and keep block.Completion from finishing.
block.LinkTo(DataflowBlock.NullTarget<Item>());

//...

在此示例中,块被链接到 NullTarget 以丢弃其输出,因此它实际上相当于一个 ActionBlock。