每个线程或每个调用一个 ZeroMQ 套接字?
One ZeroMQ socket per thread or per call?
众所周知,ZeroMQ 套接字 shall not be shared among application threads.
context_t
实例可以。
我有一个多线程应用程序,我想让每个线程不时与 REQ/REP
-socket 对手方交换消息(事件,异常等),取决于他们在做什么(他们正在做非 ZeroMQ 的东西)。
要将消息发送到我的 REQ/REP
-socket,我使用以下函数
(半 C++ 半伪代码):
sendMessage:
bool sendMessage(std::string s)
{
zmq::socket_t socket(globalContext(), ZMQ_REQ);
socket.connect("ipc://http-concentrator");
zmq::message_t message(s.size());
memcpy(message.data(), s.data(), s.size());
if (!socket.send(message))
return false;
// poll on socket for POLLIN with timeout
socket.recv(&message);
// do something with message
return true;
}
此函数在需要时从每个线程调用。它创建本地套接字、连接、发送消息并接收响应。在退出时,套接字断开连接并移除(至少我假设它已关闭)。
这样,我就不需要在每个线程中维护套接字了。这是以我每次调用此函数时创建和连接为代价的。
我强调了这段代码,我没有发现重用一个套接字和这个重新连接实现之间有什么区别。 (我每秒有 20k REP/REQ
个事务,包括用例两侧的 JSON-decode/encode 个)
问: 是否有更正确的 ZeroMQ 方式来做到这一点?
我认为一个不同是性能。
使用上面的代码,这意味着您需要执行 20k 次创建套接字、建立连接、发送消息和关闭套接字,在我看来这很耗时,您可以 运行 一些性能工具分析来检查函数 sendMessage()
.
使用了多少时间
另一种方法可能为每个线程创建一个请求套接字,并使用它所属的线程的套接字发送数据。 ZeroMQ不支持多线程,否则会导致断言错误(调试模式)或崩溃等错误。
这是我的(当前)解决方案,在 C++11 中,您可以将对象分配给 thread_local
存储。在一个函数中存储 socket_t
-实例 static
和 thread_local
给了我正在寻找的功能:
class socketPool
{
std::string endpoint_;
public:
socketPool(const std::string &ep) : endpoint_(ep) {}
zmq::socket_t & operator()()
{
thread_local static zmq::socket_t socket(
globalContext(),
ZMQ_REQ);
thread_local static bool connected;
if (!connected) {
connected = true;
socket.connect(endpoint_);
}
return socket;
}
};
// creating a pool for each endpoint
socketPool httpReqPool("ipc://http-concentrator");
在我的 sendMessage()
-函数中,我只是简单地做
而不是创建和连接
bool sendMessage(std::string s)
{
zmq::socket_t &socket = httpReqPool();
// the rest as above
}
关于性能,好吧,它在我的机器上快了 7 倍。 (每秒 140k REQ/REP
)。
Nota Bene: this answer was posted before O/P was changed from 20k TPS to 140k TPS on ipc:// transport-class
Q: Is there a more correct ZeroMQ-way of doing this?
A:Not easy to say what is "this" and what are the parameters of the "correctness"-metric
鉴于此,
以下要点将更通用
并且适用于系统design-phase推理:
避免资源利用间接费用
这个点是一把dual-edge剑。一些开销总是与 REQ
-AccessPoint 到 [= 的基础设施元素设置和处置(是的,甚至是关闭和拆除)相关联12=]-pattern 和关联的 socket-based transport-class 对 REQ
端主机和 REP
端都施加了一些显着的开销。
公平地说,您已经注意到,您在大约 20k TPS 的水平上对这个进行了定量测试,并且没有观察到这种方法的任何不利影响。不清楚的是,是否在同一 SUT ( System-under-Test ) 上也测试了任何其他场景 in-vivo,以便为每个设计的比较提供一些基线(并允许确定管理费用本身)。
虽然一个设计良好的框架从user-maintained代码中隐藏了这部分系统内部行为,但这并不意味着,它都是便宜的,越少free-of-charge处理。
很明显,在 Context()
-实例线程中有一些工作在幕后执行(...是的,复数在这里是正确的,因为某些 high-performance 代码可能受益于每个 Context()
实例使用多个 I/O-threads 并通过在 pattern-socket 和它的各自的 I/O-thread 处理程序(以便以某种方式平衡,如果不能确定地平衡,预期的 I/O-throughput,包括所有相关的开销)。
如果仍有疑问,请始终记住,命令式编程风格的函数或 object-oriented 方法主要是外部调用者的受害者,外部调用者决定在何时何地发生这种情况“en-slaved" code-execution 单元被调用值班并正在执行。 function/method 没有任何自然方法 back-throtling (抑制)它自己从外部调用者调用的频率,并且健壮的设计根本不能仅仅依赖乐观的假设,即此类调用不要经常超过 XYZ-k TPS(上面引用的 20k 可能适合 in-vitro 测试,但实际部署可能会改变几个数量级的数量级(无论是人为的 - 在测试期间,还是不是 - 在一些 peak-hour 或用户(系统)-panic 或由于某些技术错误或硬件故障(我们都听说过很多次关于 NIC-card 淹没 L1/L2 流量超出所有可以想象的限制等al - 我们只是不知道,也无法知道,下次会在何时/何地再次发生。
避免阻塞的风险
提到的 REQ/REP
可扩展正式通信模式以其陷入外部无法解决的分布式内部 dead-lock 的风险而闻名。这始终是一个需要避免的风险。缓解策略可能取决于实际 use-case 的风险价值(需要认证医疗器械、金融科技 use-cases、control-loop use-cases、学术研究论文代码或私人爱好玩具)。
Ref.: REQ/REP
Deadlocks >>>
Fig.1:
为什么使用幼稚是错误的REQ/REP
所有情况当 [App1]
in_WaitToRecvSTATE_W2R
+ [App2]
in_WaitToRecvSTATE_W2R
主要是REQ-FSA/REP-FSA
不可挽救的分布式相互死锁(两个 Finite-State-Automata 中的每一个都等待 "the other" 移动)并且永远不会到达 "next" in_WaitToSendSTATE_W2S
内部状态。
XTRN_RISK_OF_FSA_DEADLOCKED ~ { NETWORK_LoS
: || NETWORK_LoM
: || SIG_KILL( App2 )
: || ...
: }
:
[App1] ![ZeroMQ] : [ZeroMQ] ![App2]
code-control! code-control : [code-control ! code-control
+===========!=======================+ : +=====================!===========+
| ! ZMQ | : | ZMQ ! |
| ! REQ-FSA | : | REP-FSA! |
| !+------+BUF> .connect()| v |.bind() +BUF>------+! |
| !|W2S |___|>tcp:>---------[*]-----(tcp:)--|___|W2R |! |
| .send()>-o--->|___| | | |___|-o---->.recv() |
| ___/ !| ^ | |___| | | |___| ^ | |! \___ |
| REQ !| | v |___| | | |___| | v |! REP |
| \___.recv()<----o-|___| | | |___|<---o-<.send()___/ |
| !| W2R|___| | | |___| W2S|! |
| !+------<BUF+ | | <BUF+------+! |
| ! | | ! |
| ! ZMQ | | ZMQ ! |
| ! REQ-FSA | | REP-FSA ! |
~~~~~~~~~~~~~ DEADLOCKED in W2R ~~~~~~~~ * ~~~~~~ DEADLOCKED in W2R ~~~~~~~~~~~~~
| ! /\/\/\/\/\/\/\/\/\/\/\| |/\/\/\/\/\/\/\/\/\/\/! |
| ! \/\/\/\/\/\/\/\/\/\/\/| |\/\/\/\/\/\/\/\/\/\/\! |
+===========!=======================+ +=====================!===========+
另一种方法是让 ZeroMQ 与某些 FIFO 队列进行通信的专用线程(当然,必须用互斥锁或类似的东西保护......)。只要队列为空,这个专用线程就应该处于休眠状态,并在状态发生变化时唤醒(收到适当的信号)。
根据一般需要,每当收到对某些传出消息的响应时,专用线程可以简单地调用一些回调(在每个线程的某个专用对象上);请注意,那时您有不同的线程上下文,因此您可能需要一些同步方法来防止竞争条件。
或者,发送线程可以等待响应,由 ZeroMQ 线程在收到响应时发出信号(好吧,这实际上 是 防止竞争条件的方法之一...).
众所周知,ZeroMQ 套接字 shall not be shared among application threads.context_t
实例可以。
我有一个多线程应用程序,我想让每个线程不时与 REQ/REP
-socket 对手方交换消息(事件,异常等),取决于他们在做什么(他们正在做非 ZeroMQ 的东西)。
要将消息发送到我的 REQ/REP
-socket,我使用以下函数
(半 C++ 半伪代码):
sendMessage:
bool sendMessage(std::string s)
{
zmq::socket_t socket(globalContext(), ZMQ_REQ);
socket.connect("ipc://http-concentrator");
zmq::message_t message(s.size());
memcpy(message.data(), s.data(), s.size());
if (!socket.send(message))
return false;
// poll on socket for POLLIN with timeout
socket.recv(&message);
// do something with message
return true;
}
此函数在需要时从每个线程调用。它创建本地套接字、连接、发送消息并接收响应。在退出时,套接字断开连接并移除(至少我假设它已关闭)。
这样,我就不需要在每个线程中维护套接字了。这是以我每次调用此函数时创建和连接为代价的。
我强调了这段代码,我没有发现重用一个套接字和这个重新连接实现之间有什么区别。 (我每秒有 20k REP/REQ
个事务,包括用例两侧的 JSON-decode/encode 个)
问: 是否有更正确的 ZeroMQ 方式来做到这一点?
我认为一个不同是性能。
使用上面的代码,这意味着您需要执行 20k 次创建套接字、建立连接、发送消息和关闭套接字,在我看来这很耗时,您可以 运行 一些性能工具分析来检查函数 sendMessage()
.
另一种方法可能为每个线程创建一个请求套接字,并使用它所属的线程的套接字发送数据。 ZeroMQ不支持多线程,否则会导致断言错误(调试模式)或崩溃等错误。
这是我的(当前)解决方案,在 C++11 中,您可以将对象分配给 thread_local
存储。在一个函数中存储 socket_t
-实例 static
和 thread_local
给了我正在寻找的功能:
class socketPool
{
std::string endpoint_;
public:
socketPool(const std::string &ep) : endpoint_(ep) {}
zmq::socket_t & operator()()
{
thread_local static zmq::socket_t socket(
globalContext(),
ZMQ_REQ);
thread_local static bool connected;
if (!connected) {
connected = true;
socket.connect(endpoint_);
}
return socket;
}
};
// creating a pool for each endpoint
socketPool httpReqPool("ipc://http-concentrator");
在我的 sendMessage()
-函数中,我只是简单地做
bool sendMessage(std::string s)
{
zmq::socket_t &socket = httpReqPool();
// the rest as above
}
关于性能,好吧,它在我的机器上快了 7 倍。 (每秒 140k REQ/REP
)。
Nota Bene: this answer was posted before O/P was changed from 20k TPS to 140k TPS on ipc:// transport-class
Q: Is there a more correct ZeroMQ-way of doing this?
A:Not easy to say what is "this" and what are the parameters of the "correctness"-metric
鉴于此,
以下要点将更通用
并且适用于系统design-phase推理:
避免资源利用间接费用
这个点是一把dual-edge剑。一些开销总是与 REQ
-AccessPoint 到 [= 的基础设施元素设置和处置(是的,甚至是关闭和拆除)相关联12=]-pattern 和关联的 socket-based transport-class 对 REQ
端主机和 REP
端都施加了一些显着的开销。
公平地说,您已经注意到,您在大约 20k TPS 的水平上对这个进行了定量测试,并且没有观察到这种方法的任何不利影响。不清楚的是,是否在同一 SUT ( System-under-Test ) 上也测试了任何其他场景 in-vivo,以便为每个设计的比较提供一些基线(并允许确定管理费用本身)。
虽然一个设计良好的框架从user-maintained代码中隐藏了这部分系统内部行为,但这并不意味着,它都是便宜的,越少free-of-charge处理。
很明显,在 Context()
-实例线程中有一些工作在幕后执行(...是的,复数在这里是正确的,因为某些 high-performance 代码可能受益于每个 Context()
实例使用多个 I/O-threads 并通过在 pattern-socket 和它的各自的 I/O-thread 处理程序(以便以某种方式平衡,如果不能确定地平衡,预期的 I/O-throughput,包括所有相关的开销)。
如果仍有疑问,请始终记住,命令式编程风格的函数或 object-oriented 方法主要是外部调用者的受害者,外部调用者决定在何时何地发生这种情况“en-slaved" code-execution 单元被调用值班并正在执行。 function/method 没有任何自然方法 back-throtling (抑制)它自己从外部调用者调用的频率,并且健壮的设计根本不能仅仅依赖乐观的假设,即此类调用不要经常超过 XYZ-k TPS(上面引用的 20k 可能适合 in-vitro 测试,但实际部署可能会改变几个数量级的数量级(无论是人为的 - 在测试期间,还是不是 - 在一些 peak-hour 或用户(系统)-panic 或由于某些技术错误或硬件故障(我们都听说过很多次关于 NIC-card 淹没 L1/L2 流量超出所有可以想象的限制等al - 我们只是不知道,也无法知道,下次会在何时/何地再次发生。
避免阻塞的风险
提到的 REQ/REP
可扩展正式通信模式以其陷入外部无法解决的分布式内部 dead-lock 的风险而闻名。这始终是一个需要避免的风险。缓解策略可能取决于实际 use-case 的风险价值(需要认证医疗器械、金融科技 use-cases、control-loop use-cases、学术研究论文代码或私人爱好玩具)。
Ref.:
REQ/REP
Deadlocks >>>
Fig.1:
为什么使用幼稚是错误的REQ/REP
所有情况当 [App1]
in_WaitToRecvSTATE_W2R
+ [App2]
in_WaitToRecvSTATE_W2R
主要是REQ-FSA/REP-FSA
不可挽救的分布式相互死锁(两个 Finite-State-Automata 中的每一个都等待 "the other" 移动)并且永远不会到达 "next" in_WaitToSendSTATE_W2S
内部状态。
XTRN_RISK_OF_FSA_DEADLOCKED ~ { NETWORK_LoS
: || NETWORK_LoM
: || SIG_KILL( App2 )
: || ...
: }
:
[App1] ![ZeroMQ] : [ZeroMQ] ![App2]
code-control! code-control : [code-control ! code-control
+===========!=======================+ : +=====================!===========+
| ! ZMQ | : | ZMQ ! |
| ! REQ-FSA | : | REP-FSA! |
| !+------+BUF> .connect()| v |.bind() +BUF>------+! |
| !|W2S |___|>tcp:>---------[*]-----(tcp:)--|___|W2R |! |
| .send()>-o--->|___| | | |___|-o---->.recv() |
| ___/ !| ^ | |___| | | |___| ^ | |! \___ |
| REQ !| | v |___| | | |___| | v |! REP |
| \___.recv()<----o-|___| | | |___|<---o-<.send()___/ |
| !| W2R|___| | | |___| W2S|! |
| !+------<BUF+ | | <BUF+------+! |
| ! | | ! |
| ! ZMQ | | ZMQ ! |
| ! REQ-FSA | | REP-FSA ! |
~~~~~~~~~~~~~ DEADLOCKED in W2R ~~~~~~~~ * ~~~~~~ DEADLOCKED in W2R ~~~~~~~~~~~~~
| ! /\/\/\/\/\/\/\/\/\/\/\| |/\/\/\/\/\/\/\/\/\/\/! |
| ! \/\/\/\/\/\/\/\/\/\/\/| |\/\/\/\/\/\/\/\/\/\/\! |
+===========!=======================+ +=====================!===========+
另一种方法是让 ZeroMQ 与某些 FIFO 队列进行通信的专用线程(当然,必须用互斥锁或类似的东西保护......)。只要队列为空,这个专用线程就应该处于休眠状态,并在状态发生变化时唤醒(收到适当的信号)。
根据一般需要,每当收到对某些传出消息的响应时,专用线程可以简单地调用一些回调(在每个线程的某个专用对象上);请注意,那时您有不同的线程上下文,因此您可能需要一些同步方法来防止竞争条件。
或者,发送线程可以等待响应,由 ZeroMQ 线程在收到响应时发出信号(好吧,这实际上 是 防止竞争条件的方法之一...).