ZMQ client-worker communication pattern

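Reading through the ZeroMQ documentation, I find these three socket combinations a bit confusing. They are:

DEALER to ROUTER
DEALER to DEALER
ROUTER to ROUTER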

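I understand that DEALER and ROUTER are replacements for synchronous REQ/REP communication, so the exchange becomes asynchronous and multiple nodes can connect. What I don't understand is how a DEALER can stand in for both REQ and REP in DEALER to DEALER (and likewise a ROUTER in ROUTER to ROUTER).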

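I am looking for a pattern that lets any number of clients submit jobs to any number of workers (with load balancing). The workers process the jobs and return responses, including intermediate results, to the client asynchronously, sending several messages back per job. Clients may also need to terminate a job early. I find the documentation a bit thin on this (I am by no means an expert and may have missed the relevant sections).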

I am happy to work out the details myself, but every time I think I have found the right pattern, I find another that looks equally suitable (for example, these three look equally suitable to me: http://zguide.zeromq.org/page:all#ROUTER-Broker-and-REQ-Workers, http://zguide.zeromq.org/page:all#ROUTER-Broker-and-DEALER-Workers, http://zguide.zeromq.org/page:all#A-Load-Balancing-Message-Broker).

Any advice on the structure (which socket talks to which component) would be much appreciated.

Update

Here is what I have come up with so far:

import multiprocessing
import time

import zmq

router_url_b = 'tcp://*:5560'
router_url = 'tcp://localhost:5560'

dealer_url_b = 'tcp://*:5561'
dealer_url = 'tcp://localhost:5561'


def broker():
    context = zmq.Context()
    # Frontend: clients connect here.
    router = context.socket(zmq.ROUTER)
    router.bind(router_url_b)

    # Backend: workers connect here.
    dealer = context.socket(zmq.DEALER)
    dealer.bind(dealer_url_b)

    poll = zmq.Poller()
    poll.register(router, zmq.POLLIN)
    poll.register(dealer, zmq.POLLIN)

    while True:
        poll_result = dict(poll.poll())
        if router in poll_result:
            # ROUTER prepends the client's identity to the request.
            ident, msg = router.recv_multipart()
            print('router: ident=%s, msg=%s' % (ident, msg))
            dealer.send_multipart([ident, msg])
        if dealer in poll_result:
            ident, msg = dealer.recv_multipart()
            print('dealer: ident=%s, msg=%s' % (ident, msg))
            # The leading identity frame tells ROUTER which client to reply to.
            router.send_multipart([ident, msg])


def client(client_id):
    context = zmq.Context()
    req = context.socket(zmq.DEALER)
    # setting identity doesn't seem to make a difference
    req.setsockopt(zmq.IDENTITY, b'%d' % client_id)
    req.connect(router_url)

    req.send(b'work %d' % client_id)
    while True:
        msg = req.recv()
        print('client %d received response: %s' % (client_id, msg))


def worker(worker_id):
    context = zmq.Context()
    # ROUTER, to allow asynchronous sending of responses.
    rep = context.socket(zmq.ROUTER)
    # not sure if this is required...
    # rep.setsockopt(zmq.IDENTITY, b'%d' % (10 + worker_id))
    rep.connect(dealer_url)

    while True:
        frames = rep.recv_multipart()
        # Everything before the payload is routing information; echo it back.
        ident, msg = frames[:-1], frames[-1]
        print('worker %d received: "%s", ident="%s"' % (worker_id, msg, ident))
        # do some work...
        time.sleep(10)
        rep.send_multipart(ident + [b'result A from worker %d (%s)' % (worker_id, msg)])
        # do more work...
        time.sleep(10)
        rep.send_multipart(ident + [b'result B from worker %d (%s)' % (worker_id, msg)])


def main():
    print('creating workers')
    for i in range(2):
        p = multiprocessing.Process(target=worker, args=(i, ))
        p.daemon = True
        p.start()

    print('creating clients')
    for i in range(5):
        p = multiprocessing.Process(target=client, args=(i, ))
        p.daemon = True
        p.start()

    broker()


if __name__ == '__main__':
    main()

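It seems to work quite well. The only thing missing is communication from the client to the worker once the worker has started processing a job. I guess the best approach is to create some kind of new control channel (pub/sub) to terminate workers when needed.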
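One way such a control channel might look, as a minimal sketch rather than a tested design: whoever may cancel a job publishes a message naming the job id, and the worker checks a SUB socket between work steps. The endpoint, the CANCEL topic, the job ids, and the helpers drain_cancellations and run_job are all assumptions made for illustration; inproc is used only so the sketch runs in a single process.

```python
import time
import zmq

CONTROL_URL = "inproc://control"  # hypothetical; a tcp:// endpoint in practice
CANCEL = b"CANCEL"                # hypothetical topic prefix


def drain_cancellations(sub, cancelled):
    """Read every pending control message and record the cancelled job ids."""
    while sub.poll(0, zmq.POLLIN):
        _topic, job_id = sub.recv_multipart()
        cancelled.add(job_id)


def run_job(job_id, sub, cancelled, steps=3):
    """Work in steps, checking the control channel before each one."""
    results = []
    for step in range(steps):
        drain_cancellations(sub, cancelled)
        if job_id in cancelled:
            results.append(b"cancelled")
            break
        results.append(b"step %d" % step)  # stands in for an intermediate result
    return results


context = zmq.Context()
pub = context.socket(zmq.PUB)      # held by whoever may cancel jobs
pub.bind(CONTROL_URL)
sub = context.socket(zmq.SUB)      # held by the worker
sub.connect(CONTROL_URL)
sub.setsockopt(zmq.SUBSCRIBE, CANCEL)
time.sleep(0.1)                    # let the subscription reach the publisher

pub.send_multipart([CANCEL, b"job-1"])
time.sleep(0.1)
cancelled = set()
first = run_job(b"job-1", sub, cancelled)
second = run_job(b"job-2", sub, cancelled)
print(first, second)
context.destroy(linger=0)
```

The worker only notices a cancellation between steps, so a long blocking step would delay it; splitting the work into short steps is the trade-off this sketch assumes.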

A few questions remain:

Beyond my update, I found that workers can connect to the server's backend with a DEALER socket. The pattern and an explanation can be found here.

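Clients use a DEALER socket. The server receives requests on a ROUTER frontend (asynchronous, many clients) and proxies them to the workers through a DEALER backend (asynchronous). The workers listen to the server's backend on a DEALER socket (asynchronous, no routing needed, although a ROUTER would also work).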

If the workers were strictly synchronous, we’d use REP, but since we want to send multiple replies we need an async socket. We do not want to route replies; they always go to the single server thread that sent us the request.
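A minimal sketch of what a DEALER worker could look like under that description. It assumes the broker's backend forwards two frames, the client identity and the payload, as in the code above; the inproc endpoint, the identities, and the helper handle_job are illustrative assumptions, and the broker's backend is played by a bare DEALER socket in the same process.

```python
import zmq

BACKEND_URL = "inproc://backend"  # hypothetical; stands in for dealer_url


def handle_job(worker, worker_id):
    """Receive one job on a DEALER socket and send two replies for it."""
    # DEALER adds no routing frame of its own, so the worker sees exactly
    # what the broker's backend sent: [client identity, payload].
    ident, payload = worker.recv_multipart()
    for label in (b"A", b"B"):
        reply = b"result %s from worker %d (%s)" % (label, worker_id, payload)
        # Echo the client identity so the broker's ROUTER can route the reply.
        worker.send_multipart([ident, reply])


context = zmq.Context()
backend = context.socket(zmq.DEALER)   # plays the broker's backend socket
backend.bind(BACKEND_URL)
worker = context.socket(zmq.DEALER)
worker.connect(BACKEND_URL)

backend.send_multipart([b"client-0", b"work 0"])
handle_job(worker, worker_id=1)
replies = [backend.recv_multipart() for _ in range(2)]
print(replies)
context.destroy(linger=0)
```

Compared with the ROUTER worker, there is no extra envelope frame to peel off and re-attach; the worker only has to pass the client identity back unchanged.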

A further modification is to replace the manual dispatching of router/dealer messages (the while True loop in broker()) with zmq.proxy(router, dealer).
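A sketch of how the broker might look with zmq.proxy, assuming the same ROUTER frontend and DEALER backend. The inproc endpoints, the thread, and the shutdown handling are illustrative assumptions added so the example can run and stop in one process; in the multiprocess script above, the call would simply replace the polling loop.

```python
import threading
import zmq

FRONTEND_URL = "inproc://frontend"  # hypothetical; stands in for router_url
BACKEND_URL = "inproc://backend"    # hypothetical; stands in for dealer_url


def broker(context, ready):
    router = context.socket(zmq.ROUTER)
    router.bind(FRONTEND_URL)
    dealer = context.socket(zmq.DEALER)
    dealer.bind(BACKEND_URL)
    ready.set()
    try:
        zmq.proxy(router, dealer)   # blocks, forwarding frames both ways
    except zmq.ZMQError:
        pass                        # raised once the context is terminated
    finally:
        router.close(linger=0)
        dealer.close(linger=0)


context = zmq.Context()
ready = threading.Event()
thread = threading.Thread(target=broker, args=(context, ready))
thread.start()
ready.wait()                        # inproc endpoints are bound from here on

client = context.socket(zmq.DEALER)
client.setsockopt(zmq.IDENTITY, b"client-0")
client.connect(FRONTEND_URL)
worker = context.socket(zmq.DEALER)
worker.connect(BACKEND_URL)

client.send(b"work 0")
ident, payload = worker.recv_multipart()
worker.send_multipart([ident, b"done: " + payload])
reply = client.recv()
print(ident, reply)

client.close(linger=0)
worker.close(linger=0)
context.term()                      # makes zmq.proxy return in the broker thread
thread.join()
```

The proxy loses the per-message print statements of the hand-written loop, so any logging would have to move into the clients and workers.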

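Update

Apparently, this pattern uses ZMQ's standard round-robin routing. Custom task distribution can be implemented with the ROUTER to ROUTER pattern. In that case, clients start by sending a request and workers start by sending a ready message. The broker keeps a list of ready workers and, if none is available, stops polling for new client messages (so the requests wait in ZMQ's internal message buffers).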
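The broker logic described above could be sketched roughly as follows. This is an illustration under several assumptions, not the guide's implementation: the READY marker, the identities, the inproc endpoints, and the helper broker_step are made up for the example, and a worker is treated as ready again after each reply it sends, which would need refining for workers that send several replies per job.

```python
from collections import deque
import zmq

READY = b"READY"  # hypothetical marker a worker sends when it is idle


def broker_step(frontend, backend, ready_workers, timeout_ms=100):
    """Run one polling round; return True if any message was handled."""
    poller = zmq.Poller()
    poller.register(backend, zmq.POLLIN)
    if ready_workers:
        # Only look at client requests when a worker can take them;
        # otherwise they stay queued inside ZeroMQ.
        poller.register(frontend, zmq.POLLIN)
    events = dict(poller.poll(timeout_ms))

    if backend in events:
        worker_id, *rest = backend.recv_multipart()
        ready_workers.append(worker_id)
        if rest != [READY]:
            frontend.send_multipart(rest)       # [client_id, reply]
    if frontend in events:
        client_id, payload = frontend.recv_multipart()
        worker_id = ready_workers.popleft()     # longest-idle worker first
        backend.send_multipart([worker_id, client_id, payload])
    return bool(events)


context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
frontend.bind("inproc://lb-frontend")
backend = context.socket(zmq.ROUTER)
backend.bind("inproc://lb-backend")
client = context.socket(zmq.DEALER)
client.setsockopt(zmq.IDENTITY, b"client-0")
client.connect("inproc://lb-frontend")
worker = context.socket(zmq.DEALER)
worker.setsockopt(zmq.IDENTITY, b"worker-0")
worker.connect("inproc://lb-backend")

ready_workers = deque()
client.send(b"work 0")                              # no worker is ready yet
idle_round = broker_step(frontend, backend, ready_workers)
worker.send(READY)
broker_step(frontend, backend, ready_workers)       # registers worker-0
broker_step(frontend, backend, ready_workers)       # dispatches the request
job = worker.recv_multipart()
worker.send_multipart([job[0], b"done: " + job[1]])
broker_step(frontend, backend, ready_workers)       # forwards the reply
reply = client.recv()
print(idle_round, job, reply)
context.destroy(linger=0)
```

Because the frontend is left out of the poller while the ready list is empty, the early request is not lost; it is picked up in the first round after a worker announces itself.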