ZMQ:REQ/REP 因多个并发请求和轮询而失败
ZMQ: REQ/REP fails with multiple concurrent requests and polling
我 运行 遇到 ZeroMQ 的奇怪行为,我现在一直在尝试调试一整天。
这是重现问题的最小示例脚本。它可以是 运行 和 Python3.
一台带有REP套接字的服务器启动,五个带有REP套接字的客户端基本上同时连接到它。结果是服务器在前几条消息之后由于某种原因开始阻塞。似乎 poller.poll(1000)
是无限期阻塞的。
这种行为似乎也与时间有关。在启动客户端的循环中插入一个 sleep(0.1)
,它按预期工作。
我原以为 REP 套接字会将所有传入消息排队,并通过 sock.recv_multipart()
.
一个接一个地释放它们
这里发生了什么?
import logging
from threading import Thread
from time import sleep
import zmq
logging.basicConfig(level=logging.INFO)
PORT = "3446"
stop_flag = False
def server():
logging.info("started server")
context = zmq.Context()
sock = context.socket(zmq.REP)
sock.bind("tcp://*:" + PORT)
logging.info("bound server")
poller = zmq.Poller()
poller.register(sock, zmq.POLLIN)
while not stop_flag:
socks = dict(poller.poll(1000))
if socks.get(sock) == zmq.POLLIN:
request = sock.recv_multipart()
logging.info("received %s", request)
# sleep(0.5)
sock.send_multipart(["reply".encode()] + request)
sock.close()
def client(name:str):
context = zmq.Context()
sock = context.socket(zmq.REQ)
sock.connect("tcp://localhost:" + PORT)
sock.send_multipart([name.encode()])
logging.info(sock.recv_multipart())
sock.close()
logging.info("starting server")
server_thread = Thread(target=server)
server_thread.start()
sleep(1)
nr_of_clients = 5
for i in range(nr_of_clients):
Thread(target=client, args=[str(i)]).start()
stop_flag = True
对我来说,问题似乎是在所有客户端收到回复之前,您是 "shutting down" 服务器。所以我猜不是服务器在阻塞而是客户端在阻塞。
您可以通过在设置 stop_flag
:
之前等待一段时间来解决此问题
sleep(5)
stop_flag = True
或者,更好的是,您明确加入客户端线程,例如:
nr_of_clients = 5
threads = []
for i in range(nr_of_clients):
thread = Thread(target=client, args=[str(i)])
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
stop_flag = True
我 运行 遇到 ZeroMQ 的奇怪行为,我现在一直在尝试调试一整天。
这是重现问题的最小示例脚本。它可以是 运行 和 Python3.
一台带有REP套接字的服务器启动,五个带有REP套接字的客户端基本上同时连接到它。结果是服务器在前几条消息之后由于某种原因开始阻塞。似乎 poller.poll(1000)
是无限期阻塞的。
这种行为似乎也与时间有关。在启动客户端的循环中插入一个 sleep(0.1)
,它按预期工作。
我原以为 REP 套接字会将所有传入消息排队,并通过 sock.recv_multipart()
.
这里发生了什么?
import logging
from threading import Thread
from time import sleep
import zmq
logging.basicConfig(level=logging.INFO)
PORT = "3446"
stop_flag = False
def server():
logging.info("started server")
context = zmq.Context()
sock = context.socket(zmq.REP)
sock.bind("tcp://*:" + PORT)
logging.info("bound server")
poller = zmq.Poller()
poller.register(sock, zmq.POLLIN)
while not stop_flag:
socks = dict(poller.poll(1000))
if socks.get(sock) == zmq.POLLIN:
request = sock.recv_multipart()
logging.info("received %s", request)
# sleep(0.5)
sock.send_multipart(["reply".encode()] + request)
sock.close()
def client(name:str):
context = zmq.Context()
sock = context.socket(zmq.REQ)
sock.connect("tcp://localhost:" + PORT)
sock.send_multipart([name.encode()])
logging.info(sock.recv_multipart())
sock.close()
logging.info("starting server")
server_thread = Thread(target=server)
server_thread.start()
sleep(1)
nr_of_clients = 5
for i in range(nr_of_clients):
Thread(target=client, args=[str(i)]).start()
stop_flag = True
对我来说,问题似乎是在所有客户端收到回复之前,您是 "shutting down" 服务器。所以我猜不是服务器在阻塞而是客户端在阻塞。
您可以通过在设置 stop_flag
:
sleep(5)
stop_flag = True
或者,更好的是,您明确加入客户端线程,例如:
nr_of_clients = 5
threads = []
for i in range(nr_of_clients):
thread = Thread(target=client, args=[str(i)])
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
stop_flag = True