从多处理过程更新散景图
Update Bokeh figure from multiprocessing Process
我正在使用 Bokeh 服务器绘制来自传感器的在线数据。我写了一个 multiprocessing.Process
subclass 从传感器读取数据,然后通过管道将其馈送到另一个 subclass 应该用传入的数据更新 Bokeh 图。
如何从管道异步读取绘图 Process
并将其绘制到 Bokeh?
考虑 Grapher
class,假设另一个进程已经向 input_pipe
发送数据:
from multiprocessing import Process
from bokeh.client import push_session
from bokeh.models import ColumnDataSource
from bokeh.plotting import curdoc, figure
from functools import partial
class Grapher(Process):
def __init__(self, name, input_pipe):
super(Grapher, self).__init__(name=name, daemon=True)
self.input_pipe = input_pipe
self.doc = curdoc()
self.source = ColumnDataSource(dict(time=[], value=[])
self.fig = figure()
self.fig.line(source=self.source, x='time', y='value')
self.doc.add_root(self.fig)
self.session = push_session(self.doc) # To keep session updated.
def run(self):
while True:
time, value = self.input_pipe.recv()
self.doc.add_next_tick_callback(partial(self.update, time, value))
@gen.coroutine
def update(self, time, value):
self.source.stream(dict(time=[time], value=[value]))
此处的代码取自 Bokeh 文档中的 example。另外,我 运行 bokeh serve
以便 curdoc()
可以连接到某些东西。从 Bokeh 服务器日志中,我看到存在连接。
但是,问题似乎是 update
在 while
循环的下一个滴答后并没有真正执行。我可以通过在 update
中添加一条日志消息来检查,它永远不会打印出来。
如果我尝试将 add_next_tick_callback
更改为仅 update
调用,函数确实是 运行,但会话不会绘制任何内容。
什么会导致这里的问题?代码看起来合乎逻辑,我找不到使用此方法可能失败的文档。
谢谢。
在对应用程序使用 bokeh.client
方法时,如果您希望为事件提供服务,并在事件发生时调用回调,那么您必须在最后调用阻塞函数 session.loop_until_closed()
.这就是让监视事件和调用回调的东西保持不变的原因,实际上是为了监视事件和调用回调。
如果调用阻塞函数有问题,并且您不想使用 bokeh serve app.py
风格的应用程序,那么您可以尝试在另一个线程中调用 session.loop_until_closed()
(不是 100% 这会工作),或者启动你自己的 Tornado ioloop 并直接在其上搭载 Bokeh 服务器应用程序。该技术将在即将发布的 0.12.4
版本中得到更好的展示,但您现在可能会发现此笔记本是一个有用的参考:
https://gist.github.com/bryevdv/ff84871fcd843aceea4f0197be9c57e0
请注意,该笔记本是概念验证,您需要 0.12.4
开发版本才能运行,一些使用细节可能会发生变化。
我正在使用 Bokeh 服务器绘制来自传感器的在线数据。我写了一个 multiprocessing.Process
subclass 从传感器读取数据,然后通过管道将其馈送到另一个 subclass 应该用传入的数据更新 Bokeh 图。
如何从管道异步读取绘图 Process
并将其绘制到 Bokeh?
考虑 Grapher
class,假设另一个进程已经向 input_pipe
发送数据:
from multiprocessing import Process
from bokeh.client import push_session
from bokeh.models import ColumnDataSource
from bokeh.plotting import curdoc, figure
from functools import partial
class Grapher(Process):
def __init__(self, name, input_pipe):
super(Grapher, self).__init__(name=name, daemon=True)
self.input_pipe = input_pipe
self.doc = curdoc()
self.source = ColumnDataSource(dict(time=[], value=[])
self.fig = figure()
self.fig.line(source=self.source, x='time', y='value')
self.doc.add_root(self.fig)
self.session = push_session(self.doc) # To keep session updated.
def run(self):
while True:
time, value = self.input_pipe.recv()
self.doc.add_next_tick_callback(partial(self.update, time, value))
@gen.coroutine
def update(self, time, value):
self.source.stream(dict(time=[time], value=[value]))
此处的代码取自 Bokeh 文档中的 example。另外,我 运行 bokeh serve
以便 curdoc()
可以连接到某些东西。从 Bokeh 服务器日志中,我看到存在连接。
但是,问题似乎是 update
在 while
循环的下一个滴答后并没有真正执行。我可以通过在 update
中添加一条日志消息来检查,它永远不会打印出来。
如果我尝试将 add_next_tick_callback
更改为仅 update
调用,函数确实是 运行,但会话不会绘制任何内容。
什么会导致这里的问题?代码看起来合乎逻辑,我找不到使用此方法可能失败的文档。
谢谢。
在对应用程序使用 bokeh.client
方法时,如果您希望为事件提供服务,并在事件发生时调用回调,那么您必须在最后调用阻塞函数 session.loop_until_closed()
.这就是让监视事件和调用回调的东西保持不变的原因,实际上是为了监视事件和调用回调。
如果调用阻塞函数有问题,并且您不想使用 bokeh serve app.py
风格的应用程序,那么您可以尝试在另一个线程中调用 session.loop_until_closed()
(不是 100% 这会工作),或者启动你自己的 Tornado ioloop 并直接在其上搭载 Bokeh 服务器应用程序。该技术将在即将发布的 0.12.4
版本中得到更好的展示,但您现在可能会发现此笔记本是一个有用的参考:
https://gist.github.com/bryevdv/ff84871fcd843aceea4f0197be9c57e0
请注意,该笔记本是概念验证,您需要 0.12.4
开发版本才能运行,一些使用细节可能会发生变化。