zmq 扩展 pub sub ({pub, pub, ...} -> {xsub->xpub} -> {sub, sub, ...}

zmq Extended pub sub ({pub, pub, ...} -> {xsub->xpub} -> {sub, sub, ...}

我正在尝试实现类似于 图 14 - 扩展的 Pub-Sub 中描述的模型 .

我看过这个 snippet,效果很好。但是当尝试实现第一部分时(即:{pub, ...} -> {xsub 我没有在 xsub 套接字上收到任何数据。

以下代码永远阻塞,而不是接收消息:

use zmq::{Context, SocketType::XSUB, PUB};

const ADDRESS_XSUB: &'static str = "tcp://127.0.0.1:9123";
fn main() {
    let context = Context::new();
    let xsubscriber = {
        let socket = context.socket(XSUB).unwrap();
        socket.bind(ADDRESS_XSUB).unwrap();
        socket
    };

    let publisher = {
        let context = Context::new();
        let socket = context.socket(PUB).unwrap();
        socket.connect(ADDRESS_XSUB).unwrap();
        socket
    };

    publisher.send("", zmq::SNDMORE).unwrap();
    publisher.send("HI", 0x00).unwrap();

    assert_eq!(b"HI".as_ref(), &xsubscriber.recv_bytes(0).unwrap());
}

XSUB 通常与 XPUB 配对以创建一个单独的代理,这允许在许多发布者和订阅者之间路由订阅消息和数据。

如果您想使用 XSUB,您需要向 XSUB 套接字发送订阅消息(第一个字节 0x1,后跟过滤器)。

为了确保代码正常工作,我建议将 XSUB 切换为 SUB 并使用空的“”过滤器调用订阅 API。

在发送消息之前,还要确保订阅者已连接到发布者。发布者仅在建立连接后才对消息进行排队。