ZMQ 经销商 - 路由器通讯

ZMQ DEALER - ROUTER Communication

我目前正在从事一个项目,该项目需要通过来自分布式系统的某些实体的不同数据类型的网络进行一些通信,并且我正在使用 ZMQ。

该项目的主要目标是拥有一个中央节点,为可以随时连接的客户端提供服务。对于每个连接的客户端,中心节点应该管理两者之间的消息通信。

目前,所有通信都通过 TCP 进行。

客户端需要随时收发消息所以是ZMQ_DEALER类型的套接字,中心节点是ZMQ_ROUTER

最初,目标是来自某个客户端的一条消息,这条消息到达其他客户端。这意味着其他客户端可以看到所有相同的数据。

我一直在使用 Asynchronous Client/Server pattern,因为我有兴趣让多个客户端以协作方式相互交谈,可能有服务器代理或中间件。

我有一个连接到 ZMQ_ROUTER 套接字服务器的 ZMQ_DEALER 套接字客户端

#include <zmq.hpp>
#include "zhelpers.hpp"
using namespace std;

int main(int argc, char *argv[])
{

    zmq::context_t context(1);
    zmq::socket_t client(context, ZMQ_DEALER);

    const string endpoint = "tcp://localhost:5559";

    client.setsockopt(ZMQ_IDENTITY, "PEER1", 5);
    cout << "Connecting to ZMQ Network Manager " << endpoint << "..." << endl;
    client.connect(endpoint);
    for (int request = 0; request < 10; request++)
    {

        s_sendmore(client, "");
        s_send(client, "Testing sending some data");

        std::string string = s_recv(client);

        std::cout << "Received reply " << request
                  << " [" << string << "]" << std::endl;
    }
}

在我的服务器代码上,我有一个 ZMQ_ROUTER 用于接收和管理消息,将其绑定到一个良好的端口。此服务器是在 Python

中制作的
import zmq
context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")

# Initialize a poll set
poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)

print("Creating Server Network Manager Router")

while True:
    socks = dict(poller.poll())

    if socks.get(frontend) == zmq.POLLIN:
        message = frontend.recv_multipart()
        print(message)
        frontend.send_multipart(message)

在我的另一个 peer/client 我有以下内容:

#include <zmq.hpp>
#include "zhelpers.hpp"
using namespace std;

int main (int argc, char *argv[])
{

    zmq::context_t context(1);
    zmq::socket_t peer2(context, ZMQ_DEALER);

    const string endpoint = "tcp://localhost:5559";

    peer2.setsockopt(ZMQ_IDENTITY, "PEER2", 5);
    cout << "Connecting to ZMQ Network Manager " << endpoint << "..." << endl;
    peer2.connect(endpoint);
    //s_sendmore(peer2, "");
    //s_send(peer2, "Probando");

    //std::string string = s_recv(peer2);

    //std::cout << "Received reply " << " [" << string << "]" << std::endl;

    for (int request = 0; request < 10; request++)
    {

        s_sendmore(peer2, "");
        s_send(peer2, "Probando");

        std::string string = s_recv(peer2);

        std::cout << "Received reply " << request
                  << " [" << string << "]" << std::endl;
    }

}

更新

但是每次我执行某个客户端时,它们各自的消息都不会到达另一个对等客户端。 消息到达 ZMQ_ROUTER,并 returned 到 ZMQ_DEALER 发件人来源。

这是因为身份帧在接收时先于ROUTER,消息是通过ROUTER发回的;它删除身份并使用该值将消息路由回相关经销商,according to the ZMQ_ROUTER section to the end page here.

这是逻辑,我正在将我的 DEALER 的身份发送到 ROUTERROUTER 将那个身份框架和 return 发送到我的 DEALER 消息

首先,从我的实施开始,我需要任何经销商发送的一些消息,这将被任何其他经销商可视化,无论有多少经销商(一个或多个)连接到 ZMQ_ROUTER。 从这个意义上说...是否有必要满足其他DEALER或其他DEALERS的身份框架?

如果我有DEALER ADEALER BDEALER CROUTER

然后:

DEALER A 发送消息... 我希望来自经销商 A 的消息到达 DEALER BDEALER C 以及其他 DEALERS 可以加入我的会话对话...

在这个想法顺序中,是否有必要在DEALER A这边满足DEALER BDEALER C的身份框架,以便此消息到达他?

如何知道我的实现中存在的每个 DEALER 的身份框架? 这是在ROUTER端做的? 我还没有清除这个

您可以让所有客户端在 start-up 发送一条 "I am here" 消息。然后中央服务器可以存储所有 ID,c.f。工人和路由器之间的初始通信在这里:http://zguide.zeromq.org/page:all#A-Load-Balancing-Message-Broker. The server would send out any received message to all currently known clients. You should add some heart beating in order to detect disconnected clients, c.f. http://zguide.zeromq.org/page:all#Heartbeating.

但是ZeroMQ已经自带了这样的通信模式:PUBSUB。本质上,每个客户端都会有一个 DEALER 和一个 SUB 套接字连接到服务器 ROUTERPUB 套接字。服务器简单地发送任何接收到的 通过 PUB 套接字向 所有 客户端发送消息。如果这对原始客户端来说是个问题,您可以在消息中包含客户端 ID,以便每个客户端都可以过滤掉具有自己 ID 的消息。另请参阅指南中的示例 http://zguide.zeromq.org/page:all#Getting-an-Out-of-Band-Snapshot

另一个有趣的模式是 Republishing Updates from Clients

此处PUSH--PULL用于向服务器发送更新。如果不需要来自服务器的回复消息,这是有意义的。如果您不需要该示例中的状态请求,则可以省略 ROUTER--DEALER 部分。为了简洁起见,这里使用 Python 的示例实现。服务器监听 PULL 套接字并通过 PUB 套接字发送所有内容:

import zmq

def main():
    # context and sockets
    ctx = zmq.Context()
    publisher = ctx.socket(zmq.PUB)
    publisher.bind("tcp://*:5557")
    collector = ctx.socket(zmq.PULL)
    collector.bind("tcp://*:5558")

    while True:
        message = collector.recv()
        print "I: publishing update %s" % message
        publisher.send(message)

if __name__ == '__main__':
    main()

客户端监听 PUB 套接字一段时间。如果收到一条消息,它会被记录下来。如果达到超时,将以十分之一的几率生成一条消息:

import random
import time

import zmq

def main():

    # Prepare our context and subscriber
    ctx = zmq.Context()
    subscriber = ctx.socket(zmq.SUB)
    subscriber.setsockopt(zmq.SUBSCRIBE, '')
    subscriber.connect("tcp://localhost:5557")
    publisher = ctx.socket(zmq.PUSH)
    publisher.connect("tcp://localhost:5558")

    random.seed(time.time())
    while True:
        if subscriber.poll(100) & zmq.POLLIN:
            message = subscriber.recv()
            print "I: received message %s" % message
        else:
            rand = random.randint(1, 100)
            if rand < 10:
                publisher.send("%d" % rand)
                print "I: sending message %d" % rand

if __name__ == '__main__':
    main()

(过早地) 授予的答案不符合定义的属性。

分布式系统需要智能高效操作,因为代理是分布式的,error-analyses和部署production-issues 分析/测试/调试非常昂贵。

因此 problem-incompatible 想法的 copy/paste re-use 既不是实现前者的方法,也不是实现后者的方法。


所以,我们先回顾一下效率:

client-[A].send()-s一个消息,说O/P想变成server-side-[S].recv()-ed 和 re-broadcast 所有 other 客户-[B,C,...],除了 [A]-本身。

最 resources-efficient 的方法是正确配置基础设施工具来做到这一点,没有 re-inventing wheel and/or 使用脆弱的 performance-devastating 脚手架代码(s).

所以:

在客户端-[*] 端最好使用下面的草图原语agent-concept。更复杂的设置,比如使用聪明的 event-handling 工具,因为 Tkinter 已经发展到已经打包到 .mainloop() soft-real-time 系统中,更好,但它在多个方面开始 design-battles 并不容易,所以现在让我们把事情简单化:

zmq_VERSION      = zmq.zmq_version_info()
anAgentsIDENTITY = whateverHashOrHumanReadableSTRING
notMINE          = anAgentsIDENTITY

if     zmq_VERSION[0] < 4:
           print "ZMQ{0:} ver < than expected, will exit".format( zmq_VERSION )
aCTX = zmq.Context( 2 )                        # if performance boosting is needed

#SUB ---------------------------------------------------------------------------
aSUB = aCTX.socket( zmq.SUB )
aSUB.setsockopt(    zmq.LINGER,          0 )   # protect your agent
aSUB.setsockopt(    zmq.MAXMSGSIZE,      m )   # protect your agent from DoS
aSUB.setsockopt(    zmq.AFFINITY,        1 )   # protect your server resources
aSUB.setsockopt(    zmq.HEARTBEAT_IVL,   ivl ) #     set server helping Heartbeats
aSUB.setsockopt(    zmq.HEARTBEAT_TTL,   ttl ) #     set server helping Heartbeats
aSUB.setsockopt(    zmq.INVERT_MATCHING, 1 )   #   avoid server sending data back
aSUB.setsockopt(    zmq.SUBSCRIBE,       notMINE )  #  NEVER .recv()-s  data back
...
#SUB PERFORMANCE & RESOURCES TWEAKING DETAILS GO WAY BEYOND THE SCOPE OF THIS POST

aSUB.connect(      "tcp://localhost:5557" )

#PUSH --------------------------------------------------------------------------
aPUSH = aCTX.socket( zmq.PUSH )
...
#PUSH PERFORMANCE & RESOURCES TWEAKING DETAILS GO WAY BEYOND THE SCOPE OF THIS POST

#main loop ---------------------------------------------------------------------
pass; notSoftFLAG = True; anAgentSignsWithIdentityPREFIX = anAgentsIDENTITY
while notSoftFLAG:

    if aReasonToSendSomethingToServer:
       aPUSH.send( anAgentSignsWithIdentityPREFIX
                 + ":::"
                 + aMsgPAYLOAD,
                   zmq.DONTWAIT
                   )                          # inspect ZMQError
       ...
       pass

    if aSUB.poll( 100 ):
       message = aSUB.recv( zmq.DONTWAIT )    #  NEVER .recv()-s own data back
       ...
       pass


    if aReasonToFlagLoopEXIT:
       notSoftFLAG = False
       ...
       pass

    if ...:
       ...
       pass

#main loop ---------------------------------------------------------------------
pass

#########
# ALWAYS:
#          better using context-aware try:/except:/finally:

aRetCODE = [ aSOCK.close() for aSOCK in ( aSUB, aPUSH, ) ]
...

aCTX.term()
#   .term()
#########

服务器可以避免 ALL 任何需要任何 ad-hoc 处理的麻烦:

所有在 ZeroMQ 基础架构中都得到了很好的调整:

pass;  zmq_VERSION = zmq.zmq_version_info()
if     zmq_VERSION[0] < 4:
           print "ZMQ{0:} ver < than expected, will exit".format( zmq_VERSION )

aCTX = zmq.Context( 2 )                        # if performance boosting is needed

#PUB ---------------------------------------------------------------------------
aPUB = aCTX.socket( zmq.PUB )
aPUB.setsockopt(    zmq.LINGER,          0 )   # protect your server
aPUB.setsockopt(    zmq.MAXMSGSIZE,      m )   # protect your server from DoS
aPUB.setsockopt(    zmq.AFFINITY,        3 )   # protect your server resources
aPUB.setsockopt(    zmq.HEARTBEAT_IVL,   ivl ) #     server L3-helper Heartbeats
aPUB.setsockopt(    zmq.HEARTBEAT_TTL,   ttl ) #     server L3-helper Heartbeats
aPUB.setsockopt(    zmq.INVERT_MATCHING, 1 )   #   avoid server sending data back
aPUB.setsockopt(    zmq.IMMEDIATE,       1 )   # avoid Queueing for dead-ends
aPUB.setsockopt(    zmq.TOS,             tos ) # allow for L3-router TOS-policies
...
#PUB PERFORMANCE & RESOURCES TWEAKING DETAILS GO WAY BEYOND THE SCOPE OF THIS POST
aPUB.bind(   "tcp://*:5557" )                  # expose AccessPoint on tcp://

#PULL --------------------------------------------------------------------------
aPULL = aCTX.socket( zmq.PULL )
aPULL.setsockopt(    zmq.LINGER,          0 )  # protect your server
aPULL.setsockopt(    zmq.MAXMSGSIZE,      m )  # protect your server from DoS
aPULL.setsockopt(    zmq.AFFINITY,        3 )  # protect your server resources
aPULL.setsockopt(    zmq.HEARTBEAT_IVL,   ivl )#     server L3-helper Heartbeats
aPULL.setsockopt(    zmq.HEARTBEAT_TTL,   ttl )#     server L3-helper Heartbeats
...
#PULL PERFORMANCE & RESOURCES TWEAKING DETAILS GO WAY BEYOND THE SCOPE OF THIS POST
aPULL.bind(   "tcp://*:5558" )                 # expose AccessPoint on tcp://
...

#main loop ---------------------------------------------------------------------
pass; notSoftFLAG = True
while notSoftFLAG:
    NOP_SLEEP = 10                            #  set a 10 [ms] sleep in case NOP
    if aPULL.poll( 0 ):                       #  NEVER block/wait
       aMSG = aPULL.recv( zmq.DONTWAIT )      #  NEVER .recv()-s own data back
       #CPY = zmq_msg_copy( &aMSG );          // WARNING ABOUT NATIVE C-API
       #                                      // HANDLING, NEED .COPY()
       #                                      //           NEED .CLOSE()
       aPUB.send( aMSG,   zmq.DONTWAIT )      #  re-PUB-lish to all others but sender
       ...< process aMSG payload on server-side, if needed >...

       NOP_SLEEP = 0                          # !NOP, avoid 10[ms] NOP-loop sleep
       pass

    if aReasonToFlagLoopEXIT:
       notSoftFLAG = False
       ...
       NOP_SLEEP = 0
       pass

    if ...:
       ...
       pass

    sleep( NOP_SLEEP )                        # a soft-real-time controlled sleep on NOP
#main loop ---------------------------------------------------------------------
pass

#########
# ALWAYS:
#          better using context-aware try:/except:/finally:

aRetCODE = [ aSOCK.close() for aSOCK in ( aPUB, aPULL, ) ]
...

aCTX.term()
#   .term()
#########