每个线程或每个调用一个 ZeroMQ 套接字?

One ZeroMQ socket per thread or per call?

众所周知,ZeroMQ 套接字 shall not be shared among application threads.
context_t 实例可以。

我有一个多线程应用程序,我想让每个线程不时与 REQ/REP-socket 对手方交换消息(事件,异常等),取决于他们在做什么(他们正在做非 ZeroMQ 的东西)。

要将消息发送到我的 REQ/REP-socket,我使用以下函数
(半 C++ 半伪代码):

sendMessage:

bool sendMessage(std::string s)
{
    zmq::socket_t socket(globalContext(), ZMQ_REQ);
    socket.connect("ipc://http-concentrator");

    zmq::message_t message(s.size());
    memcpy(message.data(), s.data(), s.size());
    if (!socket.send(message))
        return false;

    // poll on socket for POLLIN with timeout

    socket.recv(&message);
    // do something with message

    return true;
}

此函数在需要时从每个线程调用。它创建本地套接字、连接、发送消息并接收响应。在退出时,套接字断开连接并移除(至少我假设它已关闭)。

这样,我就不需要在每个线程中维护套接字了。这是以我每次调用此函数时创建和连接为代价的。

我强调了这段代码,我没有发现重用一个套接字和这个重新连接实现之间有什么区别。 (我每秒有 20k REP/REQ 个事务,包括用例两侧的 JSON-decode/encode 个)

问: 是否有更正确的 ZeroMQ 方式来做到这一点?

我认为一个不同是性能。

使用上面的代码,这意味着您需要执行 20k 次创建套接字、建立连接、发送消息和关闭套接字,在我看来这很耗时,您可以 运行 一些性能工具分析来检查函数 sendMessage().

使用了多少时间

另一种方法可能为每个线程创建一个请求套接字,并使用它所属的线程的套接字发送数据。 ZeroMQ不支持多线程,否则会导致断言错误(调试模式)或崩溃等错误。

这是我的(当前)解决方案,在 C++11 中,您可以将对象分配给 thread_local 存储。在一个函数中存储 socket_t-实例 staticthread_local 给了我正在寻找的功能:

class socketPool
{
    std::string endpoint_;

public:
    socketPool(const std::string &ep) : endpoint_(ep) {}

    zmq::socket_t & operator()()
    {
        thread_local static zmq::socket_t socket(
                globalContext(), 
                ZMQ_REQ);
        thread_local static bool connected;

        if (!connected) {
            connected = true;
            socket.connect(endpoint_);
        }

        return socket;
    }
};

// creating a pool for each endpoint
socketPool httpReqPool("ipc://http-concentrator");

在我的 sendMessage()-函数中,我只是简单地做

而不是创建和连接
bool sendMessage(std::string s)
{
    zmq::socket_t &socket = httpReqPool();

    // the rest as above
}

关于性能,好吧,它在我的机器上快了 7 倍。 (每秒 140k REQ/REP)。

Nota Bene: this answer was posted before O/P was changed from 20k TPS to 140k TPS on ipc:// transport-class

Q: Is there a more correct ZeroMQ-way of doing this?

A:Not easy to say what is "this" and what are the parameters of the "correctness"-metric

鉴于此,
以下要点将更通用
并且适用于系统design-phase推理:


避免资源利用间接费用

这个点是一把dual-edge剑。一些开销总是与 REQ-AccessPoint 到 [= 的基础设施元素设置和处置(是的,甚至是关闭和拆除)相关联12=]-pattern 和关联的 socket-based transport-class 对 REQ 端主机和 REP 端都施加了一些显着的开销。

公平地说,您已经注意到,您在大约 20k TPS 的水平上对这个进行了定量测试,并且没有观察到这种方法的任何不利影响。不清楚的是,是否在同一 SUT ( System-under-Test ) 上也测试了任何其他场景 in-vivo,以便为每个设计的比较提供一些基线(并允许确定管理费用本身)。

虽然一个设计良好的框架从user-maintained代码中隐藏了这部分系统内部行为,但这并不意味着,它都是便宜的,越少free-of-charge处理。

很明显,在 Context()-实例线程中有一些工作在幕后执行(...是的,复数在这里是正确的,因为某些 high-performance 代码可能受益于每个 Context() 实例使用多个 I/O-threads 并通过在 pattern-socket 和它的各自的 I/O-thread 处理程序(以便以某种方式平衡,如果不能确定地平衡,预期的 I/O-throughput,包括所有相关的开销)。

如果仍有疑问,请始终记住,命令式编程风格的函数或 object-oriented 方法主要是外部调用者的受害者,外部调用者决定在何时何地发生这种情况“en-slaved" code-execution 单元被调用值班并正在执行。 function/method 没有任何自然方法 back-throtling (抑制)它自己从外部调用者调用的频率,并且健壮的设计根本不能仅仅依赖乐观的假设,即此类调用不要经常超过 XYZ-k TPS(上面引用的 20k 可能适合 in-vitro 测试,但实际部署可能会改变几个数量级的数量级(无论是人为的 - 在测试期间,还是不是 - 在一些 peak-hour 或用户(系统)-panic 或由于某些技术错误或硬件故障(我们都听说过很多次关于 NIC-card 淹没 L1/L2 流量超出所有可以想象的限制等al - 我们只是不知道,也无法知道,下次会在何时/何地再次发生。

避免阻塞的风险

提到的 REQ/REP 可扩展正式通信模式以其陷入外部无法解决的分布式内部 dead-lock 的风险而闻名。这始终是一个需要避免的风险。缓解策略可能取决于实际 use-case 的风险价值(需要认证医疗器械、金融科技 use-cases、control-loop use-cases、学术研究论文代码或私人爱好玩具)。

Ref.: REQ/REP Deadlocks >>>

Fig.1:为什么使用幼稚是错误的REQ/REP
所有情况当 [App1]in_WaitToRecvSTATE_W2R + [App2]in_WaitToRecvSTATE_W2R
主要是REQ-FSA/REP-FSA 不可挽救的分布式相互死锁(两个 Finite-State-Automata 中的每一个都等待 "the other" 移动)并且永远不会到达 "next" in_WaitToSendSTATE_W2S 内部状态。

               XTRN_RISK_OF_FSA_DEADLOCKED ~ {  NETWORK_LoS
                                         :   || NETWORK_LoM
                                         :   || SIG_KILL( App2 )
                                         :   || ...
                                         :      }
                                         :
[App1]      ![ZeroMQ]                    :    [ZeroMQ]              ![App2] 
code-control! code-control               :    [code-control         ! code-control
+===========!=======================+    :    +=====================!===========+
|           ! ZMQ                   |    :    |              ZMQ    !           |
|           ! REQ-FSA               |    :    |              REP-FSA!           |
|           !+------+BUF> .connect()|    v    |.bind()  +BUF>------+!           |
|           !|W2S   |___|>tcp:>---------[*]-----(tcp:)--|___|W2R   |!           |
|     .send()>-o--->|___|           |         |         |___|-o---->.recv()     |
| ___/      !| ^  | |___|           |         |         |___| ^  | |!      \___ |
| REQ       !| |  v |___|           |         |         |___| |  v |!       REP |
| \___.recv()<----o-|___|           |         |         |___|<---o-<.send()___/ |
|           !|   W2R|___|           |         |         |___|   W2S|!           |
|           !+------<BUF+           |         |         <BUF+------+!           |
|           !                       |         |                     !           |
|           ! ZMQ                   |         |   ZMQ               !           |
|           ! REQ-FSA               |         |   REP-FSA           !           |
~~~~~~~~~~~~~ DEADLOCKED in W2R ~~~~~~~~ * ~~~~~~ DEADLOCKED in W2R ~~~~~~~~~~~~~
|           ! /\/\/\/\/\/\/\/\/\/\/\|         |/\/\/\/\/\/\/\/\/\/\/!           |
|           ! \/\/\/\/\/\/\/\/\/\/\/|         |\/\/\/\/\/\/\/\/\/\/\!           |
+===========!=======================+         +=====================!===========+

另一种方法是让 ZeroMQ 与某些 FIFO 队列进行通信的专用线程(当然,必须用互斥锁或类似的东西保护......)。只要队列为空,这个专用线程就应该处于休眠状态,并在状态发生变化时唤醒(收到适当的信号)。

根据一般需要,每当收到对某些传出消息的响应时,专用线程可以简单地调用一些回调(在每个线程的某个专用对象上);请注意,那时您有不同的线程上下文,因此您可能需要一些同步方法来防止竞争条件。

或者,发送线程可以等待响应,由 ZeroMQ 线程在收到响应时发出信号(好吧,这实际上 防止竞争条件的方法之一...).