ZeroMQ ROUTER-DEALER 重定向到另一个 ROUTER-DEALER 问题
ZeroMQ ROUTER-DEALER redirect to another ROUTER-DEALER issue
我有一个名为 vops-server.com
的静态 DNS,它指向我的 Dell PowerEdge 2950。在这个 2950 上,我 运行 最初绑定到 tcp://0.0.0.0:7777
的 server.py
,并且随后将任何传入的客户端重定向到绑定到大于 7777
端口的套接字,其中第一个是 7778
。这个新产生的 ROUTER-DEALER
对简单地回显 "Hello, world!"
.
Dell PowerEdge 2950 运行 Ubuntu 16.04.1 LTS
并且无法正确 运行 代码,在键盘中断时输出以下内容:
k▒Eg
Listening on port 7778
Recv
Resource temporarily unavailable
Client on port 7778 disconnected
^CProcess Process-3:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "test_server.py", line 76, in worker_task
data = worker_socket.recv_multipart()
File "/usr/local/lib/python2.7/dist-packages/zmq/sugar/socket.py", line 395, in recv_multipart
parts = [self.recv(flags, copy=copy, track=track)]
File "zmq/backend/cython/socket.pyx", line 693, in zmq.backend.cython.socket.Socket.recv (zmq/backend/cython/socket.c:7283)
File "zmq/backend/cython/socket.pyx", line 727, in zmq.backend.cython.socket.Socket.recv (zmq/backend/cython/socket.c:7081)
File "zmq/backend/cython/socket.pyx", line 145, in zmq.backend.cython.socket._recv_copy (zmq/backend/cython/socket.c:2033)
File "zmq/backend/cython/checkrc.pxd", line 12, in zmq.backend.cython.checkrc._check_rc (zmq/backend/cython/socket.c:7522)
PyErr_CheckSignals()
KeyboardInterrupt
客户端输出如下:
Connecting to distribution server tcp://vops-server.com:7777
[Dist] Send
[Dist] Recv
[Dist] 7778
Connecting to host tcp://vops-server.com:7778
[Host] Send
[Host] Recv
此时客户端应该输出最后一行[Host] Hello, world!
,但它在等待接收消息时挂起。
现在,在运行 Windows 10 Home 1511
的笔记本电脑上,输出符合预期:
Ç )
Listening on port 7778
Recv
['\x00\x80\x00\x00)', 'Hello, world!']
Sent
客户端现在可以正确输出:
Connecting to distribution server tcp://vops-server.com:7777
[Dist] Send
[Dist] Recv
[Dist] 7778
Connecting to host tcp://vops-server.com:7778
[Host] Send
[Host] Recv
[Host] Hello, world!
请查看下面的代码。
server.py:
import sys
import zmq
from multiprocessing import Process, Queue, Array, Value
import time
def server_task():
port_base = 7777
server_context = zmq.Context.instance()
server_socket = server_context.socket(zmq.ROUTER)
server_socket.bind("tcp://0.0.0.0:%d" % (port_base, ))
timeout_queue = Queue()
port_list = [ 1 ]
proc_list = [ ]
while True:
try:
client_id = server_socket.recv_multipart()[0]
print(client_id)
# Get an unused port from the list
# Ports from clients that have timed out are recycled here
while not timeout_queue.empty():
port_list.append(timeout_queue.get())
port_offset = port_list.pop()
if len(port_list) == 0:
port_list.append(port_offset + 1)
# Spawn a new worker task, binding the port to a socket
proc_running = Value("b", True)
proc_list.append(proc_running)
Process(target=worker_task, args=(proc_running, port_base, port_offset, timeout_queue)).start()
# Send the new port to the client
server_socket.send_multipart([client_id, str(port_base + port_offset)])
except KeyboardInterrupt:
break
for proc_running in proc_list:
proc_running.value = False
server_socket.close()
server_context.term()
def worker_task(proc_running, port_base, port_offset, timeout_queue):
port = port_base + port_offset
print("Listening on port %d" % (port, ))
worker_context = zmq.Context.instance()
worker_socket = worker_context.socket(zmq.ROUTER)
worker_socket.setsockopt(zmq.RCVTIMEO, 5000)
worker_socket.bind("tcp://0.0.0.0:%d" % (port, ))
while proc_running.value:
try:
print("Recv")
data = worker_socket.recv_multipart()
print(data)
worker_socket.send_multipart(data)
print("Sent")
except zmq.ZMQError as e:
print(e)
break
print("Client on port %d disconnected" % (port, ))
timeout_queue.put(port_offset)
worker_socket.close()
worker_context.term()
if __name__ == "__main__":
server_task()
client.py:
import os
import io
import time
import zmq
from multiprocessing import Process
def connect_to_host(context, host, port, timeout):
address = ("tcp://%s:%s" % (host, port))
socket = context.socket(zmq.DEALER)
socket.setsockopt(zmq.RCVTIMEO, timeout)
socket.connect(address)
print("Connecting to distribution server %s" % (address, ))
while True:
try:
print("[Dist] Send")
socket.send_multipart([str(0)])
print("[Dist] Recv")
port = socket.recv_multipart()[0]
print("[Dist] %s" % (port, ))
break
except zmq.Again:
socket.close()
socket = context.socket(zmq.DEALER)
socket.setsockopt(zmq.RCVTIMEO, timeout)
socket.connect(address)
print("Connecting to distribution server %s" % (address, ))
socket.close()
address = ("tcp://%s:%s" % (host, port))
socket = context.socket(zmq.DEALER)
socket.setsockopt(zmq.RCVTIMEO, timeout)
socket.connect(address)
print("Connecting to host %s" % (address, ))
return socket
def client_task(client_type):
timeout = 5000
host = "vops-server.com"
port = "7777"
context = zmq.Context().instance()
socket = connect_to_host(context, host, port, timeout)
while True:
try:
try:
print("[Host] Send")
socket.send_multipart(["Hello, world!"])
print("[Host] Recv")
data = socket.recv_multipart()[0]
print("[Host] %s" % (data, ))
except zmq.Again:
socket.close()
socket = connect_to_host(context, host, port, timeout)
except KeyboardInterrupt:
break
print("Connection terminated")
socket.close()
context.term()
if __name__ == "__main__":
client_task(0)
我无法理解为什么相同的代码在一台机器上运行而在另一台机器上运行不正常;我考虑过在 Dell PowerEdge 2950 上安装 Windows Server 2012,我希望错误出在 OS 而不是硬件本身。现在我等待并希望某个地方的专家能解决我的问题。
相同的代码在 Dell PowerEdge 2950 运行 Windows Server 2012 R2 上运行良好。 ZMQ 似乎与 Ubuntu 有问题。
我有一个名为 vops-server.com
的静态 DNS,它指向我的 Dell PowerEdge 2950。在这个 2950 上,我 运行 最初绑定到 tcp://0.0.0.0:7777
的 server.py
,并且随后将任何传入的客户端重定向到绑定到大于 7777
端口的套接字,其中第一个是 7778
。这个新产生的 ROUTER-DEALER
对简单地回显 "Hello, world!"
.
Dell PowerEdge 2950 运行 Ubuntu 16.04.1 LTS
并且无法正确 运行 代码,在键盘中断时输出以下内容:
k▒Eg
Listening on port 7778
Recv
Resource temporarily unavailable
Client on port 7778 disconnected
^CProcess Process-3:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "test_server.py", line 76, in worker_task
data = worker_socket.recv_multipart()
File "/usr/local/lib/python2.7/dist-packages/zmq/sugar/socket.py", line 395, in recv_multipart
parts = [self.recv(flags, copy=copy, track=track)]
File "zmq/backend/cython/socket.pyx", line 693, in zmq.backend.cython.socket.Socket.recv (zmq/backend/cython/socket.c:7283)
File "zmq/backend/cython/socket.pyx", line 727, in zmq.backend.cython.socket.Socket.recv (zmq/backend/cython/socket.c:7081)
File "zmq/backend/cython/socket.pyx", line 145, in zmq.backend.cython.socket._recv_copy (zmq/backend/cython/socket.c:2033)
File "zmq/backend/cython/checkrc.pxd", line 12, in zmq.backend.cython.checkrc._check_rc (zmq/backend/cython/socket.c:7522)
PyErr_CheckSignals()
KeyboardInterrupt
客户端输出如下:
Connecting to distribution server tcp://vops-server.com:7777
[Dist] Send
[Dist] Recv
[Dist] 7778
Connecting to host tcp://vops-server.com:7778
[Host] Send
[Host] Recv
此时客户端应该输出最后一行[Host] Hello, world!
,但它在等待接收消息时挂起。
现在,在运行 Windows 10 Home 1511
的笔记本电脑上,输出符合预期:
Ç )
Listening on port 7778
Recv
['\x00\x80\x00\x00)', 'Hello, world!']
Sent
客户端现在可以正确输出:
Connecting to distribution server tcp://vops-server.com:7777
[Dist] Send
[Dist] Recv
[Dist] 7778
Connecting to host tcp://vops-server.com:7778
[Host] Send
[Host] Recv
[Host] Hello, world!
请查看下面的代码。
server.py:
import sys
import zmq
from multiprocessing import Process, Queue, Array, Value
import time
def server_task():
port_base = 7777
server_context = zmq.Context.instance()
server_socket = server_context.socket(zmq.ROUTER)
server_socket.bind("tcp://0.0.0.0:%d" % (port_base, ))
timeout_queue = Queue()
port_list = [ 1 ]
proc_list = [ ]
while True:
try:
client_id = server_socket.recv_multipart()[0]
print(client_id)
# Get an unused port from the list
# Ports from clients that have timed out are recycled here
while not timeout_queue.empty():
port_list.append(timeout_queue.get())
port_offset = port_list.pop()
if len(port_list) == 0:
port_list.append(port_offset + 1)
# Spawn a new worker task, binding the port to a socket
proc_running = Value("b", True)
proc_list.append(proc_running)
Process(target=worker_task, args=(proc_running, port_base, port_offset, timeout_queue)).start()
# Send the new port to the client
server_socket.send_multipart([client_id, str(port_base + port_offset)])
except KeyboardInterrupt:
break
for proc_running in proc_list:
proc_running.value = False
server_socket.close()
server_context.term()
def worker_task(proc_running, port_base, port_offset, timeout_queue):
port = port_base + port_offset
print("Listening on port %d" % (port, ))
worker_context = zmq.Context.instance()
worker_socket = worker_context.socket(zmq.ROUTER)
worker_socket.setsockopt(zmq.RCVTIMEO, 5000)
worker_socket.bind("tcp://0.0.0.0:%d" % (port, ))
while proc_running.value:
try:
print("Recv")
data = worker_socket.recv_multipart()
print(data)
worker_socket.send_multipart(data)
print("Sent")
except zmq.ZMQError as e:
print(e)
break
print("Client on port %d disconnected" % (port, ))
timeout_queue.put(port_offset)
worker_socket.close()
worker_context.term()
if __name__ == "__main__":
server_task()
client.py:
import os
import io
import time
import zmq
from multiprocessing import Process
def connect_to_host(context, host, port, timeout):
address = ("tcp://%s:%s" % (host, port))
socket = context.socket(zmq.DEALER)
socket.setsockopt(zmq.RCVTIMEO, timeout)
socket.connect(address)
print("Connecting to distribution server %s" % (address, ))
while True:
try:
print("[Dist] Send")
socket.send_multipart([str(0)])
print("[Dist] Recv")
port = socket.recv_multipart()[0]
print("[Dist] %s" % (port, ))
break
except zmq.Again:
socket.close()
socket = context.socket(zmq.DEALER)
socket.setsockopt(zmq.RCVTIMEO, timeout)
socket.connect(address)
print("Connecting to distribution server %s" % (address, ))
socket.close()
address = ("tcp://%s:%s" % (host, port))
socket = context.socket(zmq.DEALER)
socket.setsockopt(zmq.RCVTIMEO, timeout)
socket.connect(address)
print("Connecting to host %s" % (address, ))
return socket
def client_task(client_type):
timeout = 5000
host = "vops-server.com"
port = "7777"
context = zmq.Context().instance()
socket = connect_to_host(context, host, port, timeout)
while True:
try:
try:
print("[Host] Send")
socket.send_multipart(["Hello, world!"])
print("[Host] Recv")
data = socket.recv_multipart()[0]
print("[Host] %s" % (data, ))
except zmq.Again:
socket.close()
socket = connect_to_host(context, host, port, timeout)
except KeyboardInterrupt:
break
print("Connection terminated")
socket.close()
context.term()
if __name__ == "__main__":
client_task(0)
我无法理解为什么相同的代码在一台机器上运行而在另一台机器上运行不正常;我考虑过在 Dell PowerEdge 2950 上安装 Windows Server 2012,我希望错误出在 OS 而不是硬件本身。现在我等待并希望某个地方的专家能解决我的问题。
相同的代码在 Dell PowerEdge 2950 运行 Windows Server 2012 R2 上运行良好。 ZMQ 似乎与 Ubuntu 有问题。