如何使用 Python 检查 Celery/Supervisor 是否为 运行

How to check if Celery/Supervisor is running using Python

如何在 Python 中编写脚本,如果 celery 运行ning 在机器上 (Ubuntu) 则输出?

我的用例。我有一个包含一些任务的简单 python 文件。我没有使用 Django 或 Flask。我使用 supervisor 来 运行 任务队列。例如,

tasks.py

from celery import Celery, task
app = Celery('tasks')
@app.task()
def add_together(a, b):
    return a + b

主管:

[program:celery_worker]
directory = /var/app/
command=celery -A tasks worker info

一切正常,我现在想要一个页面来检查 celery/supervisor 进程是否正在 运行ning。即像这样的东西可能使用 Flask 允许我托管页面,提供 200 状态允许我负载平衡。

例如...

check_status.py

from flask import Flask

app = Flask(__name__)

@app.route('/')
def status_check():

    #check supervisor is running
    if supervisor:
         return render_template('up.html')
    else:
        return render_template('down.html')

if __name__ == '__main__':
    app.run()

使用子流程如何,不确定是否是个好主意:

>>> import subprocess
>>> output = subprocess.check_output('ps aux'.split())
>>> 'supervisord' in output
True

更新 09/2020:Jérôme 在此处更新了 Celery 4.3 的答案:

您可以通过代码导入 celery.bin.celery 包 运行 celery status 命令:

import celery
import celery.bin.base
import celery.bin.celery
import celery.platforms

app = celery.Celery('tasks', broker='redis://')

status = celery.bin.celery.CeleryCommand.commands['status']()
status.app = status.get_app()

def celery_is_up():
    try:
        status.run()
        return True
    except celery.bin.base.Error as e:
        if e.status == celery.platforms.EX_UNAVAILABLE:
            return False
        raise e

if __name__ == '__main__':
    if celery_is_up():
        print('Celery up!')
    else:
        print('Celery not responding...')

根据我的经验,我会设置一条消息来跟踪它是否完成,以便队列负责重试任务。

您可以从 supervisorctl status 输出解析 process state

import subprocess

def is_celery_worker_running():
    ctl_output = subprocess.check_output('supervisorctl status celery_worker'.split()).strip()
    if ctl_output == 'unix:///var/run/supervisor.sock no such file':
        # supervisord not running
        return False
    elif ctl_output == 'No such process celery_worker':
        return False
    else:
        state = ctl_output.split()[1]
        return state == 'RUNNING'

Supervisor 附带一个稀疏的 Web 用户界面。也许你可以使用它。它可以在主管配置中启用。要寻找的关键是 [inet_http_server]

你甚至可以查看那篇文章的源代码来获得实现你自己的想法。

这不适用于 celery,但对于最终来到这里查看 supervisord 是否 运行 的任何人,请检查您的 supervisord.conf 配置文件中为 supervisord 定义的 pid 文件是否存在.如果是这样,那就是运行;如果不是,那就不是。默认的pidfile是/tmp/supervisord.pid,就是我下面用的

import os
import sys

if os.path.isfile("/tmp/supervisord.pid"):
    print "supervisord is running."
    sys.exit()

启发,使用 Celery 4.3.0。

import celery
import celery.bin.base
import celery.bin.control
import celery.platforms

# Importing Celery app from my own application
from my_app.celery import app as celery_app


def celery_running():
    """Test Celery server is available

    Inspired by 
    """
    status = celery.bin.control.status(celery_app)
    try:
        status.run()
        return True
    except celery.bin.base.Error as exc:
        if exc.status == celery.platforms.EX_UNAVAILABLE:
            return False
        raise


if __name__ == '__main__':
    if celery_is_up():
        print('Celery up!')
    else:
        print('Celery not responding...')