python celery 监控事件未被触发
python celery monitoring events not being triggered
我有以下项目目录:
azima:
__init__.py
main.py
tasks.py
monitor.py
tasks.py
from .main import app


@app.task
def add(x, y):
    """Return the sum of ``x`` and ``y``."""
    return x + y


@app.task
def mul(x, y):
    """Return the product of ``x`` and ``y``."""
    return x * y


@app.task
def xsum(numbers):
    """Return the sum of an iterable of numbers."""
    return sum(numbers)
main.py
from celery import Celery

# Redis serves as both the message broker and the result backend;
# ``include`` ensures workers import the task module at startup.
app = Celery(
    'azima',
    backend='redis://localhost:6379/0',
    broker='redis://localhost:6379/0',
    include=['azima.tasks'],
)

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
    # Without this (or the -E/--task-events worker flag) workers do not
    # emit task-* events, so monitors only ever see worker-* events.
    worker_send_task_events=True,
)

if __name__ == '__main__':
    app.start()
monitor.py
from .main import app


def my_monitor(app):
    """Capture Celery events and print task/worker state changes.

    Blocks forever inside ``recv.capture``.  NOTE: workers emit task-*
    events only when started with ``-E``/``--task-events`` or when
    ``worker_send_task_events`` is enabled in the configuration;
    otherwise only worker-* events (e.g. worker-online) arrive.
    """
    state = app.events.State()

    def announce_failed_tasks(event):
        # Feed the event into the in-memory state first so the full
        # task object (name, uuid, ...) can be looked up afterwards.
        state.event(event)
        task = state.tasks.get(event['uuid'])
        print(f'TASK FAILED: {task.name}[{task.uuid}]')

    def announce_succeeded_tasks(event):
        print('task succeeded')
        state.event(event)
        task = state.tasks.get(event['uuid'])
        print(f'TASK SUCCEEDED: {task.name}[{task.uuid}]')

    def worker_online_handler(event):
        state.event(event)
        print("New worker gets online")
        print(event['hostname'], event['timestamp'], event['freq'], event['sw_ver'])

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
            'task-failed': announce_failed_tasks,
            'task-succeeded': announce_succeeded_tasks,
            'worker-online': worker_online_handler,
            # Catch-all keeps the state object up to date for events
            # that have no dedicated handler.
            '*': state.event,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)


if __name__ == '__main__':
    # app = Celery('azima')
    my_monitor(app)
使用以下命令启动 Celery worker:
celery -A azima.main worker -l INFO
并从 monitor.py
开始
python -m azima.monitor
但只有 worker-online
事件被触发,而 task-succeeded
等其他事件未被触发或处理。
我在这里错过了什么?
try_interval = 1
while True:
try:
try_interval *= 2
with self.capp.connection() as conn:
recv = EventReceiver(conn,
handlers={"*": self.on_event},
app=self.capp)
try_interval = 1
logger.debug("Capturing events...")
recv.capture(limit=None, timeout=None, wakeup=True)
except (KeyboardInterrupt, SystemExit):
try:
import _thread as thread
except ImportError:
import thread
thread.interrupt_main()
except Exception as e:
logger.error("Failed to capture events: '%s', "
"trying again in %s seconds.",
e, try_interval)
logger.debug(e, exc_info=True)
time.sleep(try_interval)
有两个区别:
- 你的 EventReceiver 中缺少 Celery app 参数。
- 无限循环(while True)。不过我认为 capture 方法本身是阻塞等待事件的,这个循环只是为了在出错后重试。
使用命令行选项 -E 或 --task-events 启用 worker 的任务事件(task-* 事件组),然后尝试捕获所有事件:
def my_monitor(app):
    """Print the type of every Celery event received from the broker.

    Minimal diagnostic monitor: a single catch-all handler shows which
    event types actually arrive.  Blocks inside ``recv.capture``.
    """
    def on_event(event):
        # Catch-all handler: just report the event type.
        print("Event.type", event.get('type'))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={'*': on_event})
        recv.capture(limit=None, timeout=None, wakeup=True)
默认情况下,Celery worker 不发送事件。不过,和大多数实用功能一样,可以通过在配置中启用 worker_send_task_events,或在运行 Celery worker 时加上 -E 标志来开启。
我有以下项目目录:
azima:
__init__.py
main.py
tasks.py
monitor.py
tasks.py
from .main import app


@app.task
def add(x, y):
    """Return the sum of ``x`` and ``y``."""
    return x + y


@app.task
def mul(x, y):
    """Return the product of ``x`` and ``y``."""
    return x * y


@app.task
def xsum(numbers):
    """Return the sum of an iterable of numbers."""
    return sum(numbers)
main.py
from celery import Celery

# Redis serves as both the message broker and the result backend;
# ``include`` ensures workers import the task module at startup.
app = Celery(
    'azima',
    backend='redis://localhost:6379/0',
    broker='redis://localhost:6379/0',
    include=['azima.tasks'],
)

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
    # Without this (or the -E/--task-events worker flag) workers do not
    # emit task-* events, so monitors only ever see worker-* events.
    worker_send_task_events=True,
)

if __name__ == '__main__':
    app.start()
monitor.py
from .main import app


def my_monitor(app):
    """Capture Celery events and print task/worker state changes.

    Blocks forever inside ``recv.capture``.  NOTE: workers emit task-*
    events only when started with ``-E``/``--task-events`` or when
    ``worker_send_task_events`` is enabled in the configuration;
    otherwise only worker-* events (e.g. worker-online) arrive.
    """
    state = app.events.State()

    def announce_failed_tasks(event):
        # Feed the event into the in-memory state first so the full
        # task object (name, uuid, ...) can be looked up afterwards.
        state.event(event)
        task = state.tasks.get(event['uuid'])
        print(f'TASK FAILED: {task.name}[{task.uuid}]')

    def announce_succeeded_tasks(event):
        print('task succeeded')
        state.event(event)
        task = state.tasks.get(event['uuid'])
        print(f'TASK SUCCEEDED: {task.name}[{task.uuid}]')

    def worker_online_handler(event):
        state.event(event)
        print("New worker gets online")
        print(event['hostname'], event['timestamp'], event['freq'], event['sw_ver'])

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
            'task-failed': announce_failed_tasks,
            'task-succeeded': announce_succeeded_tasks,
            'worker-online': worker_online_handler,
            # Catch-all keeps the state object up to date for events
            # that have no dedicated handler.
            '*': state.event,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)


if __name__ == '__main__':
    # app = Celery('azima')
    my_monitor(app)
使用以下命令启动 Celery worker:
celery -A azima.main worker -l INFO
并从 monitor.py
开始
python -m azima.monitor
但只有 worker-online
事件被触发,而 task-succeeded
等其他事件未被触发或处理。
我在这里错过了什么?
try_interval = 1
while True:
try:
try_interval *= 2
with self.capp.connection() as conn:
recv = EventReceiver(conn,
handlers={"*": self.on_event},
app=self.capp)
try_interval = 1
logger.debug("Capturing events...")
recv.capture(limit=None, timeout=None, wakeup=True)
except (KeyboardInterrupt, SystemExit):
try:
import _thread as thread
except ImportError:
import thread
thread.interrupt_main()
except Exception as e:
logger.error("Failed to capture events: '%s', "
"trying again in %s seconds.",
e, try_interval)
logger.debug(e, exc_info=True)
time.sleep(try_interval)
有两个区别:
- 你的 EventReceiver 中缺少 Celery app 参数。
- 无限循环(while True)。不过我认为 capture 方法本身是阻塞等待事件的,这个循环只是为了在出错后重试。
使用命令行选项 -E 或 --task-events 启用 worker 的任务事件(task-* 事件组),然后尝试捕获所有事件:
def my_monitor(app):
    """Print the type of every Celery event received from the broker.

    Minimal diagnostic monitor: a single catch-all handler shows which
    event types actually arrive.  Blocks inside ``recv.capture``.
    """
    def on_event(event):
        # Catch-all handler: just report the event type.
        print("Event.type", event.get('type'))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={'*': on_event})
        recv.capture(limit=None, timeout=None, wakeup=True)
默认情况下,Celery worker 不发送事件。不过,和大多数实用功能一样,可以通过在配置中启用 worker_send_task_events,或在运行 Celery worker 时加上 -E 标志来开启。