运行 协程并发到 Web 服务器
Running coroutines concurrently to a web server
我想 运行 一个 quart 服务器和其他协同程序。但是,服务器似乎正在阻止事件循环。服务器响应请求,但是定时例程和队列处理例程都没有运行ning.
import quart
import asyncio
import signal
import random
q = asyncio.Queue()
app = quart.Quart(__name__)
async def run_timer(start=0):
global q
i = start
while True:
print(f"Current value: {i}")
await asyncio.sleep(2)
i += 1
async def process_queue():
global q
print("Start queue processing")
while True:
val = await q.get()
print(f"Queue: {val}")
# Simulate blocking operation
time.sleep(random.random())
@app.route('/add', methods=['POST'])
async def add():
global q
print(await quart.request.get_data())
values = await quart.request.form
print(f"Request: {dict(values)}")
if 'message' in values:
q.put_nowait(values['message'])
return 'Request received'
async def main():
tasks = [run_timer(0), process_queue(), app.run_task(port=8080,use_reloader=False)]
await asyncio.gather(*tasks)
asyncio.run(main())
输出:
Current value: 0
Start queue processing
[2021-08-14 12:51:49,541] Running on http://127.0.0.1:8080 (CTRL + C to quit)
Request: {'message': 'test'}
[2021-08-14 12:51:51,837] 127.0.0.1:59990 POST /add 1.1 200 16 1526
消息发送时 curl -d 'message=test' LOCAlhost:8080/add
最好停止 SIGTERM
上的所有协程
在尝试使用 Quart 几次失败后,主要是因为我不知道如何将队列传递给请求处理程序,我找到了使用 aiohttp 的解决方案。
from aiohttp import web
import asyncio
import queue
import functools
import random
from signal import SIGINT, SIGTERM
app = web.Application()
async def run_timer(q):
i = 0
while True:
print(f"[Timer] Send message: {i}")
await q.put(str(i))
i += 1
await asyncio.sleep(10)
async def process_queue(q):
print("[Queue] Start processing")
id = 0
while True:
val = await q.get()
print(f"[Queue] Process id={id}: {val}")
await asyncio.sleep(random.randint(3,5))
print(f"[Queue] Finished id={id}")
id += 1
async def add(q, request):
values = await request.post()
print(f"[Server] Received request: {dict(values)}")
if 'message' in values:
msg = values['message']
await q.put(msg)
print(f"[Server] Added to queue: {msg}")
return web.Response(text="Message added to queue\n")
async def start_server():
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, 'localhost', 8080)
await site.start()
while True:
await asyncio.sleep(3600) # sleep forever
def handler(sig):
print(f"Received shutdown signal ({sig!s})")
loop = asyncio.get_running_loop()
for task in asyncio.all_tasks(loop=loop):
task.cancel()
loop.remove_signal_handler(SIGTERM)
loop.add_signal_handler(SIGINT, lambda: None)
async def main():
loop = asyncio.get_running_loop()
for sig in (SIGINT,SIGTERM):
loop.add_signal_handler(sig, handler, sig)
try:
q = asyncio.Queue(maxsize=5)
app.add_routes([web.post('/add', functools.partial(add, q))])
tasks = [run_timer(q), process_queue(q), start_server()]
# return_exceptions=False to catch errors easily
r = await asyncio.gather(*tasks)
except asyncio.CancelledError:
print("Stop")
asyncio.run(main())
我还添加了一些取消逻辑。为了测试网络服务器,我使用了这个命令:for i in $(seq 1 10); do; curl -d "message=test$i" localhost:8080/add; done
也许这对也在学习如何使用 asyncio 的人有用。
我想 运行 一个 quart 服务器和其他协同程序。但是,服务器似乎正在阻止事件循环。服务器响应请求,但是定时例程和队列处理例程都没有运行ning.
import quart
import asyncio
import signal
import random
q = asyncio.Queue()
app = quart.Quart(__name__)
async def run_timer(start=0):
global q
i = start
while True:
print(f"Current value: {i}")
await asyncio.sleep(2)
i += 1
async def process_queue():
global q
print("Start queue processing")
while True:
val = await q.get()
print(f"Queue: {val}")
# Simulate blocking operation
time.sleep(random.random())
@app.route('/add', methods=['POST'])
async def add():
global q
print(await quart.request.get_data())
values = await quart.request.form
print(f"Request: {dict(values)}")
if 'message' in values:
q.put_nowait(values['message'])
return 'Request received'
async def main():
tasks = [run_timer(0), process_queue(), app.run_task(port=8080,use_reloader=False)]
await asyncio.gather(*tasks)
asyncio.run(main())
输出:
Current value: 0
Start queue processing
[2021-08-14 12:51:49,541] Running on http://127.0.0.1:8080 (CTRL + C to quit)
Request: {'message': 'test'}
[2021-08-14 12:51:51,837] 127.0.0.1:59990 POST /add 1.1 200 16 1526
消息发送时 curl -d 'message=test' LOCAlhost:8080/add
最好停止 SIGTERM
在尝试使用 Quart 几次失败后,主要是因为我不知道如何将队列传递给请求处理程序,我找到了使用 aiohttp 的解决方案。
from aiohttp import web
import asyncio
import queue
import functools
import random
from signal import SIGINT, SIGTERM
app = web.Application()
async def run_timer(q):
i = 0
while True:
print(f"[Timer] Send message: {i}")
await q.put(str(i))
i += 1
await asyncio.sleep(10)
async def process_queue(q):
print("[Queue] Start processing")
id = 0
while True:
val = await q.get()
print(f"[Queue] Process id={id}: {val}")
await asyncio.sleep(random.randint(3,5))
print(f"[Queue] Finished id={id}")
id += 1
async def add(q, request):
values = await request.post()
print(f"[Server] Received request: {dict(values)}")
if 'message' in values:
msg = values['message']
await q.put(msg)
print(f"[Server] Added to queue: {msg}")
return web.Response(text="Message added to queue\n")
async def start_server():
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, 'localhost', 8080)
await site.start()
while True:
await asyncio.sleep(3600) # sleep forever
def handler(sig):
print(f"Received shutdown signal ({sig!s})")
loop = asyncio.get_running_loop()
for task in asyncio.all_tasks(loop=loop):
task.cancel()
loop.remove_signal_handler(SIGTERM)
loop.add_signal_handler(SIGINT, lambda: None)
async def main():
loop = asyncio.get_running_loop()
for sig in (SIGINT,SIGTERM):
loop.add_signal_handler(sig, handler, sig)
try:
q = asyncio.Queue(maxsize=5)
app.add_routes([web.post('/add', functools.partial(add, q))])
tasks = [run_timer(q), process_queue(q), start_server()]
# return_exceptions=False to catch errors easily
r = await asyncio.gather(*tasks)
except asyncio.CancelledError:
print("Stop")
asyncio.run(main())
我还添加了一些取消逻辑。为了测试网络服务器,我使用了这个命令:for i in $(seq 1 10); do; curl -d "message=test$i" localhost:8080/add; done
也许这对也在学习如何使用 asyncio 的人有用。