如何在 Django 设置中配置 CELERYBEAT_SCHEDULE?
How to configure CELERYBEAT_SCHEDULE in Django settings?
我可以将其作为独立应用程序 运行 使用,但我无法在 Django 中使用它。
这是独立代码:
from celery import Celery
from celery.schedules import crontab
app = Celery('tasks')
app.conf.update(
CELERY_TASK_SERIALIZER='json',
CELERY_RESULT_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'],
CELERY_TIMEZONE='US/Central',
CELERY_ENABLE_UTC=True,
CELERYBEAT_SCHEDULE = {
'test': {
'task': 'tasks.test',
'schedule': crontab(),
},
}
)
@app.task
def test():
with open('test.txt', 'a') as f:
f.write('Hello, World!\n')`
它每分钟向 Rabbitmq 服务器提供数据并写入文件。它就像一个魅力,但当我试图让它在 Django 中工作时,我得到这个错误:
Did you remember to import the module containing this task? Or maybe you are using relative imports? Please see ____ for
more information.
邮件正文的完整内容为:{'retries': 0, 'eta': None,
'kwargs':{},'taskset':None,'timelimit':[None,None],'callbacks':
None, 'task': 'proj.test', 'args': [], 'expires': None, 'id':
'501ca998-b5eb-4ba4-98a8-afabda9e88dd','utc':正确,'errbacks':None,
'chord': None} (246b) 追溯(最近调用最后):文件
"/home/user/CeleryDjango/venv/lib/python3.5/site-packages/celery/worker/consumer.py",
第 456 行,在 on_task_received
strategies[name](message, body, KeyError: 'proj.test' [2016-06-16 01:16:00,051: INFO/Beat] Scheduler: 发送到期任务测试 (proj.test)
[2016-06-16 01:16:00,055: ERROR/MainProcess] 收到未注册
'proj.test'.
类型的任务
这是我在 Django 中的代码:
# CELERY STUFF
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'US/Central'
CELERYBEAT_SCHEDULE = {
'test': {
'task': 'proj.test',
'schedule': crontab(),
}
}
celery.py
from __future__ import absolute_import
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
from django.conf import settings # noqa
app = Celery('proj')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
task.py
from __future__ import absolute_import
from celery import shared_task
@shared_task
def test():
with open('test.txt', 'w') as f:
print('Hello, World', file=f)
init.py
from __future__ import absolute_import
from .celery import app as celery_app
如有任何想法,我们将不胜感激。谢谢。
你为什么不试试下面的方法,让我知道它是否适合你。它对我有用。
在settings.py
CELERYBEAT_SCHEDULE = {
'my_scheduled_job': {
'task': 'run_scheduled_jobs', # the same goes in the task name
'schedule': crontab(),
},
}
并且在tasks.py..
from celery.task import task # notice the import of task and not shared task.
@task(name='run_scheduled_jobs') # task name found! celery will do its job
def run_scheduled_jobs():
# do whatever stuff you do
return True
但是如果你正在寻找 shared_task 那么..
@shared_task(name='my_shared_task') # name helps celery identify the functions it has to run
def my_shared_task():
# do what you want here..
return True
我将共享任务用于异步作业。所以我需要从如下函数中调用它。
在views.py/或anywhere.py在你的项目应用
def some_function():
my_shared_task.apply_async(countdown= in_seconds)
return True
以防万一,如果您忘记了,请记住包含您正在尝试 运行 任务的应用程序..
INSTALLED_APPS = [...
'my_app'...
] # include app
我确信这种方法工作正常.. 谢谢
我可以将其作为独立应用程序 运行 使用,但我无法在 Django 中使用它。
这是独立代码:
from celery import Celery
from celery.schedules import crontab
app = Celery('tasks')
app.conf.update(
CELERY_TASK_SERIALIZER='json',
CELERY_RESULT_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'],
CELERY_TIMEZONE='US/Central',
CELERY_ENABLE_UTC=True,
CELERYBEAT_SCHEDULE = {
'test': {
'task': 'tasks.test',
'schedule': crontab(),
},
}
)
@app.task
def test():
with open('test.txt', 'a') as f:
f.write('Hello, World!\n')`
它每分钟向 Rabbitmq 服务器提供数据并写入文件。它就像一个魅力,但当我试图让它在 Django 中工作时,我得到这个错误:
Did you remember to import the module containing this task? Or maybe you are using relative imports? Please see ____ for more information.
邮件正文的完整内容为:{'retries': 0, 'eta': None, 'kwargs':{},'taskset':None,'timelimit':[None,None],'callbacks': None, 'task': 'proj.test', 'args': [], 'expires': None, 'id': '501ca998-b5eb-4ba4-98a8-afabda9e88dd','utc':正确,'errbacks':None, 'chord': None} (246b) 追溯(最近调用最后):文件 "/home/user/CeleryDjango/venv/lib/python3.5/site-packages/celery/worker/consumer.py", 第 456 行,在 on_task_received strategies[name](message, body, KeyError: 'proj.test' [2016-06-16 01:16:00,051: INFO/Beat] Scheduler: 发送到期任务测试 (proj.test) [2016-06-16 01:16:00,055: ERROR/MainProcess] 收到未注册 'proj.test'.
类型的任务
这是我在 Django 中的代码:
# CELERY STUFF
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'US/Central'
CELERYBEAT_SCHEDULE = {
'test': {
'task': 'proj.test',
'schedule': crontab(),
}
}
celery.py
from __future__ import absolute_import
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
from django.conf import settings # noqa
app = Celery('proj')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
task.py
from __future__ import absolute_import
from celery import shared_task
@shared_task
def test():
with open('test.txt', 'w') as f:
print('Hello, World', file=f)
init.py
from __future__ import absolute_import
from .celery import app as celery_app
如有任何想法,我们将不胜感激。谢谢。
你为什么不试试下面的方法,让我知道它是否适合你。它对我有用。
在settings.py
CELERYBEAT_SCHEDULE = {
'my_scheduled_job': {
'task': 'run_scheduled_jobs', # the same goes in the task name
'schedule': crontab(),
},
}
并且在tasks.py..
from celery.task import task # notice the import of task and not shared task.
@task(name='run_scheduled_jobs') # task name found! celery will do its job
def run_scheduled_jobs():
# do whatever stuff you do
return True
但是如果你正在寻找 shared_task 那么..
@shared_task(name='my_shared_task') # name helps celery identify the functions it has to run
def my_shared_task():
# do what you want here..
return True
我将共享任务用于异步作业。所以我需要从如下函数中调用它。
在views.py/或anywhere.py在你的项目应用
def some_function():
my_shared_task.apply_async(countdown= in_seconds)
return True
以防万一,如果您忘记了,请记住包含您正在尝试 运行 任务的应用程序..
INSTALLED_APPS = [...
'my_app'...
] # include app
我确信这种方法工作正常.. 谢谢