使用 0mq (ZeroMQ) 同步两个简单的 python3 脚本时出现死锁
Deadlock when synchronizing two simple python3 scripts using 0mq (ZeroMQ)
当我尝试使用 0mq (ZeroMQ
) 同步两个 python3 脚本时,我遇到了这个奇怪的死锁。脚本 运行 可以进行数千次迭代,但迟早它们都会停止并等待彼此。我在 Windows 7.
上 运行ning 来自不同 CMD-Windows 的两个脚本
我想不通
为什么会出现这样的死锁.
这里会出什么问题?
脚本 A:
while (1):
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind('tcp://127.0.0.1:10001')
msg = socket.recv() # Waiting for script B to send done
# ............................................................................
# ... do something useful (takes only a few millisecs)
# ............................................................................
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect('tcp://127.0.0.1:10002')
socket.send_string("done") # Tell script B we are done
脚本 B
while (1):
# ............................................................................
# ... do something useful (takes only a few millisecs)
# ............................................................................
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect('tcp://127.0.0.1:10001')
socket.send_string("done") # Tell script A we are done
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind('tcp://127.0.0.1:10002')
msg = socket.recv() # Waiting for script A to send done
您应该只处理一次 context
和 socket
创建,而不是每次迭代。
此外,您应该重用上下文(除非您要从另一个人那里使用它
在你的代码中线程)。
try:
context = zmq.Context()
rep_sck = context.socket(zmq.REP)
rep_sck.bind('tcp://127.0.0.1:10001')
rq_sck = context.socket(zmq.REQ)
rq_sck.connect('tcp://127.0.0.1:10002')
while (1):
msg = rep_sck.recv() # Waiting for script B to send done
do something useful (takes only a few millisecs)
rq_sck.send_string("done") # Tell script B we are done
finally:
rep_sck.close()
rq_sck.close()
第二个脚本也是如此。
try:
context = zmq.Context()
rq_sck = context.socket(zmq.REQ)
rq_sck.connect('tcp://127.0.0.1:10001')
rep_sck = context.socket(zmq.REP)
rep_sck.bind('tcp://127.0.0.1:10002')
while (1):
do something useful (takes only a few millisecs)
rq_sck.send_string("done") # Tell script A we are done
msg = rep_sck.recv() # Waiting for script A to send done
finally:
rq_sck.close()
rep_sck.close()
编辑:更新代码以调用 Socket.close()
从 pyzmq 版本 14.3.0 开始,Socket.close()
和 Context.term()
不会自动调用
在垃圾收集期间,添加了正确关闭套接字。
这不是死锁案例
当然,代码仍然需要注意。
消歧义:您的场景不会进入资源互锁状态,也就是 DeadLock。是的,当然,您的代码崩溃了,但很可能不是由于 REQ/REP
死锁(它可能并且确实出现在有损网络 tcp:
transport-class 上)。 posted 代码崩溃是由于 非托管资源处理,而不是由于达到死锁/活锁的相互阻塞状态。
如何解决?
首先,让我们假设您的超低延迟驱动系统不允许重复实例化任何东西。这也有例外,但让我们成为专业人士吧。
将您的.Context()
资源设置(或从外部调用继承)移出循环
审查,是否需要以及您的延迟限制允许您在每个循环中两次设置/拆除 .socket()
资源-运行.
决定,一旦第一条消息在传输路径
中丢失,您是否可以忍受真正的REQ/REP
死锁
enforce 优雅的资源使用终止(.socket()
-s, O/S port#
s, .Context()
-s)。不要让他们永远悬而未决,同时创造无限数量的其他人,这会破坏任何 "fault-resilient" 系统。资源永远是无限的。
design 信号和传输行为均以非阻塞方式进行。这允许您检测和处理远程进程超时并引入本地补救/响应操作的机会。
重新设计 代码到您需要的安全代码级别(下面的示例在软实时控制的无限循环 24/7 中工作了几年/365 在带有远程键盘和一些其他本地和远程诊断工具的分布式处理框架中。
生产级代码缺少什么?
您的代码必须 "envisage" 在您的分布式系统的任何部分可能出了什么问题。是的,这很难,但却是必要的。你的远程节点——一个通信对方——停止响应、丢失消息、重新启动、由于 O/S 崩溃而停止,任何可以想象的(加上一些相当令人讨厌的惊讶,你会发现只有在运行中.. .)。小post又是一个潘多拉魔盒,不代表没有必要。是你的救命马甲
尽可能以非阻塞方式进行设计,这样您就可以控制事件...
无论如何,总是释放系统资源和.term()
所有ZeroMQ .Context()
实例以优雅的方式——"tidy up"是一种公平的做法-- 无论是在现实生活中还是在代码帝国中。
# /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\
#NONSTOP RESPONDER RAW EXAMPLE:
def aMiniRESPONDER( aTarget2Bind2_URL = "tcp://A.B.C.D:8889",
anExternalPREDICTOR = None,
anExternallyManagedZmqCONTEXT = None,
aSpreadMinSafetyMUL = 3.0,
aSilentMODE = True
):
try: # RESOURCES LAYER
# ... SETUP
# ------------------------------------------------- .Context()
# can setup a locally-managed context or re-use
# anExternallyManagedZmqCONTEXT obtained upon a func Call
aZmqCONTEXT = anExternallyManagedZmqCONTEXT or zmq.Context( 1 )
# localhost:8887 [REP] ... remote [REQ] peer .connect() + .send()
aCtrlPORT_URL = "tcp://*:8887"
# localhost:8890 [PUB] ... remote [SUB] peers .connect() +
# .subscribe + .recv( zmq.NOBLOCK ) ( MQL4 cannot .poll() so far ...)
aSIGsPORT_URL = "tcp://*:8890"
aXmitPORT_URL = aTarget2Bind2_URL
aListOfSOCKETs = []
pass # -------------------------------------------------------------# ZMQ
try: # -------------------------------------------------------------#
# try: XmitPORT
aXmitSOCKET = aZmqCONTEXT.socket( zmq.PAIR )
# XmitPORT
aXmitSOCKET.bind( aXmitPORT_URL )
aListOfSOCKETs.append( aXmitSOCKET )
except:
# EXC: XmitPORT on Failure: GRACEFUL CLEARING XmitPORT
msg = "\nEXC. ZmqError({0:s}) on aXmitSOCKET setup / .bind( {1:s} )"
print msg.format( repr( zmq.ZMQError() ), aTarget2Bind2_URL )
raise ValueError( "ZMQ_EXC_EXIT @ XmitPORT SETUP" )
pass # -------------------------------------------------------------# ZMQ
try: # -------------------------------------------------------------#
# try: CtrlPORT
# CtrlSOCKET [REP] .recv()s<--[REQ] + .send()s--> [REQ]
aCtrlSOCKET = aZmqCONTEXT.socket( zmq.REP )
# CtrlPORT <-REQ/REP means a remote peer [REQ] has to
# .send()+.recv() before sending another CtrlCMD
aCtrlSOCKET.bind( aCtrlPORT_URL )
aListOfSOCKETs.append( aCtrlSOCKET )
except:
# EXC: CtrlPORT on Failure: GRACEFUL CLEARING both CtrlPORT
# and XmitPORT
msg = "\nEXC. ZmqError({0:s}) on aCtrlSOCKET setup / .bind( {1:s} )"
print msg.format( repr( zmq.ZMQError() ), aCtrlPORT_URL )
raise ValueError( "ZMQ_EXC_EXIT @ CtrlPORT SETUP" )
pass # -------------------------------------------------------------# ZMQ
try: # -------------------------------------------------------------#
# try: SIGsPORT
# SIGsPORT [PUB] .send()s--> [SUB]s
aSIGsSOCKET= aZmqCONTEXT.socket( zmq.PUB )
# SIGsPORT --> PUB/SUB means a remote peer(s) [SUB] .subscribe() + .recv()
aSIGsSOCKET.bind( aSIGsPORT_URL )
aListOfSOCKETs.append( aSIGsSOCKET )
except:
# EXC: SIGsPORT on Failure: GRACEFUL CLEARING both CtrlPORT
# and XmitPORT and SIGsPORT
msg = "\nEXC. ZmqError({0:s}) on aSIGsSOCKET setup / .bind( {1:s} )"
print msg.format( repr( zmq.ZMQError() ), aSIGsPORT_URL )
raise ValueError( "ZMQ_EXC_EXIT @ SIGsPORT SETUP" )
pass # -------------------------------------------------------------# ZMQ
# vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
# ... SETUP YOUR APPLICATION CODE
try: # APP LAYER ___________________________________________
# what you want to do
# here you go ...
except: # APP LAYER ___________________________________________
# handle EXCs
finally: # APP LAYER ___________________________________________
# your own application post-mortem / pre-exit code
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
except: # RESOURCES LAYER .............................................
# ... code shall handle it's own exceptions + externally caused events
finally: # RESOURCES LAYER .............................................
# ... always, ALWAYS gracefully exit ( avoid leakages and dirty things )
[ allSOCKETs.setsockopt( zmq.LINGER, 0 ) for allSOCKETs in aListOfSOCKETs ]
[ allSOCKETs.close( ) for allSOCKETs in aListOfSOCKETs ]
# --------------------------------------------------------------#
# RESOURCES dismantled, may .term()
# .TERM(), NOP otherwise
if not ( aZmqCONTEXT is anExternallyManagedZmqCONTEXT ): #
aZmqCONTEXT.term() #
return
当我尝试使用 0mq (ZeroMQ
) 同步两个 python3 脚本时,我遇到了这个奇怪的死锁。脚本 运行 可以进行数千次迭代,但迟早它们都会停止并等待彼此。我在 Windows 7.
我想不通
为什么会出现这样的死锁.
这里会出什么问题?
脚本 A:
while (1):
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind('tcp://127.0.0.1:10001')
msg = socket.recv() # Waiting for script B to send done
# ............................................................................
# ... do something useful (takes only a few millisecs)
# ............................................................................
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect('tcp://127.0.0.1:10002')
socket.send_string("done") # Tell script B we are done
脚本 B
while (1):
# ............................................................................
# ... do something useful (takes only a few millisecs)
# ............................................................................
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect('tcp://127.0.0.1:10001')
socket.send_string("done") # Tell script A we are done
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind('tcp://127.0.0.1:10002')
msg = socket.recv() # Waiting for script A to send done
您应该只处理一次 context
和 socket
创建,而不是每次迭代。
此外,您应该重用上下文(除非您要从另一个人那里使用它 在你的代码中线程)。
try:
context = zmq.Context()
rep_sck = context.socket(zmq.REP)
rep_sck.bind('tcp://127.0.0.1:10001')
rq_sck = context.socket(zmq.REQ)
rq_sck.connect('tcp://127.0.0.1:10002')
while (1):
msg = rep_sck.recv() # Waiting for script B to send done
do something useful (takes only a few millisecs)
rq_sck.send_string("done") # Tell script B we are done
finally:
rep_sck.close()
rq_sck.close()
第二个脚本也是如此。
try:
context = zmq.Context()
rq_sck = context.socket(zmq.REQ)
rq_sck.connect('tcp://127.0.0.1:10001')
rep_sck = context.socket(zmq.REP)
rep_sck.bind('tcp://127.0.0.1:10002')
while (1):
do something useful (takes only a few millisecs)
rq_sck.send_string("done") # Tell script A we are done
msg = rep_sck.recv() # Waiting for script A to send done
finally:
rq_sck.close()
rep_sck.close()
编辑:更新代码以调用 Socket.close()
从 pyzmq 版本 14.3.0 开始,Socket.close()
和 Context.term()
不会自动调用
在垃圾收集期间,添加了正确关闭套接字。
这不是死锁案例
当然,代码仍然需要注意。
消歧义:您的场景不会进入资源互锁状态,也就是 DeadLock。是的,当然,您的代码崩溃了,但很可能不是由于 REQ/REP
死锁(它可能并且确实出现在有损网络 tcp:
transport-class 上)。 posted 代码崩溃是由于 非托管资源处理,而不是由于达到死锁/活锁的相互阻塞状态。
如何解决?
首先,让我们假设您的超低延迟驱动系统不允许重复实例化任何东西。这也有例外,但让我们成为专业人士吧。
将您的
.Context()
资源设置(或从外部调用继承)移出循环审查,是否需要以及您的延迟限制允许您在每个循环中两次设置/拆除
.socket()
资源-运行.决定,一旦第一条消息在传输路径
中丢失,您是否可以忍受真正的REQ/REP
死锁enforce 优雅的资源使用终止(
.socket()
-s, O/Sport#
s,.Context()
-s)。不要让他们永远悬而未决,同时创造无限数量的其他人,这会破坏任何 "fault-resilient" 系统。资源永远是无限的。design 信号和传输行为均以非阻塞方式进行。这允许您检测和处理远程进程超时并引入本地补救/响应操作的机会。
重新设计 代码到您需要的安全代码级别(下面的示例在软实时控制的无限循环 24/7 中工作了几年/365 在带有远程键盘和一些其他本地和远程诊断工具的分布式处理框架中。
生产级代码缺少什么?
您的代码必须 "envisage" 在您的分布式系统的任何部分可能出了什么问题。是的,这很难,但却是必要的。你的远程节点——一个通信对方——停止响应、丢失消息、重新启动、由于 O/S 崩溃而停止,任何可以想象的(加上一些相当令人讨厌的惊讶,你会发现只有在运行中.. .)。小post又是一个潘多拉魔盒,不代表没有必要。是你的救命马甲
尽可能以非阻塞方式进行设计,这样您就可以控制事件...
无论如何,总是释放系统资源和.term()
所有ZeroMQ .Context()
实例以优雅的方式——"tidy up"是一种公平的做法-- 无论是在现实生活中还是在代码帝国中。
# /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\
#NONSTOP RESPONDER RAW EXAMPLE:
def aMiniRESPONDER( aTarget2Bind2_URL = "tcp://A.B.C.D:8889",
anExternalPREDICTOR = None,
anExternallyManagedZmqCONTEXT = None,
aSpreadMinSafetyMUL = 3.0,
aSilentMODE = True
):
try: # RESOURCES LAYER
# ... SETUP
# ------------------------------------------------- .Context()
# can setup a locally-managed context or re-use
# anExternallyManagedZmqCONTEXT obtained upon a func Call
aZmqCONTEXT = anExternallyManagedZmqCONTEXT or zmq.Context( 1 )
# localhost:8887 [REP] ... remote [REQ] peer .connect() + .send()
aCtrlPORT_URL = "tcp://*:8887"
# localhost:8890 [PUB] ... remote [SUB] peers .connect() +
# .subscribe + .recv( zmq.NOBLOCK ) ( MQL4 cannot .poll() so far ...)
aSIGsPORT_URL = "tcp://*:8890"
aXmitPORT_URL = aTarget2Bind2_URL
aListOfSOCKETs = []
pass # -------------------------------------------------------------# ZMQ
try: # -------------------------------------------------------------#
# try: XmitPORT
aXmitSOCKET = aZmqCONTEXT.socket( zmq.PAIR )
# XmitPORT
aXmitSOCKET.bind( aXmitPORT_URL )
aListOfSOCKETs.append( aXmitSOCKET )
except:
# EXC: XmitPORT on Failure: GRACEFUL CLEARING XmitPORT
msg = "\nEXC. ZmqError({0:s}) on aXmitSOCKET setup / .bind( {1:s} )"
print msg.format( repr( zmq.ZMQError() ), aTarget2Bind2_URL )
raise ValueError( "ZMQ_EXC_EXIT @ XmitPORT SETUP" )
pass # -------------------------------------------------------------# ZMQ
try: # -------------------------------------------------------------#
# try: CtrlPORT
# CtrlSOCKET [REP] .recv()s<--[REQ] + .send()s--> [REQ]
aCtrlSOCKET = aZmqCONTEXT.socket( zmq.REP )
# CtrlPORT <-REQ/REP means a remote peer [REQ] has to
# .send()+.recv() before sending another CtrlCMD
aCtrlSOCKET.bind( aCtrlPORT_URL )
aListOfSOCKETs.append( aCtrlSOCKET )
except:
# EXC: CtrlPORT on Failure: GRACEFUL CLEARING both CtrlPORT
# and XmitPORT
msg = "\nEXC. ZmqError({0:s}) on aCtrlSOCKET setup / .bind( {1:s} )"
print msg.format( repr( zmq.ZMQError() ), aCtrlPORT_URL )
raise ValueError( "ZMQ_EXC_EXIT @ CtrlPORT SETUP" )
pass # -------------------------------------------------------------# ZMQ
try: # -------------------------------------------------------------#
# try: SIGsPORT
# SIGsPORT [PUB] .send()s--> [SUB]s
aSIGsSOCKET= aZmqCONTEXT.socket( zmq.PUB )
# SIGsPORT --> PUB/SUB means a remote peer(s) [SUB] .subscribe() + .recv()
aSIGsSOCKET.bind( aSIGsPORT_URL )
aListOfSOCKETs.append( aSIGsSOCKET )
except:
# EXC: SIGsPORT on Failure: GRACEFUL CLEARING both CtrlPORT
# and XmitPORT and SIGsPORT
msg = "\nEXC. ZmqError({0:s}) on aSIGsSOCKET setup / .bind( {1:s} )"
print msg.format( repr( zmq.ZMQError() ), aSIGsPORT_URL )
raise ValueError( "ZMQ_EXC_EXIT @ SIGsPORT SETUP" )
pass # -------------------------------------------------------------# ZMQ
# vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
# ... SETUP YOUR APPLICATION CODE
try: # APP LAYER ___________________________________________
# what you want to do
# here you go ...
except: # APP LAYER ___________________________________________
# handle EXCs
finally: # APP LAYER ___________________________________________
# your own application post-mortem / pre-exit code
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
except: # RESOURCES LAYER .............................................
# ... code shall handle it's own exceptions + externally caused events
finally: # RESOURCES LAYER .............................................
# ... always, ALWAYS gracefully exit ( avoid leakages and dirty things )
[ allSOCKETs.setsockopt( zmq.LINGER, 0 ) for allSOCKETs in aListOfSOCKETs ]
[ allSOCKETs.close( ) for allSOCKETs in aListOfSOCKETs ]
# --------------------------------------------------------------#
# RESOURCES dismantled, may .term()
# .TERM(), NOP otherwise
if not ( aZmqCONTEXT is anExternallyManagedZmqCONTEXT ): #
aZmqCONTEXT.term() #
return