如何使用 Python 检查 Celery/Supervisor 是否为 运行
How to check if Celery/Supervisor is running using Python
如何在 Python 中编写脚本,如果 celery 运行ning 在机器上 (Ubuntu) 则输出?
我的用例。我有一个包含一些任务的简单 python 文件。我没有使用 Django 或 Flask。我使用 supervisor 来 运行 任务队列。例如,
tasks.py
from celery import Celery, task
app = Celery('tasks')
@app.task()
def add_together(a, b):
return a + b
主管:
[program:celery_worker]
directory = /var/app/
command=celery -A tasks worker info
一切正常,我现在想要一个页面来检查 celery/supervisor 进程是否正在 运行ning。即像这样的东西可能使用 Flask 允许我托管页面,提供 200 状态允许我负载平衡。
例如...
check_status.py
from flask import Flask
app = Flask(__name__)
@app.route('/')
def status_check():
#check supervisor is running
if supervisor:
return render_template('up.html')
else:
return render_template('down.html')
if __name__ == '__main__':
app.run()
使用子流程如何,不确定是否是个好主意:
>>> import subprocess
>>> output = subprocess.check_output('ps aux'.split())
>>> 'supervisord' in output
True
更新 09/2020:Jérôme 在此处更新了 Celery 4.3 的答案:
您可以通过代码导入 celery.bin.celery
包 运行 celery status
命令:
import celery
import celery.bin.base
import celery.bin.celery
import celery.platforms
app = celery.Celery('tasks', broker='redis://')
status = celery.bin.celery.CeleryCommand.commands['status']()
status.app = status.get_app()
def celery_is_up():
try:
status.run()
return True
except celery.bin.base.Error as e:
if e.status == celery.platforms.EX_UNAVAILABLE:
return False
raise e
if __name__ == '__main__':
if celery_is_up():
print('Celery up!')
else:
print('Celery not responding...')
根据我的经验,我会设置一条消息来跟踪它是否完成,以便队列负责重试任务。
您可以从 supervisorctl status
输出解析 process state
import subprocess
def is_celery_worker_running():
ctl_output = subprocess.check_output('supervisorctl status celery_worker'.split()).strip()
if ctl_output == 'unix:///var/run/supervisor.sock no such file':
# supervisord not running
return False
elif ctl_output == 'No such process celery_worker':
return False
else:
state = ctl_output.split()[1]
return state == 'RUNNING'
Supervisor 附带一个稀疏的 Web 用户界面。也许你可以使用它。它可以在主管配置中启用。要寻找的关键是 [inet_http_server]
你甚至可以查看那篇文章的源代码来获得实现你自己的想法。
这不适用于 celery,但对于最终来到这里查看 supervisord 是否 运行 的任何人,请检查您的 supervisord.conf
配置文件中为 supervisord 定义的 pid 文件是否存在.如果是这样,那就是运行;如果不是,那就不是。默认的pidfile是/tmp/supervisord.pid,就是我下面用的
import os
import sys
if os.path.isfile("/tmp/supervisord.pid"):
print "supervisord is running."
sys.exit()
受 启发,使用 Celery 4.3.0。
import celery
import celery.bin.base
import celery.bin.control
import celery.platforms
# Importing Celery app from my own application
from my_app.celery import app as celery_app
def celery_running():
"""Test Celery server is available
Inspired by
"""
status = celery.bin.control.status(celery_app)
try:
status.run()
return True
except celery.bin.base.Error as exc:
if exc.status == celery.platforms.EX_UNAVAILABLE:
return False
raise
if __name__ == '__main__':
if celery_is_up():
print('Celery up!')
else:
print('Celery not responding...')
如何在 Python 中编写脚本,如果 celery 运行ning 在机器上 (Ubuntu) 则输出?
我的用例。我有一个包含一些任务的简单 python 文件。我没有使用 Django 或 Flask。我使用 supervisor 来 运行 任务队列。例如,
tasks.py
from celery import Celery, task
app = Celery('tasks')
@app.task()
def add_together(a, b):
return a + b
主管:
[program:celery_worker]
directory = /var/app/
command=celery -A tasks worker info
一切正常,我现在想要一个页面来检查 celery/supervisor 进程是否正在 运行ning。即像这样的东西可能使用 Flask 允许我托管页面,提供 200 状态允许我负载平衡。
例如...
check_status.py
from flask import Flask
app = Flask(__name__)
@app.route('/')
def status_check():
#check supervisor is running
if supervisor:
return render_template('up.html')
else:
return render_template('down.html')
if __name__ == '__main__':
app.run()
使用子流程如何,不确定是否是个好主意:
>>> import subprocess
>>> output = subprocess.check_output('ps aux'.split())
>>> 'supervisord' in output
True
更新 09/2020:Jérôme 在此处更新了 Celery 4.3 的答案:
您可以通过代码导入 celery.bin.celery
包 运行 celery status
命令:
import celery
import celery.bin.base
import celery.bin.celery
import celery.platforms
app = celery.Celery('tasks', broker='redis://')
status = celery.bin.celery.CeleryCommand.commands['status']()
status.app = status.get_app()
def celery_is_up():
try:
status.run()
return True
except celery.bin.base.Error as e:
if e.status == celery.platforms.EX_UNAVAILABLE:
return False
raise e
if __name__ == '__main__':
if celery_is_up():
print('Celery up!')
else:
print('Celery not responding...')
根据我的经验,我会设置一条消息来跟踪它是否完成,以便队列负责重试任务。
您可以从 supervisorctl status
输出解析 process state
import subprocess
def is_celery_worker_running():
ctl_output = subprocess.check_output('supervisorctl status celery_worker'.split()).strip()
if ctl_output == 'unix:///var/run/supervisor.sock no such file':
# supervisord not running
return False
elif ctl_output == 'No such process celery_worker':
return False
else:
state = ctl_output.split()[1]
return state == 'RUNNING'
Supervisor 附带一个稀疏的 Web 用户界面。也许你可以使用它。它可以在主管配置中启用。要寻找的关键是 [inet_http_server]
你甚至可以查看那篇文章的源代码来获得实现你自己的想法。
这不适用于 celery,但对于最终来到这里查看 supervisord 是否 运行 的任何人,请检查您的 supervisord.conf
配置文件中为 supervisord 定义的 pid 文件是否存在.如果是这样,那就是运行;如果不是,那就不是。默认的pidfile是/tmp/supervisord.pid,就是我下面用的
import os
import sys
if os.path.isfile("/tmp/supervisord.pid"):
print "supervisord is running."
sys.exit()
受
import celery
import celery.bin.base
import celery.bin.control
import celery.platforms
# Importing Celery app from my own application
from my_app.celery import app as celery_app
def celery_running():
"""Test Celery server is available
Inspired by
"""
status = celery.bin.control.status(celery_app)
try:
status.run()
return True
except celery.bin.base.Error as exc:
if exc.status == celery.platforms.EX_UNAVAILABLE:
return False
raise
if __name__ == '__main__':
if celery_is_up():
print('Celery up!')
else:
print('Celery not responding...')