带有 Celery 任务的 Pytest 有效但显示 "ERROR in consumer: Received unregistered task of type X"
Pytest with Celery tasks works but shows "ERROR in consumer: Received unregistered task of type X"
我让 Pytest 根据这个 Stack Overflow 问答来测试 Celery 任务:Celery's pytest fixtures (celery_worker and celery_app) does not work。
conftest.py
import pytest
@pytest.fixture(scope="session")
def celery_config():
return {
"broker_url": REDIS_URL,
"result_backend": REDIS_URL,
}
测试通过以下配置:
import pytest
@pytest.mark.usefixtures("celery_session_app")
@pytest.mark.usefixtures("celery_session_worker")
class TestMyCeleryTask:
def test_run_task(self) -> None:
...
所有测试都通过了。但是,无论我以何种顺序导入 Celery App and/or 我的任务,我总是收到以下输出:
ERROR in consumer: Received unregistered task of type 'my_celery_task'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.
请注意,我使用的是老式的基于 class 的任务方法,而不是使用装饰器将函数转换为 classes。
无论使用哪个 Celery Pytest fixture 来获取 Celery App 实例,该特定实例都需要注册 Celery 任务。
在这种情况下,您使用的是 Celery 会话实例:
@pytest.mark.usefixtures("celery_session_app")
要为所有测试注册您的 Celery 任务,您可以将以下内容添加到“conftest.py”。
conftest.py
import pytest
from my_application import MyCeleryTask
@pytest.mark.usefixtures("celery_session_app")
@pytest.mark.usefixtures("celery_session_worker")
@pytest.fixture(scope="session", autouse=True)
def celery_register_tasks(celery_session_app):
celery_session_app.register_task(MyCeleryTask)
@pytest.fixture(scope="session")
def celery_config():
return {
"broker_url": REDIS_URL,
"result_backend": REDIS_URL,
}
我让 Pytest 根据这个 Stack Overflow 问答来测试 Celery 任务:Celery's pytest fixtures (celery_worker and celery_app) does not work。
conftest.py
import pytest
@pytest.fixture(scope="session")
def celery_config():
return {
"broker_url": REDIS_URL,
"result_backend": REDIS_URL,
}
测试通过以下配置:
import pytest
@pytest.mark.usefixtures("celery_session_app")
@pytest.mark.usefixtures("celery_session_worker")
class TestMyCeleryTask:
def test_run_task(self) -> None:
...
所有测试都通过了。但是,无论我以何种顺序导入 Celery App and/or 我的任务,我总是收到以下输出:
ERROR in consumer: Received unregistered task of type 'my_celery_task'. The message has been ignored and discarded. Did you remember to import the module containing this task? Or maybe you're using relative imports? Please see http://docs.celeryq.org/en/latest/internals/protocol.html for more information.
请注意,我使用的是老式的基于 class 的任务方法,而不是使用装饰器将函数转换为 classes。
无论使用哪个 Celery Pytest fixture 来获取 Celery App 实例,该特定实例都需要注册 Celery 任务。
在这种情况下,您使用的是 Celery 会话实例:
@pytest.mark.usefixtures("celery_session_app")
要为所有测试注册您的 Celery 任务,您可以将以下内容添加到“conftest.py”。
conftest.py
import pytest
from my_application import MyCeleryTask
@pytest.mark.usefixtures("celery_session_app")
@pytest.mark.usefixtures("celery_session_worker")
@pytest.fixture(scope="session", autouse=True)
def celery_register_tasks(celery_session_app):
celery_session_app.register_task(MyCeleryTask)
@pytest.fixture(scope="session")
def celery_config():
return {
"broker_url": REDIS_URL,
"result_backend": REDIS_URL,
}