Zeromq(pyzmq) ROUTER处理多个客户端的数据及后续超时处理
Zeromq (pyzmq) ROUTER procession of multiple clients' data and subsequent timeout handling
我有一个ROUTER
,其目的是从多个DEALER
客户端收集图像数据,并对完整图像执行OCR。我发现处理 OCR 最有效的方法是利用 Python 的多处理库;累积的图像字节 put
变成 Queue
,以便在单独的 Process
中进行适当处理。但是,我需要确保当客户端遇到超时时 Process
已正确终止并且不会毫无意义地逗留和占用资源。
在我当前的解决方案中,我将每个新连接的客户端插入到一个 dict
中,其中 value
是我的 ClientHandler
class,它拥有所有图像数据并生成一个Thread
在 5 秒后将名为 "timeout" 的 boolean
变量设置为 True
。如果在 5 秒帧内接收到新消息,将调用 bump
并将计时器重置回 0,否则我会在线程终止之前进行清理,并从 main 中的 dict
中删除引用循环:
import threading
import time
import zmq
class ClientHandler(threading.Thread):
def __init__(self, socket):
self.elapsed = time.time()
self.timeout = False
self.socket = socket
super(ClientHandler, self).__init__()
def run(self):
while time.time() - self.elapsed < 5.0:
pass
self.timeout = True
# CLIENT TIMED OUT
# HANDLE TERMINATION AND CLEAN UP HERE
def bump(self):
self.elapsed = time.time()
def handle(self, id, header, data):
# HANDLE CLIENT DATA HERE
# ACCUMULATE IMAGE BYTES, ETC
self.socket.send_multipart([id, str(0)])
def server_task():
clients = dict()
context = zmq.Context.instance()
server = context.socket(zmq.ROUTER)
server.setsockopt(zmq.RCVTIMEO, 0)
server.bind("tcp://127.0.0.1:7777")
while True:
try:
id, header, data = server.recv_multipart()
client = clients.get(id)
if client == None:
client = clients[id] = ClientHandler(server)
client.start()
client.bump()
client.handle(id, header, data)
except zmq.Again:
for id in clients.keys():
if clients[id].timeout:
del clients[id]
context.term()
if __name__ == "__main__":
server_task()
但是整个方法感觉不对。我这样做不当吗?如果是这样,如果有人能指出正确的方向,我将不胜感激。
自己想出来的,希望对其他人有帮助。
我在分配的端口上有一个 ROUTER,它向每个客户端分配唯一的端口,然后连接到所述唯一端口上新绑定的套接字。当客户端断开连接时,端口将被回收以重新分配。
import sys
import zmq
from multiprocessing import Process, Queue, Value
def server_task():
context = zmq.Context.instance()
server = context.socket(zmq.ROUTER)
server.bind("tcp://127.0.0.1:7777")
timeout_queue = Queue()
port_list = [ 1 ]
proc_list = [ ]
while True:
try:
id = server.recv_multipart()[0]
# Get an unused port from the list
# Ports from clients that have timed out are recycled here
while not timeout_queue.empty():
port_list.append(timeout_queue.get())
port = port_list.pop()
if len(port_list) == 0:
port_list.append(port + 1)
# Spawn a new worker task, binding the port to a socket
proc_running = Value("b", True)
proc_list.append(proc_running)
Process(target=worker_task, args=(proc_running, port, timeout_queue)).start()
# Send the new port to the client
server.send_multipart([id, str(7777 + port)])
except KeyboardInterrupt:
break
# Safely allow our worker processes to terminate
for proc_running in proc_list:
proc_running.value = False
context.term()
def worker_task(proc_running, port, timeout_queue):
context = zmq.Context.instance()
worker = context.socket(zmq.ROUTER)
worker.setsockopt(zmq.RCVTIMEO, 5000)
worker.bind("tcp://127.0.0.1:%d" % (7777 + port, ))
while proc_running.value:
try:
id, data = worker.recv_multipart()
worker.send_multipart([id, data])
except zmq.Again:
timeout_queue.put(port)
context.term()
break
print("Client on port %d disconnected" % (7777 + port, ))
我有一个ROUTER
,其目的是从多个DEALER
客户端收集图像数据,并对完整图像执行OCR。我发现处理 OCR 最有效的方法是利用 Python 的多处理库;累积的图像字节 put
变成 Queue
,以便在单独的 Process
中进行适当处理。但是,我需要确保当客户端遇到超时时 Process
已正确终止并且不会毫无意义地逗留和占用资源。
在我当前的解决方案中,我将每个新连接的客户端插入到一个 dict
中,其中 value
是我的 ClientHandler
class,它拥有所有图像数据并生成一个Thread
在 5 秒后将名为 "timeout" 的 boolean
变量设置为 True
。如果在 5 秒帧内接收到新消息,将调用 bump
并将计时器重置回 0,否则我会在线程终止之前进行清理,并从 main 中的 dict
中删除引用循环:
import threading
import time
import zmq
class ClientHandler(threading.Thread):
def __init__(self, socket):
self.elapsed = time.time()
self.timeout = False
self.socket = socket
super(ClientHandler, self).__init__()
def run(self):
while time.time() - self.elapsed < 5.0:
pass
self.timeout = True
# CLIENT TIMED OUT
# HANDLE TERMINATION AND CLEAN UP HERE
def bump(self):
self.elapsed = time.time()
def handle(self, id, header, data):
# HANDLE CLIENT DATA HERE
# ACCUMULATE IMAGE BYTES, ETC
self.socket.send_multipart([id, str(0)])
def server_task():
clients = dict()
context = zmq.Context.instance()
server = context.socket(zmq.ROUTER)
server.setsockopt(zmq.RCVTIMEO, 0)
server.bind("tcp://127.0.0.1:7777")
while True:
try:
id, header, data = server.recv_multipart()
client = clients.get(id)
if client == None:
client = clients[id] = ClientHandler(server)
client.start()
client.bump()
client.handle(id, header, data)
except zmq.Again:
for id in clients.keys():
if clients[id].timeout:
del clients[id]
context.term()
if __name__ == "__main__":
server_task()
但是整个方法感觉不对。我这样做不当吗?如果是这样,如果有人能指出正确的方向,我将不胜感激。
自己想出来的,希望对其他人有帮助。
我在分配的端口上有一个 ROUTER,它向每个客户端分配唯一的端口,然后连接到所述唯一端口上新绑定的套接字。当客户端断开连接时,端口将被回收以重新分配。
import sys
import zmq
from multiprocessing import Process, Queue, Value
def server_task():
context = zmq.Context.instance()
server = context.socket(zmq.ROUTER)
server.bind("tcp://127.0.0.1:7777")
timeout_queue = Queue()
port_list = [ 1 ]
proc_list = [ ]
while True:
try:
id = server.recv_multipart()[0]
# Get an unused port from the list
# Ports from clients that have timed out are recycled here
while not timeout_queue.empty():
port_list.append(timeout_queue.get())
port = port_list.pop()
if len(port_list) == 0:
port_list.append(port + 1)
# Spawn a new worker task, binding the port to a socket
proc_running = Value("b", True)
proc_list.append(proc_running)
Process(target=worker_task, args=(proc_running, port, timeout_queue)).start()
# Send the new port to the client
server.send_multipart([id, str(7777 + port)])
except KeyboardInterrupt:
break
# Safely allow our worker processes to terminate
for proc_running in proc_list:
proc_running.value = False
context.term()
def worker_task(proc_running, port, timeout_queue):
context = zmq.Context.instance()
worker = context.socket(zmq.ROUTER)
worker.setsockopt(zmq.RCVTIMEO, 5000)
worker.bind("tcp://127.0.0.1:%d" % (7777 + port, ))
while proc_running.value:
try:
id, data = worker.recv_multipart()
worker.send_multipart([id, data])
except zmq.Again:
timeout_queue.put(port)
context.term()
break
print("Client on port %d disconnected" % (7777 + port, ))