Pytest with Celery tasks works but shows "ERROR in consumer: Received unregistered task of type X"

我让 Pytest 根据这个 Stack Overflow 问答来测试 Celery 任务:Celery's pytest fixtures (celery_worker and celery_app) does not work


import pytest

def celery_config():
    return {
        "broker_url": REDIS_URL,
        "result_backend": REDIS_URL,


import pytest

class TestMyCeleryTask:

    def test_run_task(self) -> None:

所有测试都通过了。但是,无论我以何种顺序导入 Celery App and/or 我的任务,我总是收到以下输出:

ERROR in consumer: Received unregistered task of type 'my_celery_task'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
for more information.

请注意,我使用的是老式的基于 class 的任务方法,而不是使用装饰器将函数转换为 classes。

无论使用哪个 Celery Pytest fixture 来获取 Celery App 实例,该特定实例都需要注册 Celery 任务。

在这种情况下,您使用的是 Celery 会话实例:


要为所有测试注册您的 Celery 任务,您可以将以下内容添加到“conftest.py”。


import pytest
from my_application import MyCeleryTask

@pytest.fixture(scope="session", autouse=True)
def celery_register_tasks(celery_session_app):

def celery_config():
    return {
        "broker_url": REDIS_URL,
        "result_backend": REDIS_URL,