ZeroMQ 并发发布和订阅
ZeroMQ Publish and Subscribe concurrently
我正在开发一个 C++ 程序,它需要能够从任意数量的其他客户端发送/接收 JSON
-有效负载。
起初,我尝试实现 PubNub 服务,但我发现我无法同时获取和发布消息(即使在不同的线程上使用两个不同的上下文)。我需要能够做到这一点。我还发现 PubNub 的延迟太长,我不喜欢。
我遇到了 ZeroMQ library which has a PUB/SUB
model that would suit my needs. But all examples 我遇到了解释如何以一个进程是发布者或订阅者的方式实现它,而不是同时两者.
理想情况下,我想编写一个服务器,将来自任何人的所有消息转发给订阅消息中指定的特定频道的任何人。 任何人都应该能够接收消息并向网络上的任何其他人发布消息, 只要他们订阅了正确的频道。
UPDATE 1:
注意:我不需要接收保险,因为有效负载 N+1 将优先于有效负载 N。我想要一个发送后忘记的通信方式(类似 UDP) .
根据要求:PubNub 限制 32 kB
每个 JSON
-payload 对我来说是完美的,我不需要更多。事实上,我的有效载荷平均在 4 kB
左右。所有客户端实例都将 运行 在同一个本地网络上,因此理想情况下延迟应小于 5 ms
。至于客户端的数量,一次不会超过4个客户端订阅同一个channel/topic。
UPDATE 2 :
我无法提前预测会存在多少 channels/topics,但大约有几十个(大部分时间)、数百个(高峰期)。不是几千。
问题:
Q1:
- 我可以使用 ZeroMQ
实现这样的行为吗?
Q2:
- 是否有任何工作示例证明(最好在 C++
中)?
Q3:
- 如果没有,对 C++
中的图书馆有什么建议吗?
Q1:
- Can I implement such a behavior using ZeroMQ ?
当然可以;但可能不使用 PUB/SUB
套接字。
使用 PUB/SUB
的方法是这样的:对于系统中的每个节点,你制作一个 PUB
套接字和一个 SUB
套接字,并连接单个 SUB
套接字到 所有 其他节点的 PUB
套接字,并相应地设置您的订阅过滤器。这在其用途上是有限的,因为(我认为)您需要为所有连接设置相同的过滤器。请注意,您绝对应该不在每个节点中创建多个上下文。
如果您的节点总数较少(例如 10-20 个或更少),您可以为每个节点创建一个 PUB
套接字和 N-1 SUB
个套接字(仍然全部在一个上下文中,) 并将每个 SUB
套接字连接到其他节点的每个 PUB
套接字。
如果您对客户端和服务器节点有清晰的概念,您可以使用较新的 CLIENT/SERVER
套接字(我相信在 4.2
或 4.1
中可用。)优雅且可能更易于管理,但您必须自己实施内容过滤 ("channels");这可能非常简单,也可能有点复杂,具体取决于您想要做什么。
Q2:
- Is there any working sample demonstrating that (preferably in C++) ?
据我所知没有。
Q3:
- If not, any suggestions for a library in C++ ?
我仍然建议 ZeroMQ
,因为它的重量相对较轻,界面简洁优雅,功能全面,并且能够使用多种语言工作。有很多套接字组合可供选择。如果情况变得更糟,您始终可以在任何地方使用 PAIR
套接字。
nanomsg
与 BUS
协议,参见 http://nanomsg.org/documentation-zeromq.html
ZeroMQ :
is capable of serving this task well within scales given above
nanomsg :
is capable of serving this task too, a need to cross-check ports/bindings for clients
Design review:
- client 实例不是持久的,可能会自行出现,可能会自行消失或出错
- client 实例自行决定它要
PUB
-将其定义为消息负载
- client 实例自行决定,它将要
SUB
-scribe 作为实际传入的消息流 TOPIC
-filter
- 客户端实例交换(发送),它自己,一个普通的,非多部分的,
JSON
-它准备/产生的格式消息
- client 实例收集(接收)消息,它假定这些消息具有相同的、非多部分、
JSON
格式的形状,并且尝试对其进行在接收完成后进行本地处理
- 客户端-实例的最大数量不超过数百
- 任何
JSON
格式的有效负载的最大大小小于32 kB
,大约4 kB
平均
- 跨公共 LAN 冲突域的 E2E 进程到进程交付可接受的最大延迟低于
5,000 [usec]
- 服务器实例是一个中心角色和持久实体
- server 实例为所有 late-joiners' 提供已知的传输-class
URL
-目标.connect()
-s
Proposal:
server may deploy multiple behaviours to meet the given goals, using both the PUB
and SUB
behaviours, and provides a code-driven, fast, SUB
-side attached, non-blocking event-loop .poll()
with aligned re-transmission of any of it's SUB
-side .recv()
-ed payloads to it's PUB
-side, currently .connect()
-ed, audience ( live client instances ):
set s_SUB_recv = aZmqCONTEXT.socket( zmq.SUB );
and s_PUB_send = aZmqCONTEXT.socket( zmq.PUB );
for performance reasons, that are not so tough here, one may also segregate workload-streams' processing by mapping each one on disjunct sub-sets of the multiple created I/O-threads:
map s_SUB_recv.setsockopt( ZMQ_AFFINITY, 0 );
and s_PUB_send.setsockopt( ZMQ_AFFINITY, 1 );
set s_PUB_send.bind( "tcp://localhost:8899" );
+
set s_SUB_recv.setsockopt( ZMQ_SUBSCRIBE, "" ); // forever *every*-TOPIC
set s_SUB_recv.setsockopt( ZMQ_MAXMSGSIZE, 32000 ); // protective ceiling
set s_SUB_recv.setsockopt( ZMQ_CONFLATE, True ); // retain just the last msg
set s_SUB_recv.setsockopt( ZMQ_LINGER, 0 ); // avoid blocking
set s_SUB_recv.setsockopt( ZMQ_TOS, anAppToS_NETWORK_PRIO_CODE );
and s_SUB_recv.bind( "tcp://localhost:8888" ); // [PUB]s .connect()
Similarly,
client instance may deploy a reverse-facing tandem of both a PUB
-endpoint and SUB
-endpoint, ready to .connect()
to a known transport-target-URL
.
The client specific subscription locally decides, what is to get filtered from the incoming stream of messages ( prior to ZeroMQ v.3.1 API
the plentitude of all messages will get delivered to each client instance over the transport class, however since API v.3.1+
, the topic-filter is being operated on the PUB
-side, which in the desired modus-operandi eliminates the wasted volumes of data over the network, but at the same time, this increases the PUB
-side processing overhead ( ref.: remarks on a principle of increased multi-I/O-threads mapping / performance boost above )
set c_SUB_recv = aZmqCONTEXT.socket( zmq.SUB );
and c_PUB_send = aZmqCONTEXT.socket( zmq.PUB );
unless the payload-assembly/processing overhead grows close to the permitted End-to-End latency threshold, there shall be no need to separate / segregate the ZeroMQ
low-level I/O-threads here:
map c_SUB_recv.setsockopt( ZMQ_AFFINITY, 0 );
and c_PUB_send.setsockopt( ZMQ_AFFINITY, 1 );
set c_PUB_send.connect( "tcp://server:8888" ); // reverse .bind on [SUB]
+
set c_SUB_recv.setsockopt( ZMQ_SUBSCRIBE, "" ); // modified on-the-fly
set c_SUB_recv.setsockopt( ZMQ_MAXMSGSIZE, 32000 ); // protective ceiling
set c_SUB_recv.setsockopt( ZMQ_CONFLATE, True ); // take just last
set c_SUB_recv.setsockopt( ZMQ_LINGER, 0 ); // avoid blocking
set c_SUB_recv.setsockopt( ZMQ_TOS, anAppToS_NETWORK_PRIO_CODE );
and c_SUB_recv.connect( "tcp://server:8899" );
讨论:
对于业余项目,消息传递基础结构不需要太多,但是对于更重要的领域,服务器和都有额外的服务client 实例应该添加一些进一步的正式通信模式行为。
- r/KBD
用于远程键盘,具有类似 CLI
的临时检查实用程序
- KEEP_ALIVE
允许系统范围状态/性能监控的转发器
- SIG_EXIT
允许系统范围/实例特定的处理程序 SIG_EXIT
s
- distributed syslog
服务允许安全地收集/存储日志记录的非阻塞副本(无论是在调试阶段还是性能-tuninc 阶段或生产级记录-证据收集 )
- Identity Management
审计追踪等工具
- WhiteList/BlackList
用于增加基础架构的稳健性,使其更好地抵抗 DoS 攻击/中毒错误的 NIC 流量突发等
- Adaptive Node re-Discovery
用于更智能/临时基础设施设计和状态监控或多角色/(N + M)阴影主热备role-handover/takeover场景等出场
总结
A1
:是的,完全在 ZeroMQ 能力范围内
A2
:是的,ZeroMQ 书籍/指南中的 C++ 代码示例可用
A3
: Ref.: A1, 加可以喜欢深入评论 in Martin SUSTRIK's post on "Differences between nanomsg
and ZeroMQ
"
希望您能享受分布式处理的力量,无论是 ZeroMQ
还是 nanomsg
或两者都支持。
只有自己的想象力才是极限
如果有兴趣further details, one might love the book referred to in the The Best Next Step section of this post
我正在开发一个 C++ 程序,它需要能够从任意数量的其他客户端发送/接收 JSON
-有效负载。
起初,我尝试实现 PubNub 服务,但我发现我无法同时获取和发布消息(即使在不同的线程上使用两个不同的上下文)。我需要能够做到这一点。我还发现 PubNub 的延迟太长,我不喜欢。
我遇到了 ZeroMQ library which has a PUB/SUB
model that would suit my needs. But all examples 我遇到了解释如何以一个进程是发布者或订阅者的方式实现它,而不是同时两者.
理想情况下,我想编写一个服务器,将来自任何人的所有消息转发给订阅消息中指定的特定频道的任何人。 任何人都应该能够接收消息并向网络上的任何其他人发布消息, 只要他们订阅了正确的频道。
UPDATE 1:
注意:我不需要接收保险,因为有效负载 N+1 将优先于有效负载 N。我想要一个发送后忘记的通信方式(类似 UDP) .
根据要求:PubNub 限制 32 kB
每个 JSON
-payload 对我来说是完美的,我不需要更多。事实上,我的有效载荷平均在 4 kB
左右。所有客户端实例都将 运行 在同一个本地网络上,因此理想情况下延迟应小于 5 ms
。至于客户端的数量,一次不会超过4个客户端订阅同一个channel/topic。
UPDATE 2 :
我无法提前预测会存在多少 channels/topics,但大约有几十个(大部分时间)、数百个(高峰期)。不是几千。
问题:
Q1:
- 我可以使用 ZeroMQ
实现这样的行为吗?
Q2:
- 是否有任何工作示例证明(最好在 C++
中)?
Q3:
- 如果没有,对 C++
中的图书馆有什么建议吗?
Q1:
- Can I implement such a behavior using ZeroMQ ?
当然可以;但可能不使用 PUB/SUB
套接字。
使用 PUB/SUB
的方法是这样的:对于系统中的每个节点,你制作一个 PUB
套接字和一个 SUB
套接字,并连接单个 SUB
套接字到 所有 其他节点的 PUB
套接字,并相应地设置您的订阅过滤器。这在其用途上是有限的,因为(我认为)您需要为所有连接设置相同的过滤器。请注意,您绝对应该不在每个节点中创建多个上下文。
如果您的节点总数较少(例如 10-20 个或更少),您可以为每个节点创建一个 PUB
套接字和 N-1 SUB
个套接字(仍然全部在一个上下文中,) 并将每个 SUB
套接字连接到其他节点的每个 PUB
套接字。
如果您对客户端和服务器节点有清晰的概念,您可以使用较新的 CLIENT/SERVER
套接字(我相信在 4.2
或 4.1
中可用。)优雅且可能更易于管理,但您必须自己实施内容过滤 ("channels");这可能非常简单,也可能有点复杂,具体取决于您想要做什么。
Q2:
- Is there any working sample demonstrating that (preferably in C++) ?
据我所知没有。
Q3:
- If not, any suggestions for a library in C++ ?
我仍然建议 ZeroMQ
,因为它的重量相对较轻,界面简洁优雅,功能全面,并且能够使用多种语言工作。有很多套接字组合可供选择。如果情况变得更糟,您始终可以在任何地方使用 PAIR
套接字。
nanomsg
与 BUS
协议,参见 http://nanomsg.org/documentation-zeromq.html
ZeroMQ :
is capable of serving this task well within scales given above
nanomsg :
is capable of serving this task too, a need to cross-check ports/bindings for clientsDesign review:
- client 实例不是持久的,可能会自行出现,可能会自行消失或出错
- client 实例自行决定它要
PUB
-将其定义为消息负载 - client 实例自行决定,它将要
SUB
-scribe 作为实际传入的消息流TOPIC
-filter - 客户端实例交换(发送),它自己,一个普通的,非多部分的,
JSON
-它准备/产生的格式消息 - client 实例收集(接收)消息,它假定这些消息具有相同的、非多部分、
JSON
格式的形状,并且尝试对其进行在接收完成后进行本地处理 - 客户端-实例的最大数量不超过数百
- 任何
JSON
格式的有效负载的最大大小小于32 kB
,大约4 kB
平均 - 跨公共 LAN 冲突域的 E2E 进程到进程交付可接受的最大延迟低于
5,000 [usec]
- 服务器实例是一个中心角色和持久实体
- server 实例为所有 late-joiners' 提供已知的传输-class
URL
-目标.connect()
-s
Proposal:
server may deploy multiple behaviours to meet the given goals, using both thePUB
andSUB
behaviours, and provides a code-driven, fast,SUB
-side attached, non-blocking event-loop.poll()
with aligned re-transmission of any of it'sSUB
-side.recv()
-ed payloads to it'sPUB
-side, currently.connect()
-ed, audience ( live client instances ):
sets_SUB_recv = aZmqCONTEXT.socket( zmq.SUB );
ands_PUB_send = aZmqCONTEXT.socket( zmq.PUB );
for performance reasons, that are not so tough here, one may also segregate workload-streams' processing by mapping each one on disjunct sub-sets of the multiple created I/O-threads:
maps_SUB_recv.setsockopt( ZMQ_AFFINITY, 0 );
ands_PUB_send.setsockopt( ZMQ_AFFINITY, 1 );
sets_PUB_send.bind( "tcp://localhost:8899" );
+
sets_SUB_recv.setsockopt( ZMQ_SUBSCRIBE, "" ); // forever *every*-TOPIC
sets_SUB_recv.setsockopt( ZMQ_MAXMSGSIZE, 32000 ); // protective ceiling
sets_SUB_recv.setsockopt( ZMQ_CONFLATE, True ); // retain just the last msg
sets_SUB_recv.setsockopt( ZMQ_LINGER, 0 ); // avoid blocking
sets_SUB_recv.setsockopt( ZMQ_TOS, anAppToS_NETWORK_PRIO_CODE );
ands_SUB_recv.bind( "tcp://localhost:8888" ); // [PUB]s .connect()
Similarly,
client instance may deploy a reverse-facing tandem of both aPUB
-endpoint andSUB
-endpoint, ready to.connect()
to a known transport-target-URL
.
The client specific subscription locally decides, what is to get filtered from the incoming stream of messages ( prior toZeroMQ v.3.1 API
the plentitude of all messages will get delivered to each client instance over the transport class, however sinceAPI v.3.1+
, the topic-filter is being operated on thePUB
-side, which in the desired modus-operandi eliminates the wasted volumes of data over the network, but at the same time, this increases thePUB
-side processing overhead ( ref.: remarks on a principle of increased multi-I/O-threads mapping / performance boost above )
setc_SUB_recv = aZmqCONTEXT.socket( zmq.SUB );
andc_PUB_send = aZmqCONTEXT.socket( zmq.PUB );
unless the payload-assembly/processing overhead grows close to the permitted End-to-End latency threshold, there shall be no need to separate / segregate theZeroMQ
low-level I/O-threads here:
mapc_SUB_recv.setsockopt( ZMQ_AFFINITY, 0 );
andc_PUB_send.setsockopt( ZMQ_AFFINITY, 1 );
setc_PUB_send.connect( "tcp://server:8888" ); // reverse .bind on [SUB]
+
setc_SUB_recv.setsockopt( ZMQ_SUBSCRIBE, "" ); // modified on-the-fly
setc_SUB_recv.setsockopt( ZMQ_MAXMSGSIZE, 32000 ); // protective ceiling
setc_SUB_recv.setsockopt( ZMQ_CONFLATE, True ); // take just last
setc_SUB_recv.setsockopt( ZMQ_LINGER, 0 ); // avoid blocking
setc_SUB_recv.setsockopt( ZMQ_TOS, anAppToS_NETWORK_PRIO_CODE );
andc_SUB_recv.connect( "tcp://server:8899" );
讨论:
对于业余项目,消息传递基础结构不需要太多,但是对于更重要的领域,服务器和都有额外的服务client 实例应该添加一些进一步的正式通信模式行为。
- r/KBD
用于远程键盘,具有类似 CLI
的临时检查实用程序
- KEEP_ALIVE
允许系统范围状态/性能监控的转发器
- SIG_EXIT
允许系统范围/实例特定的处理程序 SIG_EXIT
s
- distributed syslog
服务允许安全地收集/存储日志记录的非阻塞副本(无论是在调试阶段还是性能-tuninc 阶段或生产级记录-证据收集 )
- Identity Management
审计追踪等工具
- WhiteList/BlackList
用于增加基础架构的稳健性,使其更好地抵抗 DoS 攻击/中毒错误的 NIC 流量突发等
- Adaptive Node re-Discovery
用于更智能/临时基础设施设计和状态监控或多角色/(N + M)阴影主热备role-handover/takeover场景等出场
总结
A1
:是的,完全在 ZeroMQ 能力范围内
A2
:是的,ZeroMQ 书籍/指南中的 C++ 代码示例可用
A3
: Ref.: A1, 加可以喜欢深入评论 in Martin SUSTRIK's post on "Differences between nanomsg
and ZeroMQ
"
希望您能享受分布式处理的力量,无论是 ZeroMQ
还是 nanomsg
或两者都支持。
只有自己的想象力才是极限
如果有兴趣further details, one might love the book referred to in the The Best Next Step section of this post