为什么我在以下脚本中的回调函数永远不会被调用?
Why does my callback function in the following script never get called?
在下面的脚本中,为什么我的回调函数从未被调用过?
我正在使用预先创建的内核来运行代码,并尝试通过为各个套接字附加回调来获取它的输出。
from zmq.eventloop import ioloop
ioloop.install()
from zmq.eventloop.zmqstream import ZMQStream
from functools import partial
from tornado import gen
from tornado.concurrent import Future
from jupyter_client import BlockingKernelClient
from pprint import pprint
import logging, os, zmq
# Maps the msg_id of a submitted request to the Future that execute_ waits on.
reply_futures = {}
# PUSH socket that reply_callback calls send() on; it connects to a fixed
# local endpoint.
context = zmq.Context()
publisher = context.socket(zmq.PUSH)
publisher.connect("tcp://127.0.0.1:5253")
# Callback registered on the shell and iopub ZMQStreams (see setup_listener).
# `session` is pre-bound with functools.partial; `stream` and `msg_list` are
# passed in by ZMQStream.on_recv_stream.
# NOTE(review): the indentation of this snippet was lost when it was pasted,
# so the nesting of the statements below -- in particular which `if` the final
# publisher.send(reply) belongs to -- cannot be determined from this text.
def reply_callback(session, stream, msg_list):
# Split the routing identities from the message frames, then decode them.
idents, msg_parts = session.feed_identities(msg_list)
reply = session.deserialize(msg_parts)
# Look up the Future registered for the request this message answers.
parent_id = reply['parent_header'].get('msg_id')
reply_future = reply_futures.get(parent_id)
print("{} \n".format(reply))
if reply_future:
if "execute_reply" == reply["msg_type"]:
# Resolving the future lets the waiting execute_ coroutine continue.
reply_future.set_result(reply)
# NOTE(review): `reply` is the deserialized message object, presumably a dict;
# confirm that Socket.send accepts it, or whether send_json/send_pyobj was
# intended.
publisher.send(reply)
def fv_execute():
    """Run a fixed ``print`` statement through ``execute``.

    Returns the msg_id that ``execute`` reports for the request.
    """
    code = 'print ("hello")'
    msg_id = execute(code)
    return msg_id
def get_connection_file(kernel_id):
    """Return the connection-file path for the kernel *kernel_id*.

    The path is ``/tmp/kernel-<kernel_id>.json``; whether the file exists is
    not checked here.
    """
    json_file = 'kernel-{}.json'.format(kernel_id)
    return os.path.join('/tmp', json_file)
def execute(code):
    """Submit *code* to the pre-existing kernel and block until it replies.

    Builds a ``BlockingKernelClient`` from the connection file of a
    hard-coded kernel id, attaches the reply callbacks, and runs the current
    IOLoop until the ``execute_`` coroutine finishes.

    Returns the msg_id of the execute request.
    """
    kernel_id = '46459cb4-fa34-497a-8e3d-dfb3ab4476fd'
    cf = get_connection_file(kernel_id)
    kernel_client = BlockingKernelClient(connection_file=cf)
    # NOTE(review): this version never calls load_connection_file() or
    # start_channels() on the client. The accompanying write-up reports that
    # adding those two calls here is what makes the callbacks fire.
    setup_listener(kernel_client)
    msg_id = ioloop.IOLoop.current().run_sync(
        lambda: execute_(kernel_client, code))
    return msg_id
def setup_listener(kernel_client):
    """Attach ``reply_callback`` to the client's shell and iopub sockets.

    Each socket is wrapped in a ``ZMQStream`` and the callback is registered
    with ``on_recv_stream``, with the client's session pre-bound as the
    callback's first argument.
    """
    shell_stream = ZMQStream(kernel_client.shell_channel.socket)
    iopub_stream = ZMQStream(kernel_client.iopub_channel.socket)
    shell_stream.on_recv_stream(
        partial(reply_callback, kernel_client.session))
    iopub_stream.on_recv_stream(
        partial(reply_callback, kernel_client.session))
@gen.coroutine
def execute_(kernel_client, code):
    """Send *code* for execution and wait for its reply future.

    A ``Future`` is stored in ``reply_futures`` under the request's msg_id,
    and the coroutine suspends until that future gets a result, which
    ``reply_callback`` sets.

    Returns (via ``gen.Return``) the msg_id of the request.
    """
    msg_id = kernel_client.execute(code)
    f = reply_futures[msg_id] = Future()
    print("Is kernel alive: {}".format(kernel_client.is_alive()))
    print(msg_id)
    # Suspend here until the future is resolved.
    yield f
    raise gen.Return(msg_id)
if __name__ == '__main__':
    # Script entry point: submit the demo request once.
    fv_execute()
这里是输出,脚本会一直运行下去(永不结束):
jupyter@albus:~/lab$ python2 iolooptest2.py
Is kernel alive: True
de3eae2e-48d3-451a-b6bc-421674bb2a35
^X^CTraceback (most recent call last):
File "iolooptest2.py", line 61, in <module>
fv_execute()
File "iolooptest2.py", line 30, in fv_execute
msg_id = execute(code)
File "iolooptest2.py", line 42, in execute
msg_id = ioloop.IOLoop.current().run_sync(lambda: execute_(kernel_client,code))
File "/usr/local/lib/python2.7/dist-packages/tornado/ioloop.py", line 452, in run_sync
self.start()
File "/usr/local/lib/python2.7/dist-packages/zmq/eventloop/ioloop.py", line 177, in start
super(ZMQIOLoop, self).start()
File "/usr/local/lib/python2.7/dist-packages/tornado/ioloop.py", line 862, in start
event_pairs = self._impl.poll(poll_timeout)
File "/usr/local/lib/python2.7/dist-packages/zmq/eventloop/ioloop.py", line 122, in poll
z_events = self._poller.poll(1000*timeout)
File "/usr/local/lib/python2.7/dist-packages/zmq/sugar/poll.py", line 99, in poll
return zmq_poll(self.sockets, timeout=timeout)
File "zmq/backend/cython/_poll.pyx", line 116, in zmq.backend.cython._poll.zmq_poll (zmq/backend/cython/_poll.c:2036)
File "zmq/backend/cython/checkrc.pxd", line 12, in zmq.backend.cython.checkrc._check_rc (zmq/backend/cython/_poll.c:2418)
KeyboardInterrupt
这里是代码的略微修改版本
https://gist.github.com/jayendra13/76a4f5726428882013ea62d94974da5c
我将 ioloop 作为参数传递给 ZMQStream,同时附加回调,它也具有相同的行为。
这是几乎相似的脚本
https://gist.github.com/jayendra13/e553fafba5398e287107e947c16988df
在创建 kernel_client 后添加以下两行解决了我的问题。
kernel_client.load_connection_file()
kernel_client.start_channels()
这样,新的 execute 看起来像这样:
def execute(code):
    """Submit *code* to the pre-existing kernel and block until it replies.

    Builds a ``BlockingKernelClient`` from the connection file of a
    hard-coded kernel id, loads the connection info and starts the channels,
    attaches the reply callbacks, and runs the current IOLoop until the
    ``execute_`` coroutine finishes.

    Returns the msg_id of the execute request.
    """
    kernel_id = '46459cb4-fa34-497a-8e3d-dfb3ab4476fd'
    cf = get_connection_file(kernel_id)
    kernel_client = BlockingKernelClient(connection_file=cf)
    # These two calls are the fix: they run before the channel sockets are
    # wrapped in ZMQStreams, and without them the callbacks were never called.
    kernel_client.load_connection_file()
    kernel_client.start_channels()
    setup_listener(kernel_client)
    msg_id = ioloop.IOLoop.current().run_sync(
        lambda: execute_(kernel_client, code))
    return msg_id
在下面的脚本中,为什么我的回调函数从未被调用过? 我正在使用预先创建的内核来运行代码,并尝试通过为各个套接字附加回调来获取它的输出。
from zmq.eventloop import ioloop
ioloop.install()
from zmq.eventloop.zmqstream import ZMQStream
from functools import partial
from tornado import gen
from tornado.concurrent import Future
from jupyter_client import BlockingKernelClient
from pprint import pprint
import logging, os, zmq
# Maps the msg_id of a submitted request to the Future that execute_ waits on.
reply_futures = {}
# PUSH socket that reply_callback calls send() on; it connects to a fixed
# local endpoint.
context = zmq.Context()
publisher = context.socket(zmq.PUSH)
publisher.connect("tcp://127.0.0.1:5253")
# Callback registered on the shell and iopub ZMQStreams (see setup_listener).
# `session` is pre-bound with functools.partial; `stream` and `msg_list` are
# passed in by ZMQStream.on_recv_stream.
# NOTE(review): the indentation of this snippet was lost when it was pasted,
# so the nesting of the statements below -- in particular which `if` the final
# publisher.send(reply) belongs to -- cannot be determined from this text.
def reply_callback(session, stream, msg_list):
# Split the routing identities from the message frames, then decode them.
idents, msg_parts = session.feed_identities(msg_list)
reply = session.deserialize(msg_parts)
# Look up the Future registered for the request this message answers.
parent_id = reply['parent_header'].get('msg_id')
reply_future = reply_futures.get(parent_id)
print("{} \n".format(reply))
if reply_future:
if "execute_reply" == reply["msg_type"]:
# Resolving the future lets the waiting execute_ coroutine continue.
reply_future.set_result(reply)
# NOTE(review): `reply` is the deserialized message object, presumably a dict;
# confirm that Socket.send accepts it, or whether send_json/send_pyobj was
# intended.
publisher.send(reply)
def fv_execute():
    """Run a fixed ``print`` statement through ``execute``.

    Returns the msg_id that ``execute`` reports for the request.
    """
    code = 'print ("hello")'
    msg_id = execute(code)
    return msg_id
def get_connection_file(kernel_id):
    """Return the connection-file path for the kernel *kernel_id*.

    The path is ``/tmp/kernel-<kernel_id>.json``; whether the file exists is
    not checked here.
    """
    json_file = 'kernel-{}.json'.format(kernel_id)
    return os.path.join('/tmp', json_file)
def execute(code):
    """Submit *code* to the pre-existing kernel and block until it replies.

    Builds a ``BlockingKernelClient`` from the connection file of a
    hard-coded kernel id, attaches the reply callbacks, and runs the current
    IOLoop until the ``execute_`` coroutine finishes.

    Returns the msg_id of the execute request.
    """
    kernel_id = '46459cb4-fa34-497a-8e3d-dfb3ab4476fd'
    cf = get_connection_file(kernel_id)
    kernel_client = BlockingKernelClient(connection_file=cf)
    # NOTE(review): this version never calls load_connection_file() or
    # start_channels() on the client. The accompanying write-up reports that
    # adding those two calls here is what makes the callbacks fire.
    setup_listener(kernel_client)
    msg_id = ioloop.IOLoop.current().run_sync(
        lambda: execute_(kernel_client, code))
    return msg_id
def setup_listener(kernel_client):
    """Attach ``reply_callback`` to the client's shell and iopub sockets.

    Each socket is wrapped in a ``ZMQStream`` and the callback is registered
    with ``on_recv_stream``, with the client's session pre-bound as the
    callback's first argument.
    """
    shell_stream = ZMQStream(kernel_client.shell_channel.socket)
    iopub_stream = ZMQStream(kernel_client.iopub_channel.socket)
    shell_stream.on_recv_stream(
        partial(reply_callback, kernel_client.session))
    iopub_stream.on_recv_stream(
        partial(reply_callback, kernel_client.session))
@gen.coroutine
def execute_(kernel_client, code):
    """Send *code* for execution and wait for its reply future.

    A ``Future`` is stored in ``reply_futures`` under the request's msg_id,
    and the coroutine suspends until that future gets a result, which
    ``reply_callback`` sets.

    Returns (via ``gen.Return``) the msg_id of the request.
    """
    msg_id = kernel_client.execute(code)
    f = reply_futures[msg_id] = Future()
    print("Is kernel alive: {}".format(kernel_client.is_alive()))
    print(msg_id)
    # Suspend here until the future is resolved.
    yield f
    raise gen.Return(msg_id)
if __name__ == '__main__':
    # Script entry point: submit the demo request once.
    fv_execute()
这里是输出,脚本会一直运行下去(永不结束):
jupyter@albus:~/lab$ python2 iolooptest2.py
Is kernel alive: True
de3eae2e-48d3-451a-b6bc-421674bb2a35
^X^CTraceback (most recent call last):
File "iolooptest2.py", line 61, in <module>
fv_execute()
File "iolooptest2.py", line 30, in fv_execute
msg_id = execute(code)
File "iolooptest2.py", line 42, in execute
msg_id = ioloop.IOLoop.current().run_sync(lambda: execute_(kernel_client,code))
File "/usr/local/lib/python2.7/dist-packages/tornado/ioloop.py", line 452, in run_sync
self.start()
File "/usr/local/lib/python2.7/dist-packages/zmq/eventloop/ioloop.py", line 177, in start
super(ZMQIOLoop, self).start()
File "/usr/local/lib/python2.7/dist-packages/tornado/ioloop.py", line 862, in start
event_pairs = self._impl.poll(poll_timeout)
File "/usr/local/lib/python2.7/dist-packages/zmq/eventloop/ioloop.py", line 122, in poll
z_events = self._poller.poll(1000*timeout)
File "/usr/local/lib/python2.7/dist-packages/zmq/sugar/poll.py", line 99, in poll
return zmq_poll(self.sockets, timeout=timeout)
File "zmq/backend/cython/_poll.pyx", line 116, in zmq.backend.cython._poll.zmq_poll (zmq/backend/cython/_poll.c:2036)
File "zmq/backend/cython/checkrc.pxd", line 12, in zmq.backend.cython.checkrc._check_rc (zmq/backend/cython/_poll.c:2418)
KeyboardInterrupt
这里是代码的略微修改版本
https://gist.github.com/jayendra13/76a4f5726428882013ea62d94974da5c
我将 ioloop 作为参数传递给 ZMQStream,同时附加回调,它也具有相同的行为。
这是几乎相似的脚本 https://gist.github.com/jayendra13/e553fafba5398e287107e947c16988df
在创建 kernel_client 后添加以下两行解决了我的问题。
kernel_client.load_connection_file()
kernel_client.start_channels()
这样,新的 execute 看起来像这样:
def execute(code):
    """Submit *code* to the pre-existing kernel and block until it replies.

    Builds a ``BlockingKernelClient`` from the connection file of a
    hard-coded kernel id, loads the connection info and starts the channels,
    attaches the reply callbacks, and runs the current IOLoop until the
    ``execute_`` coroutine finishes.

    Returns the msg_id of the execute request.
    """
    kernel_id = '46459cb4-fa34-497a-8e3d-dfb3ab4476fd'
    cf = get_connection_file(kernel_id)
    kernel_client = BlockingKernelClient(connection_file=cf)
    # These two calls are the fix: they run before the channel sockets are
    # wrapped in ZMQStreams, and without them the callbacks were never called.
    kernel_client.load_connection_file()
    kernel_client.start_channels()
    setup_listener(kernel_client)
    msg_id = ioloop.IOLoop.current().run_sync(
        lambda: execute_(kernel_client, code))
    return msg_id