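Can the following model be achieved in ZeroMQ?
Here A theoretically "acts" like a server, D and E are both subscribers and publishers, and F is a subscriber.
If it is indeed possible, help would be much appreciated.
If not, please suggest an alternative. Websockets are not an option for my target.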
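Sure it can:
It is best to first read about the main conceptual differences in the [] section.
For simplicity, let's assume just a single tcp:// transport-class and nodes with separate IP addresses (this is easy to translate to any co-located case and/or a mix of different transport-classes).
For any larger volumes and/or cases with strict latency management, performance tuning is a must.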
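As a rough illustration of the co-located case, the same PUB/SUB pairing can run inside one process over the inproc:// transport-class. This is only a sketch under assumptions: the endpoint name `inproc://node-a` and the function name `colocated_demo` are made up for the example, and both sockets must share one `zmq.Context` for inproc:// to work.

```python
import zmq

def colocated_demo():
    """PUB and SUB in one process over inproc://, with a topic-prefix filter."""
    ctx = zmq.Context()

    pub = ctx.socket(zmq.PUB)
    pub.setsockopt(zmq.LINGER, 0)
    pub.bind("inproc://node-a")            # hypothetical endpoint name

    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.LINGER, 0)
    sub.setsockopt(zmq.SUBSCRIBE, b"A:")   # only messages starting with "A:"
    sub.connect("inproc://node-a")

    received = None
    # Subscriptions propagate asynchronously, so keep publishing
    # until the first matching message gets through.
    for _ in range(50):
        pub.send(b"X: not subscribed")
        pub.send(b"A: hello")
        if sub.poll(100, zmq.POLLIN):
            received = sub.recv()
            break

    sub.close()
    pub.close()
    ctx.term()
    return received

if __name__ == "__main__":
    print(colocated_demo())
```

Swapping the endpoint strings back to tcp:// addresses should give the distributed layout again; the socket code itself stays the same.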
Node-A:

    import time
    import zmq

    aCTX = zmq.Context()
    aPUB = aCTX.socket(zmq.PUB)
    aPUB.setsockopt(zmq.CONFLATE, 1)       # keep only the newest message; set before bind()
    aPUB.bind("tcp://10.0.0.1:12345")
    print("A: Started. Can Ctrl+C.")
    try:
        while True:
            aPUB.send_string("A: sending ...[{0:}]".format(time.ctime()))
            time.sleep(1)
    except KeyboardInterrupt:
        print("A: Ctrl+C'd. Will terminate")
    finally:
        aPUB.close()
        aCTX.term()
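Node-A sets `zmq.CONFLATE`, which asks ZeroMQ to keep only the most recent message in the socket's queue. The effect can be pictured with a plain-Python stand-in. This is not ZeroMQ code, and `ConflatingQueue` is a hypothetical name used purely for illustration:

```python
from collections import deque

class ConflatingQueue:
    """Hypothetical stand-in for a queue that keeps only the newest message."""

    def __init__(self):
        self._slot = deque(maxlen=1)   # older entries are silently dropped

    def put(self, msg):
        self._slot.append(msg)

    def get(self):
        # Return the newest message, or None if nothing is waiting.
        return self._slot.popleft() if self._slot else None

q = ConflatingQueue()
for i in range(3):
    q.put("A: sending ...[{0:}]".format(i))

latest = q.get()   # only the last of the three messages survives
```

A slow subscriber would therefore see the latest state rather than a backlog, which suits the once-per-second status messages here.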
Node-B (, C):

    import time
    import zmq

    aCTX = zmq.Context()
    aPUB = aCTX.socket(zmq.PUB)
    aPUB.setsockopt(zmq.CONFLATE, 1)       # set before bind() so it takes effect
    aPUB.bind("tcp://10.0.0.10:23456")     # Node-C: 10.0.0.20 and a "C:" prefix
    aSUB = aCTX.socket(zmq.SUB)
    aSUB.setsockopt(zmq.LINGER, 0)
    aSUB.setsockopt(zmq.SUBSCRIBE, b"A:")  # prefix filter: only messages from A
    aSUB.connect("tcp://10.0.0.1:12345")
    print("B: Started. Can Ctrl+C.")
    try:
        while True:
            aPUB.send_string("B: sending ...[{0:}]".format(time.ctime()))
            if aSUB.poll(500, zmq.POLLIN):  # wait up to 500 ms for A
                print("B:recv()'d: {0:} at {1:}".format(aSUB.recv_string(zmq.NOBLOCK), time.ctime()))
    except KeyboardInterrupt:
        print("B: Ctrl+C'd. Will terminate")
    finally:
        aPUB.close()
        aSUB.close()
        aCTX.term()
Node-D (, E):

    import time
    import zmq

    aCTX = zmq.Context()
    aPUB = aCTX.socket(zmq.PUB)
    aPUB.setsockopt(zmq.CONFLATE, 1)       # set before bind() so it takes effect
    aPUB.bind("tcp://10.0.0.100:34567")    # Node-E: 10.0.0.200 and an "E:" prefix
    aSUB = aCTX.socket(zmq.SUB)
    aSUB.setsockopt(zmq.LINGER, 0)
    aSUB.setsockopt(zmq.SUBSCRIBE, b"B:")  # one SUB socket, two prefix filters
    aSUB.setsockopt(zmq.SUBSCRIBE, b"C:")
    aSUB.connect("tcp://10.0.0.10:23456")  # Node-B
    aSUB.connect("tcp://10.0.0.20:23456")  # Node-C
    print("D: Started. Can Ctrl+C.")
    try:
        while True:
            aPUB.send_string("D: sending ...[{0:}]".format(time.ctime()))
            if aSUB.poll(250, zmq.POLLIN):  # wait up to 250 ms for B or C
                print("D:recv()'d: {0:} at {1:}".format(aSUB.recv_string(zmq.NOBLOCK), time.ctime()))
    except KeyboardInterrupt:
        print("D: Ctrl+C'd. Will terminate")
    finally:
        aPUB.close()
        aSUB.close()
        aCTX.term()
Node-F:

    import time
    import zmq

    aCTX = zmq.Context()
    aSUB = aCTX.socket(zmq.SUB)
    aSUB.setsockopt(zmq.LINGER, 0)
    aSUB.setsockopt(zmq.SUBSCRIBE, b"D:")
    aSUB.setsockopt(zmq.SUBSCRIBE, b"E:")
    aSUB.connect("tcp://10.0.0.100:34567")   # Node-D
    aSUB.connect("tcp://10.0.0.200:34567")   # Node-E
    print("F: Started. Can Ctrl+C.")
    try:
        while True:
            # blocking receive: F only listens, it never publishes
            print("F:recv()'d: {0:} at {1:}".format(aSUB.recv_string(), time.ctime()))
    except KeyboardInterrupt:
        print("F: Ctrl+C'd. Will terminate")
    finally:
        aSUB.close()
        aCTX.term()
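The routing in all of the nodes above relies on `zmq.SUBSCRIBE` filters being plain byte-prefix matches: a message is delivered if it starts with any subscribed prefix, an empty prefix matches everything, and a SUB socket with no subscription receives nothing. A small plain-Python sketch of that rule follows; the helper `matches` is a hypothetical name and is not part of pyzmq:

```python
def matches(subscriptions, message):
    """Return True if `message` starts with any subscribed byte prefix."""
    return any(message.startswith(prefix) for prefix in subscriptions)

# Node-F's two filters from the example above.
node_f_filters = [b"D:", b"E:"]

print(matches(node_f_filters, b"D: sending ...[now]"))  # a message from Node-D
print(matches(node_f_filters, b"B: sending ...[now]"))  # a message from Node-B
```

This is why every node starts its payload with its own letter and a colon: the prefix doubles as the topic.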