与 Web 服务器并发运行协程

Running coroutines concurrently to a web server

我想同时运行一个 Quart 服务器和其他协程。但是,服务器似乎阻塞了事件循环:服务器能够响应请求,但定时协程和队列处理协程都没有在运行。

import quart
import asyncio
import signal
import random

# Shared message queue: filled by the /add handler, drained by process_queue().
# NOTE(review): this is created at import time, before asyncio.run() starts
# its event loop. On Python < 3.10 asyncio.Queue binds to the loop that is
# current at construction, which may not be the one asyncio.run() uses --
# confirm the target Python version.
q = asyncio.Queue()

# Quart application object; the /add route handler is registered on it.
app = quart.Quart(__name__)

async def run_timer(start=0):
  """Print an increasing counter every two seconds, forever.

  Args:
    start: First value to print; each later value is one higher.
  """
  tick = start
  while True:
    print(f"Current value: {tick}")
    # Awaiting the sleep hands control back to the event loop between ticks.
    await asyncio.sleep(2)
    tick += 1

async def process_queue():
  """Consume messages from the module-level queue ``q`` forever.

  Each message is printed, then a random delay of up to one second stands in
  for the real work.
  """
  print("Start queue processing")
  while True:
    val = await q.get()
    print(f"Queue: {val}")
    # Simulate slow work. The delay has to be awaited: a synchronous
    # time.sleep() would freeze the whole event loop (timer and web server
    # included), and ``time`` is not imported by this script in any case.
    await asyncio.sleep(random.random())

@app.route('/add', methods=['POST'])
async def add():
  """Handle ``POST /add``: queue the ``message`` form field when present.

  The raw body and the parsed form are printed for debugging. The reply text
  is the same whether or not a message was queued.
  """
  raw_body = await quart.request.get_data()
  print(raw_body)
  form = await quart.request.form
  print(f"Request: {dict(form)}")
  if 'message' in form:
    # put_nowait() does not wait for free space in the queue.
    q.put_nowait(form['message'])
  return 'Request received'

async def main():
  """Run the timer, the queue consumer and the Quart server concurrently."""
  await asyncio.gather(
    run_timer(0),
    process_queue(),
    app.run_task(port=8080, use_reloader=False),
  )

asyncio.run(main())

输出:

Current value: 0
Start queue processing
[2021-08-14 12:51:49,541] Running on http://127.0.0.1:8080 (CTRL + C to quit)
Request: {'message': 'test'}
[2021-08-14 12:51:51,837] 127.0.0.1:59990 POST /add 1.1 200 16 1526

消息是用以下命令发送的:curl -d 'message=test' localhost:8080/add

最好能在收到 SIGTERM 时停止所有协程。

在尝试使用 Quart 几次失败后,主要是因为我不知道如何将队列传递给请求处理程序,我找到了使用 aiohttp 的解决方案。

from aiohttp import web
import asyncio
import queue
import functools
import random
from signal import SIGINT, SIGTERM

# aiohttp application object; main() registers the /add route on it and
# start_server() serves it.
app = web.Application()

async def run_timer(q):
  """Put an increasing counter, as a string, on ``q`` every ten seconds.

  Args:
    q: Queue the generated messages are put on.
  """
  counter = 0
  while True:
    print(f"[Timer] Send message: {counter}")
    message = str(counter)
    await q.put(message)
    counter += 1
    await asyncio.sleep(10)

async def process_queue(q):
  """Take messages off ``q`` one at a time and simulate handling each.

  Handling is an awaited sleep of 3 to 5 seconds, so other tasks on the
  event loop keep running in the meantime.

  Args:
    q: Queue to read messages from.
  """
  print("[Queue] Start processing")
  job_id = 0  # running count of handled messages
  while True:
    message = await q.get()
    print(f"[Queue] Process id={job_id}: {message}")
    delay = random.randint(3, 5)
    await asyncio.sleep(delay)
    print(f"[Queue] Finished id={job_id}")
    job_id += 1

async def add(q, request):
  """Handle ``POST /add``: put the ``message`` form field on ``q``.

  Args:
    q: Queue the message is put on.
    request: Incoming aiohttp request; its POST form data is read.

  Returns:
    A 200 text response once the message is queued, or a 400 text response
    when the form has no ``message`` field. The handler must not fall off
    the end and return None, because aiohttp does not accept that as a
    handler result.
  """
  values = await request.post()
  print(f"[Server] Received request: {dict(values)}")
  if 'message' not in values:
    return web.Response(status=400, text="Missing 'message' field\n")

  msg = values['message']
  # put() waits for a free slot if the queue is bounded and currently full.
  await q.put(msg)
  print(f"[Server] Added to queue: {msg}")
  return web.Response(text="Message added to queue\n")

async def start_server():
  """Serve ``app`` on localhost:8080 until this coroutine is cancelled.

  The runner is cleaned up on the way out, so the server is shut down when
  the task is cancelled or when the site fails to start.
  """
  runner = web.AppRunner(app)
  await runner.setup()
  try:
    site = web.TCPSite(runner, 'localhost', 8080)
    await site.start()

    # site.start() returns once the server is listening; park here so the
    # task stays alive until it is cancelled.
    while True:
      await asyncio.sleep(3600)
  finally:
    await runner.cleanup()

def handler(sig):
  """Signal callback: cancel every task on the running event loop.

  Afterwards the SIGTERM handler is removed and SIGINT is pointed at a no-op
  callback.

  Args:
    sig: The signal that triggered the callback; only used in the message.
  """
  print(f"Received shutdown signal ({sig!s})")
  running_loop = asyncio.get_running_loop()
  for pending in asyncio.all_tasks(loop=running_loop):
    pending.cancel()
  running_loop.remove_signal_handler(SIGTERM)
  running_loop.add_signal_handler(SIGINT, lambda: None)

async def main():
  """Install the shutdown handlers, register the route and run all jobs.

  Runs until the gathered tasks are cancelled, then prints "Stop".
  """
  running_loop = asyncio.get_running_loop()
  running_loop.add_signal_handler(SIGINT, handler, SIGINT)
  running_loop.add_signal_handler(SIGTERM, handler, SIGTERM)

  try:
    q = asyncio.Queue(maxsize=5)

    # Bind the queue as the handler's first argument; the request is passed
    # as the remaining argument when the route is hit.
    add_to_queue = functools.partial(add, q)
    app.add_routes([web.post('/add', add_to_queue)])

    # gather() is left at return_exceptions=False so an error in any job
    # propagates out of this await instead of being returned as a value.
    await asyncio.gather(run_timer(q), process_queue(q), start_server())
  except asyncio.CancelledError:
    print("Stop")

asyncio.run(main())

我还添加了一些取消逻辑。为了测试 Web 服务器,我使用了这个命令:for i in $(seq 1 10); do curl -d "message=test$i" localhost:8080/add; done

也许这对也在学习如何使用 asyncio 的人有用。