pyzmq SUB 订阅者如何检测离线 PUB 发布者?

How can a pyzmq SUB subscriber detect an offline PUB publisher?

SUB-抄写员如何才能真正确定PUB-刻录机另一边,不然根本无法开始?

详情:
虽然一切都适用于我的 SUB-scriber 代码 运行 远程 PUB-lisher,当我 .connect()+ 将我的客户端订阅到虚拟服务器时说 localhost,它没有注意到没有 PUB-listers 运行,它只是开始等待。

我使用标准程序进行:

    sock = context.socket(zmq.SUB)
    sock.connect("tcp://{}:{}".format(host, port))
    topic_filter = 'blah'
    sock.setsockopt_string(zmq.SUBSCRIBE, topic_filter)
    # here should come something that warns about offline publisher...

Sole PUB/SUB 没有办法创建这个

ZeroMQ 在概念上和实践上都是一个强大的工具箱。人们不应尝试 "bend" 库原语——它们本身可以被理解,而不仅仅是用于更复杂的消息传递和信号发送目的的构建块,而不是现实生活中的解决方案——以便做事最初的S可扩展F普通C通信P 图案设计原型。

PUB/SUB 原型 只是 PUB/SUB.

PUB-向当时在场的所有 SUB 广播,如果有的话,向每个 广播SUB 侦听到达的内容(如果有任何内容到达,它会应用过滤器...是的,在 SUB 端加载网络上进行过滤)。


仍然 - 可以设计一种多原型方法来解决这个问题。

让我为最直接的方案画一个基本方案。

假设您完全控制双方(设计方面和实施方面)。

# /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/
#
# [Side A]
#        |_____aKnockSOCK = context.socket( zmq.PAIR )
#        |     + .setsockopt( zmq.CONFLATE )
#        |     + .bind()
#        |
#        |_____anEmitSOCK = context.socket( zmq.PUB )
#              + .bind()
#
isNotReceivedSigEXIT  = False
while( isNotReceviedSigEXIT ):
       if ( 0 == aKnockSOCK.poll( aFastPollIN, zmq.POLLIN ) ):
           # nobody new knocking to setup ...
           # ------------------------------------------------
           # do the main job,
           #    with countdown segmentation
           #    to escape to the outer loop
           #    so as to check for new SUB-s
           #    knocking as they come to the show
           # ------------------------------------------------
       else:
           aKnockSOCK.send( "aKnockSOCK ACK: service is ready. Go .connect()" )
#
# --------------------------------------------------------
# ZeroMQ resources graceful termination 
#        socket closes + context dismantle & clean exit



# /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/
#
#[Side B]
#        |_____aKnockSOCK = context.socket( zmq.PAIR )
#        |     + .setsockopt( zmq.CONFLATE )
#        |
#        |_____aRecvrSOCK = context.socket( zmq.SUB )
#
#
isToBreakEXIT = False
for nthAttempt in range( 10 ):
    if ( isToBreakEXIT ):
         break
    try:
          aKnockSOCK.connect( ... )
    except:
          ReportConnectERROR( ... )
          sleep( ... )
          continue
    # ---------------------------------- once local aKnockSOCK got instantiated
    for kthPoll in range( 10 ):
        if ( 0 == aKnockSOCK.poll( aLongPollIN, zmq.POLLIN ) ):
           sleep( thisCouldBeAddedToLongPollIN )
        else:
           # ----------------------------------------- .recv() + dismantle
           aKnockSOCK.recv()
           aKnockSOCK.setsockopt( zmq.LINGER, 0 )
           aKnockSOCK.close()
           # ----------------------------------------- .connect() + use
           aRecvrSOCK.connect( ... )
           aRecvrSOCK.setsockopt( zmq.SUBSCRIBE, ... )
           # ----------------------------------------- [Side B] main job start
           # ----------------------------------------- [Side B] main job end
           isToBreakEXIT = True
           break
    pass
# --------------------------------------------------------
# ZeroMQ resources graceful termination 
#        socket closes + context dismantle & clean exit