如何创建适合发送和消费的 ZeroMQ 套接字?
How to create ZeroMQ socket suitable both for sending and consuming?
您能否为以下场景建议一个 ZeroMQ 套接字架构:
1) 有服务器监听端口
2) 有多个客户端同时连接服务器
3) 服务器接受来自客户端的所有连接并为每个客户端提供双向队列,意味着双方(客户端 N 或服务器)都可以发送或使用消息,即双方都可以是通信的发起者和另一方应该有一个回调来处理消息。
我们是否应该在每个接受的连接上创建额外的 ZeroMQ 套接字以从服务器推送消息?对于这种架构,您能否建议 google 哪种 ZeroMQ 套接字类型?
Q : …create additional ZeroMQ socket on each accepted connection for pushing messages from server?
最好的基于简单组合的设计——兼顾扩展性和安全性
本机 ZeroMQ 原语(智能原语可扩展正式通信模式原型)对我们来说就像乐高积木 - 我们将它们进一步用于我们应用程序中预期目标用途的 Messaging/Signalling 平面-域。
Q : Could you please advice of which ZeroMQ socket type to google for such architecture?
否,因为没有可用于此类建议的详细要求列表。一对 PUSH/PULL
-s 本身是不够的,临时进行的(偶发的)REQ/REP
可能有助于客户(重新)发现阶段,其他共存的、持续的或偶发的也可能如此原型用于组成任何额外的 System/Service-Planes.
我遇到了基本相同的问题。对我来说,问题似乎是即使在 python 中有 GIL 的保护,你也不能同时从两个线程操作一个套接字。这将导致解释器崩溃。因此,一个线程不能有发送和接收线程,只有一个线程必须能够同时接收和发送,而 pyzmq 无法同时提供 poll() 套接字和非套接字的方法。
user3666197 也回复了我的问题,但是两个回复都没有帮助。尤其是因为不能假设有 1000 多个免费和开放的端口来为所有客户端连接生成主机套接字。
有一个相对丑陋的解决方案。这是一个缺少适当的 NOWAIT 标志等的草稿。 make_process_pull_socket() 创建一个用于进程内部通信的套接字。如下:
class TwoWay(threading.Thread):
def __init__(self, ip_string, port, inque:queue.Queue):
super(TwoWay, self).__init__()
self.sock = cntxt.socket(zmq.ROUTER)
addr = "tcp://{}:{}".format(ip_string, str(port))
self.sock.bind(addr)
self.pull = make_process_pull_socket(4456)
self.push = make_process_push_socket(4456)
self.inque = inque
def run(self):
pl = zmq.Poller()
pl.register(self.sock,zmq.POLLIN)
pl.register(self.pull,zmq.POLLIN)
while True:
try:
p = pl.poll(timeout=2000)
if not p:
continue
s,i = p[0]
if s == self.sock:
a,m = s.recv_multipart()
self.inque.put((a,m))
if s == self.pull:
r = s.recv()
if len(r) > 5:
self.sock.send_multipart([r[:5], r[5:]])
except Exception as e:
print(e)
然后您将传入消息放入作为参数提供的队列中,传出消息(带有标识符)可以使用 TwoWay.push.send(id+msg)
发送
您能否为以下场景建议一个 ZeroMQ 套接字架构:
1) 有服务器监听端口
2) 有多个客户端同时连接服务器
3) 服务器接受来自客户端的所有连接并为每个客户端提供双向队列,意味着双方(客户端 N 或服务器)都可以发送或使用消息,即双方都可以是通信的发起者和另一方应该有一个回调来处理消息。
我们是否应该在每个接受的连接上创建额外的 ZeroMQ 套接字以从服务器推送消息?对于这种架构,您能否建议 google 哪种 ZeroMQ 套接字类型?
Q : …create additional ZeroMQ socket on each accepted connection for pushing messages from server?
最好的基于简单组合的设计——兼顾扩展性和安全性
本机 ZeroMQ 原语(智能原语可扩展正式通信模式原型)对我们来说就像乐高积木 - 我们将它们进一步用于我们应用程序中预期目标用途的 Messaging/Signalling 平面-域。
Q : Could you please advice of which ZeroMQ socket type to google for such architecture?
否,因为没有可用于此类建议的详细要求列表。一对 PUSH/PULL
-s 本身是不够的,临时进行的(偶发的)REQ/REP
可能有助于客户(重新)发现阶段,其他共存的、持续的或偶发的也可能如此原型用于组成任何额外的 System/Service-Planes.
我遇到了基本相同的问题。对我来说,问题似乎是即使在 python 中有 GIL 的保护,你也不能同时从两个线程操作一个套接字。这将导致解释器崩溃。因此,一个线程不能有发送和接收线程,只有一个线程必须能够同时接收和发送,而 pyzmq 无法同时提供 poll() 套接字和非套接字的方法。
user3666197 也回复了我的问题,但是两个回复都没有帮助。尤其是因为不能假设有 1000 多个免费和开放的端口来为所有客户端连接生成主机套接字。
有一个相对丑陋的解决方案。这是一个缺少适当的 NOWAIT 标志等的草稿。 make_process_pull_socket() 创建一个用于进程内部通信的套接字。如下:
class TwoWay(threading.Thread):
def __init__(self, ip_string, port, inque:queue.Queue):
super(TwoWay, self).__init__()
self.sock = cntxt.socket(zmq.ROUTER)
addr = "tcp://{}:{}".format(ip_string, str(port))
self.sock.bind(addr)
self.pull = make_process_pull_socket(4456)
self.push = make_process_push_socket(4456)
self.inque = inque
def run(self):
pl = zmq.Poller()
pl.register(self.sock,zmq.POLLIN)
pl.register(self.pull,zmq.POLLIN)
while True:
try:
p = pl.poll(timeout=2000)
if not p:
continue
s,i = p[0]
if s == self.sock:
a,m = s.recv_multipart()
self.inque.put((a,m))
if s == self.pull:
r = s.recv()
if len(r) > 5:
self.sock.send_multipart([r[:5], r[5:]])
except Exception as e:
print(e)
然后您将传入消息放入作为参数提供的队列中,传出消息(带有标识符)可以使用 TwoWay.push.send(id+msg)
发送