使用 Python 多处理的 ZeroMQ 异步客户端-服务器
ZeroMQ Asynchronous Client-Server using Python multiprocessing
我正在尝试采用 ZeroMQ 异步客户端-服务器模式 here with python multiprocessing. A brief description in the ZeroMQ guide
它是 DEALER/ROUTER
用于 客户端到服务器前端的通信 和 DEALER/DEALER
用于服务器后端到服务器工作人员的通信。服务器 frontend 和 backend 使用 zmq.proxy()
-instance 连接。
我不想使用线程,而是想在服务器上使用 multiprocessing
。但是来自客户端的请求不会到达服务器工作人员。但是,它们确实会到达服务器前端。还有后端。但是后端无法连接到服务器工作人员。
我们通常如何调试 pyzmq
中的这些问题?
如何为套接字打开详细日志记录?
我正在使用的python代码片段-
server.py
import zmq
import time
from multiprocessing import Process
def run(context, worker_id):
socket = context.socket(zmq.DEALER)
socket.connect("ipc://backend.ipc")
print(f"Worker {worker_id} started")
try:
while True:
ident, msg = socket.recv_multipart()
print("Worker received %s from %s" % (msg, "ident"))
time.sleep(5)
socket.send_multipart([ident, msg])
print("Worker sent %s from %s" % (msg, ident))
except:
socket.close()
if __name__ == "__main__":
context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5570")
backend = context.socket(zmq.DEALER)
backend.bind("ipc://backend.ipc")
N_WORKERS = 7
jobs = []
try:
for worker_id in range(N_WORKERS):
job = Process(target=run, args=(context, worker_id,))
jobs.append(job)
job.start()
zmq.proxy(frontend, backend)
for job in jobs:
job.join()
except:
frontend.close()
backend.close()
context.term()
client.py
import re
import zmq
from uuid import uuid4
if __name__ == "__main__":
context = zmq.Context()
socket = context.socket(zmq.DEALER)
identity = uuid4()
socket.identity = identity.encode("ascii")
socket.connect("tcp://localhost:5570")
poll = zmq.Poller()
poll.register(socket, zmq.POLLIN)
request = {
"body": "Some request body.",
}
socket.send_string(json.dumps(request))
while True:
for i in range(5):
sockets = dict(poll.poll(10))
if socket in sockets:
msg = socket.recv()
print(msg)
Q : "How to turn on verbose logging for the sockets?"
开始使用已发布的本机 API socket_monitor()
获取所有相关详细信息,报告为来自套接字的事件-(实例)-监控中。
Q : "How do we generally debug these issues in pyzmq
?"
做这件事没有一般的策略。进入 distributed-computing 的领域后,您几乎总是会创建自己的、特定于项目的工具,一次“收集”和“viewing/interpreting” - (主要) 分布式事件的有序流。
最后但同样重要的是:
避免尝试共享一个Context()
实例,
少"其中"8个进程
零之禅的艺术强烈主张避免任何形式和形式的分享。在这里,通过 multiprocessing.Process
' 引用了同一个 Context()
-实例("shared") s 进程实例化调用签名接口,它不会使进程间-“共享”起作用。
可以让每个派生的进程实例创建它自己的 Context()
实例,并在其自己的生命周期中从其私有 space 内部使用它。
顺便说一句,您的代码忽略了任何 return-codes,记录在本机 API 中,帮助您处理(在更坏的情况下 debug post-mortem )伴随着distributed-computing。 try: ... except: ... finally:
脚手架在这里也有很大帮助。
无论如何,您越早学会停止使用 { .send() | .recv() | .poll() }
方法的阻塞形式,您的代码就可以更好地开始重新使用 ZeroMQ 的实际功能。
我正在尝试采用 ZeroMQ 异步客户端-服务器模式 here with python multiprocessing. A brief description in the ZeroMQ guide
它是 DEALER/ROUTER
用于 客户端到服务器前端的通信 和 DEALER/DEALER
用于服务器后端到服务器工作人员的通信。服务器 frontend 和 backend 使用 zmq.proxy()
-instance 连接。
我不想使用线程,而是想在服务器上使用 multiprocessing
。但是来自客户端的请求不会到达服务器工作人员。但是,它们确实会到达服务器前端。还有后端。但是后端无法连接到服务器工作人员。
我们通常如何调试 pyzmq
中的这些问题?
如何为套接字打开详细日志记录?
我正在使用的python代码片段-
server.py
import zmq
import time
from multiprocessing import Process
def run(context, worker_id):
socket = context.socket(zmq.DEALER)
socket.connect("ipc://backend.ipc")
print(f"Worker {worker_id} started")
try:
while True:
ident, msg = socket.recv_multipart()
print("Worker received %s from %s" % (msg, "ident"))
time.sleep(5)
socket.send_multipart([ident, msg])
print("Worker sent %s from %s" % (msg, ident))
except:
socket.close()
if __name__ == "__main__":
context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5570")
backend = context.socket(zmq.DEALER)
backend.bind("ipc://backend.ipc")
N_WORKERS = 7
jobs = []
try:
for worker_id in range(N_WORKERS):
job = Process(target=run, args=(context, worker_id,))
jobs.append(job)
job.start()
zmq.proxy(frontend, backend)
for job in jobs:
job.join()
except:
frontend.close()
backend.close()
context.term()
client.py
import re
import zmq
from uuid import uuid4
if __name__ == "__main__":
context = zmq.Context()
socket = context.socket(zmq.DEALER)
identity = uuid4()
socket.identity = identity.encode("ascii")
socket.connect("tcp://localhost:5570")
poll = zmq.Poller()
poll.register(socket, zmq.POLLIN)
request = {
"body": "Some request body.",
}
socket.send_string(json.dumps(request))
while True:
for i in range(5):
sockets = dict(poll.poll(10))
if socket in sockets:
msg = socket.recv()
print(msg)
Q : "How to turn on verbose logging for the sockets?"
开始使用已发布的本机 API socket_monitor()
获取所有相关详细信息,报告为来自套接字的事件-(实例)-监控中。
Q : "How do we generally debug these issues in
pyzmq
?"
做这件事没有一般的策略。进入 distributed-computing 的领域后,您几乎总是会创建自己的、特定于项目的工具,一次“收集”和“viewing/interpreting” - (主要) 分布式事件的有序流。
最后但同样重要的是:
避免尝试共享一个Context()
实例,
少"其中"8个进程
零之禅的艺术强烈主张避免任何形式和形式的分享。在这里,通过 multiprocessing.Process
' 引用了同一个 Context()
-实例("shared") s 进程实例化调用签名接口,它不会使进程间-“共享”起作用。
可以让每个派生的进程实例创建它自己的 Context()
实例,并在其自己的生命周期中从其私有 space 内部使用它。
顺便说一句,您的代码忽略了任何 return-codes,记录在本机 API 中,帮助您处理(在更坏的情况下 debug post-mortem )伴随着distributed-computing。 try: ... except: ... finally:
脚手架在这里也有很大帮助。
无论如何,您越早学会停止使用 { .send() | .recv() | .poll() }
方法的阻塞形式,您的代码就可以更好地开始重新使用 ZeroMQ 的实际功能。