为什么客户端收不到NetMQ框架服务器的消息?

Why the client cann't receive message from server with NetMQ framework?

最近,我使用NetMQ在服务器和客户端之间发送或接收消息。 服务器代码如:

    void Main()
    {
      CreatePullAndPushSocket();
      Task.Factory.StartNew(()=> {
            while (true)
            {
                Thread.Sleep(1);
                if (Pull != null)
                {
                    var message = Pull.ReceiveFrameString();
                }
            }
        });
    }
    PullSocket Pull;
    PushSocket Push;
    private void CreatePullAndPushSocket()
    {
        Pull = new PullSocket("tcp://ip1:port1");
        Push = new PushSocket("tcp://ip2:port2");
    }
    public void SendMessageToClient(string message)
    {
        if (Push != null)
        {
            Push.SendFrame(message);
        }
    }

客户端代码如下:

   void Main()
    { 
      new Thread(()=> {
            while (true)
            {
                Thread.Sleep(1);
                if (Pull != null)
                {
                    var message = Pull.ReceiveFrameString();
                }
            }
        }).Start();
    }
    PullSocket Pull;
    PushSocket Push;
    private void CreatePullAndPushSocket()
    {
        Pull = new PullSocket("tcp://ip2:port2");
        Push = new PushSocket("tcp://ip1:port1");
    }
    public void SendMessageToClient(string message)
    {
        if (Push != null)
        {
            Push.SendFrame(message);
        }
    }

当我运行两个应用,一个是服务端app,一个是客户端app。

好奇怪,我按照指导做了https://netmq.readthedocs.io/en/latest/push-pull/

在 NetMQ 中非常重要的是线程模型。真正重要的是,你不应该在多线程中使用套接字。因此,如果 Thread#1 创建了套接字,那么它应该使用它。如果您想通过使用相同的套接字从其他线程(假设线程#2)发送消息,请忘记这一点。你应该以某种方式将消息从线程#2 发送到线程#1,然后它应该通过套接字将它发送到客户端。

所以基本上 CreatePullAndPushSocket 是错误的,并且可能会发生奇怪的事情。您正在一个线程中创建套接字并在其他线程中使用。完全错了。

另一件事是你的 Thread.Sleep。你不应该使用 Thread.Sleep,因为你的线程正在休眠 1s 然后检查套接字一次,而不是休眠并检查一次。 NetMQ 有函数 TryReceive,带超时。因此它可以检查套接字 1 秒然后退出以检查您是否调用 cancel/stop 或其他任何方式。或者更好的是有一个轮询器,它会一直监听套接字,并允许我们从其他线程调用停止。

让我们看一下这段代码:

private Poller _poller;

public void Start()
{
  Task.Factory.StartNew(()=> {
     using(var pullSocket = new PullSocket("tcp://ip1:port1"))
     using(_poller = new Poller())
     {
        pullSocket.ReceiveReady += (object sender, NetMQSocketEventArgsnetMqSocketEventArgs) =>
        {
            var message = netMqSocketEventArgs.Socket.ReceiveMultipartMessage();
        }
        _poller.Add(pullSocket);
        _poller.Run();
     }
    });
}
public void Stop()
{
   _poller?.Stop();
}

或者如果您想在 while 循环中使用不带轮询器的代码:

私有只读 CancellationTokenSource _cts;

public void Start()
{
   _cts = new CancellationTokenSource();
   var token = _cts.Token;

  Task.Factory.StartNew(()=> {
     using(var pullSocket = new PullSocket("tcp://ip1:port1"))
     {
        while (cancellationToken.IsCancellationRequested == false)
        {
            NetMQMessage message = new NetMQMessage();
            bool success = workerSocket.TryReceiveMultipartMessage(TimeSpan.FromSeconds(5), ref message);

            if (success == false)
            {
                continue;
            }

            //if You reach this line, than You have a message
        }
     }
    },
    token,
    TaskCreationOptions.LongRunning,
    TaskScheduler.Default);
}
public void Stop()
{
    _cts.Cancel();
    _cts.Token.WaitHandle.WaitOne();//if You want wait until service will stop
}

所以回到你的问题,你应该只在你创建它的线程内部使用套接字。好在using语句中总是使用socket,最后总是release。

我看不到 SendMessageToClient 方法的用法,但我假设您是从某个按钮或其他东西调用它的。如果从该线程调用套接字的构造函数,则可以这样做。如果你能告诉我你在哪里调用这个方法,我可以谈谈这个。