Flask 应用程序中的异步方法未在后台执行
Async Method Not Executed in Background in Flask Application
我有一个用 Flask 和 Graphene 包构建的 GraphQL API,运行 Python 3.5.4。其中一个 GraphQL 突变需要一些时间来执行(2 到 5 分钟),我不希望最终用户等待其执行完成。我希望在后台执行突变方法并立即 return 向用户发送消息。
我查看了 asyncio 包,但出于某种原因,执行仍然出现在最前面,我的脚本正在等待。你知道我做错了什么吗?脚本很长,所以我在下面总结了与 asyncio 相关的关键元素。
文件mutation_migration_plan.py
from migration_script import Migration, main
import asyncio
[...]
class executeMigrationPlan(graphene.Mutation):
"""Mutation to execute a migration plan."""
[...]
@staticmethod
def mutate(root, info, input):
[...]
# Execute migration asynchronously
print('Execute migration asynchronously')
loop = asyncio.get_event_loop()
loop.run_until_complete(main(migration_plan))
print('Migration plan execution has started. You will receive an e-mail when it is terminated.')
ok = True
message = 'Migration plan execution has started. You will receive an e-mail when it is terminated.'
return executeMigrationPlan(ok=ok, message=message)
文件migration_script.py
class Migration():
"""Class to execute migration of Plan, Step, Object."""
@staticmethod
async def migrate(migration_plan, migration_step=None, migration_object=None):
[...]
async def main(migration_plan, migration_step=None, migration_object=None):
asyncio.ensure_future(Migration.migrate(migration_plan, migration_step, migration_object))
基本上我希望在我的控制台 window 中几乎立即看到 print('Migration plan execution has started. You will receive an e-mail when it is terminated.')
而方法 loop.run_until_complete(main(migration_plan))
但现在情况并非如此,打印仅出现在执行结束。
[更新]
根据下面@Vincent 的回答,我更新了第一个文件 File mutation_migration_plan.py 以使用 ThreadPoolExecutor 并删除了两个文件中与 asyncio 相关的所有内容。
文件mutation_migration_plan.py
from migration_script import Migration
from concurrent.futures import ThreadPoolExecutor
[...]
class executeMigrationPlan(graphene.Mutation):
"""Mutation to execute a migration plan."""
[...]
@staticmethod
def mutate(root, info, input):
[...]
# Execute migration asynchronously
print('Execute migration asynchronously')
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(Migration.migrate, migration_plan)
# print(future.result())
print('Migration plan execution has started. You will receive an e-mail when it is terminated.')
ok = True
message = 'Migration plan execution has started. You will receive an e-mail when it is terminated.'
return executeMigrationPlan(ok=ok, message=message)
当我添加 print(future.result())
行时,我的脚本运行良好,但它不会在后台执行(这是有道理的,因为我正在尝试打印结果)。但是,当我注释掉打印时,我的方法 Migration.migrate
似乎没有正确执行(我知道是因为我没有在我的数据库中看到结果)。知道为什么吗?
[更新二]
文件mutation_migration_plan.py
我已经能够使用 ProcessPoolExecutor 异步执行我的方法并删除两个文件上对 asyncio 的所有引用。见以下代码:
文件mutation_migration_plan.py
from concurrent.futures import ProcessPoolExecutor
[...]
class executeMigrationPlan(graphene.Mutation):
"""Mutation to execute a migration plan."""
[...]
@staticmethod
def mutate(root, info, input):
[...]
# Execute migration asynchronously
print('Execute migration asynchronously')
executor = ProcessPoolExecutor()
executor.submit(Migration.migrate, migration_plan.id)
print('Migration plan execution has started. You will receive an e-mail when it is terminated.')
ok = True
message = 'Migration plan execution has started. You will receive an e-mail when it is terminated.'
return executeMigrationPlan(ok=ok, message=message)
它可以正常工作,但尽管进程在后端执行,但我的 Falsk 应用程序需要很长时间才能发送 http 响应,并且响应有时为空。
这是一个常见的误解,但您不能将 asyncio 插入现有应用程序并期望它能正常工作。在 asyncio 中,每个阻塞调用都必须在协程上下文中使用 await
语法。只有这样才能实现单线程并发。这意味着您必须使用 aiohttp instead of flask, along with a library like aiohttp-graphql.
这需要对您的应用程序进行重大重写。如果您不想经历这些,还有其他解决方案可以与 Flask 很好地集成。你可以使用 celery as pointed by @dirn, or one of the executors provided by concurrent.futures.
我有一个用 Flask 和 Graphene 包构建的 GraphQL API,运行 Python 3.5.4。其中一个 GraphQL 突变需要一些时间来执行(2 到 5 分钟),我不希望最终用户等待其执行完成。我希望在后台执行突变方法并立即 return 向用户发送消息。
我查看了 asyncio 包,但出于某种原因,执行仍然出现在最前面,我的脚本正在等待。你知道我做错了什么吗?脚本很长,所以我在下面总结了与 asyncio 相关的关键元素。
文件mutation_migration_plan.py
from migration_script import Migration, main
import asyncio
[...]
class executeMigrationPlan(graphene.Mutation):
"""Mutation to execute a migration plan."""
[...]
@staticmethod
def mutate(root, info, input):
[...]
# Execute migration asynchronously
print('Execute migration asynchronously')
loop = asyncio.get_event_loop()
loop.run_until_complete(main(migration_plan))
print('Migration plan execution has started. You will receive an e-mail when it is terminated.')
ok = True
message = 'Migration plan execution has started. You will receive an e-mail when it is terminated.'
return executeMigrationPlan(ok=ok, message=message)
文件migration_script.py
class Migration():
"""Class to execute migration of Plan, Step, Object."""
@staticmethod
async def migrate(migration_plan, migration_step=None, migration_object=None):
[...]
async def main(migration_plan, migration_step=None, migration_object=None):
asyncio.ensure_future(Migration.migrate(migration_plan, migration_step, migration_object))
基本上我希望在我的控制台 window 中几乎立即看到 print('Migration plan execution has started. You will receive an e-mail when it is terminated.')
而方法 loop.run_until_complete(main(migration_plan))
但现在情况并非如此,打印仅出现在执行结束。
[更新]
根据下面@Vincent 的回答,我更新了第一个文件 File mutation_migration_plan.py 以使用 ThreadPoolExecutor 并删除了两个文件中与 asyncio 相关的所有内容。
文件mutation_migration_plan.py
from migration_script import Migration
from concurrent.futures import ThreadPoolExecutor
[...]
class executeMigrationPlan(graphene.Mutation):
"""Mutation to execute a migration plan."""
[...]
@staticmethod
def mutate(root, info, input):
[...]
# Execute migration asynchronously
print('Execute migration asynchronously')
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(Migration.migrate, migration_plan)
# print(future.result())
print('Migration plan execution has started. You will receive an e-mail when it is terminated.')
ok = True
message = 'Migration plan execution has started. You will receive an e-mail when it is terminated.'
return executeMigrationPlan(ok=ok, message=message)
当我添加 print(future.result())
行时,我的脚本运行良好,但它不会在后台执行(这是有道理的,因为我正在尝试打印结果)。但是,当我注释掉打印时,我的方法 Migration.migrate
似乎没有正确执行(我知道是因为我没有在我的数据库中看到结果)。知道为什么吗?
[更新二]
文件mutation_migration_plan.py
我已经能够使用 ProcessPoolExecutor 异步执行我的方法并删除两个文件上对 asyncio 的所有引用。见以下代码:
文件mutation_migration_plan.py
from concurrent.futures import ProcessPoolExecutor
[...]
class executeMigrationPlan(graphene.Mutation):
"""Mutation to execute a migration plan."""
[...]
@staticmethod
def mutate(root, info, input):
[...]
# Execute migration asynchronously
print('Execute migration asynchronously')
executor = ProcessPoolExecutor()
executor.submit(Migration.migrate, migration_plan.id)
print('Migration plan execution has started. You will receive an e-mail when it is terminated.')
ok = True
message = 'Migration plan execution has started. You will receive an e-mail when it is terminated.'
return executeMigrationPlan(ok=ok, message=message)
它可以正常工作,但尽管进程在后端执行,但我的 Falsk 应用程序需要很长时间才能发送 http 响应,并且响应有时为空。
这是一个常见的误解,但您不能将 asyncio 插入现有应用程序并期望它能正常工作。在 asyncio 中,每个阻塞调用都必须在协程上下文中使用 await
语法。只有这样才能实现单线程并发。这意味着您必须使用 aiohttp instead of flask, along with a library like aiohttp-graphql.
这需要对您的应用程序进行重大重写。如果您不想经历这些,还有其他解决方案可以与 Flask 很好地集成。你可以使用 celery as pointed by @dirn, or one of the executors provided by concurrent.futures.