TPL 数据流广播块丢弃最后一条消息
TPL Dataflow Broadcastblock discards last message
我有一个很简单的问题。我需要一种方法来轻松地对需要一些时间的消息执行一些处理。在处理过程中,可能会输入新的请求,但可以丢弃除最后一个请求之外的所有请求。
所以我认为 TPL Broadcastblock
应该这样做,查看文档和帖子,例如 StackExchange。我创建了以下解决方案并为其添加了一些单元测试,但在单元测试中,有时最后一项不会发送。
这不是我所期望的。如果它应该丢弃任何东西,我会说它应该丢弃第一个项目,因为如果它不能处理消息,它应该覆盖它的 1 缓冲区。谁能看到它是什么?
任何帮助将不胜感激!
这是块的代码:
/// <summary>
/// This block will take items and perform the specified action on it. Any incoming messages while the action is being performed
/// will be discarded.
/// </summary>
public class DiscardWhileBusyActionBlock<T> : ITargetBlock<T>
{
private readonly BroadcastBlock<T> broadcastBlock;
private readonly ActionBlock<T> actionBlock;
/// <summary>
/// Initializes a new instance of the <see cref="DiscardWhileBusyActionBlock{T}"/> class.
/// Constructs a SyncFilterTarget{TInput}.
/// </summary>
/// <param name="actionToPerform">Thing to do.</param>
public DiscardWhileBusyActionBlock(Action<T> actionToPerform)
{
if (actionToPerform == null)
{
throw new ArgumentNullException(nameof(actionToPerform));
}
this.broadcastBlock = new BroadcastBlock<T>(item => item);
this.actionBlock = new ActionBlock<T>(actionToPerform, new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 });
this.broadcastBlock.LinkTo(this.actionBlock);
this.broadcastBlock.Completion.ContinueWith(task => this.actionBlock.Complete());
}
public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
{
return ((ITargetBlock<T>)this.broadcastBlock).OfferMessage(messageHeader, messageValue, source, consumeToAccept);
}
public void Complete()
{
this.broadcastBlock.Complete();
}
public void Fault(Exception exception)
{
((ITargetBlock<T>)this.broadcastBlock).Fault(exception);
}
public Task Completion => this.actionBlock.Completion;
}
下面是测试代码:
[TestClass]
public class DiscardWhileBusyActionBlockTest
{
[TestMethod]
public void PostToConnectedBuffer_ActionNotBusy_MessageConsumed()
{
var actionPerformer = new ActionPerformer();
var block = new DiscardWhileBusyActionBlock<int>(actionPerformer.Perform);
var buffer = DiscardWhileBusyActionBlockTest.SetupBuffer(block);
buffer.Post(1);
DiscardWhileBusyActionBlockTest.WaitForCompletion(buffer, block);
var expectedMessages = new[] { 1 };
actionPerformer.LastReceivedMessage.Should().BeEquivalentTo(expectedMessages);
}
[TestMethod]
public void PostToConnectedBuffer_ActionBusy_MessagesConsumedWhenActionBecomesAvailable()
{
var actionPerformer = new ActionPerformer();
var block = new DiscardWhileBusyActionBlock<int>(actionPerformer.Perform);
var buffer = DiscardWhileBusyActionBlockTest.SetupBuffer(block);
actionPerformer.SetBusy();
// 1st message will set the actionperformer to busy, 2nd message should be sent when
// it becomes available.
buffer.Post(1);
buffer.Post(2);
actionPerformer.SetAvailable();
DiscardWhileBusyActionBlockTest.WaitForCompletion(buffer, block);
var expectedMessages = new[] { 1, 2 };
actionPerformer.LastReceivedMessage.Should().BeEquivalentTo(expectedMessages);
}
[TestMethod]
public void PostToConnectedBuffer_ActionBusy_DiscardMessagesInBetweenAndProcessOnlyLastMessage()
{
var actionPerformer = new ActionPerformer();
var block = new DiscardWhileBusyActionBlock<int>(actionPerformer.Perform);
var buffer = DiscardWhileBusyActionBlockTest.SetupBuffer(block);
actionPerformer.SetBusy();
buffer.Post(1);
buffer.Post(2);
buffer.Post(3);
buffer.Post(4);
buffer.Post(5);
actionPerformer.SetAvailable();
DiscardWhileBusyActionBlockTest.WaitForCompletion(buffer, block);
var expectedMessages = new[] { 1, 5 };
actionPerformer.LastReceivedMessage.Should().BeEquivalentTo(expectedMessages);
}
private static void WaitForCompletion(IDataflowBlock source, IDataflowBlock target)
{
source.Complete();
target.Completion.Wait(TimeSpan.FromSeconds(1));
}
private static BufferBlock<int> SetupBuffer(ITargetBlock<int> block)
{
var buffer = new BufferBlock<int>();
buffer.LinkTo(block);
buffer.Completion.ContinueWith(task => block.Complete());
return buffer;
}
private class ActionPerformer
{
private readonly ManualResetEvent resetEvent = new ManualResetEvent(true);
public List<int> LastReceivedMessage { get; } = new List<int>();
public void Perform(int message)
{
this.resetEvent.WaitOne(TimeSpan.FromSeconds(3));
this.LastReceivedMessage.Add(message);
}
public void SetBusy()
{
this.resetEvent.Reset();
}
public void SetAvailable()
{
this.resetEvent.Set();
}
}
}
当你将操作块的 BoundedCapacity
级别设置为 1
时,这意味着,如果它进行处理,并且它的队列中已经有项目,它将丢弃该消息,这将 运行 超出范围。所以基本上发生的事情是你的块完成它的工作,在缓冲区已满时拒绝新消息。广播块完成后,将整个消息发送给收件人,并调用 Completion
,完成整个管道。
您需要检查返回的 Post
布尔值以获取最后一条消息,或者更可能的是,将最后一条消息存储在某个变量中,确保它将进入管道。看来您最好不要使用 BroadcastBlock
, as its purpose to provide a copy of the message to the number of linked blocks, and just write your logic by yourself. Maybe you can use a simple BufferBlock
。
更新: OfferMessage
方法也确实提供了有关所提供消息的信息。我认为您根本不需要缓冲块,因为您必须处理管道的非默认逻辑。拥有一个像 _lastMessage
这样的字段更容易,在其中存储最后一个值,并在 actionBlock
接受请求时将其删除。您甚至可以完全删除数据流依赖性,因为您所做的只是为请求调用方法。
旁注:您可以link blocks with completion propagation在选项中设置:
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
this.broadcastBlock.LinkTo(this.actionBlock, linkOptions);
这可以删除一些使用 potentially dangerous ContinueWith
的代码。如果您需要异步行为,您也可以 await broadcastBlock.SendAsync()
而不是 Post
。
我有一个很简单的问题。我需要一种方法来轻松地对需要一些时间的消息执行一些处理。在处理过程中,可能会输入新的请求,但可以丢弃除最后一个请求之外的所有请求。
所以我认为 TPL Broadcastblock
应该这样做,查看文档和帖子,例如 StackExchange。我创建了以下解决方案并为其添加了一些单元测试,但在单元测试中,有时最后一项不会发送。
这不是我所期望的。如果它应该丢弃任何东西,我会说它应该丢弃第一个项目,因为如果它不能处理消息,它应该覆盖它的 1 缓冲区。谁能看到它是什么?
任何帮助将不胜感激!
这是块的代码:
/// <summary>
/// This block will take items and perform the specified action on it. Any incoming messages while the action is being performed
/// will be discarded.
/// </summary>
public class DiscardWhileBusyActionBlock<T> : ITargetBlock<T>
{
private readonly BroadcastBlock<T> broadcastBlock;
private readonly ActionBlock<T> actionBlock;
/// <summary>
/// Initializes a new instance of the <see cref="DiscardWhileBusyActionBlock{T}"/> class.
/// Constructs a SyncFilterTarget{TInput}.
/// </summary>
/// <param name="actionToPerform">Thing to do.</param>
public DiscardWhileBusyActionBlock(Action<T> actionToPerform)
{
if (actionToPerform == null)
{
throw new ArgumentNullException(nameof(actionToPerform));
}
this.broadcastBlock = new BroadcastBlock<T>(item => item);
this.actionBlock = new ActionBlock<T>(actionToPerform, new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 });
this.broadcastBlock.LinkTo(this.actionBlock);
this.broadcastBlock.Completion.ContinueWith(task => this.actionBlock.Complete());
}
public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
{
return ((ITargetBlock<T>)this.broadcastBlock).OfferMessage(messageHeader, messageValue, source, consumeToAccept);
}
public void Complete()
{
this.broadcastBlock.Complete();
}
public void Fault(Exception exception)
{
((ITargetBlock<T>)this.broadcastBlock).Fault(exception);
}
public Task Completion => this.actionBlock.Completion;
}
下面是测试代码:
[TestClass]
public class DiscardWhileBusyActionBlockTest
{
[TestMethod]
public void PostToConnectedBuffer_ActionNotBusy_MessageConsumed()
{
var actionPerformer = new ActionPerformer();
var block = new DiscardWhileBusyActionBlock<int>(actionPerformer.Perform);
var buffer = DiscardWhileBusyActionBlockTest.SetupBuffer(block);
buffer.Post(1);
DiscardWhileBusyActionBlockTest.WaitForCompletion(buffer, block);
var expectedMessages = new[] { 1 };
actionPerformer.LastReceivedMessage.Should().BeEquivalentTo(expectedMessages);
}
[TestMethod]
public void PostToConnectedBuffer_ActionBusy_MessagesConsumedWhenActionBecomesAvailable()
{
var actionPerformer = new ActionPerformer();
var block = new DiscardWhileBusyActionBlock<int>(actionPerformer.Perform);
var buffer = DiscardWhileBusyActionBlockTest.SetupBuffer(block);
actionPerformer.SetBusy();
// 1st message will set the actionperformer to busy, 2nd message should be sent when
// it becomes available.
buffer.Post(1);
buffer.Post(2);
actionPerformer.SetAvailable();
DiscardWhileBusyActionBlockTest.WaitForCompletion(buffer, block);
var expectedMessages = new[] { 1, 2 };
actionPerformer.LastReceivedMessage.Should().BeEquivalentTo(expectedMessages);
}
[TestMethod]
public void PostToConnectedBuffer_ActionBusy_DiscardMessagesInBetweenAndProcessOnlyLastMessage()
{
var actionPerformer = new ActionPerformer();
var block = new DiscardWhileBusyActionBlock<int>(actionPerformer.Perform);
var buffer = DiscardWhileBusyActionBlockTest.SetupBuffer(block);
actionPerformer.SetBusy();
buffer.Post(1);
buffer.Post(2);
buffer.Post(3);
buffer.Post(4);
buffer.Post(5);
actionPerformer.SetAvailable();
DiscardWhileBusyActionBlockTest.WaitForCompletion(buffer, block);
var expectedMessages = new[] { 1, 5 };
actionPerformer.LastReceivedMessage.Should().BeEquivalentTo(expectedMessages);
}
private static void WaitForCompletion(IDataflowBlock source, IDataflowBlock target)
{
source.Complete();
target.Completion.Wait(TimeSpan.FromSeconds(1));
}
private static BufferBlock<int> SetupBuffer(ITargetBlock<int> block)
{
var buffer = new BufferBlock<int>();
buffer.LinkTo(block);
buffer.Completion.ContinueWith(task => block.Complete());
return buffer;
}
private class ActionPerformer
{
private readonly ManualResetEvent resetEvent = new ManualResetEvent(true);
public List<int> LastReceivedMessage { get; } = new List<int>();
public void Perform(int message)
{
this.resetEvent.WaitOne(TimeSpan.FromSeconds(3));
this.LastReceivedMessage.Add(message);
}
public void SetBusy()
{
this.resetEvent.Reset();
}
public void SetAvailable()
{
this.resetEvent.Set();
}
}
}
当你将操作块的 BoundedCapacity
级别设置为 1
时,这意味着,如果它进行处理,并且它的队列中已经有项目,它将丢弃该消息,这将 运行 超出范围。所以基本上发生的事情是你的块完成它的工作,在缓冲区已满时拒绝新消息。广播块完成后,将整个消息发送给收件人,并调用 Completion
,完成整个管道。
您需要检查返回的 Post
布尔值以获取最后一条消息,或者更可能的是,将最后一条消息存储在某个变量中,确保它将进入管道。看来您最好不要使用 BroadcastBlock
, as its purpose to provide a copy of the message to the number of linked blocks, and just write your logic by yourself. Maybe you can use a simple BufferBlock
。
更新: OfferMessage
方法也确实提供了有关所提供消息的信息。我认为您根本不需要缓冲块,因为您必须处理管道的非默认逻辑。拥有一个像 _lastMessage
这样的字段更容易,在其中存储最后一个值,并在 actionBlock
接受请求时将其删除。您甚至可以完全删除数据流依赖性,因为您所做的只是为请求调用方法。
旁注:您可以link blocks with completion propagation在选项中设置:
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
this.broadcastBlock.LinkTo(this.actionBlock, linkOptions);
这可以删除一些使用 potentially dangerous ContinueWith
的代码。如果您需要异步行为,您也可以 await broadcastBlock.SendAsync()
而不是 Post
。