使用 Python 多处理的 ZeroMQ 异步客户端-服务器

ZeroMQ Asynchronous Client-Server using Python multiprocessing

我正在尝试采用 ZeroMQ 异步客户端-服务器模式 here with python multiprocessing. A brief description in the ZeroMQ guide

它是 DEALER/ROUTER 用于 客户端到服务器前端的通信 DEALER/DEALER 用于服务器后端到服务器工作人员的通信。服务器 frontendbackend 使用 zmq.proxy()-instance 连接。

我不想使用线程,而是想在服务器上使用 multiprocessing。但是来自客户端的请求不会到达服务器工作人员。但是,它们确实会到达服务器前端。还有后端。但是后端无法连接到服务器工作人员。

我们通常如何调试 pyzmq 中的这些问题?
如何为套接字打开详细日志记录?

我正在使用的python代码片段-

server.py

import zmq
import time

from multiprocessing import Process


def run(context, worker_id):
    socket = context.socket(zmq.DEALER)
    socket.connect("ipc://backend.ipc")
    print(f"Worker {worker_id} started")

    try:
        while True:
            ident, msg = socket.recv_multipart()
            print("Worker received %s from %s" % (msg, "ident"))

            time.sleep(5)

            socket.send_multipart([ident, msg])
            print("Worker sent %s from %s" % (msg, ident))

    except:
        socket.close()


if __name__ == "__main__":

    context = zmq.Context()
    frontend = context.socket(zmq.ROUTER)
    frontend.bind("tcp://*:5570")

    backend = context.socket(zmq.DEALER)
    backend.bind("ipc://backend.ipc")

    N_WORKERS = 7
    jobs = []
    try:
        for worker_id in range(N_WORKERS):
            job = Process(target=run, args=(context, worker_id,))
            jobs.append(job)
            job.start()

        zmq.proxy(frontend, backend)

        for job in jobs:
            job.join()

    except:
        frontend.close()
        backend.close()
        context.term()

client.py

import re

import zmq
from uuid import uuid4


if __name__ == "__main__":
    context = zmq.Context()
    socket = context.socket(zmq.DEALER)

    identity = uuid4()
    socket.identity = identity.encode("ascii")

    socket.connect("tcp://localhost:5570")

    poll = zmq.Poller()
    poll.register(socket, zmq.POLLIN)

    request = {
        "body": "Some request body.",
    }

    socket.send_string(json.dumps(request))

    while True:
        for i in range(5):
            sockets = dict(poll.poll(10))
            if socket in sockets:
                msg = socket.recv()
                print(msg)

Q : "How to turn on verbose logging for the sockets?"

开始使用已发布的本机 API socket_monitor() 获取所有相关详细信息,报告为来自套接字的事件-(实例)-监控中。


Q : "How do we generally debug these issues in pyzmq?"

做这件事没有一般的策略。进入 的领域后,您几乎总是会创建自己的、特定于项目的工具,一次“收集”和“viewing/interpreting” - (主要) 分布式事件的有序流。


最后但同样重要的是:
避免尝试共享一个Context()实例,
"其中"8个进程

零之禅的艺术强烈主张避免任何形式和形式的分享。在这里,通过 multiprocessing.Process' 引用了同一个 Context()-实例("shared") s 进程实例化调用签名接口,它不会使进程间-“共享”起作用。

可以让每个派生的进程实例创建它自己的 Context() 实例,并在其自己的生命周期中从其私有 space 内部使用它。

顺便说一句,您的代码忽略了任何 return-codes,记录在本机 API 中,帮助您处理(在更坏的情况下 debug post-mortem )伴随着try: ... except: ... finally: 脚手架在这里也有很大帮助。

无论如何,您越早学会停止使用 { .send() | .recv() | .poll() } 方法的阻塞形式,您的代码就可以更好地开始重新使用 ZeroMQ 的实际功能。