Multi-core ZeroMQ?

I use ZeroMQ to receive input parameters:

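import zmq

PORT = 5556  # assumed here; the same port the answer below binds to
context = zmq.Context()

def server():
    rep = context.socket(zmq.REP)
    rep.bind('tcp://*:{}'.format(PORT))

    while True:
        data = rep.recv_json()
        result = calculate(data)  # calculate is defined elsewhere
        rep.send_json(result)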

The computation happens in calculate; once it finishes, the result is sent back to the client over ZMQ.

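According to my tests, it currently uses only one core of the machine, and I would like to use the other cores as well. I have read some documentation on multiprocessing and multithreading, but it mostly deals with a fixed set of inputs, which is not my case.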

So I would appreciate some help with this.

Here is how you can use multiprocessing to have several worker processes handle concurrent client connections:

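import time
import zmq
from multiprocessing import Process

def calculate(data):
    time.sleep(5)  # stand-in for real work; accounts for the 5-second delays in the output below
    return {"output": data['ok']}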

def handle_request(url):
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.connect(url)
    while True:
        data = socket.recv_json()
        print("received {}".format(data))
        out = calculate(data)
        socket.send_json(out)

def server():
    context = zmq.Context()
    # Set up socket for clients to connect to.
    clients = context.socket(zmq.ROUTER)
    clients.bind('tcp://*:{}'.format(5556))

    # Set up ipc socket for workers to connect to
    url_worker = 'ipc:///tmp/workers'
    workers = context.socket(zmq.DEALER)
    workers.bind(url_worker)

    # Start 4 worker processes    
    for _ in range(4):
        p = Process(target=handle_request, args=(url_worker,))
        p.start()

    # Forward requests from clients to the workers (this call blocks).
    # zmq.proxy is the current replacement for the deprecated zmq.device(zmq.QUEUE, ...).
    zmq.proxy(clients, workers)


if __name__ == "__main__":
    server()
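
The server above hard-codes four workers. A minimal sketch, assuming you would rather start one worker per CPU core, might look like the following. The names worker_count and start_workers are hypothetical helpers introduced only for illustration, and the handler simply echoes the "ok" field as in the example above.

```python
import os
from multiprocessing import Process

import zmq


def handle_request(url):
    """Worker loop: same shape as the handler in the answer above."""
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.connect(url)
    while True:
        data = socket.recv_json()
        socket.send_json({"output": data["ok"]})


def worker_count():
    """Hypothetical helper: one worker per core, falling back to 1."""
    return os.cpu_count() or 1


def start_workers(url, count=None):
    """Hypothetical helper: start `count` daemon worker processes.

    Daemon processes are terminated when the parent process exits.
    """
    procs = []
    for _ in range(count or worker_count()):
        p = Process(target=handle_request, args=(url,), daemon=True)
        p.start()
        procs.append(p)
    return procs
```

In server(), the loop that starts four processes could then be replaced with start_workers(url_worker). Whether one worker per core is the right number depends on how CPU-bound calculate really is.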

Now, if you point a sample client at it:

import zmq
from threading import Thread

def send_req(request):
    context = zmq.Context()

    print("Connecting to hello world server...")
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://localhost:5556")
    print("Sending request %s ..." % request)

    socket.send_json({"ok" : "Hello"})

    message = socket.recv_string()  # decode the reply so it prints as text on Python 3
    print("Received reply %s [ %s ]" % (request, message))

#  Do 10 requests in parallel
for request in range(10):
    Thread(target=send_req, args=(request,)).start()

You get the following output:

Connecting to hello world server...
Sending request 0 ...
Connecting to hello world server...
Sending request 1 ...
Connecting to hello world server...
Sending request 2 ...
Connecting to hello world server...
Sending request 3 ...
Connecting to hello world server...
Sending request 4 ...
Connecting to hello world server...
Sending request 5 ...
Connecting to hello world server...
Sending request 6 ...
Connecting to hello world server...
Sending request 7 ...
Connecting to hello world server...
Sending request 8 ...
Connecting to hello world server...
Sending request 9 ...
<5 second delay>
Received reply 0 [ {"output":"Hello"} ]
Received reply 1 [ {"output":"Hello"} ]
Received reply 3 [ {"output":"Hello"} ]
Received reply 2 [ {"output":"Hello"} ]
<5 second delay>
Received reply 4 [ {"output":"Hello"} ]
Received reply 5 [ {"output":"Hello"} ]
Received reply 6 [ {"output":"Hello"} ]
Received reply 7 [ {"output":"Hello"} ]
<5 second delay>
Received reply 8 [ {"output":"Hello"} ]
Received reply 9 [ {"output":"Hello"} ]
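
The grouping of replies above follows from having four workers and ten requests. As a rough sketch of that arithmetic, assuming each request takes about 5 seconds and is handed to whichever worker is free first (reply_times is a hypothetical helper, not part of the code above):

```python
import heapq


def reply_times(n_requests, n_workers, seconds_per_request):
    """Return the approximate time at which each request's reply arrives.

    Assumes every request takes the same time and that all requests are
    already queued at t = 0, as with the threaded client above.
    """
    # Min-heap of the times at which each worker next becomes free.
    free_at = [0.0] * n_workers
    heapq.heapify(free_at)
    times = []
    for _ in range(n_requests):
        start = heapq.heappop(free_at)  # earliest available worker
        finish = start + seconds_per_request
        times.append(finish)
        heapq.heappush(free_at, finish)
    return times


print(reply_times(10, 4, 5))
```

Under these assumptions, four replies arrive at about 5 seconds, four at about 10 seconds, and the last two at about 15 seconds, which matches the three delays in the output. With more workers than requests, every reply would arrive after a single delay.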