NetMQ Receive/Response 循环时不工作

NetMQ Receive/Response not working when looping

我在 https://netmq.readthedocs.io/ 上使用了一个简单的 receive/request 套接字示例,并想让它在无限循环中与 parametrizedThread 一起工作。 该代码在几个循环中运行良好,之后它抛出

A non-blocking socket operation could not be completed immediately

对于我得到的结果,应该在第一个循环之后立即发生,而不是随机发生。这里的问题是什么?听起来好像必须清除某些东西才能再次获得干净的连接(不确定)。

    class Program
{
    public class Connector
    {
        public String connection { get; set; }
        public ResponseSocket server { get; set; }

        public Connector(string address, ResponseSocket server_)
        {
            this.connection = address;
            this.server = server_;
        }
    }

    static void Main(string[] args)
    {
        string connection = "tcp://localhost:5555";
        using (var server = new ResponseSocket())
        {
            while (true)
            {
                try
                {
                    server.Bind(connection);
                }
                catch (NetMQException e)
                {
                    Console.WriteLine(e.ErrorCode);
                }

                Connector c = new Connector(connection, server);

                ParameterizedThreadStart parametrizedClientThread = new ParameterizedThreadStart(runClientSide);
                Thread t = new Thread(parametrizedClientThread);
                t.Start(c);
                //runClientSide(connection, server);
            }
        }
    }

    private static void runClientSide(object param)
    {
        Connector conn = (Connector)param;
        string connection = conn.connection;
        ResponseSocket server = conn.server;
        using (var client = new RequestSocket())
        {
            client.Connect(connection);
            client.SendFrame("Hello");

            string fromClientMessage = server.ReceiveFrameString();
            Console.WriteLine("From Client: {0}", fromClientMessage);
            server.SendFrame("Hi Back");

            string fromServerMessage = client.ReceiveFrameString();
            Console.WriteLine("From Server: {0}", fromServerMessage);

            //Console.ReadLine();
        }
    }

NetMQSockets 不是线程安全的,您正在从客户端线程内部访问服务器以获取 send/receive 数据。客户端无论如何都不应该访问服务器套接字。

首先将 Bind 移动到 while 循环之外,它只需要一次,而不是每个创建的客户端都需要。 要等待消息,请使用 NetMQPoller,它将为您处理所有其他事情,并会在收到消息后引发服务器 ReceiveReady 事件。

static void Main(string[] args) {
    string connection = "tcp://localhost:5555";
    using (var poller = new NetMQPoller()) {
        using (var server = new ResponseSocket()) {
            server.ReceiveReady += Server_ReceiveReady;
            poller.Add(server);
            poller.RunAsync();

            server.Bind(connection);

            // start 10000 clients
            for(int i = 0; i < 10000; i++) {

                ParameterizedThreadStart parametrizedClientThread = new ParameterizedThreadStart(runClientSide);
                Thread t = new Thread(parametrizedClientThread);
                t.Start(connection);
            }

            Console.ReadLine(); //let server run until user pressed Enter key
        }
    }
}

//server (e.Socket) is receiving data here and can answer it
private static void Server_ReceiveReady(object sender, NetMQSocketEventArgs e) {
    string fromClientMessage = e.Socket.ReceiveFrameString();
    Console.WriteLine("From Client: {0}", fromClientMessage);
    e.Socket.SendFrame("Hi Back");
}

private static void runClientSide(object param) {
    string connection = (string) param;

    using (var client = new RequestSocket()) {
        client.Connect(connection);
        client.SendFrame("Hello");

        //Removed server side code here and put it into ReceiveReady event

        string fromServerMessage = client.ReceiveFrameString();
        Console.WriteLine("From Server: {0}", fromServerMessage);
    }
}