如何将芹菜与异步相结合?

How to combine Celery with asyncio?

如何创建一个使 celery 任务看起来像 asyncio.Task 的包装器?或者是否有更好的方法将 Celery 与 asyncio 集成?

@asksol,Celery 的创造者,said this::

It's quite common to use Celery as a distributed layer on top of async I/O frameworks (top tip: routing CPU-bound tasks to a prefork worker means they will not block your event loop).

但我找不到任何专门针对 asyncio 框架的代码示例。

编辑:2021 年 1 月 12 日以前的答案(在底部找到它)没有很好地老化因此我添加了可能的解决方案组合,可能会满足那些仍在寻找如何合作的人- 使用 asyncio 和 Celery

让我们先快速分解用例(更深入的分析在这里:asyncio and coroutines vs task queues):

  • 如果任务是 I/O 绑定的,那么使用协程和 asyncio 往往会更好。
  • 如果任务是 CPU 绑定的,那么使用 Celery 或其他类似的任务管理系统往往会更好。

因此,在 Python 的“做好一件事”的背景下,不要尝试将 asyncio 和 celery 混合在一起是有道理的。

但是如果我们希望能够 运行 一个方法既可以异步又可以作为异步任务,会发生什么情况?那么我们可以考虑一些选择:

  • 我能找到的最佳示例如下:https://johnfraney.ca/posts/2018/12/20/writing-unit-tests-celery-tasks-async-functions/ (and I just found out that it is ):

    1. 定义异步方法。

    2. 使用 asgirefsync.async_to_sync 模块将异步方法和 运行 同步包装在 celery 任务中:

      # tasks.py
      import asyncio
      from asgiref.sync import async_to_sync
      from celery import Celery
      
      app = Celery('async_test', broker='a_broker_url_goes_here')
      
      async def return_hello():
          await asyncio.sleep(1)
          return 'hello'
      
      
      @app.task(name="sync_task")
      def sync_task():
          async_to_sync(return_hello)()
      
  • 我在 FastAPI 应用程序中遇到的一个用例与前面的示例相反:

    1. 密集的 CPU 绑定进程正在占用异步端点。

    2. 解决方案是将异步CPU绑定进程重构为celery任务,并传递一个任务实例从Celery队列中执行。

    3. 该案例可视化的最小示例:

      import asyncio
      import uvicorn
      
      from celery import Celery
      from fastapi import FastAPI
      
      app = FastAPI(title='Example')
      worker = Celery('worker', broker='a_broker_url_goes_here')
      
      @worker.task(name='cpu_boun')
      def cpu_bound_task():
          # Does stuff but let's simplify it
          print([n for n in range(1000)])
      
      @app.get('/calculate')
      async def calculate():
          cpu_bound_task.delay()
      
      if __name__ == "__main__":
          uvicorn.run('main:app', host='0.0.0.0', port=8000)
      
  • 另一个解决方案似乎是 and 在他们的答案中提出的,但我们必须记住,当我们混合执行同步和异步执行时,性能往往会受到影响,因此在我们决定在生产环境中使用它们之前,需要监控这些答案。

最后,有一些现成的解决方案,我不能推荐(因为我自己没有使用过)但我会在这里列出它们:

  • Celery Pool AsyncIO 这似乎完全解决了 Celery 5.0 没有解决的问题,但请记住,它似乎有点实验性(今天的 0.2.0 版本 01/12/2021)
  • aiotasks 声称是“一个类似 Celery 的任务管理器,可以分发 Asyncio 协同程序”,但看起来有点陈旧(大约 2 年前的最新提交)

嗯,那不是很老吗? Celery 5.0 版没有实现 asyncio 兼容性,因此我们无法知道何时以及是否会实现它...出于响应遗留原因(因为它是当时的答案)和继续评论而将其留在这里。

如官方网站所述,从 Celery 5.0 版开始这将成为可能:

http://docs.celeryproject.org/en/4.0/whatsnew-4.0.html#preface

  1. Celery 的下一个主要版本将仅支持 Python 3.5,我们计划在其中利用新的 asyncio 库。
  2. 放弃对 Python 2 的支持将使我们能够删除大量的兼容性代码,而使用 Python 3.5 可以让我们利用输入、async/await、asyncio、和类似的概念在旧版本中没有替代方案。

以上引用自上一篇link.

所以最好的办法是等待 5.0 版 分发!

与此同时,编码愉快:)

您可以使用 run_in_executor 将任何阻塞调用包装到任务中,如 documentation, I also added in the example a custom timeout:

中所述
def run_async_task(
    target,
    *args,
    timeout = 60,
    **keywords
) -> Future:
    loop = asyncio.get_event_loop()
    return asyncio.wait_for(
        loop.run_in_executor(
            executor,
            functools.partial(target, *args, **keywords)
        ),
        timeout=timeout,
        loop=loop
    )
loop = asyncio.get_event_loop()
async_result = loop.run_until_complete(
    run_async_task, your_task.delay, some_arg, some_karg="" 
)
result = loop.run_until_complete(
    run_async_task, async_result.result 
)

我发现最简洁的方法是将 async 函数包装在 asgiref.sync.async_to_sync 中(来自 asgiref):

from asgiref.sync import async_to_sync
from celery.task import periodic_task


async def return_hello():
    await sleep(1)
    return 'hello'


@periodic_task(
    run_every=2,
    name='return_hello',
)
def task_return_hello():
    async_to_sync(return_hello)()

我从我写的 blog post 中提取了这个例子。

这个简单的方法对我很有效:

import asyncio
from celery import Celery

app = Celery('tasks')

async def async_function(param1, param2):
    # more async stuff...
    pass

@app.task(name='tasks.task_name', queue='queue_name')
def task_name(param1, param2):
    asyncio.run(async_function(param1, param2))

我通过在 celery-pool-asyncio 库中组合 Celery 和 asyncio 解决了问题。

这是一个简单的助手,您可以使用它来使 Celery 任务可等待:

import asyncio
from asgiref.sync import sync_to_async

# Converts a Celery tasks to an async function
def task_to_async(task):
    async def wrapper(*args, **kwargs):
        delay = 0.1
        async_result = await sync_to_async(task.delay)(*args, **kwargs)
        while not async_result.ready():
            await asyncio.sleep(delay)
            delay = min(delay * 1.5, 2)  # exponential backoff, max 2 seconds
        return async_result.get()
    return wrapper

sync_to_async一样,可以作为直接包装器使用:

@shared_task
def get_answer():
    sleep(10) # simulate long computation
    return 42    

result = await task_to_async(get_answer)()

...作为装饰者:

@task_to_async
@shared_task
def get_answer():
    sleep(10) # simulate long computation
    return 42    

result = await get_answer()

当然,这不是一个完美的解决方案,因为它依赖于polling。 但是,在 Celery officially provides a better solution.

之前从 Django 异步视图调用 Celery 任务应该是一个很好的解决方法

编辑 2021/03/02:添加了对 sync_to_async 的调用以支持 eager 模式

这是我在必要时处理异步协程的 Celery 实现:

封装 Celery class 以扩展其功能:

from celery import Celery
from inspect import isawaitable
import asyncio


class AsyncCelery(Celery):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.patch_task()

        if 'app' in kwargs:
            self.init_app(kwargs['app'])

    def patch_task(self):
        TaskBase = self.Task

        class ContextTask(TaskBase):
            abstract = True

            async def _run(self, *args, **kwargs):
                result = TaskBase.__call__(self, *args, **kwargs)
                if isawaitable(result):
                    await result

            def __call__(self, *args, **kwargs):
                asyncio.run(self._run(*args, **kwargs))

        self.Task = ContextTask

    def init_app(self, app):
        self.app = app

        conf = {}
        for key in app.config.keys():
            if key[0:7] == 'CELERY_':
                conf[key[7:].lower()] = app.config[key]

        if 'broker_transport_options' not in conf and conf.get('broker_url', '')[0:4] == 'sqs:':
            conf['broker_transport_options'] = {'region': 'eu-west-1'}

        self.config_from_object(conf)


celery = AsyncCelery()