将从 json 请求接收到的数据传递给 python 中的另一个函数 flask socketio
Passing data received from a json request to another function in python flask socketio
我有一个 python 烧瓶应用程序,它从 json 接收数据。我还使用了 socketio 和线程来实时处理数据。
在我的程序中,我需要将从 json 请求接收到的数据发送到另一个 python 函数。
下面是我为此编写的代码:-
from flask_socketio import SocketIO, emit
from flask import Flask, render_template, request, url_for, copy_current_request_context
from random import random
from time import sleep
from pygeodesy.ellipsoidalVincenty import LatLon
from threading import Thread, Event
__author__ = 'shark'
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
app.config['DEBUG'] = True
# turn the flask app into a socketio app
socketio = SocketIO(app, async_mode=None, logger=True, engineio_logger=True)
# random number Generator Thread
thread = Thread()
thread_stop_event = Event()
@app.route('/platform-data', methods=['POST'])
def platformData():
"""
Generate a random number every 1 second and emit to a socketio instance (broadcast)
Ideally to be run in a separate thread?
"""
# infinite loop of magical random numbers
print("Receiving platform data")
while not thread_stop_event.isSet():
req_data = request.get_json()
id = req_data['id']
latitude = req_data['coordinates'][1]
longitude = req_data['coordinates'][0]
speed = req_data['speed']
angle = req_data['angle']
length = req_data['dimensions'][0]
width = req_data['dimensions'][1]
laneW = req_data['lane_width']
spdLmt = req_data['speed_limit']
return testProcess(speed)
def testProcess(speed):
if speed>30:
print("slow down")
socketio.emit('speed', {'speed': speed}, namespace='/test')
socketio.sleep(.5)
@app.route('/')
def index():
# only by sending this page first will the client be connected to the socketio instance
return render_template('index.html')
@socketio.on('connect', namespace='/test')
def test_connect():
# need visibility of the global thread object
global thread
print('Client connected')
# Start the random number generator thread only if the thread has not been started before.
if not thread.isAlive():
print("Starting Thread")
thread = socketio.start_background_task(platformData)
@socketio.on('disconnect', namespace='/test')
def test_disconnect():
print('Client disconnected')
if __name__ == '__main__':
socketio.run(app)
但是,当我 运行 应用程序和来自 Postman 的 POST 数据时,我在控制台中收到以下错误:-
TypeError: The view function did not return a valid response. The
return type must be a string, dict, tuple, Response instance, or WSGI
callable, but it was a int.
127.0.0.1 - - [05/Mar/2020 17:06:25] "POST /platform-data HTTP/1.1" 500 15625 0.008975
我知道这是因为我已经声明了return testProcess(speed)
。
因此,我需要知道将速度变量传递给 'testProcess' 函数的正确方法。
platformData() 必须 return 一个字符串、字典、元组、Response 实例或 WSGI 可调用但你使 return 未定义的 testProcess return
尝试:
def testProcess(speed):
if speed>30:
return "slow down"
else:
return "ok"
或者在没有 return 的情况下调用 testProcess(speed)
然后 return 其他东西
我有一个 python 烧瓶应用程序,它从 json 接收数据。我还使用了 socketio 和线程来实时处理数据。 在我的程序中,我需要将从 json 请求接收到的数据发送到另一个 python 函数。 下面是我为此编写的代码:-
from flask_socketio import SocketIO, emit
from flask import Flask, render_template, request, url_for, copy_current_request_context
from random import random
from time import sleep
from pygeodesy.ellipsoidalVincenty import LatLon
from threading import Thread, Event
__author__ = 'shark'
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
app.config['DEBUG'] = True
# turn the flask app into a socketio app
socketio = SocketIO(app, async_mode=None, logger=True, engineio_logger=True)
# random number Generator Thread
thread = Thread()
thread_stop_event = Event()
@app.route('/platform-data', methods=['POST'])
def platformData():
"""
Generate a random number every 1 second and emit to a socketio instance (broadcast)
Ideally to be run in a separate thread?
"""
# infinite loop of magical random numbers
print("Receiving platform data")
while not thread_stop_event.isSet():
req_data = request.get_json()
id = req_data['id']
latitude = req_data['coordinates'][1]
longitude = req_data['coordinates'][0]
speed = req_data['speed']
angle = req_data['angle']
length = req_data['dimensions'][0]
width = req_data['dimensions'][1]
laneW = req_data['lane_width']
spdLmt = req_data['speed_limit']
return testProcess(speed)
def testProcess(speed):
if speed>30:
print("slow down")
socketio.emit('speed', {'speed': speed}, namespace='/test')
socketio.sleep(.5)
@app.route('/')
def index():
# only by sending this page first will the client be connected to the socketio instance
return render_template('index.html')
@socketio.on('connect', namespace='/test')
def test_connect():
# need visibility of the global thread object
global thread
print('Client connected')
# Start the random number generator thread only if the thread has not been started before.
if not thread.isAlive():
print("Starting Thread")
thread = socketio.start_background_task(platformData)
@socketio.on('disconnect', namespace='/test')
def test_disconnect():
print('Client disconnected')
if __name__ == '__main__':
socketio.run(app)
但是,当我 运行 应用程序和来自 Postman 的 POST 数据时,我在控制台中收到以下错误:-
TypeError: The view function did not return a valid response. The return type must be a string, dict, tuple, Response instance, or WSGI callable, but it was a int. 127.0.0.1 - - [05/Mar/2020 17:06:25] "POST /platform-data HTTP/1.1" 500 15625 0.008975
我知道这是因为我已经声明了return testProcess(speed)
。
因此,我需要知道将速度变量传递给 'testProcess' 函数的正确方法。
platformData() 必须 return 一个字符串、字典、元组、Response 实例或 WSGI 可调用但你使 return 未定义的 testProcess return 尝试:
def testProcess(speed):
if speed>30:
return "slow down"
else:
return "ok"
或者在没有 return 的情况下调用 testProcess(speed) 然后 return 其他东西