每个数据库具有专用处理器的 MSMQ 体系结构
MSMQ ARCHITECTURE WITH DEDICATED PROCESSORS PER DATABASE
我在 ASP.NET MVC 和 C# 中有一个 Web 应用程序,我有一个特定的用例需要很长时间才能处理,用户必须等到该过程完成。我想使用 MSMQ 并将繁重的工作转交给专用的 MSMQ consumer/servicer。我们的应用程序有多个客户端,每个客户端都有自己的 SQL 数据库。因此,假设 100 个客户端创建 100 个独立的 SQL 数据库。我面临的真正挑战是使用 MSMQ 使流程更快,但 1 个客户端的任务不应影响其他客户端的性能。所以我有 2 个解决方案:
Option-1: 每个数据库唯一的 MSMQ 专用队列所以在我的例子中它将是 100 个队列并且还在增长。 1 个侦听专用 MSMQ 的专用 ASP.NET 控制台应用程序,因此在我的例子中它将是 100 个处理器或控制台应用程序。
Option-2: 1 个用于所有数据库的大型 MSMQ 专用队列
A:每个数据库 1 个专用 MSMQ 使用者,因此 100 个处理器
B: 1个听大MSMQ的MSMQ消费者
我想坚持使用选项 1,但我想知道这是一个可行的企业型解决方案吗?
你其实有两个问题
首先,你如何allocate a resources affinity to a processor
到SQL Server
。
Select Sql Management Studio 中的数据库,右键单击并按照此..
定期清理数据库
DBCC FREEPROCCACHE;
DBCC DROPCLEANBUFFERS;
MSMQ,开启[journaling][2]
,还要考虑另外一个队列进程RabbitMQ等,或者写一个简单的到enquque的jobssample from here
public class MultiThreadQueue
{
BlockingCollection<string> _jobs = new BlockingCollection<string>();
public MultiThreadQueue(int numThreads)
{
for (int i = 0; i < numThreads; i++)
{
var thread = new Thread(OnHandlerStart)
{ IsBackground = true };//Mark 'false' if you want to prevent program exit until jobs finish
thread.Start();
}
}
public void Enqueue(string job)
{
if (!_jobs.IsAddingCompleted)
{
_jobs.Add(job);
}
}
public void Stop()
{
//This will cause '_jobs.GetConsumingEnumerable' to stop blocking and exit when it's empty
_jobs.CompleteAdding();
}
private void OnHandlerStart()
{
foreach (var job in _jobs.GetConsumingEnumerable(CancellationToken.None))
{
Console.WriteLine(job);
Thread.Sleep(10);
}
}
}
希望这对您有所帮助:)
问题已改写,他说处理器时还有其他意思。
更新添加了一个带有 onPeek 的消费者模式:
你真的需要post一些代码!
考虑使用 OnPeekCompleted
方法。如有错误可在queue
留言
如果您有某种 header 标识消息,您可以切换到不同的 dedicated/thread。
private static void OnPeekCompleted(Object sourceQueue, PeekCompletedEventArgs asyncResult)
{
// Set up and connect to the queue.
MessageQueue mq = (MessageQueue)sourceQueue;
// gets a new transaction going
using (var txn = new MessageQueueTransaction())
{
try
{
// retrieve message and process
txn.Begin();
// End the asynchronous peek operation.
var message = mq.Receive(txn);
#if DEBUG
// Display message information on the screen.
if (message != null)
{
Console.WriteLine("{0}: {1}", message.Label, (string)message.Body);
}
#endif
// message will be removed on txn.Commit.
txn.Commit();
}
catch (Exception ex)
{
// If there is an error you can leave the message on the queue, don't remove message from queue
Console.WriteLine(ex.ToString());
txn.Abort();
}
}
// Restart the asynchronous peek operation.
mq.BeginPeek();
}
您也可以使用 service broker
我在 ASP.NET MVC 和 C# 中有一个 Web 应用程序,我有一个特定的用例需要很长时间才能处理,用户必须等到该过程完成。我想使用 MSMQ 并将繁重的工作转交给专用的 MSMQ consumer/servicer。我们的应用程序有多个客户端,每个客户端都有自己的 SQL 数据库。因此,假设 100 个客户端创建 100 个独立的 SQL 数据库。我面临的真正挑战是使用 MSMQ 使流程更快,但 1 个客户端的任务不应影响其他客户端的性能。所以我有 2 个解决方案:
Option-1: 每个数据库唯一的 MSMQ 专用队列所以在我的例子中它将是 100 个队列并且还在增长。 1 个侦听专用 MSMQ 的专用 ASP.NET 控制台应用程序,因此在我的例子中它将是 100 个处理器或控制台应用程序。
Option-2: 1 个用于所有数据库的大型 MSMQ 专用队列 A:每个数据库 1 个专用 MSMQ 使用者,因此 100 个处理器 B: 1个听大MSMQ的MSMQ消费者
我想坚持使用选项 1,但我想知道这是一个可行的企业型解决方案吗?
你其实有两个问题
首先,你如何
allocate a resources affinity to a processor
到SQL Server
。Select Sql Management Studio 中的数据库,右键单击并按照此..
定期清理数据库
DBCC FREEPROCCACHE;
DBCC DROPCLEANBUFFERS;
MSMQ,开启
[journaling][2]
,还要考虑另外一个队列进程RabbitMQ等,或者写一个简单的到enquque的jobssample from herepublic class MultiThreadQueue { BlockingCollection<string> _jobs = new BlockingCollection<string>(); public MultiThreadQueue(int numThreads) { for (int i = 0; i < numThreads; i++) { var thread = new Thread(OnHandlerStart) { IsBackground = true };//Mark 'false' if you want to prevent program exit until jobs finish thread.Start(); } } public void Enqueue(string job) { if (!_jobs.IsAddingCompleted) { _jobs.Add(job); } } public void Stop() { //This will cause '_jobs.GetConsumingEnumerable' to stop blocking and exit when it's empty _jobs.CompleteAdding(); } private void OnHandlerStart() { foreach (var job in _jobs.GetConsumingEnumerable(CancellationToken.None)) { Console.WriteLine(job); Thread.Sleep(10); } } }
希望这对您有所帮助:)
问题已改写,他说处理器时还有其他意思。
更新添加了一个带有 onPeek 的消费者模式:
你真的需要post一些代码!
考虑使用 OnPeekCompleted
方法。如有错误可在queue
如果您有某种 header 标识消息,您可以切换到不同的 dedicated/thread。
private static void OnPeekCompleted(Object sourceQueue, PeekCompletedEventArgs asyncResult)
{
// Set up and connect to the queue.
MessageQueue mq = (MessageQueue)sourceQueue;
// gets a new transaction going
using (var txn = new MessageQueueTransaction())
{
try
{
// retrieve message and process
txn.Begin();
// End the asynchronous peek operation.
var message = mq.Receive(txn);
#if DEBUG
// Display message information on the screen.
if (message != null)
{
Console.WriteLine("{0}: {1}", message.Label, (string)message.Body);
}
#endif
// message will be removed on txn.Commit.
txn.Commit();
}
catch (Exception ex)
{
// If there is an error you can leave the message on the queue, don't remove message from queue
Console.WriteLine(ex.ToString());
txn.Abort();
}
}
// Restart the asynchronous peek operation.
mq.BeginPeek();
}
您也可以使用 service broker