Django, ImportError: cannot import name 'task' from 'celery'
Django, ImportError: cannot import name 'task' from 'celery'
我有一个使用 Celery 4.4.2 版的 Django 应用程序,它工作正常。
from celery import task
import logging
@task(ignore_result=True)
def log_user_activity(user_id):
try:
logging.info(user_id)
except Exception as e:
logging.error(str(e))
当我尝试将 Celery 版本更新到 v5.2.2 时,出现以下错误:
ImportError: cannot import name 'task' from 'celery'
谁能帮忙把任务替换成什么?
他们在这里仍然有相同的例子。
https://github.com/celery/celery/blob/v5.2.2/examples/celery_http_gateway/tasks.py
此 API 已弃用,然后 removed in 5.0。
该页面建议更改
from celery import task
进入
from celery import shared_task
还有其他更改不适用于您发布的代码段,但可能适用于您的其余代码。有关详细信息,请参阅该页面(以及文档的其余部分,尤其是 Upgrading from Celery 4.x 部分)。
Celery 5.0 中弃用了一些旧的东西,最新版本的 celery 工作正常,新版本中添加了大部分新东西。 推荐:需要使用最新版本的celery
celery.py
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'pj_name.settings')
app = Celery('pj_name')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
之后进入 app/tasks.py 并添加您的第一个调度程序函数。
from pj_name.celery import app
@app.task
def first_task():
pass
如果您使用的是 celery 最新版本,则上述代码块有效。
我有一个使用 Celery 4.4.2 版的 Django 应用程序,它工作正常。
from celery import task
import logging
@task(ignore_result=True)
def log_user_activity(user_id):
try:
logging.info(user_id)
except Exception as e:
logging.error(str(e))
当我尝试将 Celery 版本更新到 v5.2.2 时,出现以下错误:
ImportError: cannot import name 'task' from 'celery'
谁能帮忙把任务替换成什么? 他们在这里仍然有相同的例子。 https://github.com/celery/celery/blob/v5.2.2/examples/celery_http_gateway/tasks.py
此 API 已弃用,然后 removed in 5.0。
该页面建议更改
from celery import task
进入
from celery import shared_task
还有其他更改不适用于您发布的代码段,但可能适用于您的其余代码。有关详细信息,请参阅该页面(以及文档的其余部分,尤其是 Upgrading from Celery 4.x 部分)。
Celery 5.0 中弃用了一些旧的东西,最新版本的 celery 工作正常,新版本中添加了大部分新东西。 推荐:需要使用最新版本的celery
celery.py
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'pj_name.settings')
app = Celery('pj_name')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
之后进入 app/tasks.py 并添加您的第一个调度程序函数。
from pj_name.celery import app
@app.task
def first_task():
pass
如果您使用的是 celery 最新版本,则上述代码块有效。