从多处理过程更新散景图

Update Bokeh figure from multiprocessing Process

我正在使用 Bokeh 服务器绘制来自传感器的在线数据。我写了一个 multiprocessing.Process subclass 从传感器读取数据,然后通过管道将其馈送到另一个 subclass 应该用传入的数据更新 Bokeh 图。

如何从管道异步读取绘图 Process 并将其绘制到 Bokeh?

考虑 Grapher class,假设另一个进程已经向 input_pipe 发送数据:

from multiprocessing import Process

from bokeh.client import push_session
from bokeh.models import ColumnDataSource
from bokeh.plotting import curdoc, figure

from functools import partial

class Grapher(Process):
  def __init__(self, name, input_pipe):
    super(Grapher, self).__init__(name=name, daemon=True)
    self.input_pipe = input_pipe

    self.doc = curdoc()
    self.source = ColumnDataSource(dict(time=[], value=[])
    self.fig = figure()
    self.fig.line(source=self.source, x='time', y='value')
    self.doc.add_root(self.fig)
    self.session = push_session(self.doc)  # To keep session updated.

  def run(self):
    while True:
      time, value = self.input_pipe.recv()
      self.doc.add_next_tick_callback(partial(self.update, time, value))

  @gen.coroutine
  def update(self, time, value):
    self.source.stream(dict(time=[time], value=[value]))

此处的代码取自 Bokeh 文档中的 example。另外,我 运行 bokeh serve 以便 curdoc() 可以连接到某些东西。从 Bokeh 服务器日志中,我看到存在连接。

但是,问题似乎是 updatewhile 循环的下一个滴答后并没有真正执行。我可以通过在 update 中添加一条日志消息来检查,它永远不会打印出来。

如果我尝试将 add_next_tick_callback 更改为仅 update 调用,函数确实是 运行,但会话不会绘制任何内容。

什么会导致这里的问题?代码看起来合乎逻辑,我找不到使用此方法可能失败的文档。

谢谢。

在对应用程序使用 bokeh.client 方法时,如果您希望为事件提供服务,并在事件发生时调用回调,那么您必须在最后调用阻塞函数 session.loop_until_closed() .这就是让监视事件和调用回调的东西保持不变的原因,实际上是为了监视事件和调用回调。

如果调用阻塞函数有问题,并且您不想使用 bokeh serve app.py 风格的应用程序,那么您可以尝试在另一个线程中调用 session.loop_until_closed()(不是 100% 这会工作),或者启动你自己的 Tornado ioloop 并直接在其上搭载 Bokeh 服务器应用程序。该技术将在即将发布的 0.12.4 版本中得到更好的展示,但您现在可能会发现此笔记本是一个有用的参考:

https://gist.github.com/bryevdv/ff84871fcd843aceea4f0197be9c57e0

请注意,该笔记本是概念验证,您需要 0.12.4 开发版本才能运行,一些使用细节可能会发生变化。