Trigger Celery task in a RESTful Route
I want to set up routes for my Celery tasks and monitor them.
This is the code in my Flask app, running on localhost:5000:
background.py
Task:
@celery.task(queue='cache')
def cache_user_tracks_with_features():
    return {'status': 'Task completed!'}
Route:
from time import sleep
from flask import jsonify

@task_bp.route('/filter', methods=['GET', 'POST'])
def cache_user_with_features():
    # enqueue the task, then block the request until it finishes
    task = cache_user_tracks_with_features.apply_async()
    while not task.ready():
        sleep(2)
    response_object = {
        'status': 'fail',
        'message': 'User does not exist'
    }
    try:
        user = User.query.filter_by(id=1).first()
        if not user:
            return jsonify(response_object), 404
        else:
            response_object = {
                'status': 'success',
                'data': {
                    'task_id': task.id,
                    'username': user.username,
                    'email': user.email,
                    'active': user.active
                }
            }
            return jsonify(response_object), 200
    except ValueError:
        return jsonify(response_object), 404
Trigger attempt
I am trying to test it from the terminal with curl, like so:
$ curl -X POST http://localhost:5001/filter -H "Content-Type: application/json"
But either I get curl: (52) Empty reply from server, or it just hangs. If I remove the task from the function and curl with POST, I get:
{
  "data": {
    "active": true,
    "email": "me@mac.com",
    "username": "me"
  },
  "status": "success"
}
The Docker logs give me:
nginx_1 | 172.21.0.1 - - [03/Apr/2019:22:26:41 +0000] "GET /manifest.json HTTP/1.1" 304 0 "http://localhost/filter" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36" "-"
web-db_1 | 2019-04-01 19:52:52.415 UTC [1] LOG: background worker "logical replication launcher" (PID 25) exited with exit code 1
celery_1 | worker: Warm shutdown (MainProcess)
celery_1 |
celery_1 | -------------- celery@fb24d4bd2089 v4.2.1 (windowlicker)
celery_1 | ---- **** -----
celery_1 | --- * *** * -- Linux-4.9.125-linuxkit-x86_64-with 2019-04-06 21:34:38
celery_1 | -- * - **** ---
celery_1 | - ** ---------- [config]
celery_1 | - ** ---------- .> app: project:0x7f9923d8a9e8
celery_1 | - ** ---------- .> transport: redis://redis:6379/0
celery_1 | - ** ---------- .> results: redis://redis:6379/0
celery_1 | - *** --- * --- .> concurrency: 2 (prefork)
celery_1 | -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
celery_1 | --- ***** -----
celery_1 | -------------- [queues]
celery_1 | .> cache exchange=cache(direct) key=cache
celery_1 |
celery_1 |
celery_1 | [tasks]
celery_1 | . project.api.routes.background.cache_user_tracks_with_analysis
celery_1 | . project.api.routes.background.cache_user_tracks_with_features
This is how I configure Celery and Flower (Celery monitoring) in my docker-compose file:
docker-compose-dev.yml
version: '3.6'
services:
  celery:
    image: dev3_web
    restart: always
    volumes:
      - ./services/web:/usr/src/app
      - ./services/web/logs:/usr/src/app/logs
    command: celery worker -A celery_worker.celery --loglevel=INFO --logfile=logs/celery.log -Q cache
    environment:
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0
    depends_on:
      - web
      - redis
    links:
      - redis:redis
      - web-db
  redis:
    image: redis:5.0.3-alpine
    restart: always
    expose:
      - '6379'
    ports:
      - '6379:6379'
  monitor:
    image: dev3_web
    ports:
      - 5555:5555
    command: flower -A celery_worker.celery --port=5555 --broker=redis://redis:6379/0
    depends_on:
      - web
      - redis
web/logs/celery_log
[2019-04-02 02:51:07,338: INFO/MainProcess] Connected to redis://redis:6379/0
[2019-04-02 02:51:07,375: INFO/MainProcess] mingle: searching for neighbors
[2019-04-02 02:51:08,491: INFO/MainProcess] mingle: all alone
[2019-04-02 02:51:08,582: INFO/MainProcess] celery@59ed7459ac14 ready.
[2019-04-02 02:51:08,661: INFO/MainProcess] Events of group {task} enabled by remote.
Flower shows the worker with an active status on its dashboard:
Celery Instantiation
# services/web/project/__init__.py
import os
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from celery import Celery

# instantiate the db
db = SQLAlchemy()
# background processes instance
celery = Celery(__name__, broker='redis://redis:6379/0')  # <------- instantiation

def create_app(script_info=None):
    from .api import routes
    # instantiate the app
    app = Flask(__name__)
    # set config
    app_settings = os.getenv('APP_SETTINGS')
    app.config.from_object(app_settings)
    # set up extensions
    db.init_app(app)
    # register blueprints
    routes.init_app(app)
    #models.init_app(app)
    # copy the Flask config (including the Celery settings) onto the Celery app
    celery.conf.update(app.config)
    # shell context for flask cli
    @app.shell_context_processor
    def ctx():
        return {'app': app, 'db': db}
    return app
config.py
class DevelopmentConfig(BaseConfig):
    """Development configuration"""
    DEBUG_TB_ENABLED = True
    DEBUG = True
    BCRYPT_LOG_ROUNDS = 4
    #set key
    SECRET_KEY = os.environ.get('SECRET_KEY')
    #sqlalchemy
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL')
    SEVER_NAME = 'http://127.0.0.1:8080'
    # celery broker
    REDIS_HOST = "0.0.0.0"
    REDIS_PORT = 6379
    BROKER_URL = os.environ.get('REDIS_URL', "redis://{host}:{port}/0".format(
        host=REDIS_HOST,
        port=str(REDIS_PORT)))
    INSTALLED_APPS = ['routes']
    # celery config
    CELERYD_CONCURRENCY = 10
    CELERY_BROKER_URL = BROKER_URL
    CELERY_RESULT_BACKEND = 'redis://redis:6379/0'
    CELERY_IMPORTS = ('project.api.routes.background',)
QUESTION
What am I missing? How do I trigger this Celery task and monitor it?
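I can't tell exactly what is going wrong (it looks fine)... As with anything in Celery, there are several ways to achieve what you want:
1) Use apply_async() and poll until the task has finished. Something like:
from time import sleep

# pass args=[...] / kwargs={...} here if the task takes parameters
res = cache_user_tracks_with_features.apply_async()
while not res.ready():
    sleep(2)
# business logic
2) Use apply_async() with link to name a task that should run once the first task has completed.
res = cache_user_tracks_with_features.apply_async(
    # link expects a task signature; the callback receives the parent's return value
    link=task_to_run_when_finished.s())
Celery also has a link_error parameter, so you can give it a task to execute when an error occurs.
3) Use Celery workflows. Create a chain out of cache_user_tracks_with_features and a task that does the rest of the work.
Or something entirely different may be causing your trouble...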
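The polling loop in option 1 waits forever if the task never completes, which matches the hanging request in the question. A minimal sketch of a bounded poll follows, using only the standard library; the names poll_until and PollTimeout are hypothetical and not part of Celery. Celery's own AsyncResult.get(timeout=...) is a built-in alternative that blocks until the result arrives or the timeout expires.

```python
import time


class PollTimeout(Exception):
    """Raised when the condition is still false once the deadline has passed."""


def poll_until(is_ready, timeout=10.0, interval=0.5):
    """Call is_ready() repeatedly until it returns True or `timeout` seconds pass.

    `is_ready` is any zero-argument callable, for example the bound method
    `task.ready` of a Celery AsyncResult (assumption: the caller supplies it).
    """
    deadline = time.monotonic() + timeout
    while not is_ready():
        if time.monotonic() >= deadline:
            raise PollTimeout(f"not ready after {timeout} seconds")
        time.sleep(interval)
    return True
```

In the route this could be called as poll_until(task.ready, timeout=10), with the PollTimeout case turned into an error response, so that an unreachable broker or a stopped worker produces a reply instead of a hung connection.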
The problem was in config.py:
REDIS_HOST = "0.0.0.0"
REDIS_PORT = 6379
BROKER_URL = os.environ.get('REDIS_URL', "redis://{host}:{port}/0".format(
    host=REDIS_HOST,
    port=str(REDIS_PORT)))
INSTALLED_APPS = ['routes']
# celery config
CELERYD_CONCURRENCY = 10
CELERY_BROKER_URL = BROKER_URL  # <-------- THIS WAS OVERRIDING
It was overriding the docker-compose environment:
environment:
  - CELERY_BROKER=redis://redis:6379/0  # <------- THIS WAS BEING OVERRIDDEN
  - CELERY_RESULT_BACKEND=redis://redis:6379/0
Simply setting CELERY_BROKER_URL to redis://redis:6379/0 in both config.py and the docker environment solved the problem: tasks are now executed by the worker, and the process is being monitored by Flower.
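One way to keep config.py and the docker-compose environment from drifting apart again is to resolve the broker URL in a single place. A minimal sketch, assuming the environment variable is named CELERY_BROKER_URL as in the fix above and that redis is the compose service name; the helper resolve_broker_url is hypothetical:

```python
import os

# Assumption: "redis" is the docker-compose service name of the broker.
DEFAULT_BROKER_URL = "redis://redis:6379/0"


def resolve_broker_url(environ=None):
    """Return CELERY_BROKER_URL from the environment, or the compose default."""
    environ = os.environ if environ is None else environ
    return environ.get("CELERY_BROKER_URL", DEFAULT_BROKER_URL)


class DevelopmentConfig:
    # Both settings derive from the same value, so they cannot disagree.
    CELERY_BROKER_URL = resolve_broker_url()
    CELERY_RESULT_BACKEND = CELERY_BROKER_URL
```

With this layout the value set under environment: in docker-compose wins when present, and the hard-coded 0.0.0.0 host no longer appears anywhere in the configuration.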