如何使用带有 asyncio.create_task() 的 Uvicorn 将任务置于后台?

How to use Uvicorn with asyncio.create_task() to put task in background?

假设我有一个由 Uvicorn 服务器驱动的 Web 应用程序,该应用程序实现了 GraphQL API,其中包含一个在服务器端启动长时间计算的 mutation(突变),以及一个用于检查服务器状态的查询端点。假设我们想知道有多少任务正在后台运行。我有一段简化的代码,但它不起作用:

import asyncio
import logging
import time
import ariadne
import ariadne.asgi
import uvicorn
import starlette as sl
import starlette.applications

# Ariadne resolver registries; resolvers attach to them via the .field() decorator.
query_t = ariadne.QueryType()
mutation_t = ariadne.MutationType()

# Module-level list used as a counter: long_task adds one entry on start and
# pops one on completion, and the ping resolver reports its length.
FIFO = []

async def long_task():
    """Simulate a long computation while recording it in the module-level FIFO.

    One entry is prepended to FIFO when the task starts and one entry is
    popped when it ends, so len(FIFO) tracks the tasks currently in flight.

    NOTE: the five-second pause is a synchronous time.sleep(); while it runs,
    the event loop executing this coroutine cannot serve anything else.
    """
    global FIFO
    print('Starting long task...')
    FIFO = [1] + FIFO
    # Stand-in for five seconds of heavy work.
    time.sleep(5)
    FIFO.pop()
    print('Long task finished!')


# Strong references to in-flight tasks. The event loop keeps only weak
# references, so a task nobody else holds may be garbage-collected mid-run.
_BACKGROUND_TASKS = set()


@mutation_t.field('longTask')
async def resolve_long_task(_parent, info):
    """Resolve Mutation.longTask by scheduling long_task() and returning at once.

    The task is scheduled on the currently running event loop.
    asyncio.create_task() does not move work to another thread, so a blocking
    call inside the coroutine still stalls every other request on that loop.

    Args:
        _parent: Parent value supplied by the GraphQL executor (unused).
        info: GraphQL resolve info (unused).

    Returns:
        An empty dict, so every field of longTaskResponse resolves to null.
    """
    print('Start to resolve long task...')
    task = asyncio.create_task(long_task())
    _BACKGROUND_TASKS.add(task)
    # Drop the reference once the task is done so the set does not grow forever.
    task.add_done_callback(_BACKGROUND_TASKS.discard)
    print('Resolve finished!')
    return {}

@query_t.field('ping')
async def resolve_ping(_parent, info):
    """Resolve Query.ping with a message stating how many entries FIFO holds."""
    pending = len(FIFO)
    return f'FIFO has {pending} elements'


def main():
    """Build the GraphQL schema, mount it under /graphql and serve it with Uvicorn.

    The server listens on all interfaces, port 9002, logging errors only.
    """
    type_defs = ariadne.gql('''
    type Mutation{
        longTask: longTaskResponse
    }
    type longTaskResponse {
        message: String
    }
    type Query {
        ping: String
    }
    ''')
    executable_schema = ariadne.make_executable_schema(type_defs, query_t, mutation_t)
    graphql_route = sl.routing.Mount('/graphql', ariadne.asgi.GraphQL(executable_schema))
    asgi_app = sl.applications.Starlette(routes=[graphql_route])
    server_options = {'host': '0.0.0.0', 'port': 9002, 'log_level': 'error'}
    uvicorn.run(asgi_app, **server_options)


# Start the server only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()

运行以下命令后:

$ python main.py

我在第一个选项卡的 GraphQL GUI 中发送了一个突变:

# Sent from the first tab: invokes the longTask mutation.
mutation longTaskQueue{
  longTask {
    message
  }
}

在第二个选项卡中,我尝试检索 FIFO 的长度:

# Sent from the second tab: asks the server how many entries FIFO holds.
query ping {
  ping
}

似乎可以同时运行 2 个 long_task,但 ping 会一直等待所有 long_task 完成。我的总体问题是:如何在后台运行繁重的代码,同时又不阻塞 GQL API?

经过多次尝试,现在我可以将许多任务置于后台并跟踪它们的数量(API 不会因为某一个长任务而冻结)。其原理是:阻塞计算被放到池中运行:

import asyncio
import logging
import time
import ariadne
import ariadne.asgi
import uvicorn
import starlette as sl
import starlette.applications
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

# Ariadne resolver registries; the resolvers below register on them by field name.
query_t = ariadne.QueryType()
mutation_t = ariadne.MutationType()

# Module-level list used as a counter of long tasks in flight; the ping
# resolver reports its length.
FIFO = []

async def long_task():
    """Simulate a long blocking computation while recording it in FIFO.

    One entry is added to the module-level FIFO on start and one is removed
    on exit, so len(FIFO) tracks the tasks currently in flight.

    In this file the coroutine is driven by run() inside executor worker
    threads, so several copies may execute at the same time. FIFO is
    therefore mutated in place: a single append/pop on the shared list
    cannot interleave the way rebinding the global to a fresh copy can,
    which could leave the count permanently wrong.
    """
    print('Starting long task...')
    FIFO.append(1)
    try:
        # mock long calc with 5sec sleep
        time.sleep(5)
    finally:
        # Always give the slot back, even if the computation raises.
        FIFO.pop()
    print('Long task finished!')


def run(corofn, *args):
    """Run a coroutine function to completion on a private event loop.

    Meant to be called from a thread that has no running event loop, such as
    an executor worker.

    Args:
        corofn: Coroutine function to call.
        *args: Positional arguments forwarded to ``corofn``.

    Returns:
        Whatever the coroutine returns.

    Raises:
        RuntimeError: If an event loop is already running in this thread.
        Exception: Anything raised by the coroutine propagates unchanged.
    """
    # asyncio.run() creates a fresh loop and, on exit, cancels leftover tasks,
    # shuts down async generators, closes the loop and unregisters it, so the
    # thread is never left pointing at a closed loop.
    return asyncio.run(corofn(*args))


# One shared worker pool for all requests. Creating a pool per request would
# leak threads, because nothing ever shuts those pools down.
_LONG_TASK_EXECUTOR = ThreadPoolExecutor(max_workers=5)


@mutation_t.field('longTask')
async def resolve_long_task(_parent, info):
    """Resolve Mutation.longTask by handing long_task to a worker thread.

    The blocking work runs in the shared thread pool, so the event loop stays
    free to answer other requests. At most five long tasks run concurrently;
    further submissions wait in the pool's queue until a worker is free.

    The loop's default executor is deliberately left untouched. It is not
    used here, and recent Python versions reject anything but a
    ThreadPoolExecutor as the default executor.

    Args:
        _parent: Parent value supplied by the GraphQL executor (unused).
        info: GraphQL resolve info (unused).

    Returns:
        An empty dict, so every field of longTaskResponse resolves to null.
    """
    loop = asyncio.get_running_loop()
    print('Start to resolve long task...')
    # Fire and forget: the returned future is intentionally not awaited.
    loop.run_in_executor(_LONG_TASK_EXECUTOR, run, long_task)
    print('Resolve finished!')
    return {}

@query_t.field('ping')
async def resolve_ping(_parent, info):
    """Resolve Query.ping by reporting the current length of FIFO."""
    size = len(FIFO)
    message = f'FIFO has {size} elements'
    return message


def main():
    """Create the executable schema, wrap it in a Starlette app and run Uvicorn.

    The GraphQL endpoint is mounted at /graphql; the server binds to
    0.0.0.0:9002 with the log level set to 'error'.
    """
    sdl = ariadne.gql('''
    type Mutation{
        longTask: longTaskResponse
    }
    type longTaskResponse {
        message: String
    }
    type Query {
        ping: String
    }
    ''')
    schema = ariadne.make_executable_schema(sdl, query_t, mutation_t)
    mount = sl.routing.Mount('/graphql', ariadne.asgi.GraphQL(schema))
    uvicorn.run(
        sl.applications.Starlette(routes=[mount]),
        host='0.0.0.0',
        port=9002,
        log_level='error',
    )


# Run the server only when executed directly; importing this module has no such effect.
if __name__ == '__main__':
    main()

该解决方案受 this answer 的启发。