Scikit-Learn 的 Celery 任务不使用超过一个内核
Celery task with Scikit-Learn doesn't use more than a single core
我正在尝试创建一个 API 端点,它将在 Django 后端异步启动分类任务,我希望稍后能够检索结果。这是我到目前为止所做的:
celery.py
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "backend.settings")
app = Celery("backend")
app.config_from_object("django.conf:settings", namespace = "CELERY")
app.autodiscover_tasks()
tasks.py
from celery import shared_task
@shared_task
def Pipeline(taskId):
# ...read file, preprocess, train_test_split
clf = GridSearchCV(
SVC(), paramGrid, cv=5, n_jobs = -1
)
clf.fit(XTrain, yTrain)
# ...compute the metrics
用于执行任务的 Django 视图:views.py
class TaskExecuteView(APIView):
def get(self, request, taskId, *args, **kwargs):
try:
task = TaskModel.objects.get(taskId = taskId)
except TaskModel.DoesNotExist:
raise Http404
else:
Pipeline.delay(taskId)
# ... Database updates
问题是启动的任务只使用了 CPU 的一个核心,因此需要很长时间才能完成。我也看到了这个错误:Warning: Loky-backed parallel loops cannot be called in a multiprocessing, setting n_jobs=1
。有办法解决吗?
我知道 2018 年关于 SO 的一个类似问题与此类似,但是 post 没有明确的答案,所以我正在寻找一个到目前为止没有运气的解决方案。感谢您的宝贵时间和任何您可能想要腾出的时间 suggestions/solutions,尽管我真的不想更改技术堆栈,除非我可以避免它。
到目前为止我尝试过的:
在 celery 任务中使用 threading.current_thread().setName("MainThread")
作为第一行,但这没有用。
编辑:
requirements.txt
amqp==5.0.2
asgiref==3.3.1
billiard==3.6.3.0
celery==5.0.5
certifi==2020.12.5
cffi==1.14.5
chardet==4.0.0
click==7.1.2
click-didyoumean==0.0.3
click-plugins==1.1.1
click-repl==0.1.6
cryptography==3.4.6
defusedxml==0.7.1
dj-rest-auth==2.1.3
Django==3.1.3
django-allauth==0.44.0
django-cors-headers==3.6.0
djangorestframework==3.12.2
djangorestframework-simplejwt==4.6.0
idna==2.10
importlib-metadata==3.3.0
joblib==1.0.0
kombu==5.0.2
mccabe==0.6.1
numpy==1.19.4
oauthlib==3.1.0
pandas==1.2.0
Pillow==8.0.1
prompt-toolkit==3.0.8
pycodestyle==2.6.0
pycparser==2.20
PyJWT==2.0.1
python-dateutil==2.8.1
python3-openid==3.2.0
pytz==2020.4
redis==3.5.3
requests==2.25.1
requests-oauthlib==1.3.0
scikit-learn==0.24.0
scipy==1.6.0
sqlparse==0.4.1
threadpoolctl==2.1.0
typed-ast==1.4.1
typing-extensions==3.7.4.3
urllib3==1.26.4
vine==5.0.0
wcwidth==0.2.5
wrapt==1.11.2
zipp==3.4.0
我不知道这对你是否有用。我最近遇到芹菜工人卡住并阻塞线路的问题。
问题是 celery 应该自动产生与服务器有 CPU 一样多的工人,但我发现这个数字不足以满足我的使用需求。
我解决了在容器命令的芹菜执行行中添加 --concurrency=10
的问题。如果从 CLI 启动 celery,则可以手动添加此标志。
完整的执行命令是这样的:
/path/celery -A my_proj worker --loglevel=INFO --logfile=/var/log/celery.log --concurrency=10
无论如何这都会产生 10 个工人。
我通过切换到 django_rq
(Github link to the project) 解决了这个问题。
我自己不理解 parallel/distributed 计算的概念,但是 the issue was that celery
tasks do not support multiprocessing inside them。所以基本上,我不能在守护进程中生成其他进程。
我不知道 django_rq
是如何处理这个问题的,但我只更改了代码中的两行就解决了这个问题。
from django_rq import job
@job('default', timeout=3600) <--- changed here
def Pipeline(taskId):
# ...read file, preprocess, train_test_split
clf = GridSearchCV(
SVC(), paramGrid, cv=5, n_jobs = -1
)
clf.fit(XTrain, yTrain)
# ...compute the metrics
...其余 API 保持不变。
一旦我理解了并行计算的核心概念以及为什么 django_rq
成功而 celery
失败,我将更新此答案。
我正在尝试创建一个 API 端点,它将在 Django 后端异步启动分类任务,我希望稍后能够检索结果。这是我到目前为止所做的:
celery.py
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "backend.settings")
app = Celery("backend")
app.config_from_object("django.conf:settings", namespace = "CELERY")
app.autodiscover_tasks()
tasks.py
from celery import shared_task
@shared_task
def Pipeline(taskId):
# ...read file, preprocess, train_test_split
clf = GridSearchCV(
SVC(), paramGrid, cv=5, n_jobs = -1
)
clf.fit(XTrain, yTrain)
# ...compute the metrics
用于执行任务的 Django 视图:views.py
class TaskExecuteView(APIView):
def get(self, request, taskId, *args, **kwargs):
try:
task = TaskModel.objects.get(taskId = taskId)
except TaskModel.DoesNotExist:
raise Http404
else:
Pipeline.delay(taskId)
# ... Database updates
问题是启动的任务只使用了 CPU 的一个核心,因此需要很长时间才能完成。我也看到了这个错误:Warning: Loky-backed parallel loops cannot be called in a multiprocessing, setting n_jobs=1
。有办法解决吗?
我知道 2018 年关于 SO 的一个类似问题与此类似,但是 post 没有明确的答案,所以我正在寻找一个到目前为止没有运气的解决方案。感谢您的宝贵时间和任何您可能想要腾出的时间 suggestions/solutions,尽管我真的不想更改技术堆栈,除非我可以避免它。
到目前为止我尝试过的:
在 celery 任务中使用 threading.current_thread().setName("MainThread")
作为第一行,但这没有用。
编辑:
requirements.txt
amqp==5.0.2
asgiref==3.3.1
billiard==3.6.3.0
celery==5.0.5
certifi==2020.12.5
cffi==1.14.5
chardet==4.0.0
click==7.1.2
click-didyoumean==0.0.3
click-plugins==1.1.1
click-repl==0.1.6
cryptography==3.4.6
defusedxml==0.7.1
dj-rest-auth==2.1.3
Django==3.1.3
django-allauth==0.44.0
django-cors-headers==3.6.0
djangorestframework==3.12.2
djangorestframework-simplejwt==4.6.0
idna==2.10
importlib-metadata==3.3.0
joblib==1.0.0
kombu==5.0.2
mccabe==0.6.1
numpy==1.19.4
oauthlib==3.1.0
pandas==1.2.0
Pillow==8.0.1
prompt-toolkit==3.0.8
pycodestyle==2.6.0
pycparser==2.20
PyJWT==2.0.1
python-dateutil==2.8.1
python3-openid==3.2.0
pytz==2020.4
redis==3.5.3
requests==2.25.1
requests-oauthlib==1.3.0
scikit-learn==0.24.0
scipy==1.6.0
sqlparse==0.4.1
threadpoolctl==2.1.0
typed-ast==1.4.1
typing-extensions==3.7.4.3
urllib3==1.26.4
vine==5.0.0
wcwidth==0.2.5
wrapt==1.11.2
zipp==3.4.0
我不知道这对你是否有用。我最近遇到芹菜工人卡住并阻塞线路的问题。 问题是 celery 应该自动产生与服务器有 CPU 一样多的工人,但我发现这个数字不足以满足我的使用需求。
我解决了在容器命令的芹菜执行行中添加 --concurrency=10
的问题。如果从 CLI 启动 celery,则可以手动添加此标志。
完整的执行命令是这样的:
/path/celery -A my_proj worker --loglevel=INFO --logfile=/var/log/celery.log --concurrency=10
无论如何这都会产生 10 个工人。
我通过切换到 django_rq
(Github link to the project) 解决了这个问题。
我自己不理解 parallel/distributed 计算的概念,但是 the issue was that celery
tasks do not support multiprocessing inside them。所以基本上,我不能在守护进程中生成其他进程。
我不知道 django_rq
是如何处理这个问题的,但我只更改了代码中的两行就解决了这个问题。
from django_rq import job
@job('default', timeout=3600) <--- changed here
def Pipeline(taskId):
# ...read file, preprocess, train_test_split
clf = GridSearchCV(
SVC(), paramGrid, cv=5, n_jobs = -1
)
clf.fit(XTrain, yTrain)
# ...compute the metrics
...其余 API 保持不变。
一旦我理解了并行计算的核心概念以及为什么 django_rq
成功而 celery
失败,我将更新此答案。