pyzmq SUB 订阅者如何检测离线 PUB 发布者?
How can a pyzmq SUB subscriber detect an offline PUB publisher?
SUB
-抄写员如何才能真正确定PUB
-刻录机另一边,不然根本无法开始?
详情:
虽然一切都适用于我的 SUB
-scriber 代码 运行 远程 PUB
-lisher,当我 .connect()
+ 将我的客户端订阅到虚拟服务器时说 localhost
,它没有注意到没有 PUB
-listers 运行,它只是开始等待。
我使用标准程序进行:
sock = context.socket(zmq.SUB)
sock.connect("tcp://{}:{}".format(host, port))
topic_filter = 'blah'
sock.setsockopt_string(zmq.SUBSCRIBE, topic_filter)
# here should come something that warns about offline publisher...
Sole PUB/SUB 没有办法创建这个
ZeroMQ 在概念上和实践上都是一个强大的工具箱。人们不应尝试 "bend" 库原语——它们本身可以被理解,而不仅仅是用于更复杂的消息传递和信号发送目的的构建块,而不是现实生活中的解决方案——以便做事最初的S可扩展F普通C通信P 图案设计原型。
PUB/SUB
原型 只是 PUB/SUB
.
PUB
-向当时在场的所有 SUB
广播,如果有的话,向每个 广播SUB
侦听到达的内容(如果有任何内容到达,它会应用过滤器...是的,在 SUB
端加载网络上进行过滤)。
仍然 - 可以设计一种多原型方法来解决这个问题。
让我为最直接的方案画一个基本方案。
假设您完全控制双方(设计方面和实施方面)。
# /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/
#
# [Side A]
# |_____aKnockSOCK = context.socket( zmq.PAIR )
# | + .setsockopt( zmq.CONFLATE )
# | + .bind()
# |
# |_____anEmitSOCK = context.socket( zmq.PUB )
# + .bind()
#
isNotReceivedSigEXIT = False
while( isNotReceviedSigEXIT ):
if ( 0 == aKnockSOCK.poll( aFastPollIN, zmq.POLLIN ) ):
# nobody new knocking to setup ...
# ------------------------------------------------
# do the main job,
# with countdown segmentation
# to escape to the outer loop
# so as to check for new SUB-s
# knocking as they come to the show
# ------------------------------------------------
else:
aKnockSOCK.send( "aKnockSOCK ACK: service is ready. Go .connect()" )
#
# --------------------------------------------------------
# ZeroMQ resources graceful termination
# socket closes + context dismantle & clean exit
# /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/
#
#[Side B]
# |_____aKnockSOCK = context.socket( zmq.PAIR )
# | + .setsockopt( zmq.CONFLATE )
# |
# |_____aRecvrSOCK = context.socket( zmq.SUB )
#
#
isToBreakEXIT = False
for nthAttempt in range( 10 ):
if ( isToBreakEXIT ):
break
try:
aKnockSOCK.connect( ... )
except:
ReportConnectERROR( ... )
sleep( ... )
continue
# ---------------------------------- once local aKnockSOCK got instantiated
for kthPoll in range( 10 ):
if ( 0 == aKnockSOCK.poll( aLongPollIN, zmq.POLLIN ) ):
sleep( thisCouldBeAddedToLongPollIN )
else:
# ----------------------------------------- .recv() + dismantle
aKnockSOCK.recv()
aKnockSOCK.setsockopt( zmq.LINGER, 0 )
aKnockSOCK.close()
# ----------------------------------------- .connect() + use
aRecvrSOCK.connect( ... )
aRecvrSOCK.setsockopt( zmq.SUBSCRIBE, ... )
# ----------------------------------------- [Side B] main job start
# ----------------------------------------- [Side B] main job end
isToBreakEXIT = True
break
pass
# --------------------------------------------------------
# ZeroMQ resources graceful termination
# socket closes + context dismantle & clean exit
SUB
-抄写员如何才能真正确定PUB
-刻录机另一边,不然根本无法开始?
详情:
虽然一切都适用于我的 SUB
-scriber 代码 运行 远程 PUB
-lisher,当我 .connect()
+ 将我的客户端订阅到虚拟服务器时说 localhost
,它没有注意到没有 PUB
-listers 运行,它只是开始等待。
我使用标准程序进行:
sock = context.socket(zmq.SUB)
sock.connect("tcp://{}:{}".format(host, port))
topic_filter = 'blah'
sock.setsockopt_string(zmq.SUBSCRIBE, topic_filter)
# here should come something that warns about offline publisher...
Sole PUB/SUB 没有办法创建这个
ZeroMQ 在概念上和实践上都是一个强大的工具箱。人们不应尝试 "bend" 库原语——它们本身可以被理解,而不仅仅是用于更复杂的消息传递和信号发送目的的构建块,而不是现实生活中的解决方案——以便做事最初的S可扩展F普通C通信P 图案设计原型。
PUB/SUB
原型 只是 PUB/SUB
.
PUB
-向当时在场的所有 SUB
广播,如果有的话,向每个 广播SUB
侦听到达的内容(如果有任何内容到达,它会应用过滤器...是的,在 SUB
端加载网络上进行过滤)。
仍然 - 可以设计一种多原型方法来解决这个问题。
让我为最直接的方案画一个基本方案。
假设您完全控制双方(设计方面和实施方面)。
# /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/
#
# [Side A]
# |_____aKnockSOCK = context.socket( zmq.PAIR )
# | + .setsockopt( zmq.CONFLATE )
# | + .bind()
# |
# |_____anEmitSOCK = context.socket( zmq.PUB )
# + .bind()
#
isNotReceivedSigEXIT = False
while( isNotReceviedSigEXIT ):
if ( 0 == aKnockSOCK.poll( aFastPollIN, zmq.POLLIN ) ):
# nobody new knocking to setup ...
# ------------------------------------------------
# do the main job,
# with countdown segmentation
# to escape to the outer loop
# so as to check for new SUB-s
# knocking as they come to the show
# ------------------------------------------------
else:
aKnockSOCK.send( "aKnockSOCK ACK: service is ready. Go .connect()" )
#
# --------------------------------------------------------
# ZeroMQ resources graceful termination
# socket closes + context dismantle & clean exit
# /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/
#
#[Side B]
# |_____aKnockSOCK = context.socket( zmq.PAIR )
# | + .setsockopt( zmq.CONFLATE )
# |
# |_____aRecvrSOCK = context.socket( zmq.SUB )
#
#
isToBreakEXIT = False
for nthAttempt in range( 10 ):
if ( isToBreakEXIT ):
break
try:
aKnockSOCK.connect( ... )
except:
ReportConnectERROR( ... )
sleep( ... )
continue
# ---------------------------------- once local aKnockSOCK got instantiated
for kthPoll in range( 10 ):
if ( 0 == aKnockSOCK.poll( aLongPollIN, zmq.POLLIN ) ):
sleep( thisCouldBeAddedToLongPollIN )
else:
# ----------------------------------------- .recv() + dismantle
aKnockSOCK.recv()
aKnockSOCK.setsockopt( zmq.LINGER, 0 )
aKnockSOCK.close()
# ----------------------------------------- .connect() + use
aRecvrSOCK.connect( ... )
aRecvrSOCK.setsockopt( zmq.SUBSCRIBE, ... )
# ----------------------------------------- [Side B] main job start
# ----------------------------------------- [Side B] main job end
isToBreakEXIT = True
break
pass
# --------------------------------------------------------
# ZeroMQ resources graceful termination
# socket closes + context dismantle & clean exit