pyzmq: socket breaks with "ZMQError: Operation cannot be accomplished in current state"
pyzmq: socket breaks with "ZMQError: Operation cannot be accomplished in current state"
我是 ZeroMQ 的新手,正在尝试构建一个非常基本的消息传递系统。该代码很大程度上基于 here 中的示例,但有一些曲折
出于某种原因,在最后一条消息到达 frontend
套接字(hbm
,tx
)之后,代码抛出错误,我不确定它的来源
抱歉这么久code/output我觉得强调问题所在并帮助调试很重要
下面是我的代码+输出:
s_frontend = "ipc://frontend.ipc"
s_backend = "ipc://backend.ipc"
import time
def hbm():
"""Worker task, using a REQ socket to do load-balancing."""
socket = zmq.Context().socket(zmq.REQ)
socket.identity = u"hbm".encode("ascii")
socket.connect(s_frontend)
# Tell broker we're ready for work
socket.send(b"READY")
while True:
msgs = socket.recv_multipart()
print("hbm got something", msgs)
if "BE READY" in msgs:
print("hbm:BE is ready for some work")
time.sleep(3)
def tx():
"""Worker task, using a REQ socket to do load-balancing."""
socket = zmq.Context().socket(zmq.REQ)
socket.identity = u"txtx".encode("ascii")
socket.connect(s_frontend)
# Tell broker we're ready for work
socket.send(b"READY")
while True:
msgs = socket.recv_multipart()
print("tx got something ", msgs)
if "BE READY" in msgs:
print("tx:BE is ready for some work")
time.sleep(3)
def dev():
"""Worker task, using a REQ socket to do load-balancing."""
socket = zmq.Context().socket(zmq.REQ)
socket.identity = u"dev".encode("ascii")
socket.connect(s_backend)
# Tell broker we're ready for work
socket.send(b"READY")
def main():
# Prepare context and sockets
context = zmq.Context.instance()
frontend = context.socket(zmq.ROUTER)
frontend.bind( s_frontend ) # ( "ipc://frontend.ipc" )
backend = context.socket(zmq.ROUTER)
backend.bind( s_backend ) # ( "ipc://backend.ipc" )
def start(task, *args):
process = multiprocessing.Process(target=task, args=args)
process.daemon = True
process.start()
start(hbm)
start(tx)
time.sleep(1)
start(dev)
clients = []
poller = zmq.Poller()
poller.register(backend, zmq.POLLIN)
#poller.register(frontend, zmq.POLLIN)
all_is_ready = False
while True:
sockets = dict(poller.poll(timeout=1))
#print(sockets)
soc = None
if backend in sockets:
print("got something from backend")
msg = backend.recv_multipart()
print(msg)
print("adding frontend to poller")
poller.register(frontend, zmq.POLLIN)
print("backend is ready, notify frontend")
elif frontend in sockets:
print("got something from frontend")
msg = frontend.recv_multipart()
print(msg)
clients.append(bytes(msg[0]))
elif len(clients) == 2 and all_is_ready is False:
all_is_ready = True
for c in clients:
print("sending response to", c)
time.sleep(0.1) # just to prevent print overlap
frontend.send_multipart([c, b"", b"BE READY"])
else:
print("so much work, no rest, sleeping for 3")
time.sleep(3)
# Clean up
backend.close()
frontend.close()
context.term()
if __name__ == "__main__":
main()
运行 此代码产生以下输出:
so much work, no rest, sleeping for 3
got something from backend
['dev', '', 'READY']
adding frontend to poller
backend is ready, notify frontend
got something from frontend
['hbm', '', 'READY']
got something from frontend
['txtx', '', 'READY']
sending response to hbm
sending response to txtx
hbm got something ['BE READY']
hbm:BE is ready for some work
tx got something ['BE READY']
tx:BE is ready for some work
so much work, no rest, sleeping for 3
Process Process-1:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/home/vm/repo/pyzmq_examples/hal_example/example1.py", line 104, in hbm
msgs = socket.recv_multipart()
File "/usr/local/lib/python2.7/dist-packages/zmq/sugar/socket.py", line 475, in recv_multipart
parts = [self.recv(flags, copy=copy, track=track)]
File "zmq/backend/cython/socket.pyx", line 791, in zmq.backend.cython.socket.Socket.recv
File "zmq/backend/cython/socket.pyx", line 827, in zmq.backend.cython.socket.Socket.recv
File "zmq/backend/cython/socket.pyx", line 191, in zmq.backend.cython.socket._recv_copy
File "zmq/backend/cython/socket.pyx", line 186, in zmq.backend.cython.socket._recv_copy
File "zmq/backend/cython/checkrc.pxd", line 25, in zmq.backend.cython.checkrc._check_rc
raise ZMQError(errno)
ZMQError: Operation cannot be accomplished in current state
so much work, no rest, sleeping for 3
Process Process-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/home/vm/repo/pyzmq_examples/hal_example/example1.py", line 130, in tx
msgs = socket.recv_multipart()
File "/usr/local/lib/python2.7/dist-packages/zmq/sugar/socket.py", line 475, in recv_multipart
parts = [self.recv(flags, copy=copy, track=track)]
File "zmq/backend/cython/socket.pyx", line 791, in zmq.backend.cython.socket.Socket.recv
File "zmq/backend/cython/socket.pyx", line 827, in zmq.backend.cython.socket.Socket.recv
File "zmq/backend/cython/socket.pyx", line 191, in zmq.backend.cython.socket._recv_copy
File "zmq/backend/cython/socket.pyx", line 186, in zmq.backend.cython.socket._recv_copy
File "zmq/backend/cython/checkrc.pxd", line 25, in zmq.backend.cython.checkrc._check_rc
raise ZMQError(errno)
ZMQError: Operation cannot be accomplished in current state
so much work, no rest, sleeping for 3
Traceback (most recent call last):
File "/home/vm/repo/pyzmq_examples/hal_example/example1.py", line 244, in <module>
main()
File "/home/vm/repo/pyzmq_examples/hal_example/example1.py", line 233, in main
time.sleep(3)
KeyboardInterrupt
进程已完成,退出代码为 1
Q : "... throws error ... I'm not sure of it's origin "
ZMQError: Operation cannot be accomplished in current state
起源是在每个用例中,使用 REQ
-Archetype,代码不满足本机 API 条件:
...
socket = zmq.Context().socket( zmq.REQ ) #--------------------- REQ socket Archetype
socket.identity = u"hbm".encode( "ascii" )
socket.connect( s_frontend )
socket.send( b"READY" ) #-------------------------------------- REQ.send()-s
while True: # ..................................... REQ next can .recv()
# socket-FSA( of a type of REQ ) can execute a .recv() iff.send() preceded.....?
msgs = socket.recv_multipart() #- - - - - - - - - - - - - - REQ.recv_multipart()-s
#.......................................................... REQ next can .send()
continue #-?-?-?-?-?-?: DID IT TRY TO REQ.send()? NO, NEVER...!
Do [you] mean by that that REQ must do iterative recv send and cannot do any other way ? (send recv recv) – LordTitiKaka
REQ
-Archetype 有一个硬性的期望,即必须保持强制排序:
.send() - .recv() - .send() - .recv() - .send() - .recv() - ... 无穷无尽
任何违反此 flip/flop-changing 其内部有限状态自动机 (FSA) 内部状态的行为都会导致上述 ZMQError: Operation cannot be accomplished in current state
我是 ZeroMQ 的新手,正在尝试构建一个非常基本的消息传递系统。该代码很大程度上基于 here 中的示例,但有一些曲折
出于某种原因,在最后一条消息到达 frontend
套接字(hbm
,tx
)之后,代码抛出错误,我不确定它的来源
抱歉这么久code/output我觉得强调问题所在并帮助调试很重要
下面是我的代码+输出:
s_frontend = "ipc://frontend.ipc"
s_backend = "ipc://backend.ipc"
import time
def hbm():
"""Worker task, using a REQ socket to do load-balancing."""
socket = zmq.Context().socket(zmq.REQ)
socket.identity = u"hbm".encode("ascii")
socket.connect(s_frontend)
# Tell broker we're ready for work
socket.send(b"READY")
while True:
msgs = socket.recv_multipart()
print("hbm got something", msgs)
if "BE READY" in msgs:
print("hbm:BE is ready for some work")
time.sleep(3)
def tx():
"""Worker task, using a REQ socket to do load-balancing."""
socket = zmq.Context().socket(zmq.REQ)
socket.identity = u"txtx".encode("ascii")
socket.connect(s_frontend)
# Tell broker we're ready for work
socket.send(b"READY")
while True:
msgs = socket.recv_multipart()
print("tx got something ", msgs)
if "BE READY" in msgs:
print("tx:BE is ready for some work")
time.sleep(3)
def dev():
"""Worker task, using a REQ socket to do load-balancing."""
socket = zmq.Context().socket(zmq.REQ)
socket.identity = u"dev".encode("ascii")
socket.connect(s_backend)
# Tell broker we're ready for work
socket.send(b"READY")
def main():
# Prepare context and sockets
context = zmq.Context.instance()
frontend = context.socket(zmq.ROUTER)
frontend.bind( s_frontend ) # ( "ipc://frontend.ipc" )
backend = context.socket(zmq.ROUTER)
backend.bind( s_backend ) # ( "ipc://backend.ipc" )
def start(task, *args):
process = multiprocessing.Process(target=task, args=args)
process.daemon = True
process.start()
start(hbm)
start(tx)
time.sleep(1)
start(dev)
clients = []
poller = zmq.Poller()
poller.register(backend, zmq.POLLIN)
#poller.register(frontend, zmq.POLLIN)
all_is_ready = False
while True:
sockets = dict(poller.poll(timeout=1))
#print(sockets)
soc = None
if backend in sockets:
print("got something from backend")
msg = backend.recv_multipart()
print(msg)
print("adding frontend to poller")
poller.register(frontend, zmq.POLLIN)
print("backend is ready, notify frontend")
elif frontend in sockets:
print("got something from frontend")
msg = frontend.recv_multipart()
print(msg)
clients.append(bytes(msg[0]))
elif len(clients) == 2 and all_is_ready is False:
all_is_ready = True
for c in clients:
print("sending response to", c)
time.sleep(0.1) # just to prevent print overlap
frontend.send_multipart([c, b"", b"BE READY"])
else:
print("so much work, no rest, sleeping for 3")
time.sleep(3)
# Clean up
backend.close()
frontend.close()
context.term()
if __name__ == "__main__":
main()
运行 此代码产生以下输出:
so much work, no rest, sleeping for 3
got something from backend
['dev', '', 'READY']
adding frontend to poller
backend is ready, notify frontend
got something from frontend
['hbm', '', 'READY']
got something from frontend
['txtx', '', 'READY']
sending response to hbm
sending response to txtx
hbm got something ['BE READY']
hbm:BE is ready for some work
tx got something ['BE READY']
tx:BE is ready for some work
so much work, no rest, sleeping for 3
Process Process-1:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/home/vm/repo/pyzmq_examples/hal_example/example1.py", line 104, in hbm
msgs = socket.recv_multipart()
File "/usr/local/lib/python2.7/dist-packages/zmq/sugar/socket.py", line 475, in recv_multipart
parts = [self.recv(flags, copy=copy, track=track)]
File "zmq/backend/cython/socket.pyx", line 791, in zmq.backend.cython.socket.Socket.recv
File "zmq/backend/cython/socket.pyx", line 827, in zmq.backend.cython.socket.Socket.recv
File "zmq/backend/cython/socket.pyx", line 191, in zmq.backend.cython.socket._recv_copy
File "zmq/backend/cython/socket.pyx", line 186, in zmq.backend.cython.socket._recv_copy
File "zmq/backend/cython/checkrc.pxd", line 25, in zmq.backend.cython.checkrc._check_rc
raise ZMQError(errno)
ZMQError: Operation cannot be accomplished in current state
so much work, no rest, sleeping for 3
Process Process-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/home/vm/repo/pyzmq_examples/hal_example/example1.py", line 130, in tx
msgs = socket.recv_multipart()
File "/usr/local/lib/python2.7/dist-packages/zmq/sugar/socket.py", line 475, in recv_multipart
parts = [self.recv(flags, copy=copy, track=track)]
File "zmq/backend/cython/socket.pyx", line 791, in zmq.backend.cython.socket.Socket.recv
File "zmq/backend/cython/socket.pyx", line 827, in zmq.backend.cython.socket.Socket.recv
File "zmq/backend/cython/socket.pyx", line 191, in zmq.backend.cython.socket._recv_copy
File "zmq/backend/cython/socket.pyx", line 186, in zmq.backend.cython.socket._recv_copy
File "zmq/backend/cython/checkrc.pxd", line 25, in zmq.backend.cython.checkrc._check_rc
raise ZMQError(errno)
ZMQError: Operation cannot be accomplished in current state
so much work, no rest, sleeping for 3
Traceback (most recent call last):
File "/home/vm/repo/pyzmq_examples/hal_example/example1.py", line 244, in <module>
main()
File "/home/vm/repo/pyzmq_examples/hal_example/example1.py", line 233, in main
time.sleep(3)
KeyboardInterrupt
进程已完成,退出代码为 1
Q : "... throws error ... I'm not sure of it's origin "
ZMQError: Operation cannot be accomplished in current state
起源是在每个用例中,使用 REQ
-Archetype,代码不满足本机 API 条件:
...
socket = zmq.Context().socket( zmq.REQ ) #--------------------- REQ socket Archetype
socket.identity = u"hbm".encode( "ascii" )
socket.connect( s_frontend )
socket.send( b"READY" ) #-------------------------------------- REQ.send()-s
while True: # ..................................... REQ next can .recv()
# socket-FSA( of a type of REQ ) can execute a .recv() iff.send() preceded.....?
msgs = socket.recv_multipart() #- - - - - - - - - - - - - - REQ.recv_multipart()-s
#.......................................................... REQ next can .send()
continue #-?-?-?-?-?-?: DID IT TRY TO REQ.send()? NO, NEVER...!
Do [you] mean by that that REQ must do iterative recv send and cannot do any other way ? (send recv recv) – LordTitiKaka
REQ
-Archetype 有一个硬性的期望,即必须保持强制排序:
.send() - .recv() - .send() - .recv() - .send() - .recv() - ... 无穷无尽
任何违反此 flip/flop-changing 其内部有限状态自动机 (FSA) 内部状态的行为都会导致上述 ZMQError: Operation cannot be accomplished in current state