ZeroMQ ROUTER-DEALER 重定向到另一个 ROUTER-DEALER 问题

ZeroMQ ROUTER-DEALER redirect to another ROUTER-DEALER issue

我有一个名为 vops-server.com 的静态 DNS,它指向我的 Dell PowerEdge 2950。在这台 2950 上,我运行 server.py,它最初绑定到 tcp://0.0.0.0:7777,随后将任何传入的客户端重定向到绑定在大于 7777 的端口上的套接字,其中第一个端口是 7778。这个新产生的 ROUTER-DEALER 对只是简单地回显 "Hello, world!"。

这台 Dell PowerEdge 2950 运行 Ubuntu 16.04.1 LTS,但无法正确运行代码,在键盘中断时输出以下内容:

k▒Eg
Listening on port 7778
Recv
Resource temporarily unavailable
Client on port 7778 disconnected
^CProcess Process-3:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "test_server.py", line 76, in worker_task
    data = worker_socket.recv_multipart()
  File "/usr/local/lib/python2.7/dist-packages/zmq/sugar/socket.py", line 395, in recv_multipart
    parts = [self.recv(flags, copy=copy, track=track)]
  File "zmq/backend/cython/socket.pyx", line 693, in zmq.backend.cython.socket.Socket.recv (zmq/backend/cython/socket.c:7283)
  File "zmq/backend/cython/socket.pyx", line 727, in zmq.backend.cython.socket.Socket.recv (zmq/backend/cython/socket.c:7081)
  File "zmq/backend/cython/socket.pyx", line 145, in zmq.backend.cython.socket._recv_copy (zmq/backend/cython/socket.c:2033)
  File "zmq/backend/cython/checkrc.pxd", line 12, in zmq.backend.cython.checkrc._check_rc (zmq/backend/cython/socket.c:7522)
    PyErr_CheckSignals()
KeyboardInterrupt

客户端输出如下:

Connecting to distribution server tcp://vops-server.com:7777
[Dist] Send
[Dist] Recv
[Dist] 7778
Connecting to host tcp://vops-server.com:7778
[Host] Send
[Host] Recv

此时客户端本应输出最后一行 [Host] Hello, world!,但它在等待接收消息时挂起。

现在,在运行 Windows 10 Home 1511 的笔记本电脑上,输出符合预期:

 Ç  )
Listening on port 7778
Recv
['\x00\x80\x00\x00)', 'Hello, world!']
Sent

客户端现在可以正确输出:

Connecting to distribution server tcp://vops-server.com:7777
[Dist] Send
[Dist] Recv
[Dist] 7778
Connecting to host tcp://vops-server.com:7778
[Host] Send
[Host] Recv
[Host] Hello, world!

请查看下面的代码。

server.py:

import sys
import zmq
from multiprocessing import Process, Queue, Array, Value
import time

def server_task():
    """Run the distribution server that hands each client its own worker port.

    A ROUTER socket is bound to tcp://0.0.0.0:7777. For every message
    received, the first frame is taken as the client identity, a port
    offset is chosen, a ``worker_task`` process is started for
    ``7777 + offset``, and that port number is sent back to the client.

    Offsets released by finished workers arrive through a shared queue and
    are reused before a new, higher offset is generated. The loop runs until
    a KeyboardInterrupt; afterwards every worker's run flag is cleared and
    the socket and context are shut down.
    """
    base_port = 7777

    context = zmq.Context.instance()
    router = context.socket(zmq.ROUTER)
    router.bind("tcp://0.0.0.0:%d" % (base_port, ))

    released_offsets = Queue()
    free_offsets = [1]
    run_flags = []

    while True:
        try:
            frames = router.recv_multipart()
            client_id = frames[0]

            print(client_id)

            # Reclaim offsets returned by workers that have finished.
            while not released_offsets.empty():
                free_offsets.append(released_offsets.get())

            offset = free_offsets.pop()

            # Keep the pool non-empty: once the last free offset is taken,
            # seed the pool with the next higher one.
            if not free_offsets:
                free_offsets.append(offset + 1)

            # Start a worker for the chosen port; the shared flag lets this
            # process ask the worker to stop later.
            run_flag = Value("b", True)
            run_flags.append(run_flag)

            worker = Process(
                target=worker_task,
                args=(run_flag, base_port, offset, released_offsets),
            )
            worker.start()

            # Tell the client which port its worker is bound to.
            reply = str(base_port + offset)
            router.send_multipart([client_id, reply])

        except KeyboardInterrupt:
            break

    # Ask every worker started so far to leave its receive loop.
    for run_flag in run_flags:
        run_flag.value = False

    router.close()
    context.term()

def worker_task(proc_running, port_base, port_offset, timeout_queue):
    """Echo messages on a dedicated ROUTER socket until stopped or timed out.

    Binds a ROUTER socket to tcp://0.0.0.0:<port_base + port_offset> and
    sends every received multipart message straight back. The loop ends when
    ``proc_running.value`` becomes false or when a ``zmq.ZMQError`` is
    raised; a receive that exceeds the 5000 ms RCVTIMEO raises one. After a
    normal exit the socket and context are released and ``port_offset`` is
    put on ``timeout_queue``.

    Args:
        proc_running: Shared object whose ``value`` attribute is checked
            before each receive; a false value ends the loop.
        port_base: Base port number.
        port_offset: Offset added to ``port_base`` to get the bound port;
            also the value put on ``timeout_queue`` on normal exit.
        timeout_queue: Queue that receives ``port_offset`` once this worker
            has finished.
    """
    port = port_base + port_offset

    print("Listening on port %d" % (port, ))

    # Create a private context rather than using zmq.Context.instance().
    # This function runs in a multiprocessing child; where children are
    # forked, the process-wide singleton can be the parent's context carried
    # across the fork, and a ZeroMQ context is not usable after a fork.
    worker_context = zmq.Context()
    worker_socket = worker_context.socket(zmq.ROUTER)

    try:
        worker_socket.setsockopt(zmq.RCVTIMEO, 5000)
        worker_socket.bind("tcp://0.0.0.0:%d" % (port, ))

        while proc_running.value:
            try:
                print("Recv")
                data = worker_socket.recv_multipart()

                print(data)
                worker_socket.send_multipart(data)

                print("Sent")
            except zmq.ZMQError as e:
                # Includes the receive timeout, which is treated as the
                # client having gone away.
                print(e)
                break

        print("Client on port %d disconnected" % (port, ))
    finally:
        # Release the socket and context even when an exception such as
        # KeyboardInterrupt escapes the loop.
        worker_socket.close()
        worker_context.term()

    # Hand the offset back only after the socket has been released, so the
    # port is not offered to a new worker while it is still bound here.
    timeout_queue.put(port_offset)

# Start the distribution server only when this file is executed as a script.
# The guard also prevents a second server from starting if the module is
# imported again, as multiprocessing does in child processes under the
# "spawn" start method.
if __name__ == "__main__":
    server_task()

client.py:

import os
import io
import time
import zmq
from multiprocessing import Process

def connect_to_host(context, host, port, timeout):
    """Ask the distribution server for a port and connect to that port.

    A DEALER socket is connected to tcp://<host>:<port> and a single-frame
    request is sent. The first frame of the reply is used as the port of the
    host to talk to. If the receive times out (``zmq.Again``), the socket is
    closed, a fresh one is connected and the request is repeated.

    Args:
        context: ZeroMQ context used to create the sockets.
        host: Host name used for both connections.
        port: Port of the distribution server.
        timeout: Value set as RCVTIMEO on every socket created here.

    Returns:
        A DEALER socket connected to tcp://<host>:<port from the reply>.
    """
    def open_dealer(endpoint):
        # Every socket in this function is created the same way.
        dealer = context.socket(zmq.DEALER)
        dealer.setsockopt(zmq.RCVTIMEO, timeout)
        dealer.connect(endpoint)
        return dealer

    dist_endpoint = "tcp://%s:%s" % (host, port)

    while True:
        dist_socket = open_dealer(dist_endpoint)
        print("Connecting to distribution server %s" % (dist_endpoint, ))

        try:
            print("[Dist] Send")
            dist_socket.send_multipart([str(0)])
            print("[Dist] Recv")
            reply_frames = dist_socket.recv_multipart()
            assigned_port = reply_frames[0]
            print("[Dist] %s" % (assigned_port, ))
        except zmq.Again:
            # No reply within the timeout: drop this socket and retry with
            # a new one.
            dist_socket.close()
        else:
            break

    dist_socket.close()

    host_endpoint = "tcp://%s:%s" % (host, assigned_port)
    host_socket = open_dealer(host_endpoint)

    print("Connecting to host %s" % (host_endpoint, ))

    return host_socket

def client_task(client_type):
    """Send "Hello, world!" to the assigned host in a loop and print replies.

    The host socket is obtained from ``connect_to_host`` using
    vops-server.com:7777 as the distribution server and a 5000 ms timeout.
    Each iteration sends one message and prints the first frame of the
    reply. If the receive times out (``zmq.Again``), the socket is closed and
    a new one is obtained through ``connect_to_host``. The loop ends on
    KeyboardInterrupt, after which the socket is closed and the context
    terminated.

    Args:
        client_type: Not used by this function.
    """
    timeout = 5000
    host = "vops-server.com"
    port = "7777"

    # instance() is a classmethod: call it on the class so that no extra,
    # throwaway Context object is constructed first.
    context = zmq.Context.instance()

    socket = connect_to_host(context, host, port, timeout)

    while True:
        # The outer try is kept separate so that a KeyboardInterrupt raised
        # while reconnecting inside the zmq.Again handler is caught as well.
        try:
            try:
                print("[Host] Send")
                socket.send_multipart(["Hello, world!"])

                print("[Host] Recv")
                data = socket.recv_multipart()[0]

                print("[Host] %s" % (data, ))
            except zmq.Again:
                # No echo within the timeout: start over from the
                # distribution server.
                socket.close()

                socket = connect_to_host(context, host, port, timeout)

        except KeyboardInterrupt:
            break

    print("Connection terminated")

    socket.close()
    context.term()

# Run the client only when this file is executed as a script. The argument 0
# is passed as client_type, which client_task does not use.
if __name__ == "__main__":
    client_task(0)

我无法理解为什么相同的代码在一台机器上运行正常,而在另一台机器上却运行不正常;我考虑过在 Dell PowerEdge 2950 上安装 Windows Server 2012,我希望错误出在操作系统而不是硬件本身。现在我只能等待,希望有专家能解决我的问题。

相同的代码在运行 Windows Server 2012 R2 的 Dell PowerEdge 2950 上运行良好。ZMQ 似乎与 Ubuntu 存在兼容性问题。