带有 Celery 任务的 Pytest 有效但显示 "ERROR in consumer: Received unregistered task of type X"

Pytest with Celery tasks works but shows "ERROR in consumer: Received unregistered task of type X"

我让 Pytest 根据这个 Stack Overflow 问答来测试 Celery 任务:Celery's pytest fixtures (celery_worker and celery_app) does not work

conftest.py

import pytest


@pytest.fixture(scope="session")
def celery_config():
    return {
        "broker_url": REDIS_URL,
        "result_backend": REDIS_URL,
    }

测试通过以下配置:

import pytest


@pytest.mark.usefixtures("celery_session_app")
@pytest.mark.usefixtures("celery_session_worker")
class TestMyCeleryTask:

    def test_run_task(self) -> None:
        ...

所有测试都通过了。但是,无论我以何种顺序导入 Celery App and/or 我的任务,我总是收到以下输出:

ERROR in consumer: Received unregistered task of type 'my_celery_task'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.

请注意,我使用的是老式的基于 class 的任务方法,而不是使用装饰器将函数转换为 classes。

无论使用哪个 Celery Pytest fixture 来获取 Celery App 实例,该特定实例都需要注册 Celery 任务。

在这种情况下,您使用的是 Celery 会话实例:

@pytest.mark.usefixtures("celery_session_app")

要为所有测试注册您的 Celery 任务,您可以将以下内容添加到“conftest.py”。

conftest.py

import pytest
from my_application import MyCeleryTask


@pytest.mark.usefixtures("celery_session_app")
@pytest.mark.usefixtures("celery_session_worker")
@pytest.fixture(scope="session", autouse=True)
def celery_register_tasks(celery_session_app):
    celery_session_app.register_task(MyCeleryTask)


@pytest.fixture(scope="session")
def celery_config():
    return {
        "broker_url": REDIS_URL,
        "result_backend": REDIS_URL,
    }