asyncio.Queue 作为像 Quartz 这样的 Web 服务器中的生产者-消费者流
asyncio.Queue as producer-consumer flow in a webserver like Quart
是否可以使用 asyncio.Queue 和像 Quart 这样的网络服务器来在生产者和消费者之间进行通信?
这是我想要做的....
from quart import Quart, request
import asyncio
queue = asyncio.Queue()
producers = []
consumers = []
async def producer(mesg):
print(f'produced {mesg}')
await queue.put(mesg)
await asyncio.sleep(1) # do some work
async def consumer():
while True:
token = await queue.get()
await asyncio.sleep(1) # do some work
queue.task_done()
print(f'consumed {token}')
@app.route('/route', methods=['POST'])
async def index():
mesg = await request.get_data()
try:
p = asyncio.create_task(producer(mesg))
producers.append(p)
c = asyncio.create_task(consumer())
consumers.append(c)
return f"published message {mesg}", 200
except Exception as e:
logger.exception("Failed tp publish message %s!", mesg)
return f"Failed to publish message: {mesg}", 400
if __name__ == '__main__':
PORT = int(os.getenv('PORT')) if os.getenv('PORT') else 8050
app.run(host='0.0.0.0', port=PORT, debug=True)
这很好用。
但我不确定这是否是一个好的做法,因为我很困惑如何(在我的代码中的什么地方)执行以下步骤。
# Making sure all the producers have completed
await asyncio.gather(*producers)
#wait for the remaining tasks to be processed
await queue.join()
# cancel the consumers, which are now idle
for c in consumers:
c.cancel()
编辑-1:
我已经尝试使用 @app.after_serving
和一些 logger.debug
语句。
@app.after_serving
async def shutdown():
logger.debug("Shutting down...")
logger.debug("waiting for producers to finish...")
await asyncio.gather(*producers)
logger.debug("waiting for tasks to complete...")
await queue.join()
logger.debug("cancelling consumers...")
for c in consumers:
c.cancel()
但是当 hypercorn
正常关闭时,不会打印调试语句。因此,我不确定在关机期间是否实际调用了装饰有 @app.after_serving
的函数(关机)。
这是关机期间来自 hypercorn
的消息
appserver_1 | 2020-05-29 15:55:14,200 - base_events.py:1490 - create_server - INFO - <Server sockets=(<asyncio.TransportSocket fd=14, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 8080)>,)> is serving
appserver_1 | Running on 0.0.0.0:8080 over http (CTRL + C to quit)
Gracefully stopping... (press Ctrl+C again to force)
我使用 kill -SIGTERM <PID>
向进程发出正常关闭信号。
But I am not sure if this is a good practice
您在示例中创建的全局变量通常不是企业解决方案中的良好做法。特别是在 Python 中,涉及到全局变量时会出现一些挑剔的行为。
根据我的经验,将变量传递给函数或 class 是一种更简洁的方法。
但是,我不知道如何在 quart 中执行此操作,因为我不使用该库。
# Making sure all the producers have completed
#wait for the remaining tasks to be processed
# cancel the consumers, which are now idle
通常清理任务是在退出事件循环时和退出应用程序之前完成的。
我不知道 quart
是如何工作的,但您可以将逻辑放在 app.run()
之后,以便在事件循环停止后清理任务 运行。
这可能因您的应用程序退出方式而异。
查看文档,了解您可以挂钩的某种 "on shutdown" 事件。
我会将清理代码放在关闭 after_serving 函数中,
@app.after_serving
async def shutdown():
# Making sure all the producers have completed
await asyncio.gather(*producers)
#wait for the remaining tasks to be processed
await queue.join()
# cancel the consumers, which are now idle
for c in consumers:
c.cancel()
至于全局变量,我倾向于将它们直接存储在应用程序中,以便可以通过 current_app
代理访问它们。请注意,虽然这(和您的解决方案)仅适用于单个进程(工作者),但如果您想使用多个工作者(或等效的主机),您将需要第三方商店来获取此信息,例如使用 redis.
是否可以使用 asyncio.Queue 和像 Quart 这样的网络服务器来在生产者和消费者之间进行通信?
这是我想要做的....
from quart import Quart, request
import asyncio
queue = asyncio.Queue()
producers = []
consumers = []
async def producer(mesg):
print(f'produced {mesg}')
await queue.put(mesg)
await asyncio.sleep(1) # do some work
async def consumer():
while True:
token = await queue.get()
await asyncio.sleep(1) # do some work
queue.task_done()
print(f'consumed {token}')
@app.route('/route', methods=['POST'])
async def index():
mesg = await request.get_data()
try:
p = asyncio.create_task(producer(mesg))
producers.append(p)
c = asyncio.create_task(consumer())
consumers.append(c)
return f"published message {mesg}", 200
except Exception as e:
logger.exception("Failed tp publish message %s!", mesg)
return f"Failed to publish message: {mesg}", 400
if __name__ == '__main__':
PORT = int(os.getenv('PORT')) if os.getenv('PORT') else 8050
app.run(host='0.0.0.0', port=PORT, debug=True)
这很好用。 但我不确定这是否是一个好的做法,因为我很困惑如何(在我的代码中的什么地方)执行以下步骤。
# Making sure all the producers have completed
await asyncio.gather(*producers)
#wait for the remaining tasks to be processed
await queue.join()
# cancel the consumers, which are now idle
for c in consumers:
c.cancel()
编辑-1:
我已经尝试使用 @app.after_serving
和一些 logger.debug
语句。
@app.after_serving
async def shutdown():
logger.debug("Shutting down...")
logger.debug("waiting for producers to finish...")
await asyncio.gather(*producers)
logger.debug("waiting for tasks to complete...")
await queue.join()
logger.debug("cancelling consumers...")
for c in consumers:
c.cancel()
但是当 hypercorn
正常关闭时,不会打印调试语句。因此,我不确定在关机期间是否实际调用了装饰有 @app.after_serving
的函数(关机)。
这是关机期间来自 hypercorn
的消息
appserver_1 | 2020-05-29 15:55:14,200 - base_events.py:1490 - create_server - INFO - <Server sockets=(<asyncio.TransportSocket fd=14, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 8080)>,)> is serving
appserver_1 | Running on 0.0.0.0:8080 over http (CTRL + C to quit)
Gracefully stopping... (press Ctrl+C again to force)
我使用 kill -SIGTERM <PID>
向进程发出正常关闭信号。
But I am not sure if this is a good practice
您在示例中创建的全局变量通常不是企业解决方案中的良好做法。特别是在 Python 中,涉及到全局变量时会出现一些挑剔的行为。 根据我的经验,将变量传递给函数或 class 是一种更简洁的方法。 但是,我不知道如何在 quart 中执行此操作,因为我不使用该库。
# Making sure all the producers have completed
#wait for the remaining tasks to be processed
# cancel the consumers, which are now idle
通常清理任务是在退出事件循环时和退出应用程序之前完成的。
我不知道 quart
是如何工作的,但您可以将逻辑放在 app.run()
之后,以便在事件循环停止后清理任务 运行。
这可能因您的应用程序退出方式而异。
查看文档,了解您可以挂钩的某种 "on shutdown" 事件。
我会将清理代码放在关闭 after_serving 函数中,
@app.after_serving
async def shutdown():
# Making sure all the producers have completed
await asyncio.gather(*producers)
#wait for the remaining tasks to be processed
await queue.join()
# cancel the consumers, which are now idle
for c in consumers:
c.cancel()
至于全局变量,我倾向于将它们直接存储在应用程序中,以便可以通过 current_app
代理访问它们。请注意,虽然这(和您的解决方案)仅适用于单个进程(工作者),但如果您想使用多个工作者(或等效的主机),您将需要第三方商店来获取此信息,例如使用 redis.