Plotly/Dash 以流畅的动画显示实时数据

Plotly/Dash display real time data in smooth animation

我们正尝试在 plotly-dash 中生成一个实时仪表板,以显示生成的实时数据。我们通常遵循此处的指导 (https://dash.plotly.com/live-updates)。

我们有一个回调,大约每秒从源中收集一大块新数据点,然后将数据附加到图表中。

当我们这样做时,对图表的更新是不稳定的,因为我们每秒都会在回调中生成一个新的图表对象。我们希望图表流畅,即使这意味着我们比实时数据落后一两秒。

我们正在研究动画 (https://plotly.com/python/animations/),但尚不清楚我们如何将动画应用于附加到图表的实时数据流。

编辑:修订版 1

您会在我建议的末尾找到一个完全可重现的,尽管 最小 代码片段。但请注意,这是一个旨在在 JupyterDash 中启动的示例。


我只需要假设您在一个或多个 pandas 数据帧中收集数据流。为了模拟我所理解的 real-world 情况,我只需要依靠生成一些随机数据。 在对我的原始答案的以下修订中,我将争辩说,要使用 real-time 数据制作流畅的动画,您唯一需要的是

1. df.plot() pandas 绘图后端设置为 plotly,

2. 像这样的破折号组件:

   dcc.Interval(id='interval-component',
                interval=1*1000, # in milliseconds
                n_intervals=0
        )

3. 和这样的回调函数:

@app.callback(
    Output('graph', 'figure'),
    [Input('interval-component', "n_intervals")]
)

下面的代码片段包含的代码完全符合您在问题中描述的内容:

1. 它每秒在数据帧 df2 中收集一大块随机数据,

2. 将其添加到现有数据框 df1,并且

3. 绘制结果。

初始图是这样的:

几秒钟后,图形如下所示:

这听起来好得令人难以置信,但数字之间的过渡看起来非常棒,开箱即用。新点被优雅地添加到行的末尾,x- 和 y-axis 更新都非常顺利。

更新一开始可能看起来 断断续续,但运行几千次后,您只会看到行尾在移动:

在上图中可以看到运行几千次后起点就包含在内了。这可能是显而易见的,但是如果您想在例如 1000 次运行后保持恒定的 window 长度,只需将 df3 = df3.cumsum() 替换为 df3 = df3.cumsum().tail(1000) 即可得到:

但你不必相信我的话。只需在 JupyterLab 中启动以下代码片段并亲自查看:

import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from jupyter_dash import JupyterDash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output

# code and plot setup
# settings
pd.options.plotting.backend = "plotly"
countdown = 20
#global df

# sample dataframe of a wide format
np.random.seed(4); cols = list('abc')
X = np.random.randn(50,len(cols))  
df=pd.DataFrame(X, columns=cols)
df.iloc[0]=0;

# plotly figure
fig = df.plot(template = 'plotly_dark')

app = JupyterDash(__name__)
app.layout = html.Div([
    html.H1("Streaming of random data"),
            dcc.Interval(
            id='interval-component',
            interval=1*1000, # in milliseconds
            n_intervals=0
        ),
    dcc.Graph(id='graph'),
])

# Define callback to update graph
@app.callback(
    Output('graph', 'figure'),
    [Input('interval-component', "n_intervals")]
)
def streamFig(value):
    
    global df
    
    Y = np.random.randn(1,len(cols))  
    df2 = pd.DataFrame(Y, columns = cols)
    df = df.append(df2, ignore_index=True)#.reset_index()
    df.tail()
    df3=df.copy()
    df3 = df3.cumsum()
    fig = df3.plot(template = 'plotly_dark')
    #fig.show()
    return(fig)

app.run_server(mode='external', port = 8069, dev_tools_ui=True, #debug=True,
              dev_tools_hot_reload =True, threaded=True)

这个例子不是很优雅,还有很大的改进空间(即使是全局变量....),但我希望它对你有用。

编辑:修订版 2:

运行大约 6000 次后,图表将如下所示:

虽然现在运行非常很顺利,但现在看起来不再那么有趣了。每次更新只会揭示端点处的微小移动。所以我在最后添加了一些注释,以更清楚地表明事情实际上仍然是 运行:

带有注释的完整代码

import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from jupyter_dash import JupyterDash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output

# code and plot setup
# settings
pd.options.plotting.backend = "plotly"
countdown = 20
#global df

# sample dataframe of a wide format
np.random.seed(4); cols = list('abc')
X = np.random.randn(50,len(cols))  
df=pd.DataFrame(X, columns=cols)
df.iloc[0]=0;

# plotly figure
fig = df.plot(template = 'plotly_dark')

app = JupyterDash(__name__)
app.layout = html.Div([
    html.H1("Streaming of random data"),
            dcc.Interval(
            id='interval-component',
            interval=1*1000, # in milliseconds
            n_intervals=0
        ),
    dcc.Graph(id='graph'),
])

# Define callback to update graph
@app.callback(
    Output('graph', 'figure'),
    [Input('interval-component', "n_intervals")]
)
def streamFig(value):
    
    global df
    
    Y = np.random.randn(1,len(cols))  
    df2 = pd.DataFrame(Y, columns = cols)
    df = df.append(df2, ignore_index=True)#.reset_index()
    #df.tail()
    df3=df.copy()
    df3 = df3.cumsum()#.tail(1000)
    fig = df3.plot(template = 'plotly_dark')
    #fig.show()
    
    colors = px.colors.qualitative.Plotly
    for i, col in enumerate(df3.columns):
            fig.add_annotation(x=df3.index[-1], y=df3[col].iloc[-1],
                                   text = str(df3[col].iloc[-1])[:4],
                                   align="right",
                                   arrowcolor = 'rgba(0,0,0,0)',
                                   ax=25,
                                   ay=0,
                                   yanchor = 'middle',
                                   font = dict(color = colors[i]))
    
    return(fig)

app.run_server(mode='external', port = 8069, dev_tools_ui=True, #debug=True,
              dev_tools_hot_reload =True, threaded=True)

原回答和建议

你没有提供任何示例代码,所以我只能提供一个一般性的建议,那就是在 Dash 画廊的一个例子中仔细看看 streams forex data 的情节:

我特别想看看他们是如何设置回调和第 932 行中的函数 generate_figure_callback(pair) in the source:

# Function to update Graph Figure
def generate_figure_callback(pair):
    def chart_fig_callback(n_i, p, t, s, pairs, a, b, old_fig):

        if pairs is None:
            return {"layout": {}, "data": {}}

        pairs = pairs.split(",")
        if pair not in pairs:
            return {"layout": {}, "data": []}

        if old_fig is None or old_fig == {"layout": {}, "data": {}}:
            return get_fig(pair, a, b, t, s, p)

        fig = get_fig(pair, a, b, t, s, p)
        return fig

    return chart_fig_callback

这就是我目前所有的,但我希望你会发现它有用!

编辑:只是为了表明更新不限于 5 分钟。

屏幕截图在 21:16:29

屏幕截图在 21:16:55

您在 bid/ask 文本中看到的只是:买价和卖价。他们一直在变化。如果我 100% 正确,则这条线代表已完成的交易,而且这种情况偶尔会发生。所以我 认为 这只是你在这里显示的数据的问题。我希望要获得您想要的结果,您唯一需要做的就是用您的数据源替换此示例的中心部分。您还可以查看 Wind Streaming example。对于您的场景,这甚至可能更容易实现。

通过 extendData 属性 可以在不生成新图形对象的情况下更新 Graph 组件的轨迹。这是一个每秒附加数据的小例子,

import dash
import dash_html_components as html
import dash_core_components as dcc
import numpy as np

from dash.dependencies import Input, Output

# Example data (a circle).
resolution = 20
t = np.linspace(0, np.pi * 2, resolution)
x, y = np.cos(t), np.sin(t)
# Example app.
figure = dict(data=[{'x': [], 'y': []}], layout=dict(xaxis=dict(range=[-1, 1]), yaxis=dict(range=[-1, 1])))
app = dash.Dash(__name__, update_title=None)  # remove "Updating..." from title
app.layout = html.Div([dcc.Graph(id='graph', figure=figure), dcc.Interval(id="interval")])


@app.callback(Output('graph', 'extendData'), [Input('interval', 'n_intervals')])
def update_data(n_intervals):
    index = n_intervals % resolution
    # tuple is (dict of new data, target trace index, number of points to keep)
    return dict(x=[[x[index]]], y=[[y[index]]]), [0], 10


if __name__ == '__main__':
    app.run_server()

根据客户端和服务器之间的网络连接(在每次更新时,客户端和服务器之间交换请求),这种方法可以达到大约 1 秒的刷新率。

如果您需要更高的刷新率,我建议使用 client side callback 进行图形更新。采用前面的示例,代码将遵循

import dash
import dash_html_components as html
import dash_core_components as dcc
import numpy as np

from dash.dependencies import Input, Output, State

# Example data (a circle).
resolution = 1000
t = np.linspace(0, np.pi * 2, resolution)
x, y = np.cos(t), np.sin(t)
# Example app.
figure = dict(data=[{'x': [], 'y': []}], layout=dict(xaxis=dict(range=[-1, 1]), yaxis=dict(range=[-1, 1])))
app = dash.Dash(__name__, update_title=None)  # remove "Updating..." from title
app.layout = html.Div([
    dcc.Graph(id='graph', figure=dict(figure)), dcc.Interval(id="interval", interval=25),
    dcc.Store(id='offset', data=0), dcc.Store(id='store', data=dict(x=x, y=y, resolution=resolution)),
])
app.clientside_callback(
    """
    function (n_intervals, data, offset) {
        offset = offset % data.x.length;
        const end = Math.min((offset + 10), data.x.length);
        return [[{x: [data.x.slice(offset, end)], y: [data.y.slice(offset, end)]}, [0], 500], end]
    }
    """,
    [Output('graph', 'extendData'), Output('offset', 'data')],
    [Input('interval', 'n_intervals')], [State('store', 'data'), State('offset', 'data')]
)

if __name__ == '__main__':
    app.run_server()

客户端更新应该足够快以实现平滑更新。下面的 gif 显示了上面的示例 运行 25 毫秒刷新率,

请记住,只有当数据已经存在于客户端时,客户端更新才有可能,即需要另一种机制从服务器获取数据。可能的数据流可以是

  1. 使用较慢的 Interval 组件(例如 2 秒)触发(正常)回调,该回调从源中获取数据块并将其放入 Store 组件中
  2. 使用快速 Interval 组件(例如 25 毫秒)触发客户端回调,将数据从 Store 组件流式传输到 Graph 组件