未调用 TPL BufferBlock Consume 方法
TPL BufferBlock Consume Method Not Being Called
我想使用连续运行的 BufferBlock 实现 consumer/producer 模式,类似于问题 here and the code here。
我尝试使用像 OP 这样的 ActionBlock,但如果缓冲区已满并且新消息在其队列中,那么新消息永远不会添加到 ConcurrentDictionary _queue。
在下面的代码中,当使用此调用将新消息添加到缓冲区块时,永远不会调用 ConsumeAsync 方法:_messageBufferBlock.SendAsync(message)
如何更正下面的代码,以便每次使用 _messageBufferBlock.SendAsync(message)
添加新消息时调用 ConsumeAsync 方法?
public class PriorityMessageQueue
{
private volatile ConcurrentDictionary<int,MyMessage> _queue = new ConcurrentDictionary<int,MyMessage>();
private volatile BufferBlock<MyMessage> _messageBufferBlock;
private readonly Task<bool> _initializingTask; // not used but allows for calling async method from constructor
private int _dictionaryKey;
public PriorityMessageQueue()
{
_initializingTask = Init();
}
public async Task<bool> EnqueueAsync(MyMessage message)
{
return await _messageBufferBlock.SendAsync(message);
}
private async Task<bool> ConsumeAsync()
{
try
{
// This code does not fire when a new message is added to the buffereblock
while (await _messageBufferBlock.OutputAvailableAsync())
{
// A message object is never received from the bufferblock
var message = await _messageBufferBlock.ReceiveAsync();
}
return true;
}
catch (Exception ex)
{
return false;
}
}
private async Task<bool> Init()
{
var executionDataflowBlockOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount,
BoundedCapacity = 50
};
var prioritizeMessageBlock = new ActionBlock<MyMessage>(msg =>
{
SetMessagePriority(msg);
}, executionDataflowBlockOptions);
_messageBufferBlock = new BufferBlock<MyMessage>();
_messageBufferBlock.LinkTo(prioritizeMessageBlock, new DataflowLinkOptions { PropagateCompletion = true, MaxMessages = 50});
return await ConsumeAsync();
}
}
编辑
我删除了所有多余的代码并添加了注释。
我仍然不完全确定你想要完成什么,但我会尽力为你指明正确的方向。示例中的大部分代码并非绝对必要。
I need to know when a new message arrives
如果这是您唯一的要求,那么我假设您只需要在传入新消息时 运行 一些任意代码。在数据流中执行此操作的最简单方法是使用 TransformBlock
并将该块设置为管道中的初始接收器。每个块都有自己的缓冲区,因此除非您需要另一个缓冲区,否则您可以将其省略。
public class PriorityMessageQueue {
private TransformBlock<MyMessage, MyMessage> _messageReciever;
public PriorityMessageQueue() {
var executionDataflowBlockOptions = new ExecutionDataflowBlockOptions {
MaxDegreeOfParallelism = Environment.ProcessorCount,
BoundedCapacity = 50
};
var prioritizeMessageBlock = new ActionBlock<MyMessage>(msg => {
SetMessagePriority(msg);
}, executionDataflowBlockOptions);
_messageReciever = new TransformBlock<MyMessage, MyMessage>(msg => NewMessageRecieved(msg), executionDataflowBlockOptions);
_messageReciever.LinkTo(prioritizeMessageBlock, new DataflowLinkOptions { PropagateCompletion = true });
}
public async Task<bool> EnqueueAsync(MyMessage message) {
return await _messageReciever.SendAsync(message);
}
private MyMessage NewMessageRecieved(MyMessage message) {
//do something when a new message arrives
//pass the message along in the pipeline
return message;
}
private void SetMessagePriority(MyMessage message) {
//Handle a message
}
}
当然,您的另一个选择是在从 SendAsync
返回任务之前立即在 EnqueAsync
内执行您需要的任何操作,但是 TransformBlock
为您提供了额外的灵活性.
我想使用连续运行的 BufferBlock 实现 consumer/producer 模式,类似于问题 here and the code here。
我尝试使用像 OP 这样的 ActionBlock,但如果缓冲区已满并且新消息在其队列中,那么新消息永远不会添加到 ConcurrentDictionary _queue。
在下面的代码中,当使用此调用将新消息添加到缓冲区块时,永远不会调用 ConsumeAsync 方法:_messageBufferBlock.SendAsync(message)
如何更正下面的代码,以便每次使用 _messageBufferBlock.SendAsync(message)
添加新消息时调用 ConsumeAsync 方法?
public class PriorityMessageQueue
{
private volatile ConcurrentDictionary<int,MyMessage> _queue = new ConcurrentDictionary<int,MyMessage>();
private volatile BufferBlock<MyMessage> _messageBufferBlock;
private readonly Task<bool> _initializingTask; // not used but allows for calling async method from constructor
private int _dictionaryKey;
public PriorityMessageQueue()
{
_initializingTask = Init();
}
public async Task<bool> EnqueueAsync(MyMessage message)
{
return await _messageBufferBlock.SendAsync(message);
}
private async Task<bool> ConsumeAsync()
{
try
{
// This code does not fire when a new message is added to the buffereblock
while (await _messageBufferBlock.OutputAvailableAsync())
{
// A message object is never received from the bufferblock
var message = await _messageBufferBlock.ReceiveAsync();
}
return true;
}
catch (Exception ex)
{
return false;
}
}
private async Task<bool> Init()
{
var executionDataflowBlockOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount,
BoundedCapacity = 50
};
var prioritizeMessageBlock = new ActionBlock<MyMessage>(msg =>
{
SetMessagePriority(msg);
}, executionDataflowBlockOptions);
_messageBufferBlock = new BufferBlock<MyMessage>();
_messageBufferBlock.LinkTo(prioritizeMessageBlock, new DataflowLinkOptions { PropagateCompletion = true, MaxMessages = 50});
return await ConsumeAsync();
}
}
编辑 我删除了所有多余的代码并添加了注释。
我仍然不完全确定你想要完成什么,但我会尽力为你指明正确的方向。示例中的大部分代码并非绝对必要。
I need to know when a new message arrives
如果这是您唯一的要求,那么我假设您只需要在传入新消息时 运行 一些任意代码。在数据流中执行此操作的最简单方法是使用 TransformBlock
并将该块设置为管道中的初始接收器。每个块都有自己的缓冲区,因此除非您需要另一个缓冲区,否则您可以将其省略。
public class PriorityMessageQueue {
private TransformBlock<MyMessage, MyMessage> _messageReciever;
public PriorityMessageQueue() {
var executionDataflowBlockOptions = new ExecutionDataflowBlockOptions {
MaxDegreeOfParallelism = Environment.ProcessorCount,
BoundedCapacity = 50
};
var prioritizeMessageBlock = new ActionBlock<MyMessage>(msg => {
SetMessagePriority(msg);
}, executionDataflowBlockOptions);
_messageReciever = new TransformBlock<MyMessage, MyMessage>(msg => NewMessageRecieved(msg), executionDataflowBlockOptions);
_messageReciever.LinkTo(prioritizeMessageBlock, new DataflowLinkOptions { PropagateCompletion = true });
}
public async Task<bool> EnqueueAsync(MyMessage message) {
return await _messageReciever.SendAsync(message);
}
private MyMessage NewMessageRecieved(MyMessage message) {
//do something when a new message arrives
//pass the message along in the pipeline
return message;
}
private void SetMessagePriority(MyMessage message) {
//Handle a message
}
}
当然,您的另一个选择是在从 SendAsync
返回任务之前立即在 EnqueAsync
内执行您需要的任何操作,但是 TransformBlock
为您提供了额外的灵活性.