Celery add_periodic_task 在 uwsgi 环境中阻塞 Django 运行
Celery add_periodic_task blocks Django running in uwsgi environment
我编写了一个模块,它根据项目设置中的字典列表(通过 django.conf.settings
导入)动态添加定期的芹菜任务。
我使用函数 add_tasks
来执行此操作,该函数安排要使用设置中给出的特定 uuid
调用的函数:
def add_tasks(celery):
for new_task in settings.NEW_TASKS:
celery.add_periodic_task(
new_task['interval'],
my_task.s(new_task['uuid']),
name='My Task %s' % new_task['uuid'],
)
像建议的那样 我使用 on_after_configure.connect
信号调用 celery.py
中的函数:
app = Celery('my_app')
@app.on_after_configure.connect
def setup_periodic_tasks(celery, **kwargs):
from add_tasks_module import add_tasks
add_tasks(celery)
此设置对 celery beat
和 celery worker
都适用,但破坏了我使用 uwsgi
为我的 django 应用程序提供服务的设置。 Uwsgi
运行 一直很顺利,直到视图代码第一次使用 celery 的 .delay()
方法发送任务。那时似乎 celery 在 uwsgi
中初始化,但在上面的代码中永远阻塞。如果我 运行 从命令行手动执行此操作,然后在它阻塞时中断,我会得到以下(缩短的)堆栈跟踪:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
return obj.__dict__[self.__name__]
KeyError: 'tasks'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
return obj.__dict__[self.__name__]
KeyError: 'data'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
return obj.__dict__[self.__name__]
KeyError: 'tasks'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
(SHORTENED HERE. Just contained the trace from the console through my call to this function)
File "/opt/my_app/add_tasks_module/__init__.py", line 42, in add_tasks
my_task.s(new_task['uuid']),
File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 146, in __getattr__
return getattr(self._get_current_object(), name)
File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 109, in _get_current_object
return loc(*self.__args, **self.__kwargs)
File "/usr/local/lib/python3.6/site-packages/celery/app/__init__.py", line 72, in task_by_cons
return app.tasks[
File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 44, in __get__
value = obj.__dict__[self.__name__] = self.__get(obj)
File "/usr/local/lib/python3.6/site-packages/celery/app/base.py", line 1228, in tasks
self.finalize(auto=True)
File "/usr/local/lib/python3.6/site-packages/celery/app/base.py", line 507, in finalize
with self._finalize_mutex:
获取互斥量似乎有问题。
目前我正在使用一种解决方法来检测 sys.argv[0]
是否包含 uwsgi
然后不添加周期性任务,因为只有 beat
需要任务,但我想了解这里出了什么问题更永久地解决问题。
这个问题是否与使用 uwsgi 多线程或多进程有关,其中一个 thread/process 持有另一个需要的互斥量?
如果有任何提示可以帮助我解决问题,我将不胜感激。谢谢。
我正在使用:Django 1.11.7 和 Celery 4.1.0
编辑 1
我已经为这个问题创建了一个最小的设置:
celery.py:
import os
from celery import Celery
from django.conf import settings
from myapp.tasks import my_task
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')
app = Celery('my_app')
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(
60,
my_task.s(),
name='Testtask'
)
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
tasks.py:
from celery import shared_task
@shared_task()
def my_task():
print('ran')
确保 CELERY_TASK_ALWAYS_EAGER=False 并且您有一个有效的消息队列。
运行:
./manage.py shell -c 'from myapp.tasks import my_task; my_task.delay()'
等待大约 10 秒再中断以查看上述错误。
你能试试那个信号吗@app.on_after_finalize.connect
:
工作项目 celery==4.1.0
、Django==2.0
、django-celery-beat==1.1.0
和 django-celery-results==1.0.1
中的一些快速片段
@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
""" setup of periodic task :py:func:shopify_data_fetcher.celery.fetch_shopify
based on the schedule defined in: settings.CELERY_BEAT_SCHEDULE
"""
for task_name, task_config in settings.CELERY_BEAT_SCHEDULE.items():
sender.add_periodic_task(
task_config['schedule'],
fetch_shopify.s(**task_config['kwargs']['resource_name']),
name=task_name
)
一块CELERY_BEAT_SCHEDULE
:
CELERY_BEAT_SCHEDULE = {
'fetch_shopify_orders': {
'task': 'shopify.tasks.fetch_shopify',
'schedule': crontab(hour="*/3", minute=0),
'kwargs': {
'resource_name': shopify_constants.SHOPIFY_API_RESOURCES_ORDERS
}
}
}
所以,我发现 @shared_task
装饰器造成了问题。当我在信号调用的函数中声明任务时,我可以绕过这个问题,如下所示:
def add_tasks(celery):
@celery.task
def my_task(uuid):
print(uuid)
for new_task in settings.NEW_TASKS:
celery.add_periodic_task(
new_task['interval'],
my_task.s(new_task['uuid']),
name='My Task %s' % new_task['uuid'],
)
这个解决方案实际上对我有用,但我还有一个问题:我在一个可插入的应用程序中使用这个代码,所以我不能直接访问信号处理程序之外的 celery 应用程序,但我想也可以从其他代码中调用 my_task
函数。通过在函数内定义它,它在函数外不可用,所以我无法在其他任何地方导入它。
我可能可以通过在信号函数之外定义任务函数来解决这个问题,并在此处和 tasks.py
中将其与不同的装饰器一起使用。我想知道除了我可以在 tasks.py
中使用的 @shared_task
装饰器之外是否还有一个装饰器不会造成问题。
当前最佳解决方案可能是:
task_app.__init__.py:
def my_task(uuid):
# do stuff
print(uuid)
def add_tasks(celery):
celery_my_task = celery.task(my_task)
for new_task in settings.NEW_TASKS:
celery.add_periodic_task(
new_task['interval'],
celery_my_task(new_task['uuid']),
name='My Task %s' % new_task['uuid'],
)
task_app.tasks.py:
from celery import shared_task
from task_app import my_task
shared_my_task = shared_task(my_task)
myapp.celery.py:
import os
from celery import Celery
from django.conf import settings
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')
app = Celery('my_app')
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
from task_app import add_tasks
add_tasks(sender)
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
我编写了一个模块,它根据项目设置中的字典列表(通过 django.conf.settings
导入)动态添加定期的芹菜任务。
我使用函数 add_tasks
来执行此操作,该函数安排要使用设置中给出的特定 uuid
调用的函数:
def add_tasks(celery):
for new_task in settings.NEW_TASKS:
celery.add_periodic_task(
new_task['interval'],
my_task.s(new_task['uuid']),
name='My Task %s' % new_task['uuid'],
)
像建议的那样on_after_configure.connect
信号调用 celery.py
中的函数:
app = Celery('my_app')
@app.on_after_configure.connect
def setup_periodic_tasks(celery, **kwargs):
from add_tasks_module import add_tasks
add_tasks(celery)
此设置对 celery beat
和 celery worker
都适用,但破坏了我使用 uwsgi
为我的 django 应用程序提供服务的设置。 Uwsgi
运行 一直很顺利,直到视图代码第一次使用 celery 的 .delay()
方法发送任务。那时似乎 celery 在 uwsgi
中初始化,但在上面的代码中永远阻塞。如果我 运行 从命令行手动执行此操作,然后在它阻塞时中断,我会得到以下(缩短的)堆栈跟踪:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
return obj.__dict__[self.__name__]
KeyError: 'tasks'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
return obj.__dict__[self.__name__]
KeyError: 'data'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
return obj.__dict__[self.__name__]
KeyError: 'tasks'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
(SHORTENED HERE. Just contained the trace from the console through my call to this function)
File "/opt/my_app/add_tasks_module/__init__.py", line 42, in add_tasks
my_task.s(new_task['uuid']),
File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 146, in __getattr__
return getattr(self._get_current_object(), name)
File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 109, in _get_current_object
return loc(*self.__args, **self.__kwargs)
File "/usr/local/lib/python3.6/site-packages/celery/app/__init__.py", line 72, in task_by_cons
return app.tasks[
File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 44, in __get__
value = obj.__dict__[self.__name__] = self.__get(obj)
File "/usr/local/lib/python3.6/site-packages/celery/app/base.py", line 1228, in tasks
self.finalize(auto=True)
File "/usr/local/lib/python3.6/site-packages/celery/app/base.py", line 507, in finalize
with self._finalize_mutex:
获取互斥量似乎有问题。
目前我正在使用一种解决方法来检测 sys.argv[0]
是否包含 uwsgi
然后不添加周期性任务,因为只有 beat
需要任务,但我想了解这里出了什么问题更永久地解决问题。
这个问题是否与使用 uwsgi 多线程或多进程有关,其中一个 thread/process 持有另一个需要的互斥量?
如果有任何提示可以帮助我解决问题,我将不胜感激。谢谢。
我正在使用:Django 1.11.7 和 Celery 4.1.0
编辑 1
我已经为这个问题创建了一个最小的设置:
celery.py:
import os
from celery import Celery
from django.conf import settings
from myapp.tasks import my_task
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')
app = Celery('my_app')
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(
60,
my_task.s(),
name='Testtask'
)
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
tasks.py:
from celery import shared_task
@shared_task()
def my_task():
print('ran')
确保 CELERY_TASK_ALWAYS_EAGER=False 并且您有一个有效的消息队列。
运行:
./manage.py shell -c 'from myapp.tasks import my_task; my_task.delay()'
等待大约 10 秒再中断以查看上述错误。
你能试试那个信号吗@app.on_after_finalize.connect
:
工作项目 celery==4.1.0
、Django==2.0
、django-celery-beat==1.1.0
和 django-celery-results==1.0.1
@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
""" setup of periodic task :py:func:shopify_data_fetcher.celery.fetch_shopify
based on the schedule defined in: settings.CELERY_BEAT_SCHEDULE
"""
for task_name, task_config in settings.CELERY_BEAT_SCHEDULE.items():
sender.add_periodic_task(
task_config['schedule'],
fetch_shopify.s(**task_config['kwargs']['resource_name']),
name=task_name
)
一块CELERY_BEAT_SCHEDULE
:
CELERY_BEAT_SCHEDULE = {
'fetch_shopify_orders': {
'task': 'shopify.tasks.fetch_shopify',
'schedule': crontab(hour="*/3", minute=0),
'kwargs': {
'resource_name': shopify_constants.SHOPIFY_API_RESOURCES_ORDERS
}
}
}
所以,我发现 @shared_task
装饰器造成了问题。当我在信号调用的函数中声明任务时,我可以绕过这个问题,如下所示:
def add_tasks(celery):
@celery.task
def my_task(uuid):
print(uuid)
for new_task in settings.NEW_TASKS:
celery.add_periodic_task(
new_task['interval'],
my_task.s(new_task['uuid']),
name='My Task %s' % new_task['uuid'],
)
这个解决方案实际上对我有用,但我还有一个问题:我在一个可插入的应用程序中使用这个代码,所以我不能直接访问信号处理程序之外的 celery 应用程序,但我想也可以从其他代码中调用 my_task
函数。通过在函数内定义它,它在函数外不可用,所以我无法在其他任何地方导入它。
我可能可以通过在信号函数之外定义任务函数来解决这个问题,并在此处和 tasks.py
中将其与不同的装饰器一起使用。我想知道除了我可以在 tasks.py
中使用的 @shared_task
装饰器之外是否还有一个装饰器不会造成问题。
当前最佳解决方案可能是:
task_app.__init__.py:
def my_task(uuid):
# do stuff
print(uuid)
def add_tasks(celery):
celery_my_task = celery.task(my_task)
for new_task in settings.NEW_TASKS:
celery.add_periodic_task(
new_task['interval'],
celery_my_task(new_task['uuid']),
name='My Task %s' % new_task['uuid'],
)
task_app.tasks.py:
from celery import shared_task
from task_app import my_task
shared_my_task = shared_task(my_task)
myapp.celery.py:
import os
from celery import Celery
from django.conf import settings
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')
app = Celery('my_app')
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
from task_app import add_tasks
add_tasks(sender)
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)