每个数据库具有专用处理器的 MSMQ 体系结构

MSMQ ARCHITECTURE WITH DEDICATED PROCESSORS PER DATABASE

我在 ASP.NET MVC 和 C# 中有一个 Web 应用程序,我有一个特定的用例需要很长时间才能处理,用户必须等到该过程完成。我想使用 MSMQ 并将繁重的工作转交给专用的 MSMQ consumer/servicer。我们的应用程序有多个客户端,每个客户端都有自己的 SQL 数据库。因此,假设 100 个客户端创建 100 个独立的 SQL 数据库。我面临的真正挑战是使用 MSMQ 使流程更快,但 1 个客户端的任务不应影响其他客户端的性能。所以我有 2 个解决方案:

Option-1: 每个数据库唯一的 MSMQ 专用队列所以在我的例子中它将是 100 个队列并且还在增长。 1 个侦听专用 MSMQ 的专用 ASP.NET 控制台应用程序,因此在我的例子中它将是 100 个处理器或控制台应用程序。

Option-2: 1 个用于所有数据库的大型 MSMQ 专用队列 A:每个数据库 1 个专用 MSMQ 使用者,因此 100 个处理器 B: 1个听大MSMQ的MSMQ消费者

我想坚持使用选项 1,但我想知道这是一个可行的企业型解决方案吗?

你其实有两个问题


  1. 首先,你如何allocate a resources affinity to a processorSQL Server

    Select Sql Management Studio 中的数据库,右键单击并按照此..


  1. 定期清理数据库

    DBCC FREEPROCCACHE;

    DBCC DROPCLEANBUFFERS;


  1. MSMQ,开启[journaling][2],还要考虑另外一个队列进程RabbitMQ等,或者写一个简单的到enquque的jobssample from here

    public class MultiThreadQueue
    {
    BlockingCollection<string> _jobs = new BlockingCollection<string>();
    
    public MultiThreadQueue(int numThreads)
    {
        for (int i = 0; i < numThreads; i++)
        {
            var thread = new Thread(OnHandlerStart)
                { IsBackground = true };//Mark 'false' if you want to prevent program exit until jobs finish
            thread.Start();
        }
    }
    
    public void Enqueue(string job)
    {
        if (!_jobs.IsAddingCompleted)
        {
            _jobs.Add(job);
        }
    }
    
    public void Stop()
    {
        //This will cause '_jobs.GetConsumingEnumerable' to stop blocking and exit when it's empty
        _jobs.CompleteAdding();
    }
    
    private void OnHandlerStart()
    {
        foreach (var job in _jobs.GetConsumingEnumerable(CancellationToken.None))
        {
            Console.WriteLine(job);
            Thread.Sleep(10);
        }
     }
    }
    

希望这对您有所帮助:)


问题已改写,他说处理器时还有其他意思。


更新添加了一个带有 onPeek 的消费者模式:

你真的需要post一些代码!

考虑使用 OnPeekCompleted 方法。如有错误可在queue

留言

如果您有某种 header 标识消息,您可以切换到不同的 dedicated/thread。

    private static void OnPeekCompleted(Object sourceQueue, PeekCompletedEventArgs asyncResult)
    {
        // Set up and connect to the queue.
        MessageQueue mq = (MessageQueue)sourceQueue;

        // gets a new transaction going
        using (var txn = new MessageQueueTransaction())
        {
            try
            {
                // retrieve message and process
                txn.Begin();
                // End the asynchronous peek operation.
                var message = mq.Receive(txn);

            #if DEBUG
                // Display message information on the screen.
                if (message != null)
                {
                    Console.WriteLine("{0}: {1}", message.Label, (string)message.Body);
                }
            #endif
                // message will be removed on txn.Commit.
                txn.Commit();
            }
            catch (Exception ex)
            {
                // If there is an error you can leave the message on the queue, don't remove message from queue
                Console.WriteLine(ex.ToString());
                txn.Abort();
            }
        }

        // Restart the asynchronous peek operation.
        mq.BeginPeek();
    }

您也可以使用 service broker