运行 使用 pytest 同时执行 celery 任务
Run celery tasks concurrently using pytest
我正在尝试在我的 Django 应用程序中集成测试并发 celery 任务。在 pytest 集成测试期间,我希望任务实际上 运行 同时在一个工作人员上进行,但我无法完成这项工作。
假设我有以下基本的芹菜任务:
@shared_task
def sleep_task(secs):
print(f"Sleeping for {secs} seconds...")
for i in range(secs):
time.sleep(i)
print(f"\t{i + 1}")
return "DONE"
运行 脚本中并发的任务工作正常:
# script
sleep_task.delay(3)
sleep_task.delay(3)
# output
Sleeping for 3 seconds...
Sleeping for 3 seconds...
1
1
2
2
3
3
但是,我无法在单元测试中复制这种异步行为。在 conftest.py
内,我设置了代理和结果后端:
import pytest
pytest_plugins = ("celery.contrib.pytest",)
@pytest.fixture(scope="session")
def celery_config():
return {"broker_url": "memory://", "result_backend": "rpc://"}
这是我的单元测试。 celery_session_app
和 celery_session_worker
设置用于测试的 celery 应用程序和 worker (docs):
def test_concurrent_sleep_tasks(celery_session_app, celery_session_worker):
sleep_task.delay(3)
sleep_task.delay(3)
# output
Sleeping for 3 seconds...
1
2
3
Sleeping for 3 seconds...
1
2
3
任务似乎是 运行同步进行的。无论如何 运行 异步任务?
celery_session_worker fixture is starting a celery worker 并发为 1。这在 celery.contrib 中是硬编码的,目前不可配置。由于并发度为 1,因此一次只会处理 1 个任务。
另一种解决方案是编写您自己的 fixture 以启动并发数为 2 或更多的 worker。
我正在尝试在我的 Django 应用程序中集成测试并发 celery 任务。在 pytest 集成测试期间,我希望任务实际上 运行 同时在一个工作人员上进行,但我无法完成这项工作。
假设我有以下基本的芹菜任务:
@shared_task
def sleep_task(secs):
print(f"Sleeping for {secs} seconds...")
for i in range(secs):
time.sleep(i)
print(f"\t{i + 1}")
return "DONE"
运行 脚本中并发的任务工作正常:
# script
sleep_task.delay(3)
sleep_task.delay(3)
# output
Sleeping for 3 seconds...
Sleeping for 3 seconds...
1
1
2
2
3
3
但是,我无法在单元测试中复制这种异步行为。在 conftest.py
内,我设置了代理和结果后端:
import pytest
pytest_plugins = ("celery.contrib.pytest",)
@pytest.fixture(scope="session")
def celery_config():
return {"broker_url": "memory://", "result_backend": "rpc://"}
这是我的单元测试。 celery_session_app
和 celery_session_worker
设置用于测试的 celery 应用程序和 worker (docs):
def test_concurrent_sleep_tasks(celery_session_app, celery_session_worker):
sleep_task.delay(3)
sleep_task.delay(3)
# output
Sleeping for 3 seconds...
1
2
3
Sleeping for 3 seconds...
1
2
3
任务似乎是 运行同步进行的。无论如何 运行 异步任务?
celery_session_worker fixture is starting a celery worker 并发为 1。这在 celery.contrib 中是硬编码的,目前不可配置。由于并发度为 1,因此一次只会处理 1 个任务。
另一种解决方案是编写您自己的 fixture 以启动并发数为 2 或更多的 worker。