如何创建适合发送和消费的 ZeroMQ 套接字?

How to create ZeroMQ socket suitable both for sending and consuming?

您能否为以下场景建议一个 ZeroMQ 套接字架构:

1) 有服务器监听端口

2) 有多个客户端同时连接服务器

3) 服务器接受来自客户端的所有连接并为每个客户端提供双向队列,意味着双方(客户端 N 或服务器)都可以发送或使用消息,即双方都可以是通信的发起者和另一方应该有一个回调来处理消息。

我们是否应该在每个接受的连接上创建额外的 ZeroMQ 套接字以从服务器推送消息?对于这种架构,您能否建议 google 哪种 ZeroMQ 套接字类型?

Q : …create additional ZeroMQ socket on each accepted connection for pushing messages from server?

最好的基于简单组合的设计——兼顾扩展性和安全性

本机 ZeroMQ 原语(智能原语可扩展正式通信模式原型)对我们来说就像乐高积木 - 我们将它们进一步用于我们应用程序中预期目标用途的 Messaging/Signalling 平面-域。

Q : Could you please advice of which ZeroMQ socket type to google for such architecture?

否,因为没有可用于此类建议的详细要求列表。一对 PUSH/PULL-s 本身是不够的,临时进行的(偶发的)REQ/REP 可能有助于客户(重新)发现阶段,其他共存的、持续的或偶发的也可能如此原型用于组成任何额外的 System/Service-Planes.

我遇到了基本相同的问题。对我来说,问题似乎是即使在 python 中有 GIL 的保护,你也不能同时从两个线程操作一个套接字。这将导致解释器崩溃。因此,一个线程不能有发送和接收线程,只有一个线程必须能够同时接收和发送,而 pyzmq 无法同时提供 poll() 套接字和非套接字的方法。

user3666197 也回复了我的问题,但是两个回复都没有帮助。尤其是因为不能假设有 1000 多个免费和开放的端口来为所有客户端连接生成主机套接字。

有一个相对丑陋的解决方案。这是一个缺少适当的 NOWAIT 标志等的草稿。 make_process_pull_socket() 创建一个用于进程内部通信的套接字。如下:

class TwoWay(threading.Thread):
def __init__(self, ip_string, port, inque:queue.Queue):
    super(TwoWay, self).__init__()
    self.sock = cntxt.socket(zmq.ROUTER)
    addr = "tcp://{}:{}".format(ip_string, str(port))
    self.sock.bind(addr)
    self.pull = make_process_pull_socket(4456)
    self.push = make_process_push_socket(4456)
    self.inque = inque

def run(self):
    pl = zmq.Poller()
    pl.register(self.sock,zmq.POLLIN)
    pl.register(self.pull,zmq.POLLIN)
    while True:
        try:
            p = pl.poll(timeout=2000)
            if not p:
                continue
            s,i = p[0]
            if s == self.sock:
                a,m = s.recv_multipart()
                self.inque.put((a,m))
            if s == self.pull:
                r = s.recv()
                if len(r) > 5:
                    self.sock.send_multipart([r[:5], r[5:]])
        except Exception as e:
            print(e)

然后您将传入消息放入作为参数提供的队列中,传出消息(带有标识符)可以使用 TwoWay.push.send(id+msg)

发送