防止 BroadcastBlock 在 LinkTo 上发送缓冲消息

Prevent BroadcastBlock from sending buffered message on LinkTo

给定 BroadcastBlock 缓冲区中有一条消息,是否可以阻止该消息被发送到新的 linked 目标?例如:

static void Main(string[] args)
{
    // Source block; its cloning function hands back the very same string.
    var broadcaster = new BroadcastBlock<string>(item => item);
    // Sink block that writes each string it receives to the console.
    var printer = new ActionBlock<string>(text => Console.WriteLine(text));

    // Posted while the broadcaster has no linked targets yet.
    broadcaster.Post("Hello World!");

    // The target is linked only after the message above was posted.
    broadcaster.LinkTo(printer);

    // etc.
}

此代码将打印“Hello World!”。也就是说,尽管消息是在 link 建立之前发布的,BroadcastBlock 仍会在调用 .LinkTo 时把缓冲的消息发送给 ActionBlock。

是否有内置的方法来防止这种行为?我只想将消息发送到当前的 link,而不是未来的。

我正在使用 System.Threading.Tasks.Dataflow 4.11.1

使用内置 BroadcastBlock class 无法实现此行为。它的行为是不可配置的。如果你迫切需要这种行为,你可以尝试下面的实现。它使用一个内部 BroadcastBlock<(T, long)> 和一个随着每条新消息递增的索引,以便在链接期间可以过滤掉当前活动的消息。

BroadcastBlockNewOnly class 中有相当多的间接层,因为需要从 T 转换到 (T, long) 再转换回 T。这使得该 class 难以维护,而且效率不高。每收到一条消息都会分配一个新对象,从而给垃圾收集器带来更多工作,因此请谨慎使用此 class。

/// <summary>
/// A broadcast-style dataflow block that does not hand the message that was
/// already buffered at link time to a newly linked target.
/// </summary>
/// <remarks>
/// Internally wraps a <see cref="BroadcastBlock{T}"/> of <c>(T, long)</c> tuples.
/// Every incoming message is tagged with an increasing index; each link remembers
/// the index that was current when the link was created and declines any message
/// whose index is not greater than that value.
/// </remarks>
/// <typeparam name="T">The type of the messages flowing through the block.</typeparam>
public class BroadcastBlockNewOnly<T> : IPropagatorBlock<T, T>
{
    // Inner block that does the actual buffering and broadcasting of tagged messages.
    private readonly IPropagatorBlock<(T, long), (T, long)> _broadcastBlock;

    // Last index handed out; only accessed through Interlocked operations.
    private long _index;

    /// <summary>Initializes the block.</summary>
    /// <param name="cloningFunction">Function applied to the value of each message
    /// when the inner block clones it.</param>
    /// <param name="dataflowBlockOptions">Options for the inner block; a default
    /// <see cref="DataflowBlockOptions"/> instance is used when <c>null</c>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="cloningFunction"/>
    /// is <c>null</c>.</exception>
    public BroadcastBlockNewOnly(Func<T, T> cloningFunction,
        DataflowBlockOptions dataflowBlockOptions = null)
    {
        if (cloningFunction == null)
        {
            throw new ArgumentNullException(nameof(cloningFunction));
        }

        // Clone only the payload; the index travels with the message unchanged.
        _broadcastBlock = new BroadcastBlock<(T, long)>(
            entry => (cloningFunction(entry.Item1), entry.Item2),
            dataflowBlockOptions ?? new DataflowBlockOptions());
    }

    /// <summary>Completion task of the inner broadcast block.</summary>
    public Task Completion => _broadcastBlock.Completion;

    /// <summary>Forwards the completion request to the inner broadcast block.</summary>
    public void Complete() => _broadcastBlock.Complete();

    void IDataflowBlock.Fault(Exception ex) => _broadcastBlock.Fault(ex);

    /// <summary>
    /// Links <paramref name="target"/> so that it is only offered messages whose
    /// index is greater than the index that is current at the time of this call.
    /// </summary>
    /// <param name="target">The target to link.</param>
    /// <param name="linkOptions">Link options, passed through to the inner block.</param>
    /// <returns>The object returned by the inner block's <c>LinkTo</c>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="target"/> is
    /// <c>null</c>.</exception>
    public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
    {
        if (target == null)
        {
            throw new ArgumentNullException(nameof(target));
        }

        // Atomic snapshot of the newest index handed out so far; messages tagged
        // with this index or a lower one are declined by the proxy.
        long indexLimit = Interlocked.Read(ref _index);
        var linkedTargetProxy = new LinkedTargetProxy(target, this, indexLimit);
        return _broadcastBlock.LinkTo(linkedTargetProxy, linkOptions);
    }

    // Atomically produces the next message index.
    private long GetNewIndex() => Interlocked.Increment(ref _index);

    DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader header,
        T value, ISourceBlock<T> source, bool consumeToAccept)
    {
        // The inner block talks to a source of (T, long), so a real source has to
        // be wrapped; the wrapper assigns a fresh index when the message is consumed.
        var sourceProxy = source != null ?
            new SourceProxy(source, this, GetNewIndex) : null;
        return _broadcastBlock.OfferMessage(header, (value, GetNewIndex()),
            sourceProxy, consumeToAccept);
    }

    T ISourceBlock<T>.ConsumeMessage(DataflowMessageHeader header,
        ITargetBlock<T> target, out bool messageConsumed)
    {
        // NOTE(review): a fresh TargetProxy is created on every call, so the inner
        // block sees a different instance than the LinkedTargetProxy registered by
        // LinkTo — confirm this does not affect per-link bookkeeping.
        var targetProxy = target != null ? new TargetProxy(target, this) : null;
        var entry = _broadcastBlock.ConsumeMessage(header, targetProxy,
            out messageConsumed);
        // Only the payload is exposed to the outside; the index is internal.
        return entry.Item1;
    }

    bool ISourceBlock<T>.ReserveMessage(DataflowMessageHeader header,
        ITargetBlock<T> target)
    {
        var targetProxy = target != null ? new TargetProxy(target, this) : null;
        return _broadcastBlock.ReserveMessage(header, targetProxy);
    }

    void ISourceBlock<T>.ReleaseReservation(DataflowMessageHeader header,
        ITargetBlock<T> target)
    {
        var targetProxy = target != null ? new TargetProxy(target, this) : null;
        _broadcastBlock.ReleaseReservation(header, targetProxy);
    }

    /// <summary>
    /// Target registered with the inner block for each link. Declines messages
    /// whose index is not greater than the limit captured at link time and
    /// forwards all others, untagged, to the real target.
    /// </summary>
    private sealed class LinkedTargetProxy : ITargetBlock<(T, long)>
    {
        private readonly ITargetBlock<T> _realTarget;
        private readonly ISourceBlock<T> _realSource;
        private readonly long _indexLimit;

        public LinkedTargetProxy(ITargetBlock<T> realTarget, ISourceBlock<T> realSource,
            long indexLimit)
        {
            _realTarget = realTarget;
            _realSource = realSource;
            _indexLimit = indexLimit;
        }

        DataflowMessageStatus ITargetBlock<(T, long)>.OfferMessage(
            DataflowMessageHeader header, (T, long) messageValue,
            ISourceBlock<(T, long)> source, bool consumeToAccept)
        {
            var (value, index) = messageValue;
            if (index <= _indexLimit)
            {
                // Message was indexed no later than the link snapshot: skip it.
                return DataflowMessageStatus.Declined;
            }

            // The real target is given the outer block as its source, not the
            // inner (T, long) block.
            return _realTarget.OfferMessage(header, value, _realSource, consumeToAccept);
        }

        Task IDataflowBlock.Completion => throw new NotSupportedException();
        void IDataflowBlock.Complete() => _realTarget.Complete();
        void IDataflowBlock.Fault(Exception ex) => _realTarget.Fault(ex);
    }

    /// <summary>
    /// Presents a real <see cref="ISourceBlock{T}"/> to the inner block as a source
    /// of <c>(T, long)</c>, tagging each consumed value with a new index.
    /// </summary>
    private sealed class SourceProxy : ISourceBlock<(T, long)>
    {
        private readonly ISourceBlock<T> _realSource;
        private readonly ITargetBlock<T> _realTarget;
        private readonly Func<long> _getNewIndex;

        public SourceProxy(ISourceBlock<T> realSource, ITargetBlock<T> realTarget,
            Func<long> getNewIndex)
        {
            _realSource = realSource;
            _realTarget = realTarget;
            _getNewIndex = getNewIndex;
        }

        (T, long) ISourceBlock<(T, long)>.ConsumeMessage(DataflowMessageHeader header,
            ITargetBlock<(T, long)> target, out bool messageConsumed)
        {
            // Consume from the real source first, then take the index.
            var value = _realSource.ConsumeMessage(header, _realTarget,
                out messageConsumed);
            return (value, _getNewIndex());
        }

        bool ISourceBlock<(T, long)>.ReserveMessage(DataflowMessageHeader header,
            ITargetBlock<(T, long)> target)
        {
            return _realSource.ReserveMessage(header, _realTarget);
        }

        void ISourceBlock<(T, long)>.ReleaseReservation(DataflowMessageHeader header,
            ITargetBlock<(T, long)> target)
        {
            _realSource.ReleaseReservation(header, _realTarget);
        }

        Task IDataflowBlock.Completion => throw new NotSupportedException();
        void IDataflowBlock.Complete() => throw new NotSupportedException();
        void IDataflowBlock.Fault(Exception ex) => throw new NotSupportedException();
        IDisposable ISourceBlock<(T, long)>.LinkTo(ITargetBlock<(T, long)> target,
            DataflowLinkOptions linkOptions) => throw new NotSupportedException();
    }

    /// <summary>
    /// Presents a real <see cref="ITargetBlock{T}"/> to the inner block as a target
    /// of <c>(T, long)</c>; used for the consume/reserve/release calls. Unlike
    /// <see cref="LinkedTargetProxy"/> it performs no index filtering.
    /// </summary>
    private sealed class TargetProxy : ITargetBlock<(T, long)>
    {
        private readonly ITargetBlock<T> _realTarget;
        private readonly ISourceBlock<T> _realSource;

        public TargetProxy(ITargetBlock<T> realTarget, ISourceBlock<T> realSource)
        {
            _realTarget = realTarget;
            _realSource = realSource;
        }

        DataflowMessageStatus ITargetBlock<(T, long)>.OfferMessage(
            DataflowMessageHeader header, (T, long) messageValue,
            ISourceBlock<(T, long)> source, bool consumeToAccept)
        {
            // Strip the index and forward the payload to the real target.
            return _realTarget.OfferMessage(header, messageValue.Item1, _realSource,
                consumeToAccept);
        }

        Task IDataflowBlock.Completion => throw new NotSupportedException();
        void IDataflowBlock.Complete() => throw new NotSupportedException();
        void IDataflowBlock.Fault(Exception ex) => throw new NotSupportedException();
    }

}