如何获得 ActionBlock 的输入队列?
How can I gain access input queue of ActionBlock?
我正在传递一些 class 的 Actionblock 实例。如果我打电话
cancellationSource.Cancel();
然后处理将停止。但有些实例可以留在 ActionBlock 的输入队列中。我需要访问剩余的实例以释放一些资源。
我怎样才能实现这个目标?
如果您迫切需要一个带有公开输入缓冲区的 ActionBlock
,您可以尝试下面的自定义实现。它支持 ActionBlock
的所有 built-in 功能,还包括自定义 IEnumerable<T> InputQueue
属性。当 ActionBlockEx
在故障或取消状态下完成时,输入缓冲区不会被清空。
public class ActionBlockEx<T> : ITargetBlock<T>
{
private readonly ITargetBlock<object> _actionBlock;
private readonly Queue<T> _queue;
public ActionBlockEx(Func<T, Task> action,
ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
if (action == null) throw new ArgumentNullException(nameof(action));
_actionBlock = new ActionBlock<object>(_ =>
{
T item; lock (_queue) item = _queue.Dequeue();
return action(item);
}, dataflowBlockOptions ?? new ExecutionDataflowBlockOptions());
_queue = new Queue<T>();
}
public ActionBlockEx(Action<T> action,
ExecutionDataflowBlockOptions dataflowBlockOptions = null) : this(
item => { action(item); return Task.CompletedTask; }, dataflowBlockOptions)
{
if (action == null) throw new ArgumentNullException(nameof(action));
}
public int InputCount { get { lock (_queue) return _queue.Count; } }
public IEnumerable<T> InputQueue { get { lock (_queue) return _queue.ToList(); } }
public Task Completion => _actionBlock.Completion;
public void Complete() => _actionBlock.Complete();
void IDataflowBlock.Fault(Exception ex) => _actionBlock.Fault(ex);
DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader header,
T item, ISourceBlock<T> source, bool consumeToAccept)
{
var sourceProxy = source != null ? new SourceProxy(source, this) : null;
lock (_queue)
{
var offerResult = _actionBlock.OfferMessage(header, null, sourceProxy,
consumeToAccept);
if (offerResult == DataflowMessageStatus.Accepted
&& (sourceProxy == null || !sourceProxy.ConsumeMessageInvoked))
{
_queue.Enqueue(item);
}
return offerResult;
}
}
private class SourceProxy : ISourceBlock<object>
{
private readonly ISourceBlock<T> _realSource;
private readonly ActionBlockEx<T> _realTarget;
public bool ConsumeMessageInvoked { get; private set; }
public SourceProxy(ISourceBlock<T> realSource, ActionBlockEx<T> realTarget)
{
_realSource = realSource;
_realTarget = realTarget;
}
object ISourceBlock<object>.ConsumeMessage(DataflowMessageHeader header,
ITargetBlock<object> target, out bool messageConsumed)
{
this.ConsumeMessageInvoked = true;
lock (_realTarget._queue)
{
var item = _realSource.ConsumeMessage(header, _realTarget,
out messageConsumed);
if (messageConsumed) _realTarget._queue.Enqueue(item);
}
return null;
}
bool ISourceBlock<object>.ReserveMessage(DataflowMessageHeader header,
ITargetBlock<object> target)
{
return _realSource.ReserveMessage(header, _realTarget);
}
void ISourceBlock<object>.ReleaseReservation(DataflowMessageHeader header,
ITargetBlock<object> target)
{
_realSource.ReleaseReservation(header, _realTarget);
}
Task IDataflowBlock.Completion => throw new NotSupportedException();
void IDataflowBlock.Complete() => throw new NotSupportedException();
void IDataflowBlock.Fault(Exception ex) => throw new NotSupportedException();
IDisposable ISourceBlock<object>.LinkTo(ITargetBlock<object> target,
DataflowLinkOptions linkOptions) => throw new NotSupportedException();
}
}
此实现基于随虚拟 null
消息一起提供的内部 ActionBlock<object>
。它与链接 ISourceBlock
的通信被拦截,以便获取实际消息并将其存储在内部 Queue<T>
中。这种间接增加了一些开销(对象分配发生在每条收到的消息上),因此请谨慎使用此 class!
我正在传递一些 class 的 Actionblock 实例。如果我打电话
cancellationSource.Cancel();
然后处理将停止。但有些实例可以留在 ActionBlock 的输入队列中。我需要访问剩余的实例以释放一些资源。
我怎样才能实现这个目标?
如果您迫切需要一个带有公开输入缓冲区的 ActionBlock
,您可以尝试下面的自定义实现。它支持 ActionBlock
的所有 built-in 功能,还包括自定义 IEnumerable<T> InputQueue
属性。当 ActionBlockEx
在故障或取消状态下完成时,输入缓冲区不会被清空。
public class ActionBlockEx<T> : ITargetBlock<T>
{
private readonly ITargetBlock<object> _actionBlock;
private readonly Queue<T> _queue;
public ActionBlockEx(Func<T, Task> action,
ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
if (action == null) throw new ArgumentNullException(nameof(action));
_actionBlock = new ActionBlock<object>(_ =>
{
T item; lock (_queue) item = _queue.Dequeue();
return action(item);
}, dataflowBlockOptions ?? new ExecutionDataflowBlockOptions());
_queue = new Queue<T>();
}
public ActionBlockEx(Action<T> action,
ExecutionDataflowBlockOptions dataflowBlockOptions = null) : this(
item => { action(item); return Task.CompletedTask; }, dataflowBlockOptions)
{
if (action == null) throw new ArgumentNullException(nameof(action));
}
public int InputCount { get { lock (_queue) return _queue.Count; } }
public IEnumerable<T> InputQueue { get { lock (_queue) return _queue.ToList(); } }
public Task Completion => _actionBlock.Completion;
public void Complete() => _actionBlock.Complete();
void IDataflowBlock.Fault(Exception ex) => _actionBlock.Fault(ex);
DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader header,
T item, ISourceBlock<T> source, bool consumeToAccept)
{
var sourceProxy = source != null ? new SourceProxy(source, this) : null;
lock (_queue)
{
var offerResult = _actionBlock.OfferMessage(header, null, sourceProxy,
consumeToAccept);
if (offerResult == DataflowMessageStatus.Accepted
&& (sourceProxy == null || !sourceProxy.ConsumeMessageInvoked))
{
_queue.Enqueue(item);
}
return offerResult;
}
}
private class SourceProxy : ISourceBlock<object>
{
private readonly ISourceBlock<T> _realSource;
private readonly ActionBlockEx<T> _realTarget;
public bool ConsumeMessageInvoked { get; private set; }
public SourceProxy(ISourceBlock<T> realSource, ActionBlockEx<T> realTarget)
{
_realSource = realSource;
_realTarget = realTarget;
}
object ISourceBlock<object>.ConsumeMessage(DataflowMessageHeader header,
ITargetBlock<object> target, out bool messageConsumed)
{
this.ConsumeMessageInvoked = true;
lock (_realTarget._queue)
{
var item = _realSource.ConsumeMessage(header, _realTarget,
out messageConsumed);
if (messageConsumed) _realTarget._queue.Enqueue(item);
}
return null;
}
bool ISourceBlock<object>.ReserveMessage(DataflowMessageHeader header,
ITargetBlock<object> target)
{
return _realSource.ReserveMessage(header, _realTarget);
}
void ISourceBlock<object>.ReleaseReservation(DataflowMessageHeader header,
ITargetBlock<object> target)
{
_realSource.ReleaseReservation(header, _realTarget);
}
Task IDataflowBlock.Completion => throw new NotSupportedException();
void IDataflowBlock.Complete() => throw new NotSupportedException();
void IDataflowBlock.Fault(Exception ex) => throw new NotSupportedException();
IDisposable ISourceBlock<object>.LinkTo(ITargetBlock<object> target,
DataflowLinkOptions linkOptions) => throw new NotSupportedException();
}
}
此实现基于随虚拟 null
消息一起提供的内部 ActionBlock<object>
。它与链接 ISourceBlock
的通信被拦截,以便获取实际消息并将其存储在内部 Queue<T>
中。这种间接增加了一些开销(对象分配发生在每条收到的消息上),因此请谨慎使用此 class!