如何并行读取队列中的消息?
How can I read messages from a queue in parallel?
情况
我们有一个消息队列。我们希望并行处理消息并限制同时处理的消息数。
我们下面的试用代码确实会并行处理消息,但它只会在前一个处理完成后才开始新的一批处理。我们想在任务完成后重新启动它们。
换句话说:只要消息队列不为空,最大数量的任务就应该始终处于活动状态。
试用码
static string queue = @".\Private$\concurrenttest";
private static void Process(CancellationToken token)
{
Task.Factory.StartNew(async () =>
{
while (true)
{
IEnumerable<Task> consumerTasks = ConsumerTasks();
await Task.WhenAll(consumerTasks);
await PeekAsync(new MessageQueue(queue));
}
});
}
private static IEnumerable<Task> ConsumerTasks()
{
for (int i = 0; i < 15; i++)
{
Command1 message;
try
{
MessageQueue msMq = new MessageQueue(queue);
msMq.Formatter = new XmlMessageFormatter(new Type[] { typeof(Command1) });
Message msg = msMq.Receive();
message = (Command1)msg.Body;
}
catch (MessageQueueException mqex)
{
if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
yield break; // nothing in queue
else throw;
}
yield return Task.Run(() =>
{
Console.WriteLine("id: " + message.id + ", name: " + message.name);
Thread.Sleep(1000);
});
}
}
private static Task<Message> PeekAsync(MessageQueue msMq)
{
return Task.Factory.FromAsync<Message>(msMq.BeginPeek(), msMq.EndPeek);
}
您有一组要处理的东西。
您为正在处理的事物创建另一个集合(这可能是您的任务对象或引用任务的某种项目)。
你创建了一个循环,只要你有工作要做就会重复。也就是说,工作项正在等待启动,或者您仍有正在处理的工作项。
在循环开始时,您可以使用任意数量的任务填充您的活动任务集合并发 运行 并在添加它们时启动它们。
你让事情 运行 暂时(比如 Thread.Sleep(10);)。
您创建了一个内部循环来检查所有已启动的任务是否已完成。如果一个已完成,您将其删除并报告结果或做任何适当的事情。
就是这样。在下一轮,外循环的上半部分将向 运行ning 任务集合添加任务,直到数量等于您设置的最大值,使您的正在进行的工作集合保持完整。
您可能希望在工作线程上执行所有这些操作并在循环中监视取消请求。
.NET 中的任务库用于并行执行多个任务。虽然有一些方法可以限制活动任务的数量,但图书馆本身会根据计算机限制活动任务的数量CPU。
第一个需要回答的问题是为什么需要再创建一个限制?如果任务库的限制没问题,那你就可以一直创建任务,依靠任务库去执行,性能不错。
如果这没问题,那么一旦您从 MSMQ 收到一条消息,就启动一个任务来处理该消息,跳过等待(WhenAll 调用),重新开始等待下一条消息。
您可以使用自定义任务计划程序来限制并发任务的数量。有关 MSDN 的更多信息:https://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler%28v=vs.110%29.aspx.
我的同事提出了以下解决方案。此解决方案有效,但我会在 Code Review.
上审核此代码
根据给出的答案和我们自己的一些研究,我们找到了解决方案。我们正在使用 SemaphoreSlim
来限制并行任务的数量。
static string queue = @".\Private$\concurrenttest";
private static async Task Process(CancellationToken token)
{
MessageQueue msMq = new MessageQueue(queue);
msMq.Formatter = new XmlMessageFormatter(new Type[] { typeof(Command1) });
SemaphoreSlim s = new SemaphoreSlim(15, 15);
while (true)
{
await s.WaitAsync();
await PeekAsync(msMq);
Command1 message = await ReceiveAsync(msMq);
Task.Run(async () =>
{
try
{
await HandleMessage(message);
}
catch (Exception)
{
// Exception handling
}
s.Release();
});
}
}
private static Task HandleMessage(Command1 message)
{
Console.WriteLine("id: " + message.id + ", name: " + message.name);
Thread.Sleep(1000);
return Task.FromResult(1);
}
private static Task<Message> PeekAsync(MessageQueue msMq)
{
return Task.Factory.FromAsync<Message>(msMq.BeginPeek(), msMq.EndPeek);
}
public class Command1
{
public int id { get; set; }
public string name { get; set; }
}
private static async Task<Command1> ReceiveAsync(MessageQueue msMq)
{
var receiveAsync = await Task.Factory.FromAsync<Message>(msMq.BeginReceive(), msMq.EndPeek);
return (Command1)receiveAsync.Body;
}
您应该考虑为此使用 Microsoft 的 Reactive Framework。
您的代码可能如下所示:
var query =
from command1 in FromQueue<Command1>(queue)
from text in Observable.Start(() =>
{
Thread.Sleep(1000);
return "id: " + command1.id + ", name: " + command1.name;
})
select text;
var subscription =
query
.Subscribe(text => Console.WriteLine(text));
这会并行执行所有处理,并确保处理正确分布在所有内核中。当一个值结束时,另一个值开始。
要取消订阅,只需调用 subscription.Dispose()
。
FromQueue
的代码是:
static IObservable<T> FromQueue<T>(string serverQueue)
{
return Observable.Create<T>(observer =>
{
var responseQueue = Environment.MachineName + "\Private$\" + Guid.NewGuid().ToString();
var queue = MessageQueue.Create(responseQueue);
var frm = new System.Messaging.BinaryMessageFormatter();
var srv = new MessageQueue(serverQueue);
srv.Formatter = frm;
queue.Formatter = frm;
srv.Send("S " + responseQueue);
var loop = NewThreadScheduler.Default.ScheduleLongRunning(cancel =>
{
while (!cancel.IsDisposed)
{
var msg = queue.Receive();
observer.OnNext((T)msg.Body);
}
});
return new CompositeDisposable(
loop,
Disposable.Create(() =>
{
srv.Send("D " + responseQueue);
MessageQueue.Delete(responseQueue);
})
);
});
}
只需 NuGet "Rx-Main" 即可获取位。
为了限制并发你可以这样做:
int maxConcurrent = 2;
var query =
FromQueue<Command1>(queue)
.Select(command1 => Observable.Start(() =>
{
Thread.Sleep(1000);
return "id: " + command1.id + ", name: " + command1.name;
}))
.Merge(maxConcurrent);
编辑
我花了很多时间思考泵的可靠性 - 特别是如果从 MessageQueue
收到消息,取消变得棘手 - 所以我提供了两种终止队列的方法:
- 向
CancellationToken
发出信号以尽快停止管道,这可能会导致消息丢失。
- 调用
MessagePump.Stop()
终止泵,但允许在 MessagePump.Completion
任务转换为 RanToCompletion
之前完全处理所有已从队列中取出的消息。
该解决方案使用 TPL 数据流(NuGet:Microsoft.Tpl.Dataflow)。
完整实施:
using System;
using System.Messaging;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace Whosebug.Q34437298
{
/// <summary>
/// Pumps the message queue and processes messages in parallel.
/// </summary>
public sealed class MessagePump
{
/// <summary>
/// Creates a <see cref="MessagePump"/> and immediately starts pumping.
/// </summary>
public static MessagePump Run(
MessageQueue messageQueue,
Func<Message, Task> processMessage,
int maxDegreeOfParallelism,
CancellationToken ct = default(CancellationToken))
{
if (messageQueue == null) throw new ArgumentNullException(nameof(messageQueue));
if (processMessage == null) throw new ArgumentNullException(nameof(processMessage));
if (maxDegreeOfParallelism <= 0) throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
ct.ThrowIfCancellationRequested();
return new MessagePump(messageQueue, processMessage, maxDegreeOfParallelism, ct);
}
private readonly TaskCompletionSource<bool> _stop = new TaskCompletionSource<bool>();
/// <summary>
/// <see cref="Task"/> which completes when this instance
/// stops due to a <see cref="Stop"/> or cancellation request.
/// </summary>
public Task Completion { get; }
/// <summary>
/// Maximum number of parallel message processors.
/// </summary>
public int MaxDegreeOfParallelism { get; }
/// <summary>
/// <see cref="MessageQueue"/> that is pumped by this instance.
/// </summary>
public MessageQueue MessageQueue { get; }
/// <summary>
/// Creates a new <see cref="MessagePump"/> instance.
/// </summary>
private MessagePump(MessageQueue messageQueue, Func<Message, Task> processMessage, int maxDegreeOfParallelism, CancellationToken ct)
{
MessageQueue = messageQueue;
MaxDegreeOfParallelism = maxDegreeOfParallelism;
// Kick off the loop.
Completion = RunAsync(processMessage, ct);
}
/// <summary>
/// Soft-terminates the pump so that no more messages will be pumped.
/// Any messages already removed from the message queue will be
/// processed before this instance fully completes.
/// </summary>
public void Stop()
{
// Multiple calls to Stop are fine.
_stop.TrySetResult(true);
}
/// <summary>
/// Pump implementation.
/// </summary>
private async Task RunAsync(Func<Message, Task> processMessage, CancellationToken ct = default(CancellationToken))
{
using (CancellationTokenSource producerCTS = ct.CanBeCanceled
? CancellationTokenSource.CreateLinkedTokenSource(ct)
: new CancellationTokenSource())
{
// This CancellationToken will either be signaled
// externally, or if our consumer errors.
ct = producerCTS.Token;
// Handover between producer and consumer.
DataflowBlockOptions bufferOptions = new DataflowBlockOptions {
// There is no point in dequeuing more messages than we can process,
// so we'll throttle the producer by limiting the buffer capacity.
BoundedCapacity = MaxDegreeOfParallelism,
CancellationToken = ct
};
BufferBlock<Message> buffer = new BufferBlock<Message>(bufferOptions);
Task producer = Task.Run(async () =>
{
try
{
while (_stop.Task.Status != TaskStatus.RanToCompletion)
{
// This line and next line are the *only* two cancellation
// points which will not cause dropped messages.
ct.ThrowIfCancellationRequested();
Task<Message> peekTask = WithCancellation(PeekAsync(MessageQueue), ct);
if (await Task.WhenAny(peekTask, _stop.Task).ConfigureAwait(false) == _stop.Task)
{
// Stop was signaled before PeekAsync returned. Wind down the producer gracefully
// by breaking out and propagating completion to the consumer blocks.
break;
}
await peekTask.ConfigureAwait(false); // Observe Peek exceptions.
ct.ThrowIfCancellationRequested();
// Zero timeout means that we will error if someone else snatches the
// peeked message from the queue before we get to it (due to a race).
// I deemed this better than getting stuck waiting for a message which
// may never arrive, or, worse yet, let this ReceiveAsync run onobserved
// due to a cancellation (if we choose to abandon it like we do PeekAsync).
// You will have to restart the pump if this throws.
// Omit timeout if this behaviour is undesired.
Message message = await ReceiveAsync(MessageQueue, timeout: TimeSpan.Zero).ConfigureAwait(false);
await buffer.SendAsync(message, ct).ConfigureAwait(false);
}
}
finally
{
buffer.Complete();
}
},
ct);
// Wire up the parallel consumers.
ExecutionDataflowBlockOptions executionOptions = new ExecutionDataflowBlockOptions {
CancellationToken = ct,
MaxDegreeOfParallelism = MaxDegreeOfParallelism,
SingleProducerConstrained = true, // We don't require thread safety guarantees.
BoundedCapacity = MaxDegreeOfParallelism,
};
ActionBlock<Message> consumer = new ActionBlock<Message>(async message =>
{
ct.ThrowIfCancellationRequested();
await processMessage(message).ConfigureAwait(false);
},
executionOptions);
buffer.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });
if (await Task.WhenAny(producer, consumer.Completion).ConfigureAwait(false) == consumer.Completion)
{
// If we got here, consumer probably errored. Stop the producer
// before we throw so we don't go dequeuing more messages.
producerCTS.Cancel();
}
// Task.WhenAll checks faulted tasks before checking any
// canceled tasks, so if our consumer threw a legitimate
// execption, that's what will be rethrown, not the OCE.
await Task.WhenAll(producer, consumer.Completion).ConfigureAwait(false);
}
}
/// <summary>
/// APM -> TAP conversion for MessageQueue.Begin/EndPeek.
/// </summary>
private static Task<Message> PeekAsync(MessageQueue messageQueue)
{
return Task.Factory.FromAsync(messageQueue.BeginPeek(), messageQueue.EndPeek);
}
/// <summary>
/// APM -> TAP conversion for MessageQueue.Begin/EndReceive.
/// </summary>
private static Task<Message> ReceiveAsync(MessageQueue messageQueue, TimeSpan timeout)
{
return Task.Factory.FromAsync(messageQueue.BeginReceive(timeout), messageQueue.EndPeek);
}
/// <summary>
/// Allows abandoning tasks which do not natively
/// support cancellation. Use with caution.
/// </summary>
private static async Task<T> WithCancellation<T>(Task<T> task, CancellationToken ct)
{
ct.ThrowIfCancellationRequested();
TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
using (ct.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true), tcs, false))
{
if (task != await Task.WhenAny(task, tcs.Task).ConfigureAwait(false))
{
// Cancellation task completed first.
// We are abandoning the original task.
throw new OperationCanceledException(ct);
}
}
// Task completed: synchronously return result or propagate exceptions.
return await task.ConfigureAwait(false);
}
}
}
用法:
using (MessageQueue msMq = GetQueue())
{
MessagePump pump = MessagePump.Run(
msMq,
async message =>
{
await Task.Delay(50);
Console.WriteLine($"Finished processing message {message.Id}");
},
maxDegreeOfParallelism: 4
);
for (int i = 0; i < 100; i++)
{
msMq.Send(new Message());
Thread.Sleep(25);
}
pump.Stop();
await pump.Completion;
}
不整洁但功能正常的单元测试:
https://gist.github.com/KirillShlenskiy/7f3e2c4b28b9f940c3da
原始答案
如我的评论所述,.NET 中已建立 producer/consumer 模式,其中之一是管道。 Microsoft 自己的 Stephen Toub 在 "Patterns of Parallel Programming" 中可以找到一个很好的例子(全文在这里:https://www.microsoft.com/en-au/download/details.aspx?id=19222,第 55 页)。
这个想法很简单:生产者不断地将东西放入队列中,消费者将其拉出并处理(与生产者并行,也可能彼此并行)。
这是一个消息管道示例,其中消费者使用同步、阻塞方法在项目到达时对其进行处理(我已将消费者并行化以适合您的场景):
void MessageQueueWithBlockingCollection()
{
// If your processing is continuous and never stops throughout the lifetime of
// your application, you can ignore the fact that BlockingCollection is IDisposable.
using (BlockingCollection<Message> messages = new BlockingCollection<Message>())
{
Task producer = Task.Run(() =>
{
try
{
for (int i = 0; i < 10; i++)
{
// Hand over the message to the consumer.
messages.Add(new Message());
// Simulated arrival delay for the next message.
Thread.Sleep(10);
}
}
finally
{
// Notify consumer that there is no more data.
messages.CompleteAdding();
}
});
Task consumer = Task.Run(() =>
{
ParallelOptions options = new ParallelOptions {
MaxDegreeOfParallelism = 4
};
Parallel.ForEach(messages.GetConsumingEnumerable(), options, message => {
ProcessMessage(message);
});
});
Task.WaitAll(producer, consumer);
}
}
void ProcessMessage(Message message)
{
Thread.Sleep(40);
}
上面的代码在大约 130-140 毫秒内完成,这正是您对消费者并行化的预期。
现在,在您的场景中,您正在使用更适合 TPL 数据流的 Task
s 和 async
/await
(Microsoft 官方支持的库,专为并行和异步序列处理量身定制)。
这里有一个小演示,展示了您将用于该作业的不同类型的 TPL 数据流处理块:
async Task MessageQueueWithTPLDataflow()
{
// Set up our queue.
BufferBlock<Message> queue = new BufferBlock<Message>();
// Set up our processing stage (consumer).
ExecutionDataflowBlockOptions options = new ExecutionDataflowBlockOptions {
CancellationToken = CancellationToken.None, // Plug in your own in case you need to support cancellation.
MaxDegreeOfParallelism = 4
};
ActionBlock<Message> consumer = new ActionBlock<Message>(m => ProcessMessageAsync(m), options);
// Link the queue to the consumer.
queue.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });
// Wire up our producer.
Task producer = Task.Run(async () =>
{
try
{
for (int i = 0; i < 10; i++)
{
queue.Post(new Message());
await Task.Delay(10).ConfigureAwait(false);
}
}
finally
{
// Signal to the consumer that there are no more items.
queue.Complete();
}
});
await consumer.Completion.ConfigureAwait(false);
}
Task ProcessMessageAsync(Message message)
{
return Task.Delay(40);
}
调整以上内容以使用您的 MessageQueue
并不难,您可以确定最终结果不会出现线程问题。如果我有更多时间,我会这样做 today/tomorrow。
情况
我们有一个消息队列。我们希望并行处理消息并限制同时处理的消息数。
我们下面的试用代码确实会并行处理消息,但它只会在前一个处理完成后才开始新的一批处理。我们想在任务完成后重新启动它们。
换句话说:只要消息队列不为空,最大数量的任务就应该始终处于活动状态。
试用码
static string queue = @".\Private$\concurrenttest";
private static void Process(CancellationToken token)
{
Task.Factory.StartNew(async () =>
{
while (true)
{
IEnumerable<Task> consumerTasks = ConsumerTasks();
await Task.WhenAll(consumerTasks);
await PeekAsync(new MessageQueue(queue));
}
});
}
private static IEnumerable<Task> ConsumerTasks()
{
for (int i = 0; i < 15; i++)
{
Command1 message;
try
{
MessageQueue msMq = new MessageQueue(queue);
msMq.Formatter = new XmlMessageFormatter(new Type[] { typeof(Command1) });
Message msg = msMq.Receive();
message = (Command1)msg.Body;
}
catch (MessageQueueException mqex)
{
if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
yield break; // nothing in queue
else throw;
}
yield return Task.Run(() =>
{
Console.WriteLine("id: " + message.id + ", name: " + message.name);
Thread.Sleep(1000);
});
}
}
private static Task<Message> PeekAsync(MessageQueue msMq)
{
return Task.Factory.FromAsync<Message>(msMq.BeginPeek(), msMq.EndPeek);
}
您有一组要处理的东西。 您为正在处理的事物创建另一个集合(这可能是您的任务对象或引用任务的某种项目)。
你创建了一个循环,只要你有工作要做就会重复。也就是说,工作项正在等待启动,或者您仍有正在处理的工作项。
在循环开始时,您可以使用任意数量的任务填充您的活动任务集合并发 运行 并在添加它们时启动它们。
你让事情 运行 暂时(比如 Thread.Sleep(10);)。
您创建了一个内部循环来检查所有已启动的任务是否已完成。如果一个已完成,您将其删除并报告结果或做任何适当的事情。
就是这样。在下一轮,外循环的上半部分将向 运行ning 任务集合添加任务,直到数量等于您设置的最大值,使您的正在进行的工作集合保持完整。
您可能希望在工作线程上执行所有这些操作并在循环中监视取消请求。
.NET 中的任务库用于并行执行多个任务。虽然有一些方法可以限制活动任务的数量,但图书馆本身会根据计算机限制活动任务的数量CPU。
第一个需要回答的问题是为什么需要再创建一个限制?如果任务库的限制没问题,那你就可以一直创建任务,依靠任务库去执行,性能不错。
如果这没问题,那么一旦您从 MSMQ 收到一条消息,就启动一个任务来处理该消息,跳过等待(WhenAll 调用),重新开始等待下一条消息。
您可以使用自定义任务计划程序来限制并发任务的数量。有关 MSDN 的更多信息:https://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler%28v=vs.110%29.aspx.
我的同事提出了以下解决方案。此解决方案有效,但我会在 Code Review.
上审核此代码根据给出的答案和我们自己的一些研究,我们找到了解决方案。我们正在使用 SemaphoreSlim
来限制并行任务的数量。
static string queue = @".\Private$\concurrenttest";
private static async Task Process(CancellationToken token)
{
MessageQueue msMq = new MessageQueue(queue);
msMq.Formatter = new XmlMessageFormatter(new Type[] { typeof(Command1) });
SemaphoreSlim s = new SemaphoreSlim(15, 15);
while (true)
{
await s.WaitAsync();
await PeekAsync(msMq);
Command1 message = await ReceiveAsync(msMq);
Task.Run(async () =>
{
try
{
await HandleMessage(message);
}
catch (Exception)
{
// Exception handling
}
s.Release();
});
}
}
private static Task HandleMessage(Command1 message)
{
Console.WriteLine("id: " + message.id + ", name: " + message.name);
Thread.Sleep(1000);
return Task.FromResult(1);
}
private static Task<Message> PeekAsync(MessageQueue msMq)
{
return Task.Factory.FromAsync<Message>(msMq.BeginPeek(), msMq.EndPeek);
}
public class Command1
{
public int id { get; set; }
public string name { get; set; }
}
private static async Task<Command1> ReceiveAsync(MessageQueue msMq)
{
var receiveAsync = await Task.Factory.FromAsync<Message>(msMq.BeginReceive(), msMq.EndPeek);
return (Command1)receiveAsync.Body;
}
您应该考虑为此使用 Microsoft 的 Reactive Framework。
您的代码可能如下所示:
var query =
from command1 in FromQueue<Command1>(queue)
from text in Observable.Start(() =>
{
Thread.Sleep(1000);
return "id: " + command1.id + ", name: " + command1.name;
})
select text;
var subscription =
query
.Subscribe(text => Console.WriteLine(text));
这会并行执行所有处理,并确保处理正确分布在所有内核中。当一个值结束时,另一个值开始。
要取消订阅,只需调用 subscription.Dispose()
。
FromQueue
的代码是:
static IObservable<T> FromQueue<T>(string serverQueue)
{
return Observable.Create<T>(observer =>
{
var responseQueue = Environment.MachineName + "\Private$\" + Guid.NewGuid().ToString();
var queue = MessageQueue.Create(responseQueue);
var frm = new System.Messaging.BinaryMessageFormatter();
var srv = new MessageQueue(serverQueue);
srv.Formatter = frm;
queue.Formatter = frm;
srv.Send("S " + responseQueue);
var loop = NewThreadScheduler.Default.ScheduleLongRunning(cancel =>
{
while (!cancel.IsDisposed)
{
var msg = queue.Receive();
observer.OnNext((T)msg.Body);
}
});
return new CompositeDisposable(
loop,
Disposable.Create(() =>
{
srv.Send("D " + responseQueue);
MessageQueue.Delete(responseQueue);
})
);
});
}
只需 NuGet "Rx-Main" 即可获取位。
为了限制并发你可以这样做:
int maxConcurrent = 2;
var query =
FromQueue<Command1>(queue)
.Select(command1 => Observable.Start(() =>
{
Thread.Sleep(1000);
return "id: " + command1.id + ", name: " + command1.name;
}))
.Merge(maxConcurrent);
编辑
我花了很多时间思考泵的可靠性 - 特别是如果从 MessageQueue
收到消息,取消变得棘手 - 所以我提供了两种终止队列的方法:
- 向
CancellationToken
发出信号以尽快停止管道,这可能会导致消息丢失。 - 调用
MessagePump.Stop()
终止泵,但允许在MessagePump.Completion
任务转换为RanToCompletion
之前完全处理所有已从队列中取出的消息。
该解决方案使用 TPL 数据流(NuGet:Microsoft.Tpl.Dataflow)。
完整实施:
using System;
using System.Messaging;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace Whosebug.Q34437298
{
/// <summary>
/// Pumps the message queue and processes messages in parallel.
/// </summary>
public sealed class MessagePump
{
/// <summary>
/// Creates a <see cref="MessagePump"/> and immediately starts pumping.
/// </summary>
public static MessagePump Run(
MessageQueue messageQueue,
Func<Message, Task> processMessage,
int maxDegreeOfParallelism,
CancellationToken ct = default(CancellationToken))
{
if (messageQueue == null) throw new ArgumentNullException(nameof(messageQueue));
if (processMessage == null) throw new ArgumentNullException(nameof(processMessage));
if (maxDegreeOfParallelism <= 0) throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
ct.ThrowIfCancellationRequested();
return new MessagePump(messageQueue, processMessage, maxDegreeOfParallelism, ct);
}
private readonly TaskCompletionSource<bool> _stop = new TaskCompletionSource<bool>();
/// <summary>
/// <see cref="Task"/> which completes when this instance
/// stops due to a <see cref="Stop"/> or cancellation request.
/// </summary>
public Task Completion { get; }
/// <summary>
/// Maximum number of parallel message processors.
/// </summary>
public int MaxDegreeOfParallelism { get; }
/// <summary>
/// <see cref="MessageQueue"/> that is pumped by this instance.
/// </summary>
public MessageQueue MessageQueue { get; }
/// <summary>
/// Creates a new <see cref="MessagePump"/> instance.
/// </summary>
private MessagePump(MessageQueue messageQueue, Func<Message, Task> processMessage, int maxDegreeOfParallelism, CancellationToken ct)
{
MessageQueue = messageQueue;
MaxDegreeOfParallelism = maxDegreeOfParallelism;
// Kick off the loop.
Completion = RunAsync(processMessage, ct);
}
/// <summary>
/// Soft-terminates the pump so that no more messages will be pumped.
/// Any messages already removed from the message queue will be
/// processed before this instance fully completes.
/// </summary>
public void Stop()
{
// Multiple calls to Stop are fine.
_stop.TrySetResult(true);
}
/// <summary>
/// Pump implementation.
/// </summary>
private async Task RunAsync(Func<Message, Task> processMessage, CancellationToken ct = default(CancellationToken))
{
using (CancellationTokenSource producerCTS = ct.CanBeCanceled
? CancellationTokenSource.CreateLinkedTokenSource(ct)
: new CancellationTokenSource())
{
// This CancellationToken will either be signaled
// externally, or if our consumer errors.
ct = producerCTS.Token;
// Handover between producer and consumer.
DataflowBlockOptions bufferOptions = new DataflowBlockOptions {
// There is no point in dequeuing more messages than we can process,
// so we'll throttle the producer by limiting the buffer capacity.
BoundedCapacity = MaxDegreeOfParallelism,
CancellationToken = ct
};
BufferBlock<Message> buffer = new BufferBlock<Message>(bufferOptions);
Task producer = Task.Run(async () =>
{
try
{
while (_stop.Task.Status != TaskStatus.RanToCompletion)
{
// This line and next line are the *only* two cancellation
// points which will not cause dropped messages.
ct.ThrowIfCancellationRequested();
Task<Message> peekTask = WithCancellation(PeekAsync(MessageQueue), ct);
if (await Task.WhenAny(peekTask, _stop.Task).ConfigureAwait(false) == _stop.Task)
{
// Stop was signaled before PeekAsync returned. Wind down the producer gracefully
// by breaking out and propagating completion to the consumer blocks.
break;
}
await peekTask.ConfigureAwait(false); // Observe Peek exceptions.
ct.ThrowIfCancellationRequested();
// Zero timeout means that we will error if someone else snatches the
// peeked message from the queue before we get to it (due to a race).
// I deemed this better than getting stuck waiting for a message which
// may never arrive, or, worse yet, let this ReceiveAsync run onobserved
// due to a cancellation (if we choose to abandon it like we do PeekAsync).
// You will have to restart the pump if this throws.
// Omit timeout if this behaviour is undesired.
Message message = await ReceiveAsync(MessageQueue, timeout: TimeSpan.Zero).ConfigureAwait(false);
await buffer.SendAsync(message, ct).ConfigureAwait(false);
}
}
finally
{
buffer.Complete();
}
},
ct);
// Wire up the parallel consumers.
ExecutionDataflowBlockOptions executionOptions = new ExecutionDataflowBlockOptions {
CancellationToken = ct,
MaxDegreeOfParallelism = MaxDegreeOfParallelism,
SingleProducerConstrained = true, // We don't require thread safety guarantees.
BoundedCapacity = MaxDegreeOfParallelism,
};
ActionBlock<Message> consumer = new ActionBlock<Message>(async message =>
{
ct.ThrowIfCancellationRequested();
await processMessage(message).ConfigureAwait(false);
},
executionOptions);
buffer.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });
if (await Task.WhenAny(producer, consumer.Completion).ConfigureAwait(false) == consumer.Completion)
{
// If we got here, consumer probably errored. Stop the producer
// before we throw so we don't go dequeuing more messages.
producerCTS.Cancel();
}
// Task.WhenAll checks faulted tasks before checking any
// canceled tasks, so if our consumer threw a legitimate
// execption, that's what will be rethrown, not the OCE.
await Task.WhenAll(producer, consumer.Completion).ConfigureAwait(false);
}
}
/// <summary>
/// APM -> TAP conversion for MessageQueue.Begin/EndPeek.
/// </summary>
private static Task<Message> PeekAsync(MessageQueue messageQueue)
{
return Task.Factory.FromAsync(messageQueue.BeginPeek(), messageQueue.EndPeek);
}
/// <summary>
/// APM -> TAP conversion for MessageQueue.Begin/EndReceive.
/// </summary>
private static Task<Message> ReceiveAsync(MessageQueue messageQueue, TimeSpan timeout)
{
return Task.Factory.FromAsync(messageQueue.BeginReceive(timeout), messageQueue.EndPeek);
}
/// <summary>
/// Allows abandoning tasks which do not natively
/// support cancellation. Use with caution.
/// </summary>
private static async Task<T> WithCancellation<T>(Task<T> task, CancellationToken ct)
{
ct.ThrowIfCancellationRequested();
TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
using (ct.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true), tcs, false))
{
if (task != await Task.WhenAny(task, tcs.Task).ConfigureAwait(false))
{
// Cancellation task completed first.
// We are abandoning the original task.
throw new OperationCanceledException(ct);
}
}
// Task completed: synchronously return result or propagate exceptions.
return await task.ConfigureAwait(false);
}
}
}
用法:
using (MessageQueue msMq = GetQueue())
{
MessagePump pump = MessagePump.Run(
msMq,
async message =>
{
await Task.Delay(50);
Console.WriteLine($"Finished processing message {message.Id}");
},
maxDegreeOfParallelism: 4
);
for (int i = 0; i < 100; i++)
{
msMq.Send(new Message());
Thread.Sleep(25);
}
pump.Stop();
await pump.Completion;
}
不整洁但功能正常的单元测试:
https://gist.github.com/KirillShlenskiy/7f3e2c4b28b9f940c3da
原始答案
如我的评论所述,.NET 中已建立 producer/consumer 模式,其中之一是管道。 Microsoft 自己的 Stephen Toub 在 "Patterns of Parallel Programming" 中可以找到一个很好的例子(全文在这里:https://www.microsoft.com/en-au/download/details.aspx?id=19222,第 55 页)。
这个想法很简单:生产者不断地将东西放入队列中,消费者将其拉出并处理(与生产者并行,也可能彼此并行)。
这是一个消息管道示例,其中消费者使用同步、阻塞方法在项目到达时对其进行处理(我已将消费者并行化以适合您的场景):
void MessageQueueWithBlockingCollection()
{
// If your processing is continuous and never stops throughout the lifetime of
// your application, you can ignore the fact that BlockingCollection is IDisposable.
using (BlockingCollection<Message> messages = new BlockingCollection<Message>())
{
Task producer = Task.Run(() =>
{
try
{
for (int i = 0; i < 10; i++)
{
// Hand over the message to the consumer.
messages.Add(new Message());
// Simulated arrival delay for the next message.
Thread.Sleep(10);
}
}
finally
{
// Notify consumer that there is no more data.
messages.CompleteAdding();
}
});
Task consumer = Task.Run(() =>
{
ParallelOptions options = new ParallelOptions {
MaxDegreeOfParallelism = 4
};
Parallel.ForEach(messages.GetConsumingEnumerable(), options, message => {
ProcessMessage(message);
});
});
Task.WaitAll(producer, consumer);
}
}
void ProcessMessage(Message message)
{
Thread.Sleep(40);
}
上面的代码在大约 130-140 毫秒内完成,这正是您对消费者并行化的预期。
现在,在您的场景中,您正在使用更适合 TPL 数据流的 Task
s 和 async
/await
(Microsoft 官方支持的库,专为并行和异步序列处理量身定制)。
这里有一个小演示,展示了您将用于该作业的不同类型的 TPL 数据流处理块:
async Task MessageQueueWithTPLDataflow()
{
// Set up our queue.
BufferBlock<Message> queue = new BufferBlock<Message>();
// Set up our processing stage (consumer).
ExecutionDataflowBlockOptions options = new ExecutionDataflowBlockOptions {
CancellationToken = CancellationToken.None, // Plug in your own in case you need to support cancellation.
MaxDegreeOfParallelism = 4
};
ActionBlock<Message> consumer = new ActionBlock<Message>(m => ProcessMessageAsync(m), options);
// Link the queue to the consumer.
queue.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });
// Wire up our producer.
Task producer = Task.Run(async () =>
{
try
{
for (int i = 0; i < 10; i++)
{
queue.Post(new Message());
await Task.Delay(10).ConfigureAwait(false);
}
}
finally
{
// Signal to the consumer that there are no more items.
queue.Complete();
}
});
await consumer.Completion.ConfigureAwait(false);
}
Task ProcessMessageAsync(Message message)
{
return Task.Delay(40);
}
调整以上内容以使用您的 MessageQueue
并不难,您可以确定最终结果不会出现线程问题。如果我有更多时间,我会这样做 today/tomorrow。