XSUB 未在 ZeroMQ XSUB/PUB 设置中接收
XSUB not receiving in ZeroMQ XSUB/PUB setup
我想使用 XSUB/XPUB 启用多个 ZMQ 发布者和订阅者。当我使用 zmq.proxy(xpub_socket, xsub_socket)
时一切正常,但我需要一些自定义的东西,因为我需要在 XSUB 和 XPUB 之间编写代码来检查消息。
这是我所在的位置:
import time
import zmq
context = zmq.Context()
address = '127.0.0.1'
pub_port = '3000'
sub_port = '3001'
# XSUB socket
xsub_socket = context.socket(zmq.XSUB)
xsub_socket.bind(f'tcp://{address}:{pub_port}')
# XPUB socket
xpub_socket = context.socket(zmq.XPUB)
xpub_socket.bind(f'tcp://{address}:{sub_port}')
time.sleep(1)
# PUB socket
pub_socket = context.socket(zmq.PUB)
pub_socket.connect(f'tcp://{address}:{pub_port}')
# SUB socket
sub_socket = context.socket(zmq.SUB)
sub_socket.subscribe('')
sub_socket.connect(f'tcp://{address}:{sub_port}')
time.sleep(1)
pub_socket.send_string('test')
time.sleep(1)
print(poller.poll(0))
从 PUB 套接字发送的值没有到达 XSUB 套接字。
我读到 here 第一个字节需要是 1。这两个也不行:
pub_socket.send(b'\x01')
pub_socket.send_multipart([b'\x01', 'test'.encode('utf-8')])
我做错了什么?
A PUB
套接字不会向 XSUB
套接字发送任何消息,除非它已收到订阅请求,您可以通过在 [=16= 上调用 subscribe
获得订阅请求]套接字。
传递这些订阅消息的唯一方法是设置 XSUB/XPUB
代理。
这是一个连接 XPUB
和 XSUB
套接字的简单代理,打印出它在任一方向接收到的消息:
import zmq
ctx = zmq.Context()
xpub_sock = ctx.socket(zmq.XPUB)
xpub_sock.bind("tcp://127.0.0.1:3000")
xsub_sock = ctx.socket(zmq.XSUB)
xsub_sock.bind("tcp://127.0.0.1:3001")
poller = zmq.Poller()
poller.register(xpub_sock, zmq.POLLIN)
poller.register(xsub_sock, zmq.POLLIN)
while True:
socks = dict(poller.poll())
if xpub_sock in socks and socks[xpub_sock] == zmq.POLLIN:
msg = xpub_sock.recv_multipart()
print("(sub)", msg)
xsub_sock.send_multipart(msg)
elif xsub_sock in socks and socks[xsub_sock] == zmq.POLLIN:
msg = xsub_sock.recv_multipart()
print("(pub)", msg)
xpub_sock.send_multipart(msg)
如果我用 PUB
套接字连接到它,就像这样...
import zmq
import time
ctx = zmq.Context()
pub_sock = ctx.socket(zmq.PUB)
pub_sock.connect("tcp://localhost:3001")
while True:
pub_sock.send_string("test")
time.sleep(1)
...我不会看到任何消息到达 XSUB
套接字,因为
没有活跃的订阅。但是,如果我连接 SUB
套接字到 XPUB
套接字并设置订阅...
import zmq
ctx = zmq.Context()
sub_sock = ctx.socket(zmq.SUB)
sub_sock.connect("tcp://localhost:3000")
sub_sock.subscribe("")
while True:
msg = sub_sock.recv()
print(msg)
...然后我将开始看到消息从 PUB
套接字传递到
XSUB
插座,然后从 XPUB
插座到 SUB
套接字。
我想使用 XSUB/XPUB 启用多个 ZMQ 发布者和订阅者。当我使用 zmq.proxy(xpub_socket, xsub_socket)
时一切正常,但我需要一些自定义的东西,因为我需要在 XSUB 和 XPUB 之间编写代码来检查消息。
这是我所在的位置:
import time
import zmq
context = zmq.Context()
address = '127.0.0.1'
pub_port = '3000'
sub_port = '3001'
# XSUB socket
xsub_socket = context.socket(zmq.XSUB)
xsub_socket.bind(f'tcp://{address}:{pub_port}')
# XPUB socket
xpub_socket = context.socket(zmq.XPUB)
xpub_socket.bind(f'tcp://{address}:{sub_port}')
time.sleep(1)
# PUB socket
pub_socket = context.socket(zmq.PUB)
pub_socket.connect(f'tcp://{address}:{pub_port}')
# SUB socket
sub_socket = context.socket(zmq.SUB)
sub_socket.subscribe('')
sub_socket.connect(f'tcp://{address}:{sub_port}')
time.sleep(1)
pub_socket.send_string('test')
time.sleep(1)
print(poller.poll(0))
从 PUB 套接字发送的值没有到达 XSUB 套接字。
我读到 here 第一个字节需要是 1。这两个也不行:
pub_socket.send(b'\x01')
pub_socket.send_multipart([b'\x01', 'test'.encode('utf-8')])
我做错了什么?
A PUB
套接字不会向 XSUB
套接字发送任何消息,除非它已收到订阅请求,您可以通过在 [=16= 上调用 subscribe
获得订阅请求]套接字。
传递这些订阅消息的唯一方法是设置 XSUB/XPUB
代理。
这是一个连接 XPUB
和 XSUB
套接字的简单代理,打印出它在任一方向接收到的消息:
import zmq
ctx = zmq.Context()
xpub_sock = ctx.socket(zmq.XPUB)
xpub_sock.bind("tcp://127.0.0.1:3000")
xsub_sock = ctx.socket(zmq.XSUB)
xsub_sock.bind("tcp://127.0.0.1:3001")
poller = zmq.Poller()
poller.register(xpub_sock, zmq.POLLIN)
poller.register(xsub_sock, zmq.POLLIN)
while True:
socks = dict(poller.poll())
if xpub_sock in socks and socks[xpub_sock] == zmq.POLLIN:
msg = xpub_sock.recv_multipart()
print("(sub)", msg)
xsub_sock.send_multipart(msg)
elif xsub_sock in socks and socks[xsub_sock] == zmq.POLLIN:
msg = xsub_sock.recv_multipart()
print("(pub)", msg)
xpub_sock.send_multipart(msg)
如果我用 PUB
套接字连接到它,就像这样...
import zmq
import time
ctx = zmq.Context()
pub_sock = ctx.socket(zmq.PUB)
pub_sock.connect("tcp://localhost:3001")
while True:
pub_sock.send_string("test")
time.sleep(1)
...我不会看到任何消息到达 XSUB
套接字,因为
没有活跃的订阅。但是,如果我连接 SUB
套接字到 XPUB
套接字并设置订阅...
import zmq
ctx = zmq.Context()
sub_sock = ctx.socket(zmq.SUB)
sub_sock.connect("tcp://localhost:3000")
sub_sock.subscribe("")
while True:
msg = sub_sock.recv()
print(msg)
...然后我将开始看到消息从 PUB
套接字传递到
XSUB
插座,然后从 XPUB
插座到 SUB
套接字。