防止 BroadcastBlock 在 LinkTo 上发送缓冲消息
Prevent BroadcastBlock from sending buffered message on LinkTo
假设 BroadcastBlock 的缓冲区中已经有一条消息,是否可以阻止该消息被发送到新链接的目标?例如:
// Posts a message to a BroadcastBlock before any target is linked, then
// links an ActionBlock that writes every message it receives to the console.
static void Main(string[] args)
{
    BroadcastBlock<string> broadcaster = new BroadcastBlock<string>(item => item);
    ActionBlock<string> printer = new ActionBlock<string>(
        text => Console.WriteLine(text));

    // No linked targets here.
    broadcaster.Post("Hello World!");

    // Link a target.
    broadcaster.LinkTo(printer);
    // etc.
}
此代码将打印“Hello World!”。也就是说,尽管该消息是在链接建立之前发布的,BroadcastBlock 仍会在调用 .LinkTo 时把缓冲的消息发送给 ActionBlock。
是否有内置的方法可以防止这种行为?我只想把消息发送给当前已链接的目标,而不是将来才链接的目标。
使用内置的 BroadcastBlock 类无法实现此行为,因为它的行为是不可配置的。如果你确实需要这种行为,可以尝试下面的实现。它在内部使用一个 BroadcastBlock<(T, long)> 和一个随每条新消息递增的索引,以便在链接时过滤掉当前缓冲的消息。
BroadcastBlockNewOnly 类中有相当多的间接层,因为需要先从 T 转换为 (T, long),再转换回 T。这使得该类难以维护,而且效率不高:每收到一条消息都会分配新的对象,从而给垃圾回收器带来更多工作,因此请谨慎使用此类。
/// <summary>
/// A broadcasting dataflow block that forwards a message only to targets that
/// were linked before the message was received. A message that is already
/// buffered when <c>LinkTo</c> is called is declined on behalf of the new target.
/// </summary>
/// <remarks>
/// Wraps an inner <c>BroadcastBlock&lt;(T, long)&gt;</c>. The <c>long</c> component is a
/// sequence index taken from a counter that is incremented for every incoming
/// message. Each link remembers the counter value observed when it was created
/// and declines every message whose index is not greater than that value.
/// </remarks>
/// <typeparam name="T">The type of the messages handled by the block.</typeparam>
public class BroadcastBlockNewOnly<T> : IPropagatorBlock<T, T>
{
    private readonly IPropagatorBlock<(T, long), (T, long)> _broadcastBlock;

    // Sequence counter; only accessed through Interlocked operations.
    private long _index;

    /// <summary>Creates the block.</summary>
    /// <param name="cloningFunction">Function applied to the value of each message by the inner broadcast block.</param>
    /// <param name="dataflowBlockOptions">Options for the inner block; a default instance is used when <c>null</c>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="cloningFunction"/> is <c>null</c>.</exception>
    public BroadcastBlockNewOnly(Func<T, T> cloningFunction,
        DataflowBlockOptions dataflowBlockOptions = null)
    {
        if (cloningFunction == null)
            throw new ArgumentNullException(nameof(cloningFunction));

        // Clone only the payload; the sequence index travels along unchanged.
        _broadcastBlock = new BroadcastBlock<(T, long)>(entry =>
        {
            var (value, index) = entry;
            return (cloningFunction(value), index);
        }, dataflowBlockOptions ?? new DataflowBlockOptions());
    }

    /// <summary>Completion task of the inner broadcast block.</summary>
    public Task Completion => _broadcastBlock.Completion;

    /// <summary>Signals the inner broadcast block to complete.</summary>
    public void Complete() => _broadcastBlock.Complete();

    void IDataflowBlock.Fault(Exception ex) => _broadcastBlock.Fault(ex);

    /// <summary>
    /// Links <paramref name="target"/> so that it is offered only messages whose
    /// index is greater than the counter value read at the time of this call.
    /// </summary>
    /// <param name="target">The target to link.</param>
    /// <param name="linkOptions">Link options, passed through to the inner block.</param>
    /// <returns>The object returned by the inner block's <c>LinkTo</c>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="target"/> is <c>null</c>.</exception>
    public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
    {
        if (target == null) throw new ArgumentNullException(nameof(target));

        // Atomic 64-bit read: every index handed out so far is <= this value.
        long currentIndex = Interlocked.Read(ref _index);
        var linkedTargetProxy = new LinkedTargetProxy(target, this, currentIndex);
        return _broadcastBlock.LinkTo(linkedTargetProxy, linkOptions);
    }

    // Hands out the next sequence index.
    private long GetNewIndex() => Interlocked.Increment(ref _index);

    DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader header,
        T value, ISourceBlock<T> source, bool consumeToAccept)
    {
        // The inner block speaks (T, long); wrap the caller's source so the inner
        // block can consume/reserve through it.
        var sourceProxy = source != null ?
            new SourceProxy(source, this, GetNewIndex) : null;
        return _broadcastBlock.OfferMessage(header, (value, GetNewIndex()),
            sourceProxy, consumeToAccept);
    }

    T ISourceBlock<T>.ConsumeMessage(DataflowMessageHeader header,
        ITargetBlock<T> target, out bool messageConsumed)
    {
        var targetProxy = target != null ? new TargetProxy(target, this) : null;

        // The index is internal bookkeeping; only the payload is returned.
        var (value, _) = _broadcastBlock.ConsumeMessage(header, targetProxy,
            out messageConsumed);
        return value;
    }

    bool ISourceBlock<T>.ReserveMessage(DataflowMessageHeader header,
        ITargetBlock<T> target)
    {
        var targetProxy = target != null ? new TargetProxy(target, this) : null;
        return _broadcastBlock.ReserveMessage(header, targetProxy);
    }

    void ISourceBlock<T>.ReleaseReservation(DataflowMessageHeader header,
        ITargetBlock<T> target)
    {
        var targetProxy = target != null ? new TargetProxy(target, this) : null;
        _broadcastBlock.ReleaseReservation(header, targetProxy);
    }

    /// <summary>
    /// Target registered with the inner block on behalf of a linked real target.
    /// Declines messages whose index is not greater than the limit captured at
    /// link time and forwards the rest, unwrapped, to the real target.
    /// </summary>
    private sealed class LinkedTargetProxy : ITargetBlock<(T, long)>
    {
        private readonly ITargetBlock<T> _realTarget;
        private readonly ISourceBlock<T> _realSource;
        private readonly long _indexLimit;

        public LinkedTargetProxy(ITargetBlock<T> realTarget, ISourceBlock<T> realSource,
            long indexLimit)
        {
            _realTarget = realTarget;
            _realSource = realSource;
            _indexLimit = indexLimit;
        }

        DataflowMessageStatus ITargetBlock<(T, long)>.OfferMessage(
            DataflowMessageHeader header, (T, long) messageValue,
            ISourceBlock<(T, long)> source, bool consumeToAccept)
        {
            var (value, index) = messageValue;

            // Messages stamped at or before link time are not for this target.
            if (index <= _indexLimit) return DataflowMessageStatus.Declined;

            // Present the outer block as the source so the real target talks T.
            return _realTarget.OfferMessage(header, value, _realSource, consumeToAccept);
        }

        // Forwarded like Complete/Fault so that reading it does not throw.
        Task IDataflowBlock.Completion => _realTarget.Completion;
        void IDataflowBlock.Complete() => _realTarget.Complete();
        void IDataflowBlock.Fault(Exception ex) => _realTarget.Fault(ex);
    }

    /// <summary>
    /// Adapts an upstream <c>ISourceBlock&lt;T&gt;</c> to the (T, long) shape the
    /// inner block expects. Only consume/reserve/release are supported.
    /// </summary>
    private sealed class SourceProxy : ISourceBlock<(T, long)>
    {
        private readonly ISourceBlock<T> _realSource;
        private readonly ITargetBlock<T> _realTarget;
        private readonly Func<long> _getNewIndex;

        public SourceProxy(ISourceBlock<T> realSource, ITargetBlock<T> realTarget,
            Func<long> getNewIndex)
        {
            _realSource = realSource;
            _realTarget = realTarget;
            _getNewIndex = getNewIndex;
        }

        (T, long) ISourceBlock<(T, long)>.ConsumeMessage(DataflowMessageHeader header,
            ITargetBlock<(T, long)> target, out bool messageConsumed)
        {
            var value = _realSource.ConsumeMessage(header, _realTarget,
                out messageConsumed);

            // The consumed value is stamped with a fresh index.
            var newIndex = _getNewIndex();
            return (value, newIndex);
        }

        bool ISourceBlock<(T, long)>.ReserveMessage(DataflowMessageHeader header,
            ITargetBlock<(T, long)> target)
        {
            return _realSource.ReserveMessage(header, _realTarget);
        }

        void ISourceBlock<(T, long)>.ReleaseReservation(DataflowMessageHeader header,
            ITargetBlock<(T, long)> target)
        {
            _realSource.ReleaseReservation(header, _realTarget);
        }

        Task IDataflowBlock.Completion => throw new NotSupportedException();
        void IDataflowBlock.Complete() => throw new NotSupportedException();
        void IDataflowBlock.Fault(Exception ex) => throw new NotSupportedException();
        IDisposable ISourceBlock<(T, long)>.LinkTo(ITargetBlock<(T, long)> target,
            DataflowLinkOptions linkOptions) => throw new NotSupportedException();
    }

    /// <summary>
    /// Adapts a downstream <c>ITargetBlock&lt;T&gt;</c> to the (T, long) shape for
    /// consume/reserve/release calls. Offers are forwarded without filtering.
    /// </summary>
    private sealed class TargetProxy : ITargetBlock<(T, long)>
    {
        private readonly ITargetBlock<T> _realTarget;
        private readonly ISourceBlock<T> _realSource;

        public TargetProxy(ITargetBlock<T> realTarget, ISourceBlock<T> realSource)
        {
            _realTarget = realTarget;
            _realSource = realSource;
        }

        DataflowMessageStatus ITargetBlock<(T, long)>.OfferMessage(
            DataflowMessageHeader header, (T, long) messageValue,
            ISourceBlock<(T, long)> source, bool consumeToAccept)
        {
            // Drop the index; the real target only understands T.
            var (value, _) = messageValue;
            return _realTarget.OfferMessage(header, value, _realSource, consumeToAccept);
        }

        Task IDataflowBlock.Completion => throw new NotSupportedException();
        void IDataflowBlock.Complete() => throw new NotSupportedException();
        void IDataflowBlock.Fault(Exception ex) => throw new NotSupportedException();
    }
}
假设 BroadcastBlock 的缓冲区中已经有一条消息,是否可以阻止该消息被发送到新链接的目标?例如:
// Posts a message to a BroadcastBlock before any target is linked, then
// links an ActionBlock that writes every message it receives to the console.
static void Main(string[] args)
{
    BroadcastBlock<string> broadcaster = new BroadcastBlock<string>(item => item);
    ActionBlock<string> printer = new ActionBlock<string>(
        text => Console.WriteLine(text));

    // No linked targets here.
    broadcaster.Post("Hello World!");

    // Link a target.
    broadcaster.LinkTo(printer);
    // etc.
}
此代码将打印“Hello World!”。也就是说,尽管该消息是在链接建立之前发布的,BroadcastBlock 仍会在调用 .LinkTo 时把缓冲的消息发送给 ActionBlock。
是否有内置的方法可以防止这种行为?我只想把消息发送给当前已链接的目标,而不是将来才链接的目标。
使用内置的 BroadcastBlock 类无法实现此行为,因为它的行为是不可配置的。如果你确实需要这种行为,可以尝试下面的实现。它在内部使用一个 BroadcastBlock<(T, long)> 和一个随每条新消息递增的索引,以便在链接时过滤掉当前缓冲的消息。
BroadcastBlockNewOnly 类中有相当多的间接层,因为需要先从 T 转换为 (T, long),再转换回 T。这使得该类难以维护,而且效率不高:每收到一条消息都会分配新的对象,从而给垃圾回收器带来更多工作,因此请谨慎使用此类。
/// <summary>
/// A broadcasting dataflow block that forwards a message only to targets that
/// were linked before the message was received. A message that is already
/// buffered when <c>LinkTo</c> is called is declined on behalf of the new target.
/// </summary>
/// <remarks>
/// Wraps an inner <c>BroadcastBlock&lt;(T, long)&gt;</c>. The <c>long</c> component is a
/// sequence index taken from a counter that is incremented for every incoming
/// message. Each link remembers the counter value observed when it was created
/// and declines every message whose index is not greater than that value.
/// </remarks>
/// <typeparam name="T">The type of the messages handled by the block.</typeparam>
public class BroadcastBlockNewOnly<T> : IPropagatorBlock<T, T>
{
    private readonly IPropagatorBlock<(T, long), (T, long)> _broadcastBlock;

    // Sequence counter; only accessed through Interlocked operations.
    private long _index;

    /// <summary>Creates the block.</summary>
    /// <param name="cloningFunction">Function applied to the value of each message by the inner broadcast block.</param>
    /// <param name="dataflowBlockOptions">Options for the inner block; a default instance is used when <c>null</c>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="cloningFunction"/> is <c>null</c>.</exception>
    public BroadcastBlockNewOnly(Func<T, T> cloningFunction,
        DataflowBlockOptions dataflowBlockOptions = null)
    {
        if (cloningFunction == null)
            throw new ArgumentNullException(nameof(cloningFunction));

        // Clone only the payload; the sequence index travels along unchanged.
        _broadcastBlock = new BroadcastBlock<(T, long)>(entry =>
        {
            var (value, index) = entry;
            return (cloningFunction(value), index);
        }, dataflowBlockOptions ?? new DataflowBlockOptions());
    }

    /// <summary>Completion task of the inner broadcast block.</summary>
    public Task Completion => _broadcastBlock.Completion;

    /// <summary>Signals the inner broadcast block to complete.</summary>
    public void Complete() => _broadcastBlock.Complete();

    void IDataflowBlock.Fault(Exception ex) => _broadcastBlock.Fault(ex);

    /// <summary>
    /// Links <paramref name="target"/> so that it is offered only messages whose
    /// index is greater than the counter value read at the time of this call.
    /// </summary>
    /// <param name="target">The target to link.</param>
    /// <param name="linkOptions">Link options, passed through to the inner block.</param>
    /// <returns>The object returned by the inner block's <c>LinkTo</c>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="target"/> is <c>null</c>.</exception>
    public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
    {
        if (target == null) throw new ArgumentNullException(nameof(target));

        // Atomic 64-bit read: every index handed out so far is <= this value.
        long currentIndex = Interlocked.Read(ref _index);
        var linkedTargetProxy = new LinkedTargetProxy(target, this, currentIndex);
        return _broadcastBlock.LinkTo(linkedTargetProxy, linkOptions);
    }

    // Hands out the next sequence index.
    private long GetNewIndex() => Interlocked.Increment(ref _index);

    DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader header,
        T value, ISourceBlock<T> source, bool consumeToAccept)
    {
        // The inner block speaks (T, long); wrap the caller's source so the inner
        // block can consume/reserve through it.
        var sourceProxy = source != null ?
            new SourceProxy(source, this, GetNewIndex) : null;
        return _broadcastBlock.OfferMessage(header, (value, GetNewIndex()),
            sourceProxy, consumeToAccept);
    }

    T ISourceBlock<T>.ConsumeMessage(DataflowMessageHeader header,
        ITargetBlock<T> target, out bool messageConsumed)
    {
        var targetProxy = target != null ? new TargetProxy(target, this) : null;

        // The index is internal bookkeeping; only the payload is returned.
        var (value, _) = _broadcastBlock.ConsumeMessage(header, targetProxy,
            out messageConsumed);
        return value;
    }

    bool ISourceBlock<T>.ReserveMessage(DataflowMessageHeader header,
        ITargetBlock<T> target)
    {
        var targetProxy = target != null ? new TargetProxy(target, this) : null;
        return _broadcastBlock.ReserveMessage(header, targetProxy);
    }

    void ISourceBlock<T>.ReleaseReservation(DataflowMessageHeader header,
        ITargetBlock<T> target)
    {
        var targetProxy = target != null ? new TargetProxy(target, this) : null;
        _broadcastBlock.ReleaseReservation(header, targetProxy);
    }

    /// <summary>
    /// Target registered with the inner block on behalf of a linked real target.
    /// Declines messages whose index is not greater than the limit captured at
    /// link time and forwards the rest, unwrapped, to the real target.
    /// </summary>
    private sealed class LinkedTargetProxy : ITargetBlock<(T, long)>
    {
        private readonly ITargetBlock<T> _realTarget;
        private readonly ISourceBlock<T> _realSource;
        private readonly long _indexLimit;

        public LinkedTargetProxy(ITargetBlock<T> realTarget, ISourceBlock<T> realSource,
            long indexLimit)
        {
            _realTarget = realTarget;
            _realSource = realSource;
            _indexLimit = indexLimit;
        }

        DataflowMessageStatus ITargetBlock<(T, long)>.OfferMessage(
            DataflowMessageHeader header, (T, long) messageValue,
            ISourceBlock<(T, long)> source, bool consumeToAccept)
        {
            var (value, index) = messageValue;

            // Messages stamped at or before link time are not for this target.
            if (index <= _indexLimit) return DataflowMessageStatus.Declined;

            // Present the outer block as the source so the real target talks T.
            return _realTarget.OfferMessage(header, value, _realSource, consumeToAccept);
        }

        // Forwarded like Complete/Fault so that reading it does not throw.
        Task IDataflowBlock.Completion => _realTarget.Completion;
        void IDataflowBlock.Complete() => _realTarget.Complete();
        void IDataflowBlock.Fault(Exception ex) => _realTarget.Fault(ex);
    }

    /// <summary>
    /// Adapts an upstream <c>ISourceBlock&lt;T&gt;</c> to the (T, long) shape the
    /// inner block expects. Only consume/reserve/release are supported.
    /// </summary>
    private sealed class SourceProxy : ISourceBlock<(T, long)>
    {
        private readonly ISourceBlock<T> _realSource;
        private readonly ITargetBlock<T> _realTarget;
        private readonly Func<long> _getNewIndex;

        public SourceProxy(ISourceBlock<T> realSource, ITargetBlock<T> realTarget,
            Func<long> getNewIndex)
        {
            _realSource = realSource;
            _realTarget = realTarget;
            _getNewIndex = getNewIndex;
        }

        (T, long) ISourceBlock<(T, long)>.ConsumeMessage(DataflowMessageHeader header,
            ITargetBlock<(T, long)> target, out bool messageConsumed)
        {
            var value = _realSource.ConsumeMessage(header, _realTarget,
                out messageConsumed);

            // The consumed value is stamped with a fresh index.
            var newIndex = _getNewIndex();
            return (value, newIndex);
        }

        bool ISourceBlock<(T, long)>.ReserveMessage(DataflowMessageHeader header,
            ITargetBlock<(T, long)> target)
        {
            return _realSource.ReserveMessage(header, _realTarget);
        }

        void ISourceBlock<(T, long)>.ReleaseReservation(DataflowMessageHeader header,
            ITargetBlock<(T, long)> target)
        {
            _realSource.ReleaseReservation(header, _realTarget);
        }

        Task IDataflowBlock.Completion => throw new NotSupportedException();
        void IDataflowBlock.Complete() => throw new NotSupportedException();
        void IDataflowBlock.Fault(Exception ex) => throw new NotSupportedException();
        IDisposable ISourceBlock<(T, long)>.LinkTo(ITargetBlock<(T, long)> target,
            DataflowLinkOptions linkOptions) => throw new NotSupportedException();
    }

    /// <summary>
    /// Adapts a downstream <c>ITargetBlock&lt;T&gt;</c> to the (T, long) shape for
    /// consume/reserve/release calls. Offers are forwarded without filtering.
    /// </summary>
    private sealed class TargetProxy : ITargetBlock<(T, long)>
    {
        private readonly ITargetBlock<T> _realTarget;
        private readonly ISourceBlock<T> _realSource;

        public TargetProxy(ITargetBlock<T> realTarget, ISourceBlock<T> realSource)
        {
            _realTarget = realTarget;
            _realSource = realSource;
        }

        DataflowMessageStatus ITargetBlock<(T, long)>.OfferMessage(
            DataflowMessageHeader header, (T, long) messageValue,
            ISourceBlock<(T, long)> source, bool consumeToAccept)
        {
            // Drop the index; the real target only understands T.
            var (value, _) = messageValue;
            return _realTarget.OfferMessage(header, value, _realSource, consumeToAccept);
        }

        Task IDataflowBlock.Completion => throw new NotSupportedException();
        void IDataflowBlock.Complete() => throw new NotSupportedException();
        void IDataflowBlock.Fault(Exception ex) => throw new NotSupportedException();
    }
}