运行 使用 pytest 同时执行 celery 任务

Run celery tasks concurrently using pytest

我正在尝试在我的 Django 应用程序中集成测试并发 celery 任务。在 pytest 集成测试期间,我希望任务实际上 运行 同时在一个工作人员上进行,但我无法完成这项工作。

假设我有以下基本的芹菜任务:

@shared_task
def sleep_task(secs):
    print(f"Sleeping for {secs} seconds...")
    for i in range(secs):
        time.sleep(i)
        print(f"\t{i + 1}")
    return "DONE"

运行 脚本中并发的任务工作正常:

# script
sleep_task.delay(3)
sleep_task.delay(3)

# output
Sleeping for 3 seconds...
Sleeping for 3 seconds...
1
1
2
2
3
3

但是,我无法在单元测试中复制这种异步行为。在 conftest.py 内,我设置了代理和结果后端:

import pytest

pytest_plugins = ("celery.contrib.pytest",)

@pytest.fixture(scope="session")
def celery_config():
    return {"broker_url": "memory://", "result_backend": "rpc://"}

这是我的单元测试。 celery_session_appcelery_session_worker 设置用于测试的 celery 应用程序和 worker (docs):

def test_concurrent_sleep_tasks(celery_session_app, celery_session_worker):
    sleep_task.delay(3)
    sleep_task.delay(3)

# output
Sleeping for 3 seconds...
1
2
3
Sleeping for 3 seconds...
1
2
3

任务似乎是 运行同步进行的。无论如何 运行 异步任务?

celery_session_worker fixture is starting a celery worker 并发为 1。这在 celery.contrib 中是硬编码的,目前不可配置。由于并发度为 1,因此一次只会处理 1 个任务。

另一种解决方案是编写您自己的 fixture 以启动并发数为 2 或更多的 worker。