为什么我在以下脚本中的回调函数永远不会被调用?

Why does my callback function in the following script never get called?

在下面的脚本中,为什么我的回调函数从未被调用过? 我正在使用预先创建的内核来运行代码,并尝试通过为各个套接字附加回调来获取它的输出。

from zmq.eventloop import ioloop
ioloop.install()
from zmq.eventloop.zmqstream import ZMQStream
from functools import partial
from tornado import gen
from tornado.concurrent import Future
from jupyter_client import BlockingKernelClient
from pprint import pprint
import logging, os, zmq

# Pending requests: maps a request's msg_id to the Future that execute_()
# waits on; reply_callback() resolves it when the execute_reply arrives.
reply_futures = {}

# PUSH socket that reply_callback() uses to forward each deserialized message.
context = zmq.Context()
publisher = context.socket(zmq.PUSH)
publisher.connect("tcp://127.0.0.1:5253")

def reply_callback(session, stream, msg_list):
    """Handle one multipart message received on a kernel channel stream.

    The message is deserialized and printed. If it is an ``execute_reply``
    whose parent msg_id has a pending future in ``reply_futures``, that
    future is resolved with the message. Finally the message is forwarded
    to the ``publisher`` socket as JSON.

    Args:
        session: Object providing ``feed_identities`` and ``deserialize``
            (the kernel client's session).
        stream: The stream the message arrived on. Unused here, but part
            of the ``on_recv_stream`` callback signature.
        msg_list: Raw message frames as delivered by the stream.
    """
    _idents, msg_parts = session.feed_identities(msg_list)
    reply = session.deserialize(msg_parts)
    parent_id = reply['parent_header'].get('msg_id')
    reply_future = reply_futures.get(parent_id)
    print("{} \n".format(reply))
    if reply_future and reply["msg_type"] == "execute_reply":
        reply_future.set_result(reply)
    # ``reply`` is a dict, and ``Socket.send`` only accepts bytes-like
    # frames, so it has to be serialized explicitly before sending.
    # ``default=str`` is a fallback for values json cannot encode natively
    # (presumably e.g. datetime objects in the header -- TODO confirm).
    publisher.send_json(reply, default=str)

def fv_execute():
    """Run a fixed ``print`` snippet via ``execute`` and return its msg_id."""
    return execute('print ("hello")')

def get_connection_file(kernel_id):
    """Return the path ``/tmp/kernel-<kernel_id>.json`` for the given kernel id."""
    return os.path.join('/tmp', 'kernel-{}.json'.format(kernel_id))

def execute(code):
    """Submit ``code`` to the pre-created kernel and return the request's msg_id.

    A ``BlockingKernelClient`` is built from the kernel's connection file,
    listeners are attached to it, and the IOLoop is run until ``execute_``
    completes.

    NOTE(review): nothing in this function loads the connection file or
    starts the client's channels before the listeners are attached.
    """
    kernel_id = '46459cb4-fa34-497a-8e3d-dfb3ab4476fd'
    kernel_client = BlockingKernelClient(
        connection_file=get_connection_file(kernel_id))
    setup_listener(kernel_client)
    run_request = partial(execute_, kernel_client, code)
    return ioloop.IOLoop.current().run_sync(run_request)

def setup_listener(kernel_client):
    """Attach ``reply_callback`` to the client's shell and iopub sockets.

    Each channel's socket is wrapped in a ``ZMQStream`` whose receive
    callback is ``reply_callback`` pre-bound to the client's session.
    """
    channels = (kernel_client.shell_channel, kernel_client.iopub_channel)
    for channel in channels:
        on_message = partial(reply_callback, kernel_client.session)
        ZMQStream(channel.socket).on_recv_stream(on_message)

@gen.coroutine
def execute_(kernel_client, code):
    """Send ``code`` to the kernel and wait until its reply future resolves.

    A Future is registered in ``reply_futures`` under the request's msg_id;
    the coroutine yields on it and then returns the msg_id.

    Args:
        kernel_client: Client used to submit the request (``execute``) and
            to report liveness (``is_alive``).
        code: Source code string to execute on the kernel.

    Returns:
        The msg_id of the execute request (delivered via ``gen.Return``).
    """
    msg_id = kernel_client.execute(code)
    pending = reply_futures[msg_id] = Future()
    print("Is kernel alive: {}".format(kernel_client.is_alive()))
    print(msg_id)
    try:
        yield pending
    finally:
        # Drop the bookkeeping entry once the wait is over; otherwise
        # reply_futures grows by one entry for every request ever sent.
        reply_futures.pop(msg_id, None)
    raise gen.Return(msg_id)

# Script entry point: submit the sample code when run directly.
if __name__ == '__main__':
    fv_execute()

这里是输出,脚本会一直运行,永不结束:

jupyter@albus:~/lab$ python2 iolooptest2.py
Is kernel alive: True
de3eae2e-48d3-451a-b6bc-421674bb2a35
^X^CTraceback (most recent call last):
  File "iolooptest2.py", line 61, in <module>
    fv_execute()
  File "iolooptest2.py", line 30, in fv_execute
    msg_id = execute(code)
  File "iolooptest2.py", line 42, in execute
    msg_id = ioloop.IOLoop.current().run_sync(lambda:     execute_(kernel_client,code))
  File "/usr/local/lib/python2.7/dist-packages/tornado/ioloop.py",  line 452, in run_sync
    self.start()
  File "/usr/local/lib/python2.7/dist-packages/zmq/eventloop/ioloop.py", line 177, in start
    super(ZMQIOLoop, self).start()
 File "/usr/local/lib/python2.7/dist-packages/tornado/ioloop.py", line  862, in start
    event_pairs = self._impl.poll(poll_timeout)
  File "/usr/local/lib/python2.7/dist-packages/zmq/eventloop/ioloop.py", line 122, in poll
    z_events = self._poller.poll(1000*timeout)
  File "/usr/local/lib/python2.7/dist-packages/zmq/sugar/poll.py", line 99, in poll
    return zmq_poll(self.sockets, timeout=timeout)
  File "zmq/backend/cython/_poll.pyx", line 116, in  zmq.backend.cython._poll.zmq_poll (zmq/backend/cython/_poll.c:2036)
  File "zmq/backend/cython/checkrc.pxd", line 12, in  zmq.backend.cython.checkrc._check_rc (zmq/backend/cython/_poll.c:2418)
 KeyboardInterrupt

这里是代码的略微修改版本 https://gist.github.com/jayendra13/76a4f5726428882013ea62d94974da5c 我将 ioloop 作为参数传递给 zmqstream,同时附加回调,它也具有相同的行为。

这是几乎相似的脚本 https://gist.github.com/jayendra13/e553fafba5398e287107e947c16988df

在创建 kernel_client 后添加以下两行解决了我的问题。

    kernel_client.load_connection_file()
    kernel_client.start_channels()

这样,新的 execute 看起来像这样:

def execute(code):
    """Submit ``code`` to the pre-created kernel and return the request's msg_id.

    A ``BlockingKernelClient`` is built from the kernel's connection file,
    the connection file is loaded and the channels are started, listeners
    are attached, and the IOLoop is run until ``execute_`` completes.
    """
    kernel_id = '46459cb4-fa34-497a-8e3d-dfb3ab4476fd'
    kernel_client = BlockingKernelClient(
        connection_file=get_connection_file(kernel_id))
    # Load the connection info and start the channels before the
    # listeners are attached to the channel sockets.
    kernel_client.load_connection_file()
    kernel_client.start_channels()
    setup_listener(kernel_client)
    run_request = partial(execute_, kernel_client, code)
    return ioloop.IOLoop.current().run_sync(run_request)