如何将 RxPy 数据流发送到前端 javascript

How to send RxPy data stream to frontend javascript

我正在尝试将 python ReactiveX 流(使用 RxPy 库)发送到 Web UI 组件上的 javascript,但我似乎找不到这样做的一种方法。此外,我可能需要将进入 Javascript 的数据流放入某种 RxJS Observable 中以进行进一步处理。 你能帮我理解如何实现这一目标吗? 我仍然在掌握 ReactiveX,所以也许我遗漏了一些基本概念,但我正在努力在网上找到与此类似的东西。

这个问题出现了,因为我正在开发一个桌面应用程序,该应用程序从 csv 或 zeromq 端点获取数据,并将其流式传输到 UI 数据将被动态绘制(更新绘制新数据进来)。我正在使用 Electron 构建我的应用程序,使用 python 作为我的后端代码。 Python 是必须的,因为我将使用一些 TensorFlow 模型扩展应用程序。

按照 fyears really well made example 作为初始结构,我编写了一些示例代码来玩,但我似乎无法让它工作。 我设法从 UI 按钮一直到 python 脚本,但我卡在了 PricesApi.get_stream(...) 方法的 return 中。

index.html

前端笔直

<!DOCTYPE html>
<html>
    <head>
        <meta charset="UTF-8">
        <title>Electron Application</title>  
    </head>
    <body>
        <button id="super-button">Trigger Python Code</button>
        <div id="py-output">
        </div>
    </body>
    <script src="renderer.js" ></script>
</html>

api.py:

ZeroRPC 服务器文件就像上面提到的 link.

import gevent
import json
import signal
import zerorpc
from core_operator import stream


class PricesApi(object):

    def get_stream(self, filename):
        return stream(filename)

    def stop(self):
        print('Stopping strategy.')

    def echo(self, text):
        """echo any text"""
        return text


def load_settings():
    with open('settings.json') as json_settings:
        settings_dictionary = json.load(json_settings)
    return settings_dictionary


def main():
    settings = load_settings()
    s = zerorpc.Server(PricesApi())
    s.bind(settings['address'])
    print(f"Initialising server on {settings['address']}")
    s.run()


if __name__ == '__main__':
    main()

core_operator.py

这是一个主要逻辑文件,用于从 zeroMQ 订阅获取价格,但目前只是从 csv 创建一个 Observable。

import sys
import rx
from csv import DictReader


def prepare_csv_timeseries_stream(filename):
    return rx.from_(DictReader(open(filename, 'r')))


def stream(filename):
    price_observable = prepare_csv_timeseries_stream(filename)
    return price_observable

rendered.js

最后,应该接收流的javascript:

const zerorpc = require('zerorpc');
const fs = require('fs')

const settings_block = JSON.parse(fs.readFileSync('./settings.json').toString());
let client = new zerorpc.Client();
client.connect(settings_block['address']);

let button = document.querySelector('#super-button');
let pyOutput = document.querySelector('#py-output');
let filename = '%path-to-file%'
button.addEventListener('click', () => {
    let line_to_write = '1'
    console.log('button click received.')
    client.invoke('get_stream', filename, (error, result) => {
        var messages = pyOutput;
        message = document.createElement('li'),
        content = document.createTextNode(error.data);
        message.appendChild(content);
        messages.appendChild(message);

        if(error) {
            console.error(error);
        } else {
           var messages = pyOutput;
           message = document.createElement('li'),
           content = document.createTextNode(result.data);
           message.appendChild(content);
           messages.appendChild(message);    
        }
    })
})

我一直在研究使用 WebSockets,但未能理解如何实现它。我确实找到了一些使用 Tornado 服务器的例子,但是我试图尽可能地保持它的纯净,而且,已经有一个来自 Electron 的 client/server 结构,我不能直接使用它,这感觉很奇怪。 此外,我正在尝试将整个系统维护为 PUSH 结构,因为数据要求不允许 PULL 类型的模式,定期轮询等。

非常感谢您抽出宝贵的时间,如果您需要任何进一步的详细信息或解释,请告诉我。

我使用 amazing library called Eel(描述为 "A little Python library for making simple Electron-like HTML/JS GUI apps")找到了解决方案。它的绝对简单性和直观性使我能够通过几行简单的代码实现我想要的。

  1. 按照介绍了解布局。
  2. 然后你的主 python 文件(我方便地将其命名为 main.py),你将流函数暴露给 eel,这样它就可以从 JS 文件中调用,并将流传输到 JavaScript "receive_price" 从JS文件中暴露出来的函数!
import sys
import rx
from csv import DictReader


def prepare_csv_timeseries_stream(filename):
    return rx.from_(DictReader(open(filename, 'r')))


def process_logic():
    return pipe(
        ops.map(lambda p: print(p)),  # just to view what's flowing through
        ops.map(lambda p: eel.receive_price(p)),  # KEY FUNCTION in JS file, exposed via eel, is called for each price. 
    )


@eel.expose  # Decorator so this function can get triggered from JavaScript
def stream(filename):
    price_observable = prepare_csv_timeseries_stream(filename)
    price_observable.pipe(process_logic()).subscribe()  # apply the pipe and subscribe to trigger stream

eel.init('web')
eel.start('main.html')  # look at how beautiful and elegant this is! 
  1. 现在我们创建 price_processing.js 文件(按照 Eel 说明放置在 'web' 文件夹中)以合并公开的函数
let button   = document.querySelector('#super-button');
let pyOutput = document.querySelector('#py-output'   );
let filename = '%path-to-file%'

console.log("ready to receive data!")

eel.expose(receive_price);  // Exposing the function to Python, to process each price
function receive_price(result) {
    var messages = pyOutput;
    message = document.createElement('li');
    content = document.createTextNode(result);
    message.appendChild(content);
    messages.appendChild(message);
    // in here you can add more functions to process data, e.g. logging, charting and so on..
};

button.addEventListener('click', () => {
    console.log('Button clicked magnificently! Bloody good job')
    eel.stream(filename); // calling the Python function exposed through Eel to start stream.
})
  1. HTML 几乎保持不变,除了更改脚本参考:/eel.js,根据 Eel 文档和我们的 price_processing.js 文件。
<!DOCTYPE html>
<html>
    <head>
        <meta charset="UTF-8">
        <title>Let's try Eel</title>
    </head>
    <body>
        <h1>Eel-saved-my-life: the App!</h1>
        <button id="super-button">Trigger Python Code</button>
        <div id="py-output">

        </div>
    </body>
    <script type="text/javascript" src="/eel.js"></script>
    <script type="text/javascript" src="price_processing.js"></script>
</html>

我希望这可以帮助任何遇到同样问题的人。