将网络服务器添加到现有 python 服务
Add webserver to existing python service
我有一个持续运行的脚本,用于处理从外部设备获取的数据。核心逻辑如下:
from external_module import process_data, get_data, load_interesting_things
class MyService:
def __init__(self):
self.interesting_items = load_interesting_things()
self.run()
def run(self):
try:
while True:
data = get_data()
for item in self.interesting_items:
item.add_datapoint(process_data(data, item))
except KeyboardInterrupt:
pass
我想添加通过 RESTful API.
请求各种有趣事物信息的功能
有没有一种方法可以向程序中添加 Flask Web 服务之类的东西,以便 Web 服务可以从 interesting_items
列表中获取统计数据到 return?例如:
@app.route("/item/<idx>/average")
def average(idx: int):
avg = interesting_items[idx].getAverage()
return jsonify({"average":avg})
假设有必要的 idx 边界检查和任何适当的锁定实施。
不一定非得是Flask,但要轻量级。我想避免使用数据库。我更愿意使用网络服务,但如果不完全重构代码库就不可能,我可以改用套接字,但这不太可取。
服务器 运行 仅在本地网络上,通常只处理一个用户,有时可能有几个。
我需要将 run()
方法从 __init__()
方法中移出,这样我就可以全局引用该服务,并在一个单独的目录中启动 运行 方法线。大致如下:
service = MyService()
service_thread = threading.Thread(target=service.run, daemon=True)
service_thread.start()
app = flask.Flask("appname")
...
@app.route("/item/<idx>/average")
def average(idx: int):
avg = service.interesting_items[idx].getAverage()
return jsonify({"average":avg})
...
app.run()
我有一个持续运行的脚本,用于处理从外部设备获取的数据。核心逻辑如下:
from external_module import process_data, get_data, load_interesting_things
class MyService:
def __init__(self):
self.interesting_items = load_interesting_things()
self.run()
def run(self):
try:
while True:
data = get_data()
for item in self.interesting_items:
item.add_datapoint(process_data(data, item))
except KeyboardInterrupt:
pass
我想添加通过 RESTful API.
请求各种有趣事物信息的功能有没有一种方法可以向程序中添加 Flask Web 服务之类的东西,以便 Web 服务可以从 interesting_items
列表中获取统计数据到 return?例如:
@app.route("/item/<idx>/average")
def average(idx: int):
avg = interesting_items[idx].getAverage()
return jsonify({"average":avg})
假设有必要的 idx 边界检查和任何适当的锁定实施。
不一定非得是Flask,但要轻量级。我想避免使用数据库。我更愿意使用网络服务,但如果不完全重构代码库就不可能,我可以改用套接字,但这不太可取。
服务器 运行 仅在本地网络上,通常只处理一个用户,有时可能有几个。
我需要将 run()
方法从 __init__()
方法中移出,这样我就可以全局引用该服务,并在一个单独的目录中启动 运行 方法线。大致如下:
service = MyService()
service_thread = threading.Thread(target=service.run, daemon=True)
service_thread.start()
app = flask.Flask("appname")
...
@app.route("/item/<idx>/average")
def average(idx: int):
avg = service.interesting_items[idx].getAverage()
return jsonify({"average":avg})
...
app.run()