结合三重奏和烧瓶
Combining trio and flask
我正在尝试创建一个 HTTP API,它可以创建和销毁并发任务,这些任务打开 TCP 连接到远程服务器流式传输约 15 秒的数据。稍后我将不得不弄清楚如何处理数据。现在我只是打印它。
在下面的示例中,我可以通过导航到 http://192.168.1.1:5000/addconnection.
创建多个 TCP 连接
问题:
1)这种做法合理吗?我认为 Flask 可能会为每个 /addconnection 请求创建一个新线程。我不确定这样做会遇到什么性能限制。
2) 是否可以跟踪每个连接?我想实现 /listconnections 和 /removeconnections。
3) 是否有更 Pythonic 的方式来做到这一点?我读过一些关于芹菜的资料,但我还不是很了解。或许还有其他现成的工具可以处理类似的问题。
import trio
from flask import Flask
app = Flask(__name__)
@app.route("/")
def hello():
return "Hello World!"
@app.route("/addconnection")
def addconnection():
async def receiver(client_stream):
print("Receiver: started!")
while True:
data = await client_stream.receive_some(16800)
print("Received Data: {}".format(data))
async def parent():
async with trio.open_nursery() as nursery:
client_stream = await trio.open_tcp_stream('192.168.1.1', 1234)
nursery.start_soon(receiver, client_stream)
trio.run(parent)
1) 您将为每个 /addconnection 请求创建一个新的事件循环,这将阻止 Flask 运行时。这可能会将您限制为每个线程只能请求一个。
2) 是的,在最简单的情况下,您可以将它们存储在全局集中,请参阅下面的 connections
。
3) 我是Quart-Trio 的作者,我认为这是一个更好的方法。 Quart 是用 async/await 重新实现的 Flask API(解决了 1) 的大部分问题)。 Quart-Trio 是使用 Trio 而不是 Quart 的 asyncio 的扩展。
大致上(我还没有测试过)你的代码变成了,
import trio
from quart_trio import QuartTrio
connections = set()
app = QuartTrio(__name__)
@app.route("/")
async def hello():
return "Hello World!"
@app.route("/addconnection")
async def addconnection():
async def receiver(client_stream):
print("Receiver: started!")
while True:
data = await client_stream.receive_some(16800)
print("Received Data: {}".format(data))
async def parent():
async with trio.open_nursery() as nursery:
client_stream = await trio.open_tcp_stream('192.168.1.1', 1234)
connections.add(client_stream)
nursery.start_soon(receiver, client_stream)
connections.remove(client_stream)
app.nursery.start_soon(parent)
return "Connection Created"
if __name__ == "__main__":
# Allows this to run and serve via python script.py
# For production use `hypercorn -k trio script:app`
app.run()
在你有async def receiver(client_stream):
的地方,我会在每个循环迭代之间放置一个等待await trio.sleep(0.029)
,让程序的其余部分有机会运行。您可以根据您希望该功能的繁忙程度来增加睡眠时间。但是,如果您执行该循环,您的应用程序可能会冻结。还应使用取消块,这样您就不会永远卡在读取数据中。
我正在尝试创建一个 HTTP API,它可以创建和销毁并发任务,这些任务打开 TCP 连接到远程服务器流式传输约 15 秒的数据。稍后我将不得不弄清楚如何处理数据。现在我只是打印它。
在下面的示例中,我可以通过导航到 http://192.168.1.1:5000/addconnection.
创建多个 TCP 连接问题:
1)这种做法合理吗?我认为 Flask 可能会为每个 /addconnection 请求创建一个新线程。我不确定这样做会遇到什么性能限制。
2) 是否可以跟踪每个连接?我想实现 /listconnections 和 /removeconnections。
3) 是否有更 Pythonic 的方式来做到这一点?我读过一些关于芹菜的资料,但我还不是很了解。或许还有其他现成的工具可以处理类似的问题。
import trio
from flask import Flask
app = Flask(__name__)
@app.route("/")
def hello():
return "Hello World!"
@app.route("/addconnection")
def addconnection():
async def receiver(client_stream):
print("Receiver: started!")
while True:
data = await client_stream.receive_some(16800)
print("Received Data: {}".format(data))
async def parent():
async with trio.open_nursery() as nursery:
client_stream = await trio.open_tcp_stream('192.168.1.1', 1234)
nursery.start_soon(receiver, client_stream)
trio.run(parent)
1) 您将为每个 /addconnection 请求创建一个新的事件循环,这将阻止 Flask 运行时。这可能会将您限制为每个线程只能请求一个。
2) 是的,在最简单的情况下,您可以将它们存储在全局集中,请参阅下面的 connections
。
3) 我是Quart-Trio 的作者,我认为这是一个更好的方法。 Quart 是用 async/await 重新实现的 Flask API(解决了 1) 的大部分问题)。 Quart-Trio 是使用 Trio 而不是 Quart 的 asyncio 的扩展。
大致上(我还没有测试过)你的代码变成了,
import trio
from quart_trio import QuartTrio
connections = set()
app = QuartTrio(__name__)
@app.route("/")
async def hello():
return "Hello World!"
@app.route("/addconnection")
async def addconnection():
async def receiver(client_stream):
print("Receiver: started!")
while True:
data = await client_stream.receive_some(16800)
print("Received Data: {}".format(data))
async def parent():
async with trio.open_nursery() as nursery:
client_stream = await trio.open_tcp_stream('192.168.1.1', 1234)
connections.add(client_stream)
nursery.start_soon(receiver, client_stream)
connections.remove(client_stream)
app.nursery.start_soon(parent)
return "Connection Created"
if __name__ == "__main__":
# Allows this to run and serve via python script.py
# For production use `hypercorn -k trio script:app`
app.run()
在你有async def receiver(client_stream):
的地方,我会在每个循环迭代之间放置一个等待await trio.sleep(0.029)
,让程序的其余部分有机会运行。您可以根据您希望该功能的繁忙程度来增加睡眠时间。但是,如果您执行该循环,您的应用程序可能会冻结。还应使用取消块,这样您就不会永远卡在读取数据中。