如何将队列连接到 ZeroMQ PUB/SUB

How to connect queues to a ZeroMQ PUB/SUB

考虑以下因素:

逻辑服务,比方说 S1,发布一条消息 M1,逻辑服务 S2S3 感兴趣。每个逻辑服务只有一个进程必须接收 M1,所以假设 S2P1S3P2.

我尝试了以下方法,但没有成功:

我认为 XSUB/XPUB 代理会给我 publish/subscribe 语义,而 ROUTER/DEALER 代理会在 REP 套接字之间引入对发送消息的竞争XSUB/XPUB 代理。

如何组合 ZeroMQ 个套接字来完成此操作?

更新1

我知道 "without success" 没有帮助,我尝试了不同的配置并遇到了不同的错误。我试过的最新配置如下:

(XSUB proxy=> XPUB) => (SUB copyLoop=> REQ) => (ROUTER proxy=> DEALER) => REP

复制循环是这样的:

public void start() {
    context = ZMQ.context(1);

    subSocket = context.socket(ZMQ.SUB);
    subSocket.connect(subSocketUrl);
    subSocket.subscribe("".getBytes());

    reqSocket = context.socket(ZMQ.REQ);
    reqSocket.connect(reqSocketUrl);

    while (!Thread.currentThread().isInterrupted()) {
        final Message msg = receiveNextMessage();
        resendMessage(msg);
    }
}

private Message receiveNextMessage() {
    final String header = subSocket.recvStr();
    final String entity = subSocket.recvStr();

    return new Message(header, entity);
}

private void resendMessage(Message msg) {
    reqSocket.sendMore(msg.getKey());
    reqSocket.send(msg.getData(), 0);
}

我得到的异常如下:

java.lang.IllegalStateException: Cannot send another request
    at zmq.Req.xsend(Req.java:51) ~[jeromq-0.3.4.jar:na]
    at zmq.SocketBase.send(SocketBase.java:613) ~[jeromq-0.3.4.jar:na]
    at org.zeromq.ZMQ$Socket.send(ZMQ.java:1206) ~[jeromq-0.3.4.jar:na]
    at org.zeromq.ZMQ$Socket.sendMore(ZMQ.java:1189) ~[jeromq-0.3.4.jar:na]
    at com.xyz.messaging.zeromq.SubReqProxyConnector.resendMessage(SubReqProxyConnector.java:47) ~[classes/:na]
    at com.xyz.messaging.zeromq.SubReqProxyConnector.start(SubReqProxyConnector.java:35) ~[classes/:na]

我是 运行 JeroMQ 0.3.4,Oracle Java 8 JVM 和 Windows 7.

您的 ROUTER 连接似乎增加了一些复杂性 - 您应该能够直接连接到您的发布商进行所有操作。

您目前 运行 遇到的错误是 REQ 套接字具有严格的消息排序模式 - 您不允许连续 send() 两次,您必须 send/receive/send/receive/etc(同样,REP 套接字必须 receive/send/receive/send/etc)。从它的外观来看,您只是在 REQ 套接字上执行 send/send/send/etc 而没有收到响应。如果您不关心来自同伴的响应,那么您必须接收并丢弃它或使用 DEALER(或 ROUTER,但 DEALER 在您当前的图表中更有意义)。

我在下面创建了一个图表,说明我将如何完成此架构 - 使用您的基本流程结构。

Broker T1         Broker T2                Broker T3                Broker T4
(PUB*)------>(*SUB)[--](DEALER*)   -->(*SUB)[--](DEALER*)   -->(*SUB)[--](DEALER*)
       |_____________________||____|                  ||    |                  ||
       |_____________________||_______________________||____|                  ||
                             ||                       ||                       ||
     ========================||     ==================||            ===========||=
   ||             ||              ||              ||              ||              ||
   ||             ||              ||              ||              ||              ||
   ||             ||              ||              ||              ||              ||
(REP*)         (REP*)          (REP*)          (REP*)          (REP*)          (REP*)
 S1P1           S1P2            S2P1            S2P2            S3P1            S3P2

因此,主要区别在于我放弃了您的 (SUB copyLoop=> REQ) 步骤。您是否选择 XPUB/XSUB 还是 PUB/SUB 取决于您,但我倾向于从更简单的开始,除非您目前想要利用 XPUB/XSUB.

的额外功能

显然,此图并未处理信息如何进入您的经纪人,您目前在其中显示了一个 XSUB 套接字 - 这超出了您目前提供的信息的范围,大概您能够已经成功将信息接收到您的经纪人,所以我不会处理那个。

我假设专用于每个服务的代理线程正在就是否将消息发送到它们的服务做出明智的选择?如果是这样,那么您让他们订阅所有内容的选择应该可以正常工作,否则可能需要更智能的订阅设置。

如果您在服务进程上使用 REP 套接字,则服务进程必须接收该消息并异步处理它,从不 回传任何消息有关发送给经纪人的消息的详细信息。然后它必须用确认(如 "RECEIVED")响应每条消息,以便它遵循 REP 套接字的严格 receive/send/receive/send 模式。

如果您想要关于服务如何处理发送回代理的消息的任何其他类型的通信,REP 不再是适合您的服务进程的套接字类型,并且 DEALER 可能不再是您经纪人的正确套接字类型。如果您想要某种形式的负载平衡以便发送到下一个打开的服务进程,则需要使用 ROUTER/REQ 并让每个服务指示其可用性并让代理保留消息直到下一个服务过程通过发回结果表示它可用。如果您想要一些其他类型的消息处理,您必须指出那是什么,以便可以提出合适的体系结构。

显然我混淆了一些元素:

  • 套接字具有相同的 API 无论您将其用作客户端套接字 (Socket.connect) 还是服务器端套接字 (Socket.bind)
  • 无论类型如何,套接字都具有相同的 API(例如,不应在 PUSH 套接字上调用 Socket.subscribe
  • 某些套接字类型需要 send/receive 响应循环(例如 REQ/REP
  • 沟通模式的一些细微差别(PUSH/PULL vs ROUTER/DEALER
  • 调试 ZeroMQ 设置的困难(不可能?)

非常感谢 Jason 非常详细的回答(以及很棒的图表!),这为我指明了正确的方向。

我最终得到了以下设计:

  • 代理线程 1 是 运行 bind(localhost:6000)bind(localhost:6001)
  • 上的扇出 XSUB/XPUB 代理
  • 代理线程 2 是 运行 在 connect(localhost:6001)bind(localhost:6002) 上排队的 SUB/PUSH 代理;代理线程 3 和 4 使用类似的设计,但绑定端口号不同
  • 消息生产者使用 connect(localhost:6000)
  • 上的 PUB 套接字连接到代理
  • 消息消费者使用 connect(localhost:6002)
  • 上的 PULL 套接字连接到代理队列代理

在这种特定于服务的排队机制之上,我能够相当简单地添加类似的特定于服务的扇出机制:

  • 代理线程在 connect(localhost:6001)bind(localhost:6003)
  • 上运行 SUB/PUB 代理
  • 消息生产者仍然使用 connect(localhost:6000)
  • 上的 PUB 套接字连接到代理
  • 消息消费者使用 connect(localhost:6003)
  • 上的 SUB 套接字连接到代理扇出代理

这是一次有趣的旅程。