python celery 监控事件未被触发

python celery monitoring events not being triggered

我有以下项目目录:

azima:
    __init__.py
    main.py
    tasks.py
    monitor.py

tasks.py

from .main import app

@app.task
def add(x, y):
    """Task registered on ``app``: return ``x + y``."""
    total = x + y
    return total

@app.task
def mul(x, y):
    """Task registered on ``app``: return ``x * y``."""
    product = x * y
    return product

@app.task
def xsum(numbers):
    """Task registered on ``app``: return the built-in ``sum`` of ``numbers``."""
    total = sum(numbers)
    return total

main.py

from celery import Celery

# The same Redis URL is used for both the broker and the result backend;
# the task module is listed in ``include``.
app = Celery(
    'azima',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0',
    include=['azima.tasks'],
)

# Optional configuration, see the application user guide.
app.conf.update(result_expires=3600)

if __name__ == '__main__':
    # Start the app when this module is executed directly.
    app.start()

monitor.py

from .main import app

def my_monitor(app):
    """Listen for Celery events on ``app``'s connection and print a few of them.

    Each handler passes the event it receives to an ``app.events.State``
    instance. Failed and succeeded tasks are printed with their name and
    uuid, and a worker coming online is printed with the event's hostname,
    timestamp, freq and sw_ver fields. Capturing is started with no limit
    and no timeout.
    """
    tracker = app.events.State()

    def record_and_lookup(event):
        # Update the tracked state first, then fetch the task this event
        # refers to by its uuid.
        tracker.event(event)
        return tracker.tasks.get(event['uuid'])

    def on_task_failed(event):
        failed = record_and_lookup(event)
        print(f'TASK FAILED: {failed.name}[{failed.uuid}]')

    def on_task_succeeded(event):
        print('task succeeded')
        done = record_and_lookup(event)
        print(f'TASK SUCCEEDED: {done.name}[{done.uuid}]')

    def on_worker_online(event):
        tracker.event(event)
        print("New worker gets online")
        details = [event[key] for key in ('hostname', 'timestamp', 'freq', 'sw_ver')]
        print(*details)

    event_handlers = {
        'task-failed': on_task_failed,
        'task-succeeded': on_task_succeeded,
        'worker-online': on_worker_online,
        '*': tracker.event,
    }

    with app.connection() as conn:
        receiver = app.events.Receiver(conn, handlers=event_handlers)
        receiver.capture(limit=None, timeout=None, wakeup=True)

if __name__ == "__main__":
    # Monitor the app imported from .main rather than a freshly
    # constructed Celery('azima').
    my_monitor(app)

使用

启动 Celery worker
celery -A azima.main worker -l INFO

并启动 monitor.py

python -m azima.monitor

但只有 worker-online 事件被触发,而 task-succeeded 等其他事件未被触发或处理。

我在这里错过了什么?

将您的代码与 Flower 的代码进行比较:

# Retry loop around event capturing: on failure the error is logged and
# capturing is attempted again after a delay.
# try_interval is the delay passed to time.sleep() after a failed attempt.
try_interval = 1
while True:
    try:
        # Double the delay up front; it is reset to 1 below once the
        # receiver has been created, so it only keeps growing while
        # attempts fail before reaching that point.
        try_interval *= 2

        with self.capp.connection() as conn:
            # The "*" key registers self.on_event as the single handler, and
            # the Celery app is passed explicitly via app=.
            recv = EventReceiver(conn,
                                    handlers={"*": self.on_event},
                                    app=self.capp)
            try_interval = 1
            logger.debug("Capturing events...")
            recv.capture(limit=None, timeout=None, wakeup=True)
    except (KeyboardInterrupt, SystemExit):
        # Try the Python 3 module name first, then the Python 2 one.
        try:
            import _thread as thread
        except ImportError:
            import thread
        # NOTE(review): there is no break here, so the loop continues after
        # signalling the main thread; presumably this code runs in a
        # background thread - confirm against Flower's source.
        thread.interrupt_main()
    except Exception as e:
        # Any other failure: log it, wait try_interval seconds, then retry.
        logger.error("Failed to capture events: '%s', "
                        "trying again in %s seconds.",
                        e, try_interval)
        logger.debug(e, exc_info=True)
        time.sleep(try_interval)

有两个区别:

  1. 你的 EventReceiver 没有传入 Celery app(app 参数)
  2. 无限循环(while True)。不过我猜 capture 方法本身会阻塞并等待事件,这个循环只是为了在出错时重试。

使用命令行选项 -E 或 --task-events 启用 worker 的 task- 事件组,并尝试捕获所有事件:

def my_monitor(app):
    """Print the ``type`` field of each event passed to the ``'*'`` handler."""

    def show_event_type(event):
        event_type = event.get('type')
        print("Event.type", event_type)

    catch_all = {'*': show_event_type}

    with app.connection() as conn:
        receiver = app.events.Receiver(conn, handlers=catch_all)
        receiver.capture(limit=None, timeout=None, wakeup=True)

默认情况下,Celery worker 不发送任务事件。但是,与大多数有用的功能一样,这是可以配置的:在您的配置中启用 worker_send_task_events,或者在运行 Celery worker 时加上 -E 标志。