netMQ 4.0 多线程
netMQ 4.0 multithreading
我在使用基于 netMQ 4.0
的多线程服务器时遇到了一些问题。我尝试使用 http://zguide.zeromq.org/cs:mtserver
,但 netMQ 4.0
上没有上下文。
我试过:
for (var i = 0; i < workerCount; ++i)
{
new Thread(() => Worker(connStr.Value)).Start();
}
//...
private void Worker(string connStr)
{
using (var socket = new DealerSocket(connStr))
{
while (true)
{
var msg = socket.ReceiveMultipartMessage();
//...
}
}
}
但我收到错误:
NetMQ.TerminatingException: CheckContextTerminated
是的,它已终止。
如何在 netMQ 4.0
中创建上下文或如何使用 netMQ 4.0
创建多线程服务器?
正确答案:
using (var clients = new RouterSocket(connStr.Value))
using (var workers = new DealerSocket())
{
workers.Bind("inproc://workers");
for (var i = 0; i < workerCount; i++)
{
new Thread(Worker).Start();
}
var prx = new Proxy(clients, workers);
prx.Start();
}
private void Worker()
{
using (var socket = new ResponseSocket())
{
socket.Connect("inproc://workers");
while (true)
{
//...
}
}
}
如果您使用的是 .NET
4.0 或更高版本,Thread
创建方法已经过时,不应以这种方式使用 - 如果您的 workerCount
足够高并且您没有提供任何调度程序逻辑,您的性能可能会显着下降而不是受益。
您可以使用 TPL:
代替您的方法
- 您可以轻松地将工作线程替换为
LongRunning
tasks。
- 您可能应该为您的工人引入
CancellationToken
以正确阻止他们。
所以你的代码可能是这样的:
/// field in your class
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
using (var clients = new RouterSocket(connStr.Value))
using (var workers = new DealerSocket())
{
workers.Bind("inproc://workers");
for (var i = 0; i < workerCount; ++i)
{
Task.Factory.StartNew(Worker
, cancellationTokenSource.Token
, TaskCreationOptions.LongRunning
, TaskScheduler.Default);
}
var prx = new Proxy(clients, workers);
prx.Start();
}
private void Worker()
{
using (var socket = new ResponseSocket())
{
socket.Connect("inproc://workers");
while (!cancellationTokenSource.Token.IsCancellationRequested)
{
//...
}
// Cancel the task and exit
cancellationTokenSource.Token.ThrowIfCancellationRequested();
}
}
为了简化它,您可以将 CancellationToken
作为参数传递给 Worker
方法。
我在使用基于 netMQ 4.0
的多线程服务器时遇到了一些问题。我尝试使用 http://zguide.zeromq.org/cs:mtserver
,但 netMQ 4.0
上没有上下文。
我试过:
for (var i = 0; i < workerCount; ++i)
{
new Thread(() => Worker(connStr.Value)).Start();
}
//...
private void Worker(string connStr)
{
using (var socket = new DealerSocket(connStr))
{
while (true)
{
var msg = socket.ReceiveMultipartMessage();
//...
}
}
}
但我收到错误:
NetMQ.TerminatingException: CheckContextTerminated
是的,它已终止。
如何在 netMQ 4.0
中创建上下文或如何使用 netMQ 4.0
创建多线程服务器?
正确答案:
using (var clients = new RouterSocket(connStr.Value))
using (var workers = new DealerSocket())
{
workers.Bind("inproc://workers");
for (var i = 0; i < workerCount; i++)
{
new Thread(Worker).Start();
}
var prx = new Proxy(clients, workers);
prx.Start();
}
private void Worker()
{
using (var socket = new ResponseSocket())
{
socket.Connect("inproc://workers");
while (true)
{
//...
}
}
}
如果您使用的是 .NET
4.0 或更高版本,Thread
创建方法已经过时,不应以这种方式使用 - 如果您的 workerCount
足够高并且您没有提供任何调度程序逻辑,您的性能可能会显着下降而不是受益。
您可以使用 TPL:
代替您的方法- 您可以轻松地将工作线程替换为
LongRunning
tasks。 - 您可能应该为您的工人引入
CancellationToken
以正确阻止他们。
所以你的代码可能是这样的:
/// field in your class
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
using (var clients = new RouterSocket(connStr.Value))
using (var workers = new DealerSocket())
{
workers.Bind("inproc://workers");
for (var i = 0; i < workerCount; ++i)
{
Task.Factory.StartNew(Worker
, cancellationTokenSource.Token
, TaskCreationOptions.LongRunning
, TaskScheduler.Default);
}
var prx = new Proxy(clients, workers);
prx.Start();
}
private void Worker()
{
using (var socket = new ResponseSocket())
{
socket.Connect("inproc://workers");
while (!cancellationTokenSource.Token.IsCancellationRequested)
{
//...
}
// Cancel the task and exit
cancellationTokenSource.Token.ThrowIfCancellationRequested();
}
}
为了简化它,您可以将 CancellationToken
作为参数传递给 Worker
方法。