netMQ 4.0 多线程

netMQ 4.0 multithreading

我在使用基于 netMQ 4.0 的多线程服务器时遇到了一些问题。我尝试使用 http://zguide.zeromq.org/cs:mtserver,但 netMQ 4.0 上没有上下文。

我试过:

for (var i = 0; i < workerCount; ++i)
{
    new Thread(() => Worker(connStr.Value)).Start();
}

//...
private void Worker(string connStr)
{
    using (var socket = new DealerSocket(connStr))
    {
        while (true)
        {
            var msg = socket.ReceiveMultipartMessage();
            //...
        }
    }
}

但我收到错误:

NetMQ.TerminatingException: CheckContextTerminated

是的,它已终止。

如何在 netMQ 4.0 中创建上下文或如何使用 netMQ 4.0 创建多线程服务器?

正确答案:

using (var clients = new RouterSocket(connStr.Value))
using (var workers = new DealerSocket())
    {
        workers.Bind("inproc://workers");
            for (var i = 0; i < workerCount; i++)
            {
                new Thread(Worker).Start();
            }
            var prx = new Proxy(clients, workers);
            prx.Start();
            }

private void Worker()
    {
        using (var socket = new ResponseSocket())
        {
            socket.Connect("inproc://workers");
            while (true)
            {
                 //...
            }
        }
    }

如果您使用的是 .NET 4.0 或更高版本,Thread 创建方法已经过时,不应以这种方式使用 - 如果您的 workerCount 足够高并且您没有提供任何调度程序逻辑,您的性能可能会显着下降而不是受益。

您可以使用 TPL:

代替您的方法
  1. 您可以轻松地将工作线程替换为 LongRunning tasks
  2. 您可能应该为您的工人引入 CancellationToken 以正确阻止他们。

所以你的代码可能是这样的:

/// field in your class
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

using (var clients = new RouterSocket(connStr.Value))
using (var workers = new DealerSocket())
{
    workers.Bind("inproc://workers");
    for (var i = 0; i < workerCount; ++i)
    {
        Task.Factory.StartNew(Worker
            , cancellationTokenSource.Token
            , TaskCreationOptions.LongRunning
            , TaskScheduler.Default);
    }
    var prx = new Proxy(clients, workers);
    prx.Start();
}

private void Worker()
{
    using (var socket = new ResponseSocket())
    {
        socket.Connect("inproc://workers");
        while (!cancellationTokenSource.Token.IsCancellationRequested)
        {
             //...
        }
        // Cancel the task and exit
        cancellationTokenSource.Token.ThrowIfCancellationRequested();
    }
}

为了简化它,您可以将 CancellationToken 作为参数传递给 Worker 方法。