为什么客户端收不到NetMQ框架服务器的消息?
Why the client cann't receive message from server with NetMQ framework?
最近,我使用NetMQ在服务器和客户端之间发送或接收消息。
服务器代码如:
void Main()
{
CreatePullAndPushSocket();
Task.Factory.StartNew(()=> {
while (true)
{
Thread.Sleep(1);
if (Pull != null)
{
var message = Pull.ReceiveFrameString();
}
}
});
}
PullSocket Pull;
PushSocket Push;
private void CreatePullAndPushSocket()
{
Pull = new PullSocket("tcp://ip1:port1");
Push = new PushSocket("tcp://ip2:port2");
}
public void SendMessageToClient(string message)
{
if (Push != null)
{
Push.SendFrame(message);
}
}
客户端代码如下:
void Main()
{
new Thread(()=> {
while (true)
{
Thread.Sleep(1);
if (Pull != null)
{
var message = Pull.ReceiveFrameString();
}
}
}).Start();
}
PullSocket Pull;
PushSocket Push;
private void CreatePullAndPushSocket()
{
Pull = new PullSocket("tcp://ip2:port2");
Push = new PushSocket("tcp://ip1:port1");
}
public void SendMessageToClient(string message)
{
if (Push != null)
{
Push.SendFrame(message);
}
}
当我运行两个应用,一个是服务端app,一个是客户端app。
- 1:客户端向服务器发送消息
- 2:服务端可以收到客户端的消息
- 3:服务器向客户端发送另一条消息
- 4:客户端收不到消息!!!
好奇怪,我按照指导做了https://netmq.readthedocs.io/en/latest/push-pull/!
在 NetMQ 中非常重要的是线程模型。真正重要的是,你不应该在多线程中使用套接字。因此,如果 Thread#1 创建了套接字,那么它应该使用它。如果您想通过使用相同的套接字从其他线程(假设线程#2)发送消息,请忘记这一点。你应该以某种方式将消息从线程#2 发送到线程#1,然后它应该通过套接字将它发送到客户端。
所以基本上 CreatePullAndPushSocket 是错误的,并且可能会发生奇怪的事情。您正在一个线程中创建套接字并在其他线程中使用。完全错了。
另一件事是你的 Thread.Sleep。你不应该使用 Thread.Sleep,因为你的线程正在休眠 1s 然后检查套接字一次,而不是休眠并检查一次。 NetMQ 有函数 TryReceive,带超时。因此它可以检查套接字 1 秒然后退出以检查您是否调用 cancel/stop 或其他任何方式。或者更好的是有一个轮询器,它会一直监听套接字,并允许我们从其他线程调用停止。
让我们看一下这段代码:
private Poller _poller;
public void Start()
{
Task.Factory.StartNew(()=> {
using(var pullSocket = new PullSocket("tcp://ip1:port1"))
using(_poller = new Poller())
{
pullSocket.ReceiveReady += (object sender, NetMQSocketEventArgsnetMqSocketEventArgs) =>
{
var message = netMqSocketEventArgs.Socket.ReceiveMultipartMessage();
}
_poller.Add(pullSocket);
_poller.Run();
}
});
}
public void Stop()
{
_poller?.Stop();
}
或者如果您想在 while 循环中使用不带轮询器的代码:
私有只读 CancellationTokenSource _cts;
public void Start()
{
_cts = new CancellationTokenSource();
var token = _cts.Token;
Task.Factory.StartNew(()=> {
using(var pullSocket = new PullSocket("tcp://ip1:port1"))
{
while (cancellationToken.IsCancellationRequested == false)
{
NetMQMessage message = new NetMQMessage();
bool success = workerSocket.TryReceiveMultipartMessage(TimeSpan.FromSeconds(5), ref message);
if (success == false)
{
continue;
}
//if You reach this line, than You have a message
}
}
},
token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}
public void Stop()
{
_cts.Cancel();
_cts.Token.WaitHandle.WaitOne();//if You want wait until service will stop
}
所以回到你的问题,你应该只在你创建它的线程内部使用套接字。好在using语句中总是使用socket,最后总是release。
我看不到 SendMessageToClient 方法的用法,但我假设您是从某个按钮或其他东西调用它的。如果从该线程调用套接字的构造函数,则可以这样做。如果你能告诉我你在哪里调用这个方法,我可以谈谈这个。
最近,我使用NetMQ在服务器和客户端之间发送或接收消息。 服务器代码如:
void Main()
{
CreatePullAndPushSocket();
Task.Factory.StartNew(()=> {
while (true)
{
Thread.Sleep(1);
if (Pull != null)
{
var message = Pull.ReceiveFrameString();
}
}
});
}
PullSocket Pull;
PushSocket Push;
private void CreatePullAndPushSocket()
{
Pull = new PullSocket("tcp://ip1:port1");
Push = new PushSocket("tcp://ip2:port2");
}
public void SendMessageToClient(string message)
{
if (Push != null)
{
Push.SendFrame(message);
}
}
客户端代码如下:
void Main()
{
new Thread(()=> {
while (true)
{
Thread.Sleep(1);
if (Pull != null)
{
var message = Pull.ReceiveFrameString();
}
}
}).Start();
}
PullSocket Pull;
PushSocket Push;
private void CreatePullAndPushSocket()
{
Pull = new PullSocket("tcp://ip2:port2");
Push = new PushSocket("tcp://ip1:port1");
}
public void SendMessageToClient(string message)
{
if (Push != null)
{
Push.SendFrame(message);
}
}
当我运行两个应用,一个是服务端app,一个是客户端app。
- 1:客户端向服务器发送消息
- 2:服务端可以收到客户端的消息
- 3:服务器向客户端发送另一条消息
- 4:客户端收不到消息!!!
好奇怪,我按照指导做了https://netmq.readthedocs.io/en/latest/push-pull/!
在 NetMQ 中非常重要的是线程模型。真正重要的是,你不应该在多线程中使用套接字。因此,如果 Thread#1 创建了套接字,那么它应该使用它。如果您想通过使用相同的套接字从其他线程(假设线程#2)发送消息,请忘记这一点。你应该以某种方式将消息从线程#2 发送到线程#1,然后它应该通过套接字将它发送到客户端。
所以基本上 CreatePullAndPushSocket 是错误的,并且可能会发生奇怪的事情。您正在一个线程中创建套接字并在其他线程中使用。完全错了。
另一件事是你的 Thread.Sleep。你不应该使用 Thread.Sleep,因为你的线程正在休眠 1s 然后检查套接字一次,而不是休眠并检查一次。 NetMQ 有函数 TryReceive,带超时。因此它可以检查套接字 1 秒然后退出以检查您是否调用 cancel/stop 或其他任何方式。或者更好的是有一个轮询器,它会一直监听套接字,并允许我们从其他线程调用停止。
让我们看一下这段代码:
private Poller _poller;
public void Start()
{
Task.Factory.StartNew(()=> {
using(var pullSocket = new PullSocket("tcp://ip1:port1"))
using(_poller = new Poller())
{
pullSocket.ReceiveReady += (object sender, NetMQSocketEventArgsnetMqSocketEventArgs) =>
{
var message = netMqSocketEventArgs.Socket.ReceiveMultipartMessage();
}
_poller.Add(pullSocket);
_poller.Run();
}
});
}
public void Stop()
{
_poller?.Stop();
}
或者如果您想在 while 循环中使用不带轮询器的代码:
私有只读 CancellationTokenSource _cts;
public void Start()
{
_cts = new CancellationTokenSource();
var token = _cts.Token;
Task.Factory.StartNew(()=> {
using(var pullSocket = new PullSocket("tcp://ip1:port1"))
{
while (cancellationToken.IsCancellationRequested == false)
{
NetMQMessage message = new NetMQMessage();
bool success = workerSocket.TryReceiveMultipartMessage(TimeSpan.FromSeconds(5), ref message);
if (success == false)
{
continue;
}
//if You reach this line, than You have a message
}
}
},
token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}
public void Stop()
{
_cts.Cancel();
_cts.Token.WaitHandle.WaitOne();//if You want wait until service will stop
}
所以回到你的问题,你应该只在你创建它的线程内部使用套接字。好在using语句中总是使用socket,最后总是release。
我看不到 SendMessageToClient 方法的用法,但我假设您是从某个按钮或其他东西调用它的。如果从该线程调用套接字的构造函数,则可以这样做。如果你能告诉我你在哪里调用这个方法,我可以谈谈这个。