为什么 Mono 上的 NetMQ DealerSocket 不向 Debian Wheezy 上的服务器发送消息,但 Windows 上却发送消息?

Why does NetMQ DealerSocket on Mono send no message to server on Debian Wheezy, but does on Windows?

我在 Debian Wheezy 的 Mono 4.8 上使用 NetMQ 4.0.0.1 时遇到一些问题。

在我停止调用发送新消息之前,Dealer socket 不会发送任何消息。当我在创建任务之间加入 Thread.Sleep(1000) 时,一切正常。需要说明的是,在 Windows 上使用 .NET Framework 4.5 和 .NET Core 1.1 时,不需要任何 Thread.Sleep(),一切都能正常运行。

我的架构模式如下(原帖此处为示意图:多个 REQ 客户端 → inproc 上的 ROUTER/DEALER 中间设备 → TCP → 服务器端 ROUTER):

我添加了调试消息,可以看到:我在循环中通过任务创建了 100 个 REQ 套接字,Router 能在队列中收到这些请求,但没有通过 Dealer 把它们发送出去,TCP 的另一端也没有任何动静,直到我停止在 REQ 套接字上调用发送为止。每创建 5 个任务调用一次简单的 Thread.Sleep() 就能正常工作。这看起来像是 Poller 的 bug、Dealer 的 bug,或者是我哪里做错了。

中间设备(broker)的代码如下:

/// <summary>
/// Forwarding device that runs on its own thread: messages received on a ROUTER
/// (front end) are forwarded to a DEALER (back end), and replies received on the DEALER
/// are forwarded back to the ROUTER. All sockets are created, polled and disposed on
/// the device thread.
/// </summary>
public class CollectorDevice : IDisposable
{
    private NetMQPoller _poller;
    private RouterSocket _frontendSocket;
    private DealerSocket _backendSocket;
    private readonly string _backEndAddress;
    private readonly string _frontEndAddress;
    private readonly int _expectedFrameCount;
    private readonly ManualResetEvent _startSemaphore = new ManualResetEvent(false);
    private readonly Thread _localThread;
    // Set on the device thread when socket/poller setup fails; read by Start() after the semaphore is released.
    private Exception _startupException;
    // True once the poller has stopped running (or never got to run), so Stop() must not touch it.
    private volatile bool _pollerExited;
    private bool _disposed;
    private static Logger _logger = LogManager.GetCurrentClassLogger();

    /// <summary>
    /// Creates the device. No sockets are created until <see cref="Start"/> is called.
    /// </summary>
    /// <param name="backEndAddress">NetMQ endpoint string for the DealerSocket (e.g. "&gt;tcp://host:port").</param>
    /// <param name="frontEndAddress">NetMQ endpoint string for the RouterSocket.</param>
    /// <param name="expectedFrameCount">Frame-count hint passed to ReceiveMultipartMessage.</param>
    public CollectorDevice(string backEndAddress, string frontEndAddress, int expectedFrameCount)
    {
        _expectedFrameCount = expectedFrameCount;

        _backEndAddress = backEndAddress;
        _frontEndAddress = frontEndAddress;

        _localThread = new Thread(DoWork) { Name = "IPC Collector Device Thread" };
    }

    /// <summary>
    /// Starts the device thread and blocks until its sockets are created and registered
    /// with the poller.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The device thread failed to create its poller or sockets (the original error is the inner exception).
    /// </exception>
    public void Start()
    {
        _localThread.Start();
        _startSemaphore.WaitOne();

        // Previously a setup failure left Start() blocked forever; surface it instead.
        if (_startupException != null)
        {
            throw new InvalidOperationException("CollectorDevice failed to start.", _startupException);
        }
    }

    /// <summary>
    /// Asks the poller to stop; the device thread then leaves its using blocks and disposes
    /// the sockets and poller. Does nothing if the poller was never created or has already exited.
    /// </summary>
    public void Stop()
    {
        NetMQPoller poller = _poller;
        if (poller == null || _pollerExited)
        {
            return;
        }

        poller.Stop();
    }

    #region Implementation of IDisposable

    /// <summary>
    /// Stops the poller, waits for the device thread to finish disposing its sockets
    /// (unless called on that thread), and releases the start semaphore. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }
        _disposed = true;

        Stop();

        // Joining from the device thread itself (e.g. from a ReceiveReady handler) would deadlock.
        if (_localThread.IsAlive && Thread.CurrentThread != _localThread)
        {
            _localThread.Join();
        }

        _startSemaphore.Dispose();
    }

    #endregion


    #region Private Methods

    /// <summary>
    /// Device-thread body: creates the poller and both sockets, signals Start(), then runs
    /// the poller until it is stopped. Exceptions are logged; a setup failure is handed to Start().
    /// </summary>
    private void DoWork()
    {
        bool started = false;
        try
        {
            using (_poller = new NetMQPoller())
            using (_frontendSocket = new RouterSocket(_frontEndAddress))
            using (_backendSocket = new DealerSocket(_backEndAddress))
            {
                _backendSocket.ReceiveReady += OnBackEndReady;
                _frontendSocket.ReceiveReady += OnFrontEndReady;

                _poller.Add(_frontendSocket);
                _poller.Add(_backendSocket);

                started = true;
                _startSemaphore.Set();

                try
                {
                    _poller.Run();
                }
                finally
                {
                    // Flag the exit before the using blocks dispose the poller, so Stop() skips it.
                    _pollerExited = true;
                }
            }
        }
        catch (Exception e)
        {
            _logger.Error(e);
            if (!started)
            {
                _startupException = e;
            }
        }
        finally
        {
            _pollerExited = true;
            if (!started)
            {
                // Release Start() even when setup threw, instead of leaving it waiting forever.
                _startSemaphore.Set();
            }
        }
    }

    /// <summary>Forwards a reply received on the DEALER back out through the ROUTER.</summary>
    private void OnBackEndReady(object sender, NetMQSocketEventArgs e)
    {
        NetMQMessage message = _backendSocket.ReceiveMultipartMessage(_expectedFrameCount);
        _frontendSocket.SendMultipartMessage(message);
    }

    /// <summary>Forwards a request received on the ROUTER (identity frame included) to the DEALER.</summary>
    private void OnFrontEndReady(object sender, NetMQSocketEventArgs e)
    {
        NetMQMessage message = _frontendSocket.ReceiveMultipartMessage(_expectedFrameCount);
        _backendSocket.SendMultipartMessage(message);
    }

    #endregion
}

这是一个客户端:

class Program
{
    private static Logger _logger = LogManager.GetCurrentClassLogger();

    /// <summary>
    /// Client entry point: waits for a key press, starts the broker device, then fires 100
    /// concurrent REQ clients through it and waits for all of them to get a reply.
    /// </summary>
    private static void Main(string[] args)
    {
        Console.WriteLine("Client. Please enter message for server. Enter 'QUIT' to turn off server");
        Console.ReadKey();


        using (var collectorDevice = new CollectorDevice(">tcp://localhost:5556", "inproc://broker", 3))
        {
            collectorDevice.Start();

            var tasks = new List<Task>();
            for (int i = 0; i < 100; i++)
            {
                Console.WriteLine(i);
                int clientIndex = i;
                // Each client blocks in ReceiveFrameString. LongRunning gives it a dedicated thread
                // instead of a thread-pool worker. Otherwise 100 blocked workers starve the pool that
                // Mono uses to complete async socket I/O, which stalls the TCP side for about a minute.
                Task t = Task.Factory.StartNew(() => RunClient(clientIndex), TaskCreationOptions.LongRunning);
                tasks.Add(t);
            }

            Task.WaitAll(tasks.ToArray());
        }

    }

    /// <summary>
    /// Sends one request over a fresh REQ socket connected to the in-process broker and
    /// logs the reply. Errors are logged and swallowed so that one failing client does not
    /// fault Task.WaitAll.
    /// </summary>
    /// <param name="clientIndex">Loop index of the client, included in the request text.</param>
    private static void RunClient(int clientIndex)
    {
        try
        {
            using (var req = new RequestSocket("inproc://broker"))
            {
                string request = String.Format("Request client: {0} id: {1}", clientIndex, Task.CurrentId);
                req.SendFrame(request);
                _logger.Debug(request);
                Console.WriteLine(request);

                string responseMessage = req.ReceiveFrameString();
                string response = String.Format("Response from server: {0} id: {1} message: {2}", clientIndex, Task.CurrentId, responseMessage);
                _logger.Debug(response);
                Console.WriteLine(response);
            }
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
            _logger.Error(e);
        }
    }
}

服务器端:

class Program
{
    private static Logger _logger = LogManager.GetCurrentClassLogger();

    /// <summary>
    /// Server entry point: binds a ROUTER on tcp port 5556 and answers every request until
    /// the poller exits, then waits for a key press.
    /// </summary>
    static void Main(string[] args)
    {
        try
        {
            using (var routerSocket = new RouterSocket("@tcp://*:5556"))
            using (var poller = new NetMQPoller())
            {
                routerSocket.ReceiveReady += RouterSocketOnReceiveReady;
                poller.Add(routerSocket);
                poller.Run();
            }
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
        }

        Console.ReadKey();
    }

    /// <summary>
    /// Receives one request and replies to it through the same routing path.
    /// Messages that cannot be received or are too short are reported and dropped, rather
    /// than throwing out of the poller (which would stop the server).
    /// </summary>
    private static void RouterSocketOnReceiveReady(object sender, NetMQSocketEventArgs netMqSocketEventArgs)
    {
        NetMQMessage clientMessage = new NetMQMessage();
        bool result = netMqSocketEventArgs.Socket.TryReceiveMultipartMessage(TimeSpan.FromSeconds(5),
            ref clientMessage, 5);

        if (!result)
        {
            Console.WriteLine("Something went wrong?!");
            return;
        }

        // Layout produced by this sample's REQ -> broker ROUTER/DEALER -> server ROUTER chain:
        // [broker dealer identity][req identity][empty delimiter][payload]
        if (clientMessage.FrameCount < 4)
        {
            Console.WriteLine(String.Format("Dropping malformed message with {0} frame(s)", clientMessage.FrameCount));
            return;
        }

        var address = clientMessage[0];
        var address2 = clientMessage[1];
        var clientMessageString = clientMessage[3].ConvertToString();

        Console.WriteLine(String.Format("Message from client received: '{0}'", clientMessageString));

        // Echo both routing frames back so the broker ROUTER can deliver the reply to the right REQ socket.
        netMqSocketEventArgs
            .Socket.SendMoreFrame(address.Buffer)
            .SendMoreFrame(address2.Buffer)
            .SendMoreFrameEmpty()
            .SendFrame("I have received your message");
    }
}

有人知道吗?

我怀疑自己可能在不同线程中使用了同一个套接字,所以把它包装进了 ThreadLocal 结构,但这没有帮助。随后我读到了一些关于 NetMQ 在 Unity 中的问题,于是在每次调用套接字构造函数之前都加上了 'AsyncIO.ForceDotNet.Force();',这也没有帮助。我还把 Mono 从 4.4 升级到了 4.8,现象仍然一样。

我确认过,在任务之间加入 Thread.Sleep(100) 就能解决这个问题。很奇怪。

我测试了代码,它确实花费了很多时间,但最终服务器收到了所有消息(大约需要一分钟)。

问题在于线程数量:当有 100 个线程(任务)同时阻塞时,所有本应在 IO 完成端口线程上完成的异步操作都会耗费很长时间——很可能是因为这些阻塞任务占满了线程池线程,异步 IO 的完成回调要等线程池慢慢补充新线程后才能执行。我用下面这段不依赖 NetMQ 的代码重现了这个问题:

    /// <summary>
    /// NetMQ-free reproduction: 100 thread-pool tasks block on an event that is only signalled
    /// from a socket accept completion. With a slow-growing thread pool, the completion (and
    /// therefore the whole program) is delayed. With only a handful of tasks it finishes immediately.
    /// </summary>
    public static void Main(string[] args)
    {
        using (var resetEvent = new ManualResetEvent(false))
        {
            var tasks = new List<Task>();

            for (int i = 0; i < 100; i++)
            {
                // Each task occupies a thread-pool worker until the accept callback sets the event.
                tasks.Add(Task.Run(() =>
                {
                    resetEvent.WaitOne();
                }));
            }

            Thread.Sleep(100);

            using (var listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
            using (var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
            using (var acceptArgs = new SocketAsyncEventArgs())
            using (var connectArgs = new SocketAsyncEventArgs())
            {
                listener.Bind(new IPEndPoint(IPAddress.Any, 5556));
                listener.Listen(1);

                Action onAccepted = () =>
                {
                    Console.WriteLine($"Accepted {acceptArgs.SocketError}");
                    resetEvent.Set();
                };
                acceptArgs.Completed += (sender, eventArgs) => onAccepted();
                // A false return means the operation completed synchronously and Completed will NOT fire.
                if (!listener.AcceptAsync(acceptArgs))
                {
                    onAccepted();
                }

                Action onConnected = () => Console.WriteLine("Connected");
                connectArgs.RemoteEndPoint = new IPEndPoint(IPAddress.Loopback, 5556);
                connectArgs.Completed += (sender, eventArgs) => onConnected();
                if (!client.ConnectAsync(connectArgs))
                {
                    onConnected();
                }

                Task.WaitAll(tasks.ToArray());

                Console.WriteLine("all tasks completed");

                // The accepted connection is owned by us once AcceptAsync has completed.
                acceptArgs.AcceptSocket?.Dispose();
            }
        }
    }

可以看到,这段代码同样需要大约一分钟;如果只启动 5 个线程,它会立即完成。

无论如何,你可能需要减少启动的线程数量,和/或向 Mono 项目报告这个 bug。