流式传输实时 Websocket 流数据并使用 class 内的破折号更新绘图

Streaming Live Websocket stream data and updating plotly graph with dash within a class

抱歉,如果这是一个愚蠢的问题,但我想知道是否可以创建一个破折号应用程序并将实时数据直接流式传输到图表中?

我一直在研究一个包含多个元素的交易机器人。我目前正在尝试从 websocket 流式传输每 2000 毫秒更新一次的数据,并将其直接输入图表中。

我对编程比较陌生,因此可以在必要时使用一些帮助和批评!

因此,由于此实时图表最终将作为 GUI 和机器人功能的一部分并入,我正在尝试使此 priceTicker class 包含初始化 dash 应用程序以及实时更新图表接收到的 websocket 流。

我能够毫无问题地流式传输数据,并且可以将数据解析为 json、csv 或任何真正的东西。我也可以在 dash 应用程序中初始化图表,但它不会用新数据更新图表。

我已经查看了一般区域的一些 SO 帖子,但不幸的是没有找到任何可以解释我的情况的内容 - 如果有任何我错过的帖子,我深表歉意!

下面是我的完整代码:

from binance.client import Client
from binance.websockets import BinanceSocketManager
import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Output, Input
import plotly
import plotly.graph_objs as go
from collections import deque
import pandas as pd
import json

class priceTicker:

    def __init__(self, api_key, secret_key):
        self.app = dash.Dash(__name__)
        self.ticker_time = deque(maxlen=200)
        self.last_price = deque(maxlen=200)
        self.app.layout = html.Div(
            [
                dcc.Graph(id = 'live-graph', animate = True),
                dcc.Interval(
                    id = 'graph-update',
                    interval = 2000,
                    n_intervals = 0
                )
            ]
        )

        client = Client(api_key, secret_key)
        socket = BinanceSocketManager(client)
        socket.start_kline_socket('BTCUSDT', self.on_message, '1m')

        socket.start()
    def on_message(self, message):
        app = self.app
        price = {'time':message['E'], 'price':message['k']['c']}
        self.ticker_time.append(message['E'])
        self.last_price.append(message['k']['c'])
        ticker_time = self.ticker_time
        last_price = self.last_price
        #print(self.ticker_time, self.last_price)

        @app.callback(
            Output('live-graph', 'figure'),
            [Input('graph-update', 'n_intervals')])
        def update_graph(ticker_time,last_price):

            data = go.Scatter(
                x = list(ticker_time),
                y = list(last_price),
                name ='Scatter',
                mode = 'lines+markers'
            )


            return {'data': [data],
                    'layout': go.Layout(xaxis=dict(range=[min(ticker_time), max(ticker_time)]),
                    yaxis=dict(range=[min(last_price), max(last_price)]),)}

def Main():
    api_key = ''
    secret_key = ''
    ticker = priceTicker(api_key, secret_key)
    ticker.app.run_server(debug=True, host='127.0.0.1', port=16552)
if __name__ == '__main__':
    #app.run_server(debug=False, host='127.0.0.1', port=10010)
    Main()

我不确定哪里出错了,但好像没有调用回调函数。对不起,如果我错过了一些明显的东西!

我 运行 使用 3.7.6 和 macos Big Sur

如果有人有任何建议,将不胜感激。

干杯

对于任何有兴趣或试图对我做类似事情的人。

我设法通过在 init 中调用 app.callback 并将回调分配给 class:[=11= 中的函数来解决问题]

from binance.client import Client
from binance.websockets import BinanceSocketManager
import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Output, Input
import plotly
import plotly.graph_objs as go
from collections import deque
import pandas as pd
import json

class priceTicker:
    def __init__(self, api_key, secret_key):
        self.ticker_time = deque(maxlen=200)
        self.last_price = deque(maxlen=200)
        client = Client(api_key, secret_key)
        socket = BinanceSocketManager(client)
        socket.start_kline_socket('BTCUSDT', self.on_message, '1m')

        socket.start()
        self.app = dash.Dash()
        self.app.layout = html.Div(
            [
                dcc.Graph(id = 'live-graph', animate = True),
                dcc.Interval(
                    id = 'graph-update',
                    interval = 2000,
                    n_intervals = 0
                )
            ]
        )
        self.app.callback(
            Output('live-graph', 'figure'),
            [Input('graph-update', 'n_intervals')])(self.update_graph)
        #app.run_server(debug=True, host='127.0.0.1', port=16452)
    def on_message(self, message):

        #price = {'time':message['E'], 'price':message['k']['c']}
        self.ticker_time.append(message['E'])
        self.last_price.append(message['k']['c'])
        #print(self.ticker_time, self.last_price)

    def update_graph(self, n):

        data = go.Scatter(
            x = list(self.ticker_time),
            y = list(self.last_price),
            name ='Scatter',
            mode = 'lines+markers'
        )


        return {'data': [data],
                'layout': go.Layout(xaxis=dict(range=[min(self.ticker_time), max(self.ticker_time)]),
                yaxis=dict(range=[min(self.last_price), max(self.last_price)]),)}

def Main():
    api_key = ''
    secret_key = ''
    ticker = priceTicker(api_key, secret_key)
    ticker.app.run_server(debug=True, host='127.0.0.1', port=16452)
if __name__ == '__main__':
    #app.run_server(debug=False, host='127.0.0.1', port=10010)
    Main()

输出符合预期,应用程序每 2 秒更新一次最新价格。

干杯