Zeromq(pyzmq) ROUTER处理多个客户端的数据及后续超时处理

Zeromq (pyzmq) ROUTER procession of multiple clients' data and subsequent timeout handling

我有一个ROUTER,其目的是从多个DEALER客户端收集图像数据,并对完整图像执行OCR。我发现处理 OCR 最有效的方法是利用 Python 的多处理库;累积的图像字节 put 变成 Queue,以便在单独的 Process 中进行适当处理。但是,我需要确保当客户端遇到超时时 Process 已正确终止并且不会毫无意义地逗留和占用资源。

在我当前的解决方案中,我将每个新连接的客户端插入到一个 dict 中,其中 value 是我的 ClientHandler class,它拥有所有图像数据并生成一个Thread 在 5 秒后将名为 "timeout" 的 boolean 变量设置为 True。如果在 5 秒帧内接收到新消息,将调用 bump 并将计时器重置回 0,否则我会在线程终止之前进行清理,并从 main 中的 dict 中删除引用循环:

import threading
import time
import zmq

class ClientHandler(threading.Thread):
    def __init__(self, socket):
        self.elapsed = time.time()
        self.timeout = False

        self.socket = socket

        super(ClientHandler, self).__init__()

    def run(self):
        while time.time() - self.elapsed < 5.0:
            pass

        self.timeout = True

        # CLIENT TIMED OUT
        # HANDLE TERMINATION AND CLEAN UP HERE

    def bump(self):
        self.elapsed = time.time()

    def handle(self, id, header, data):
        # HANDLE CLIENT DATA HERE
        # ACCUMULATE IMAGE BYTES, ETC

        self.socket.send_multipart([id, str(0)])

def server_task():
    clients = dict()

    context = zmq.Context.instance()
    server = context.socket(zmq.ROUTER)

    server.setsockopt(zmq.RCVTIMEO, 0)

    server.bind("tcp://127.0.0.1:7777")

    while True:
        try:
            id, header, data = server.recv_multipart()

            client = clients.get(id)

            if client == None:
                client = clients[id] = ClientHandler(server)

                client.start()

            client.bump()
            client.handle(id, header, data)
        except zmq.Again:
            for id in clients.keys():
                if clients[id].timeout:
                    del clients[id]

    context.term()

if __name__ == "__main__":
    server_task()

但是整个方法感觉不对。我这样做不当吗?如果是这样,如果有人能指出正确的方向,我将不胜感激。

自己想出来的,希望对其他人有帮助。

我在分配的端口上有一个 ROUTER,它向每个客户端分配唯一的端口,然后连接到所述唯一端口上新绑定的套接字。当客户端断开连接时,端口将被回收以重新分配。

import sys
import zmq
from multiprocessing import Process, Queue, Value

def server_task():
    context = zmq.Context.instance()

    server = context.socket(zmq.ROUTER)

    server.bind("tcp://127.0.0.1:7777")

    timeout_queue = Queue()
    port_list = [ 1 ]

    proc_list = [ ]

    while True:
        try:
            id = server.recv_multipart()[0]

            # Get an unused port from the list
            # Ports from clients that have timed out are recycled here

            while not timeout_queue.empty():
                port_list.append(timeout_queue.get())

            port = port_list.pop()

            if len(port_list) == 0:
                port_list.append(port + 1)

            # Spawn a new worker task, binding the port to a socket

            proc_running = Value("b", True)

            proc_list.append(proc_running)

            Process(target=worker_task, args=(proc_running, port, timeout_queue)).start()

            # Send the new port to the client

            server.send_multipart([id, str(7777 + port)])

        except KeyboardInterrupt:
            break

    # Safely allow our worker processes to terminate
    for proc_running in proc_list:
        proc_running.value = False

    context.term()

def worker_task(proc_running, port, timeout_queue):
    context = zmq.Context.instance()

    worker = context.socket(zmq.ROUTER)

    worker.setsockopt(zmq.RCVTIMEO, 5000)
    worker.bind("tcp://127.0.0.1:%d" % (7777 + port, ))

    while proc_running.value:
        try:
            id, data = worker.recv_multipart()

            worker.send_multipart([id, data])
        except zmq.Again:
            timeout_queue.put(port)

            context.term()

            break

    print("Client on port %d disconnected" % (7777 + port, ))