Celery worker 从哪个目录开始
Celery worker which directory to start from
我需要一些关于芹菜工人的帮助。我特别不明白 celery worker 命令需要从哪里(哪个目录)被触发,它背后的概念是什么,以及一些关于进口的事情。
假设我有以下目录结构:
.
├── __init__.py
├── entry.py
├── state1
│ ├── __init__.py
│ ├── family1
│ │ ├── __init__.py
│ │ ├── task1.py
│ │ ├── task2.py
│ │ └── task3.py
│ └── family2
│ ├── __init__.py
│ └── task1.py
└── state2
├── __init__.py
├── family1
│ ├── __init__.py
│ ├── task1.py
│ └── task2.py
└── family2
├── __init__.py
├── task1.py
└── task2.py
根目录下的.
是当前工作目录,名为project
每个 taskn.py(task1.py、task2.py 等)都是单独的任务。每个任务文件看起来像这样:
from celery import Celery
from celery.result import AsyncResult
from kombu import Queue
_name_ = "project_x"
celapp=Celery(backend='redis://localhost:6379/0', broker='amqp://a:b@localhost/a_vhost')
CELERY_CONFIG = {
'CELERY_DEFAULT_QUEUE': 'default',
'CELERY_QUEUES': (Queue('q1'), Queue('q2'),),
'CELERY_TASK_SERIALIZER': 'pickle',
'CELERY_ACCEPT_CONTENT': ['json','pickle']
}
celapp.conf.update(**CELERY_CONFIG)
@celapp.task()
def t1():
print("starting task")
time.sleep(5)
print("Finished task")
以下是entry.py
的内容:
import json
from flask_cors import CORS
from flask import Flask, Response, render_template
from flask import request, jsonify, redirect
from functools import wraps
<what would be the import statement to import all the tasks>
_name_ = "project_x"
app = Flask(_name_)
@app.route("/api1", methods=['POST'])
def api1():
req = request.jsonify
if not req:
return jsonify(success=False, msg="Missing request parameters", code="1")
else:
param1 = req.get('p1')
param2 = req.get('p2')
tId = startTask()
return jsonify(success="True", msg="All Good", taskId=tId)
def startTask():
tId = "abcd123"
created_task = state1.family1.task1.subtask(queue='q1')
created_task.delay()
return tId
if __name__ == '__main__':
app.run(debug=True, host="192.168.1.7", port="4444")
entry.py 是将触发 api1 的烧瓶应用程序,然后根据我想要启动特定任务的参数。
下面是我的问题:
- 导入
entry.py
文件中所有任务的导入语句是什么
- 我从哪里开始工作。我的意思是我应该从哪个目录启动
Celery -A <directory name> worker -l info
命令,为什么?
- 在许多示例中,我看到任务和 CeleryApp 文件之间存在明显的隔离。有人可以建议什么是安排我的任务和 celery 配置等的更好方法,以及上述 2 个问题如何与这个新提议的结构保持一致?
好的,希望这可能有所帮助。我会按照你的要求反驳
In many examples I saw that there is a clear segregation between tasks
and CeleryApp file. Could someone please suggest what would be a
better way to arrange my tasks, and celery configs etc. and how would
the above 2 questions align with this new proposed structure ?
我在您添加的片段中看到的第一个问题是,您拥有的每个 taskn.py
都有自己的 celery
实例。您需要在每个 taskn.py
之间共享此实例。
我推荐的是创建一个celery_app.py
my_app
├── __init__.py
├── entry.py
├── celery_app.py
│ ├── ...
在此文件中,您将创建芹菜实例
from celery import Celery
from celery.result import AsyncResult
from kombu import Queue
_name_ = "project_x"
celapp=Celery(backend='redis://localhost:6379/0', broker='amqp://a:b@localhost/a_vhost')
CELERY_CONFIG = {
'CELERY_DEFAULT_QUEUE': 'default',
'CELERY_QUEUES': (Queue('q1'), Queue('q2'),),
'CELERY_TASK_SERIALIZER': 'pickle',
'CELERY_ACCEPT_CONTENT': ['json','pickle']
}
celapp.conf.update(**CELERY_CONFIG)
celery_app.conf.imports = [
'state1.family1.task1',
'my_app.state1.family1.task2', # Or Maybe
...
]
然后在每个taskn.py
中都可以导入这个实例,每个任务都会注册在同一个celery应用下
from my_app.celery_app import celapp
@celapp.task()
def t1():
print("starting task")
time.sleep(5)
print("Finished task")
where from do I start the worker. I mean from which directory should I
start the Celery -A worker -l info command and why ?
那么你应该很容易地调用 Celery -A my_app.celery_app worker -l info
因为你的芹菜实例将在模块 my_app、子模块 celery_app
中
what would be the import statement to import all the tasks in the entry.py
最后,从 entry.py
开始,您可以执行 import state1.family1.task1 import t1
并调用 t1.delay()
或任何已注册的任务。
所以听@Patricio 的建议,看来确实是导入错误。我的新目录结构如下所示:
.
├── __init__.py
├── celeryConfig
│ ├── __init__.py
│ └── celeryApp.py
├── entry.py
├── state1
│ ├── __init__.py
│ ├── family1
│ │ ├── __init__.py
│ │ ├── task1.py
│ │ ├── task2.py
│ │ └── task3.py
│ └── family2
│ ├── __init__.py
│ └── task1.py
└── state2
├── __init__.py
├── family1
│ ├── __init__.py
│ ├── task1.py
│ └── task2.py
└── family2
├── __init__.py
├── task1.py
└── task2.py
而celeryConfig/celeryApp.py
的内容如下:
from celery import Celery
from celery.result import AsyncResult
from kombu import Queue
_name_ = "project_x"
celapp=Celery(backend='redis://localhost:6379/0', broker='amqp://a:b@localhost/a_vhost', include=['state1.family1.task1'])
CELERY_CONFIG = {
'CELERY_DEFAULT_QUEUE': 'default',
'CELERY_QUEUES': (Queue('q1'), Queue('q2'),),
'CELERY_TASK_SERIALIZER': 'pickle',
'CELERY_ACCEPT_CONTENT': ['json','pickle']
}
celapp.conf.update(**CELERY_CONFIG)
taskn.py 的内容类似于:
from celeryConfig.celeryApp import celapp
import time
@celapp.task()
def t1():
print("starting task")
time.sleep(5)
print("Finished task")
而 entry.py
保持原样,只有一个变化如下:
from state1.family1.task1 import t1
现在当 celery 启动时:
celery -A celeryConfig.celeryApp worker -l info
从根目录,project
,一切正常。作为上述命令的输出,我收到消息
.
.
.
[tasks]
. state1.family1.task1.t1
.
.
.
说明celery已经正确启动,任务确实已经被注册了。所以现在,为了注册所有任务,我可以通读 directory/directories 并在 celeryApp.py
中动态创建 include
列表。 (完成后会 post 详细介绍)
谢谢@Patricio
我需要一些关于芹菜工人的帮助。我特别不明白 celery worker 命令需要从哪里(哪个目录)被触发,它背后的概念是什么,以及一些关于进口的事情。
假设我有以下目录结构:
.
├── __init__.py
├── entry.py
├── state1
│ ├── __init__.py
│ ├── family1
│ │ ├── __init__.py
│ │ ├── task1.py
│ │ ├── task2.py
│ │ └── task3.py
│ └── family2
│ ├── __init__.py
│ └── task1.py
└── state2
├── __init__.py
├── family1
│ ├── __init__.py
│ ├── task1.py
│ └── task2.py
└── family2
├── __init__.py
├── task1.py
└── task2.py
根目录下的.
是当前工作目录,名为project
每个 taskn.py(task1.py、task2.py 等)都是单独的任务。每个任务文件看起来像这样:
from celery import Celery
from celery.result import AsyncResult
from kombu import Queue
_name_ = "project_x"
celapp=Celery(backend='redis://localhost:6379/0', broker='amqp://a:b@localhost/a_vhost')
CELERY_CONFIG = {
'CELERY_DEFAULT_QUEUE': 'default',
'CELERY_QUEUES': (Queue('q1'), Queue('q2'),),
'CELERY_TASK_SERIALIZER': 'pickle',
'CELERY_ACCEPT_CONTENT': ['json','pickle']
}
celapp.conf.update(**CELERY_CONFIG)
@celapp.task()
def t1():
print("starting task")
time.sleep(5)
print("Finished task")
以下是entry.py
的内容:
import json
from flask_cors import CORS
from flask import Flask, Response, render_template
from flask import request, jsonify, redirect
from functools import wraps
<what would be the import statement to import all the tasks>
_name_ = "project_x"
app = Flask(_name_)
@app.route("/api1", methods=['POST'])
def api1():
req = request.jsonify
if not req:
return jsonify(success=False, msg="Missing request parameters", code="1")
else:
param1 = req.get('p1')
param2 = req.get('p2')
tId = startTask()
return jsonify(success="True", msg="All Good", taskId=tId)
def startTask():
tId = "abcd123"
created_task = state1.family1.task1.subtask(queue='q1')
created_task.delay()
return tId
if __name__ == '__main__':
app.run(debug=True, host="192.168.1.7", port="4444")
entry.py 是将触发 api1 的烧瓶应用程序,然后根据我想要启动特定任务的参数。
下面是我的问题:
- 导入
entry.py
文件中所有任务的导入语句是什么 - 我从哪里开始工作。我的意思是我应该从哪个目录启动
Celery -A <directory name> worker -l info
命令,为什么? - 在许多示例中,我看到任务和 CeleryApp 文件之间存在明显的隔离。有人可以建议什么是安排我的任务和 celery 配置等的更好方法,以及上述 2 个问题如何与这个新提议的结构保持一致?
好的,希望这可能有所帮助。我会按照你的要求反驳
In many examples I saw that there is a clear segregation between tasks and CeleryApp file. Could someone please suggest what would be a better way to arrange my tasks, and celery configs etc. and how would the above 2 questions align with this new proposed structure ?
我在您添加的片段中看到的第一个问题是,您拥有的每个 taskn.py
都有自己的 celery
实例。您需要在每个 taskn.py
之间共享此实例。
我推荐的是创建一个celery_app.py
my_app
├── __init__.py
├── entry.py
├── celery_app.py
│ ├── ...
在此文件中,您将创建芹菜实例
from celery import Celery
from celery.result import AsyncResult
from kombu import Queue
_name_ = "project_x"
celapp=Celery(backend='redis://localhost:6379/0', broker='amqp://a:b@localhost/a_vhost')
CELERY_CONFIG = {
'CELERY_DEFAULT_QUEUE': 'default',
'CELERY_QUEUES': (Queue('q1'), Queue('q2'),),
'CELERY_TASK_SERIALIZER': 'pickle',
'CELERY_ACCEPT_CONTENT': ['json','pickle']
}
celapp.conf.update(**CELERY_CONFIG)
celery_app.conf.imports = [
'state1.family1.task1',
'my_app.state1.family1.task2', # Or Maybe
...
]
然后在每个taskn.py
中都可以导入这个实例,每个任务都会注册在同一个celery应用下
from my_app.celery_app import celapp
@celapp.task()
def t1():
print("starting task")
time.sleep(5)
print("Finished task")
where from do I start the worker. I mean from which directory should I start the Celery -A worker -l info command and why ?
那么你应该很容易地调用 Celery -A my_app.celery_app worker -l info
因为你的芹菜实例将在模块 my_app、子模块 celery_app
what would be the import statement to import all the tasks in the entry.py
最后,从 entry.py
开始,您可以执行 import state1.family1.task1 import t1
并调用 t1.delay()
或任何已注册的任务。
所以听@Patricio 的建议,看来确实是导入错误。我的新目录结构如下所示:
.
├── __init__.py
├── celeryConfig
│ ├── __init__.py
│ └── celeryApp.py
├── entry.py
├── state1
│ ├── __init__.py
│ ├── family1
│ │ ├── __init__.py
│ │ ├── task1.py
│ │ ├── task2.py
│ │ └── task3.py
│ └── family2
│ ├── __init__.py
│ └── task1.py
└── state2
├── __init__.py
├── family1
│ ├── __init__.py
│ ├── task1.py
│ └── task2.py
└── family2
├── __init__.py
├── task1.py
└── task2.py
而celeryConfig/celeryApp.py
的内容如下:
from celery import Celery
from celery.result import AsyncResult
from kombu import Queue
_name_ = "project_x"
celapp=Celery(backend='redis://localhost:6379/0', broker='amqp://a:b@localhost/a_vhost', include=['state1.family1.task1'])
CELERY_CONFIG = {
'CELERY_DEFAULT_QUEUE': 'default',
'CELERY_QUEUES': (Queue('q1'), Queue('q2'),),
'CELERY_TASK_SERIALIZER': 'pickle',
'CELERY_ACCEPT_CONTENT': ['json','pickle']
}
celapp.conf.update(**CELERY_CONFIG)
taskn.py 的内容类似于:
from celeryConfig.celeryApp import celapp
import time
@celapp.task()
def t1():
print("starting task")
time.sleep(5)
print("Finished task")
而 entry.py
保持原样,只有一个变化如下:
from state1.family1.task1 import t1
现在当 celery 启动时:
celery -A celeryConfig.celeryApp worker -l info
从根目录,project
,一切正常。作为上述命令的输出,我收到消息
.
.
.
[tasks]
. state1.family1.task1.t1
.
.
.
说明celery已经正确启动,任务确实已经被注册了。所以现在,为了注册所有任务,我可以通读 directory/directories 并在 celeryApp.py
中动态创建 include
列表。 (完成后会 post 详细介绍)
谢谢@Patricio