如何将队列连接到 ZeroMQ PUB/SUB
How to connect queues to a ZeroMQ PUB/SUB
考虑以下因素:
- 一组 3 个逻辑服务:
S1
、S2
和 S3
- 每个服务的两个实例是运行,所以我们有以下流程:
S1P1
、S1P2
、S2P1
、S2P2
、S3P1
, S3P2
- 一个
ZeroMQ
代理 运行 在单个进程中,所有服务进程都可以访问
逻辑服务,比方说 S1
,发布一条消息 M1
,逻辑服务 S2
和 S3
感兴趣。每个逻辑服务只有一个进程必须接收 M1
,所以假设 S2P1
和 S3P2
.
我尝试了以下方法,但没有成功:
- 代理线程 1 是 运行 一个
XSUB/XPUB
代理
- 代理线程 2 是 运行 一个
ROUTER/DEALER
代理,ROUTER
连接到 XPUB
套接字并订阅了一切(逻辑 S1
)
- 代理线程 3 是 运行 一个
ROUTER/DEALER
代理,ROUTER
连接到 XPUB
套接字并订阅了所有内容(对于逻辑 S2
)
- 代理线程 4 是 运行 一个
ROUTER/DEALER
代理,ROUTER
连接到 XPUB 套接字并订阅了所有内容(对于逻辑 S3
)
- 每个逻辑服务进程都是运行一个连接到代理
DEALER
套接字 的REP
套接字线程
我认为 XSUB/XPUB
代理会给我 publish/subscribe 语义,而 ROUTER/DEALER
代理会在 REP
套接字之间引入对发送消息的竞争XSUB/XPUB
代理。
如何组合 ZeroMQ
个套接字来完成此操作?
更新1
我知道 "without success" 没有帮助,我尝试了不同的配置并遇到了不同的错误。我试过的最新配置如下:
(XSUB proxy=> XPUB) => (SUB copyLoop=> REQ) => (ROUTER proxy=> DEALER) => REP
复制循环是这样的:
public void start() {
context = ZMQ.context(1);
subSocket = context.socket(ZMQ.SUB);
subSocket.connect(subSocketUrl);
subSocket.subscribe("".getBytes());
reqSocket = context.socket(ZMQ.REQ);
reqSocket.connect(reqSocketUrl);
while (!Thread.currentThread().isInterrupted()) {
final Message msg = receiveNextMessage();
resendMessage(msg);
}
}
private Message receiveNextMessage() {
final String header = subSocket.recvStr();
final String entity = subSocket.recvStr();
return new Message(header, entity);
}
private void resendMessage(Message msg) {
reqSocket.sendMore(msg.getKey());
reqSocket.send(msg.getData(), 0);
}
我得到的异常如下:
java.lang.IllegalStateException: Cannot send another request
at zmq.Req.xsend(Req.java:51) ~[jeromq-0.3.4.jar:na]
at zmq.SocketBase.send(SocketBase.java:613) ~[jeromq-0.3.4.jar:na]
at org.zeromq.ZMQ$Socket.send(ZMQ.java:1206) ~[jeromq-0.3.4.jar:na]
at org.zeromq.ZMQ$Socket.sendMore(ZMQ.java:1189) ~[jeromq-0.3.4.jar:na]
at com.xyz.messaging.zeromq.SubReqProxyConnector.resendMessage(SubReqProxyConnector.java:47) ~[classes/:na]
at com.xyz.messaging.zeromq.SubReqProxyConnector.start(SubReqProxyConnector.java:35) ~[classes/:na]
我是 运行 JeroMQ 0.3.4,Oracle Java 8 JVM 和 Windows 7.
您的 ROUTER
连接似乎增加了一些复杂性 - 您应该能够直接连接到您的发布商进行所有操作。
您目前 运行 遇到的错误是 REQ
套接字具有严格的消息排序模式 - 您不允许连续 send()
两次,您必须 send/receive/send/receive/etc(同样,REP
套接字必须 receive/send/receive/send/etc)。从它的外观来看,您只是在 REQ
套接字上执行 send/send/send/etc 而没有收到响应。如果您不关心来自同伴的响应,那么您必须接收并丢弃它或使用 DEALER
(或 ROUTER
,但 DEALER
在您当前的图表中更有意义)。
我在下面创建了一个图表,说明我将如何完成此架构 - 使用您的基本流程结构。
Broker T1 Broker T2 Broker T3 Broker T4
(PUB*)------>(*SUB)[--](DEALER*) -->(*SUB)[--](DEALER*) -->(*SUB)[--](DEALER*)
|_____________________||____| || | ||
|_____________________||_______________________||____| ||
|| || ||
========================|| ==================|| ===========||=
|| || || || || ||
|| || || || || ||
|| || || || || ||
(REP*) (REP*) (REP*) (REP*) (REP*) (REP*)
S1P1 S1P2 S2P1 S2P2 S3P1 S3P2
因此,主要区别在于我放弃了您的 (SUB copyLoop=> REQ)
步骤。您是否选择 XPUB/XSUB
还是 PUB/SUB
取决于您,但我倾向于从更简单的开始,除非您目前想要利用 XPUB/XSUB
.
的额外功能
显然,此图并未处理信息如何进入您的经纪人,您目前在其中显示了一个 XSUB
套接字 - 这超出了您目前提供的信息的范围,大概您能够已经成功将信息接收到您的经纪人,所以我不会处理那个。
我假设专用于每个服务的代理线程正在就是否将消息发送到它们的服务做出明智的选择?如果是这样,那么您让他们订阅所有内容的选择应该可以正常工作,否则可能需要更智能的订阅设置。
如果您在服务进程上使用 REP
套接字,则服务进程必须接收该消息并异步处理它,从不 回传任何消息有关发送给经纪人的消息的详细信息。然后它必须用确认(如 "RECEIVED")响应每条消息,以便它遵循 REP
套接字的严格 receive/send/receive/send 模式。
如果您想要关于服务如何处理发送回代理的消息的任何其他类型的通信,REP
不再是适合您的服务进程的套接字类型,并且 DEALER
可能不再是您经纪人的正确套接字类型。如果您想要某种形式的负载平衡以便发送到下一个打开的服务进程,则需要使用 ROUTER/REQ
并让每个服务指示其可用性并让代理保留消息直到下一个服务过程通过发回结果表示它可用。如果您想要一些其他类型的消息处理,您必须指出那是什么,以便可以提出合适的体系结构。
显然我混淆了一些元素:
- 套接字具有相同的 API 无论您将其用作客户端套接字 (
Socket.connect
) 还是服务器端套接字 (Socket.bind
)
- 无论类型如何,套接字都具有相同的 API(例如,不应在
PUSH
套接字上调用 Socket.subscribe
)
- 某些套接字类型需要 send/receive 响应循环(例如
REQ/REP
)
- 沟通模式的一些细微差别(
PUSH/PULL
vs ROUTER/DEALER
)
- 调试 ZeroMQ 设置的困难(不可能?)
非常感谢 Jason 非常详细的回答(以及很棒的图表!),这为我指明了正确的方向。
我最终得到了以下设计:
- 代理线程 1 是 运行
bind(localhost:6000)
和 bind(localhost:6001)
上的扇出 XSUB/XPUB
代理
- 代理线程 2 是 运行 在
connect(localhost:6001)
和 bind(localhost:6002)
上排队的 SUB/PUSH
代理;代理线程 3 和 4 使用类似的设计,但绑定端口号不同
- 消息生产者使用
connect(localhost:6000)
上的 PUB
套接字连接到代理
- 消息消费者使用
connect(localhost:6002)
上的 PULL
套接字连接到代理队列代理
在这种特定于服务的排队机制之上,我能够相当简单地添加类似的特定于服务的扇出机制:
- 代理线程在
connect(localhost:6001)
和 bind(localhost:6003)
上运行 SUB/PUB
代理
- 消息生产者仍然使用
connect(localhost:6000)
上的 PUB
套接字连接到代理
- 消息消费者使用
connect(localhost:6003)
上的 SUB
套接字连接到代理扇出代理
这是一次有趣的旅程。
考虑以下因素:
- 一组 3 个逻辑服务:
S1
、S2
和S3
- 每个服务的两个实例是运行,所以我们有以下流程:
S1P1
、S1P2
、S2P1
、S2P2
、S3P1
,S3P2
- 一个
ZeroMQ
代理 运行 在单个进程中,所有服务进程都可以访问
逻辑服务,比方说 S1
,发布一条消息 M1
,逻辑服务 S2
和 S3
感兴趣。每个逻辑服务只有一个进程必须接收 M1
,所以假设 S2P1
和 S3P2
.
我尝试了以下方法,但没有成功:
- 代理线程 1 是 运行 一个
XSUB/XPUB
代理 - 代理线程 2 是 运行 一个
ROUTER/DEALER
代理,ROUTER
连接到XPUB
套接字并订阅了一切(逻辑S1
) - 代理线程 3 是 运行 一个
ROUTER/DEALER
代理,ROUTER
连接到XPUB
套接字并订阅了所有内容(对于逻辑S2
) - 代理线程 4 是 运行 一个
ROUTER/DEALER
代理,ROUTER
连接到 XPUB 套接字并订阅了所有内容(对于逻辑S3
) - 每个逻辑服务进程都是运行一个连接到代理
DEALER
套接字 的
REP
套接字线程
我认为 XSUB/XPUB
代理会给我 publish/subscribe 语义,而 ROUTER/DEALER
代理会在 REP
套接字之间引入对发送消息的竞争XSUB/XPUB
代理。
如何组合 ZeroMQ
个套接字来完成此操作?
更新1
我知道 "without success" 没有帮助,我尝试了不同的配置并遇到了不同的错误。我试过的最新配置如下:
(XSUB proxy=> XPUB) => (SUB copyLoop=> REQ) => (ROUTER proxy=> DEALER) => REP
复制循环是这样的:
public void start() {
context = ZMQ.context(1);
subSocket = context.socket(ZMQ.SUB);
subSocket.connect(subSocketUrl);
subSocket.subscribe("".getBytes());
reqSocket = context.socket(ZMQ.REQ);
reqSocket.connect(reqSocketUrl);
while (!Thread.currentThread().isInterrupted()) {
final Message msg = receiveNextMessage();
resendMessage(msg);
}
}
private Message receiveNextMessage() {
final String header = subSocket.recvStr();
final String entity = subSocket.recvStr();
return new Message(header, entity);
}
private void resendMessage(Message msg) {
reqSocket.sendMore(msg.getKey());
reqSocket.send(msg.getData(), 0);
}
我得到的异常如下:
java.lang.IllegalStateException: Cannot send another request
at zmq.Req.xsend(Req.java:51) ~[jeromq-0.3.4.jar:na]
at zmq.SocketBase.send(SocketBase.java:613) ~[jeromq-0.3.4.jar:na]
at org.zeromq.ZMQ$Socket.send(ZMQ.java:1206) ~[jeromq-0.3.4.jar:na]
at org.zeromq.ZMQ$Socket.sendMore(ZMQ.java:1189) ~[jeromq-0.3.4.jar:na]
at com.xyz.messaging.zeromq.SubReqProxyConnector.resendMessage(SubReqProxyConnector.java:47) ~[classes/:na]
at com.xyz.messaging.zeromq.SubReqProxyConnector.start(SubReqProxyConnector.java:35) ~[classes/:na]
我是 运行 JeroMQ 0.3.4,Oracle Java 8 JVM 和 Windows 7.
您的 ROUTER
连接似乎增加了一些复杂性 - 您应该能够直接连接到您的发布商进行所有操作。
您目前 运行 遇到的错误是 REQ
套接字具有严格的消息排序模式 - 您不允许连续 send()
两次,您必须 send/receive/send/receive/etc(同样,REP
套接字必须 receive/send/receive/send/etc)。从它的外观来看,您只是在 REQ
套接字上执行 send/send/send/etc 而没有收到响应。如果您不关心来自同伴的响应,那么您必须接收并丢弃它或使用 DEALER
(或 ROUTER
,但 DEALER
在您当前的图表中更有意义)。
我在下面创建了一个图表,说明我将如何完成此架构 - 使用您的基本流程结构。
Broker T1 Broker T2 Broker T3 Broker T4
(PUB*)------>(*SUB)[--](DEALER*) -->(*SUB)[--](DEALER*) -->(*SUB)[--](DEALER*)
|_____________________||____| || | ||
|_____________________||_______________________||____| ||
|| || ||
========================|| ==================|| ===========||=
|| || || || || ||
|| || || || || ||
|| || || || || ||
(REP*) (REP*) (REP*) (REP*) (REP*) (REP*)
S1P1 S1P2 S2P1 S2P2 S3P1 S3P2
因此,主要区别在于我放弃了您的 (SUB copyLoop=> REQ)
步骤。您是否选择 XPUB/XSUB
还是 PUB/SUB
取决于您,但我倾向于从更简单的开始,除非您目前想要利用 XPUB/XSUB
.
显然,此图并未处理信息如何进入您的经纪人,您目前在其中显示了一个 XSUB
套接字 - 这超出了您目前提供的信息的范围,大概您能够已经成功将信息接收到您的经纪人,所以我不会处理那个。
我假设专用于每个服务的代理线程正在就是否将消息发送到它们的服务做出明智的选择?如果是这样,那么您让他们订阅所有内容的选择应该可以正常工作,否则可能需要更智能的订阅设置。
如果您在服务进程上使用 REP
套接字,则服务进程必须接收该消息并异步处理它,从不 回传任何消息有关发送给经纪人的消息的详细信息。然后它必须用确认(如 "RECEIVED")响应每条消息,以便它遵循 REP
套接字的严格 receive/send/receive/send 模式。
如果您想要关于服务如何处理发送回代理的消息的任何其他类型的通信,REP
不再是适合您的服务进程的套接字类型,并且 DEALER
可能不再是您经纪人的正确套接字类型。如果您想要某种形式的负载平衡以便发送到下一个打开的服务进程,则需要使用 ROUTER/REQ
并让每个服务指示其可用性并让代理保留消息直到下一个服务过程通过发回结果表示它可用。如果您想要一些其他类型的消息处理,您必须指出那是什么,以便可以提出合适的体系结构。
显然我混淆了一些元素:
- 套接字具有相同的 API 无论您将其用作客户端套接字 (
Socket.connect
) 还是服务器端套接字 (Socket.bind
) - 无论类型如何,套接字都具有相同的 API(例如,不应在
PUSH
套接字上调用Socket.subscribe
) - 某些套接字类型需要 send/receive 响应循环(例如
REQ/REP
) - 沟通模式的一些细微差别(
PUSH/PULL
vsROUTER/DEALER
) - 调试 ZeroMQ 设置的困难(不可能?)
非常感谢 Jason 非常详细的回答(以及很棒的图表!),这为我指明了正确的方向。
我最终得到了以下设计:
- 代理线程 1 是 运行
bind(localhost:6000)
和bind(localhost:6001)
上的扇出 - 代理线程 2 是 运行 在
connect(localhost:6001)
和bind(localhost:6002)
上排队的SUB/PUSH
代理;代理线程 3 和 4 使用类似的设计,但绑定端口号不同 - 消息生产者使用
connect(localhost:6000)
上的 - 消息消费者使用
connect(localhost:6002)
上的
XSUB/XPUB
代理
PUB
套接字连接到代理
PULL
套接字连接到代理队列代理
在这种特定于服务的排队机制之上,我能够相当简单地添加类似的特定于服务的扇出机制:
- 代理线程在
connect(localhost:6001)
和bind(localhost:6003)
上运行 - 消息生产者仍然使用
connect(localhost:6000)
上的 - 消息消费者使用
connect(localhost:6003)
上的
SUB/PUB
代理
PUB
套接字连接到代理
SUB
套接字连接到代理扇出代理
这是一次有趣的旅程。