使用 SSE 实时传递来自其他 API 的数据
Using SSE to pass on data from other APIs in real time
我想做一个实时可视化,其中我有一个 Flask 应用程序将 SSE 发送到 HTML,然后它正在做它的事情。可视化效果非常好。我对 SSE 感到困惑。有一个系统可以在“/data”处对我的应用程序执行 ping 操作以移交数据。然后我想将其传输到事件流中的 vis。
import time, json
from flask import Flask, request, Response, render_template
app =Flask(__name__)
def data_stream(data):
if data:
yield 'data: {}\n\n'.format(json.dumps(data))
else:
yield 'data: {}\n\n'.format(json.dumps({'data': [{'lat':0, 'lg':0}]}))
@app.route('/data', methods =['GET','POST'])
def collect_data():
data = {"data": [request.get_json()]}
data_stream(data)
return 'asd'
@app.route('/my_event_source', methods =['GET', 'POST'])
def sse_request():
return Response(data_stream(None), mimetype='text/event-stream')
@app.route('/')
def page():
return render_template('map_vis.html')
if __name__ == '__main__':
data2 = []
app.debug =True
app.run('0.0.0.0', 8081)
我无法找到将 data_stream 函数传递给 Response 的方法,因为截至目前它只是简单地调用 data_stream(None) 即我得到 {'lat':0, 'lg':0}
作为回应。
您似乎在使用长轮询或其他 AJAX 衍生技术之类的 SSE; SSE 作为一个流更有用,所以一旦建立 SSE 连接你需要保持数据发送功能 'alive' 并使
它会在您需要时发送数据。
所以,我会这样做:
import Queue
data_queue = Queue.Queue()
def data_stream():
while True:
data = data_queue.get()
yield 'data: {}\n\n'.format(json.dumps(data))
通过SSE发送数据时,我只是将数据放入队列:
def sse_request():
r = Response(data_stream(), mimetype='text/event-stream')
data_queue.put({'data': [{'lat':0, 'lg':0}]})
return r
def collect_data():
data_queue.put({"data": [request.get_json()]})
...
唯一就是这样写会导致while
循环阻止服务器。所以你需要使用一些额外的东西,比如
gevent or Eventlet。
有很多例子说明如何将 Flask 与这些结合起来。
我想做一个实时可视化,其中我有一个 Flask 应用程序将 SSE 发送到 HTML,然后它正在做它的事情。可视化效果非常好。我对 SSE 感到困惑。有一个系统可以在“/data”处对我的应用程序执行 ping 操作以移交数据。然后我想将其传输到事件流中的 vis。
import time, json
from flask import Flask, request, Response, render_template
app =Flask(__name__)
def data_stream(data):
if data:
yield 'data: {}\n\n'.format(json.dumps(data))
else:
yield 'data: {}\n\n'.format(json.dumps({'data': [{'lat':0, 'lg':0}]}))
@app.route('/data', methods =['GET','POST'])
def collect_data():
data = {"data": [request.get_json()]}
data_stream(data)
return 'asd'
@app.route('/my_event_source', methods =['GET', 'POST'])
def sse_request():
return Response(data_stream(None), mimetype='text/event-stream')
@app.route('/')
def page():
return render_template('map_vis.html')
if __name__ == '__main__':
data2 = []
app.debug =True
app.run('0.0.0.0', 8081)
我无法找到将 data_stream 函数传递给 Response 的方法,因为截至目前它只是简单地调用 data_stream(None) 即我得到 {'lat':0, 'lg':0}
作为回应。
您似乎在使用长轮询或其他 AJAX 衍生技术之类的 SSE; SSE 作为一个流更有用,所以一旦建立 SSE 连接你需要保持数据发送功能 'alive' 并使 它会在您需要时发送数据。
所以,我会这样做:
import Queue
data_queue = Queue.Queue()
def data_stream():
while True:
data = data_queue.get()
yield 'data: {}\n\n'.format(json.dumps(data))
通过SSE发送数据时,我只是将数据放入队列:
def sse_request():
r = Response(data_stream(), mimetype='text/event-stream')
data_queue.put({'data': [{'lat':0, 'lg':0}]})
return r
def collect_data():
data_queue.put({"data": [request.get_json()]})
...
唯一就是这样写会导致while
循环阻止服务器。所以你需要使用一些额外的东西,比如
gevent or Eventlet。
有很多例子说明如何将 Flask 与这些结合起来。